Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 4458ebc..bb1fa10 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 /**
  * This file contains Asynchronous System Trap (AST) handlers and related
@@ -237,9 +236,8 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
                RETURN(ldlm_completion_tail(lock, data));
        }
 
-       LDLM_DEBUG(lock,
-                  "client-side enqueue returned a blocked lock, going forward");
-       ldlm_reprocess_all(lock->l_resource, NULL);
+       LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
+       ldlm_reprocess_all(lock->l_resource, 0);
        RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -552,8 +550,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                 * bl_ast and -EINVAL reply is sent to server anyways.
                 * b=17645
                 */
-               lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+               lock->l_flags |= LDLM_FL_FAILED |
                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
+               if (!(ldlm_is_bl_ast(lock) &&
+                     lock->l_remote_handle.cookie != 0))
+                       lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                need_cancel = 1;
        }
        unlock_res_and_lock(lock);
@@ -587,9 +588,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
 {
-       return type == LDLM_FLOCK || type == LDLM_IBITS;
+       /* exclude EXTENT locks and DOM-only IBITS locks because they
+        * are asynchronous and don't wait on server being blocked.
+        */
+       return einfo->ei_type == LDLM_FLOCK ||
+              (einfo->ei_type == LDLM_IBITS &&
+               einfo->ei_inodebits != MDS_INODELOCK_DOM);
 }
 
 /**
@@ -598,21 +604,21 @@ static bool ldlm_request_slot_needed(enum ldlm_type type)
  * Called after receiving reply from server.
  */
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
-                         enum ldlm_type type, __u8 with_policy,
-                         enum ldlm_mode mode, __u64 *flags, void *lvb,
+                         struct ldlm_enqueue_info *einfo,
+                         __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        const struct lu_env *env = NULL;
-       int is_replay = *flags & LDLM_FL_REPLAY;
+       int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        int cleanup_phase = 1;
 
        ENTRY;
 
-       if (ldlm_request_slot_needed(type))
+       if (ldlm_request_slot_needed(einfo))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
        ptlrpc_put_mod_rpc_slot(req);
@@ -623,7 +629,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
-               LASSERT(type == LDLM_FLOCK);
+               LASSERT(einfo->ei_type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }
 
@@ -687,20 +693,20 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock->l_remote_handle = reply->lock_handle;
        }
 
-       *flags = ldlm_flags_from_wire(reply->lock_flags);
+       *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
        unlock_res_and_lock(lock);
 
        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
-              lock, reply->lock_handle.cookie, *flags);
+              lock, reply->lock_handle.cookie, *ldlm_flags);
 
        /*
         * If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again.
         */
-       if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+       if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
 
                LASSERT(!is_replay);
@@ -732,12 +738,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-               if (type != LDLM_PLAIN)
+               if (einfo->ei_type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,
                                   "client-side enqueue, new policy data");
        }
 
-       if ((*flags) & LDLM_FL_AST_SENT) {
+       if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
@@ -768,9 +774,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        if (!is_replay) {
-               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
                if (lock->l_completion_ast != NULL) {
-                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       int err = lock->l_completion_ast(lock, *ldlm_flags,
+                                                        NULL);
 
                        if (!rc)
                                rc = err;
@@ -791,7 +798,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        EXIT;
 cleanup:
        if (cleanup_phase == 1 && rc)
-               failed_lock_cleanup(ns, lock, mode);
+               failed_lock_cleanup(ns, lock, einfo->ei_mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
@@ -853,8 +860,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
        struct req_capsule      *pill = &req->rq_pill;
        struct ldlm_request     *dlm = NULL;
        LIST_HEAD(head);
-       enum ldlm_lru_flags lru_flags;
-       int avail, to_free, pack = 0;
+       int avail, to_free = 0, pack = 0;
        int rc;
 
        ENTRY;
@@ -866,10 +872,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                req_capsule_filled_sizes(pill, RCL_CLIENT);
                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 
-               lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
-               to_free = !ns_connect_lru_resize(ns) &&
-                       opc == LDLM_ENQUEUE ? 1 : 0;
+               /* If we have reached the limit, free +1 slot for the new one */
+               if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+                   ns->ns_nr_unused >= ns->ns_max_unused)
+                       to_free = 1;
 
                /*
                 * Cancel LRU locks here _only_ if the server supports
@@ -879,7 +885,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0,
-                                                      lru_flags);
+                                                      LDLM_LRU_FLAG_NO_WAIT);
                if (avail > count)
                        pack = count;
                else
@@ -1067,24 +1073,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        /* extended LDLM opcodes in client stats */
        if (exp->exp_obd->obd_svc_stats != NULL) {
-               bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
-               /* OST glimpse has no intent buffer */
-               if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
-                                         RCL_CLIENT)) {
-                       struct ldlm_intent *it;
-
-                       it = req_capsule_client_get(&req->rq_pill,
-                                                   &RMF_LDLM_INTENT);
-                       glimpse = (it && (it->opc == IT_GLIMPSE));
-               }
-
-               if (!glimpse)
-                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
-               else
+               /* glimpse is intent with no intent buffer */
+               if (*flags & LDLM_FL_HAS_INTENT &&
+                   !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+                                          RCL_CLIENT))
                        lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                             PTLRPC_LAST_CNTR +
                                             LDLM_GLIMPSE_ENQUEUE);
