Whamcloud - gitweb
Landing b_lock_replay so that Phil can use my ldlm iterators and whatnot for his
authorshaver <shaver>
Sun, 1 Dec 2002 03:47:10 +0000 (03:47 +0000)
committershaver <shaver>
Sun, 1 Dec 2002 03:47:10 +0000 (03:47 +0000)
upcoming locking work.

Highlights:
 - lock replay infrastructure (needs much more testing, but didn't regress
   anything outside recovery)
 - b=421: ldlm iterators
 - b=348: imports now have service levels, replacing connections' c_level
 - replace c_delayed_head with imp_delayed_list
 - split imp_request_list into imp_replay_list for retained requests and
   imp_sending_list for inflight reqs
 - as a side-effect, clean up rq_refcount story materially
 - client-side recovery is now dispatched via a per-import handler function,
   for better layering and modularity
 - wire imports up to recovery before attempting mounts, for better handling of
   mount-time failures

17 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/lib/client.c
lustre/lib/target.c
lustre/llite/recover.c
lustre/llite/super.c
lustre/llite/super25.c
lustre/mdc/mdc_request.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/rpc.c

index 3c7367f..b52dfca 100644 (file)
@@ -150,13 +150,13 @@ struct ldlm_lock {
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
 
-        ldlm_completion_callback    l_completion_ast;
-        ldlm_blocking_callback    l_blocking_ast;
+        ldlm_completion_callback l_completion_ast;
+        ldlm_blocking_callback   l_blocking_ast;
 
         struct obd_export    *l_export;
         struct lustre_handle *l_connh;
         __u32                 l_flags;
-        struct lustre_handle   l_remote_handle;
+        struct lustre_handle  l_remote_handle;
         void                 *l_data;
         __u32                 l_data_len;
         struct ldlm_extent    l_extent;
@@ -280,6 +280,22 @@ do {                                                                          \
 #define LDLM_DEBUG_NOLOCK(format, a...)                 \
         CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
 
+/*
+ * Iterators.
+ */
+
+#define LDLM_ITER_CONTINUE 0 /* keep iterating */
+#define LDLM_ITER_STOP     1 /* stop iterating */
+
+typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *);
+
+int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
+                          void *closure);
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+                           void *closure);
+
+int ldlm_replay_locks(struct obd_import *imp);
+
 /* ldlm_extent.c */
 int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
 int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, int flags,
index 8611e88..1e6596b 100644 (file)
@@ -54,6 +54,8 @@ extern struct recovd_obd *ptlrpc_recovd;
 
 int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
 int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc);
-int ptlrpc_replay(struct obd_import *imp, int unreplied_only);
-
+int ptlrpc_replay(struct obd_import *imp, int send_last_flag);
+int ptlrpc_resend(struct obd_import *imp);
+void ptlrpc_free_committed(struct obd_import *imp);
+void ptlrpc_wake_delayed(struct obd_import *imp);
 #endif
index 893fd0a..0f0d67d 100644 (file)
 #define IMP_INVALID       1
 #define IMP_REPLAYABLE    2
 
+typedef int (*import_recover_t)(struct obd_import *imp, int phase);
+
 #include <linux/lustre_idl.h>
 struct obd_import {
+        import_recover_t          imp_recover;
         struct ptlrpc_connection *imp_connection;
         struct ptlrpc_client     *imp_client;
         struct lustre_handle      imp_handle;
         struct list_head          imp_chain;
-        struct list_head          imp_request_list;
+
+        /* Lists of requests that are retained for replay, waiting for a reply,
+         * or waiting for recovery to complete, respectively.
+         */
+        struct list_head          imp_replay_list;
+        struct list_head          imp_sending_list;
+        struct list_head          imp_delayed_list;
+
         struct obd_device        *imp_obd;
         int                       imp_flags;
         int                       imp_level;
@@ -30,7 +40,7 @@ struct obd_import {
         __u64                     imp_peer_last_xid;
         __u64                     imp_peer_committed_transno;
 
-        /* Protects flags, level, *_xid, request_list */
+        /* Protects flags, level, *_xid, *_list */
         spinlock_t                imp_lock;
 };
 
index d3aa4fd..ab8ee9a 100644 (file)
@@ -72,7 +72,6 @@ struct ptlrpc_connection {
         __u8                    c_local_uuid[37];  /* XXX do we need this? */
         __u8                    c_remote_uuid[37];
 
-        int                     c_level;
         __u32                   c_generation;  /* changes upon new connection */
         __u32                   c_epoch;       /* changes when peer changes */
         __u32                   c_bootcount;   /* peer's boot count */
index 12af650..da675c9 100644 (file)
@@ -157,8 +157,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         ldlm_add_waiting_lock(lock);
         (void)ptl_send_rpc(req);
 
-        /* no commit, and no waiting for reply, so 2x decref now */
-        ptlrpc_req_finished(req);
+        /* not waiting for reply */
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -191,8 +190,8 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         req->rq_replen = 0; /* no reply needed */
 
         (void)ptl_send_rpc(req);
-        /* no commit, and no waiting for reply, so 2x decref now */
-        ptlrpc_req_finished(req);
+
+        /* not waiting for reply */
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -778,6 +777,9 @@ EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
 EXPORT_SYMBOL(ldlm_namespace_dump);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
+EXPORT_SYMBOL(ldlm_replay_locks);
+EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach);
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
index 6672c3e..130f092 100644 (file)
@@ -646,3 +646,157 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
 
         return ELDLM_OK;
 }
