1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Portal-RPC reconnection and replay operations, for use in recovery.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Copyright (C) 1996 Peter J. Braam <braam@stelias.com>
10 * Copyright (C) 1999 Stelias Computing Inc. <braam@stelias.com>
11 * Copyright (C) 1999 Seagate Technology Inc.
12 * Copyright (C) 2001 Mountain View Data, Inc.
13 * Copyright (C) 2002 Cluster File Systems, Inc.
 */
17 #include <linux/config.h>
18 #include <linux/module.h>
19 #include <linux/kmod.h>
21 #define DEBUG_SUBSYSTEM S_RPC
23 #include <linux/lustre_ha.h>
24 #include <linux/lustre_net.h>
25 #include <linux/obd.h>
/*
 * ptlrpc_reconnect_import - send a (re)connect request for an import and
 * update the import handle from the reply.
 *
 * @imp:    import being reconnected; its imp_obd, imp_connection and
 *          imp_handle fields are used below
 * @rq_opc: opcode handed straight to ptlrpc_prep_req()
 *
 * The request carries two buffers (the client target UUID and the UUID of
 * this obd) and is stamped with the address and cookie of the first export
 * on obd->obd_exports.  It is sent with ptlrpc_queue_wait().
 *
 * Values assigned to rc on the paths visible here:
 *   -ENOTCONN  the reply handle is all zero, or the "cannot connect" path
 *   EALREADY   (positive) the peer reports an existing connection; the
 *              import handle is overwritten first if it differs from the
 *              one in the reply
 *   0          the handle from the reply was stored as the new import handle
 *
 * Every visible path goes through GOTO(out_disc, ...), and the request is
 * released with ptlrpc_req_finished().
 *
 * NOTE(review): this listing is missing lines.  The declaration of rc, the
 * conditions that select between the paths above, the out_disc label and
 * the return statement are not visible; confirm against the full source.
 */
27 int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc)
29 struct obd_device *obd = imp->imp_obd;
30 struct client_obd *cli = &obd->u.cli;
31 int size[] = { sizeof(cli->cl_target_uuid), sizeof(obd->obd_uuid) };
32 char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid };
33 struct ptlrpc_connection *conn = imp->imp_connection;
34 struct lustre_handle old_hdl;
35 struct ptlrpc_request *request;
36 struct obd_export *ldlmexp;
/*
 * NOTE(review): request is dereferenced on the very next line with no NULL
 * check; confirm ptlrpc_prep_req() cannot fail here.
 */
