Whamcloud - gitweb
b=22176 Add .sync_fs super block handler
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 350f3a9..c5e46eb 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -282,6 +282,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list);
+        CFS_INIT_LIST_HEAD(&cli->cl_loi_sync_fs_list);
         client_obd_list_lock_init(&cli->cl_loi_list_lock);
         cli->cl_r_in_flight = 0;
         cli->cl_w_in_flight = 0;
@@ -305,7 +306,6 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         cli->cl_cksum_type = cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
 #endif
         cfs_atomic_set(&cli->cl_resends, OSC_DEFAULT_RESENDS);
-        cfs_atomic_set(&cli->cl_quota_resends, CLIENT_QUOTA_DEFAULT_RESENDS);
 
         /* This value may be changed at connect time in
            ptlrpc_connect_interpret. */
@@ -338,8 +338,6 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
                 GOTO(err_ldlm, rc = -ENOENT);
         imp->imp_client = &obddev->obd_ldlm_client;
         imp->imp_connect_op = connect_op;
-        imp->imp_initial_recov = 1;
-        imp->imp_initial_recov_bk = 0;
         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
         memcpy(cli->cl_target_uuid.uuid, lustre_cfg_buf(lcfg, 1),
                LUSTRE_CFG_BUFLEN(lcfg, 1));
@@ -514,8 +512,7 @@ int client_disconnect_export(struct obd_export *exp)
         if (obd->obd_namespace != NULL) {
                 /* obd_force == local only */
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
-                                       obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
-                                       NULL);
+                                       obd->obd_force ? LCF_LOCAL : 0, NULL);
                 ldlm_namespace_free_prior(obd->obd_namespace, imp, obd->obd_force);
         }
 
@@ -581,9 +578,6 @@ int server_disconnect_export(struct obd_export *exp)
         }
         cfs_spin_unlock(&exp->exp_lock);
 
-        /* release nid stat refererence */
-        lprocfs_exp_cleanup(exp);
-
         RETURN(rc);
 }
 
@@ -991,12 +985,12 @@ dont_check_exports:
                                                        req->rq_self,
                                                        &remote_uuid);
         if (cfs_hlist_unhashed(&export->exp_nid_hash)) {
-                cfs_hash_add_unique(export->exp_obd->obd_nid_hash,
-                                    &export->exp_connection->c_peer.nid,
-                                    &export->exp_nid_hash);
+                cfs_hash_add(export->exp_obd->obd_nid_hash,
+                             &export->exp_connection->c_peer.nid,
+                             &export->exp_nid_hash);
         }
 
-        cfs_spin_lock_bh(&target->obd_processing_task_lock);
+        cfs_spin_lock(&target->obd_recovery_task_lock);
         if (target->obd_recovering && !export->exp_in_recovery) {
                 cfs_spin_lock(&export->exp_lock);
                 export->exp_in_recovery = 1;
@@ -1017,7 +1011,7 @@ dont_check_exports:
                     target->obd_max_recoverable_clients)
                         cfs_waitq_signal(&target->obd_next_transno_waitq);
         }
-        cfs_spin_unlock_bh(&target->obd_processing_task_lock);
+        cfs_spin_unlock(&target->obd_recovery_task_lock);
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
         conn = *tmp;
 
@@ -1054,6 +1048,12 @@ dont_check_exports:
         else
                 revimp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
 
+        if ((export->exp_connect_flags & OBD_CONNECT_FULL20) &&
+            (revimp->imp_msg_magic != LUSTRE_MSG_MAGIC_V1))
+                revimp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
+        else
+                revimp->imp_msghdr_flags &= ~MSGHDR_CKSUM_INCOMPAT18;
+
         rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx, &req->rq_flvr);
         if (rc) {
                 CERROR("Failed to get sec for reverse import: %d\n", rc);
@@ -1116,6 +1116,10 @@ static void target_request_copy_get(struct ptlrpc_request *req)
         cfs_atomic_inc(&req->rq_refcount);
         /** let export know it has replays to be handled */
         cfs_atomic_inc(&req->rq_export->exp_replay_count);
+        /* release service thread while request is queued 
+         * we are moving the request from active processing
+         * to waiting on the replay queue */
+        ptlrpc_server_active_request_dec(req);
 }
 
 static void target_request_copy_put(struct ptlrpc_request *req)