+
+/* Lock iterators. */
+
+int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
+                          void *closure)
+{
+        struct list_head *tmp, *next;
+        struct ldlm_lock *lock;
+        int rc = LDLM_ITER_CONTINUE;
+        struct ldlm_namespace *ns = res->lr_namespace;
+
+        ENTRY;
+
+        if (!res)
+                RETURN(LDLM_ITER_CONTINUE);
+
+        l_lock(&ns->ns_lock);
+        list_for_each_safe(tmp, next, &res->lr_granted) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+
+        list_for_each_safe(tmp, next, &res->lr_converting) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+
+        list_for_each_safe(tmp, next, &res->lr_waiting) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+ out:
+        l_unlock(&ns->ns_lock);
+        RETURN(rc);
+}
+
+struct iter_helper_data {
+        ldlm_iterator_t iter;
+        void *closure;
+};
+
+static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
+{
+        struct iter_helper_data *helper = closure;
+        return helper->iter(lock, helper->closure);
+}
+
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+                           void *closure)
+{
+        int i, rc = LDLM_ITER_CONTINUE;
+        struct iter_helper_data helper = { iter: iter, closure: closure };
+        
+        l_lock(&ns->ns_lock);
+        for (i = 0; i < RES_HASH_SIZE; i++) {
+                struct list_head *tmp, *next;
+                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+                        struct ldlm_resource *res = 
+                                list_entry(tmp, struct ldlm_resource, lr_hash);
+
+                        ldlm_resource_getref(res);
+                        rc = ldlm_resource_foreach(res, ldlm_iter_helper,
+                                                   &helper);
+                        ldlm_resource_put(res);
+                        if (rc == LDLM_ITER_STOP)
+                                GOTO(out, rc);
+                }
+        }
+ out:
+        l_unlock(&ns->ns_lock);
+        RETURN(rc);
+}
+
+/* Lock replay */
+
+static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
+{
+        struct list_head *list = closure;
+
+        /* we use l_pending_chain here, because it's unused on clients. */
+        list_add(&lock->l_pending_chain, list);
+        return LDLM_ITER_CONTINUE;
+}
+
+static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock,
+                           int last)
+{
+        struct ptlrpc_request *req;
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        int rc, size;
+        int flags = LDLM_FL_REPLAY;
+
+        flags |= lock->l_flags & 
+                (LDLM_FL_BLOCK_GRANTED|LDLM_FL_BLOCK_CONV|LDLM_FL_BLOCK_WAIT);
+
+        size = sizeof(*body);
+        req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+        
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        ldlm_lock2desc(lock, &body->lock_desc);
+        body->lock_flags = flags;
+
+        ldlm_lock2handle(lock, &body->lock_handle1);
+        size = sizeof(*reply);
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        if (last)
+                req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+
+        LDLM_DEBUG(lock, "replaying lock:");
+        rc = ptlrpc_queue_wait(req);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0);
+        memcpy(&lock->l_remote_handle, &reply->lock_handle,
+               sizeof(lock->l_remote_handle));
+        LDLM_DEBUG(lock, "replayed lock:");
+ out:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+        struct list_head list, *pos, *next;
+        struct ldlm_lock *lock;
+        int rc = 0;
+        
+        ENTRY;
+        INIT_LIST_HEAD(&list);
+
+        l_lock(&ns->ns_lock);
+        (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
+
+        list_for_each_safe(pos, next, &list) {
+                lock = list_entry(pos, struct ldlm_lock, l_pending_chain);
+                rc = replay_one_lock(imp, lock, (next == &list));
+                if (rc)
+                        break; /* or try to do the rest? */
+        }
+        l_unlock(&ns->ns_lock);
+        RETURN(rc);
+}
index bf5fac3..00b0d48 100644 (file)
@@ -90,7 +90,9 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!imp->imp_connection)
                 RETURN(-ENOENT);
         
-        INIT_LIST_HEAD(&imp->imp_request_list);
+        INIT_LIST_HEAD(&imp->imp_replay_list);
+        INIT_LIST_HEAD(&imp->imp_sending_list);
+        INIT_LIST_HEAD(&imp->imp_delayed_list);
         spin_lock_init(&imp->imp_lock);
 
         ptlrpc_init_client(rq_portal, rp_portal, name,
@@ -161,16 +163,17 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         request->rq_reqmsg->cookie = conn->cookie;
         c = class_conn2export(conn)->exp_connection =
                 ptlrpc_connection_addref(request->rq_connection);
+        list_add(&imp->imp_chain, &c->c_imports);
         recovd_conn_manage(c, recovd, recover);
 
+        imp->imp_level = LUSTRE_CONN_CON;
         rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out_req, rc);
 
         if (rq_opc == MDS_CONNECT)
                 imp->imp_flags |= IMP_REPLAYABLE;
-        list_add(&imp->imp_chain, &c->c_imports);
-        c->c_level = LUSTRE_CONN_FULL;
+        imp->imp_level = LUSTRE_CONN_FULL;
         imp->imp_handle.addr = request->rq_repmsg->addr;
         imp->imp_handle.cookie = request->rq_repmsg->cookie;
 
index 7666663..141e155 100644 (file)
@@ -101,8 +101,7 @@ int target_handle_connect(struct ptlrpc_request *req)
         dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
         dlmimp->imp_obd = /* LDLM! */ NULL;
         spin_lock_init(&dlmimp->imp_lock);
-        
-        req->rq_connection->c_level = LUSTRE_CONN_FULL;
+        dlmimp->imp_level = LUSTRE_CONN_FULL;
 out:
         req->rq_status = rc;
         RETURN(rc);
