Whamcloud - gitweb
LU-11518 ldlm: control lru_size for extent lock 62/39562/5
authorJinshan Xiong <jinshan.xiong@intel.com>
Fri, 31 Jul 2020 18:22:40 +0000 (21:22 +0300)
committerOleg Drokin <green@whamcloud.com>
Sat, 19 Sep 2020 14:13:04 +0000 (14:13 +0000)
We register ELC for extent locks to be canceled at enqueue time,
but it has no positive effect on locks that have dirty pages
under them. To keep the semantics of lru_size, the client should
check how many unused locks are cached after adding a lock into
the lru list. If the count has already exceeded the hard limit
(ns_max_unused), the client will initiate an async lock
cancellation process in batch mode (ns->ns_cancel_batch).

To do it, re-use the new batching LRU cancel functionality.

Wherever unlimited LRU cancel is called (not ELC), try to cancel in
batched mode.

A new sysfs attribute named *lru_cancel_batch* is introduced
into the ldlm namespace to control the batch count.

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Signed-off-by: Shuichi Ihara <sihara@ddn.com>
Signed-off-by: Gu Zheng <gzheng@ddn.com>
Signed-off-by: Vitaly Fertman <c17818@cray.com>
Change-Id: Ib18b829372da8599ba872b5ac5ab7421661f942d
Reviewed-on: https://es-gerrit.dev.cray.com/157068
Reviewed-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Alexey Lyashkov <c17817@cray.com>
Tested-by: Alexander Lezhoev <c17454@cray.com>
Reviewed-on: https://review.whamcloud.com/39562
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/tests/sanity.sh
lustre/tests/test-framework.sh

index 2c1e19c..f1071bd 100644 (file)
@@ -68,6 +68,7 @@ extern struct kset *ldlm_svc_kset;
  * client shows interest in that lock, e.g. glimpse is occured. */
 #define LDLM_DIRTY_AGE_LIMIT (10)
 #define LDLM_DEFAULT_PARALLEL_AST_LIMIT 1024
+#define LDLM_DEFAULT_LRU_SHRINK_BATCH (16)
 
 /**
  * LDLM non-error return states
@@ -443,6 +444,12 @@ struct ldlm_namespace {
         */
        unsigned int            ns_max_unused;
 
+       /**
+        * Cancel batch, if unused lock count exceed lru_size
+        * Only be used if LRUR disable.
+        */
+       unsigned int            ns_cancel_batch;
+
        /** Maximum allowed age (last used time) for locks in the LRU.  Set in
         * seconds from userspace, but stored in ns to avoid repeat conversions.
         */
index 6614656..44b944c 100644 (file)
@@ -171,7 +171,6 @@ int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use);
                ldlm_lock_remove_from_lru_check(lock, ktime_set(0, 0))
 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
-void ldlm_lock_add_to_lru(struct ldlm_lock *lock);
 void ldlm_lock_touch_in_lru(struct ldlm_lock *lock);
 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
 
index f1482c0..ded6c7c 100644 (file)
@@ -844,14 +844,15 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
  */
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 {
-        struct ldlm_namespace *ns;
-        ENTRY;
+       struct ldlm_namespace *ns;
 
-        lock_res_and_lock(lock);
+       ENTRY;
+
+       lock_res_and_lock(lock);
 
-        ns = ldlm_lock_to_ns(lock);
+       ns = ldlm_lock_to_ns(lock);
 
-        ldlm_lock_decref_internal_nolock(lock, mode);
+       ldlm_lock_decref_internal_nolock(lock, mode);
 
        if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
            !lock->l_readers && !lock->l_writers) {
@@ -868,52 +869,49 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
        }
 
        if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+               unsigned int mask = D_DLMTRACE;
+
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
-                        CERROR("FL_CBPENDING set on non-local lock--just a "
-                               "warning\n");
-
-                LDLM_DEBUG(lock, "final decref done on cbpending lock");
+                       mask |= D_WARNING;
+               LDLM_DEBUG_LIMIT(mask, lock,
+                                "final decref done on %sCBPENDING lock",
+                                mask & D_WARNING ? "non-local " : "");
 
-                LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                ldlm_lock_remove_from_lru(lock);
-                unlock_res_and_lock(lock);
+               LDLM_LOCK_GET(lock); /* dropped by bl thread */
+               ldlm_lock_remove_from_lru(lock);
+               unlock_res_and_lock(lock);
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+                       OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
                if (ldlm_is_atomic_cb(lock) ||
                     ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
-                        ldlm_handle_bl_callback(ns, NULL, lock);
+                       ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns_is_client(ns) &&
-                   !lock->l_readers && !lock->l_writers &&
+                  !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock) &&
                   !ldlm_is_converting(lock)) {
 
-                LDLM_DEBUG(lock, "add lock into lru list");
-
-                /* If this is a client-side namespace and this was the last
-                 * reference, put it on the LRU. */
-                ldlm_lock_add_to_lru(lock);
-                unlock_res_and_lock(lock);
+               /* If this is a client-side namespace and this was the last
+                * reference, put it on the LRU.
+                */
+               ldlm_lock_add_to_lru(lock);
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "add lock into lru list");
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
-
-                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
-                 * are not supported by the server, otherwise, it is done on
-                 * enqueue. */
-                if (!exp_connect_cancelset(lock->l_conn_export) &&
-                    !ns_connect_lru_resize(ns))
-                       ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
-        } else {
-                LDLM_DEBUG(lock, "do not add lock into lru list");
-                unlock_res_and_lock(lock);
-        }
+                       OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
-        EXIT;
+               ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
+       } else {
+               LDLM_DEBUG(lock, "do not add lock into lru list");
+               unlock_res_and_lock(lock);
+       }
+
+       EXIT;
 }
 
 /**
index 74a1e73..ff5ae53 100644 (file)
@@ -2001,7 +2001,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
         * Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread.
         */
