Whamcloud - gitweb
Part of b=1742:
author shaver <shaver>
Wed, 24 Sep 2003 21:12:08 +0000 (21:12 +0000)
committer shaver <shaver>
Wed, 24 Sep 2003 21:12:08 +0000 (21:12 +0000)
Allow replay to skip missing transnos if all connected clients are either
complete or have a request in the queue. (The original reply for transno N
might not make it to the wire before the server crashes, but N+1 might have had
a shorter path out. The reply-acks ensure that we can't miss a dependency this
way.)

lustre/ldlm/ldlm_lib.c

index 81a283d..74340af 100644 (file)
@@ -441,7 +441,6 @@ void target_abort_recovery(void *data)
         }
 
         obd->obd_recovering = obd->obd_abort_recovery = 0;
-        obd->obd_recoverable_clients = 0;
 
         wake_up(&obd->obd_next_transno_waitq);
         target_cancel_recovery_timer(obd);
@@ -503,21 +502,39 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
 static int check_for_next_transno(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
-        int wake_up;
-
-        /* XXX shouldn't we take obd->obd_processing_task_lock to check these
-           flags and the recovery_queue? */
-        if (obd->obd_abort_recovery || !obd->obd_recovering)
-                return 1;
+        int wake_up = 0, connected, completed, queue_len;
+        __u64 next_transno, req_transno;
 
+        spin_lock_bh(&obd->obd_processing_task_lock);
         req = list_entry(obd->obd_recovery_queue.next,
                          struct ptlrpc_request, rq_list);
-        LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
-
-        wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno;
-        CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
-               req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
-               obd->obd_recovering, wake_up);
+        req_transno = req->rq_reqmsg->transno;
+        connected = obd->obd_connected_clients;
+        completed = obd->obd_max_recoverable_clients - 
+                obd->obd_recoverable_clients;
+        queue_len = obd->obd_requests_queued_for_recovery;
+        next_transno = obd->obd_next_recovery_transno;
+
+        if (obd->obd_abort_recovery) {
+                CDEBUG(D_HA, "waking for aborted recovery\n");
+                wake_up = 1;
+        } else if (!obd->obd_recovering) {
+                CDEBUG(D_HA, "waking for completed recovery (?)\n");
+                wake_up = 1;
+        } else if (req_transno == next_transno) {
+                CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
+                wake_up = 1;
+        } else if (queue_len + completed == connected) {
+                CDEBUG(D_HA,
+                       "waking for skipped transno (skip: "LPD64
+                       ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
+                       next_transno, queue_len, completed, connected,
+                       req_transno);
+                obd->obd_next_recovery_transno = req_transno;
+                wake_up = 1;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        LASSERT(req->rq_reqmsg->transno >= next_transno);
         return wake_up;
 }
 
@@ -552,11 +569,12 @@ static void process_recovery_queue(struct obd_device *obd)
                         continue;
                 }
                 list_del_init(&req->rq_list);
+                obd->obd_requests_queued_for_recovery--;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
 
                 DEBUG_REQ(D_HA, req, "processing: ");
                 (void)obd->obd_recovery_handler(req);
-                obd->obd_reintegrated_requests++;
+                obd->obd_replayed_requests++;
                 reset_recovery_timer(obd);
                 /* bug 1580: decide how to properly sync() in recovery */
                 //mds_fsync_super(mds->mds_sb);
@@ -645,12 +663,13 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
         }
 
+        obd->obd_requests_queued_for_recovery++;
+
         if (obd->obd_processing_task != 0) {
                 /* Someone else is processing this queue, we'll leave it to
                  * them.
                  */
-                if (transno == obd->obd_next_recovery_transno)
-                        wake_up(&obd->obd_next_transno_waitq);
+                wake_up(&obd->obd_next_transno_waitq);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return 0;
         }