@@ -149,7 +148,6 @@ static int target_fence_failed_connection(struct ptlrpc_connection *conn)
 {
         ENTRY;
 
-        conn->c_level = LUSTRE_CONN_RECOVD;
         conn->c_recovd_data.rd_phase = RD_PREPARED;
 
         RETURN(0);
index 8acd1bb..3310c34 100644 (file)
@@ -12,7 +12,6 @@
 #include <linux/lustre_ha.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_idl.h>
-#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
 
 static int ll_retry_recovery(struct ptlrpc_connection *conn)
 {
@@ -20,167 +19,33 @@ static int ll_retry_recovery(struct ptlrpc_connection *conn)
         RETURN(0);
 }
 
-/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
-static void abort_inflight_for_import(struct obd_import *imp)
-{
-        struct list_head *tmp, *n;
-
-        /* Make sure that no new requests get processed for this import.
-         * ptlrpc_queue_wait must (and does) hold c_lock while testing this
-         * flags and then putting requests on sending_head or delayed_head.
-         */
-        spin_lock(&imp->imp_connection->c_lock);
-        imp->imp_flags |= IMP_INVALID;
-        spin_unlock(&imp->imp_connection->c_lock);
-
-        list_for_each_safe(tmp, n, &imp->imp_request_list) {
-                struct ptlrpc_request *req =
-                        list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                if (req->rq_flags & PTL_RPC_FL_REPLIED) {
-                        /* no need to replay, just discard */
-                        DEBUG_REQ(D_ERROR, req, "uncommitted");
-                        ptlrpc_req_finished(req);
-                } else {
-                        DEBUG_REQ(D_ERROR, req, "inflight");
-                        req->rq_flags |= PTL_RPC_FL_ERR;
-                        wake_up(&req->rq_wait_for_rep);
-                }
-        }
-
-        list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) {
-                struct ptlrpc_request *req =
-                        list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                if (req->rq_import != imp)
-                        continue;
-
-                DEBUG_REQ(D_ERROR, req, "aborting waiting req");
-                req->rq_flags |= PTL_RPC_FL_ERR;
-                wake_up(&req->rq_wait_for_rep);
-        }
-}
-
-static void set_osc_active(struct obd_import *imp, int active)
-{
-        struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
-
-        if (notify_obd == NULL)
-                return;
-
-        /* How gross is _this_? */
-        if (!list_empty(&notify_obd->obd_exports)) {
-                int rc;
-                struct lustre_handle fakeconn;
-                struct obd_ioctl_data ioc_data;
-                struct obd_export *exp =
-                        list_entry(notify_obd->obd_exports.next,
-                                   struct obd_export, exp_obd_chain);
-
-                fakeconn.addr = (__u64)(unsigned long)exp;
-                fakeconn.cookie = exp->exp_cookie;
-                ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
-                ioc_data.ioc_offset = active;
-                rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
-                                   sizeof ioc_data, &ioc_data, NULL);
-                if (rc)
-                        CERROR("disabling %s on LOV %p/%s: %d\n",
-                               imp->imp_obd->obd_uuid, notify_obd,
-                               notify_obd->obd_uuid, rc);
-        } else {
-                CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
-                       "%p\n", notify_obd, notify_obd->obd_uuid,
-                       imp->imp_obd->obd_uuid);
-        }
-}
-
-static void prepare_osc(struct obd_import *imp)
-{
-        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-
-        CDEBUG(D_HA, "invalidating all locks for OST imp %p (to %s):\n",
-               imp, imp->imp_connection->c_remote_uuid);
-        ldlm_namespace_dump(ns);
-        ldlm_namespace_cleanup(ns, 1 /* no network ops */);
-
-        abort_inflight_for_import(imp);
-
-        set_osc_active(imp, 0 /* inactive */);
-}
-
-static void prepare_mdc(struct obd_import *imp)
-{
-        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-        ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
-}
-
-static int ll_prepare_recovery(struct ptlrpc_connection *conn)
-{
-        struct list_head *tmp;
-
-        list_for_each(tmp, &conn->c_imports) {
-                struct obd_import *imp = list_entry(tmp, struct obd_import,
-                                                    imp_chain);
-
-                if (imp->imp_obd->obd_type->typ_ops->o_brw)
-                        prepare_osc(imp);
-                else
-                        prepare_mdc(imp);
-        }
-
-        return ptlrpc_run_recovery_upcall(conn);
-}
-
-static void reconnect_osc(struct obd_import *imp)
-{
-        int rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
-        if (rc == 0)
-                set_osc_active(imp, 1 /* active */);
-        else
-                CDEBUG(D_HA, "reconnection failed, not reactivating OSC %s\n",
-                       imp->imp_obd->obd_uuid);
-}
-
-static void reconnect_mdc(struct obd_import *imp)
-{
-        int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
-        if (!rc)
-                ptlrpc_replay(imp, 0 /* all reqs */);
-        else if (rc == EALREADY)
-                ptlrpc_replay(imp, 1 /* only unreplied reqs */);
-}
-
-static int ll_reconnect(struct ptlrpc_connection *conn)
-{
-        struct list_head *tmp;
-
-        ENTRY;
-        list_for_each(tmp, &conn->c_imports) {
-                struct obd_import *imp = list_entry(tmp, struct obd_import,
-                                                    imp_chain);
-                if (imp->imp_obd->obd_type->typ_ops->o_brw) {
-                        reconnect_osc(imp);
-                } else {
-                        reconnect_mdc(imp);
-                }
-        }
-
-        conn->c_level = LUSTRE_CONN_FULL;
-        RETURN(0);
-}
-
 int ll_recover(struct recovd_data *rd, int phase)
 {
         struct ptlrpc_connection *conn = class_rd2conn(rd);
+        struct list_head *tmp;
 
         LASSERT(conn);
         ENTRY;
 
         switch (phase) {
             case PTLRPC_RECOVD_PHASE_PREPARE:
-                RETURN(ll_prepare_recovery(conn));
             case PTLRPC_RECOVD_PHASE_RECOVER:
-                RETURN(ll_reconnect(conn));
+                list_for_each(tmp, &conn->c_imports) {
+                        struct obd_import *imp = 
+                                list_entry(tmp, struct obd_import, imp_chain);
+
+                        if (phase == PTLRPC_RECOVD_PHASE_PREPARE) {
+                                spin_lock(&imp->imp_lock);
+                                imp->imp_level = LUSTRE_CONN_RECOVD;
+                                spin_unlock(&imp->imp_lock);
+                        }
+                        imp->imp_recover(imp, phase);
+                }
+                
+                if (phase == PTLRPC_RECOVD_PHASE_PREPARE)
+                        RETURN(ptlrpc_run_recovery_upcall(conn));
+                RETURN(0);
+                        
             case PTLRPC_RECOVD_PHASE_FAILURE:
                 RETURN(ll_retry_recovery(conn));
         }
index cb7136c..cb3ae90 100644 (file)
@@ -152,9 +152,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 GOTO(out_free, sb = NULL);
         }
 
