+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ }
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* If some clients haven't replayed requests in time, evict them */
+ if (obd->obd_abort_recovery) {
+ int stale;
+ CERROR("req replay timed out, aborting ...\n");
+ obd->obd_abort_recovery = 0;
+ stale = class_disconnect_stale_exports(obd, req_replay_done, 0);
+ atomic_sub(stale, &obd->obd_lock_replay_clients);
+ abort_req_replay_queue(obd);
+ }
+
+ /* The second stage: replay locks */
+ CWARN("2: lock replay stage - %d clients\n",
+ atomic_read(&obd->obd_lock_replay_clients));
+ while ((req = target_next_replay_lock(obd))) {
+ LASSERT(trd->trd_processing_task == current->pid);
+ DEBUG_REQ(D_HA, req, "processing lock from %s: ",
+ ptlrpc_peernid2str(&req->rq_peer, peer_str));
+ (void)trd->trd_recovery_handler(req);
+ reset_recovery_timer(obd);
+ ptlrpc_free_clone(req);
+ obd->obd_replayed_locks++;
+ }
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* If some clients haven't replayed locks in time, evict them */
+ if (obd->obd_abort_recovery) {
+ int stale;
+ CERROR("lock replay timed out, aborting ...\n");
+ obd->obd_abort_recovery = 0;
+ stale = class_disconnect_stale_exports(obd, lock_replay_done, 0);
+ abort_lock_replay_queue(obd);
+ }
+
+ /* Drop the recovering flag so that, from now on, all new requests
+ * are forwarded to the regular mds_handle() */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ obd->obd_recovering = 0;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* The third stage: reply on final pings */
+ CWARN("3: final stage - process recovery completion pings\n");
+ while ((req = target_next_final_ping(obd))) {
+ LASSERT(trd->trd_processing_task == current->pid);
+ DEBUG_REQ(D_HA, req, "processing final ping from %s: ",
+ ptlrpc_peernid2str(&req->rq_peer, peer_str));
+ (void)trd->trd_recovery_handler(req);
+ ptlrpc_free_clone(req);
+ }
+
+ CWARN("4: recovery completed - %d/%d reqs/locks replayed\n",
+ obd->obd_replayed_requests, obd->obd_replayed_locks);
+ target_finish_recovery(obd);
+
+ trd->trd_processing_task = 0;
+ complete(&trd->trd_finishing);
+ return 0;
+}
+
+/*
+ * Prepare @obd's recovery state and launch the recovery kernel thread.
+ *
+ * The target_recovery_data embedded in @obd is zeroed, both of its
+ * completions are initialised and @handler is recorded as the recovery
+ * request handler.  target_recovery_thread() is then spawned with @obd as
+ * its argument.  After a successful spawn the caller blocks on trd_starting
+ * (presumably completed by the new thread during its start-up, which is not
+ * visible here) and asserts that obd_recovering is set.
+ *
+ * Returns 0 on success, or -ECHILD whenever kernel_thread() does not return
+ * a positive value; kernel_thread()'s own return value is not propagated.
+ */
+int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+{
+        struct target_recovery_data *rdata = &obd->obd_recovery_data;
+
+        /* Start from a clean slate: any state left from a previous run
+         * is wiped before the completions and handler are set up. */
+        memset(rdata, 0, sizeof(*rdata));
+        rdata->trd_recovery_handler = handler;
+        init_completion(&rdata->trd_starting);
+        init_completion(&rdata->trd_finishing);
+
+        /* Any non-positive result is treated as a failed spawn. */
+        if (kernel_thread(target_recovery_thread, obd, 0) <= 0)
+                return -ECHILD;
+
+        /* Do not return until the thread has announced that it is up. */
+        wait_for_completion(&rdata->trd_starting);
+        LASSERT(obd->obd_recovering != 0);
+
+        return 0;
+}
+
+/*
+ * Abort a recovery that is still in progress and wait for it to finish.
+ *
+ * With obd_processing_task_lock held, trd_processing_task is examined; a
+ * positive value is treated as "a recovery thread is running".  If it is
+ * not positive the lock is dropped and nothing else happens.  Otherwise
+ * obd_abort_recovery is raised, sleepers on obd_next_transno_waitq are
+ * woken, the lock is released and the caller blocks until trd_finishing
+ * is completed.
+ */
+void target_stop_recovery_thread(struct obd_device *obd)
+{
+        struct target_recovery_data *rdata = &obd->obd_recovery_data;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+
+        /* No recovery thread registered: nothing to abort or wait for. */
+        if (rdata->trd_processing_task <= 0) {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return;
+        }
+
+        CERROR("%s: aborting recovery\n", obd->obd_name);
+        obd->obd_abort_recovery = 1;
+        wake_up(&obd->obd_next_transno_waitq);
+
+        /* The spinlock must be released before the blocking wait below. */
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        wait_for_completion(&rdata->trd_finishing);
+}
+#endif
+
+int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+{
+ struct obd_export *exp = req->rq_export;
+ LASSERT(exp != NULL);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+ /* client declares he's ready to replay locks */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (exp->exp_req_replay_needed) {
+ LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
+ exp->exp_req_replay_needed = 0;
+ atomic_dec(&obd->obd_req_replay_clients);
+ obd->obd_recoverable_clients--;
+ if (atomic_read(&obd->obd_req_replay_clients) == 0)
+ CDEBUG(D_HA, "all clients have replayed reqs\n");
+ wake_up(&obd->obd_next_transno_waitq);