Whamcloud - gitweb
b=6063
authoralex <alex>
Sat, 7 May 2005 19:01:31 +0000 (19:01 +0000)
committeralex <alex>
Sat, 7 May 2005 19:01:31 +0000 (19:01 +0000)
 - to avoid possible lock collision during replay, we should replay all
   request before any locks

14 files changed:
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/obdclass/genops.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/ost/ost_handler.c
lustre/ptlrpc/import.c

index e4a0bdb..ec36e72 100644 (file)
@@ -84,7 +84,8 @@ struct obd_export {
         /* ^ protects exp_outstanding_replies too */
         unsigned long             exp_flags;
         int                       exp_failed:1,
-                                  exp_replay_needed:1,
+                                  exp_req_replay_needed:1,
+                                  exp_lock_replay_needed:1,
                                   exp_libclient:1, /* liblustre client? */
                                   exp_sync:1;
         union {
index 8a0926c..1125ccf 100644 (file)
@@ -174,6 +174,8 @@ struct lustre_msg {
 #define MSG_LAST_REPLAY        1
 #define MSG_RESENT             2
 #define MSG_REPLAY             4
+#define MSG_REQ_REPLAY_DONE    8
+#define MSG_LOCK_REPLAY_DONE  16
 
 static inline int lustre_msg_get_flags(struct lustre_msg *msg)
 {
index ac5d138..0b581eb 100644 (file)
@@ -648,16 +648,23 @@ struct obd_device {
         spinlock_t                       obd_processing_task_lock;
         __u64                            obd_next_recovery_transno;
         int                              obd_replayed_requests;
+        int                              obd_replayed_locks;
         int                              obd_requests_queued_for_recovery;
         wait_queue_head_t                obd_next_transno_waitq;
         struct list_head                 obd_uncommitted_replies;
         spinlock_t                       obd_uncommitted_replies_lock;
         struct timer_list                obd_recovery_timer;
-        struct list_head                 obd_recovery_queue;
-        struct list_head                 obd_delayed_reply_queue;
         time_t                           obd_recovery_start;
         time_t                           obd_recovery_end;
 
+        atomic_t                         obd_req_replay_clients;
+        atomic_t                         obd_lock_replay_clients;
+
+        struct list_head                 obd_req_replay_queue;
+        struct list_head                 obd_lock_replay_queue;
+        struct list_head                 obd_final_req_queue;
+        int                              obd_recovery_stage;
+
         union {
                 struct filter_obd        filter;
                 struct mds_obd           mds;
index 9f41ed2..2da6730 100644 (file)
@@ -154,8 +154,10 @@ void class_put_type(struct obd_type *type);
 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
                   struct obd_uuid *cluuid);
 int class_disconnect(struct obd_export *exp, unsigned long flags);
-void class_disconnect_exports(struct obd_device *obddev, unsigned long flags);
-void class_disconnect_stale_exports(struct obd_device *obddev, unsigned long flags);
+void class_disconnect_exports(struct obd_device *, unsigned long);
+int class_disconnect_stale_exports(struct obd_device *,
+                                   int (*test_export)(struct obd_export *), 
+                                   unsigned long);
 
 /* generic operations shared by various OBD types */
 int class_multi_setup(struct obd_device *obddev, uint32_t len, void *data);
index a4ea476..9244c92 100644 (file)
@@ -898,12 +898,8 @@ static void target_release_saved_req(struct ptlrpc_request *req)
 
 static void target_finish_recovery(struct obd_device *obd)
 {
-        struct list_head *tmp, *n;
         int rc;
 
-        CWARN("%s: sending delayed replies to recovered clients\n",
-              obd->obd_name);
-
         ldlm_reprocess_all_ns(obd->obd_namespace);
 
         /* when recovery finished, cleanup orphans on mds and ost */
@@ -916,26 +912,40 @@ static void target_finish_recovery(struct obd_device *obd)
                         CERROR("postrecov failed %d\n", rc);
         }
 
+        obd->obd_recovery_end = LTIME_S(CURRENT_TIME);
+        return;
+}
 
-        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
-                struct ptlrpc_request *req;
+static void abort_req_replay_queue(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        struct list_head *tmp, *n;
+        int rc;
+
+        list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                DEBUG_REQ(D_ERROR, req, "delayed:");
-                ptlrpc_reply(req);
+                DEBUG_REQ(D_ERROR, req, "aborted:");
+                req->rq_status = -ENOTCONN;
+                req->rq_type = PTL_RPC_MSG_ERR;
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0) {
+                        ptlrpc_reply(req);
+                } else {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "packing failed for abort-reply; skipping");
+                }
                 target_release_saved_req(req);
         }
-        obd->obd_recovery_end = LTIME_S(CURRENT_TIME);
-        return;
 }
 
-static void abort_recovery_queue(struct obd_device *obd)
+static void abort_lock_replay_queue(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
         struct list_head *tmp, *n;
         int rc;
 
-        list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
                 DEBUG_REQ(D_ERROR, req, "aborted:");
@@ -976,14 +986,19 @@ void target_cleanup_recovery(struct obd_device *obd)
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                LASSERT (req->rq_reply_state);
-                lustre_free_reply_state(req->rq_reply_state);
+                LASSERT (req->rq_reply_state == 0);
                 target_release_saved_req(req);
         }
-        list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                list_del(&req->rq_list);
+                LASSERT (req->rq_reply_state == 0);
+                target_release_saved_req(req);
+        }
+        list_for_each_safe(tmp, n, &obd->obd_final_req_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
                 LASSERT (req->rq_reply_state == 0);
@@ -991,6 +1006,7 @@ void target_cleanup_recovery(struct obd_device *obd)
         }
 }
 
