unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
module_param(ldlm_enqueue_min, uint, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
/* in client side, whether the cached locks will be canceled before replay */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;
struct req_capsule *pill = &req->rq_pill;
struct ldlm_request *dlm = NULL;
LIST_HEAD(head);
- enum ldlm_lru_flags lru_flags;
- int avail, to_free, pack = 0;
+ int avail, to_free = 0, pack = 0;
int rc;
ENTRY;
req_capsule_filled_sizes(pill, RCL_CLIENT);
avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
- lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
- LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
- to_free = !ns_connect_lru_resize(ns) &&
- opc == LDLM_ENQUEUE ? 1 : 0;
+ /* If we have reached the limit, free +1 slot for the new one */
+ if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+ ns->ns_nr_unused >= ns->ns_max_unused)
+ to_free = 1;
/*
* Cancel LRU locks here _only_ if the server supports
if (avail > count)
count += ldlm_cancel_lru_local(ns, cancels, to_free,
avail - count, 0,
- lru_flags);
+ LDLM_LRU_FLAG_NO_WAIT);
if (avail > count)
pack = count;
else
new_slv = lustre_msg_get_slv(req->rq_repmsg);
obd = req->rq_import->imp_obd;
+ read_lock(&obd->obd_pool_lock);
+ if (obd->obd_pool_slv == new_slv &&
+ obd->obd_pool_limit == new_limit) {
+ read_unlock(&obd->obd_pool_lock);
+ RETURN(0);
+ }
+ read_unlock(&obd->obd_pool_lock);
+
/*
* Set new SLV and limit in OBD fields to make them accessible
* to the pool thread. We do not access obd_namespace and pool
enum ldlm_cancel_flags cancel_flags)
{
struct obd_export *exp;
- enum ldlm_lru_flags lru_flags;
int avail, count = 1;
__u64 rc = 0;
struct ldlm_namespace *ns;
LASSERT(avail > 0);
ns = ldlm_lock_to_ns(lock);
- lru_flags = ns_connect_lru_resize(ns) ?
- LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
- LCF_BL_AST, lru_flags);
+ LCF_BL_AST, 0);
}
ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
RETURN(0);
*/
static enum ldlm_policy_res
ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
- int unused, int added, int count)
+ int added, int min)
{
enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
/*
- * don't check added & count since we want to process all locks
+ * don't check @added & @min since we want to process all locks
* from unused list.
* It's fine to not take lock to access lock->l_resource since
* the lock has already been granted so it won't change.
/**
* Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in LRU given the number of locks \a added so far in the current
+ * scan and the \a min number of locks to be preferably canceled.
*
* \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
*
*/
static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
- int count)
+ int added, int min)
{
ktime_t cur = ktime_get();
struct ldlm_pool *pl = &ns->ns_pool;
u64 slv, lvf, lv;
s64 la;
- /*
- * Stop LRU processing when we reach past @count or have checked all
- * locks in LRU.
- */
- if (count && added >= count)
- return LDLM_POLICY_KEEP_LOCK;
+ if (added < min)
+ return LDLM_POLICY_CANCEL_LOCK;
/*
* Despite of the LV, It doesn't make sense to keep the lock which
* is unused for ns_max_age time.
*/
- if (ktime_after(ktime_get(),
- ktime_add(lock->l_last_used, ns->ns_max_age)))
+ if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
return LDLM_POLICY_CANCEL_LOCK;
slv = ldlm_pool_get_slv(pl);
lvf = ldlm_pool_get_lvf(pl);
la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
NSEC_PER_SEC);
- lv = lvf * la * unused;
+ lv = lvf * la * ns->ns_nr_unused;
/* Inform pool about current CLV to see it via debugfs. */
ldlm_pool_set_clv(pl, lv);
+/*
+ * LRU-resize policy combined with the no-wait policy: a lock is only
+ * cancelled when ldlm_cancel_lrur_policy() allows it AND
+ * ldlm_cancel_no_wait_policy() does not veto it.
+ */
static enum ldlm_policy_res
ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
-				int unused, int added,
-				int count)
+				int added, int min)
{
	enum ldlm_policy_res result;
-	result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+	result = ldlm_cancel_lrur_policy(ns, lock, added, min);
	if (result == LDLM_POLICY_KEEP_LOCK)
		return result;
-	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
+	return ldlm_cancel_no_wait_policy(ns, lock, added, min);
}
/**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
- int unused, int added,
- int count)
-{
- /*
- * Stop LRU processing when we reach past @count or have checked all
- * locks in LRU.
- */
- return (added >= count) ?
- LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
-}
-
-/**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in LRU given the number of locks \a added so far in the current
+ * scan and the \a min number of locks to be preferably canceled.
*
* \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
*
*/
static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
- int count)
+ int added, int min)
{
- if ((added >= count) &&
+ if ((added >= min) &&
ktime_before(ktime_get(),
ktime_add(lock->l_last_used, ns->ns_max_age)))
return LDLM_POLICY_KEEP_LOCK;
static enum ldlm_policy_res
ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added, int count)
+ int added, int min)
{
enum ldlm_policy_res result;
- result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+ result = ldlm_cancel_aged_policy(ns, lock, added, min);
if (result == LDLM_POLICY_KEEP_LOCK)
return result;
- return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
- int unused, int added,
- int count)
-{
- /*
- * Stop LRU processing when we reach past count or have checked all
- * locks in LRU.
- */
- return (added >= count) ?
- LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+ return ldlm_cancel_no_wait_policy(ns, lock, added, min);
}
typedef enum ldlm_policy_res
(*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
- int unused, int added, int count);
+ int added, int min);
+/*
+ * Select the LRU cancel policy callback: the LRU-resize (server SLV)
+ * policy when the server supports lru_resize, the aged policy otherwise;
+ * either is replaced by its no-wait variant when LDLM_LRU_FLAG_NO_WAIT
+ * is set.
+ */
static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
{
	if (ns_connect_lru_resize(ns)) {
-		if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-			/* We kill passed number of old locks. */
-			return ldlm_cancel_passed_policy;
-		if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_lrur_no_wait_policy;
-			else
-				return ldlm_cancel_lrur_policy;
-		}
-		if (lru_flags & LDLM_LRU_FLAG_PASSED)
-			return ldlm_cancel_passed_policy;
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_lrur_no_wait_policy;
+		else
+			return ldlm_cancel_lrur_policy;
	} else {
-		if (lru_flags & LDLM_LRU_FLAG_AGED) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_aged_no_wait_policy;
-			else
-				return ldlm_cancel_aged_policy;
-		}
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_aged_no_wait_policy;
+		else
+			return ldlm_cancel_aged_policy;
	}