39 request = ptlrpc_prep_req(imp, rq_opc, 2, size, tmp);
40 request->rq_level = LUSTRE_CONN_NEW;
41 request->rq_replen = lustre_msg_size(0, NULL);
43 * This address is the export that represents our client-side LDLM
44 * service (for ASTs). We should only have one on this list, so we
45 * just grab the first one.
47 * XXX tear down export, call class_obd_connect?
49 ldlmexp = list_entry(obd->obd_exports.next, struct obd_export,
51 request->rq_reqmsg->addr = (__u64)(unsigned long)ldlmexp;
52 request->rq_reqmsg->cookie = ldlmexp->exp_cookie;
53 rc = ptlrpc_queue_wait(request);
57 /* already connected! */
/*
 * old_hdl is zeroed so it can serve as an all-zero reference: a reply whose
 * addr and cookie both compare equal to zero is reported as the peer
 * rejecting the handle we sent.
 */
58 memset(&old_hdl, 0, sizeof(old_hdl));
59 if (!memcmp(&old_hdl.addr, &request->rq_repmsg->addr,
60 sizeof (old_hdl.addr)) &&
61 !memcmp(&old_hdl.cookie, &request->rq_repmsg->cookie,
62 sizeof (old_hdl.cookie))) {
63 CERROR("%s@%s didn't like our handle %Lx/%Lx, failed\n",
64 cli->cl_target_uuid, conn->c_remote_uuid,
65 (__u64)(unsigned long)ldlmexp,
67 GOTO(out_disc, rc = -ENOTCONN);
/*
 * old_hdl is reused as a scratch copy of the handle carried in the reply and
 * compared with the handle currently stored on the import.
 */
70 old_hdl.addr = request->rq_repmsg->addr;
71 old_hdl.cookie = request->rq_repmsg->cookie;
72 if (memcmp(&imp->imp_handle, &old_hdl, sizeof(old_hdl))) {
/*
 * NOTE(review): at this point old_hdl holds the handle from the reply and
 * imp->imp_handle still holds the previous one, so the from/to arguments of
 * this message look swapped; confirm before relying on the log output.
 */
73 CERROR("%s@%s changed handle from %Lx/%Lx to %Lx/%Lx; "
74 "copying, but this may foreshadow disaster\n",
75 cli->cl_target_uuid, conn->c_remote_uuid,
76 old_hdl.addr, old_hdl.cookie,
77 imp->imp_handle.addr, imp->imp_handle.cookie);
78 imp->imp_handle.addr = request->rq_repmsg->addr;
79 imp->imp_handle.cookie = request->rq_repmsg->cookie;
80 GOTO(out_disc, rc = EALREADY);
83 CERROR("reconnected to %s@%s after partition\n",
84 cli->cl_target_uuid, conn->c_remote_uuid);
85 GOTO(out_disc, rc = EALREADY);
/*
 * Keep the previous handle only for the log message, then adopt the handle
 * from the reply as the import handle.
 */
87 old_hdl = imp->imp_handle;
88 imp->imp_handle.addr = request->rq_repmsg->addr;
89 imp->imp_handle.cookie = request->rq_repmsg->cookie;
90 CERROR("now connected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n",
91 cli->cl_target_uuid, conn->c_remote_uuid,
92 imp->imp_handle.addr, imp->imp_handle.cookie,
93 old_hdl.addr, old_hdl.cookie);
94 GOTO(out_disc, rc = 0);
96 CERROR("cannot connect to %s@%s: rc = %d\n",
97 cli->cl_target_uuid, conn->c_remote_uuid, rc);
98 GOTO(out_disc, rc = -ENOTCONN); /* XXX preserve rc? */
102 ptlrpc_req_finished(request);
/*
 * ptlrpc_run_recovery_upcall - launch the user-space recovery helper for a
 * connection.
 *
 * @conn: connection whose c_remote_uuid is passed to the helper as argv[1]
 *
 * The helper path comes from obd_recovery_upcall (argv[0]) and is started
 * with call_usermodehelper(), with PATH set through envp[1].  The result is
 * stored in rc and reported with CERROR: one message pair for a failed
 * invocation (pointing the administrator at the recovery_upcall sysctl) and
 * one for a successful invocation.
 *
 * NOTE(review): this listing is missing lines.  The declarations of argv,
 * envp and rc, the other argv/envp slots, the condition that selects
 * between the two log messages and the return statement are not visible.
 * The remark at the end of the body says a failed upcall is deliberately
 * not treated as a failed recovery; confirm the actual return value against
 * the full source.
 */
106 int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
113 argv[0] = obd_recovery_upcall;
114 argv[1] = conn->c_remote_uuid;
118 envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
121 rc = call_usermodehelper(argv[0], argv, envp);
123 CERROR("Error invoking recovery upcall %s for %s: %d\n",
124 argv[0], argv[1], rc);
125 CERROR("Check /proc/sys/lustre/recovery_upcall?\n");
127 CERROR("Invoked upcall %s for connection %s\n",
132 * We don't want to make this a "failed" recovery, because the system
133 * administrator -- or, perhaps, tester -- may well be able to rescue
134 * things by running the correct upcall.
/*
 * ptlrpc_replay - replay the requests retained on an import replay list.
 *
 * @imp:            import whose imp_replay_list is walked
 * @send_last_flag: not referenced on any line visible in this listing;
 *                  presumably it is the second half of the condition that
 *                  tags a request with MSG_LAST_REPLAY (TODO confirm)
 *
 * Sequence visible here:
 *   1. imp_peer_committed_transno is sampled into a local at declaration
 *      time and is used only in the debug message.
 *   2. ptlrpc_free_committed(imp) is called before imp_lock is taken.
 *   3. With imp_lock held, every request on imp_replay_list is logged as
 *      RETAINED.
 *   4. A second pass, using list_for_each_safe, sets MSG_LAST_REPLAY on the
 *      request whose rq_transno equals imp_max_transno (subject to the
 *      second condition mentioned above), calls ptlrpc_replay_req() and then
 *      clears MSG_LAST_REPLAY again regardless of the result.
 *   5. A replay error is logged with CERROR, and imp_lock is released.
 *
 * NOTE(review): this listing is missing lines.  The declaration of rc, the
 * handling after the CERROR and the return statement are not visible.  No
 * unlock is visible between taking imp_lock and the ptlrpc_replay_req()
 * call; confirm that call is safe with the lock held.
 */
139 int ptlrpc_replay(struct obd_import *imp, int send_last_flag)
142 struct list_head *tmp, *pos;
143 struct ptlrpc_request *req;
144 __u64 committed = imp->imp_peer_committed_transno;
147 /* It might have committed some after we last spoke, so make sure we
148 * get rid of them now.
150 ptlrpc_free_committed(imp);
152 spin_lock(&imp->imp_lock);
154 CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
155 imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
157 list_for_each(tmp, &imp->imp_replay_list) {
158 req = list_entry(tmp, struct ptlrpc_request, rq_list);
159 DEBUG_REQ(D_HA, req, "RETAINED: ");
162 list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
163 req = list_entry(tmp, struct ptlrpc_request, rq_list);
165 if (req->rq_transno == imp->imp_max_transno &&
167 req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
168 DEBUG_REQ(D_HA, req, "LAST_REPLAY:");
170 DEBUG_REQ(D_HA, req, "REPLAY:");
173 rc = ptlrpc_replay_req(req);
174 req->rq_reqmsg->flags &= ~MSG_LAST_REPLAY;
177 CERROR("recovery replay error %d for req %Ld\n",
184 spin_unlock(&imp->imp_lock);
/*
 * Verdicts for a request found on the sending list during recovery.
 * ptlrpc_resend() switches on the value produced by resend_type(); of
 * these, RESEND_IGNORE is the one visibly returned there, and the others
 * are presumably returned on the lines missing from this listing.
 */
188 #define NO_RESEND 0 /* No action required. */
189 #define RESEND 1 /* Resend required. */
190 #define RESEND_IGNORE 2 /* Resend, ignore the reply (already saw it). */
191 #define RESTART 3 /* Have to restart the call, sorry! */
/*
 * resend_type - classify an in-flight request for ptlrpc_resend().
 *
 * @req:       request to classify; rq_transno and rq_flags are examined
 * @committed: transaction number to compare rq_transno against
 *
 * Two tests drive the decision: whether rq_transno is strictly below
 * @committed, and whether PTL_RPC_FL_REPLIED is set in rq_flags.  According
 * to the existing comments the four combinations are:
 *   committed and replied          nothing to do (logged as NO_RESEND)
 *   committed, no reply            the call has to be restarted
 *   not committed, replied         resend and ignore the new reply
 *   not committed, no reply        plain resend
 *
 * NOTE(review): only the RESEND_IGNORE return is visible in this listing;
 * the other three return statements are missing and are presumably
 * NO_RESEND, RESTART and RESEND respectively (TODO confirm).
 */
193 static int resend_type(struct ptlrpc_request *req, __u64 committed)
195 if (req->rq_transno < committed) {
196 if (req->rq_flags & PTL_RPC_FL_REPLIED) {
197 /* Saw the reply and it was committed, no biggie. */
198 DEBUG_REQ(D_HA, req, "NO_RESEND");
201 /* Request committed, but no reply: have to restart. */
205 if (req->rq_flags & PTL_RPC_FL_REPLIED) {
206 /* Saw reply, so resend and ignore new reply. */
207 return RESEND_IGNORE;
210 /* Didn't see reply either, so resend. */
/*
 * ptlrpc_resend - deal with every request on an import sending list after a
 * reconnect.
 *
 * @imp: import whose imp_sending_list is walked
 *
 * imp_peer_committed_transno is sampled once at declaration time, before
 * imp_lock is taken.  With the lock held, each request is first logged as
 * SENDING.  A second pass, using list_for_each_safe, switches on
 * resend_type(req, committed).  Judging by the debug strings, the actions
 * are:
 *   RESTART        ptlrpc_restart_req()
 *   RESEND_IGNORE  ptlrpc_replay_req(); if that is reported as an error the
 *                  request is restarted with ptlrpc_restart_req() instead
 *   RESEND         ptlrpc_resend_req()
 *
 * NOTE(review): this listing is missing lines.  The case labels, the
 * declaration of rc and the return statement are not visible, and neither
 * is any spin_unlock matching the spin_lock below; confirm against the
 * full source.
 */
215 int ptlrpc_resend(struct obd_import *imp)
218 struct list_head *tmp, *pos;
219 struct ptlrpc_request *req;
220 __u64 committed = imp->imp_peer_committed_transno;
224 spin_lock(&imp->imp_lock);
225 list_for_each(tmp, &imp->imp_sending_list) {
226 req = list_entry(tmp, struct ptlrpc_request, rq_list);
227 DEBUG_REQ(D_HA, req, "SENDING: ");
230 list_for_each_safe(tmp, pos, &imp->imp_sending_list) {
231 req = list_entry(tmp, struct ptlrpc_request, rq_list);
233 switch(resend_type(req, committed)) {
238 DEBUG_REQ(D_HA, req, "RESTART:");
239 ptlrpc_restart_req(req);
243 DEBUG_REQ(D_HA, req, "RESEND_IGNORE:");
244 rc = ptlrpc_replay_req(req);
246 DEBUG_REQ(D_ERROR, req, "error %d resending:",
248 ptlrpc_restart_req(req); /* might as well */
253 DEBUG_REQ(D_HA, req, "RESEND:");
254 ptlrpc_resend_req(req);
/*
 * ptlrpc_wake_delayed - wake every request parked on an import delayed list.
 *
 * @imp: import whose imp_delayed_list is walked
 *
 * With imp_lock held, each request on the list is logged and wake_up() is
 * called on its rq_wait_for_rep wait queue.  Nothing is removed from the
 * list on the lines visible here, although the walk uses
 * list_for_each_safe.
 */
264 void ptlrpc_wake_delayed(struct obd_import *imp)
266 struct list_head *tmp, *pos;
267 struct ptlrpc_request *req;
269 spin_lock(&imp->imp_lock);
270 list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
271 req = list_entry(tmp, struct ptlrpc_request, rq_list);
272 DEBUG_REQ(D_HA, req, "waking:");
273 wake_up(&req->rq_wait_for_rep);
275 spin_unlock(&imp->imp_lock);