+#if 0
 static void target_abort_recovery(void *data)
 {
         struct obd_device *obd = data;
@@ -1006,11 +1022,11 @@ static void target_abort_recovery(void *data)
         target_finish_recovery(obd);
         ptlrpc_run_recovery_over_upcall(obd);
 }
+#endif
 
 static void target_recovery_expired(unsigned long castmeharder)
 {
         struct obd_device *obd = (struct obd_device *)castmeharder;
-        CERROR("recovery timed out, aborting\n");
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_recovering)
                 obd->obd_abort_recovery = 1;
@@ -1066,8 +1082,8 @@ static int check_for_next_transno(struct obd_device *obd)
         __u64 next_transno, req_transno;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (!list_empty(&obd->obd_recovery_queue)) {
-                req = list_entry(obd->obd_recovery_queue.next,
+        if (!list_empty(&obd->obd_req_replay_queue)) {
+                req = list_entry(obd->obd_req_replay_queue.next,
                                  struct ptlrpc_request, rq_list);
                 req_transno = req->rq_reqmsg->transno;
         } else {
@@ -1076,7 +1092,7 @@ static int check_for_next_transno(struct obd_device *obd)
 
         max = obd->obd_max_recoverable_clients;
         connected = obd->obd_connected_clients;
-        completed = max - obd->obd_recoverable_clients;
+        completed = max - atomic_read(&obd->obd_req_replay_clients);
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
@@ -1086,7 +1102,7 @@ static int check_for_next_transno(struct obd_device *obd)
         if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
-        } else if (max == completed) {
+        } else if (atomic_read(&obd->obd_req_replay_clients) == 0) {
                 CDEBUG(D_HA, "waking for completed recovery\n");
                 wake_up = 1;
         } else if (req_transno == next_transno) {
@@ -1120,8 +1136,8 @@ target_next_replay_req(struct obd_device *obd)
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_abort_recovery) {
                 req = NULL;
-        } else if (!list_empty(&obd->obd_recovery_queue)) {
-                req = list_entry(obd->obd_recovery_queue.next,
+        } else if (!list_empty(&obd->obd_req_replay_queue)) {
+                req = list_entry(obd->obd_req_replay_queue.next,
                                  struct ptlrpc_request, rq_list);
                 list_del_init(&req->rq_list);
                 obd->obd_requests_queued_for_recovery--;
@@ -1132,11 +1148,90 @@ target_next_replay_req(struct obd_device *obd)
         return req;
 }
 
+static int check_for_next_lock(struct obd_device *obd)
+{
+        struct ptlrpc_request *req = NULL;
+        int wake_up = 0;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!list_empty(&obd->obd_lock_replay_queue)) {
+                req = list_entry(obd->obd_lock_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                CDEBUG(D_HA, "waking for next lock\n");
+                wake_up = 1;
+        } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) {
+                CDEBUG(D_HA, "waking for completed lock replay\n");
+                wake_up = 1;
+        } else if (obd->obd_abort_recovery) {
+                CDEBUG(D_HA, "waking for aborted recovery\n");
+                wake_up = 1;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        
+        return wake_up;
+}
+
+static struct ptlrpc_request *
+target_next_replay_lock(struct obd_device *obd)
+{
+        struct l_wait_info lwi = { 0 };
+        struct ptlrpc_request *req;
+
+        CDEBUG(D_HA, "Waiting for lock\n");
+        l_wait_event(obd->obd_next_transno_waitq,
+                     check_for_next_lock(obd), &lwi);
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_abort_recovery) {
+                req = NULL;
+        } else if (!list_empty(&obd->obd_lock_replay_queue)) {
+                req = list_entry(obd->obd_lock_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+        } else {
+                req = NULL;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return req;
+}
+
+static struct ptlrpc_request *
+target_next_final_ping(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!list_empty(&obd->obd_final_req_queue)) {
+                req = list_entry(obd->obd_final_req_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+        } else {
+                req = NULL;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return req;
+}
+
+static int req_replay_done(struct obd_export *exp)
+{
+        if (exp->exp_req_replay_needed)
+                return 0;
+        return 1;
+}
+
+static int lock_replay_done(struct obd_export *exp)
+{
+        if (exp->exp_lock_replay_needed)
+                return 0;
+        return 1;
+}
+
 static int target_recovery_thread(void *arg)
 {
         struct obd_device *obd = arg;
         struct ptlrpc_request *req;
         struct target_recovery_data *trd = &obd->obd_recovery_data;
+        char peer_str[PTL_NALFMT_SIZE];
         unsigned long flags;
         ENTRY;
 
@@ -1154,40 +1249,85 @@ static int target_recovery_thread(void *arg)
         obd->obd_recovering = 1;
         complete(&trd->trd_starting);
 
-        while (obd->obd_recovering) {
+        /* The first stage: replay requests */
+        CWARN("1: request replay stage - %d clients\n",
+              atomic_read(&obd->obd_req_replay_clients));
+        while ((req = target_next_replay_req(obd))) {
                 LASSERT(trd->trd_processing_task == current->pid);
-                req = target_next_replay_req(obd);
-                if (req != NULL) {
-                        char peer_str[PTL_NALFMT_SIZE];
-                        DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s: ", 
-                                  req->rq_reqmsg->transno, 
-                                  ptlrpc_peernid2str(&req->rq_peer, peer_str));
-                        (void)trd->trd_recovery_handler(req);
-                        obd->obd_replayed_requests++;
-                        reset_recovery_timer(obd);
-                        /* bug 1580: decide how to properly sync() in recovery*/
-                        //mds_fsync_super(mds->mds_sb);
-                        ptlrpc_free_clone(req);
-                        spin_lock_bh(&obd->obd_processing_task_lock);
-                        obd->obd_next_recovery_transno++;
-                        spin_unlock_bh(&obd->obd_processing_task_lock);
-                } else {
-                        /* recovery is over */
-                        spin_lock_bh(&obd->obd_processing_task_lock);
-                        obd->obd_recovering = 0;
-                        target_cancel_recovery_timer(obd);
-                        if (obd->obd_abort_recovery) {
-                                obd->obd_abort_recovery = 0;
-                                spin_unlock_bh(&obd->obd_processing_task_lock);
-                                target_abort_recovery(obd); 
-                        } else {
-                                LASSERT(obd->obd_recoverable_clients == 0);
-                                spin_unlock_bh(&obd->obd_processing_task_lock);
-                                target_finish_recovery(obd);
-                        }
-                }
+                DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s: ", 
+                          req->rq_reqmsg->transno, 
+                          ptlrpc_peernid2str(&req->rq_peer, peer_str));
+                (void)trd->trd_recovery_handler(req);
+                obd->obd_replayed_requests++;
+                reset_recovery_timer(obd);
+                /* bug 1580: decide how to properly sync() in recovery*/
+                //mds_fsync_super(mds->mds_sb);
+                ptlrpc_free_clone(req);
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                obd->obd_next_recovery_transno++;
+                spin_unlock_bh(&obd->obd_processing_task_lock);
         }
 
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* If some clients haven't replayed requests in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CERROR("req replay timed out, aborting ...\n");
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, req_replay_done, 0);
+                atomic_sub(stale, &obd->obd_lock_replay_clients);
+                abort_req_replay_queue(obd);
+        }
+
+        /* The second stage: replay locks */
+        CWARN("2: lock replay stage - %d clients\n",
+              atomic_read(&obd->obd_lock_replay_clients));
+        while ((req = target_next_replay_lock(obd))) {
+                LASSERT(trd->trd_processing_task == current->pid);
+                DEBUG_REQ(D_HA, req, "processing lock from %s: ", 
+                          ptlrpc_peernid2str(&req->rq_peer, peer_str));
+                (void)trd->trd_recovery_handler(req);
+                reset_recovery_timer(obd);
+                ptlrpc_free_clone(req);
+                obd->obd_replayed_locks++;
+        }
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* If some clients haven't replayed requests in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CERROR("lock replay timed out, aborting ...\n");
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, lock_replay_done, 0);
+                abort_lock_replay_queue(obd);
+        }
+
+        /* We drop recoverying flag to forward all new requests
+         * to regular mds_handle() since now */
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        obd->obd_recovering = 0;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* The third stage: reply on final pings */
+        CWARN("3: final stage - process recovery completion pings\n");
+        while ((req = target_next_final_ping(obd))) {
+                LASSERT(trd->trd_processing_task == current->pid);
+                DEBUG_REQ(D_HA, req, "processing final ping from %s: ", 
+                          ptlrpc_peernid2str(&req->rq_peer, peer_str));
+                (void)trd->trd_recovery_handler(req);
+                ptlrpc_free_clone(req);
+        }
+        
+        CWARN("4: recovery completed - %d/%d reqs/locks replayed\n",
+              obd->obd_replayed_requests, obd->obd_replayed_locks);
+        target_finish_recovery(obd);
+
         trd->trd_processing_task = 0;
         complete(&trd->trd_finishing);
         return 0;
@@ -1227,6 +1367,43 @@ void target_stop_recovery_thread(struct obd_device *obd)
 }
 #endif
 