-	if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-		return ldlm_cancel_no_wait_policy;
-
-	return ldlm_cancel_default_policy;
}
/**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
* - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks
* - GET the found locks and add them into the \a cancels list.
*
* A client lock can be added to the l_bl_ast list only when it is
* attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
* later without any special locking.
*
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
- * cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- * at the beginning of LRU list);
- *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- * to memory pressre policy function;
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used;
*
- * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
*
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- * (typically before replaying locks) w/o
- * sending any RPCs or waiting for any
- * outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ * for any outstanding RPC to complete.
*
* flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- * other read locks covering the same pages, just
- * discard those pages.
+ * other read locks covering the same pages, just
+ * discard those pages.
*/
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
- struct list_head *cancels, int count, int max,
+ struct list_head *cancels,
+ int min, int max, int batch,
enum ldlm_lru_flags lru_flags)
{
ldlm_cancel_lru_policy_t pf;
int added = 0;
int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
ENTRY;
+ /*
+	 * Let only one thread proceed at a time.  Do not grant ownership to
+	 * callers with a @max limit given (ELC), as their limited scan could
+	 * leave the LRU not cleaned up in full.
+ */
+ if (max == 0) {
+ if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+ RETURN(0);
+ } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+ RETURN(0);
+
+ LASSERT(ergo(max, min <= max));
+ /* No sense to give @batch for ELC */
+ LASSERT(ergo(max, batch == 0));
+
if (!ns_connect_lru_resize(ns))
- count += ns->ns_nr_unused - ns->ns_max_unused;
+ min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+ /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+ if (min && min < batch)
+ min = batch;
pf = ldlm_cancel_lru_policy(ns, lru_flags);
LASSERT(pf != NULL);
* their weight. Big extent locks will stay in
* the cache.
*/
- result = pf(ns, lock, ns->ns_nr_unused, added, count);
+ result = pf(ns, lock, added, min);
if (result == LDLM_POLICY_KEEP_LOCK) {
lu_ref_del(&lock->l_reference, __func__, current);
LDLM_LOCK_RELEASE(lock);
unlock_res_and_lock(lock);
lu_ref_del(&lock->l_reference, __FUNCTION__, current);
added++;
+		/* Once a first lock has been added, cancel at least @batch locks */
+ if (min == 0)
+ min = batch;
}
+
+ if (max == 0)
+ clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
RETURN(added);
}
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
- int count, int max,
+ int min, int max,
enum ldlm_cancel_flags cancel_flags,
enum ldlm_lru_flags lru_flags)
{
int added;
- added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+ added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
if (added <= 0)
return added;
}
/**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
*
* When called with LCF_ASYNC the blocking callback will be handled
* in a thread and this function will return after the thread has been
asked to call the callback. When called without LCF_ASYNC the blocking
* callback will be performed in this function.
*/
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
enum ldlm_cancel_flags cancel_flags,
enum ldlm_lru_flags lru_flags)
{
* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread.
*/
- count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+ count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+ ns->ns_cancel_batch, lru_flags);
rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
if (rc == 0)
RETURN(count);
ENTRY;
atomic_dec(&req->rq_import->imp_replay_inflight);
+ wake_up(&req->rq_import->imp_replay_waitq);
+
if (rc != ELDLM_OK)
GOTO(out, rc);
LDLM_DEBUG(lock, "replaying lock:");
- atomic_inc(&req->rq_import->imp_replay_inflight);
+ atomic_inc(&imp->imp_replay_inflight);
aa = ptlrpc_req_async_args(aa, req);
aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = replay_lock_interpret;
canceled, ldlm_ns_name(ns));
}
-int ldlm_replay_locks(struct obd_import *imp)
+/*
+ * Rate-limit check for lock replay: returns true when fewer than
+ * min(cl_max_rpcs_in_flight, 8) replay RPCs are currently in flight,
+ * so another replay may be sent.
+ */
+static int lock_can_replay(struct obd_import *imp)
+{
+	struct client_obd *cli = &imp->imp_obd->u.cli;
+
+	CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+	       atomic_read(&imp->imp_replay_inflight) - 1,
+	       cli->cl_max_rpcs_in_flight);
+
+	/* +1 due to ldlm_lock_replay() increment */
+	return atomic_read(&imp->imp_replay_inflight) <
+	       1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
LIST_HEAD(list);
ENTRY;
- LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+ LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
/* don't replay locks if import failed recovery */
if (imp->imp_vbr_failed)
RETURN(0);
- /* ensure this doesn't fall to 0 before all have been queued */
- atomic_inc(&imp->imp_replay_inflight);
-
if (ldlm_cancel_unused_locks_before_replay)
ldlm_cancel_unused_locks_for_replay(ns);
}
rc = replay_one_lock(imp, lock);
LDLM_LOCK_RELEASE(lock);
+
+ if (rate_limit)
+ wait_event_idle_exclusive(imp->imp_replay_waitq,
+ lock_can_replay(imp));
}
+ RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+	struct obd_import *imp = data;
+
+	CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+	       imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+	       imp->imp_connection->c_remote_uuid.uuid);
+
+	__ldlm_replay_locks(imp, true);
+	/* balance the increment taken in ldlm_replay_locks() */
	atomic_dec(&imp->imp_replay_inflight);
+	ptlrpc_import_recovery_state_machine(imp);
+	/* drop the reference taken in ldlm_replay_locks() */
+	class_import_put(imp);
-	RETURN(rc);
+	return 0;
+}
+
+/*
+ * Replay granted locks in a dedicated kernel thread so that the
+ * rate-limited waiting inside __ldlm_replay_locks() does not block the
+ * caller.  If the thread cannot be started, fall back to replaying
+ * inline without rate control.
+ */
+int ldlm_replay_locks(struct obd_import *imp)
+{
+	struct task_struct *task;
+	int rc = 0;
+
+	class_import_get(imp);
+	/* ensure this doesn't fall to 0 before all have been queued */
+	atomic_inc(&imp->imp_replay_inflight);
+
+	task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+	if (IS_ERR(task)) {
+		rc = PTR_ERR(task);
+		CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+		/* run lock replay without rate control */
+		rc = __ldlm_replay_locks(imp, false);
+		/* inline fallback: drop the count and reference here */
+		atomic_dec(&imp->imp_replay_inflight);
+		class_import_put(imp);
+	}
+
+	return rc;
}