From 86252a29efa9a6eedc1f792b3d9992aadd44343e Mon Sep 17 00:00:00 2001 From: shaver Date: Wed, 24 Sep 2003 21:12:08 +0000 Subject: [PATCH] Part of b=1742: Allow replay to skip missing transnos if all connected clients are either complete or have a request in the queue. (The original reply for transno N might not make it to the wire before the server crashes, but N+1 might have had a shorter path out. The reply-acks ensure that we can't miss a dependency this way.) --- lustre/ldlm/ldlm_lib.c | 51 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 81a283d..74340af 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -441,7 +441,6 @@ void target_abort_recovery(void *data) } obd->obd_recovering = obd->obd_abort_recovery = 0; - obd->obd_recoverable_clients = 0; wake_up(&obd->obd_next_transno_waitq); target_cancel_recovery_timer(obd); @@ -503,21 +502,39 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) static int check_for_next_transno(struct obd_device *obd) { struct ptlrpc_request *req; - int wake_up; - - /* XXX shouldn't we take obd->obd_processing_task_lock to check these - flags and the recovery_queue? */ - if (obd->obd_abort_recovery || !obd->obd_recovering) - return 1; + int wake_up = 0, connected, completed, queue_len; + __u64 next_transno, req_transno; + spin_lock_bh(&obd->obd_processing_task_lock); req = list_entry(obd->obd_recovery_queue.next, struct ptlrpc_request, rq_list); - LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno); - - wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno; - CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n", - req->rq_reqmsg->transno, obd->obd_next_recovery_transno, - obd->obd_recovering, wake_up); + req_transno = req->rq_reqmsg->transno; + connected = obd->obd_connected_clients; + completed = obd->obd_max_recoverable_clients - + obd->obd_recoverable_clients; + queue_len = obd->obd_requests_queued_for_recovery; + next_transno = obd->obd_next_recovery_transno; + + if (obd->obd_abort_recovery) { + CDEBUG(D_HA, "waking for aborted recovery\n"); + wake_up = 1; + } else if (!obd->obd_recovering) { + CDEBUG(D_HA, "waking for completed recovery (?)\n"); + wake_up = 1; + } else if (req_transno == next_transno) { + CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); + wake_up = 1; + } else if (queue_len + completed == connected) { + CDEBUG(D_HA, + "waking for skipped transno (skip: "LPD64 + ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n", + next_transno, queue_len, completed, connected, + req_transno); + obd->obd_next_recovery_transno = req_transno; + wake_up = 1; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + LASSERT(req->rq_reqmsg->transno >= next_transno); return wake_up; } @@ -552,11 +569,12 @@ static void process_recovery_queue(struct obd_device *obd) continue; } list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; spin_unlock_bh(&obd->obd_processing_task_lock); DEBUG_REQ(D_HA, req, "processing: "); (void)obd->obd_recovery_handler(req); - obd->obd_reintegrated_requests++; + obd->obd_replayed_requests++; reset_recovery_timer(obd); /* bug 1580: decide how to properly sync() in recovery */ //mds_fsync_super(mds->mds_sb); @@ -645,12 +663,13 @@ int target_queue_recovery_request(struct ptlrpc_request *req, list_add_tail(&req->rq_list, &obd->obd_recovery_queue); } + obd->obd_requests_queued_for_recovery++; + if (obd->obd_processing_task != 0) { /* Someone else is processing this queue, we'll leave it to * them. */ - if (transno == obd->obd_next_recovery_transno) - wake_up(&obd->obd_next_transno_waitq); + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); return 0; } -- 1.8.3.1