+int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+{
+        struct obd_export *exp = req->rq_export;
+        LASSERT(exp != NULL);
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+                /* client declares he's ready to replay locks */
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                if (exp->exp_req_replay_needed) {
+                        LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
+                        exp->exp_req_replay_needed = 0;
+                        atomic_dec(&obd->obd_req_replay_clients);
+                        if (atomic_read(&obd->obd_req_replay_clients) == 0) {
+                                CDEBUG(D_HA, "all clients have replayed reqs\n");
+                                wake_up(&obd->obd_next_transno_waitq);
+                        }
+                }
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
+                /* client declares he's ready to complete recovery 
+                 * so, we put the request on th final queue */
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                if (exp->exp_lock_replay_needed) {
+                        LASSERT(atomic_read(&obd->obd_lock_replay_clients) > 0);
+                        exp->exp_lock_replay_needed = 0;
+                        atomic_dec(&obd->obd_lock_replay_clients);
+                        if (atomic_read(&obd->obd_lock_replay_clients) == 0) {
+                                CDEBUG(D_HA, "all clients have replayed locks\n");
+                                wake_up(&obd->obd_next_transno_waitq);
+                        }
+                }
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+
+        return 0;
+}
+
 int target_queue_recovery_request(struct ptlrpc_request *req,
                                   struct obd_device *obd)
 {
@@ -1234,6 +1411,39 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         int inserted = 0;
         __u64 transno = req->rq_reqmsg->transno;
 
+        if (obd->obd_recovery_data.trd_processing_task == current->pid) {
+                /* Processing the queue right now, don't re-add. */
+                return 1;
+        }
+
+        target_process_req_flags(obd, req);
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
+                /* client declares he's ready to complete recovery 
+                 * so, we put the request on th final queue */
+                req = ptlrpc_clone_req(req);
+                if (req == NULL)
+                        return -ENOMEM;
+                DEBUG_REQ(D_HA, req, "queue final req");
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return 0;
+        }
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+                /* client declares he's ready to replay locks */
+                req = ptlrpc_clone_req(req);
+                if (req == NULL)
+                        return -ENOMEM;
+                DEBUG_REQ(D_HA, req, "queue lock replay req");
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                wake_up(&obd->obd_next_transno_waitq);
+                return 0;
+        }
+
+
         /* CAVEAT EMPTOR: The incoming request message has been swabbed
          * (i.e. buflens etc are in my own byte order), but type-dependent
          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
@@ -1256,8 +1466,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
          * handled will pass through here and be processed immediately.
          */
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_recovery_data.trd_processing_task == current->pid ||
-            transno < obd->obd_next_recovery_transno) {
+        if (transno < obd->obd_next_recovery_transno) {
                 /* Processing the queue right now, don't re-add. */
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);
@@ -1280,7 +1489,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         spin_lock_bh(&obd->obd_processing_task_lock);
 
         /* XXX O(n^2) */
-        list_for_each(tmp, &obd->obd_recovery_queue) {
+        list_for_each(tmp, &obd->obd_req_replay_queue) {
                 struct ptlrpc_request *reqiter =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
@@ -1292,7 +1501,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         }
 
         if (!inserted)
-                list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
+                list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
 
         obd->obd_requests_queued_for_recovery++;
         wake_up(&obd->obd_next_transno_waitq);
@@ -1305,41 +1514,6 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req)
         return req->rq_export->exp_obd;
 }
 