-#warning Mike: is this the right place to raise the connection level?
         mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
-        mdc_conn->c_level = LUSTRE_CONN_FULL;
         list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
 
         obd = class_uuid2obd(osc);
index 30b582c..cd6544a 100644 (file)
@@ -153,9 +153,7 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent)
                 GOTO(out_free, sb = NULL);
         }
 
-#warning Peter: is this the right place to raise the connection level?
         mdc_conn = sbi2mdc(sbi)->cl_import.imp_connection;
-        mdc_conn->c_level = LUSTRE_CONN_FULL;
         list_add(&mdc_conn->c_sb_chain, &sbi->ll_conn_chain);
 
         obd = class_uuid2obd(osc);
index a9a5d9a..5058ef0 100644 (file)
@@ -619,21 +619,73 @@ out:
 
         return rc;
 }
-int mdc_attach(struct obd_device *dev, obd_count len, void *data)
+
+static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
 {
         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
 }
 
-int mdc_detach(struct obd_device *dev)
+static int mdc_detach(struct obd_device *dev)
 {
         return lprocfs_dereg_obd(dev);
 }
+
+static int mdc_recover(struct obd_import *imp, int phase)
+{
+        int rc;
+        ENTRY;
+
+        switch(phase) {
+            case PTLRPC_RECOVD_PHASE_PREPARE:
+                ldlm_cli_cancel_unused(imp->imp_obd->obd_namespace,
+                                       NULL, LDLM_FL_LOCAL_ONLY);
+                RETURN(0);
+            case PTLRPC_RECOVD_PHASE_RECOVER:
+                rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+                if (rc == EALREADY)
+                        RETURN(ptlrpc_replay(imp, 0));
+                if (rc)
+                        RETURN(rc);
+
+                rc = ptlrpc_replay(imp, 0 /* no last flag*/);
+                if (rc)
+                        RETURN(rc);
+
+                rc = ldlm_replay_locks(imp);
+                if (rc)
+                        RETURN(rc);
+
+                spin_lock(&imp->imp_lock);
+                imp->imp_level = LUSTRE_CONN_FULL;
+                spin_unlock(&imp->imp_lock);
+
+                ptlrpc_wake_delayed(imp);
+
+                rc = ptlrpc_resend(imp);
+                if (rc)
+                        RETURN(rc);
+
+                RETURN(0);
+            default:
+                RETURN(-EINVAL);
+        }
+}
+
+static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd,
+                       obd_uuid_t cluuid, struct recovd_obd *recovd,
+                       ptlrpc_recovery_cb_t recover)
+{
+        struct obd_import *imp = &obd->u.cli.cl_import;
+        imp->imp_recover = mdc_recover;
+        return client_obd_connect(conn, obd, cluuid, recovd, recover);
+}
+
 struct obd_ops mdc_obd_ops = {
         o_attach: mdc_attach,
         o_detach: mdc_detach,
         o_setup:   client_obd_setup,
         o_cleanup: client_obd_cleanup,
-        o_connect: client_obd_connect,
+        o_connect: mdc_connect,
         o_disconnect: client_obd_disconnect,
         o_statfs: mdc_statfs,
 };
index 43ae0ca..532ec14 100644 (file)
@@ -40,6 +40,7 @@
 #include <linux/kp30.h>
 #include <linux/lustre_mds.h> /* for mds_objid */
 #include <linux/obd_ost.h>
+#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
 #include <linux/ctype.h>
 #include <linux/init.h>
 #include <linux/lustre_ha.h>
 extern struct lprocfs_vars status_var_nm_1[];
 extern struct lprocfs_vars status_class_var[];
 
-int osc_attach(struct obd_device *dev, obd_count len, void *data)
+static int osc_attach(struct obd_device *dev, obd_count len, void *data)
 {
         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
 }
 
-int osc_detach(struct obd_device *dev)
+static int osc_detach(struct obd_device *dev)
 {
         return lprocfs_dereg_obd(dev);
 }
@@ -878,12 +879,111 @@ out:
         return err;
 }
 
