Whamcloud - gitweb
LU-13600 ptlrpc: limit rate of lock replays
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index a8084b1..42225f9 100644 (file)
@@ -67,6 +67,7 @@
 unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
@@ -2403,6 +2404,8 @@ static int replay_lock_interpret(const struct lu_env *env,
 
        ENTRY;
        atomic_dec(&req->rq_import->imp_replay_inflight);
+       wake_up(&req->rq_import->imp_replay_waitq);
+
        if (rc != ELDLM_OK)
                GOTO(out, rc);
 
@@ -2518,7 +2521,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        LDLM_DEBUG(lock, "replaying lock:");
 
-       atomic_inc(&req->rq_import->imp_replay_inflight);
+       atomic_inc(&imp->imp_replay_inflight);
        aa = ptlrpc_req_async_args(aa, req);
        aa->lock_handle = body->lock_handle[0];
        req->rq_interpret_reply = replay_lock_interpret;
@@ -2558,7 +2561,20 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
                           canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+       struct client_obd *cli = &imp->imp_obd->u.cli;
+
+       CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+              atomic_read(&imp->imp_replay_inflight) - 1,
+              cli->cl_max_rpcs_in_flight);
+
+       /* +1 due to ldlm_lock_replay() increment */
+       return atomic_read(&imp->imp_replay_inflight) <
+              1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
        LIST_HEAD(list);
@@ -2567,15 +2583,12 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
                RETURN(0);
 
-       /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
-
        if (ldlm_cancel_unused_locks_before_replay)
                ldlm_cancel_unused_locks_for_replay(ns);
 
@@ -2589,9 +2602,56 @@ int ldlm_replay_locks(struct obd_import *imp)
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
+
+               if (rate_limit)
+                       wait_event_idle_exclusive(imp->imp_replay_waitq,
+                                                 lock_can_replay(imp));
        }
 
+       RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+       struct obd_import *imp = data;
+
+       unshare_fs_struct();
+
+       CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+              imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+              imp->imp_connection->c_remote_uuid.uuid);
+
+       __ldlm_replay_locks(imp, true);
        atomic_dec(&imp->imp_replay_inflight);
+       ptlrpc_import_recovery_state_machine(imp);
+       class_import_put(imp);
 
-       RETURN(rc);
+       return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       class_import_get(imp);
+       /* ensure this doesn't fall to 0 before all have been queued */
+       atomic_inc(&imp->imp_replay_inflight);
+
+       task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+               /* run lock replay without rate control */
+               rc = __ldlm_replay_locks(imp, false);
+               atomic_dec(&imp->imp_replay_inflight);
+               class_import_put(imp);
+       }
+
+       return rc;
 }