+               else
+                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
        }
 
        /* It is important to obtain modify RPC slot first (if applicable), so
@@ -1094,13 +1091,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (ldlm_request_slot_needed(einfo->ei_type)) {
+       if (ldlm_request_slot_needed(einfo)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
+                       if (!req_passed_in)
+                               ptlrpc_req_finished(req);
                        GOTO(out, rc);
                }
        }
@@ -1114,9 +1113,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
-                                   einfo->ei_mode, flags, lvb, lvb_len,
-                                   lockh, rc);
+       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+                                   lvb, lvb_len, lockh, rc);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -1256,7 +1254,8 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                }
                LDLM_DEBUG(lock, "server-side local cancel");
                ldlm_lock_cancel(lock);
-               ldlm_reprocess_all(lock->l_resource, lock);
+               ldlm_reprocess_all(lock->l_resource,
+                                  lock->l_policy_data.l_inodebits.bits);
        }
 
        RETURN(rc);
@@ -1419,8 +1418,9 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
  */
 int ldlm_cli_update_pool(struct ptlrpc_request *req)
 {
+       struct ldlm_namespace *ns;
        struct obd_device *obd;
-       __u64 new_slv;
+       __u64 new_slv, ratio;
        __u32 new_limit;
 
        ENTRY;
@@ -1458,17 +1458,39 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        read_unlock(&obd->obd_pool_lock);
 
        /*
-        * Set new SLV and limit in OBD fields to make them accessible
-        * to the pool thread. We do not access obd_namespace and pool
-        * directly here as there is no reliable way to make sure that
-        * they are still alive at cleanup time. Evil races are possible
-        * which may cause Oops at that time.
+        * OBD device keeps the new pool attributes before they are handled by
+        * the pool.
         */
        write_lock(&obd->obd_pool_lock);
        obd->obd_pool_slv = new_slv;
        obd->obd_pool_limit = new_limit;
        write_unlock(&obd->obd_pool_lock);
 
+       /*
+        * Check if an urgent pool recalc is needed, i.e. the SLV has dropped by
+        * at least ns_recalc_pct percent. Only applies with LRU resize enabled.
+        */
+       ns = obd->obd_namespace;
+       if (!ns_connect_lru_resize(ns) ||
+           ldlm_pool_get_slv(&ns->ns_pool) < new_slv)
+               RETURN(0);
+
+       ratio = 100 * new_slv / ldlm_pool_get_slv(&ns->ns_pool);
+       if (100 - ratio >= ns->ns_recalc_pct &&
+           !ns->ns_stopping && !ns->ns_rpc_recalc) {
+               bool recalc = false;
+
+               spin_lock(&ns->ns_lock);
+               if (!ns->ns_stopping && !ns->ns_rpc_recalc) {
+                       ldlm_namespace_get(ns);
+                       recalc = true;
+                       ns->ns_rpc_recalc = 1;
+               }
+               spin_unlock(&ns->ns_lock);
+               if (recalc)
+                       ldlm_bl_to_thread_ns(ns);
+       }
+
        RETURN(0);
 }
 