@@ -1124,6 +1128,8 @@ static void target_request_copy_put(struct ptlrpc_request *req)
         LASSERT(cfs_atomic_read(&req->rq_export->exp_replay_count) > 0);
         cfs_atomic_dec(&req->rq_export->exp_replay_count);
         class_export_rpc_put(req->rq_export);
+        /* ptlrpc_server_drop_request() assumes the request is active */
+        ptlrpc_server_active_request_inc(req);
         ptlrpc_server_drop_request(req);
 }
 
@@ -1178,7 +1184,7 @@ static void target_finish_recovery(struct obd_device *obd)
                       obd->obd_name);
 
         ldlm_reprocess_all_ns(obd->obd_namespace);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_req_replay_queue) ||
             !cfs_list_empty(&obd->obd_lock_replay_queue) ||
             !cfs_list_empty(&obd->obd_final_req_queue)) {
@@ -1189,10 +1195,10 @@ static void target_finish_recovery(struct obd_device *obd)
                                "" : "lock ",
                        cfs_list_empty(&obd->obd_final_req_queue) ? \
                                "" : "final ");
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 LBUG();
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         obd->obd_recovery_end = cfs_time_current_sec();
 
@@ -1212,9 +1218,9 @@ static void abort_req_replay_queue(struct obd_device *obd)
         cfs_list_t abort_list;
 
         CFS_INIT_LIST_HEAD(&abort_list);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         cfs_list_splice_init(&obd->obd_req_replay_queue, &abort_list);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         cfs_list_for_each_entry_safe(req, n, &abort_list, rq_list) {
                 DEBUG_REQ(D_WARNING, req, "aborted:");
                 req->rq_status = -ENOTCONN;
@@ -1233,9 +1239,9 @@ static void abort_lock_replay_queue(struct obd_device *obd)
         cfs_list_t abort_list;
 
         CFS_INIT_LIST_HEAD(&abort_list);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         cfs_list_splice_init(&obd->obd_lock_replay_queue, &abort_list);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         cfs_list_for_each_entry_safe(req, n, &abort_list, rq_list){
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
@@ -1264,17 +1270,19 @@ void target_cleanup_recovery(struct obd_device *obd)
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&clean_list);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_dev_lock);
         if (!obd->obd_recovering) {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_dev_lock);
                 EXIT;
                 return;
         }
         obd->obd_recovering = obd->obd_abort_recovery = 0;
-        target_cancel_recovery_timer(obd);
+        cfs_spin_unlock(&obd->obd_dev_lock);
 
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
+        target_cancel_recovery_timer(obd);
         cfs_list_splice_init(&obd->obd_req_replay_queue, &clean_list);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         cfs_list_for_each_entry_safe(req, n, &clean_list, rq_list) {
                 LASSERT(req->rq_reply_state == 0);
@@ -1282,10 +1290,10 @@ void target_cleanup_recovery(struct obd_device *obd)
                 target_request_copy_put(req);
         }
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         cfs_list_splice_init(&obd->obd_lock_replay_queue, &clean_list);
         cfs_list_splice_init(&obd->obd_final_req_queue, &clean_list);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         cfs_list_for_each_entry_safe(req, n, &clean_list, rq_list){
                 LASSERT(req->rq_reply_state == 0);
@@ -1295,7 +1303,7 @@ void target_cleanup_recovery(struct obd_device *obd)
         EXIT;
 }
 