+static void set_osc_active(struct obd_import *imp, int active)
+{
+        struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
+
+        if (notify_obd == NULL)
+                return;
+
+        /* How gross is _this_? */
+        if (!list_empty(&notify_obd->obd_exports)) {
+                int rc;
+                struct lustre_handle fakeconn;
+                struct obd_ioctl_data ioc_data;
+                struct obd_export *exp =
+                        list_entry(notify_obd->obd_exports.next,
+                                   struct obd_export, exp_obd_chain);
+
+                fakeconn.addr = (__u64)(unsigned long)exp;
+                fakeconn.cookie = exp->exp_cookie;
+                ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
+                ioc_data.ioc_offset = active;
+                rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
+                                   sizeof ioc_data, &ioc_data, NULL);
+                if (rc)
+                        CERROR("disabling %s on LOV %p/%s: %d\n",
+                               imp->imp_obd->obd_uuid, notify_obd,
+                               notify_obd->obd_uuid, rc);
+        } else {
+                CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
+                       "%p\n", notify_obd, notify_obd->obd_uuid,
+                       imp->imp_obd->obd_uuid);
+        }
+}
+
+
+/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
+static void abort_inflight_for_import(struct obd_import *imp)
+{
+        struct list_head *tmp, *n;
+
+        /* Make sure that no new requests get processed for this import.
+         * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
+         * flag and then putting requests on sending_list or delayed_list.
+         */
+        spin_lock(&imp->imp_lock);
+        imp->imp_flags |= IMP_INVALID;
+        spin_unlock(&imp->imp_lock);
+
+        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_list);
+
+                DEBUG_REQ(D_HA, req, "inflight");
+                req->rq_flags |= PTL_RPC_FL_ERR;
+                wake_up(&req->rq_wait_for_rep);
+        }
+
+        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_list);
+
+                DEBUG_REQ(D_HA, req, "aborting waiting req");
+                req->rq_flags |= PTL_RPC_FL_ERR;
+                wake_up(&req->rq_wait_for_rep);
+        }
+}
+
+static int osc_recover(struct obd_import *imp, int phase)
+{
+        int rc;
+        ENTRY;
+
+        switch(phase) {
+            case PTLRPC_RECOVD_PHASE_PREPARE: {
+                struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+                ldlm_namespace_cleanup(ns, 1 /* no network ops */);
+                abort_inflight_for_import(imp);
+                set_osc_active(imp, 0 /* inactive */);
+                RETURN(0);
+            }
+            case PTLRPC_RECOVD_PHASE_RECOVER:
+                rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
+                if (rc)
+                        RETURN(rc);
+                set_osc_active(imp, 1 /* active */);
+                RETURN(0);
+            default:
+                RETURN(-EINVAL);
+        }
+}
+
+static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
+                       obd_uuid_t cluuid, struct recovd_obd *recovd,
+                       ptlrpc_recovery_cb_t recover)
+{
+        struct obd_import *imp = &obd->u.cli.cl_import;
+        imp->imp_recover = osc_recover;
+        return client_obd_connect(conn, obd, cluuid, recovd, recover);
+}
+
 struct obd_ops osc_obd_ops = {
         o_attach:       osc_attach,
         o_detach:       osc_detach,
         o_setup:        client_obd_setup,
         o_cleanup:      client_obd_cleanup,
-        o_connect:      client_obd_connect,
+        o_connect:      osc_connect,
         o_disconnect:   client_obd_disconnect,
         o_statfs:       osc_statfs,
         o_packmd:       osc_packmd,
index 8f4aceb..b60ec1f 100644 (file)
@@ -192,7 +192,6 @@ static int ll_sync_brw_timeout(void *data)
                 CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
                        desc->bd_page_count, desc->bd_connection->c_remote_uuid,
                        desc->bd_portal, desc->bd_connection);
-                desc->bd_connection->c_level = LUSTRE_CONN_RECOVD;
 
                 /* This one will "never" arrive, don't wait for it. */
                 if (atomic_dec_and_test(&set->brw_refcount))
@@ -284,20 +283,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         request->rq_connection = ptlrpc_connection_addref(conn);
 
         INIT_LIST_HEAD(&request->rq_list);
-        /*
-         * This will be reduced once when the sender is finished (waiting for
-         * reply, f.e.), and once when the request has been committed and is
-         * removed from the to-be-committed list.
-         *
-         * Also, the refcount will be increased in ptl_send_rpc immediately
-         * before we hand it off to portals, and there will be a corresponding
-         * decrease in request_out_cb (which is called to indicate that portals
-         * is finished with the request, and it can be safely freed).
-         *
-         * (Except in the DLM server case, where it will be dropped twice
-         * by the sender, and then the last time by request_out_callback.)
-         */
-        atomic_set(&request->rq_refcount, 2);
+        atomic_set(&request->rq_refcount, 1);
 
         spin_lock(&imp->imp_lock);
         request->rq_xid = HTON__u32(++imp->imp_last_xid);
@@ -382,29 +368,9 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
+        ENTRY;
         if (req->rq_repmsg != NULL) {
-                struct obd_import *imp = req->rq_import;
-                struct ptlrpc_connection *conn = imp->imp_connection;
-                ENTRY;
-                if (req->rq_level > conn->c_level) {
-                        DEBUG_REQ(D_HA, req,
-                               "recovery started, ignoring (%d > %d)",
-                               req->rq_level, conn->c_level);
-                        req->rq_repmsg = NULL;
-                        GOTO(out, rc = 0);
-                }
                 req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
-                spin_lock(&imp->imp_lock);
-                if (req->rq_transno > imp->imp_max_transno) {
-                        imp->imp_max_transno = req->rq_transno;
-                } else if (req->rq_transno != 0) {
-                        if (conn->c_level == LUSTRE_CONN_FULL) {
-                                CERROR("got transno "LPD64" after "
-                                       LPD64": recovery may not work\n",
-                                       req->rq_transno, imp->imp_max_transno);
-                        }
-                }
-                spin_unlock(&imp->imp_lock);
                 req->rq_flags |= PTL_RPC_FL_REPLIED;
                 GOTO(out, rc = 1);
         }