-       count = ldlm_prepare_lru_list(ns, &cancels, min, 0, 0, lru_flags);
+       count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+                                     ns->ns_cancel_batch, lru_flags);
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
        if (rc == 0)
                RETURN(count);
index e439ffb..e262e7f 100644 (file)
@@ -372,6 +372,32 @@ static ssize_t lru_size_store(struct kobject *kobj, struct attribute *attr,
 }
 LUSTRE_RW_ATTR(lru_size);
 
+static ssize_t lru_cancel_batch_show(struct kobject *kobj,
+                                struct attribute *attr, char *buf)
+{
+       struct ldlm_namespace *ns = container_of(kobj, struct ldlm_namespace,
+                                                ns_kobj);
+
+       return snprintf(buf, sizeof(buf) - 1, "%u\n", ns->ns_cancel_batch);
+}
+
+static ssize_t lru_cancel_batch_store(struct kobject *kobj,
+                                 struct attribute *attr,
+                                 const char *buffer, size_t count)
+{
+       struct ldlm_namespace *ns = container_of(kobj, struct ldlm_namespace,
+                                                ns_kobj);
+       unsigned long tmp;
+
+       if (kstrtoul(buffer, 10, &tmp))
+               return -EINVAL;
+
+       ns->ns_cancel_batch = (unsigned int)tmp;
+
+       return count;
+}
+LUSTRE_RW_ATTR(lru_cancel_batch);
+
 static ssize_t lru_max_age_show(struct kobject *kobj, struct attribute *attr,
                                char *buf)
 {
@@ -624,6 +650,7 @@ static struct attribute *ldlm_ns_attrs[] = {
        &lustre_attr_lock_count.attr,
        &lustre_attr_lock_unused_count.attr,
        &lustre_attr_lru_size.attr,
+       &lustre_attr_lru_cancel_batch.attr,
        &lustre_attr_lru_max_age.attr,
        &lustre_attr_early_lock_cancel.attr,
        &lustre_attr_dirty_age_limit.attr,
@@ -904,6 +931,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
        ns->ns_max_parallel_ast   = LDLM_DEFAULT_PARALLEL_AST_LIMIT;
        ns->ns_nr_unused          = 0;
        ns->ns_max_unused         = LDLM_DEFAULT_LRU_SIZE;
+       ns->ns_cancel_batch       = LDLM_DEFAULT_LRU_SHRINK_BATCH;
        ns->ns_max_age            = ktime_set(LDLM_DEFAULT_MAX_ALIVE, 0);
        ns->ns_ctime_age_limit    = LDLM_CTIME_AGE_LIMIT;
        ns->ns_dirty_age_limit    = ktime_set(LDLM_DIRTY_AGE_LIMIT, 0);
index a0db0c0..b6363ae 100755 (executable)
@@ -23371,6 +23371,41 @@ test_424() {
 }
 run_test 424 "simulate ENOMEM in ptl_send_rpc bulk reply ME attach"
 
+test_425() {
+       test_mkdir -c -1 $DIR/$tdir
+       $LFS setstripe -c -1 $DIR/$tdir
+
+       lru_resize_disable "" 100
+       stack_trap "lru_resize_enable" EXIT
+
+       sleep 5
+
+       for i in $(seq $((MDSCOUNT * 125))); do
+               local t=$DIR/$tdir/$tfile_$i
+
+               dd if=/dev/zero of=$t bs=4K count=1 > /dev/null 2>&1 ||
+                       error_noexit "Create file $t"
+       done
+       stack_trap "rm -rf $DIR/$tdir" EXIT
+
+       for oscparam in $($LCTL list_param ldlm.namespaces.*osc-[-0-9a-f]*); do
+               local lru_size=$($LCTL get_param -n $oscparam.lru_size)
+               local lock_count=$($LCTL get_param -n $oscparam.lock_count)
+
+               [ $lock_count -le $lru_size ] ||
+                       error "osc lock count $lock_count > lru size $lru_size"
+       done
+
+       for mdcparam in $($LCTL list_param ldlm.namespaces.*mdc-*); do
+               local lru_size=$($LCTL get_param -n $mdcparam.lru_size)
+               local lock_count=$($LCTL get_param -n $mdcparam.lock_count)
+
+               [ $lock_count -le $lru_size ] ||
+                       error "mdc lock count $lock_count > lru size $lru_size"
+       done
+}
+run_test 425 "lock count should not exceed lru size"
+
 prep_801() {
        [[ $MDS1_VERSION -lt $(version_code 2.9.55) ]] ||
        [[ $OST1_VERSION -lt $(version_code 2.9.55) ]] &&
index 06a81f6..69e0801 100755 (executable)
@@ -6060,9 +6060,9 @@ cancel_lru_locks() {
 
 default_lru_size()
 {
-        NR_CPU=$(grep -c "processor" /proc/cpuinfo)
-        DEFAULT_LRU_SIZE=$((100 * NR_CPU))
-        echo "$DEFAULT_LRU_SIZE"
+       local nr_cpu=$(grep -c "processor" /proc/cpuinfo)
+
+       echo $((100 * nr_cpu))
 }
 
 lru_resize_enable()
@@ -6072,7 +6072,10 @@ lru_resize_enable()
 
 lru_resize_disable()
 {
-    lctl set_param ldlm.namespaces.*$1*.lru_size $(default_lru_size)
+       local dev=${1}
+       local lru_size=${2:-$(default_lru_size)}
+
+       $LCTL set_param ldlm.namespaces.*$dev*.lru_size=$lru_size
 }
 
 flock_is_enabled()