-/* obd_processing_task_lock should be held */
+/* obd_recovery_task_lock should be held */
 void target_cancel_recovery_timer(struct obd_device *obd)
 {
         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
@@ -1310,9 +1318,9 @@ static void reset_recovery_timer(struct obd_device *obd, int duration,
         cfs_time_t now = cfs_time_current_sec();
         cfs_duration_t left;
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!obd->obd_recovering || obd->obd_abort_recovery) {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 return;
         }
 
@@ -1323,15 +1331,11 @@ static void reset_recovery_timer(struct obd_device *obd, int duration,
         else if (!extend && (duration > obd->obd_recovery_timeout))
                 /* Track the client's largest expected replay time */
                 obd->obd_recovery_timeout = duration;
-#ifdef CRAY_XT3
-        /*
-         * If total recovery time already exceed the
-         * obd_recovery_max_time, then CRAY XT3 will
-         * abort the recovery
-         */
-        if(obd->obd_recovery_timeout > obd->obd_recovery_max_time)
-                obd->obd_recovery_timeout = obd->obd_recovery_max_time;
-#endif
+
+        /* Hard limit of obd_recovery_time_hard which should not happen */
+        if (obd->obd_recovery_timeout > obd->obd_recovery_time_hard)
+                obd->obd_recovery_timeout = obd->obd_recovery_time_hard;
+
         obd->obd_recovery_end = obd->obd_recovery_start +
                                 obd->obd_recovery_timeout;
         if (!cfs_timer_is_armed(&obd->obd_recovery_timer) ||
@@ -1339,23 +1343,21 @@ static void reset_recovery_timer(struct obd_device *obd, int duration,
                 left = cfs_time_sub(obd->obd_recovery_end, now);
                 cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(left));
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         CDEBUG(D_HA, "%s: recovery timer will expire in %u seconds\n",
                obd->obd_name, (unsigned)left);
 }
 
 static void check_and_start_recovery_timer(struct obd_device *obd)
 {
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (cfs_timer_is_armed(&obd->obd_recovery_timer)) {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 return;
         }
         CDEBUG(D_HA, "%s: starting recovery timer\n", obd->obd_name);
         obd->obd_recovery_start = cfs_time_current_sec();
-        /* minimum */
-        obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         reset_recovery_timer(obd, obd->obd_recovery_timeout, 0);
 }
@@ -1381,8 +1383,8 @@ target_start_and_reset_recovery_timer(struct obd_device *obd,
         if (!new_client && service_time)
                 /* Teach server about old server's estimates, as first guess
                  * at how long new requests will take. */
-                at_add(&req->rq_rqbd->rqbd_service->srv_at_estimate,
-                       service_time);
+                at_measured(&req->rq_rqbd->rqbd_service->srv_at_estimate,
+                            service_time);
 
         check_and_start_recovery_timer(obd);
 
@@ -1449,8 +1451,8 @@ static int check_for_next_transno(struct obd_device *obd)
         int wake_up = 0, connected, completed, queue_len;
         __u64 next_transno, req_transno;
         ENTRY;
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
 
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_req_replay_queue)) {
                 req = cfs_list_entry(obd->obd_req_replay_queue.next,
                                      struct ptlrpc_request, rq_list);
@@ -1505,7 +1507,7 @@ static int check_for_next_transno(struct obd_device *obd)
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         return wake_up;
 }
 
@@ -1513,7 +1515,7 @@ static int check_for_next_lock(struct obd_device *obd)
 {
         int wake_up = 0;
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_lock_replay_queue)) {
                 CDEBUG(D_HA, "waking for next lock\n");
                 wake_up = 1;
@@ -1527,7 +1529,7 @@ static int check_for_next_lock(struct obd_device *obd)
                 CDEBUG(D_HA, "waking for expired recovery\n");
                 wake_up = 1;
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         return wake_up;
 }