-int target_queue_final_reply(struct ptlrpc_request *req, int rc)
-{
-        struct obd_device *obd = target_req2obd(req);
-
-        LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
-
-        if (rc) {
-                /* Just like ptlrpc_error, but without the sending. */
-                rc = lustre_pack_reply(req, 0, NULL, NULL);
-                LASSERT(rc == 0); /* XXX handle this */
-                req->rq_type = PTL_RPC_MSG_ERR;
-        }
-
-        LASSERT (!req->rq_reply_state->rs_difficult);
-        LASSERT(list_empty(&req->rq_list));
-        
-        req = ptlrpc_clone_req(req);
-
-        spin_lock_bh(&obd->obd_processing_task_lock);
-
-        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
-
-        /* only count the first "replay over" request from each
-           export */
-        if (req->rq_export->exp_replay_needed) {
-                --obd->obd_recoverable_clients;
-                req->rq_export->exp_replay_needed = 0;
-                CWARN("%s: %d recoverable clients remain\n",
-                      obd->obd_name, obd->obd_recoverable_clients);
-        }
-        wake_up(&obd->obd_next_transno_waitq);
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        return 1;
-}
-
 int
 target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
 {
index 0634cb7..952112b 100644 (file)
@@ -399,6 +399,9 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 RETURN(0);
         }
 
