Whamcloud - gitweb
b=16774
authorzhanghc <zhanghc>
Thu, 22 Oct 2009 14:44:42 +0000 (14:44 +0000)
committerzhanghc <zhanghc>
Thu, 22 Oct 2009 14:44:42 +0000 (14:44 +0000)
at client side, cancel unused locks before replay locks during recovery

i=johann@sun.com
i=oleg.drokin@sun.com

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/mdc/mdc_request.c
lustre/osc/osc_request.c

index 1696c7b..11d2605 100644 (file)
@@ -358,6 +358,8 @@ typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
                                void *req_cookie, ldlm_mode_t mode, int flags,
                                void *data);
 
+typedef int (*ldlm_cancel_for_recovery)(struct ldlm_lock *lock);
+
 struct ldlm_valblock_ops {
         int (*lvbo_init)(struct ldlm_resource *res);
         int (*lvbo_update)(struct ldlm_resource *res, struct ptlrpc_request *r,
@@ -420,6 +422,9 @@ struct ldlm_namespace {
         struct adaptive_timeout ns_at_estimate;/* estimated lock callback time*/
         /* backward link to obd, required for ldlm pool to store new SLV. */
         struct obd_device     *ns_obd;
+
+        /* callback to cancel locks before replaying it during recovery */
+        ldlm_cancel_for_recovery ns_cancel_for_recovery;
 };
 
 static inline int ns_is_client(struct ldlm_namespace *ns)
@@ -448,6 +453,12 @@ static inline int ns_connect_lru_resize(struct ldlm_namespace *ns)
         return !!(ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE);
 }
 
+static inline void ns_register_cancel(struct ldlm_namespace *ns,
+                                      ldlm_cancel_for_recovery arg)
+{
+        LASSERT(ns != NULL);
+        ns->ns_cancel_for_recovery = arg;
+}
 /*
  *
  * Resource hash table
index 2d898d6..d62520a 100644 (file)
@@ -72,7 +72,8 @@ enum {
         LDLM_CANCEL_AGED   = 1 << 0, /* Cancel aged locks (non lru resize). */
         LDLM_CANCEL_PASSED = 1 << 1, /* Cancel passed number of locks. */
         LDLM_CANCEL_SHRINK = 1 << 2, /* Cancel locks from shrinker. */
-        LDLM_CANCEL_LRUR   = 1 << 3  /* Cancel locks from lru resize. */
+        LDLM_CANCEL_LRUR   = 1 << 3, /* Cancel locks from lru resize. */
+        LDLM_CANCEL_NO_WAIT = 1 << 4 /* Cancel locks w/o blocking (neither                                            * sending nor waiting fro any rpcs) */
 };
 
 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
@@ -194,7 +195,8 @@ void ldlm_exit(void);
 
 enum ldlm_policy_res {
         LDLM_POLICY_CANCEL_LOCK,
-        LDLM_POLICY_KEEP_LOCK
+        LDLM_POLICY_KEEP_LOCK,
+        LDLM_POLICY_SKIP_LOCK
 };
 
 typedef enum ldlm_policy_res ldlm_policy_res_t;
index b6e402b..bbd6ea9 100644 (file)
@@ -50,6 +50,9 @@ int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
                 "lock enqueue timeout minimum");
 
+/* in client side, whether the cached locks will be canceled before replay */
+unsigned int ldlm_cancel_unused_locks_before_replay = 1;
+
 static void interrupted_completion_wait(void *data)
 {
 }
@@ -1178,6 +1181,33 @@ static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
         RETURN(count);
 }
 
