}
obd->obd_recovering = obd->obd_abort_recovery = 0;
- obd->obd_recoverable_clients = 0;
wake_up(&obd->obd_next_transno_waitq);
target_cancel_recovery_timer(obd);
static int check_for_next_transno(struct obd_device *obd)
{
struct ptlrpc_request *req;
- int wake_up;
-
- /* XXX shouldn't we take obd->obd_processing_task_lock to check these
- flags and the recovery_queue? */
- if (obd->obd_abort_recovery || !obd->obd_recovering)
- return 1;
+ int wake_up = 0, connected, completed, queue_len;
+ __u64 next_transno, req_transno;
+ spin_lock_bh(&obd->obd_processing_task_lock);
req = list_entry(obd->obd_recovery_queue.next,
struct ptlrpc_request, rq_list);
- LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
-
- wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno;
- CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
- req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
- obd->obd_recovering, wake_up);
+ req_transno = req->rq_reqmsg->transno;
+ connected = obd->obd_connected_clients;
+ completed = obd->obd_max_recoverable_clients -
+ obd->obd_recoverable_clients;
+ queue_len = obd->obd_requests_queued_for_recovery;
+ next_transno = obd->obd_next_recovery_transno;
+
+ if (obd->obd_abort_recovery) {
+ CDEBUG(D_HA, "waking for aborted recovery\n");
+ wake_up = 1;
+ } else if (!obd->obd_recovering) {
+ CDEBUG(D_HA, "waking for completed recovery (?)\n");
+ wake_up = 1;
+ } else if (req_transno == next_transno) {
+ CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
+ wake_up = 1;
+ } else if (queue_len + completed == connected) {
+ CDEBUG(D_HA,
+ "waking for skipped transno (skip: "LPD64
+ ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
+ next_transno, queue_len, completed, connected,
+ req_transno);
+ obd->obd_next_recovery_transno = req_transno;
+ wake_up = 1;
+ }
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ LASSERT(req->rq_reqmsg->transno >= next_transno);
return wake_up;
}
continue;
}
list_del_init(&req->rq_list);
+ obd->obd_requests_queued_for_recovery--;
spin_unlock_bh(&obd->obd_processing_task_lock);
DEBUG_REQ(D_HA, req, "processing: ");
(void)obd->obd_recovery_handler(req);
- obd->obd_reintegrated_requests++;
+ obd->obd_replayed_requests++;
reset_recovery_timer(obd);
/* bug 1580: decide how to properly sync() in recovery */
//mds_fsync_super(mds->mds_sb);
list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
}
+ obd->obd_requests_queued_for_recovery++;
+
if (obd->obd_processing_task != 0) {
/* Someone else is processing this queue, we'll leave it to
* them.
*/
- if (transno == obd->obd_next_recovery_transno)
- wake_up(&obd->obd_next_transno_waitq);
+ wake_up(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
return 0;
}