@@ -474,7 +440,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
-        list_for_each_safe(tmp, saved, &imp->imp_request_list) {
+        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 if (req->rq_flags & PTL_RPC_FL_REPLAY) {
@@ -482,20 +448,6 @@ void ptlrpc_free_committed(struct obd_import *imp)
                         continue;
                 }
 
-                /* If neither replied-to nor restarted, keep it. */
-                if (!(req->rq_flags &
-                      (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) {
-                        DEBUG_REQ(D_HA, req, "keeping (in-flight)");
-                        continue;
-                }
-
-                /* This needs to match the commit test in ptlrpc_queue_wait() */
-                if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
-                    req->rq_transno == 0) {
-                        DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)");
-                        continue;
-                }
-
                 /* not yet committed */
                 if (req->rq_transno > imp->imp_peer_committed_transno)
                         break;
@@ -519,7 +471,7 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
         LASSERT(conn);
 
         spin_lock(&imp->imp_lock);
-        list_for_each_safe(tmp, saved, &imp->imp_request_list) {
+        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 /* XXX we should make sure that nobody's sleeping on these! */
@@ -599,14 +551,15 @@ static int expired_request(void *data)
                 RETURN(1);
 
         req->rq_timeout = 0;
-        req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
         recovd_conn_fail(req->rq_import->imp_connection);
 
+#if 0
         /* If this request is for recovery or other primordial tasks,
          * don't go back to sleep.
          */
         if (req->rq_level < LUSTRE_CONN_FULL)
                 RETURN(1);
+#endif
         RETURN(0);
 }
 
@@ -618,21 +571,15 @@ static int interrupted_request(void *data)
         RETURN(1); /* ignored, as of this writing */
 }
 
-/* If we're being torn down by umount -f, or the import has been
- * invalidated (such as by an OST failure), the request must fail with
- * -EIO.
+/* If the import has been invalidated (such as by an OST failure), the
+ * request must fail with -EIO.
  *
- * Must be called with conn->c_lock held, will drop it if it returns -EIO.
- *
- * XXX this should just be testing the import, and umount_begin shouldn't touch
- * XXX the connection.
+ * Must be called with imp_lock held, will drop it if it returns -EIO.
  */
