+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ }
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* If some clients haven't replayed requests in time, evict them */
+ if (obd->obd_abort_recovery) {
+ CDEBUG(D_ERROR, "req replay timed out, aborting ...\n");
+ obd->obd_abort_recovery = obd->obd_stopping;
+ class_disconnect_stale_exports(obd, req_replay_done);
+ abort_req_replay_queue(obd);
+ }
+ /* The second stage: replay locks */
+ CDEBUG(D_INFO, "2: lock replay stage - %d clients\n",
+ atomic_read(&obd->obd_lock_replay_clients));
+ while ((req = target_next_replay_lock(obd))) {
+ LASSERT(trd->trd_processing_task == current->pid);
+ DEBUG_REQ(D_HA|D_WARNING, req, "processing lock from %s: ",
+ libcfs_nid2str(req->rq_peer.nid));
+ handle_recovery_req(thread, req,
+ trd->trd_recovery_handler);
+ obd->obd_replayed_locks++;
+ }
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ /* If some clients haven't replayed requests in time, evict them */
+ if (obd->obd_abort_recovery) {
+ int stale;
+ CERROR("lock replay timed out, aborting ...\n");
+ obd->obd_abort_recovery = obd->obd_stopping;
+ stale = class_disconnect_stale_exports(obd, lock_replay_done);
+ abort_lock_replay_queue(obd);
+ }
+
+ /* We drop recoverying flag to forward all new requests
+ * to regular mds_handle() since now */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ obd->obd_recovering = obd->obd_abort_recovery = 0;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ /* The third stage: reply on final pings */
+ CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
+ while ((req = target_next_final_ping(obd))) {
+ LASSERT(trd->trd_processing_task == current->pid);
+ DEBUG_REQ(D_HA, req, "processing final ping from %s: ",
+ libcfs_nid2str(req->rq_peer.nid));
+ handle_recovery_req(thread, req,
+ trd->trd_recovery_handler);
+ }
+
+ delta = (jiffies - delta) / HZ;
+ CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
+ delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
+ LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
+ LASSERT(atomic_read(&obd->obd_lock_replay_clients) == 0);
+ if (delta > obd_timeout * 2) {
+ CWARN("too long recovery - read logs\n");
+ libcfs_debug_dumplog();
+ }
+
+ target_finish_recovery(obd);
+
+ lu_env_fini(&env);
+ trd->trd_processing_task = 0;
+ complete(&trd->trd_finishing);
+ RETURN(rc);
+}
+
+int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+{
+ int rc = 0;
+ struct target_recovery_data *trd = &obd->obd_recovery_data;
+
+ memset(trd, 0, sizeof(*trd));
+ init_completion(&trd->trd_starting);
+ init_completion(&trd->trd_finishing);
+ trd->trd_recovery_handler = handler;
+
+ if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
+ wait_for_completion(&trd->trd_starting);
+ LASSERT(obd->obd_recovering != 0);
+ } else
+ rc = -ECHILD;
+
+ return rc;
+}
+
+void target_stop_recovery_thread(struct obd_device *obd)
+{
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_recovery_data.trd_processing_task > 0) {
+ struct target_recovery_data *trd = &obd->obd_recovery_data;
+ CERROR("%s: Aborting recovery\n", obd->obd_name);
+ obd->obd_abort_recovery = 1;
+ wake_up(&obd->obd_next_transno_waitq);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ wait_for_completion(&trd->trd_finishing);
+ } else {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ }
+}
+
+void target_recovery_fini(struct obd_device *obd)
+{
+ class_disconnect_exports(obd);
+ target_stop_recovery_thread(obd);
+ target_cleanup_recovery(obd);
+}
+EXPORT_SYMBOL(target_recovery_fini);
+
+void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
+{
+ if (obd->obd_max_recoverable_clients == 0)
+ return;
+
+ CWARN("RECOVERY: service %s, %d recoverable clients, "
+ "last_transno "LPU64"\n", obd->obd_name,
+ obd->obd_max_recoverable_clients, obd->obd_last_committed);
+ obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+ target_start_recovery_thread(obd, handler);
+ obd->obd_recovery_start = CURRENT_SECONDS;
+ /* Only used for lprocfs_status */
+ obd->obd_recovery_end = obd->obd_recovery_start + OBD_RECOVERY_TIMEOUT;
+}
+EXPORT_SYMBOL(target_recovery_init);
+
+#endif
+
+int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+{
+ struct obd_export *exp = req->rq_export;
+ LASSERT(exp != NULL);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+ /* client declares he's ready to replay locks */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (exp->exp_req_replay_needed) {
+ LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
+ spin_lock(&exp->exp_lock);
+ exp->exp_req_replay_needed = 0;
+ spin_unlock(&exp->exp_lock);
+ atomic_dec(&obd->obd_req_replay_clients);
+ LASSERT(obd->obd_recoverable_clients > 0);
+ obd->obd_recoverable_clients--;
+ if (atomic_read(&obd->obd_req_replay_clients) == 0)
+ CDEBUG(D_HA, "all clients have replayed reqs\n");
+ wake_up(&obd->obd_next_transno_waitq);