@@ -1541,38 +1543,33 @@ static int target_recovery_overseer(struct obd_device *obd,
                                     int (*check_routine)(struct obd_device *),
                                     int (*health_check)(struct obd_export *))
 {
-        int abort = 0, expired = 1;
-
-        do {
-                cfs_wait_event(obd->obd_next_transno_waitq, check_routine(obd));
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-                abort = obd->obd_abort_recovery;
-                expired = obd->obd_recovery_expired;
+repeat:
+        cfs_wait_event(obd->obd_next_transno_waitq, check_routine(obd));
+        if (obd->obd_abort_recovery) {
+                CWARN("recovery is aborted, evict exports in recovery\n");
+                /** evict exports which didn't finish recovery yet */
+                class_disconnect_stale_exports(obd, exp_finished);
+                return 1;
+        } else if (obd->obd_recovery_expired) {
                 obd->obd_recovery_expired = 0;
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (abort) {
-                        CWARN("recovery is aborted, evict exports in recovery\n");
-                        /** evict exports which didn't finish recovery yet */
-                        class_disconnect_stale_exports(obd, exp_finished);
-                } else if (expired) {
-                        /** If some clients died being recovered, evict them */
-                        CDEBUG(D_WARNING, "recovery is timed out, evict stale exports\n");
-                        /** evict cexports with no replay in queue, they are stalled */
-                        class_disconnect_stale_exports(obd, health_check);
-                        /** continue with VBR */
-                        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-                        obd->obd_version_recov = 1;
-                        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-                        /**
-                         * reset timer, recovery will proceed with versions now,
-                         * timeout is set just to handle reconnection delays
-                         */
-                        reset_recovery_timer(obd, RECONNECT_DELAY_MAX, 1);
-                        /** Wait for recovery events again, after evicting bad clients */
-                }
-        } while (!abort && expired);
-
-        return abort;
+                /** If some clients died being recovered, evict them */
+                CDEBUG(D_WARNING,
+                       "recovery is timed out, evict stale exports\n");
+                /** evict cexports with no replay in queue, they are stalled */
+                class_disconnect_stale_exports(obd, health_check);
+                /** continue with VBR */
+                cfs_spin_lock(&obd->obd_dev_lock);
+                obd->obd_version_recov = 1;
+                cfs_spin_unlock(&obd->obd_dev_lock);
+                /**
+                 * reset timer, recovery will proceed with versions now,
+                 * timeout is set just to handle reconnection delays
+                 */
+                reset_recovery_timer(obd, RECONNECT_DELAY_MAX, 1);
+                /** Wait for recovery events again, after evicting bad clients */
+                goto repeat;
+        }
+        return 0;
 }
 
 static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
@@ -1589,15 +1586,15 @@ static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
                 abort_lock_replay_queue(obd);
         }
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_req_replay_queue)) {
                 req = cfs_list_entry(obd->obd_req_replay_queue.next,
                                      struct ptlrpc_request, rq_list);
                 cfs_list_del_init(&req->rq_list);
                 obd->obd_requests_queued_for_recovery--;
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
         } else {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 LASSERT(cfs_list_empty(&obd->obd_req_replay_queue));
                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) == 0);
                 /** evict exports failed VBR */
@@ -1615,14 +1612,14 @@ static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd)
                                      exp_lock_replay_healthy))
                 abort_lock_replay_queue(obd);
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_lock_replay_queue)) {
                 req = cfs_list_entry(obd->obd_lock_replay_queue.next,
                                      struct ptlrpc_request, rq_list);
                 cfs_list_del_init(&req->rq_list);
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
         } else {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 LASSERT(cfs_list_empty(&obd->obd_lock_replay_queue));
                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients) == 0);
                 /** evict exports failed VBR */
