LU-14182 lov: cancel layout lock on replay deadlock
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 6a570c3..55994cd 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -552,8 +552,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                 * bl_ast and -EINVAL reply is sent to server anyway.
                 * b=17645
                 */
-               lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+               lock->l_flags |= LDLM_FL_FAILED |
                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
+               if (!(ldlm_is_bl_ast(lock) &&
+                     lock->l_remote_handle.cookie != 0))
+                       lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                need_cancel = 1;
        }
        unlock_res_and_lock(lock);
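
The check added above withholds LDLM_FL_LOCAL_ONLY exactly when a blocking
AST has been seen (ldlm_is_bl_ast()) and the server already holds a handle
for the lock (non-zero l_remote_handle.cookie), so the eventual cancel is
sent to the server instead of being handled purely client-side. A standalone
sketch of that predicate (the struct and names are illustrative stand-ins,
not the kernel's types):

    #include <stdbool.h>
    #include <stdint.h>

    struct lock_view {
            bool bl_ast_seen;         /* blocking AST already received */
            uint64_t remote_cookie;   /* 0: server never granted a handle */
    };

    /* LDLM_FL_LOCAL_ONLY is withheld only when the server both sent a
     * blocking AST and knows the lock, i.e. the cancel must reach it */
    static bool set_local_only(const struct lock_view *l)
    {
            return !(l->bl_ast_seen && l->remote_cookie != 0);
    }
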
@@ -587,9 +590,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
 {
-       return type == LDLM_FLOCK || type == LDLM_IBITS;
+       /* Exclude EXTENT locks and DOM-only IBITS locks: they are
+        * asynchronous and do not block waiting on the server.
+        */
+       return einfo->ei_type == LDLM_FLOCK ||
+              (einfo->ei_type == LDLM_IBITS &&
+               einfo->ei_inodebits != MDS_INODELOCK_DOM);
 }
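
The predicate now keys off the whole ldlm_enqueue_info rather than the bare
lock type. A minimal model of the decision; the enum and the value of
MDS_INODELOCK_DOM below are placeholders for illustration only:

    #include <stdbool.h>
    #include <stdint.h>

    enum ldlm_type { LDLM_PLAIN, LDLM_EXTENT, LDLM_FLOCK, LDLM_IBITS };
    #define MDS_INODELOCK_DOM 0x100   /* placeholder value */

    struct enqueue_info {
            enum ldlm_type ei_type;
            uint64_t ei_inodebits;
    };

    /* FLOCK and non-DOM IBITS enqueues may block server-side, so they
     * must take a request slot; EXTENT and DOM-only IBITS enqueues are
     * asynchronous and are excluded */
    static bool request_slot_needed(const struct enqueue_info *e)
    {
            return e->ei_type == LDLM_FLOCK ||
                   (e->ei_type == LDLM_IBITS &&
                    e->ei_inodebits != MDS_INODELOCK_DOM);
    }
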
 
 /**
@@ -598,21 +606,21 @@ static bool ldlm_request_slot_needed(enum ldlm_type type)
  * Called after receiving reply from server.
  */
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
-                         enum ldlm_type type, __u8 with_policy,
-                         enum ldlm_mode mode, __u64 *flags, void *lvb,
+                         struct ldlm_enqueue_info *einfo,
+                         __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        const struct lu_env *env = NULL;
-       int is_replay = *flags & LDLM_FL_REPLAY;
+       int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        int cleanup_phase = 1;
 
        ENTRY;
 
-       if (ldlm_request_slot_needed(type))
+       if (ldlm_request_slot_needed(einfo))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
        ptlrpc_put_mod_rpc_slot(req);
@@ -623,7 +631,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
-               LASSERT(type == LDLM_FLOCK);
+               LASSERT(einfo->ei_type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }
 
@@ -687,20 +695,20 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock->l_remote_handle = reply->lock_handle;
        }
 
-       *flags = ldlm_flags_from_wire(reply->lock_flags);
+       *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
        unlock_res_and_lock(lock);
 
        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
-              lock, reply->lock_handle.cookie, *flags);
+              lock, reply->lock_handle.cookie, *ldlm_flags);
 
        /*
         * If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again.
         */
-       if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+       if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
 
                LASSERT(!is_replay);
@@ -732,12 +740,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-               if (type != LDLM_PLAIN)
+               if (einfo->ei_type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,
                                   "client-side enqueue, new policy data");
        }
 
-       if ((*flags) & LDLM_FL_AST_SENT) {
+       if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
@@ -768,9 +776,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        if (!is_replay) {
-               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
                if (lock->l_completion_ast != NULL) {
-                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       int err = lock->l_completion_ast(lock, *ldlm_flags,
+                                                        NULL);
 
                        if (!rc)
                                rc = err;
@@ -791,7 +800,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        EXIT;
 cleanup:
        if (cleanup_phase == 1 && rc)
-               failed_lock_cleanup(ns, lock, mode);
+               failed_lock_cleanup(ns, lock, einfo->ei_mode);
        /* Put the lock twice; the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
@@ -853,8 +862,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
        struct req_capsule      *pill = &req->rq_pill;
        struct ldlm_request     *dlm = NULL;
        LIST_HEAD(head);
-       enum ldlm_lru_flags lru_flags;
-       int avail, to_free, pack = 0;
+       int avail, to_free = 0, pack = 0;
        int rc;
 
        ENTRY;
@@ -866,10 +874,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                req_capsule_filled_sizes(pill, RCL_CLIENT);
                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 
-               lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
-               to_free = !ns_connect_lru_resize(ns) &&
-                       opc == LDLM_ENQUEUE ? 1 : 0;
+               /* If we have reached the limit, free +1 slot for the new one */
+               if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+                   ns->ns_nr_unused >= ns->ns_max_unused)
+                       to_free = 1;
 
                /*
                 * Cancel LRU locks here _only_ if the server supports
@@ -879,7 +887,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0,
-                                                      lru_flags);
+                                                      LDLM_LRU_FLAG_NO_WAIT);
                if (avail > count)
                        pack = count;
                else
@@ -1067,24 +1075,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        /* extended LDLM opcodes in client stats */
        if (exp->exp_obd->obd_svc_stats != NULL) {
-               bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
-               /* OST glimpse has no intent buffer */
-               if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
-                                         RCL_CLIENT)) {
-                       struct ldlm_intent *it;
-
-                       it = req_capsule_client_get(&req->rq_pill,
-                                                   &RMF_LDLM_INTENT);
-                       glimpse = (it && (it->opc == IT_GLIMPSE));
-               }
-
-               if (!glimpse)
-                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
-               else
+               /* a glimpse is an intent enqueue with no intent buffer */
+               if (*flags & LDLM_FL_HAS_INTENT &&
+                   !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+                                          RCL_CLIENT))
                        lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                             PTLRPC_LAST_CNTR +
                                             LDLM_GLIMPSE_ENQUEUE);
+               else
+                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
        }
 
        /* It is important to obtain modify RPC slot first (if applicable), so
@@ -1094,13 +1093,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (ldlm_request_slot_needed(einfo->ei_type)) {
+       if (ldlm_request_slot_needed(einfo)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
+                       if (!req_passed_in)
+                               ptlrpc_req_finished(req);
                        GOTO(out, rc);
                }
        }
@@ -1114,9 +1115,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
-                                   einfo->ei_mode, flags, lvb, lvb_len,
-                                   lockh, rc);
+       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+                                   lvb, lvb_len, lockh, rc);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -1419,8 +1419,9 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
  */
 int ldlm_cli_update_pool(struct ptlrpc_request *req)
 {
+       struct ldlm_namespace *ns;
        struct obd_device *obd;
-       __u64 new_slv;
+       __u64 new_slv, ratio;
        __u32 new_limit;
 
        ENTRY;
@@ -1449,18 +1450,48 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        new_slv = lustre_msg_get_slv(req->rq_repmsg);
        obd = req->rq_import->imp_obd;
 
+       read_lock(&obd->obd_pool_lock);
+       if (obd->obd_pool_slv == new_slv &&
+           obd->obd_pool_limit == new_limit) {
+               read_unlock(&obd->obd_pool_lock);
+               RETURN(0);
+       }
+       read_unlock(&obd->obd_pool_lock);
+
        /*
-        * Set new SLV and limit in OBD fields to make them accessible
-        * to the pool thread. We do not access obd_namespace and pool
-        * directly here as there is no reliable way to make sure that
-        * they are still alive at cleanup time. Evil races are possible
-        * which may cause Oops at that time.
+        * The OBD device holds the new pool attributes until they are
+        * handled by the pool.
         */
        write_lock(&obd->obd_pool_lock);
        obd->obd_pool_slv = new_slv;
        obd->obd_pool_limit = new_limit;
        write_unlock(&obd->obd_pool_lock);
 
+       /*
+        * Check if an urgent pool recalc is needed: treat an SLV drop of
+        * ns_recalc_pct (10% by default) or more as urgent. This applies
+        * only when LRU resize is enabled.
+        */
+       ns = obd->obd_namespace;
+       if (!ns_connect_lru_resize(ns) ||
+           ldlm_pool_get_slv(&ns->ns_pool) < new_slv)
+               RETURN(0);
+
+       ratio = 100 * new_slv / ldlm_pool_get_slv(&ns->ns_pool);
+       if (100 - ratio >= ns->ns_recalc_pct &&
+           !ns->ns_stopping && !ns->ns_rpc_recalc) {
+               bool recalc = false;
+
+               spin_lock(&ns->ns_lock);
+               if (!ns->ns_stopping && !ns->ns_rpc_recalc) {
+                       ldlm_namespace_get(ns);
+                       recalc = true;
+                       ns->ns_rpc_recalc = 1;
+               }
+               spin_unlock(&ns->ns_lock);
+               if (recalc)
+                       ldlm_bl_to_thread_ns(ns);
+       }
+
        RETURN(0);
 }
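
A worked example of the urgency test above, with invented numbers and
ns_recalc_pct = 10 (note that the early return means only an SLV decrease
can get this far):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
            uint64_t old_slv = 1000, new_slv = 850, recalc_pct = 10;
            uint64_t ratio = 100 * new_slv / old_slv;  /* 85 */

            /* 100 - 85 = 15 >= 10: push an urgent recalc to the blocking
             * thread; a drop to 950 (delta 5) would be absorbed */
            printf("urgent recalc: %s\n",
                   100 - ratio >= recalc_pct ? "yes" : "no");
            return 0;
    }
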
 
@@ -1494,7 +1525,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags cancel_flags)
 {
        struct obd_export *exp;
-       enum ldlm_lru_flags lru_flags;
        int avail, count = 1;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
@@ -1552,10 +1582,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                LASSERT(avail > 0);
 
                ns = ldlm_lock_to_ns(lock);
-               lru_flags = ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                              LCF_BL_AST, lru_flags);
+                                              LCF_BL_AST, 0);
        }
        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
        RETURN(0);
@@ -1619,12 +1647,12 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
  */
 static enum ldlm_policy_res
 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                          int unused, int added, int count)
+                          int added, int min)
 {
        enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
 
        /*
-        * don't check added & count since we want to process all locks
+        * don't check @added & @min since we want to process all locks
         * from unused list.
         * It's fine to not take lock to access lock->l_resource since
         * the lock has already been granted so it won't change.
@@ -1645,8 +1673,8 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 /**
  * Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in the LRU, given \a added locks already chosen in the current
+ * scan and a preferred minimum of \a min locks to cancel.
  *
  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
  *
@@ -1654,34 +1682,28 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
  */
 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
        ktime_t cur = ktime_get();
        struct ldlm_pool *pl = &ns->ns_pool;
        u64 slv, lvf, lv;
        s64 la;
 
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       if (count && added >= count)
-               return LDLM_POLICY_KEEP_LOCK;
+       if (added < min)
+               return LDLM_POLICY_CANCEL_LOCK;
 
        /*
         * Regardless of the LV, it makes no sense to keep a lock that has
         * been unused for ns_max_age.
         */
-       if (ktime_after(ktime_get(),
-                       ktime_add(lock->l_last_used, ns->ns_max_age)))
+       if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_CANCEL_LOCK;
 
        slv = ldlm_pool_get_slv(pl);
        lvf = ldlm_pool_get_lvf(pl);
        la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
                     NSEC_PER_SEC);
-       lv = lvf * la * unused;
+       lv = lvf * la * ns->ns_nr_unused >> 8;
 
        /* Inform pool about current CLV to see it via debugfs. */
        ldlm_pool_set_clv(pl, lv);
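
With @unused dropped from the policy signature, the lock volume is now
computed from ns_nr_unused directly and scaled down by 2^8. A numeric
sketch with invented values; the reading of the shift as compensating a
fixed-point scale in lvf is an assumption here, not stated by the patch:

    #include <stdint.h>

    /* lvf = 256 (assumed 8-bit fixed point for 1.0), la = 30s idle,
     * nr_unused = 4000: lv = 256 * 30 * 4000 >> 8 = 120000, the CLV
     * recorded via ldlm_pool_set_clv() and compared against the SLV
     * further down, outside this hunk */
    static uint64_t lock_volume(uint64_t lvf, int64_t la, uint64_t nr_unused)
    {
            return lvf * (uint64_t)la * nr_unused >> 8;
    }
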
@@ -1699,44 +1721,21 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added,
-                               int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_lrur_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 /**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-                                                     struct ldlm_lock *lock,
-                                                     int unused, int added,
-                                                     int count)
-{
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
-}
-
-/**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in the LRU, given \a added locks already chosen in the current
+ * scan and a preferred minimum of \a min locks to cancel.
  *
  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
  *
@@ -1744,10 +1743,9 @@ static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
  */
 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
-       if ((added >= count) &&
+       if ((added >= min) &&
            ktime_before(ktime_get(),
                         ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_KEEP_LOCK;
@@ -1758,78 +1756,43 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added, int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_aged_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
-                                               struct ldlm_lock *lock,
-                                               int unused, int added,
-                                               int count)
-{
-       /*
-        * Stop LRU processing when we reach past count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 typedef enum ldlm_policy_res
 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                           int unused, int added, int count);
+                           int added, int min);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 {
        if (ns_connect_lru_resize(ns)) {
-               if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-                       /* We kill passed number of old locks. */
-                       return ldlm_cancel_passed_policy;
-               if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_lrur_no_wait_policy;
-                       else
-                               return ldlm_cancel_lrur_policy;
-               }
-               if (lru_flags & LDLM_LRU_FLAG_PASSED)
-                       return ldlm_cancel_passed_policy;
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_lrur_no_wait_policy;
+               else
+                       return ldlm_cancel_lrur_policy;
        } else {
-               if (lru_flags & LDLM_LRU_FLAG_AGED) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_aged_no_wait_policy;
-                       else
-                               return ldlm_cancel_aged_policy;
-               }
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_aged_no_wait_policy;
+               else
+                       return ldlm_cancel_aged_policy;
        }
-       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-               return ldlm_cancel_no_wait_policy;
-
-       return ldlm_cancel_default_policy;
 }
 
 /**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks,
  *   redundant unused locks are canceled locally;
  * - also cancel locally unused aged locks;
  * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks;
  * - GET the found locks and add them into the \a cancels list.
  *
  * A client lock can be added to the l_bl_ast list only when it is
@@ -1840,40 +1803,49 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
  * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
  * later without any special locking.
  *
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
- *                             cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- *                             at the beginning of LRU list);
+ * Locks are cancelled according to the LRU resize policy (SLV from the
+ * server) if LRU resize is enabled; otherwise, the "aged policy" is used.
  *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- *                             to memory pressre policy function;
- *
- * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
  *
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- *                             (typically before replaying locks) w/o
- *                             sending any RPCs or waiting for any
- *                             outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ *                                for any outstanding RPC to complete.
  *
  * flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- *                             other read locks covering the same pages, just
- *                             discard those pages.
+ *                              other read locks covering the same pages, just
+ *                              discard those pages.
  */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-                                struct list_head *cancels, int count, int max,
+                                struct list_head *cancels,
+                                int min, int max, int batch,
                                 enum ldlm_lru_flags lru_flags)
 {
        ldlm_cancel_lru_policy_t pf;
        int added = 0;
        int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
        ENTRY;
 
+       /*
+        * Let only one thread proceed. However, don't gate callers that
+        * pass an explicit @max limit (ELC), or the LRU may be left not
+        * cleaned up in full.
+        */
+       if (max == 0) {
+               if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+                       RETURN(0);
+       } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+               RETURN(0);
+
+       LASSERT(ergo(max, min <= max));
+       /* No sense to give @batch for ELC */
+       LASSERT(ergo(max, batch == 0));
+
        if (!ns_connect_lru_resize(ns))
-               count += ns->ns_nr_unused - ns->ns_max_unused;
+               min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+       /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+       if (min && min < batch)
+               min = batch;
 
        pf = ldlm_cancel_lru_policy(ns, lru_flags);
        LASSERT(pf != NULL);
@@ -1930,7 +1902,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                 * their weight. Big extent locks will stay in
                 * the cache.
                 */
-               result = pf(ns, lock, ns->ns_nr_unused, added, count);
+               result = pf(ns, lock, added, min);
                if (result == LDLM_POLICY_KEEP_LOCK) {
                        lu_ref_del(&lock->l_reference, __func__, current);
                        LDLM_LOCK_RELEASE(lock);
@@ -2007,18 +1979,25 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                unlock_res_and_lock(lock);
                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
                added++;
+               /* Once the first lock is added, round the target up to @batch */
+               if (min == 0)
+                       min = batch;
        }
+
+       if (max == 0)
+               clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
        RETURN(added);
 }
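
The new @min/@max/@batch contract is easiest to see in miniature. Below is
a standalone model of the two batching rules introduced in
ldlm_prepare_lru_list(), not the kernel code itself:

    struct lru_scan {
            int min;        /* preferred number of locks to cancel */
            int batch;      /* once cancelling, cancel at least this many */
            int added;      /* cancelled so far in this scan */
    };

    /* rule 1: an explicit non-zero target is rounded up to a full batch */
    static void scan_start(struct lru_scan *s)
    {
            if (s->min && s->min < s->batch)
                    s->min = s->batch;
    }

    /* rule 2: with no explicit target, the first cancel arms the batch */
    static void lock_added(struct lru_scan *s)
    {
            s->added++;
            if (s->min == 0)
                    s->min = s->batch;
    }

Both call sites in this patch fit the model: ELC (via ldlm_cancel_lru_local())
passes batch == 0 with a bounded @max, while ldlm_cancel_lru() passes
max == 0 with ns->ns_cancel_batch.
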
 
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                         int count, int max,
+                         int min, int max,
                          enum ldlm_cancel_flags cancel_flags,
                          enum ldlm_lru_flags lru_flags)
 {
        int added;
 
-       added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+       added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
        if (added <= 0)
                return added;
 
@@ -2026,14 +2005,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 }
 
 /**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
  *
  * When called with LCF_ASYNC the blocking callback will be handled
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  When called without LCF_ASYNC the blocking
  * callback will be performed in this function.
  */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
                    enum ldlm_cancel_flags cancel_flags,
                    enum ldlm_lru_flags lru_flags)
 {
@@ -2046,7 +2025,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
         * Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread.
         */
-       count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+       count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+                                     ns->ns_cancel_batch, lru_flags);
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
        if (rc == 0)
                RETURN(count);
@@ -2404,6 +2384,8 @@ static int replay_lock_interpret(const struct lu_env *env,
 
        ENTRY;
        atomic_dec(&req->rq_import->imp_replay_inflight);
+       wake_up(&req->rq_import->imp_replay_waitq);
+
        if (rc != ELDLM_OK)
                GOTO(out, rc);
 
@@ -2498,6 +2480,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        /* We're part of recovery, so don't wait for it. */
        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+       /* If the import state changed while the request was being
+        * prepared, don't wait */
+       req->rq_no_delay = 1;
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
@@ -2519,7 +2503,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        LDLM_DEBUG(lock, "replaying lock:");
 
-       atomic_inc(&req->rq_import->imp_replay_inflight);
+       atomic_inc(&imp->imp_replay_inflight);
        aa = ptlrpc_req_async_args(aa, req);
        aa->lock_handle = body->lock_handle[0];
        req->rq_interpret_reply = replay_lock_interpret;
@@ -2547,6 +2531,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
               ldlm_ns_name(ns), ns->ns_nr_unused);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
        /*
         * We don't need to care whether or not LRU resize is enabled
         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -2559,7 +2545,20 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
                           canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+       struct client_obd *cli = &imp->imp_obd->u.cli;
+
+       CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+              atomic_read(&imp->imp_replay_inflight) - 1,
+              cli->cl_max_rpcs_in_flight);
+
+       /* +1 due to the ldlm_replay_locks() increment */
+       return atomic_read(&imp->imp_replay_inflight) <
+              1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
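
lock_can_replay() bounds the lock replay window by the import's RPC budget,
hard-capped at 8 in-flight replays. A standalone model with invented names;
the '+ 1' mirrors the reference taken in ldlm_replay_locks() before any
replay RPC is queued:

    #include <stdbool.h>

    static bool can_send_next_replay(int inflight, unsigned int max_rpcs)
    {
            unsigned int cap = max_rpcs < 8 ? max_rpcs : 8;

            /* e.g. cl_max_rpcs_in_flight = 32 still allows only 8
             * concurrent lock replays */
            return inflight < (int)(cap + 1);
    }
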
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
        LIST_HEAD(list);
@@ -2568,15 +2567,12 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
                RETURN(0);
 
-       /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
-
        if (ldlm_cancel_unused_locks_before_replay)
                ldlm_cancel_unused_locks_for_replay(ns);
 
@@ -2584,15 +2580,62 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
                list_del_init(&lock->l_pending_chain);
-               if (rc) {
+               /* If we disconnected in the middle, clean up and let
+                * reconnection happen again. LU-14027 */
+               if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
                        LDLM_LOCK_RELEASE(lock);
-                       continue; /* or try to do the rest? */
+                       continue;
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
+
+               if (rate_limit)
+                       wait_event_idle_exclusive(imp->imp_replay_waitq,
+                                                 lock_can_replay(imp));
        }
 
+       RETURN(rc);
+}
+
+/**
+ * Lock replay is rate-limited and may sleep while waiting, so it must
+ * run in a thread separate from ptlrpcd itself.
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+       struct obd_import *imp = data;
+
+       CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+              imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+              imp->imp_connection->c_remote_uuid.uuid);
+
+       __ldlm_replay_locks(imp, true);
        atomic_dec(&imp->imp_replay_inflight);
+       ptlrpc_import_recovery_state_machine(imp);
+       class_import_put(imp);
 
-       RETURN(rc);
+       return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       class_import_get(imp);
+       /* ensure this doesn't fall to 0 before all have been queued */
+       atomic_inc(&imp->imp_replay_inflight);
+
+       task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+               /* run lock replay without rate control */
+               rc = __ldlm_replay_locks(imp, false);
+               atomic_dec(&imp->imp_replay_inflight);
+               class_import_put(imp);
+       }
+
+       return rc;
 }