Whamcloud - gitweb
author    shaver <shaver>        Tue, 15 Oct 2002 02:39:34 +0000 (02:39 +0000)
committer shaver <shaver>        Tue, 15 Oct 2002 02:39:34 +0000 (02:39 +0000)

- Rename the ptlrpc-general reconnection and replay functions, and export them
  for use in other places (such as llite).
- Replace many dangerous ptlrpc_free_req calls with their refcount-friendly
  counterpart, ptlrpc_req_finished.
- Remove an excess and harmful ptlrpc_req_finished call in ll_file_open.
- Move recovd_conn_manage calls from client_obd_connect to ll_read_super,
  since the recovery function is now llite-specific.
- Resurrect llite/recover.c for said function (coming soon).
- Rationalize the portals-owned refcount (added in ptl_send_rpc, removed in
  request_out_callback); see the refcount lifecycle sketch after this list.
- Free repmsg from free_req, not req_finished, in case one of the remaining
  ref-holders has plans for it.
- Diagnostics for freed reqs with outstanding refcounts, to help track down
  any remaining cases.
- Initialize c_recovd_data.rd_managed_chain at allocation time, for better karma.
- Marginally nicer dump_connection_list output, and an LBUG check for infinite looping.
- Add diagnostics for double-managing of connections, which turned out not to be
  the bug I was chasing earlier, but might well be in the future.
- Set rd_phase and rd_next_phase before calling the recovery state-machine hooks,
  so that adjustments within those hooks (such as calls to recovd_conn_fixed)
  don't result in an inconsistent state; see the phase-ordering sketch after this list.
- Failure to run the upcall no longer aborts recovery, so that an administrator
  can manually salvage things.
- More complete diagnostics for request replay and sleeping.
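
To make the refcount discipline described above concrete, here is a minimal,
self-contained C sketch of the lifecycle this change establishes: the request
starts with two references (sender and to-be-committed list), ptl_send_rpc
takes an extra one for portals, request_out_callback drops it, and the reply
buffer is freed only once the last reference is gone. The demo_ names, the
stripped-down struct, and the use of C11 atomics are illustrative assumptions;
the real code uses struct ptlrpc_request, the kernel's atomic_t, and the
functions shown in the diff below.

#include <stdatomic.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct ptlrpc_request: just a refcount and a
 * reply buffer, enough to show who owns which reference. */
struct demo_req {
        atomic_int refcount;
        void *repmsg;
};

/* prep: one reference for the sender, one for the to-be-committed list
 * (this patch lowers the initial count from 3 to 2). */
static struct demo_req *demo_prep_req(void)
{
        struct demo_req *req = calloc(1, sizeof(*req));
        atomic_init(&req->refcount, 2);
        return req;
}

/* free_req: runs only when nobody holds a reference any more; this is
 * where the reply buffer is released after this change. */
static void demo_free_req(struct demo_req *req)
{
        free(req->repmsg);
        free(req);
}

/* req_finished: the refcount-friendly counterpart; drop one reference,
 * free on the last one. */
static void demo_req_finished(struct demo_req *req)
{
        if (atomic_fetch_sub(&req->refcount, 1) == 1)
                demo_free_req(req);
}

/* send_rpc: take an extra reference on behalf of portals... */
static void demo_send_rpc(struct demo_req *req)
{
        atomic_fetch_add(&req->refcount, 1);
        /* ...hand the buffers to the network here... */
}

/* ...and request_out_callback drops it when portals is done. */
static void demo_request_out_callback(struct demo_req *req)
{
        demo_req_finished(req);
}

int main(void)
{
        struct demo_req *req = demo_prep_req();  /* refcount == 2                */
        demo_send_rpc(req);                      /* +1 for portals               */
        demo_request_out_callback(req);          /* portals done: 2              */
        demo_req_finished(req);                  /* sender done: 1               */
        demo_req_finished(req);                  /* off committed list: 0, freed */
        return 0;
}

With this split, freeing rq_repmsg in free_req rather than in req_finished
means a holder that still has the reply in hand cannot have it released out
from under it, which is the point of the "in case one of the remaining
ref-holders has plans for it" bullet above.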
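
The phase-ordering fix is easier to see in a reduced form. The following is an
assumption-heavy abstraction, not the recovd code itself (the DEMO_ enum and
demo_ functions are invented): it only shows that rd_phase and rd_next_phase
are both written before the hook runs, so a hook that immediately adjusts the
state machine (as recovd_conn_fixed may) observes a consistent pair rather
than a half-updated one.

#include <stdio.h>

/* Invented stand-ins for the recovd phases (the real code uses RD_IDLE,
 * RD_TROUBLED, RD_PREPARING, RD_PREPARED, RD_RECOVERING, RD_RECOVERED). */
enum demo_phase {
        DEMO_TROUBLED,
        DEMO_PREPARING,
        DEMO_PREPARED
};

struct demo_rd {
        enum demo_phase phase;
        enum demo_phase next_phase;
};

/* A recovery hook may itself adjust the state machine, so it must see both
 * fields already set for the step in progress. */
static int demo_prepare_hook(struct demo_rd *rd)
{
        printf("hook sees phase=%d next_phase=%d\n", rd->phase, rd->next_phase);
        return 0;
}

static int demo_start_prepare(struct demo_rd *rd)
{
        rd->phase = DEMO_PREPARING;
        rd->next_phase = DEMO_PREPARED;  /* set before, not after, the hook */
        return demo_prepare_hook(rd);
}

int main(void)
{
        struct demo_rd rd = { DEMO_TROUBLED, DEMO_TROUBLED };
        return demo_start_prepare(&rd);
}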

20 files changed:
lustre/include/linux/lustre_ha.h
lustre/ldlm/ldlm_request.c
lustre/lib/client.c
lustre/llite/Makefile.am
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/recover.c [new file with mode: 0644]
lustre/llite/super.c
lustre/llite/symlink.c
lustre/lov/lov_obd.c
lustre/mdc/mdc_request.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/rpc.c

diff --git a/lustre/include/linux/lustre_ha.h b/lustre/include/linux/lustre_ha.h
index d72a804..09610b2 100644 (file)
@@ -49,6 +49,7 @@ int recovd_cleanup(struct recovd_obd *mgr);
 
 extern struct recovd_obd *ptlrpc_recovd;
 
-int ll_recover(struct recovd_data *rd, int phase);
+int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
+int ptlrpc_reconnect_and_replay(struct ptlrpc_connection *conn);
 
 #endif
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index fc1415c..7684162 100644 (file)
@@ -290,7 +290,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         }
 
         if (!req_passed_in)
-                ptlrpc_free_req(req);
+                ptlrpc_req_finished(req);
 
         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
                                blocking);
@@ -410,7 +410,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         EXIT;
  out:
         LDLM_LOCK_PUT(lock);
-        ptlrpc_free_req(req);
+        ptlrpc_req_finished(req);
         return rc;
 }
 
@@ -454,7 +454,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
                 rc = ptlrpc_queue_wait(req);
                 rc = ptlrpc_check_status(req, rc);
-                ptlrpc_free_req(req);
+                ptlrpc_req_finished(req);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
 
diff --git a/lustre/lib/client.c b/lustre/lib/client.c
index 93c9e03..29f4a63 100644 (file)
@@ -160,11 +160,9 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         cli->cl_import.imp_handle.addr = request->rq_repmsg->addr;
         cli->cl_import.imp_handle.cookie = request->rq_repmsg->cookie;
 
-        recovd_conn_manage(c, ptlrpc_recovd, ll_recover);
-
         EXIT;
 out_req:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         if (rc) {
 out_ldlm:
                 ldlm_namespace_free(obd->obd_namespace);
@@ -221,7 +219,7 @@ int client_obd_disconnect(struct lustre_handle *conn)
         EXIT;
  out_req:
         if (request)
-                ptlrpc_free_req(request);
+                ptlrpc_req_finished(request);
  out_disco:
         err = class_disconnect(conn);
         if (!rc && err)
diff --git a/lustre/llite/Makefile.am b/lustre/llite/Makefile.am
index e46300d..7078657 100644 (file)
@@ -11,7 +11,7 @@ EXTRA_PROGRAMS = llite
 
 llite_SOURCES = dcache.c commit_callback.c super.c rw.c
 llite_SOURCES += file.c dir.c sysctl.c namei.c symlink.c
-llite_SOURCES += lov_pack.c
+llite_SOURCES += lov_pack.c recover.c
 
 lov_pack.c: 
        test -e lov_pack.c || ln -sf $(top_srcdir)/lib/lov_pack.c .
diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index 892a45d..b9bcc38 100644 (file)
@@ -77,7 +77,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
 
         rc = ll_lock(inode, NULL, &it, &lockh);
         request = (struct ptlrpc_request *)it.it_data;
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         if (rc != ELDLM_OK) {
                 CERROR("lock enqueue: err: %d\n", rc);
                 UnlockPage(page);
@@ -102,7 +102,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 else
                         inode->i_size = body->size;
         }
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         EXIT;
 
  readpage_out:
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index cf1067d..ef95464 100644 (file)
@@ -102,7 +102,11 @@ static int ll_file_open(struct inode *inode, struct file *file)
         rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
                       file->f_flags, lsm, &fd->fd_mdshandle, &req);
         fd->fd_req = req;
-        ptlrpc_req_finished(req);
+
+        /* We don't call ptlrpc_req_finished here, because the request is
+         * preserved until we see a matching close, at which point it is
+         * released (and likely freed).  (See ll_file_release.)
+         */
         if (rc)
                 GOTO(out_req, -abs(rc));
         if (!fd->fd_mdshandle.addr ||
diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c
index 86deb38..b8cf40c 100644 (file)
@@ -230,7 +230,7 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                         valid |= OBD_MD_LINKNAME;
                         symlen = lic.lic_body->size;
                 }
-                ptlrpc_free_req(request);
+                ptlrpc_req_finished(request);
                 request = NULL;
                 err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
                                   valid, symlen, &request);
@@ -385,7 +385,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
  out:
         if (lmm)
                 OBD_FREE(lmm, mds_md_size);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return inode;
 }
 
@@ -400,7 +400,7 @@ static int ll_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode,
 
         err = mdc_unlink(&sbi->ll_mdc_conn, dir, child, mode, name, len,
                          &request);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 
         RETURN(err);
 }
@@ -416,7 +416,7 @@ int ll_mdc_link(struct dentry *src, struct inode *dir,
 
         err = mdc_link(&sbi->ll_mdc_conn, src, dir, name,
                        len, &request);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 
         RETURN(err);
 }
@@ -433,7 +433,7 @@ int ll_mdc_rename(struct inode *src, struct inode *tgt,
         err = mdc_rename(&sbi->ll_mdc_conn, src, tgt,
                          old->d_name.name, old->d_name.len,
                          new->d_name.name, new->d_name.len, &request);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 
         RETURN(err);
 }
diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c
new file mode 100644 (file)
index 0000000..1d2f5ad
--- /dev/null
@@ -0,0 +1,38 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite recovery infrastructure.
+ *
+ * Copyright (C) 2002 Cluster File Systems Inc.
+ */
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_lite.h>
+#include <linux/lustre_ha.h>
+
+static int ll_retry_recovery(struct ptlrpc_connection *conn)
+{
+    ENTRY;
+    RETURN(0);
+}
+
+int ll_recover(struct recovd_data *rd, int phase)
+{
+        struct ptlrpc_connection *conn = class_rd2conn(rd);
+
+        LASSERT(conn);
+        ENTRY;
+
+        switch (phase) {
+            case PTLRPC_RECOVD_PHASE_PREPARE:
+                RETURN(ptlrpc_run_recovery_upcall(conn));
+            case PTLRPC_RECOVD_PHASE_RECOVER:
+                RETURN(ptlrpc_reconnect_and_replay(conn));
+            case PTLRPC_RECOVD_PHASE_FAILURE:
+                RETURN(ll_retry_recovery(conn));
+        }
+
+        LBUG();
+        RETURN(-ENOSYS);
+}
diff --git a/lustre/llite/super.c b/lustre/llite/super.c
index b79facd..08739a0 100644 (file)
@@ -153,6 +153,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
         mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
         mdc_conn->c_level = LUSTRE_CONN_FULL;
         list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
+        spin_lock(&ptlrpc_recovd->recovd_lock);
+        recovd_conn_manage(class_conn2export(&sbi->ll_mdc_conn)->exp_connection,
+                           ptlrpc_recovd, ll_recover);
+        spin_unlock(&ptlrpc_recovd->recovd_lock);
 
         obd = class_uuid2obd(osc);
         if (!obd) {
@@ -165,6 +169,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, sb = NULL);
         }
+        spin_lock(&ptlrpc_recovd->recovd_lock);
+        recovd_conn_manage(class_conn2export(&sbi->ll_osc_conn)->exp_connection,
+                           ptlrpc_recovd, ll_recover);
+        spin_unlock(&ptlrpc_recovd->recovd_lock);
 
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
diff --git a/lustre/llite/symlink.c b/lustre/llite/symlink.c
index d65220b..927d6b0 100644 (file)
@@ -78,7 +78,7 @@ static int ll_readlink(struct dentry *dentry, char *buffer, int buflen)
         rc = vfs_readlink(dentry, buffer, buflen, symname);
  out:
         up(&lli->lli_open_sem);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 
         RETURN(rc);
 }
@@ -102,7 +102,7 @@ static int ll_follow_link(struct dentry *dentry, struct nameidata *nd)
        rc = vfs_follow_link(nd, symname);
  out:
         up(&lli->lli_open_sem);
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 
         RETURN(rc);
 }
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index 0496edd..db1a2e2 100644 (file)
@@ -138,7 +138,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         }
 
  out:
-        ptlrpc_free_req(req);
+        ptlrpc_req_finished(req);
         return rc;
 
  out_disc:
diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c
index 093640c..59a3b4f 100644 (file)
@@ -72,7 +72,7 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
 
         EXIT;
  out:
-        ptlrpc_free_req(req);
+        ptlrpc_req_finished(req);
         return rc;
 }
 
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 8801d81..a976023 100644 (file)
@@ -65,7 +65,7 @@ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -100,7 +100,7 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -135,7 +135,7 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -160,7 +160,7 @@ static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
 
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -208,7 +208,7 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
         *ea = lsm;
         EXIT;
 out_req:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
 out:
         if (rc && !*ea)
                 OBD_FREE(lsm, oa->o_easize);
@@ -256,7 +256,7 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -293,7 +293,7 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -713,7 +713,7 @@ static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 38d1502..a67a50e 100644 (file)
@@ -174,14 +174,18 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         INIT_LIST_HEAD(&request->rq_list);
         /*
          * This will be reduced once when the sender is finished (waiting for
-         * reply, f.e.), once when the request has been committed and is
-         * removed from the to-be-committed list, and once when portals is
-         * finished with it and has called request_out_callback.
+         * reply, f.e.), and once when the request has been committed and is
+         * removed from the to-be-committed list.
+         *
+         * Also, the refcount will be increased in ptl_send_rpc immediately
+         * before we hand it off to portals, and there will be a corresponding
+         * decrease in request_out_cb (which is called to indicate that portals
+         * is finished with the request, and it can be safely freed).
          *
          * (Except in the DLM server case, where it will be dropped twice
          * by the sender, and then the last time by request_out_callback.)
          */
-        atomic_set(&request->rq_refcount, 3);
+        atomic_set(&request->rq_refcount, 2);
 
         spin_lock(&conn->c_lock);
         request->rq_xid = HTON__u32(++conn->c_xid_out);
@@ -200,12 +204,6 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
         if (request == NULL)
                 return;
 
-        if (request->rq_repmsg != NULL) { 
-                OBD_FREE(request->rq_repmsg, request->rq_replen);
-                request->rq_repmsg = NULL;
-                request->rq_reply_md.start = NULL; 
-        }
-
         if (atomic_dec_and_test(&request->rq_refcount))
                 ptlrpc_free_req(request);
 }
@@ -218,10 +216,24 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
                 return;
         }
 
-        if (request->rq_repmsg != NULL)
+        if (atomic_read(&request->rq_refcount) != 0) {
+                CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
+                       request, request->rq_reqmsg->opc,
+                       request->rq_connection->c_remote_uuid,
+                       request->rq_import->imp_client->cli_request_portal,
+                       request->rq_refcount);
+                /* LBUG(); */
+        }
+
+        if (request->rq_repmsg != NULL) { 
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
-        if (request->rq_reqmsg != NULL)
+                request->rq_repmsg = NULL;
+                request->rq_reply_md.start = NULL; 
+        }
+        if (request->rq_reqmsg != NULL) {
                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+                request->rq_reqmsg = NULL;
+        }
 
         if (request->rq_connection) {
                 spin_lock(&request->rq_connection->c_lock);
@@ -341,11 +353,12 @@ restart:
                        (long long)req->rq_xid, (long long)req->rq_transno,
                        (long long)conn->c_last_committed);
                 if (atomic_dec_and_test(&req->rq_refcount)) {
-                        req->rq_import = NULL;
-
-                        /* We do this to prevent free_req deadlock.  Restarting
-                         * after each removal is not so bad, as we are almost
-                         * always deleting the first item in the list.
+                        /* We do this to prevent free_req deadlock.
+                         * Restarting after each removal is not so bad, as we are
+                         * almost always deleting the first item in the list.
+                         *
+                         * If we use a recursive lock here, we can skip the
+                         * unlock/lock/restart sequence.
                          */
                         spin_unlock(&conn->c_lock);
                         ptlrpc_free_req(req);
@@ -381,7 +394,7 @@ restart1:
                 list_del_init(&req->rq_list);
                 req->rq_import = NULL;
                 spin_unlock(&conn->c_lock);
-                ptlrpc_free_req(req);
+                ptlrpc_req_finished(req);
                 goto restart1;
         }
 restart2:
@@ -393,7 +406,7 @@ restart2:
                 list_del_init(&req->rq_list);
                 req->rq_import = NULL;
                 spin_unlock(&conn->c_lock);
-                ptlrpc_free_req(req); 
+                ptlrpc_req_finished(req); 
                 spin_lock(&conn->c_lock);
                 goto restart2;
         }
@@ -571,7 +584,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                        req->rq_connection->c_remote_uuid,
                        req->rq_import->imp_client->cli_request_portal);
                 /* we'll get sent again, so balance 2nd request_out_callback */
-                atomic_inc(&req->rq_refcount);
                 goto resend;
         }
 
@@ -637,8 +649,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
         req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
 
-        /* add a ref, which will again be balanced in request_out_callback */
-        atomic_inc(&req->rq_refcount);
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c
index e643013..124fd23 100644 (file)
@@ -93,6 +93,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer,
         INIT_LIST_HEAD(&c->c_imports);
         INIT_LIST_HEAD(&c->c_exports);
         INIT_LIST_HEAD(&c->c_sb_chain);
+        INIT_LIST_HEAD(&c->c_recovd_data.rd_managed_chain);
         atomic_set(&c->c_refcount, 0);
         ptlrpc_connection_addref(c);
         spin_lock_init(&c->c_lock);
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index 99b8b3a..55b35ea 100644 (file)
@@ -46,6 +46,7 @@ static int request_out_callback(ptl_event_t *ev)
                 LBUG();
         }
 
+        /* this balances the atomic_inc in ptl_send_rpc */
         ptlrpc_req_finished(req);
         RETURN(1);
 }
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 46aa30f..ddd674c 100644 (file)
@@ -333,6 +333,8 @@ int ptl_send_rpc(struct ptlrpc_request *request)
         source_id.nid = request->rq_connection->c_peer.peer_nid;
         source_id.pid = PTL_PID_ANY;
 
+        /* add a ref, which will be balanced in request_out_callback */
+        atomic_inc(&request->rq_refcount);
         if (request->rq_replen != 0) {
 
                 /* request->rq_repmsg is set only when the reply comes in, in
diff --git a/lustre/ptlrpc/recovd.c b/lustre/ptlrpc/recovd.c
index 7afd983..f0fe5ca 100644 (file)
 #include <linux/lustre_ha.h>
 #include <linux/obd_support.h>
 
+/* dump_connection_list, but shorter for nicer debugging logs */
+static void d_c_l(struct list_head *head)
+{
+        int sanity = 0;
+        struct list_head *tmp;
+
+        list_for_each(tmp, head) {
+                struct ptlrpc_connection *conn =
+                        list_entry(tmp, struct ptlrpc_connection,
+                                   c_recovd_data.rd_managed_chain);
+                CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
+                       conn->c_recovd_data.rd_phase,
+                       conn->c_recovd_data.rd_next_phase);
+                if (sanity++ > 50)
+                        LBUG();
+        }
+}
+
+static void dump_lists(struct recovd_obd *recovd)
+{
+        CDEBUG(D_HA, "managed: \n");
+        d_c_l(&recovd->recovd_managed_items);
+        CDEBUG(D_HA, "troubled: \n");
+        d_c_l(&recovd->recovd_troubled_items);
+}
+
 void recovd_conn_manage(struct ptlrpc_connection *conn,
                         struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
 {
         struct recovd_data *rd = &conn->c_recovd_data;
         ENTRY;
 
+        if (!list_empty(&rd->rd_managed_chain)) {
+                if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
+                        CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
+                               conn, conn->c_remote_uuid);
+                        EXIT;
+                        return;
+                }
+                CDEBUG(D_HA,
+                       "conn %p/%s has recovery items %p/%p, making %p/%p\n",
+                       conn, conn->c_remote_uuid, rd->rd_recovd, rd->rd_recover,
+                       recovd, recover);
+                spin_lock(&rd->rd_recovd->recovd_lock);
+                list_del(&rd->rd_managed_chain);
+                spin_unlock(&rd->rd_recovd->recovd_lock);
+        }
+
         rd->rd_recovd = recovd;
         rd->rd_recover = recover;
         rd->rd_phase = RD_IDLE;
         rd->rd_next_phase = RD_TROUBLED;
 
         spin_lock(&recovd->recovd_lock);
-        INIT_LIST_HEAD(&rd->rd_managed_chain);
-        list_add(&recovd->recovd_managed_items, &rd->rd_managed_chain);
+        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
+        dump_lists(recovd);
         spin_unlock(&recovd->recovd_lock);
 
         EXIT;
@@ -51,7 +93,6 @@ void recovd_conn_fail(struct ptlrpc_connection *conn)
                 return;
         }
 
-
         spin_lock(&recovd->recovd_lock);
         if (rd->rd_phase != RD_IDLE) {
                 CERROR("connection %p to %s already in recovery\n",
@@ -66,6 +107,7 @@ void recovd_conn_fail(struct ptlrpc_connection *conn)
         list_del(&rd->rd_managed_chain);
         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
         rd->rd_phase = RD_TROUBLED;
+        dump_lists(recovd);
         spin_unlock(&recovd->recovd_lock);
 
         wake_up(&recovd->recovd_waitq);
@@ -85,6 +127,7 @@ void recovd_conn_fixed(struct ptlrpc_connection *conn)
         rd->rd_phase = RD_IDLE;
         rd->rd_next_phase = RD_TROUBLED;
         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
+        dump_lists(rd->rd_recovd);
         spin_unlock(&rd->rd_recovd->recovd_lock);
 
         EXIT;
@@ -118,20 +161,6 @@ static int recovd_check_event(struct recovd_obd *recovd)
         RETURN(rc);
 }
 
-static void dump_connection_list(struct list_head *head)
-{
-        struct list_head *tmp;
-
-        list_for_each(tmp, head) {
-                struct ptlrpc_connection *conn =
-                        list_entry(tmp, struct ptlrpc_connection,
-                                   c_recovd_data.rd_managed_chain);
-                CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
-                       conn->c_recovd_data.rd_phase,
-                       conn->c_recovd_data.rd_next_phase);
-        }
-}
-
 static int recovd_handle_event(struct recovd_obd *recovd)
 {
         struct list_head *tmp, *n;
@@ -140,10 +169,7 @@ static int recovd_handle_event(struct recovd_obd *recovd)
 
         spin_lock(&recovd->recovd_lock);
 
-        CERROR("managed: \n");
-        dump_connection_list(&recovd->recovd_managed_items);
-        CERROR("troubled: \n");
-        dump_connection_list(&recovd->recovd_troubled_items);
+        dump_lists(recovd);
 
         /*
          * We use _safe here because one of the callbacks, expecially
@@ -178,6 +204,7 @@ static int recovd_handle_event(struct recovd_obd *recovd)
                         CERROR("starting recovery for rd %p (conn %p)\n",
                                rd, class_rd2conn(rd));
                         rd->rd_phase = RD_PREPARING;
+                        rd->rd_next_phase = RD_PREPARED;
                         
                         spin_unlock(&recovd->recovd_lock);
                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
@@ -185,14 +212,14 @@ static int recovd_handle_event(struct recovd_obd *recovd)
                         if (rc)
                                 goto cb_failed;
                         
-                        rd->rd_next_phase = RD_PREPARED;
                         break;
                         
                     case RD_PREPARED:
-                        rd->rd_phase = RD_RECOVERING;
                         
                         CERROR("recovery prepared for rd %p (conn %p)\n",
                                rd, class_rd2conn(rd));
+                        rd->rd_phase = RD_RECOVERING;
+                        rd->rd_next_phase = RD_RECOVERED;
                         
                         spin_unlock(&recovd->recovd_lock);
                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
@@ -200,7 +227,6 @@ static int recovd_handle_event(struct recovd_obd *recovd)
                         if (rc)
                                 goto cb_failed;
                         
-                        rd->rd_next_phase = RD_RECOVERED;
                         break;
                         
                     case RD_RECOVERED:
diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c
index d2e9f47..70d549b 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Lustre Light Super operations
+ * Portal-RPC reconnection and replay operations, for use in recovery.
  *
  * This code is issued under the GNU General Public License.
  * See the file COPYING in this distribution
 #include <linux/module.h>
 #include <linux/kmod.h>
 
-#define DEBUG_SUBSYSTEM S_LLITE
+#define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/lustre_lite.h>
 #include <linux/lustre_ha.h>
+#include <linux/lustre_net.h>
+#include <linux/obd.h>
 
-int ll_reconnect(struct ptlrpc_connection *conn) 
+static int ptlrpc_reconnect(struct ptlrpc_connection *conn) 
 {
         struct list_head *tmp;
         int rc = -EINVAL;
@@ -85,7 +86,7 @@ int ll_reconnect(struct ptlrpc_connection *conn)
         return rc;
 }
 
-static int ll_recover_upcall(struct ptlrpc_connection *conn)
+int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
 {
         char *argv[3];
         char *envp[3];
@@ -104,21 +105,23 @@ static int ll_recover_upcall(struct ptlrpc_connection *conn)
 
         rc = call_usermodehelper(argv[0], argv, envp);
         if (rc < 0) {
-                /*
-                 * Tragically, this will never be run, because call_umh doesn't
-                 * report errors like -ENOENT to its caller.
-                 */
-                CERROR("Error invoking recovery upcall (%s): %d\n",
-                       obd_recovery_upcall, rc);
+                CERROR("Error invoking recovery upcall %s for %s: %d\n",
+                       argv[0], argv[1], rc);
                 CERROR("Check /proc/sys/lustre/recovery_upcall?\n");
         } else {
                 CERROR("Invoked upcall %s for connection %s\n",
                        argv[0], argv[1]);
         }
-        RETURN(rc);
+
+        /*
+         * We don't want to make this a "failed" recovery, because the system
+         * administrator -- or, perhaps, tester -- may well be able to rescue
+         * things by running the correct upcall.
+         */
+        RETURN(0);
 }
 
-static int ll_recover_reconnect(struct ptlrpc_connection *conn)
+int ptlrpc_reconnect_and_replay(struct ptlrpc_connection *conn)
 {
         int rc = 0;
         struct list_head *tmp, *pos;
@@ -126,7 +129,7 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
         ENTRY;
 
         /* 1. reconnect */
-        rc = ll_reconnect(conn);
+        rc = ptlrpc_reconnect(conn);
         if (rc)
                 RETURN(rc);
         
@@ -141,8 +144,8 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
                 
                 /* replay what needs to be replayed */
                 if (req->rq_flags & PTL_RPC_FL_REPLAY) {
-                        CDEBUG(D_HA, "FL_REPLAY: xid "LPD64" op %d @ %d\n",
-                               req->rq_xid, req->rq_reqmsg->opc,
+                        CDEBUG(D_HA, "FL_REPLAY: xid "LPD64" transno "LPD64" op %d @ %d\n",
+                               req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
                                req->rq_import->imp_client->cli_request_portal);
                         rc = ptlrpc_replay_req(req);
 #if 0
@@ -162,8 +165,8 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
                 /* server has seen req, we have reply: skip */
                 if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
                     req->rq_xid <= conn->c_last_xid) { 
-                        CDEBUG(D_HA, "REPLIED SKIP: xid "LPD64" op %d @ %d\n",
-                               req->rq_xid, req->rq_reqmsg->opc,
+                        CDEBUG(D_HA, "REPLIED SKIP: xid "LPD64" transno "LPD64" op %d @ %d\n",
+                               req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
                                req->rq_import->imp_client->cli_request_portal);
                         continue;
                 }
@@ -171,8 +174,8 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
                 /* server has lost req, we have reply: resend, ign reply */
                 if ((req->rq_flags & PTL_RPC_FL_REPLIED)  &&
                     req->rq_xid > conn->c_last_xid) { 
-                        CDEBUG(D_HA, "REPLIED RESEND: xid "LPD64" op %d @ %d\n",
-                               req->rq_xid, req->rq_reqmsg->opc,
+                        CDEBUG(D_HA, "REPLIED RESEND: xid "LPD64" transno "LPD64" op %d @ %d\n",
+                               req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
                                req->rq_import->imp_client->cli_request_portal);
                         rc = ptlrpc_replay_req(req); 
                         if (rc) {
@@ -194,8 +197,8 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
                 /* service has not seen req, no reply: resend */
                 if ( !(req->rq_flags & PTL_RPC_FL_REPLIED)  &&
                      req->rq_xid > conn->c_last_xid) {
-                        CDEBUG(D_HA, "RESEND: xid "LPD64" op %d @ %d\n",
-                               req->rq_xid, req->rq_reqmsg->opc,
+                        CDEBUG(D_HA, "RESEND: xid "LPD64" transno "LPD64" op %d @ %d\n",
+                               req->rq_xid, req->rq_repmsg->transno, req->rq_reqmsg->opc,
                                req->rq_import->imp_client->cli_request_portal);
                         ptlrpc_resend_req(req);
                 }
@@ -218,36 +221,3 @@ static int ll_recover_reconnect(struct ptlrpc_connection *conn)
         spin_unlock(&conn->c_lock);
         return rc;
 }
-
-static int ll_retry_recovery(struct ptlrpc_connection *conn)
-{
-        CERROR("Recovery has failed on conn %p\n", conn);
-#if 0
-        /* XXX use a timer, sideshow bob */
-        recovd_conn_fail(conn);
-        /* XXX this is disabled until I fix it so that we don't just keep
-         * XXX retrying in the case of a missing upcall.
-         */
-#endif
-        return 0;
-}
-
-int ll_recover(struct recovd_data *rd, int phase)
-{
-        struct ptlrpc_connection *conn = class_rd2conn(rd);
-
-        LASSERT(conn);
-        ENTRY;
-
-        switch (phase) {
-            case PTLRPC_RECOVD_PHASE_PREPARE:
-                RETURN(ll_recover_upcall(conn));
-            case PTLRPC_RECOVD_PHASE_RECOVER:
-                RETURN(ll_recover_reconnect(conn));
-            case PTLRPC_RECOVD_PHASE_FAILURE:
-                RETURN(ll_retry_recovery(conn));
-        }
-
-        LBUG();
-        RETURN(-ENOSYS);
-}
diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c
index 7b28ffa..fbece03 100644 (file)
@@ -237,8 +237,9 @@ EXPORT_SYMBOL(lustre_msg_size);
 EXPORT_SYMBOL(lustre_unpack_msg);
 EXPORT_SYMBOL(lustre_msg_buf);
 
-EXPORT_SYMBOL(ll_recover);
-
+/* recover.c */
+EXPORT_SYMBOL(ptlrpc_run_recovery_upcall);
+EXPORT_SYMBOL(ptlrpc_reconnect_and_replay);
 
 MODULE_AUTHOR("Cluster File Systems, Inc <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Request Processor v1.0");