compat = 0;
}
}
- RETURN(compat);
- }
-
- /* for waiting queue */
- list_for_each(tmp, queue) {
- check_contention = 1;
-
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ } else {
+ /* for waiting queue */
+ list_for_each(tmp, queue) {
+ check_contention = 1;
- if (req == lock)
- break;
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- if (unlikely(scan)) {
- /* We only get here if we are queuing GROUP lock
- and met some incompatible one. The main idea of this
- code is to insert GROUP lock past compatible GROUP
- lock in the waiting queue or if there is not any,
- then in front of first non-GROUP lock */
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should be no
- more GROUP locks later on, queue in front of
- first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
- compat = 0;
- break;
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock, req);
- compat = 0;
+ if (req == lock)
break;
- }
- continue;
- }
- /* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_req_mode, req_mode)) {
- if (req_mode == LCK_PR &&
- ((lock->l_policy_data.l_extent.start <=
- req->l_policy_data.l_extent.start) &&
- (lock->l_policy_data.l_extent.end >=
- req->l_policy_data.l_extent.end))) {
- /* If we met a PR lock just like us or wider,
- and nobody down the list conflicted with
- it, that means we can skip processing of
- the rest of the list and safely place
- ourselves at the end of the list, or grant
- (dependent if we met an conflicting locks
- before in the list).
- In case of 1st enqueue only we continue
- traversing if there is something conflicting
- down the list because we need to make sure
- that something is marked as AST_SENT as well,
- in cse of empy worklist we would exit on
- first conflict met. */
- /* There IS a case where such flag is
- not set for a lock, yet it blocks
- something. Luckily for us this is
- only during destroy, so lock is
- exclusive. So here we are safe */
- if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
- RETURN(compat);
+ if (unlikely(scan)) {
+ /* We only get here if we are queuing GROUP lock
+ and met some incompatible one. The main idea of this
+ code is to insert GROUP lock past compatible GROUP
+ lock in the waiting queue or if there is not any,
+ then in front of first non-GROUP lock */
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Ok, we hit non-GROUP lock, there should be no
+ more GROUP locks later on, queue in front of
+ first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ compat = 0;
+ break;
+ }
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ compat = 0;
+ break;
}
+ continue;
}
- /* non-group locks are compatible, overlap doesn't
- matter */
- if (likely(req_mode != LCK_GROUP))
- continue;
+ /* locks are compatible, overlap doesn't matter */
+ if (lockmode_compat(lock->l_req_mode, req_mode)) {
+ if (req_mode == LCK_PR &&
+ ((lock->l_policy_data.l_extent.start <=
+ req->l_policy_data.l_extent.start) &&
+ (lock->l_policy_data.l_extent.end >=
+ req->l_policy_data.l_extent.end))) {
+ /* If we met a PR lock just like us or wider,
+ and nobody down the list conflicted with
+ it, that means we can skip processing of
+ the rest of the list and safely place
+ ourselves at the end of the list, or grant
+                                       (depending on whether we met any conflicting locks
+ before in the list).
+ In case of 1st enqueue only we continue
+ traversing if there is something conflicting
+ down the list because we need to make sure
+ that something is marked as AST_SENT as well,
+                                       in case of an empty worklist we would exit on
+ first conflict met. */
+ /* There IS a case where such flag is
+ not set for a lock, yet it blocks
+ something. Luckily for us this is
+ only during destroy, so lock is
+ exclusive. So here we are safe */
+ if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+ RETURN(compat);
+ }
+ }
- /* If we are trying to get a GROUP lock and there is
- another one of this kind, we need to compare gid */
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* If existing lock with matched gid is granted,
- we grant new one too. */
- if (lock->l_req_mode == lock->l_granted_mode)
- RETURN(2);
+ /* non-group locks are compatible, overlap doesn't
+ matter */
+ if (likely(req_mode != LCK_GROUP))
+ continue;
- /* Otherwise we are scanning queue of waiting
- * locks and it means current request would
- * block along with existing lock (that is
- * already blocked.
- * If we are in nonblocking mode - return
- * immediately */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- compat = -EWOULDBLOCK;
- goto destroylock;
+ /* If we are trying to get a GROUP lock and there is
+ another one of this kind, we need to compare gid */
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* If existing lock with matched gid is granted,
+ we grant new one too. */
+ if (lock->l_req_mode == lock->l_granted_mode)
+ RETURN(2);
+
+ /* Otherwise we are scanning queue of waiting
+ * locks and it means current request would
+ * block along with existing lock (that is
+ * already blocked.
+ * If we are in nonblocking mode - return
+ * immediately */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+ /* If this group lock is compatible with another
+ * group lock on the waiting list, they must be
+ * together in the list, so they can be granted
+ * at the same time. Otherwise the later lock
+ * can get stuck behind another, incompatible,
+ * lock. */
+ ldlm_resource_insert_lock_after(lock, req);
+ /* Because 'lock' is not granted, we can stop
+ * processing this queue and return immediately.
+ * There is no need to check the rest of the
+ * list. */
+ RETURN(0);
}
- /* If this group lock is compatible with another
- * group lock on the waiting list, they must be
- * together in the list, so they can be granted
- * at the same time. Otherwise the later lock
- * can get stuck behind another, incompatible,
- * lock. */
- ldlm_resource_insert_lock_after(lock, req);
- /* Because 'lock' is not granted, we can stop
- * processing this queue and return immediately.
- * There is no need to check the rest of the
- * list. */
- RETURN(0);
}
- }
- if (unlikely(req_mode == LCK_GROUP &&
- (lock->l_req_mode != lock->l_granted_mode))) {
- scan = 1;
- compat = 0;
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should
- * be no more GROUP locks later on, queue in
- * front of first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
- break;
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock, req);
- break;
+ if (unlikely(req_mode == LCK_GROUP &&
+ (lock->l_req_mode != lock->l_granted_mode))) {
+ scan = 1;
+ compat = 0;
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Ok, we hit non-GROUP lock, there should
+ * be no more GROUP locks later on, queue in
+ * front of first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ break;
+ }
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ break;
+ }
+ continue;
}
- continue;
- }
- if (unlikely(lock->l_req_mode == LCK_GROUP)) {
- /* If compared lock is GROUP, then requested is PR/PW/
- * so this is not compatible; extent range does not
- * matter */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- compat = -EWOULDBLOCK;
- goto destroylock;
- } else {
- *flags |= LDLM_FL_NO_TIMEOUT;
- }
- } else if (lock->l_policy_data.l_extent.end < req_start ||
- lock->l_policy_data.l_extent.start > req_end) {
- /* if a non group lock doesn't overlap skip it */
- continue;
- } else if (lock->l_req_extent.end < req_start ||
- lock->l_req_extent.start > req_end)
- /* false contention, the requests doesn't really overlap */
+ if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+ /* If compared lock is GROUP, then requested is PR/PW/
+ * so this is not compatible; extent range does not
+ * matter */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ } else {
+ *flags |= LDLM_FL_NO_TIMEOUT;
+ }
+ } else if (lock->l_policy_data.l_extent.end < req_start ||
+ lock->l_policy_data.l_extent.start > req_end) {
+ /* if a non group lock doesn't overlap skip it */
+ continue;
+ } else if (lock->l_req_extent.end < req_start ||
+ lock->l_req_extent.start > req_end)
+                       /* false contention, the requests don't really overlap */
check_contention = 0;
- if (!work_list)
- RETURN(0);
+ if (!work_list)
+ RETURN(0);
- /* don't count conflicting glimpse locks */
- if (lock->l_req_mode == LCK_PR &&
- lock->l_policy_data.l_extent.start == 0 &&
- lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
- check_contention = 0;
+ /* don't count conflicting glimpse locks */
+ if (lock->l_req_mode == LCK_PR &&
+ lock->l_policy_data.l_extent.start == 0 &&
+ lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+ check_contention = 0;
- *contended_locks += check_contention;
+ *contended_locks += check_contention;
- compat = 0;
- if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, work_list);
+ compat = 0;
+ if (lock->l_blocking_ast)
+ ldlm_add_ast_work_item(lock, req, work_list);
+ }
}
if (ldlm_check_contention(req, *contended_locks) &&
count;
}
+static int ll_rd_lockless_truncate(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+
+ *eof = 1;
+ return snprintf(page, count, "%u\n",
+ ll_s2sbi(sb)->ll_lockless_truncate_enable);
+}
+
+static int ll_wr_lockless_truncate(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return lprocfs_write_helper(buffer, count,
+ &sbi->ll_lockless_truncate_enable)
+ ?: count;
+}
+
static int ll_rd_statahead_max(char *page, char **start, off_t off,
int count, int *eof, void *data)
{
{ "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 },
{ "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
{ "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 },
- { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+ { "contention_seconds", ll_rd_contention_time,
+ ll_wr_contention_time, 0},
+ { "lockless_truncate", ll_rd_lockless_truncate,
+ ll_wr_lockless_truncate, 0},
{ "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 },
{ "statahead_stats", ll_rd_statahead_stats, 0, 0 },
{ 0 }
/* inode operation */
{ LPROC_LL_SETATTR, LPROCFS_TYPE_REGS, "setattr" },
{ LPROC_LL_TRUNC, LPROCFS_TYPE_REGS, "truncate" },
+ { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate" },
{ LPROC_LL_FLOCK, LPROCFS_TYPE_REGS, "flock" },
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
{ LPROC_LL_GETATTR, LPROCFS_TYPE_REGS, "getattr" },
}
run_test 31 "voluntary cancel / blocking ast race=============="
+# enable/disable lockless truncate feature, depending on the arg 0/1
+enable_lockless_truncate() {
+ lctl set_param -n llite.*.lockless_truncate $1
+}
+
+test_32a() { # bug 11270
+ local p="$TMP/sanityN-$TESTNAME.parameters"
+ save_lustre_params $HOSTNAME llite.*.lockless_truncate > $p
+ cancel_lru_locks osc
+ clear_llite_stats
+ enable_lockless_truncate 1
+ dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+
+ log "checking cached lockless truncate"
+ $TRUNCATE $DIR1/$tfile 8000000
+ $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+ error "lockless truncate doesn't use cached locks"
+
+ log "checking not cached lockless truncate"
+ $TRUNCATE $DIR2/$tfile 5000000
+ $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -ne 0 ] ||
+               error "not cached truncate isn't lockless"
+
+ log "disabled lockless truncate"
+ enable_lockless_truncate 0
+ clear_llite_stats
+ $TRUNCATE $DIR2/$tfile 3000000
+ $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+ error "lockless truncate disabling failed"
+ rm $DIR1/$tfile
+ # restore lockless_truncate default values
+ restore_lustre_params < $p
+ rm -f $p
+}
+run_test 32a "lockless truncate"
+
+test_32b() { # bug 11270
+ local node
+ local p="$TMP/sanityN-$TESTNAME.parameters"
+ save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p
+ for node in $(osts_nodes); do
+ save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p
+ save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p
+ save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p
+ done
+ clear_llite_stats
+       # aggressive lockless i/o settings
+ for node in $(osts_nodes); do
+ do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60'
+ done
+ lctl set_param -n llite.*.contention_seconds 60
+ for i in $(seq 5); do
+ dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ done
+ [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered"
+ # disable lockless i/o (it is disabled by default)
+ for node in $(osts_nodes); do
+ do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0'
+ done
+ # set contention_seconds to 0 at client too, otherwise Lustre still
+ # remembers lock contention
+ lctl set_param -n llite.*.contention_seconds 0
+ clear_llite_stats
+ for i in $(seq 5); do
+ dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ done
+ [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] ||
+ error "lockless i/o works when disabled"
+ rm -f $DIR1/$tfile
+ restore_lustre_params <$p
+ rm -f $p
+}
+run_test 32b "lockless i/o"
+
log "cleanup: ======================================================"
check_and_cleanup_lustre