+        LASSERTF(lock->l_export->exp_obd->obd_recovering == 0,
+                 "BUG 6063: lock collide during recovery");
+
         LASSERT(lock);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
@@ -1676,4 +1679,3 @@ EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
-EXPORT_SYMBOL(target_queue_final_reply);
index 90c988a..012b31e 100644 (file)
@@ -1036,6 +1036,11 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 size[1] = lock->l_lvb_len;
         }
         req->rq_replen = lustre_msg_size(buffers, size);
+        /* notify the server we've replayed all requests.
+         * also, we mark the request to be put on a dedicated
+         * queue to be processed after all request replayes.
+         * bug 6063 */
+        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
 
         LDLM_DEBUG(lock, "replaying lock:");
 
index c6e48cf..ecfc6a2 100644 (file)
@@ -2823,14 +2823,6 @@ int mds_handle(struct ptlrpc_request *req)
         }
  out:
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return target_queue_final_reply(req, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
 
         target_send_reply(req, rc, fail);
         return 0;
index 391654a..b8bb100 100644 (file)
@@ -380,9 +380,17 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
                 spin_lock_init(&med->med_open_lock);
 
                 mcd = NULL;
-                exp->exp_replay_needed = 1;
+                exp->exp_req_replay_needed = 1;
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
+
+                /* track clients to separate req replay
+                 * from lock replay. bug 6063 */
+                atomic_inc(&obd->obd_req_replay_clients);
+                exp->exp_req_replay_needed = 1;
+                atomic_inc(&obd->obd_lock_replay_clients);
+                exp->exp_lock_replay_needed = 1;
+                
                 class_export_put(exp);
 
                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
index 36ae1e8..596bc0b 100644 (file)
@@ -738,7 +738,9 @@ void class_disconnect_exports(struct obd_device *obd, unsigned long flags)
 
 /* Remove exports that have not completed recovery.
  */
-void class_disconnect_stale_exports(struct obd_device *obd, unsigned long flags)
+int class_disconnect_stale_exports(struct obd_device *obd,
+                                   int (*test_export)(struct obd_export *),
+                                   unsigned long flags)
 {
         struct list_head work_list;
         struct list_head *pos, *n;
@@ -750,10 +752,12 @@ void class_disconnect_stale_exports(struct obd_device *obd, unsigned long flags)
         spin_lock(&obd->obd_dev_lock);
         list_for_each_safe(pos, n, &obd->obd_exports) {
                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
-                if (exp->exp_replay_needed) {
+                if (!test_export(exp)) {
                         list_del(&exp->exp_obd_chain);
                         list_add(&exp->exp_obd_chain, &work_list);
                         cnt++;
+                        CDEBUG(D_ERROR, "%s: disconnect stale client %s\n",
+                               obd->obd_name, exp->exp_client_uuid.uuid);
                 }
         }
         spin_unlock(&obd->obd_dev_lock);
@@ -761,7 +765,7 @@ void class_disconnect_stale_exports(struct obd_device *obd, unsigned long flags)
         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
                obd->obd_name, cnt);
         class_disconnect_export_list(&work_list, flags);
-        EXIT;
+        RETURN(cnt);
 }
 
 
index 414fde4..c27c6ae 100644 (file)
@@ -116,8 +116,9 @@ static int class_attach(struct lustre_cfg *lcfg)
         init_timer(&obd->obd_recovery_timer);
         spin_lock_init(&obd->obd_processing_task_lock);
         init_waitqueue_head(&obd->obd_next_transno_waitq);
-        INIT_LIST_HEAD(&obd->obd_recovery_queue);
-        INIT_LIST_HEAD(&obd->obd_delayed_reply_queue);
+        INIT_LIST_HEAD(&obd->obd_req_replay_queue);
+        INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
+        INIT_LIST_HEAD(&obd->obd_final_req_queue);
 
         spin_lock_init(&obd->obd_uncommitted_replies_lock);
         INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
index c5dbac8..43a778d 100644 (file)
@@ -499,7 +499,10 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 spin_lock_init(&fed->fed_lock);
 
                 fcd = NULL;
-                exp->exp_replay_needed = 1;
+                exp->exp_req_replay_needed = 1;
+                exp->exp_lock_replay_needed = 1;
+                atomic_inc(&obd->obd_req_replay_clients);
+                atomic_inc(&obd->obd_lock_replay_clients);
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
                 class_export_put(exp);
index 1090944..cf1c6de 100644 (file)
@@ -1228,15 +1228,6 @@ int ost_handle(struct ptlrpc_request *req)
         }
 
 out_check_req:
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        rc = target_queue_final_reply(req, rc);
-                        GOTO(out_free_oti, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
 
         if (!rc)
                 oti_to_request(oti, req);
index 7cf82f1..52f3587 100644 (file)
@@ -589,7 +589,7 @@ static int signal_completed_replay(struct obd_import *imp)
 
         req->rq_replen = lustre_msg_size(0, NULL);
         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
-        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+        req->rq_reqmsg->flags |= MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE;
         req->rq_timeout *= 3;
         req->rq_interpret_reply = completed_replay_interpret;