* client shows interest in that lock, e.g. glimpse is occured. */
#define LDLM_DIRTY_AGE_LIMIT (10)
#define LDLM_DEFAULT_PARALLEL_AST_LIMIT 1024
+#define LDLM_DEFAULT_LRU_SHRINK_BATCH (16)
/**
* LDLM non-error return states
*/
unsigned int ns_max_unused;
+	/**
+	 * Batch size used when cancelling locks because the unused lock
+	 * count exceeds lru_size.  Only used when LRU resize is disabled.
+	 */
+ unsigned int ns_cancel_batch;
+
/** Maximum allowed age (last used time) for locks in the LRU. Set in
* seconds from userspace, but stored in ns to avoid repeat conversions.
*/
ldlm_lock_remove_from_lru_check(lock, ktime_set(0, 0))
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
-void ldlm_lock_add_to_lru(struct ldlm_lock *lock);
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock);
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
*/
void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
{
-	struct ldlm_namespace *ns;
-	ENTRY;
+	struct ldlm_namespace *ns;
-	lock_res_and_lock(lock);
+	ENTRY;
+
+	lock_res_and_lock(lock);
-	ns = ldlm_lock_to_ns(lock);
+	ns = ldlm_lock_to_ns(lock);
-	ldlm_lock_decref_internal_nolock(lock, mode);
+	ldlm_lock_decref_internal_nolock(lock, mode);
	if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
	    !lock->l_readers && !lock->l_writers) {
	}
	if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+		/* Replaces the old unconditional CERROR with a rate-limited
+		 * D_DLMTRACE message, upgraded to D_WARNING for the
+		 * server-side (non-local) case detected just below.
+		 */
+		unsigned int mask = D_DLMTRACE;
+
		/* If we received a blocked AST and this was the last reference,
		 * run the callback. */
		if (ldlm_is_ns_srv(lock) && lock->l_export)
-			CERROR("FL_CBPENDING set on non-local lock--just a "
-			       "warning\n");
-
-		LDLM_DEBUG(lock, "final decref done on cbpending lock");
+			mask |= D_WARNING;
+		LDLM_DEBUG_LIMIT(mask, lock,
+				 "final decref done on %sCBPENDING lock",
+				 mask & D_WARNING ? "non-local " : "");
-		LDLM_LOCK_GET(lock); /* dropped by bl thread */
-		ldlm_lock_remove_from_lru(lock);
-		unlock_res_and_lock(lock);
+		LDLM_LOCK_GET(lock); /* dropped by bl thread */
+		ldlm_lock_remove_from_lru(lock);
+		unlock_res_and_lock(lock);
		if (ldlm_is_fail_loc(lock))