-#define EIO_IF_INVALID(conn, req)                                             \
-if ((conn->c_flags & CONN_INVALID) ||                                         \
-    (req->rq_import->imp_flags & IMP_INVALID)) {                              \
-        DEBUG_REQ(D_ERROR, req, "%s_INVALID:",                                \
-                  (conn->c_flags & CONN_INVALID) ? "CONN" : "IMP");           \
-        spin_unlock(&conn->c_lock);                                           \
+#define EIO_IF_INVALID(req)                                                   \
+if (req->rq_import->imp_flags & IMP_INVALID) {                                \
+        DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");                              \
+        spin_unlock(&imp->imp_lock);                                          \
         RETURN(-EIO);                                                         \
 }
 
@@ -645,30 +592,30 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
-        req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */
+
+        /* for distributed debugging */
+        req->rq_reqmsg->status = HTON__u32(current->pid); 
         CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
                NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
                conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
 
-
-        /* XXX probably both an import and connection level are needed */
-        if (req->rq_level > conn->c_level) {
-                spin_lock(&conn->c_lock);
-                EIO_IF_INVALID(conn, req);
+        if (req->rq_level > imp->imp_level) {
+                spin_lock(&imp->imp_lock);
+                EIO_IF_INVALID(req);
                 list_del(&req->rq_list);
-                list_add_tail(&req->rq_list, &conn->c_delayed_head);
-                spin_unlock(&conn->c_lock);
+                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+                spin_unlock(&imp->imp_lock);
 
-                DEBUG_REQ(D_HA, req, "waiting for recovery: (%d < %d)",
-                          req->rq_level, conn->c_level);
+                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d < %d)",
+                          current->comm, req->rq_level, imp->imp_level);
                 lwi = LWI_INTR(NULL, NULL);
                 rc = l_wait_event(req->rq_wait_for_rep,
-                                  (req->rq_level <= conn->c_level) ||
+                                  (req->rq_level <= imp->imp_level) ||
                                   (req->rq_flags & PTL_RPC_FL_ERR), &lwi);
 
-                spin_lock(&conn->c_lock);
+                spin_lock(&imp->imp_lock);
                 list_del_init(&req->rq_list);
-                spin_unlock(&conn->c_lock);
+                spin_unlock(&imp->imp_lock);
 
                 if (req->rq_flags & PTL_RPC_FL_ERR)
                         RETURN(-EIO);
@@ -680,12 +627,12 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         }
  resend:
         req->rq_timeout = obd_timeout;
-        spin_lock(&conn->c_lock);
-        EIO_IF_INVALID(conn, req);
+        spin_lock(&imp->imp_lock);
+        EIO_IF_INVALID(req);
 
-        list_del(&req->rq_list);
-        list_add_tail(&req->rq_list, &imp->imp_request_list);
-        spin_unlock(&conn->c_lock);
+        LASSERT(list_empty(&req->rq_list));
+        list_add_tail(&req->rq_list, &imp->imp_sending_list);
+        spin_unlock(&imp->imp_lock);
         rc = ptl_send_rpc(req);
         if (rc) {
                 CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
@@ -701,6 +648,10 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         DEBUG_REQ(D_NET, req, "-- done sleeping");
 
+        spin_lock(&imp->imp_lock);
+        list_del_init(&req->rq_list);
+        spin_unlock(&imp->imp_lock);
+
         if (req->rq_flags & PTL_RPC_FL_ERR) {
                 ptlrpc_abort(req);
                 GOTO(out, rc = -EIO);
@@ -714,7 +665,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 goto resend;
         }
 
-        // up(&cli->cli_rpc_sem);
         if (req->rq_flags & PTL_RPC_FL_INTR) {
                 if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
                         LBUG(); /* should only be interrupted if we timed out */
@@ -749,36 +699,33 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
 
-        spin_lock(&conn->c_lock);
 
-        /* Requests that aren't from replayable imports, or which don't have
-         * transno information, can be "committed" early.
-         *
-         * But don't commit anything that's kept indefinitely for replay (has
-         * the PTL_RPC_FL_REPLAY flag set), such as open requests.
-         *
-         * This needs to match the commit test in ptlrpc_free_committed().
-         */
-        if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
-            (req->rq_repmsg->transno == 0 &&
-             (req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) {
-                /* This import doesn't support replay, so we can just "commit"
-                 * this request now.
-                 */
-                DEBUG_REQ(D_HA, req, "not replayable, committing:");
-                list_del_init(&req->rq_list);
-                __ptlrpc_req_finished(req, 1);
-        }
         if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
+                spin_lock(&imp->imp_lock);
+                if (req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0) {
+                        /* Balanced in ptlrpc_free_committed, usually. */
+                        atomic_inc(&req->rq_refcount);
+                        list_add_tail(&req->rq_list, &imp->imp_replay_list);
+                }
+
+                if (req->rq_transno > imp->imp_max_transno) {
+                        imp->imp_max_transno = req->rq_transno;
+                } else if (req->rq_transno != 0 &&
+                           imp->imp_level == LUSTRE_CONN_FULL) {
+                        CERROR("got transno "LPD64" after "LPD64": recovery "
+                               "may not work\n", req->rq_transno,
+                               imp->imp_max_transno);
+                }
+
                 /* Replay-enabled imports return commit-status information. */
                 imp->imp_peer_last_xid = req->rq_repmsg->last_xid;
-                imp->imp_peer_committed_transno = 
+                imp->imp_peer_committed_transno =
                         req->rq_repmsg->last_committed;
+                spin_unlock(&imp->imp_lock);
                 ptlrpc_free_committed(imp);
         }
 
         rc = ptlrpc_check_status(req);
-        spin_unlock(&conn->c_lock);
 
         EXIT;
  out:
index 2458b08..2182591 100644 (file)
@@ -79,7 +79,6 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer,
         if (c == NULL)
                 GOTO(out, c);
 
-        c->c_level = LUSTRE_CONN_NEW;
         c->c_generation = 1;
         c->c_epoch = 1;
         c->c_bootcount = 0;
index 9d955e6..9dddf78 100644 (file)
@@ -110,8 +110,6 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
         int rc;
 
         ENTRY;
-        conn->c_level = LUSTRE_CONN_RECOVD;
-
         argv[0] = obd_recovery_upcall;
         argv[1] = conn->c_remote_uuid;
         argv[2] = NULL;
@@ -138,159 +136,140 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
         RETURN(0);
 }
 
-#define REPLAY_COMMITTED     0 /* Fully processed (commit + reply). */
-#define REPLAY_REPLAY        1 /* Forced-replay (e.g. open). */
-#define REPLAY_RESEND        2 /* Resend required. */
-#define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */
-#define REPLAY_RESTART       4 /* Have to restart the call, sorry! */
-
-static int replay_state(struct ptlrpc_request *req, __u64 committed)
+int ptlrpc_replay(struct obd_import *imp, int send_last_flag)
 {
-        /* This request must always be replayed. */
-        if (req->rq_flags & PTL_RPC_FL_REPLAY)
-                return REPLAY_REPLAY;
+        int rc = 0;
+        struct list_head *tmp, *pos;
+        struct ptlrpc_request *req;
+        __u64 committed = imp->imp_peer_committed_transno;
+        ENTRY;
 
-        /* Uncommitted request */
-        if (req->rq_transno > committed) {
-                if (req->rq_flags & PTL_RPC_FL_REPLIED) {
-                        /* Saw reply, so resend and ignore new reply. */
-                        return REPLAY_RESEND_IGNORE;
-                }
+        /* It might have committed some after we last spoke, so make sure we
+         * get rid of them now.
+         */
+        ptlrpc_free_committed(imp);
 
-                /* Didn't see reply either, so resend. */
-                return REPLAY_RESEND;
+        spin_lock(&imp->imp_lock);
+
+        CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
+               imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
+
+        list_for_each(tmp, &imp->imp_replay_list) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                DEBUG_REQ(D_HA, req, "RETAINED: ");
         }
 
-        /* This request has been committed and we saw the reply.  Goodbye! */
-        if (req->rq_flags & PTL_RPC_FL_REPLIED)
-                return REPLAY_COMMITTED;
+        list_for_each_safe(tmp, pos, &imp->imp_replay_list) { 
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+
+                if (req->rq_transno == imp->imp_max_transno &&
+                    send_last_flag) {
+                        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+                        DEBUG_REQ(D_HA, req, "LAST_REPLAY:");
+                } else {
+                        DEBUG_REQ(D_HA, req, "REPLAY:");
+                }
+
+                rc = ptlrpc_replay_req(req);
+                req->rq_reqmsg->flags &= ~MSG_LAST_REPLAY;
 
-        /* Request committed, but we didn't see the reply: have to restart. */
-        return REPLAY_RESTART;
+                if (rc) {
+                        CERROR("recovery replay error %d for req %Ld\n",
+                               rc, req->rq_xid);
+                        GOTO(out, rc);
+                }
+        }
+
+ out:
+        spin_unlock(&imp->imp_lock);
+        return rc;
 }
 
-static char *replay_state2str(int state) {
-        static char *state_strings[] = {
-                "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART",
-        };
-        static char *unknown_state = "UNKNOWN";
+#define NO_RESEND     0 /* No action required. */
+#define RESEND        1 /* Resend required. */
+#define RESEND_IGNORE 2 /* Resend, ignore the reply (already saw it). */
+#define RESTART       3 /* Have to restart the call, sorry! */
+
+static int resend_type(struct ptlrpc_request *req, __u64 committed)
+{
+        if (req->rq_transno < committed) {
+                if (req->rq_flags & PTL_RPC_FL_REPLIED) {
+                        /* Saw the reply and it was committed, no biggie. */
+                        DEBUG_REQ(D_HA, req, "NO_RESEND");
+                        return NO_RESEND;
+                }
+                /* Request committed, but no reply: have to restart. */
+                return RESTART;
+        }
 
-        if (state < 0 || 
-            state > (sizeof(state_strings) / sizeof(state_strings[0]))) {
-                return unknown_state;
+        if (req->rq_flags & PTL_RPC_FL_REPLIED) {
+                /* Saw reply, so resend and ignore new reply. */
+                return RESEND_IGNORE;
         }
 
-        return state_strings[state];
+        /* Didn't see reply either, so resend. */
+        return RESEND;
+
 }
 
-int ptlrpc_replay(struct obd_import *imp, int unreplied_only)
+int ptlrpc_resend(struct obd_import *imp)
 {
-        int rc = 0, state;
+        int rc = 0, type;
         struct list_head *tmp, *pos;
         struct ptlrpc_request *req;
-        struct ptlrpc_connection *conn = imp->imp_connection;
         __u64 committed = imp->imp_peer_committed_transno;
+
         ENTRY;
 
         spin_lock(&imp->imp_lock);
-
-        CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
-               imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
-
-        list_for_each(tmp, &imp->imp_request_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                state = replay_state(req, committed);
-                DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state));
-        }
-
-        list_for_each(tmp, &conn->c_delayed_head) {
+        list_for_each(tmp, &imp->imp_sending_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                state = replay_state(req, committed);
-                DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state));
+                DEBUG_REQ(D_HA, req, "SENDING: ");
         }
 
-        list_for_each_safe(tmp, pos, &imp->imp_request_list) { 
+        list_for_each_safe(tmp, pos, &imp->imp_sending_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                if (unreplied_only) {
-                        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
-                                DEBUG_REQ(D_HA, req, "UNREPLIED:");
-                                ptlrpc_restart_req(req);
-                        }
-                        continue;
-                }
-
-                state = replay_state(req, committed);
-
-                if (req->rq_transno == imp->imp_max_transno) {
-                        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
-                        DEBUG_REQ(D_HA, req, "last for replay");
-                        LASSERT(state != REPLAY_COMMITTED);
-                }
-
-                switch (state) {
-                    case REPLAY_REPLAY:
-                        DEBUG_REQ(D_HA, req, "REPLAY:");
-                        rc = ptlrpc_replay_req(req);
-#if 0
-#error We should not hold a spinlock over such a lengthy operation.
-#error If necessary, drop spinlock, do operation, re-get spinlock, restart loop.
-#error If we need to avoid re-processint items, then delete them from the list
-#error as they are replayed and re-add at the tail of this list, so the next
-#error item to process will always be at the head of the list.
-#endif
-                        if (rc) {
-                                CERROR("recovery replay error %d for req %Ld\n",
-                                       rc, req->rq_xid);
-                                GOTO(out, rc);
-                        }
+                
+                switch(resend_type(req, committed)) {
+                    case NO_RESEND:
                         break;
 
-                    case REPLAY_COMMITTED:
-                        DEBUG_REQ(D_ERROR, req, "COMMITTED:");
-                        /* XXX commit now? */
+                    case RESTART:
+                        DEBUG_REQ(D_HA, req, "RESTART:");
+                        ptlrpc_restart_req(req);
                         break;
 
-                    case REPLAY_RESEND_IGNORE:
+                    case RESEND_IGNORE:
                         DEBUG_REQ(D_HA, req, "RESEND_IGNORE:");
-                        rc = ptlrpc_replay_req(req); 
+                        rc = ptlrpc_replay_req(req);
                         if (rc) {
-                                CERROR("request resend error %d for req %Ld\n",
-                                       rc, req->rq_xid); 
-                                GOTO(out, rc);
+                                DEBUG_REQ(D_ERROR, req, "error %d resending:",
+                                          rc);
+                                ptlrpc_restart_req(req); /* might as well */
                         }
                         break;
 
-                    case REPLAY_RESTART:
-                        DEBUG_REQ(D_HA, req, "RESTART:");
-                        ptlrpc_restart_req(req);
-                        break;
-
-                    case REPLAY_RESEND:
+                    case RESEND:
                         DEBUG_REQ(D_HA, req, "RESEND:");
                         ptlrpc_resend_req(req);
                         break;
-
+                        
                     default:
                         LBUG();
                 }
-
         }
+}
 
-        conn->c_level = LUSTRE_CONN_FULL;
-        recovd_conn_fixed(conn);
+void ptlrpc_wake_delayed(struct obd_import *imp)
+{
+        struct list_head *tmp, *pos;
+        struct ptlrpc_request *req;
 
-        CERROR("recovery complete on conn %p(%s), waking delayed reqs\n",
-               conn, conn->c_remote_uuid);
-        /* Finally, continue processing requests that blocked for recovery. */
-        list_for_each_safe(tmp, pos, &conn->c_delayed_head) { 
+        spin_lock(&imp->imp_lock);
+        list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                DEBUG_REQ(D_HA, req, "WAKING: ");
-                ptlrpc_continue_req(req);
+                DEBUG_REQ(D_HA, req, "waking:");
+                wake_up(&req->rq_wait_for_rep);
         }
-
-        EXIT;
- out:
-        spin_unlock(&conn->c_lock);
-        return rc;
+        spin_unlock(&imp->imp_lock);
 }
index 7263ac0..c721993 100644 (file)
@@ -272,6 +272,8 @@ EXPORT_SYMBOL(lustre_msg_buf);
 EXPORT_SYMBOL(ptlrpc_run_recovery_upcall);
 EXPORT_SYMBOL(ptlrpc_reconnect_import);
 EXPORT_SYMBOL(ptlrpc_replay);
+EXPORT_SYMBOL(ptlrpc_resend);
+EXPORT_SYMBOL(ptlrpc_wake_delayed);
 
 MODULE_AUTHOR("Cluster File Systems, Inc <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Request Processor v1.0");