@@ -1635,18 +1632,20 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
 {
         struct ptlrpc_request *req = NULL;
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         if (!cfs_list_empty(&obd->obd_final_req_queue)) {
                 req = cfs_list_entry(obd->obd_final_req_queue.next,
                                      struct ptlrpc_request, rq_list);
                 cfs_list_del_init(&req->rq_list);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 if (req->rq_export->exp_in_recovery) {
                         cfs_spin_lock(&req->rq_export->exp_lock);
                         req->rq_export->exp_in_recovery = 0;
                         cfs_spin_unlock(&req->rq_export->exp_lock);
                 }
+        } else {
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         return req;
 }
 
@@ -1657,17 +1656,18 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
         int rc;
         ENTRY;
 
-        rc = lu_context_init(&req->rq_recov_session, LCT_SESSION);
-        if (rc) {
-                CERROR("Failure to initialize session: %d\n", rc);
-                GOTO(reqcopy_put, rc);
-        }
         /**
          * export can be evicted during recovery, no need to handle replays for
          * it after that, discard such request silently
          */
         if (req->rq_export->exp_disconnected)
+                GOTO(reqcopy_put, rc = 0);
+
+        rc = lu_context_init(&req->rq_recov_session, LCT_SESSION);
+        if (rc) {
+                CERROR("Failure to initialize session: %d\n", rc);
                 GOTO(reqcopy_put, rc);
+        }
 
         req->rq_recov_session.lc_thread = thread;
         lu_context_enter(&req->rq_recov_session);
@@ -1708,7 +1708,7 @@ static int target_recovery_thread(void *arg)
         int rc = 0;
         ENTRY;
 
-        cfs_daemonize("tgt_recov");
+        cfs_daemonize_ctxt("tgt_recov");
 
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
@@ -1720,13 +1720,17 @@ static int target_recovery_thread(void *arg)
                 RETURN(rc);
 
         thread->t_env = &env;
+        thread->t_id = -1; /* force filter_iobuf_get/put to use local buffers */
         env.le_ctx.lc_thread = thread;
+        thread->t_data = NULL;
 
         CERROR("%s: started recovery thread pid %d\n", obd->obd_name,
                cfs_curproc_pid());
         trd->trd_processing_task = cfs_curproc_pid();
 
+        cfs_spin_lock(&obd->obd_dev_lock);
         obd->obd_recovering = 1;
+        cfs_spin_unlock(&obd->obd_dev_lock);
         cfs_complete(&trd->trd_starting);
 
         /* first of all, we have to know the first transno to replay */
@@ -1738,7 +1742,6 @@ static int target_recovery_thread(void *arg)
 
         /* next stage: replay requests */
         delta = jiffies;
-        obd->obd_req_replaying = 1;
         CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n",
                cfs_atomic_read(&obd->obd_req_replay_clients),
                obd->obd_next_recovery_transno);
@@ -1753,9 +1756,9 @@ static int target_recovery_thread(void *arg)
                  * bz18031: increase next_recovery_transno before
                  * target_request_copy_put() will drop exp_rpc reference
                  */
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_lock(&obd->obd_recovery_task_lock);
                 obd->obd_next_recovery_transno++;
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 target_exp_dequeue_req_replay(req);
                 target_request_copy_put(req);
                 obd->obd_replayed_requests++;
@@ -1785,10 +1788,12 @@ static int target_recovery_thread(void *arg)
         lut_boot_epoch_update(lut);
         /* We drop recoverying flag to forward all new requests
          * to regular mds_handle() since now */
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_dev_lock);
         obd->obd_recovering = obd->obd_abort_recovery = 0;