@@ -1502,7 +1524,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags cancel_flags)
 {
        struct obd_export *exp;
-       enum ldlm_lru_flags lru_flags;
        int avail, count = 1;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
@@ -1560,10 +1581,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                LASSERT(avail > 0);
 
                ns = ldlm_lock_to_ns(lock);
-               lru_flags = ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                              LCF_BL_AST, lru_flags);
+                                              LCF_BL_AST, 0);
        }
        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
        RETURN(0);
@@ -1627,12 +1646,12 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
  */
 static enum ldlm_policy_res
 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                          int unused, int added, int count)
+                          int added, int min)
 {
        enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
 
        /*
-        * don't check added & count since we want to process all locks
+        * don't check @added & @min since we want to process all locks
         * from unused list.
         * It's fine to not take lock to access lock->l_resource since
         * the lock has already been granted so it won't change.
@@ -1653,8 +1672,8 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 /**
  * Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in LRU, given \a added locks already picked in the current scan
+ * and \a min number of locks to be preferably canceled.
  *
  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
  *
@@ -1662,34 +1681,28 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
  */
 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
        ktime_t cur = ktime_get();
        struct ldlm_pool *pl = &ns->ns_pool;
        u64 slv, lvf, lv;
        s64 la;
 
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       if (count && added >= count)
-               return LDLM_POLICY_KEEP_LOCK;
+       if (added < min)
+               return LDLM_POLICY_CANCEL_LOCK;
 
        /*
         * Despite of the LV, It doesn't make sense to keep the lock which
         * is unused for ns_max_age time.
         */
-       if (ktime_after(ktime_get(),
-                       ktime_add(lock->l_last_used, ns->ns_max_age)))
+       if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_CANCEL_LOCK;
 
        slv = ldlm_pool_get_slv(pl);
        lvf = ldlm_pool_get_lvf(pl);
        la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
                     NSEC_PER_SEC);
-       lv = lvf * la * unused;
+       lv = lvf * la * ns->ns_nr_unused >> 8;
 
        /* Inform pool about current CLV to see it via debugfs. */
        ldlm_pool_set_clv(pl, lv);
@@ -1707,44 +1720,21 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added,
-                               int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_lrur_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 /**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-                                                     struct ldlm_lock *lock,
-                                                     int unused, int added,
-                                                     int count)
-{
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
-}
-
-/**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in LRU, given \a added locks already picked in the current scan
+ * and \a min number of locks to be preferably canceled.
  *
  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
  *
@@ -1752,10 +1742,9 @@ static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
  */
 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
-       if ((added >= count) &&
+       if ((added >= min) &&
            ktime_before(ktime_get(),
                         ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_KEEP_LOCK;
@@ -1766,78 +1755,43 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added, int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_aged_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
-                                               struct ldlm_lock *lock,
-                                               int unused, int added,
-                                               int count)
-{
-       /*
-        * Stop LRU processing when we reach past count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 typedef enum ldlm_policy_res
 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                           int unused, int added, int count);
+                           int added, int min);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 {
        if (ns_connect_lru_resize(ns)) {
-               if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-                       /* We kill passed number of old locks. */
-                       return ldlm_cancel_passed_policy;
-               if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_lrur_no_wait_policy;
-                       else
-                               return ldlm_cancel_lrur_policy;
-               }
-               if (lru_flags & LDLM_LRU_FLAG_PASSED)
-                       return ldlm_cancel_passed_policy;
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_lrur_no_wait_policy;
+               else
+                       return ldlm_cancel_lrur_policy;
        } else {
-               if (lru_flags & LDLM_LRU_FLAG_AGED) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_aged_no_wait_policy;
-                       else
-                               return ldlm_cancel_aged_policy;
-               }
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_aged_no_wait_policy;
+               else
+                       return ldlm_cancel_aged_policy;
        }
-       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-               return ldlm_cancel_no_wait_policy;
-
-       return ldlm_cancel_default_policy;
 }
 
 /**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks,
  *   redundant unused locks are canceled locally;
  * - also cancel locally unused aged locks;
  * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks
  * - GET the found locks and add them into the \a cancels list.
  *
  * A client lock can be added to the l_bl_ast list only when it is
@@ -1848,40 +1802,49 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
  * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
  * later without any special locking.
  *
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
- *                             cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- *                             at the beginning of LRU list);
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used;
  *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- *                             to memory pressre policy function;
- *
- * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
  *
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- *                             (typically before replaying locks) w/o
- *                             sending any RPCs or waiting for any
- *                             outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ *                                for any outstanding RPC to complete.
  *
  * flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- *                             other read locks covering the same pages, just
- *                             discard those pages.
+ *                              other read locks covering the same pages, just
+ *                              discard those pages.
  */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-                                struct list_head *cancels, int count, int max,
