CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
"lock enqueue timeout minimum");
+/* On the client side, whether cached unused locks will be canceled before replay */
+unsigned int ldlm_cancel_unused_locks_before_replay = 1;
+
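By analogy with the CFS_MODULE_PARM declaration for ldlm_enqueue_min above, the new tunable could be exposed as a module parameter the same way; this is only a sketch (the excerpt does not show whether the patch actually does this), and the description string is invented:

CFS_MODULE_PARM(ldlm_cancel_unused_locks_before_replay, "i", int, 0644,
                "whether to cancel unused locks before replay");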
static void interrupted_completion_wait(void *data)
{
}
RETURN(count);
}
+/* Cancel as many locks as possible without sending any RPCs (e.g. to write
+ * back dirty data, to close a file, ...) or waiting for any RPCs in flight
+ * (e.g. readahead requests, ...) */
+static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock *lock,
+ int unused, int added,
+ int count)
+{
+ ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
+ ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
+ check_res_locked(lock->l_resource);
+
+ /* Don't check added & count since we want to process all locks
+ * from the unused list */
+ switch (lock->l_resource->lr_type) {
+ case LDLM_EXTENT:
+ case LDLM_IBITS:
+ if (cb && cb(lock))
+ break;
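+ /* fall through: without the recovery callback's approval
+ * the lock is skipped, not canceled */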
+ default:
+ result = LDLM_POLICY_SKIP_LOCK;
+ break;
+ }
+
+ RETURN(result);
+}
+
/* Return LDLM_POLICY_KEEP_LOCK to stop LRU processing and keep the current
 * lock cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
{
+ if (flags & LDLM_CANCEL_NO_WAIT)
+ return ldlm_cancel_no_wait_policy;
+
if (ns_connect_lru_resize(ns)) {
if (flags & LDLM_CANCEL_SHRINK)
/* We kill passed number of old locks. */
 * memory pressure policy function;
*
* flags & LDLM_CANCEL_AGED - cancel locks according to "aged policy".
+ *
+ * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible
+ * (typically before replaying locks) without sending any RPCs or
+ * waiting for any outstanding RPCs to complete
*/
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
int count, int max, int cancel_flags, int flags)
{
ldlm_cancel_lru_policy_t pf;
struct ldlm_lock *lock, *next;
- int added = 0, unused;
+ int added = 0, unused, remained;
ENTRY;
spin_lock(&ns->ns_unused_lock);
unused = ns->ns_nr_unused;
+ remained = unused;
if (!ns_connect_lru_resize(ns))
count += unused - ns->ns_max_unused;
LASSERT(pf != NULL);
while (!list_empty(&ns->ns_unused_list)) {
+ ldlm_policy_res_t result;
+
+ /* Have all unused locks been examined? */
+ if (remained-- <= 0)
+ break;
+
/* For any flags, stop scanning if @max is reached. */
if (max && added >= max)
break;
if (&lock->l_lru == &ns->ns_unused_list)
break;
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ spin_unlock(&ns->ns_unused_lock);
+
+ lock_res_and_lock(lock);
/* Pass the lock through the policy filter and see if it
* should stay in lru.
*
* old locks, but additionally chose them by
* their weight. Big extent locks will stay in
* the cache. */
- if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
+ result = pf(ns, lock, unused, added, count);
+ if (result == LDLM_POLICY_KEEP_LOCK) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ spin_lock(&ns->ns_unused_lock);
break;
+ }
+ if (result == LDLM_POLICY_SKIP_LOCK) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ spin_lock(&ns->ns_unused_lock);
+ continue;
+ }
- LDLM_LOCK_GET(lock); /* dropped by bl thread */
- spin_unlock(&ns->ns_unused_lock);
-
- lock_res_and_lock(lock);
/* Check flags again under the lock. */
if ((lock->l_flags & LDLM_FL_CANCELING) ||
(ldlm_lock_remove_from_lru(lock) == 0)) {
RETURN(0);
}
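The new LDLM_POLICY_SKIP_LOCK handling above is what separates the no-wait scan from the other policies: a skipped lock stays cached while the walk continues, whereas LDLM_POLICY_KEEP_LOCK still ends the walk. Since skipped locks remain on the unused list, the remained counter introduced above is what bounds the scan. A minimal user-space sketch of this three-way dispatch, where toy_lock, no_wait_policy, and the lru array are invented stand-ins for ldlm_lock, the policy callback, and ns_unused_list (the real code rescans the LRU rather than walking an array):

#include <stdio.h>

enum policy_res { POLICY_CANCEL, POLICY_SKIP, POLICY_KEEP };

/* Invented stand-in for ldlm_lock: cancelable without RPCs only if it
 * is an extent/ibits lock and the recovery callback approves. */
struct toy_lock {
	int extent_or_ibits;
	int cb_approves;
};

static enum policy_res no_wait_policy(const struct toy_lock *lk)
{
	if (lk->extent_or_ibits && lk->cb_approves)
		return POLICY_CANCEL;
	return POLICY_SKIP;	/* never KEEP: examine the whole list */
}

int main(void)
{
	struct toy_lock lru[] = { {1, 1}, {1, 0}, {0, 0}, {1, 1} };
	int remained = sizeof(lru) / sizeof(lru[0]);
	int i = 0, added = 0;

	/* The remained counter bounds the scan, as in
	 * ldlm_cancel_lru_local, since skipped locks stay cached. */
	while (remained-- > 0) {
		switch (no_wait_policy(&lru[i++])) {
		case POLICY_CANCEL:
			added++;	/* move to the cancel list */
			break;
		case POLICY_SKIP:
			continue;	/* leave cached, keep scanning */
		case POLICY_KEEP:
			remained = 0;	/* stop the walk entirely */
			break;
		}
	}
	printf("collected %d of %d unused locks\n", added, i);
	return 0;
}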
+/* Cancel as many unused locks as possible before replay. Since we are
+ * in recovery, we cannot wait for any outstanding RPCs or send any RPCs
+ * to the server.
+ *
+ * Called only in recovery, before replaying locks. There is no need to
+ * replay locks that are unused. Since clients may hold thousands of
+ * cached unused locks, dropping them can greatly reduce the load on
+ * the servers at recovery time.
+ */
+static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
+{
+ int canceled;
+ CFS_LIST_HEAD(cancels);
+
+ CDEBUG(D_DLMTRACE, "Dropping as many as unused locks as possible before" "replay for namespace %s (%d)\n", ns->ns_name,
+ ns->ns_nr_unused);
+
+ /* We don't need to care whether or not LRU resize is enabled
+ * because the LDLM_CANCEL_NO_WAIT policy doesn't use the count
+ * parameter; LDLM_FL_LOCAL_ONLY cancels the locks locally,
+ * without notifying the server. */
+ canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
+ LDLM_FL_LOCAL_ONLY,
+ LDLM_CANCEL_NO_WAIT);
+
+ CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
+ canceled, ns->ns_name);
+}
+
int ldlm_replay_locks(struct obd_import *imp)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
/* ensure this doesn't fall to 0 before all have been queued */
atomic_inc(&imp->imp_replay_inflight);
+ if (ldlm_cancel_unused_locks_before_replay)
+ ldlm_cancel_unused_locks_for_replay(ns);
+
if (imp->imp_no_lock_replay) {
/* VBR: locks should be cancelled here */
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);