+        cfs_spin_unlock(&obd->obd_dev_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         target_cancel_recovery_timer(obd);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         while ((req = target_next_final_ping(obd))) {
                 LASSERT(trd->trd_processing_task == cfs_curproc_pid());
                 DEBUG_REQ(D_HA, req, "processing final ping from %s: ",
@@ -1801,7 +1806,7 @@ static int target_recovery_thread(void *arg)
         delta = (jiffies - delta) / CFS_HZ;
         CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
               delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
-        if (delta > obd_timeout * OBD_RECOVERY_FACTOR) {
+        if (delta > OBD_RECOVERY_TIME_SOFT) {
                 CWARN("too long recovery - read logs\n");
                 libcfs_debug_dumplog();
         }
@@ -1837,19 +1842,17 @@ static int target_start_recovery_thread(struct lu_target *lut,
 
 void target_stop_recovery_thread(struct obd_device *obd)
 {
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_recovery_data.trd_processing_task > 0) {
                 struct target_recovery_data *trd = &obd->obd_recovery_data;
                 /** recovery can be done but postrecovery is not yet */
+                cfs_spin_lock(&obd->obd_dev_lock);
                 if (obd->obd_recovering) {
                         CERROR("%s: Aborting recovery\n", obd->obd_name);
                         obd->obd_abort_recovery = 1;
                         cfs_waitq_signal(&obd->obd_next_transno_waitq);
                 }
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_dev_lock);
                 cfs_wait_for_completion(&trd->trd_finishing);
-        } else {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         }
 }
 
@@ -1870,10 +1873,8 @@ static void target_recovery_expired(unsigned long castmeharder)
                cfs_time_current_sec()- obd->obd_recovery_start,
                obd->obd_connected_clients);
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
         obd->obd_recovery_expired = 1;
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
 void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
@@ -1892,9 +1893,12 @@ void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
         obd->obd_recovery_start = 0;
         obd->obd_recovery_end = 0;
-        obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
-        /* bz13079: this should be set to desired value for ost but not for mds */
-        obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
+
+        /* both values can be get from mount data already */
+        if (obd->obd_recovery_timeout == 0)
+                obd->obd_recovery_timeout = OBD_RECOVERY_TIME_SOFT;
+        if (obd->obd_recovery_time_hard == 0)
+                obd->obd_recovery_time_hard = OBD_RECOVERY_TIME_HARD;
         cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
         target_start_recovery_thread(lut, handler);
 }
@@ -1909,32 +1913,29 @@ static int target_process_req_flags(struct obd_device *obd,
         LASSERT(exp != NULL);
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
                 /* client declares he's ready to replay locks */
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_lock(&exp->exp_lock);
                 if (exp->exp_req_replay_needed) {
-                        LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) >
-                                0);
-                        cfs_spin_lock(&exp->exp_lock);
                         exp->exp_req_replay_needed = 0;
                         cfs_spin_unlock(&exp->exp_lock);
+                        LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
                         cfs_atomic_dec(&obd->obd_req_replay_clients);
+                } else {
+                        cfs_spin_unlock(&exp->exp_lock);
                 }
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         }
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
                 /* client declares he's ready to complete recovery
                  * so, we put the request on th final queue */
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_lock(&exp->exp_lock);
                 if (exp->exp_lock_replay_needed) {
-                        LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients) >
-                                0);
-                        cfs_spin_lock(&exp->exp_lock);
                         exp->exp_lock_replay_needed = 0;
                         cfs_spin_unlock(&exp->exp_lock);
+                        LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
                         cfs_atomic_dec(&obd->obd_lock_replay_clients);
+                } else {
+                        cfs_spin_unlock(&exp->exp_lock);
                 }
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         }
-
         return 0;
 }
 
@@ -1958,35 +1959,35 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                  * so, we put the request on th final queue */
                 target_request_copy_get(req);
                 DEBUG_REQ(D_HA, req, "queue final req");
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
+                cfs_spin_lock(&obd->obd_recovery_task_lock);
                 if (obd->obd_recovering) {
                         cfs_list_add_tail(&req->rq_list,
                                           &obd->obd_final_req_queue);
                 } else {
-                        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                        cfs_spin_unlock(&obd->obd_recovery_task_lock);
                         target_request_copy_put(req);
                         RETURN(obd->obd_stopping ? -ENOTCONN : 1);
                 }
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 RETURN(0);
         }
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
                 /* client declares he's ready to replay locks */
                 target_request_copy_get(req);
                 DEBUG_REQ(D_HA, req, "queue lock replay req");
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
+                cfs_spin_lock(&obd->obd_recovery_task_lock);
                 LASSERT(obd->obd_recovering);
                 /* usually due to recovery abort */
                 if (!req->rq_export->exp_in_recovery) {
-                        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                        cfs_spin_unlock(&obd->obd_recovery_task_lock);
                         target_request_copy_put(req);
                         RETURN(-ENOTCONN);
                 }
                 LASSERT(req->rq_export->exp_lock_replay_needed);
                 cfs_list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 RETURN(0);
         }
 