+                                struct list_head *cancels,
+                                int min, int max, int batch,
                                 enum ldlm_lru_flags lru_flags)
 {
        ldlm_cancel_lru_policy_t pf;
        int added = 0;
        int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
        ENTRY;
 
+       /*
+        * Let only one thread proceed at a time. Callers passing a @max limit
+        * (ELC) do not take the flag, as they may leave the LRU partly uncleaned.
+        */
+       if (max == 0) {
+               if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+                       RETURN(0);
+       } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+               RETURN(0);
+
+       LASSERT(ergo(max, min <= max));
+       /* No sense to give @batch for ELC */
+       LASSERT(ergo(max, batch == 0));
+
        if (!ns_connect_lru_resize(ns))
-               count += ns->ns_nr_unused - ns->ns_max_unused;
+               min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+       /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+       if (min && min < batch)
+               min = batch;
 
        pf = ldlm_cancel_lru_policy(ns, lru_flags);
        LASSERT(pf != NULL);
@@ -1938,7 +1901,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                 * their weight. Big extent locks will stay in
                 * the cache.
                 */
-               result = pf(ns, lock, ns->ns_nr_unused, added, count);
+               result = pf(ns, lock, added, min);
                if (result == LDLM_POLICY_KEEP_LOCK) {
                        lu_ref_del(&lock->l_reference, __func__, current);
                        LDLM_LOCK_RELEASE(lock);
@@ -2015,18 +1978,25 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                unlock_res_and_lock(lock);
                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
                added++;
+               /* Once a lock is added, raise @min to the requested @batch */
+               if (min == 0)
+                       min = batch;
        }
+
+       if (max == 0)
+               clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
        RETURN(added);
 }
 
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                         int count, int max,
+                         int min, int max,
                          enum ldlm_cancel_flags cancel_flags,
                          enum ldlm_lru_flags lru_flags)
 {
        int added;
 
-       added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+       added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
        if (added <= 0)
                return added;
 
@@ -2034,14 +2004,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 }
 
 /**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
  *
  * When called with LCF_ASYNC the blocking callback will be handled
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  When called with LCF_ASYNC the blocking
  * callback will be performed in this function.
  */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
                    enum ldlm_cancel_flags cancel_flags,
                    enum ldlm_lru_flags lru_flags)
 {
@@ -2054,7 +2024,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
         * Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread.
         */
-       count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+       count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+                                     ns->ns_cancel_batch, lru_flags);
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
        if (rc == 0)
                RETURN(count);
@@ -2508,6 +2479,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        /* We're part of recovery, so don't wait for it. */
        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+       /* If the state changed while we were preparing, don't wait */
+       req->rq_no_delay = 1;
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
@@ -2557,6 +2530,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
               ldlm_ns_name(ns), ns->ns_nr_unused);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
        /*
         * We don't need to care whether or not LRU resize is enabled
         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -2591,7 +2566,8 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
+       while (atomic_read(&imp->imp_replay_inflight) != 1)
+               cond_resched();
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
@@ -2604,9 +2580,11 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
                list_del_init(&lock->l_pending_chain);
-               if (rc) {
+               /* If we got disconnected in the middle, clean up and let
+                * the reconnection happen again. LU-14027 */
+               if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
                        LDLM_LOCK_RELEASE(lock);
-                       continue; /* or try to do the rest? */
+                       continue;
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
@@ -2627,8 +2605,6 @@ static int ldlm_lock_replay_thread(void *data)
 {
        struct obd_import *imp = data;
 
-       unshare_fs_struct();
-
        CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
               imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
               imp->imp_connection->c_remote_uuid.uuid);
@@ -2646,9 +2622,12 @@ int ldlm_replay_locks(struct obd_import *imp)
        struct task_struct *task;
        int rc = 0;
 
-       class_import_get(imp);
        /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
+       if (atomic_inc_return(&imp->imp_replay_inflight) > 1) {
+               atomic_dec(&imp->imp_replay_inflight);
+               return 0;
+       }
+       class_import_get(imp);
 
        task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
        if (IS_ERR(task)) {