* emulation + race with upcoming bl_ast. */
#define LDLM_FL_FAIL_LOC 0x100000000ULL
+/* Used while processing the unused list to know that we have already
+ * handled this lock and decided to skip it */
+#define LDLM_FL_SKIPPED 0x200000000ULL
+
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
#define LDLM_CB_BLOCKING 1
void *req_cookie, ldlm_mode_t mode, int flags,
void *data);
+typedef int (*ldlm_cancel_for_recovery)(struct ldlm_lock *lock);
+
struct ldlm_valblock_ops {
int (*lvbo_init)(struct ldlm_resource *res);
int (*lvbo_update)(struct ldlm_resource *res,
struct obd_device *ns_obd;
struct adaptive_timeout ns_at_estimate;/* estimated lock callback time*/
+
+	/* Callback to cancel locks before replaying them during recovery. */
+ ldlm_cancel_for_recovery ns_cancel_for_recovery;
};
static inline int ns_is_client(struct ldlm_namespace *ns)
return !!(ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE);
}
+static inline void ns_register_cancel(struct ldlm_namespace *ns,
+ ldlm_cancel_for_recovery arg)
+{
+ LASSERT(ns != NULL);
+ ns->ns_cancel_for_recovery = arg;
+}
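As a usage sketch (not part of this patch), a client obd would typically register its recovery-cancel callback once its namespace has been created; the function names here are hypothetical, with the callback body sketched further below:

/* Hypothetical illustration only: register a recovery-cancel callback. */
static int example_cancel_for_recovery(struct ldlm_lock *lock);

static void example_client_setup(struct obd_device *obd)
{
	ns_register_cancel(obd->obd_namespace, example_cancel_for_recovery);
}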
+
/*
*
* Resource hash table
CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
"lock enqueue timeout minimum");
+/* On the client side, whether cached locks will be canceled before replay. */
+unsigned int ldlm_cancel_unused_locks_before_replay = 1;
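The tunable could plausibly be exposed the same way as ldlm_enqueue_min above; this hookup is an assumption for illustration, not part of this hunk:

/* Assumed hookup, mirroring the ldlm_enqueue_min parameter above. */
CFS_MODULE_PARM(ldlm_cancel_unused_locks_before_replay, "i", int, 0644,
                "If set, cancel unused locks before replay");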
+
static void interrupted_completion_wait(void *data)
{
}
}
/**
+ * Cancel as many locks as possible without sending any RPCs (e.g. to
+ * write back dirty data or to close a file) and without waiting for any
+ * RPCs in flight (e.g. readahead requests).
+ */
+static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
+ struct ldlm_lock *lock,
+ int unused, int added,
+ int count)
+{
+ ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
+ ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
+ lock_res_and_lock(lock);
+
+	/* Don't check added & count since we want to process every lock
+	 * from the unused list. */
+	switch (lock->l_resource->lr_type) {
+	case LDLM_EXTENT:
+	case LDLM_IBITS:
+		if (cb && cb(lock))
+			break;
+		/* Deliberate fall-through: with no callback registered, or
+		 * if the callback declines, the lock must be skipped. */
+	default:
+		result = LDLM_POLICY_SKIP_LOCK;
+		lock->l_flags |= LDLM_FL_SKIPPED;
+		break;
+	}
+
+ unlock_res_and_lock(lock);
+ RETURN(result);
+}
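For illustration, a ns_cancel_for_recovery callback as consumed above returns nonzero when a lock may be dropped without any RPC. A minimal sketch (hypothetical, not part of this patch) might allow canceling read-mode extent locks, which never cover dirty data:

/* Hypothetical example of a ns_cancel_for_recovery callback. */
static int example_cancel_for_recovery(struct ldlm_lock *lock)
{
	/* Read locks carry no dirty data, so canceling them needs no
	 * write-back RPC before replay. */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    (lock->l_granted_mode == LCK_PR ||
	     lock->l_granted_mode == LCK_CR))
		return 1;	/* cancel instead of replaying */

	return 0;		/* keep the lock; it will be replayed */
}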
+
+/**
* Callback function for lru-resize policy. Makes decision whether to keep
* \a lock in LRU for current \a LRU size \a unused, added in current scan
* \a added and number of locks to be preferably canceled \a count.
static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
{
+ if (flags & LDLM_CANCEL_NO_WAIT)
+ return ldlm_cancel_no_wait_policy;
+
if (ns_connect_lru_resize(ns)) {
if (flags & LDLM_CANCEL_SHRINK)
/* We kill passed number of old locks. */
 * memory pressure policy function;
*
 * flags & LDLM_CANCEL_AGED - cancel locks according to "aged policy".
+ *
+ * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible
+ *                               (typically before replaying locks) without
+ *                               sending any RPCs or waiting for any
+ *                               outstanding RPCs to complete.
*/
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
int count, int max, int flags)
{
ldlm_cancel_lru_policy_t pf;
struct ldlm_lock *lock, *next;
- int added = 0, unused;
+ int added = 0, unused, remained;
ENTRY;
cfs_spin_lock(&ns->ns_unused_lock);
unused = ns->ns_nr_unused;
+ remained = unused;
if (!ns_connect_lru_resize(ns))
count += unused - ns->ns_max_unused;
LASSERT(pf != NULL);
while (!cfs_list_empty(&ns->ns_unused_list)) {
+ ldlm_policy_res_t result;
+
+		/* Scan at most the number of locks that were on the unused
+		 * list when we started; skipped locks remain on the list,
+		 * so without this cap the scan might never terminate. */
+		if (remained-- <= 0)
+			break;
+
/* For any flags, stop scanning if @max is reached. */
if (max && added >= max)
break;
/* No locks which got blocking requests. */
LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
+		if ((flags & LDLM_CANCEL_NO_WAIT) &&
+		    (lock->l_flags & LDLM_FL_SKIPPED))
+			/* already processed */
+			continue;
+
/* Somebody is already doing CANCEL. No need in this
* lock in lru, do not traverse it again. */
if (!(lock->l_flags & LDLM_FL_CANCELING))
* old locks, but additionally chose them by
* their weight. Big extent locks will stay in
* the cache. */
- if (pf(ns, lock, unused, added, count) ==
- LDLM_POLICY_KEEP_LOCK) {
+ result = pf(ns, lock, unused, added, count);
+ if (result == LDLM_POLICY_KEEP_LOCK) {
lu_ref_del(&lock->l_reference,
__FUNCTION__, cfs_current());
LDLM_LOCK_RELEASE(lock);
cfs_spin_lock(&ns->ns_unused_lock);
break;
}
+ if (result == LDLM_POLICY_SKIP_LOCK) {
+ lu_ref_del(&lock->l_reference,
+ __FUNCTION__, cfs_current());
+ LDLM_LOCK_RELEASE(lock);
+ cfs_spin_lock(&ns->ns_unused_lock);
+ continue;
+ }
lock_res_and_lock(lock);
/* Check flags again under the lock. */
RETURN(0);
}
+/**
+ * Cancel as many unused locks as possible before replay. Since we are
+ * in recovery, we cannot wait for any outstanding RPCs to complete, nor
+ * send any new RPCs to the server.
+ *
+ * Called only in recovery, before replaying locks. There is no need to
+ * replay locks that are unused, and since clients may hold thousands of
+ * cached unused locks, dropping them can greatly reduce the load on the
+ * servers at recovery time.
+ */
+static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
+{
+ int canceled;
+ CFS_LIST_HEAD(cancels);
+
+	CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before"
+	       " replay for namespace %s (%d)\n", ns->ns_name,
+	       ns->ns_nr_unused);
+
+ /* We don't need to care whether or not LRU resize is enabled
+ * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
+ * count parameter */
+ canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
+ LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
+
+ CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
+ canceled, ns->ns_name);
+}
+
int ldlm_replay_locks(struct obd_import *imp)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
/* ensure this doesn't fall to 0 before all have been queued */
cfs_atomic_inc(&imp->imp_replay_inflight);
+ if (ldlm_cancel_unused_locks_before_replay)
+ ldlm_cancel_unused_locks_for_replay(ns);
+
(void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
cfs_list_for_each_entry_safe(lock, next, &list, l_pending_chain) {