@@ -2000,8 +2001,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 RETURN(1);
         }
 
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-
         /* If we're processing the queue, we want don't want to queue this
          * message.
          *
@@ -2012,37 +2011,36 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
          * Also, a resent, replayed request that has already been
          * handled will pass through here and be processed immediately.
          */
-        CWARN("Next recovery transno: "LPU64", current: "LPU64", replaying: %i\n",
-              obd->obd_next_recovery_transno, transno, obd->obd_req_replaying);
-        if (transno < obd->obd_next_recovery_transno && obd->obd_req_replaying) {
+        CWARN("Next recovery transno: "LPU64", current: "LPU64", replaying\n",
+              obd->obd_next_recovery_transno, transno);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
+        if (transno < obd->obd_next_recovery_transno) {
                 /* Processing the queue right now, don't re-add. */
                 LASSERT(cfs_list_empty(&req->rq_list));
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                cfs_spin_unlock(&obd->obd_recovery_task_lock);
                 RETURN(1);
         }
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))
                 RETURN(0);
 
         target_request_copy_get(req);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-        LASSERT(obd->obd_recovering);
         if (!req->rq_export->exp_in_recovery) {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
                 target_request_copy_put(req);
                 RETURN(-ENOTCONN);
         }
         LASSERT(req->rq_export->exp_req_replay_needed);
 
         if (target_exp_enqueue_req_replay(req)) {
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
                 DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
                 target_request_copy_put(req);
                 RETURN(0);
         }
 
         /* XXX O(n^2) */
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
+        LASSERT(obd->obd_recovering);
         cfs_list_for_each(tmp, &obd->obd_req_replay_queue) {
                 struct ptlrpc_request *reqiter =
                         cfs_list_entry(tmp, struct ptlrpc_request, rq_list);
@@ -2057,7 +2055,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                              transno)) {
                         DEBUG_REQ(D_ERROR, req, "dropping replay: transno "
                                   "has been claimed by another client");
-                        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+                        cfs_spin_unlock(&obd->obd_recovery_task_lock);
                         target_exp_dequeue_req_replay(req);
                         target_request_copy_put(req);
                         RETURN(0);
@@ -2068,8 +2066,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 cfs_list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
 
         obd->obd_requests_queued_for_recovery++;
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
         RETURN(0);
 }
 
@@ -2354,53 +2352,6 @@ out:
 }
 #endif /* HAVE_QUOTA_SUPPORT */
 
-/* Send a remote set_info_async.
- * This may go from client to server or server to client
- */
-int target_set_info_rpc(struct obd_import *imp, int opcode,
-                        obd_count keylen, void *key,
-                        obd_count vallen, void *val,
-                        struct ptlrpc_request_set *set)
-{
-        struct ptlrpc_request *req;
-        char                  *tmp;
-        int                    rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
-                             RCL_CLIENT, keylen);
-        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
-                             RCL_CLIENT, vallen);
-        rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, opcode);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
-        memcpy(tmp, key, keylen);
-        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
-        memcpy(tmp, val, vallen);
-
-        ptlrpc_request_set_replen(req);
-
-        if (set) {
-                ptlrpc_set_add_req(set, req);
-                ptlrpc_check_set(NULL, set);
-        } else {
-                rc = ptlrpc_queue_wait(req);
-                ptlrpc_req_finished(req);
-        }
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(target_set_info_rpc);
-
-
 ldlm_mode_t lck_compat_array[] = {
         [LCK_EX] LCK_COMPAT_EX,
         [LCK_PW] LCK_COMPAT_PW,
@@ -2446,7 +2397,7 @@ int ldlm_error2errno(ldlm_error_t error)
                 if (((int)error) < 0)  /* cast to signed type */
                         result = error; /* as ldlm_error_t can be unsigned */
                 else {
-                        CERROR("Invalid DLM result code: %i\n", error);
+                        CERROR("Invalid DLM result code: %d\n", error);
                         result = -EPROTO;
                 }
         }