LU-11518 ldlm: control lru_size for extent lock
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index c0f4f2f..ff5ae53 100644
 unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
        struct ldlm_lock *lwd_lock;
        __u32             lwd_conn_cnt;
@@ -111,9 +108,8 @@ int ldlm_request_bufsize(int count, int type)
        return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-       struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;
@@ -124,9 +120,8 @@ int ldlm_expired_completion_wait(void *data)
 
                LDLM_ERROR(lock,
                           "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
-                          (s64)lock->l_activity,
-                          (s64)(ktime_get_real_seconds() -
-                                lock->l_activity));
+                          lock->l_activity,
+                          ktime_get_real_seconds() - lock->l_activity);
                if (ktime_get_seconds() > next_dump) {
                        last_dump = next_dump;
                        next_dump = ktime_get_seconds() + 300;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
-               RETURN(0);
+               RETURN_EXIT;
        }
 
        obd = lock->l_conn_export->exp_obd;
@@ -143,11 +138,11 @@ int ldlm_expired_completion_wait(void *data)
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
        LDLM_ERROR(lock,
                   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
-                  (s64)lock->l_activity,
-                  (s64)(ktime_get_real_seconds() - lock->l_activity),
+                  lock->l_activity,
+                  ktime_get_real_seconds() - lock->l_activity,
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
-       RETURN(0);
+       EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -176,9 +171,9 @@ EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
  * We use the same basis for both server side and client side functions
  * from a single node.
  */
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-       time64_t timeout;
+       timeout_t timeout;
 
        if (AT_OFF)
                return obd_timeout;
@@ -189,7 +184,7 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
         * doesn't respond reasonably, and then give us the lock.
         */
        timeout = at_get(ldlm_lock_to_ns_at(lock));
-       return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+       return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
 }
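
For context: with adaptive timeouts (AT) enabled, the client-side completion timeout is three times the current AT estimate for this namespace, floored at the ldlm_enqueue_min module parameter. A minimal standalone sketch of that arithmetic (cp_timeout_sketch is a hypothetical name; the AT estimate is passed in rather than read via ldlm_lock_to_ns_at()):

/* Hypothetical helper mirroring ldlm_cp_timeout(): 3x the adaptive
 * estimate, never below the ldlm_enqueue_min floor; plain obd_timeout
 * when adaptive timeouts are disabled.
 */
static timeout_t cp_timeout_sketch(timeout_t at_estimate)
{
	if (AT_OFF)
		return obd_timeout;
	return max(3 * at_estimate, (timeout_t)ldlm_enqueue_min);
}
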
 
 /**
@@ -198,7 +193,6 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
  */
 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
-       time64_t delay;
        int result = 0;
 
        if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
@@ -208,10 +202,16 @@ static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
                LDLM_DEBUG(lock, "client-side enqueue: granted");
        } else {
                /* Take into AT only CP RPC, not immediately granted locks */
-               delay = ktime_get_real_seconds() - lock->l_activity;
-               LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
-                          (s64)delay);
+               timeout_t delay = 0;
 
+               /* Discard negative timeouts. We should also limit the
+                * maximum value of the timeout
+                */
+               if (ktime_get_real_seconds() > lock->l_activity)
+                       delay = ktime_get_real_seconds() - lock->l_activity;
+
+               LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+                          delay);
                /* Update our time estimate */
                at_measured(ldlm_lock_to_ns_at(lock), delay);
        }
@@ -269,8 +269,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
-       struct l_wait_info lwi;
-       time64_t timeout;
+       timeout_t timeout;
        int rc = 0;
 
        ENTRY;
@@ -300,15 +299,6 @@ noreproc:
        lwd.lwd_lock = lock;
        lock->l_activity = ktime_get_real_seconds();
 
-       if (ldlm_is_no_timeout(lock)) {
-               LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-               lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-       } else {
-               lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-                                      ldlm_expired_completion_wait,
-                                      interrupted_completion_wait, &lwd);
-       }
-
        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +312,22 @@ noreproc:
                rc = -EINTR;
        } else {
                /* Go to sleep until the lock is granted or cancelled. */
-               rc = l_wait_event(lock->l_waitq,
-                                 is_granted_or_cancelled(lock), &lwi);
+               if (ldlm_is_no_timeout(lock)) {
+                       LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+                       rc = l_wait_event_abortable(
+                               lock->l_waitq,
+                               is_granted_or_cancelled(lock));
+               } else {
+                       if (wait_event_idle_timeout(
+                                   lock->l_waitq,
+                                   is_granted_or_cancelled(lock),
+                                   cfs_time_seconds(timeout)) == 0) {
+                               ldlm_expired_completion_wait(&lwd);
+                               rc = l_wait_event_abortable(
+                                       lock->l_waitq,
+                                       is_granted_or_cancelled(lock));
+                       }
+               }
        }
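
A note on the replacement primitives used above: wait_event_idle_timeout() sleeps without contributing to the load average, ignores signals, and returns 0 only if the timeout elapsed with the condition still false; l_wait_event_abortable() is Lustre's wrapper around an interruptible wait that only fatal signals can break. The resulting two-stage wait, condensed (same names as in the hunk above):

/* Stage 1: signal-proof timed wait.  A return of 0 means the timeout
 * expired, so run the expiry handler (which may push the import into
 * recovery) and fall back to stage 2: an indefinite wait, abortable
 * only by a fatal signal.
 */
if (wait_event_idle_timeout(lock->l_waitq,
			    is_granted_or_cancelled(lock),
			    cfs_time_seconds(timeout)) == 0) {
	ldlm_expired_completion_wait(&lwd);
	rc = l_wait_event_abortable(lock->l_waitq,
				    is_granted_or_cancelled(lock));
}
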
 
        if (rc) {
@@ -583,6 +587,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+       return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
  *
@@ -603,6 +612,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        ENTRY;
 
+       if (ldlm_request_slot_needed(type))
+               obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+       ptlrpc_put_mod_rpc_slot(req);
+
        if (req && req->rq_svc_thread)
                env = req->rq_svc_thread->t_env;
 
@@ -725,6 +739,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        if ((*flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
+               ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
@@ -837,9 +852,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
        struct req_capsule      *pill = &req->rq_pill;
        struct ldlm_request     *dlm = NULL;
-       struct list_head        head = LIST_HEAD_INIT(head);
-       enum ldlm_lru_flags lru_flags;
-       int avail, to_free, pack = 0;
+       LIST_HEAD(head);
+       int avail, to_free = 0, pack = 0;
        int rc;
 
        ENTRY;
@@ -851,10 +865,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                req_capsule_filled_sizes(pill, RCL_CLIENT);
                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 
-               lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
-               to_free = !ns_connect_lru_resize(ns) &&
-                       opc == LDLM_ENQUEUE ? 1 : 0;
+               /* If we have reached the limit, free +1 slot for the new one */
+               if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+                   ns->ns_nr_unused >= ns->ns_max_unused)
+                       to_free = 1;
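
Restating the decision just above with illustrative numbers: with LRU resize disabled, ns_max_unused = 100 and ns_nr_unused = 100, an LDLM_ENQUEUE packs one extra cancel so the cache does not grow past its limit. A sketch of the decision in isolation (sample values, not real code):

/* Illustrative: the static LRU frees at most one slot per enqueue */
int to_free = 0;

if (!ns_connect_lru_resize(ns) &&		/* static LRU limit in use */
    opc == LDLM_ENQUEUE &&
    ns->ns_nr_unused >= ns->ns_max_unused)	/* e.g. 100 >= 100 */
	to_free = 1;
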
 
                /*
                 * Cancel LRU locks here _only_ if the server supports
@@ -864,7 +878,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0,
-                                                      lru_flags);
+                                                      LDLM_LRU_FLAG_NO_WAIT);
                if (avail > count)
                        pack = count;
                else
@@ -1008,7 +1022,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        lock->l_conn_export = exp;
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;
-       lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+       lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
+                                   LDLM_FL_ATOMIC_CB));
        lock->l_activity = ktime_get_real_seconds();
 
        /* lock not sent to server yet */
@@ -1034,7 +1049,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        }
 
        if (*flags & LDLM_FL_NDELAY) {
-               DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+               DEBUG_REQ(D_DLMTRACE, req, "enqueue lock with no delay");
                req->rq_no_resend = req->rq_no_delay = 1;
                /*
                 * probably set a shorter timeout value and handle ETIMEDOUT
@@ -1071,6 +1086,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                             LDLM_GLIMPSE_ENQUEUE);
        }
 
+       /* It is important to obtain modify RPC slot first (if applicable), so
+        * that threads that are waiting for a modify RPC slot are not polluting
+        * our rpcs in flight counter. */
+
+       if (einfo->ei_enq_slot)
+               ptlrpc_get_mod_rpc_slot(req);
+
+       if (ldlm_request_slot_needed(einfo->ei_type)) {
+               rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+               if (rc) {
+                       if (einfo->ei_enq_slot)
+                               ptlrpc_put_mod_rpc_slot(req);
+                       failed_lock_cleanup(ns, lock, einfo->ei_mode);
+                       LDLM_LOCK_RELEASE(lock);
+                       GOTO(out, rc);
+               }
+       }
+
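+
+The pairing spans two functions: both slots taken here are released on the reply path in ldlm_cli_enqueue_fini() (see the earlier hunk), and a failed obd_get_request_slot() must drop the modify-RPC slot in reverse order. A compressed view of the acquire/release protocol, with the error path elided:
+
+/* Acquire, in ldlm_cli_enqueue(): modify-RPC slot first, so threads
+ * queued on it do not inflate the RPCs-in-flight accounting.
+ */
+if (einfo->ei_enq_slot)
+	ptlrpc_get_mod_rpc_slot(req);
+if (ldlm_request_slot_needed(einfo->ei_type))
+	rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+
+/* Release, in ldlm_cli_enqueue_fini(): the mirror image */
+if (ldlm_request_slot_needed(type))
+	obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+ptlrpc_put_mod_rpc_slot(req);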
        if (async) {
                LASSERT(reqp != NULL);
                RETURN(0);
@@ -1093,6 +1126,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        else
                rc = err;
 
+out:
        if (!req_passed_in && req != NULL) {
                ptlrpc_req_finished(req);
                if (reqp)
@@ -1104,128 +1138,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
-                                 struct ptlrpc_request *req,
-                                 void *args, int rc)
-{
-       struct ldlm_async_args *aa = args;
-       struct ldlm_lock *lock;
-       struct ldlm_reply *reply;
-
-       ENTRY;
-
-       lock = ldlm_handle2lock(&aa->lock_handle);
-       if (!lock) {
-               LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
-                       aa->lock_handle.cookie);
-               RETURN(-ESTALE);
-       }
-
-       LDLM_DEBUG(lock, "CONVERTED lock:");
-
-       if (rc != ELDLM_OK)
-               GOTO(out, rc);
-
-       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-       if (reply == NULL)
-               GOTO(out, rc = -EPROTO);
-
-       if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
-               LDLM_ERROR(lock,
-                          "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
-                          aa->lock_handle.cookie, reply->lock_handle.cookie,
-                          req->rq_export->exp_client_uuid.uuid,
-                          libcfs_id2str(req->rq_peer));
-               GOTO(out, rc = ELDLM_NO_LOCK_DATA);
-       }
-
-       lock_res_and_lock(lock);
-       /*
-        * A lock convert is sent for any new bits to drop; the converting
-        * flag is cleared when the ibits on the server match those on the
-        * client. Meanwhile a later convert may be replied to first and
-        * clear the converting flag, so in case of such a race just exit
-        * here.
-        */
-       if (!ldlm_is_converting(lock)) {
-               LDLM_DEBUG(lock,
-                          "convert ACK for lock without converting flag, reply ibits %#llx",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
-                  lock->l_policy_data.l_inodebits.bits) {
-               /*
-                * Compare the server-returned lock ibits with the local
-                * ones: if they match, the conversion is considered done;
-                * otherwise more converts are in flight and the converting
-                * flag is kept.
-                */
-               LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else {
-               ldlm_clear_converting(lock);
-
-               /*
-                * Concurrent BL AST may arrive and cause another convert
-                * or cancel so just do nothing here if bl_ast is set,
-                * finish with convert otherwise.
-                */
-               if (!ldlm_is_bl_ast(lock)) {
-                       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
-                       /*
-                        * Drop cancel_bits since there are no more converts
-                        * and put lock into LRU if it is still not used and
-                        * is not there yet.
-                        */
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                       if (!lock->l_readers && !lock->l_writers &&
-                           !ldlm_is_canceling(lock)) {
-                               spin_lock(&ns->ns_lock);
-                               /* there is check for list_empty() inside */
-                               ldlm_lock_remove_from_lru_nolock(lock);
-                               ldlm_lock_add_to_lru_nolock(lock);
-                               spin_unlock(&ns->ns_lock);
-                       }
-               }
-       }
-       unlock_res_and_lock(lock);
-out:
-       if (rc) {
-               int flag;
-
-               lock_res_and_lock(lock);
-               if (ldlm_is_converting(lock)) {
-                       ldlm_clear_converting(lock);
-                       ldlm_set_cbpending(lock);
-                       ldlm_set_bl_ast(lock);
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-               }
-               unlock_res_and_lock(lock);
-
-               /*
-                * fallback to normal lock cancel. If rc means there is no
-                * valid lock on server, do only local cancel
-                */
-               if (rc == ELDLM_NO_LOCK_DATA)
-                       flag = LCF_LOCAL;
-               else
-                       flag = LCF_ASYNC;
-
-               rc = ldlm_cli_cancel(&aa->lock_handle, flag);
-               if (rc < 0)
-                       LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
-                                  rc);
-       }
-       LDLM_LOCK_PUT(lock);
-       RETURN(rc);
-}
-
-/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
@@ -1237,19 +1149,15 @@ out:
  * is made asynchronous.
  *
  */
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
 {
        struct ldlm_request *body;
        struct ptlrpc_request *req;
-       struct ldlm_async_args *aa;
        struct obd_export *exp = lock->l_conn_export;
 
        ENTRY;
 
-       if (exp == NULL) {
-               LDLM_ERROR(lock, "convert must not be called on local locks.");
-               RETURN(-EINVAL);
-       }
+       LASSERT(exp != NULL);
 
        /*
         * this is better to check earlier and it is done so already,
@@ -1280,8 +1188,7 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
        body->lock_desc.l_req_mode = lock->l_req_mode;
        body->lock_desc.l_granted_mode = lock->l_granted_mode;
 
-       body->lock_desc.l_policy_data.l_inodebits.bits =
-                                       lock->l_policy_data.l_inodebits.bits;
+       body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
        body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
 
        body->lock_flags = ldlm_flags_to_wire(*flags);
@@ -1301,10 +1208,6 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
                lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);
 
-       aa = ptlrpc_req_async_args(aa, req);
-       ldlm_lock2handle(lock, &aa->lock_handle);
-       req->rq_interpret_reply = lock_convert_interpret;
-
        ptlrpcd_add_req(req);
        RETURN(0);
 }
@@ -1326,6 +1229,9 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                bool local_only;
 
                LDLM_DEBUG(lock, "client-side cancel");
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+                                cfs_fail_val);
+
                /* Set this flag to prevent others from getting new references*/
                lock_res_and_lock(lock);
                ldlm_set_cbpending(lock);
@@ -1532,7 +1438,7 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
            lustre_msg_get_limit(req->rq_repmsg) == 0) {
                DEBUG_REQ(D_HA, req,
-                         "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
+                         "Zero SLV or limit found (SLV=%llu, limit=%u)",
                          lustre_msg_get_slv(req->rq_repmsg),
                          lustre_msg_get_limit(req->rq_repmsg));
                RETURN(0);
@@ -1542,6 +1448,14 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        new_slv = lustre_msg_get_slv(req->rq_repmsg);
        obd = req->rq_import->imp_obd;
 
+       read_lock(&obd->obd_pool_lock);
+       if (obd->obd_pool_slv == new_slv &&
+           obd->obd_pool_limit == new_limit) {
+               read_unlock(&obd->obd_pool_lock);
+               RETURN(0);
+       }
+       read_unlock(&obd->obd_pool_lock);
+
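
The early return added here is a double-checked update: a cheap read lock suffices to notice that the reply carries the same SLV and limit as before (the common case on a busy import), and the write-lock update further down (not shown in this hunk) runs only when a value actually changed. The shape of the pattern, in isolation:

/* Sketch: read-lock fast path for a seldom-changing pair of fields */
read_lock(&obd->obd_pool_lock);
if (obd->obd_pool_slv == new_slv && obd->obd_pool_limit == new_limit) {
	read_unlock(&obd->obd_pool_lock);
	RETURN(0);		/* nothing new for the pool thread */
}
read_unlock(&obd->obd_pool_lock);
/* fall through to take the write lock and store the new values */
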
        /*
         * Set new SLV and limit in OBD fields to make them accessible
         * to the pool thread. We do not access obd_namespace and pool
@@ -1557,6 +1471,27 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        RETURN(0);
 }
 
+int ldlm_cli_convert(struct ldlm_lock *lock,
+                    enum ldlm_cancel_flags cancel_flags)
+{
+       int rc = -EINVAL;
+
+       LASSERT(!lock->l_readers && !lock->l_writers);
+       LDLM_DEBUG(lock, "client lock convert START");
+
+       if (lock->l_resource->lr_type == LDLM_IBITS) {
+               lock_res_and_lock(lock);
+               do {
+                       rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+               } while (rc == -EAGAIN);
+               unlock_res_and_lock(lock);
+       }
+
+       LDLM_DEBUG(lock, "client lock convert END");
+       RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
 /**
  * Client side lock cancel.
  *
@@ -1566,12 +1501,11 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags cancel_flags)
 {
        struct obd_export *exp;
-       enum ldlm_lru_flags lru_flags;
        int avail, count = 1;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
-       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
 
        ENTRY;
 
@@ -1581,44 +1515,21 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
-       /* Convert lock bits instead of cancel for IBITS locks */
-       if (cancel_flags & LCF_CONVERT) {
-               LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
-               LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
-               rc = ldlm_cli_dropbits(lock,
-                               lock->l_policy_data.l_inodebits.cancel_bits);
-               if (rc == 0) {
-                       LDLM_LOCK_RELEASE(lock);
-                       RETURN(0);
-               }
-       }
-
        lock_res_and_lock(lock);
+       LASSERT(!ldlm_is_converting(lock));
+
        /* Lock is being canceled and the caller doesn't want to wait */
        if (ldlm_is_canceling(lock)) {
                if (cancel_flags & LCF_ASYNC) {
                        unlock_res_and_lock(lock);
                } else {
-                       struct l_wait_info lwi = { 0 };
-
                        unlock_res_and_lock(lock);
-                       l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+                       wait_event_idle(lock->l_waitq, is_bl_done(lock));
                }
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
        }
 
-       /*
-        * Lock is being converted, cancel it immediately.
-        * When convert will end, it releases lock and it will be gone.
-        */
-       if (ldlm_is_converting(lock)) {
-               /* set back flags removed by convert */
-               ldlm_set_cbpending(lock);
-               ldlm_set_bl_ast(lock);
-       }
-
        ldlm_set_canceling(lock);
        unlock_res_and_lock(lock);
 
@@ -1647,10 +1558,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                LASSERT(avail > 0);
 
                ns = ldlm_lock_to_ns(lock);
-               lru_flags = ns_connect_lru_resize(ns) ?
-                       LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                              LCF_BL_AST, lru_flags);
+                                              LCF_BL_AST, 0);
        }
        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
        RETURN(0);
@@ -1664,7 +1573,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel);
 int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                               enum ldlm_cancel_flags cancel_flags)
 {
-       struct list_head head = LIST_HEAD_INIT(head);
+       LIST_HEAD(head);
        struct ldlm_lock *lock, *next;
        int left = 0, bl_ast = 0;
        __u64 rc;
@@ -1714,12 +1623,12 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
  */
 static enum ldlm_policy_res
 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                          int unused, int added, int count)
+                          int added, int min)
 {
        enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
 
        /*
-        * don't check added & count since we want to process all locks
+        * don't check @added & @min since we want to process all locks
         * from unused list.
         * It's fine to not take lock to access lock->l_resource since
         * the lock has already been granted so it won't change.
@@ -1729,6 +1638,7 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                case LDLM_IBITS:
                        if (ns->ns_cancel != NULL && ns->ns_cancel(lock) != 0)
                                break;
+                       /* fallthrough */
                default:
                        result = LDLM_POLICY_SKIP_LOCK;
                        break;
@@ -1739,8 +1649,8 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 /**
  * Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in the LRU, given \a added locks already chosen in the current
+ * scan and a preferred minimum of \a min locks to cancel.
  *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
  *
@@ -1748,34 +1658,28 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
  */
 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
        ktime_t cur = ktime_get();
        struct ldlm_pool *pl = &ns->ns_pool;
        u64 slv, lvf, lv;
        s64 la;
 
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       if (count && added >= count)
-               return LDLM_POLICY_KEEP_LOCK;
+       if (added < min)
+               return LDLM_POLICY_CANCEL_LOCK;
 
        /*
         * Regardless of the LV, it doesn't make sense to keep a lock
         * which has been unused for ns_max_age time.
         */
-       if (ktime_after(ktime_get(),
-                       ktime_add(lock->l_last_used, ns->ns_max_age)))
+       if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_CANCEL_LOCK;
 
        slv = ldlm_pool_get_slv(pl);
        lvf = ldlm_pool_get_lvf(pl);
        la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
                     NSEC_PER_SEC);
-       lv = lvf * la * unused;
+       lv = lvf * la * ns->ns_nr_unused;
 
        /* Inform pool about current CLV to see it via debugfs. */
        ldlm_pool_set_clv(pl, lv);
@@ -1793,44 +1697,21 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added,
-                               int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_lrur_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-                                                     struct ldlm_lock *lock,
-                                                     int unused, int added,
-                                                     int count)
-{
-       /*
-        * Stop LRU processing when we reach past @count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 /**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in the LRU, given \a added locks already chosen in the current
+ * scan and a preferred minimum of \a min locks to cancel.
  *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
  *
@@ -1838,10 +1719,9 @@ static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
  */
 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
                                                    struct ldlm_lock *lock,
-                                                   int unused, int added,
-                                                   int count)
+                                                   int added, int min)
 {
-       if ((added >= count) &&
+       if ((added >= min) &&
            ktime_before(ktime_get(),
                         ktime_add(lock->l_last_used, ns->ns_max_age)))
                return LDLM_POLICY_KEEP_LOCK;
@@ -1852,78 +1732,43 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
-                               int unused, int added, int count)
+                               int added, int min)
 {
        enum ldlm_policy_res result;
 
-       result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+       result = ldlm_cancel_aged_policy(ns, lock, added, min);
        if (result == LDLM_POLICY_KEEP_LOCK)
                return result;
 
-       return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
-                                               struct ldlm_lock *lock,
-                                               int unused, int added,
-                                               int count)
-{
-       /*
-        * Stop LRU processing when we reach past count or have checked all
-        * locks in LRU.
-        */
-       return (added >= count) ?
-               LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+       return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 typedef enum ldlm_policy_res
 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                           int unused, int added, int count);
+                           int added, int min);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 {
        if (ns_connect_lru_resize(ns)) {
-               if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-                       /* We kill passed number of old locks. */
-                       return ldlm_cancel_passed_policy;
-               if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_lrur_no_wait_policy;
-                       else
-                               return ldlm_cancel_lrur_policy;
-               }
-               if (lru_flags & LDLM_LRU_FLAG_PASSED)
-                       return ldlm_cancel_passed_policy;
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_lrur_no_wait_policy;
+               else
+                       return ldlm_cancel_lrur_policy;
        } else {
-               if (lru_flags & LDLM_LRU_FLAG_AGED) {
-                       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-                               return ldlm_cancel_aged_no_wait_policy;
-                       else
-                               return ldlm_cancel_aged_policy;
-               }
+               if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+                       return ldlm_cancel_aged_no_wait_policy;
+               else
+                       return ldlm_cancel_aged_policy;
        }
-       if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-               return ldlm_cancel_no_wait_policy;
-
-       return ldlm_cancel_default_policy;
 }
 
 /**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks:
  *   redundant unused locks are canceled locally;
  * - also cancel locally unused aged locks;
  * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks;
  * - GET the found locks and add them into the \a cancels list.
  *
  * A client lock can be added to the l_bl_ast list only when it is
@@ -1934,40 +1779,49 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
  * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
  * later without any special locking.
  *
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
- *                             cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- *                             at the beginning of LRU list);
- *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- *                             to memory pressre policy function;
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used.
  *
- * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
  *
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- *                             (typically before replaying locks) w/o
- *                             sending any RPCs or waiting for any
- *                             outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ *                                for any outstanding RPC to complete.
  *
  * flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- *                             other read locks covering the same pages, just
- *                             discard those pages.
+ *                              other read locks covering the same pages, just
+ *                              discard those pages.
  */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-                                struct list_head *cancels, int count, int max,
+                                struct list_head *cancels,
+                                int min, int max, int batch,
                                 enum ldlm_lru_flags lru_flags)
 {
        ldlm_cancel_lru_policy_t pf;
        int added = 0;
        int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
        ENTRY;
 
+       /*
+        * Let only one thread proceed. However, this does not apply to
+        * callers given a @max limit (ELC), as the LRU might then be left
+        * not fully cleaned up.
+        */
+       if (max == 0) {
+               if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+                       RETURN(0);
+       } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+               RETURN(0);
+
+       LASSERT(ergo(max, min <= max));
+       /* No sense to give @batch for ELC */
+       LASSERT(ergo(max, batch == 0));
+
        if (!ns_connect_lru_resize(ns))
-               count += ns->ns_nr_unused - ns->ns_max_unused;
+               min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+       /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+       if (min && min < batch)
+               min = batch;
 
        pf = ldlm_cancel_lru_policy(ns, lru_flags);
        LASSERT(pf != NULL);
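
The gate at the top of this function serializes full LRU walks: a caller with max == 0 (a full cleanup) claims the namespace-wide LDLM_LRU_CANCEL bit, while bounded ELC callers (max != 0) simply back off when a walk is already running. A sketch of the gate and its release (the clear_bit() happens at function exit, as shown in a later hunk):

/* Entry gate: one full scan at a time; ELC never blocks behind it */
if (max == 0) {
	if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
		return 0;	/* another full scan owns the LRU */
} else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags)) {
	return 0;		/* a full scan is running; don't pile on */
}

/* ...scan the LRU... then, for full scans only: */
if (max == 0)
	clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
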
@@ -1989,8 +1843,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                        /* No locks which got blocking requests. */
                        LASSERT(!ldlm_is_bl_ast(lock));
 
-                       if (!ldlm_is_canceling(lock) &&
-                           !ldlm_is_converting(lock))
+                       if (!ldlm_is_canceling(lock))
                                break;
 
                        /*
@@ -2025,7 +1878,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                 * their weight. Big extent locks will stay in
                 * the cache.
                 */
-               result = pf(ns, lock, ns->ns_nr_unused, added, count);
+               result = pf(ns, lock, added, min);
                if (result == LDLM_POLICY_KEEP_LOCK) {
                        lu_ref_del(&lock->l_reference, __func__, current);
                        LDLM_LOCK_RELEASE(lock);
@@ -2048,7 +1901,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
                lock_res_and_lock(lock);
                /* Check flags again under the lock. */
-               if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+               if (ldlm_is_canceling(lock) ||
                    ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
                        /*
                         * Another thread is removing lock from LRU, or
@@ -2102,18 +1955,25 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                unlock_res_and_lock(lock);
                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
                added++;
+               /* Once a lock has been added, cancel at least @batch locks */
+               if (min == 0)
+                       min = batch;
        }
+
+       if (max == 0)
+               clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
        RETURN(added);
 }
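
The min/batch promotion rules above are worth a worked example: \a min is raised to \a batch up front only when already non-zero, and a min of zero is promoted only after the policy cancels its first lock, so an idle LRU is left untouched. Illustrative cases:

/* Worked examples of the promotion rules (illustrative values):
 *
 *   min = 5,  batch = 64: min becomes 64 before the scan; at least
 *       64 locks are cancelled, policy permitting.
 *   min = 0,  batch = 64: the scan is opportunistic; once the policy
 *       cancels one lock, min becomes 64 and the scan continues
 *       until 64 locks have been added.
 *   min = 0,  batch = 0:  purely policy-driven; stops at the first
 *       LDLM_POLICY_KEEP_LOCK.
 */
if (min && min < batch)
	min = batch;		/* pre-scan promotion */

/* inside the scan loop, after each cancelled lock: */
if (min == 0)
	min = batch;		/* promotion after the first cancel */
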
 
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                         int count, int max,
+                         int min, int max,
                          enum ldlm_cancel_flags cancel_flags,
                          enum ldlm_lru_flags lru_flags)
 {
        int added;
 
-       added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+       added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
        if (added <= 0)
                return added;
 
@@ -2121,18 +1981,18 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 }
 
 /**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
  *
  * When called with LCF_ASYNC the blocking callback will be handled
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  Otherwise the blocking callback will be
  * performed in this function.
  */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
                    enum ldlm_cancel_flags cancel_flags,
                    enum ldlm_lru_flags lru_flags)
 {
-       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
        int count, rc;
 
        ENTRY;
@@ -2141,7 +2001,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
         * Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread.
         */
-       count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+       count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+                                     ns->ns_cancel_batch, lru_flags);
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
        if (rc == 0)
                RETURN(count);
@@ -2178,11 +2039,10 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                        continue;
 
                /*
-                * If somebody is already doing CANCEL, or blocking AST came,
-                * or lock is being converted then skip this lock.
+                * If somebody is already doing CANCEL, or blocking AST came
+                * then skip this lock.
                 */
-               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
-                   ldlm_is_converting(lock))
+               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
                        continue;
 
                if (lockmode_compat(lock->l_granted_mode, mode))
@@ -2208,7 +2068,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                /* See CBPENDING comment in ldlm_cancel_lru */
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                                 lock_flags;
-
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, cancels);
                LDLM_LOCK_GET(lock);
@@ -2295,7 +2154,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                    enum ldlm_cancel_flags flags, void *opaque)
 {
        struct ldlm_resource *res;
-       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
        int count;
        int rc;
 
@@ -2501,6 +2360,8 @@ static int replay_lock_interpret(const struct lu_env *env,
 
        ENTRY;
        atomic_dec(&req->rq_import->imp_replay_inflight);
+       wake_up(&req->rq_import->imp_replay_waitq);
+
        if (rc != ELDLM_OK)
                GOTO(out, rc);
 
@@ -2616,7 +2477,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        LDLM_DEBUG(lock, "replaying lock:");
 
-       atomic_inc(&req->rq_import->imp_replay_inflight);
+       atomic_inc(&imp->imp_replay_inflight);
        aa = ptlrpc_req_async_args(aa, req);
        aa->lock_handle = body->lock_handle[0];
        req->rq_interpret_reply = replay_lock_interpret;
@@ -2638,7 +2499,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
 {
        int canceled;
-       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
 
        CDEBUG(D_DLMTRACE,
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
@@ -2656,24 +2517,34 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
                           canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+       struct client_obd *cli = &imp->imp_obd->u.cli;
+
+       CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+              atomic_read(&imp->imp_replay_inflight) - 1,
+              cli->cl_max_rpcs_in_flight);
+
+       /* +1 for the increment taken in ldlm_replay_locks() */
+       return atomic_read(&imp->imp_replay_inflight) <
+              1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-       struct list_head list = LIST_HEAD_INIT(list);
+       LIST_HEAD(list);
        struct ldlm_lock *lock, *next;
        int rc = 0;
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
                RETURN(0);
 
-       /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
-
        if (ldlm_cancel_unused_locks_before_replay)
                ldlm_cancel_unused_locks_for_replay(ns);
 
@@ -2687,9 +2558,54 @@ int ldlm_replay_locks(struct obd_import *imp)
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
+
+               if (rate_limit)
+                       wait_event_idle_exclusive(imp->imp_replay_waitq,
+                                                 lock_can_replay(imp));
        }
 
+       RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep while waiting, so it
+ * must run in a thread separate from ptlrpcd itself.
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+       struct obd_import *imp = data;
+
+       CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+              imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+              imp->imp_connection->c_remote_uuid.uuid);
+
+       __ldlm_replay_locks(imp, true);
        atomic_dec(&imp->imp_replay_inflight);
+       ptlrpc_import_recovery_state_machine(imp);
+       class_import_put(imp);
 
-       RETURN(rc);
+       return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+       struct task_struct *task;
+       int rc = 0;
+
+       class_import_get(imp);
+       /* ensure this doesn't fall to 0 before all have been queued */
+       atomic_inc(&imp->imp_replay_inflight);
+
+       task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+               /* run lock replay without rate control */
+               rc = __ldlm_replay_locks(imp, false);
+               atomic_dec(&imp->imp_replay_inflight);
+               class_import_put(imp);
+       }
+
+       return rc;
 }
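
A closing note on the replay accounting: ldlm_replay_locks() holds one count on imp_replay_inflight for the whole replay (taken before the thread starts, dropped when it finishes), so with N replay RPCs outstanding the counter reads N + 1; that is the "+1" lock_can_replay() allows for. The handshake, compressed (the wake_up() lives in replay_lock_interpret(), as shown above):

/* Producer: each queued replay RPC bumps the counter... */
atomic_inc(&imp->imp_replay_inflight);	/* in replay_one_lock() */
ptlrpcd_add_req(req);	/* reply path runs replay_lock_interpret() */

/* ...the reply decrements it and wakes the replay thread: */
atomic_dec(&req->rq_import->imp_replay_inflight);
wake_up(&req->rq_import->imp_replay_waitq);

/* Consumer: the replay thread throttles itself between locks */
wait_event_idle_exclusive(imp->imp_replay_waitq, lock_can_replay(imp));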