+/* cancel as many locks as possible w/o sending any rpcs (e.g. to write back
+ * dirty data, to close a file, ...) or waiting for any rpcs in-flight (e.g.
+ * readahead requests, ...) */
+static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
+                                                    struct ldlm_lock *lock,
+                                                    int unused, int added,
+                                                    int count)
+{
+        ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
+        ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
+        check_res_locked(lock->l_resource);
+
+        /* don't check added & count since we want to process all locks
+         * from unused list */
+        switch (lock->l_resource->lr_type) {
+                case LDLM_EXTENT:
+                case LDLM_IBITS:
+                        if (cb && cb(lock))
+                                break;
+                default:
+                        result = LDLM_POLICY_SKIP_LOCK;
+                        break;
+        }
+
+        RETURN(result);
+}
+
 /* Return 1 to stop lru processing and keep current lock cached. Return zero
  * otherwise. */
 static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
@@ -1260,6 +1290,9 @@ typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
 {
+        if (flags & LDLM_CANCEL_NO_WAIT)
+                return ldlm_cancel_no_wait_policy;
+
         if (ns_connect_lru_resize(ns)) {
                 if (flags & LDLM_CANCEL_SHRINK)
                         /* We kill passed number of old locks. */
@@ -1301,17 +1334,23 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
  *                              memory pressre policy function;
  *
  * flags & LDLM_CANCEL_AGED -   cancel locks according to "aged policy".
+ *
+ * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible
+ *                               (typically before replaying locks) w/o
+ *                               sending any rpcs or waiting for any
+ *                               outstanding rpc to complete
  */
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int cancel_flags, int flags)
 {
         ldlm_cancel_lru_policy_t pf;
         struct ldlm_lock *lock, *next;
-        int added = 0, unused;
+        int added = 0, unused, remained;
         ENTRY;
 
         spin_lock(&ns->ns_unused_lock);
         unused = ns->ns_nr_unused;
+        remained = unused;
 
         if (!ns_connect_lru_resize(ns))
                 count += unused - ns->ns_max_unused;
@@ -1320,6 +1359,12 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
         LASSERT(pf != NULL);
 
         while (!list_empty(&ns->ns_unused_list)) {
+                ldlm_policy_res_t result;
+
+                /* Are all unused locks checked? */
+                if (remained-- <= 0)
+                        break;
+
                 /* For any flags, stop scanning if @max is reached. */
                 if (max && added >= max)
                         break;
@@ -1338,6 +1383,10 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 if (&lock->l_lru == &ns->ns_unused_list)
                         break;
 
+                LDLM_LOCK_GET(lock); /* dropped by bl thread */
+                spin_unlock(&ns->ns_unused_lock);
+
+                lock_res_and_lock(lock);
                 /* Pass the lock through the policy filter and see if it
                  * should stay in lru.
                  *
@@ -1351,13 +1400,20 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                  * old locks, but additionally chose them by
                  * their weight. Big extent locks will stay in
                  * the cache. */
-                if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
+                result = pf(ns, lock, unused, added, count);
+                if (result == LDLM_POLICY_KEEP_LOCK) {
+                        unlock_res_and_lock(lock);
+                        LDLM_LOCK_PUT(lock);
+                        spin_lock(&ns->ns_unused_lock);
                         break;
+                }
+                if (result == LDLM_POLICY_SKIP_LOCK) {
+                        unlock_res_and_lock(lock);
+                        LDLM_LOCK_PUT(lock);
+                        spin_lock(&ns->ns_unused_lock);
+                        continue;
+                }
 
-                LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                spin_unlock(&ns->ns_unused_lock);
-
-                lock_res_and_lock(lock);
                 /* Check flags again under the lock. */
                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
                     (ldlm_lock_remove_from_lru(lock) == 0)) {
@@ -1946,6 +2002,34 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         RETURN(0);
 }
 
+/* cancel as many unused locks as possible before replay. since we are
+ * in recovery, we can't wait for any outstanding RPCs ro send any RPC
+ * to the server.
+ *
+ * callced only in recovery before replaying locks. there is no need to
+ * replay locks that are unused. since the clients may hold thousands of
+ * cached unused locks, dropping the unused locks can greatly reduce the
+ * load on the servers at recovery time.
+ */
+static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
+{
+        int canceled;
+        CFS_LIST_HEAD(cancels);
+
+        CDEBUG(D_DLMTRACE, "Dropping as many as unused locks as possible before"                           "replay for namespace %s (%d)\n", ns->ns_name,
+                           ns->ns_nr_unused);
+
+        /* we don't need to care whether or not LRU resize is enabled 
+         * because the LDLM_CANCLE_NO_WAIT policy doesn't use the
+         * count parameter */
+        canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
+                                         LDLM_FL_LOCAL_ONLY,
+                                         LDLM_CANCEL_NO_WAIT);
+
+        CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
+                           canceled, ns->ns_name);
+}
+
 int ldlm_replay_locks(struct obd_import *imp)
 {
         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
@@ -1961,6 +2045,9 @@ int ldlm_replay_locks(struct obd_import *imp)
         /* ensure this doesn't fall to 0 before all have been queued */
         atomic_inc(&imp->imp_replay_inflight);
 
+        if (ldlm_cancel_unused_locks_before_replay)
+                ldlm_cancel_unused_locks_for_replay(ns);
+
         if (imp->imp_no_lock_replay) {
                 /* VBR: locks should be cancelled here */
                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
index 99e85af..b767812 100644 (file)
@@ -66,6 +66,8 @@ cfs_proc_dir_entry_t *ldlm_type_proc_dir = NULL;
 cfs_proc_dir_entry_t *ldlm_ns_proc_dir = NULL;
 cfs_proc_dir_entry_t *ldlm_svc_proc_dir = NULL;
 
+extern unsigned int ldlm_cancel_unused_locks_before_replay;
+
 #ifdef LPROCFS
 static int ldlm_proc_dump_ns(struct file *file, const char *buffer,
                              unsigned long count, void *data)
@@ -80,6 +82,9 @@ int ldlm_proc_setup(void)
         int rc;
         struct lprocfs_vars list[] = {
                 { "dump_namespaces", NULL, ldlm_proc_dump_ns, NULL },
+                { "cancel_unused_locks_before_replay", 
+                  lprocfs_rd_uint, lprocfs_wr_uint, 
+                  &ldlm_cancel_unused_locks_before_replay, NULL },
                 { NULL }};
         ENTRY;
         LASSERT(ldlm_ns_proc_dir == NULL);
index 2fb34e2..bae823f 100644 (file)
@@ -1356,6 +1356,23 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
         RETURN(rc);
 }
 
+/* determine whether the lock can be canceled before replaying it during
+ * recovery, non zero value will be return if the lock can be canceled, 
+ * or zero returned for not */
+static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+        if (lock->l_resource->lr_type != LDLM_IBITS)
+                RETURN(0);
+
+       /* FIXME: if we ever get into a situation where there are too many
+        * opened files with open locks on a single node, then we really
+        * should replay these open locks to reget it */
+        if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
+                RETURN(0);
+
+        RETURN(1);
+}
+
 static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -1387,6 +1404,8 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0)
                 ptlrpc_lprocfs_register_obd(obd);
 
+        ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
+
         rc = obd_llog_init(obd, obd, NULL);
         if (rc) {
                 mdc_cleanup(obd);
index c890c71..2ccb080 100644 (file)
@@ -4338,6 +4338,28 @@ static int osc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
+/* determine whether the lock can be canceled before replaying the lock
+ * during recovery, see bug16774 for detailed information 
+ *
+ * return values:
+ *  zero  - the lock can't be canceled
+ *  other - ok to cancel
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+        check_res_locked(lock->l_resource);
+        if (lock->l_granted_mode == LCK_GROUP || 
+            lock->l_resource->lr_type != LDLM_EXTENT)
+                RETURN(0);
+
+        /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
+        if (lock->l_granted_mode == LCK_PR ||
+            lock->l_granted_mode == LCK_CR)
+                RETURN(1);
+
+        RETURN(0);       
+}
+
 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         int rc;
@@ -4379,6 +4401,8 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                 }
                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                 sema_init(&cli->cl_grant_sem, 1);
+
+                ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }
 
         RETURN(rc);