Whamcloud - gitweb
LU-13365 ldlm: check slv and limit before updating
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 143d0a9..4458ebc 100644 (file)
 unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
        struct ldlm_lock *lwd_lock;
        __u32             lwd_conn_cnt;
@@ -111,9 +108,8 @@ int ldlm_request_bufsize(int count, int type)
        return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-       struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;
@@ -124,9 +120,8 @@ int ldlm_expired_completion_wait(void *data)
 
                LDLM_ERROR(lock,
                           "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
-                          (s64)lock->l_activity,
-                          (s64)(ktime_get_real_seconds() -
-                                lock->l_activity));
+                          lock->l_activity,
+                          ktime_get_real_seconds() - lock->l_activity);
                if (ktime_get_seconds() > next_dump) {
                        last_dump = next_dump;
                        next_dump = ktime_get_seconds() + 300;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
-               RETURN(0);
+               RETURN_EXIT;
        }
 
        obd = lock->l_conn_export->exp_obd;
@@ -143,11 +138,11 @@ int ldlm_expired_completion_wait(void *data)
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
        LDLM_ERROR(lock,
                   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
-                  (s64)lock->l_activity,
-                  (s64)(ktime_get_real_seconds() - lock->l_activity),
+                  lock->l_activity,
+                  ktime_get_real_seconds() - lock->l_activity,
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
-       RETURN(0);
+       EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -176,9 +171,9 @@ EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
  * We use the same basis for both server side and client side functions
  * from a single node.
  */
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-       time64_t timeout;
+       timeout_t timeout;
 
        if (AT_OFF)
                return obd_timeout;
@@ -189,7 +184,7 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
         * doesn't respond reasonably, and then give us the lock.
         */
        timeout = at_get(ldlm_lock_to_ns_at(lock));
-       return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+       return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
 }
 
 /**
@@ -198,7 +193,6 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
  */
 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
-       time64_t delay;
        int result = 0;
 
        if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
@@ -208,10 +202,16 @@ static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
                LDLM_DEBUG(lock, "client-side enqueue: granted");
        } else {
                /* Take into AT only CP RPC, not immediately granted locks */
-               delay = ktime_get_real_seconds() - lock->l_activity;
-               LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
-                          (s64)delay);
+               timeout_t delay = 0;
+
+               /* Discard negative timeouts. We should also limit the
+                * maximum value of the timeout
+                */
+               if (ktime_get_real_seconds() > lock->l_activity)
+                       delay = ktime_get_real_seconds() - lock->l_activity;
 
+               LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+                          delay);
                /* Update our time estimate */
                at_measured(ldlm_lock_to_ns_at(lock), delay);
        }
@@ -269,8 +269,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
-       struct l_wait_info lwi;
-       time64_t timeout;
+       timeout_t timeout;
        int rc = 0;
 
        ENTRY;
@@ -300,15 +299,6 @@ noreproc:
        lwd.lwd_lock = lock;
        lock->l_activity = ktime_get_real_seconds();
 
-       if (ldlm_is_no_timeout(lock)) {
-               LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-               lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-       } else {
-               lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-                                      ldlm_expired_completion_wait,
-                                      interrupted_completion_wait, &lwd);
-       }
-
        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +312,22 @@ noreproc:
                rc = -EINTR;
        } else {
                /* Go to sleep until the lock is granted or cancelled. */
-               rc = l_wait_event(lock->l_waitq,
-                                 is_granted_or_cancelled(lock), &lwi);
+               if (ldlm_is_no_timeout(lock)) {
+                       LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+                       rc = l_wait_event_abortable(
+                               lock->l_waitq,
+                               is_granted_or_cancelled(lock));
+               } else {
+                       if (wait_event_idle_timeout(
+                                   lock->l_waitq,
+                                   is_granted_or_cancelled(lock),
+                                   cfs_time_seconds(timeout)) == 0) {
+                               ldlm_expired_completion_wait(&lwd);
+                               rc = l_wait_event_abortable(
+                                       lock->l_waitq,
+                                       is_granted_or_cancelled(lock));
+                       }
+               }
        }
 
        if (rc) {
@@ -1445,6 +1449,14 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        new_slv = lustre_msg_get_slv(req->rq_repmsg);
        obd = req->rq_import->imp_obd;
 
+       read_lock(&obd->obd_pool_lock);
+       if (obd->obd_pool_slv == new_slv &&
+           obd->obd_pool_limit == new_limit) {
+               read_unlock(&obd->obd_pool_lock);
+               RETURN(0);
+       }
+       read_unlock(&obd->obd_pool_lock);
+
        /*
         * Set new SLV and limit in OBD fields to make them accessible
         * to the pool thread. We do not access obd_namespace and pool
@@ -2400,6 +2412,8 @@ static int replay_lock_interpret(const struct lu_env *env,
 
        ENTRY;
        atomic_dec(&req->rq_import->imp_replay_inflight);
+       wake_up(&req->rq_import->imp_replay_waitq);
+
        if (rc != ELDLM_OK)
                GOTO(out, rc);
 
@@ -2515,7 +2529,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        LDLM_DEBUG(lock, "replaying lock:");
 
-       atomic_inc(&req->rq_import->imp_replay_inflight);
+       atomic_inc(&imp->imp_replay_inflight);
        aa = ptlrpc_req_async_args(aa, req);
        aa->lock_handle = body->lock_handle[0];
        req->rq_interpret_reply = replay_lock_interpret;
@@ -2555,7 +2569,20 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
                           canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+       struct client_obd *cli = &imp->imp_obd->u.cli;
+
+       CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+              atomic_read(&imp->imp_replay_inflight) - 1,
+              cli->cl_max_rpcs_in_flight);
+
+       /* +1 due to ldlm_lock_replay() increment */
+       return atomic_read(&imp->imp_replay_inflight) <
+              1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
        LIST_HEAD(list);
@@ -2564,15 +2591,12 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
                RETURN(0);
 
-       /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
-
        if (ldlm_cancel_unused_locks_before_replay)
                ldlm_cancel_unused_locks_for_replay(ns);
 
@@ -2586,9 +2610,56 @@ int ldlm_replay_locks(struct obd_import *imp)
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
+
+               if (rate_limit)
+                       wait_event_idle_exclusive(imp->imp_replay_waitq,
+                                                 lock_can_replay(imp));
        }
 
+       RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+       struct obd_import *imp = data;
+
+       unshare_fs_struct();
+
+       CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+              imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+              imp->imp_connection->c_remote_uuid.uuid);
+
+       __ldlm_replay_locks(imp, true);
        atomic_dec(&imp->imp_replay_inflight);
+       ptlrpc_import_recovery_state_machine(imp);
+       class_import_put(imp);
 
-       RETURN(rc);
+       return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       class_import_get(imp);
+       /* ensure this doesn't fall to 0 before all have been queued */
+       atomic_inc(&imp->imp_replay_inflight);
+
+       task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+               /* run lock replay without rate control */
+               rc = __ldlm_replay_locks(imp, false);
+               atomic_dec(&imp->imp_replay_inflight);
+               class_import_put(imp);
+       }
+
+       return rc;
 }