-			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
		if (ldlm_is_atomic_cb(lock) ||
		    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
-			ldlm_handle_bl_callback(ns, NULL, lock);
+			ldlm_handle_bl_callback(ns, NULL, lock);
	} else if (ns_is_client(ns) &&
-		   !lock->l_readers && !lock->l_writers &&
+		   !lock->l_readers && !lock->l_writers &&
		   !ldlm_is_no_lru(lock) &&
		   !ldlm_is_bl_ast(lock) &&
		   !ldlm_is_converting(lock)) {
-		LDLM_DEBUG(lock, "add lock into lru list");
-
-		/* If this is a client-side namespace and this was the last
-		 * reference, put it on the LRU. */
-		ldlm_lock_add_to_lru(lock);
-		unlock_res_and_lock(lock);
+		/* If this is a client-side namespace and this was the last
+		 * reference, put it on the LRU.
+		 */
+		ldlm_lock_add_to_lru(lock);
+		unlock_res_and_lock(lock);
+		/* NOTE(review): LDLM_DEBUG now runs after the resource lock
+		 * is dropped; presumably the caller's handle reference keeps
+		 * @lock valid here -- confirm against callers.
+		 */
+		LDLM_DEBUG(lock, "add lock into lru list");
		if (ldlm_is_fail_loc(lock))
-			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
-
-		/* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
-		 * are not supported by the server, otherwise, it is done on
-		 * enqueue. */
-		if (!exp_connect_cancelset(lock->l_conn_export) &&
-		    !ns_connect_lru_resize(ns))
-			ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
-	} else {
-		LDLM_DEBUG(lock, "do not add lock into lru list");
-		unlock_res_and_lock(lock);
-	}
+			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
-	EXIT;
+		/* Now called unconditionally: ldlm_cancel_lru() itself
+		 * decides (via ns_cancel_batch and the LRU flags) whether
+		 * anything actually gets cancelled.
+		 */
+		ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
+	} else {
+		LDLM_DEBUG(lock, "do not add lock into lru list");
+		unlock_res_and_lock(lock);
+	}
+
+	EXIT;
}
/**
* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread.
*/
- count = ldlm_prepare_lru_list(ns, &cancels, min, 0, 0, lru_flags);
+ count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+ ns->ns_cancel_batch, lru_flags);
rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
if (rc == 0)
RETURN(count);
}
LUSTRE_RW_ATTR(lru_size);
+/* Show ns_cancel_batch (locks cancelled per LRU-shrink pass) via sysfs. */
+static ssize_t lru_cancel_batch_show(struct kobject *kobj,
+				     struct attribute *attr, char *buf)
+{
+	struct ldlm_namespace *ns = container_of(kobj, struct ldlm_namespace,
+						 ns_kobj);
+
+	/* 'buf' is a char * into a PAGE_SIZE sysfs buffer, so the previous
+	 * snprintf(buf, sizeof(buf) - 1, ...) bounded output to
+	 * sizeof(char *) - 1 bytes and truncated large values.  Use
+	 * sprintf() like the sibling attribute handlers.
+	 */
+	return sprintf(buf, "%u\n", ns->ns_cancel_batch);
+}
+
+/* Store ns_cancel_batch from sysfs; returns count on success, -EINVAL on
+ * malformed or out-of-range input.
+ */
+static ssize_t lru_cancel_batch_store(struct kobject *kobj,
+				      struct attribute *attr,
+				      const char *buffer, size_t count)
+{
+	struct ldlm_namespace *ns = container_of(kobj, struct ldlm_namespace,
+						 ns_kobj);
+	unsigned int tmp;
+
+	/* kstrtouint() rejects values that do not fit in an unsigned int,
+	 * unlike the old kstrtoul() + cast which silently truncated them.
+	 */
+	if (kstrtouint(buffer, 10, &tmp))
+		return -EINVAL;
+
+	ns->ns_cancel_batch = tmp;
+
+	return count;
+}
+LUSTRE_RW_ATTR(lru_cancel_batch);
+
static ssize_t lru_max_age_show(struct kobject *kobj, struct attribute *attr,
char *buf)
{
&lustre_attr_lock_count.attr,
&lustre_attr_lock_unused_count.attr,
&lustre_attr_lru_size.attr,
+ &lustre_attr_lru_cancel_batch.attr,
&lustre_attr_lru_max_age.attr,
&lustre_attr_early_lock_cancel.attr,
&lustre_attr_dirty_age_limit.attr,
ns->ns_max_parallel_ast = LDLM_DEFAULT_PARALLEL_AST_LIMIT;
ns->ns_nr_unused = 0;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
+ ns->ns_cancel_batch = LDLM_DEFAULT_LRU_SHRINK_BATCH;
ns->ns_max_age = ktime_set(LDLM_DEFAULT_MAX_ALIVE, 0);
ns->ns_ctime_age_limit = LDLM_CTIME_AGE_LIMIT;
ns->ns_dirty_age_limit = ktime_set(LDLM_DIRTY_AGE_LIMIT, 0);
}
run_test 424 "simulate ENOMEM in ptl_send_rpc bulk reply ME attach"
+# Verify that with LRU resize disabled and a small lru_size, the per-
+# namespace lock count never exceeds lru_size (locks get cancelled in
+# ns_cancel_batch batches once the limit is reached).
+test_425() {
+	test_mkdir -c -1 $DIR/$tdir
+	$LFS setstripe -c -1 $DIR/$tdir
+
+	lru_resize_disable "" 100
+	stack_trap "lru_resize_enable" EXIT
+
+	sleep 5
+
+	for i in $(seq $((MDSCOUNT * 125))); do
+		# ${tfile}_$i: the old "$tfile_$i" expanded the undefined
+		# variable "tfile_", dropping the $tfile prefix entirely.
+		local t=$DIR/$tdir/${tfile}_$i
+
+		dd if=/dev/zero of=$t bs=4K count=1 > /dev/null 2>&1 ||
+			error_noexit "Create file $t"
+	done
+	stack_trap "rm -rf $DIR/$tdir" EXIT
+
+	for oscparam in $($LCTL list_param ldlm.namespaces.*osc-[-0-9a-f]*); do
+		local lru_size=$($LCTL get_param -n $oscparam.lru_size)
+		local lock_count=$($LCTL get_param -n $oscparam.lock_count)
+
+		[ $lock_count -le $lru_size ] ||
+			error "osc lock count $lock_count > lru size $lru_size"
+	done
+
+	for mdcparam in $($LCTL list_param ldlm.namespaces.*mdc-*); do
+		local lru_size=$($LCTL get_param -n $mdcparam.lru_size)
+		local lock_count=$($LCTL get_param -n $mdcparam.lock_count)
+
+		[ $lock_count -le $lru_size ] ||
+			error "mdc lock count $lock_count > lru size $lru_size"
+	done
+}
+run_test 425 "lock count should not exceed lru size"
+
prep_801() {
[[ $MDS1_VERSION -lt $(version_code 2.9.55) ]] ||
[[ $OST1_VERSION -lt $(version_code 2.9.55) ]] &&
default_lru_size()
{
-	NR_CPU=$(grep -c "processor" /proc/cpuinfo)
-	DEFAULT_LRU_SIZE=$((100 * NR_CPU))
-	echo "$DEFAULT_LRU_SIZE"
+	# 100 locks per online CPU, matching the historical client default
+	echo $(($(grep -c "processor" /proc/cpuinfo) * 100))
}
lru_resize_enable()
lru_resize_disable()
{
-	lctl set_param ldlm.namespaces.*$1*.lru_size $(default_lru_size)
+	# $1: device pattern; $2 (optional): lru_size, defaults per CPU count
+	local dev=$1
+	local size=${2:-$(default_lru_size)}
+
+	$LCTL set_param ldlm.namespaces.*$dev*.lru_size=$size
}
flock_is_enabled()