Whamcloud - gitweb
b=11270
authorzam <zam>
Thu, 8 May 2008 05:46:55 +0000 (05:46 +0000)
committerzam <zam>
Thu, 8 May 2008 05:46:55 +0000 (05:46 +0000)
i=vitaly.vertman
i=oleg.drokin

Lockless i/o code fixes and improvements:
 (1) lockless truncate checks for OBD_CONNECT_TRUNCLOCK and ll_file_punch
     sets OBD_FL_TRUNCLOCK correctly.
 (2) an lproc control for lockless truncate,
     lproc statistics for lockless truncate.
 (3) sanityN tests for lockless code

lustre/include/linux/lustre_lite.h
lustre/ldlm/ldlm_extent.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/rw.c
lustre/tests/sanityN.sh
lustre/tests/test-framework.sh

index 7a63f2e..c531c05 100644 (file)
@@ -50,6 +50,7 @@ enum {
          LPROC_LL_FSYNC,
          LPROC_LL_SETATTR,
          LPROC_LL_TRUNC,
+         LPROC_LL_LOCKLESS_TRUNC,
          LPROC_LL_FLOCK,
 
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
index cc0e3aa..2db142f 100644 (file)
@@ -408,170 +408,169 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                         compat = 0;
                         }
                 }
-                RETURN(compat);
-        }
-
-        /* for waiting queue */
-        list_for_each(tmp, queue) {
-                check_contention = 1;
-
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+        } else {
+                /* for waiting queue */
+                list_for_each(tmp, queue) {
+                        check_contention = 1;
 
-                if (req == lock)
-                        break;
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                if (unlikely(scan)) {
-                        /* We only get here if we are queuing GROUP lock
-                           and met some incompatible one. The main idea of this
-                           code is to insert GROUP lock past compatible GROUP
-                           lock in the waiting queue or if there is not any,
-                           then in front of first non-GROUP lock */
-                        if (lock->l_req_mode != LCK_GROUP) {
-                                /* Ok, we hit non-GROUP lock, there should be no
-                                more GROUP locks later on, queue in front of
-                                first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
-                                compat = 0;
-                                break;
-                        }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                compat = 0;
+                        if (req == lock)
                                 break;
-                        }
-                        continue;
-                }
 
-                /* locks are compatible, overlap doesn't matter */
-                if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                        if (req_mode == LCK_PR &&
-                            ((lock->l_policy_data.l_extent.start <=
-                             req->l_policy_data.l_extent.start) &&
-                             (lock->l_policy_data.l_extent.end >=
-                              req->l_policy_data.l_extent.end))) {
-                                /* If we met a PR lock just like us or wider,
-                                   and nobody down the list conflicted with
-                                   it, that means we can skip processing of
-                                   the rest of the list and safely place
-                                   ourselves at the end of the list, or grant
-                                   (dependent if we met an conflicting locks
-                                   before in the list).
-                                   In case of 1st enqueue only we continue
-                                   traversing if there is something conflicting
-                                   down the list because we need to make sure
-                                   that something is marked as AST_SENT as well,
-                                   in cse of empy worklist we would exit on
-                                   first conflict met. */
-                                /* There IS a case where such flag is
-                                   not set for a lock, yet it blocks
-                                   something. Luckily for us this is
-                                   only during destroy, so lock is
-                                   exclusive. So here we are safe */
-                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
-                                        RETURN(compat);
+                        if (unlikely(scan)) {
+                                /* We only get here if we are queuing GROUP lock
+                                   and met some incompatible one. The main idea of this
+                                   code is to insert GROUP lock past compatible GROUP
+                                   lock in the waiting queue or if there is not any,
+                                   then in front of first non-GROUP lock */
+                                if (lock->l_req_mode != LCK_GROUP) {
+                                        /* Ok, we hit non-GROUP lock, there should be no
+                                           more GROUP locks later on, queue in front of
+                                           first non-GROUP lock */
+
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        list_del_init(&lock->l_res_link);
+                                        ldlm_resource_insert_lock_after(req, lock);
+                                        compat = 0;
+                                        break;
+                                }
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* found it */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        compat = 0;
+                                        break;
                                 }
+                                continue;
                         }
 
-                        /* non-group locks are compatible, overlap doesn't
-                           matter */
-                        if (likely(req_mode != LCK_GROUP))
-                                continue;
+                        /* locks are compatible, overlap doesn't matter */
+                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                                if (req_mode == LCK_PR &&
+                                    ((lock->l_policy_data.l_extent.start <=
+                                      req->l_policy_data.l_extent.start) &&
+                                     (lock->l_policy_data.l_extent.end >=
+                                      req->l_policy_data.l_extent.end))) {
+                                        /* If we met a PR lock just like us or wider,
+                                           and nobody down the list conflicted with
+                                           it, that means we can skip processing of
+                                           the rest of the list and safely place
+                                           ourselves at the end of the list, or grant
+                                           (dependent if we met an conflicting locks
+                                           before in the list).
+                                           In case of 1st enqueue only we continue
+                                           traversing if there is something conflicting
+                                           down the list because we need to make sure
+                                           that something is marked as AST_SENT as well,
+                                           in cse of empy worklist we would exit on
+                                           first conflict met. */
+                                        /* There IS a case where such flag is
+                                           not set for a lock, yet it blocks
+                                           something. Luckily for us this is
+                                           only during destroy, so lock is
+                                           exclusive. So here we are safe */
+                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+                                                RETURN(compat);
+                                        }
+                                }
 
-                        /* If we are trying to get a GROUP lock and there is
-                           another one of this kind, we need to compare gid */
-                        if (req->l_policy_data.l_extent.gid ==
-                            lock->l_policy_data.l_extent.gid) {
-                                /* If existing lock with matched gid is granted,
-                                   we grant new one too. */
-                                if (lock->l_req_mode == lock->l_granted_mode)
-                                        RETURN(2);
+                                /* non-group locks are compatible, overlap doesn't
+                                   matter */
+                                if (likely(req_mode != LCK_GROUP))
+                                        continue;
 
-                                /* Otherwise we are scanning queue of waiting
-                                 * locks and it means current request would
-                                 * block along with existing lock (that is
-                                 * already blocked.
-                                 * If we are in nonblocking mode - return
-                                 * immediately */
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                        compat = -EWOULDBLOCK;
-                                        goto destroylock;
+                                /* If we are trying to get a GROUP lock and there is
+                                   another one of this kind, we need to compare gid */
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* If existing lock with matched gid is granted,
+                                           we grant new one too. */
+                                        if (lock->l_req_mode == lock->l_granted_mode)
+                                                RETURN(2);
+
+                                        /* Otherwise we are scanning queue of waiting
+                                         * locks and it means current request would
+                                         * block along with existing lock (that is
+                                         * already blocked.
+                                         * If we are in nonblocking mode - return
+                                         * immediately */
+                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                                compat = -EWOULDBLOCK;
+                                                goto destroylock;
+                                        }
+                                        /* If this group lock is compatible with another
+                                         * group lock on the waiting list, they must be
+                                         * together in the list, so they can be granted
+                                         * at the same time.  Otherwise the later lock
+                                         * can get stuck behind another, incompatible,
+                                         * lock. */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        /* Because 'lock' is not granted, we can stop
+                                         * processing this queue and return immediately.
+                                         * There is no need to check the rest of the
+                                         * list. */
+                                        RETURN(0);
                                 }
-                                /* If this group lock is compatible with another
-                                 * group lock on the waiting list, they must be
-                                 * together in the list, so they can be granted
-                                 * at the same time.  Otherwise the later lock
-                                 * can get stuck behind another, incompatible,
-                                 * lock. */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                /* Because 'lock' is not granted, we can stop
-                                 * processing this queue and return immediately.
-                                 * There is no need to check the rest of the
-                                 * list. */
-                                RETURN(0);
                         }
-                }
 
-                if (unlikely(req_mode == LCK_GROUP &&
-                             (lock->l_req_mode != lock->l_granted_mode))) {
-                        scan = 1;
-                        compat = 0;
-                        if (lock->l_req_mode != LCK_GROUP) {
-                                /* Ok, we hit non-GROUP lock, there should
-                                 * be no more GROUP locks later on, queue in
-                                 * front of first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
-                                break;
-                        }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                break;
+                        if (unlikely(req_mode == LCK_GROUP &&
+                                     (lock->l_req_mode != lock->l_granted_mode))) {
+                                scan = 1;
+                                compat = 0;
+                                if (lock->l_req_mode != LCK_GROUP) {
+                                        /* Ok, we hit non-GROUP lock, there should
+                                         * be no more GROUP locks later on, queue in
+                                         * front of first non-GROUP lock */
+
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        list_del_init(&lock->l_res_link);
+                                        ldlm_resource_insert_lock_after(req, lock);
+                                        break;
+                                }
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* found it */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        break;
+                                }
+                                continue;
                         }
-                        continue;
-                }
 
-                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
-                        /* If compared lock is GROUP, then requested is PR/PW/
-                         * so this is not compatible; extent range does not
-                         * matter */
-                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                compat = -EWOULDBLOCK;
-                                goto destroylock;
-                        } else {
-                                *flags |= LDLM_FL_NO_TIMEOUT;
-                        }
-                } else if (lock->l_policy_data.l_extent.end < req_start ||
-                           lock->l_policy_data.l_extent.start > req_end) {
-                        /* if a non group lock doesn't overlap skip it */
-                        continue;
-                } else if (lock->l_req_extent.end < req_start ||
-                           lock->l_req_extent.start > req_end)
-                        /* false contention, the requests doesn't really overlap */
+                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                                /* If compared lock is GROUP, then requested is PR/PW/
+                                 * so this is not compatible; extent range does not
+                                 * matter */
+                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                        compat = -EWOULDBLOCK;
+                                        goto destroylock;
+                                } else {
+                                        *flags |= LDLM_FL_NO_TIMEOUT;
+                                }
+                        } else if (lock->l_policy_data.l_extent.end < req_start ||
+                                   lock->l_policy_data.l_extent.start > req_end) {
+                                /* if a non group lock doesn't overlap skip it */
+                                continue;
+                        } else if (lock->l_req_extent.end < req_start ||
+                                   lock->l_req_extent.start > req_end)
+                                /* false contention, the requests doesn't really overlap */
                                 check_contention = 0;
 
-                if (!work_list)
-                        RETURN(0);
+                        if (!work_list)
+                                RETURN(0);
 
-                /* don't count conflicting glimpse locks */
-                if (lock->l_req_mode == LCK_PR &&
-                    lock->l_policy_data.l_extent.start == 0 &&
-                    lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
-                        check_contention = 0;
+                        /* don't count conflicting glimpse locks */
+                        if (lock->l_req_mode == LCK_PR &&
+                            lock->l_policy_data.l_extent.start == 0 &&
+                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+                                check_contention = 0;
 
-                *contended_locks += check_contention;
+                        *contended_locks += check_contention;
 
-                compat = 0;
-                if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, work_list);
+                        compat = 0;
+                        if (lock->l_blocking_ast)
+                                ldlm_add_ast_work_item(lock, req, work_list);
+                }
         }
 
         if (ldlm_check_contention(req, *contended_locks) &&
index ddaa5ab..d49116f 100644 (file)
@@ -1406,7 +1406,8 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        ll_tree_unlock(&tree);
+                        if (tree_locked)
+                                ll_tree_unlock(&tree);
                         goto out;
                 }
         } else {
index 10fed7c..f98f79e 100644 (file)
@@ -231,6 +231,8 @@ enum stats_track_type {
 
 /* default value for ll_sb_info->contention_time */
 #define SBI_DEFAULT_CONTENTION_SECONDS     60
+/* default value for lockless_truncate_enable */
+#define SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE 1
 
 struct ll_sb_info {
         struct list_head          ll_list;
@@ -260,6 +262,7 @@ struct ll_sb_info {
         struct list_head          ll_pglist; /* all pages (llap_pglist_item) */
 
         unsigned                  ll_contention_time; /* seconds */
+        unsigned                  ll_lockless_truncate_enable; /* true/false */
 
         struct ll_ra_info         ll_ra_info;
         unsigned int              ll_namelen;
index 54fcd63..c42df61 100644 (file)
@@ -76,6 +76,7 @@ static struct ll_sb_info *ll_init_sbi(void)
         sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
                                            SBI_DEFAULT_READAHEAD_WHOLE_MAX;
         sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
+        sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE;
         INIT_LIST_HEAD(&sbi->ll_conn_chain);
         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
 
@@ -273,7 +274,8 @@ static int client_common_fill_super(struct super_block *sb,
 
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_GRANT |
                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | 
-                OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT;
+                OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT |
+                OBD_CONNECT_TRUNCLOCK;
 
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
@@ -1308,7 +1310,8 @@ static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
         UNLOCK_INODE_MUTEX(inode);
         UP_WRITE_I_ALLOC_SEM(inode);
 
-        if (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK) {
+        if (sbi->ll_lockless_truncate_enable && 
+            (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) {
                 ast_flags = LDLM_FL_BLOCK_GRANTED;
                 rc = obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT,
                                &policy, LCK_PW, &ast_flags, inode, &lockh);
index 04a7f90..b5a306b 100644 (file)
@@ -463,6 +463,27 @@ static int ll_wr_contention_time(struct file *file, const char *buffer,
                 count;
 }
 
+static int ll_rd_lockless_truncate(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+
+        *eof = 1;
+        return snprintf(page, count, "%u\n",
+                        ll_s2sbi(sb)->ll_lockless_truncate_enable);
+}
+
+static int ll_wr_lockless_truncate(struct file *file, const char *buffer,
+                                   unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return lprocfs_write_helper(buffer, count,
+                                    &sbi->ll_lockless_truncate_enable)
+                ?: count;
+}
+
 static int ll_rd_statahead_max(char *page, char **start, off_t off,
                                int count, int *eof, void *data)
 {
@@ -534,7 +555,10 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "stats_track_pid",  ll_rd_track_pid, ll_wr_track_pid, 0 },
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
-        { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+        { "contention_seconds", ll_rd_contention_time,
+                                ll_wr_contention_time, 0},
+        { "lockless_truncate", ll_rd_lockless_truncate,
+                               ll_wr_lockless_truncate, 0},
         { "statahead_max",      ll_rd_statahead_max, ll_wr_statahead_max, 0 },
         { "statahead_stats",    ll_rd_statahead_stats, 0, 0 },
         { 0 }
@@ -576,6 +600,7 @@ struct llite_file_opcode {
         /* inode operation */
         { LPROC_LL_SETATTR,        LPROCFS_TYPE_REGS, "setattr" },
         { LPROC_LL_TRUNC,          LPROCFS_TYPE_REGS, "truncate" },
+        { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate" },
         { LPROC_LL_FLOCK,          LPROCFS_TYPE_REGS, "flock" },
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
         { LPROC_LL_GETATTR,        LPROCFS_TYPE_REGS, "getattr" },
index c48c39e..b29cbf8 100644 (file)
@@ -120,7 +120,14 @@ int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock)
         oinfo.oi_oa = &oa;
         oa.o_id = lli->lli_smd->lsm_object_id;
         oa.o_valid = OBD_MD_FLID;
-        oa.o_flags = srvlock ? OBD_FL_TRUNCLOCK : 0;
+        if (srvlock) {
+                /* set OBD_MD_FLFLAGS in o_valid, only if we 
+                 * set OBD_FL_TRUNCLOCK, otherwise ost_punch
+                 * and filter_setattr get confused, see the comment
+                 * in ost_punch */
+                oa.o_flags = OBD_FL_TRUNCLOCK;
+                oa.o_valid |= OBD_MD_FLFLAGS;
+        }
         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID|
                         OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER |
@@ -212,6 +219,8 @@ void ll_truncate(struct inode *inode)
         ll_inode_size_unlock(inode, 0);
         if (!srvlock)
                 ll_file_punch(inode, new_size, 0);
+        else
+                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);
 
         EXIT;
         return;
index dbcb0c4..00e3147 100644 (file)
@@ -589,6 +589,85 @@ test_31() {
 }
 run_test 31 "voluntary cancel / blocking ast race=============="
 
+# enable/disable lockless truncate feature, depending on the arg 0/1
+enable_lockless_truncate() {
+        lctl set_param -n llite.*.lockless_truncate $1
+}
+
+test_32a() { # bug 11270
+        local p="$TMP/sanityN-$TESTNAME.parameters"
+        save_lustre_params $HOSTNAME llite.*.lockless_truncate > $p
+        cancel_lru_locks osc
+        clear_llite_stats
+        enable_lockless_truncate 1
+        dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+
+        log "checking cached lockless truncate"
+        $TRUNCATE $DIR1/$tfile 8000000
+        $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+                error "lockless truncate doesn't use cached locks"
+
+        log "checking not cached lockless truncate"
+        $TRUNCATE $DIR2/$tfile 5000000
+        $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -ne 0 ] ||
+                error "not cached trancate isn't lockless"
+
+        log "disabled lockless truncate"
+        enable_lockless_truncate 0
+        clear_llite_stats
+        $TRUNCATE $DIR2/$tfile 3000000
+        $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+                error "lockless truncate disabling failed"
+        rm $DIR1/$tfile
+        # restore lockless_truncate default values
+        restore_lustre_params < $p
+        rm -f $p
+}
+run_test 32a "lockless truncate"
+
+test_32b() { # bug 11270
+        local node
+        local p="$TMP/sanityN-$TESTNAME.parameters"
+        save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p
+        for node in $(osts_nodes); do
+                save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p
+                save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p
+                save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p
+        done
+        clear_llite_stats
+        # agressive lockless i/o settings 
+        for node in $(osts_nodes); do
+                do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60'
+        done
+        lctl set_param -n llite.*.contention_seconds 60
+        for i in $(seq 5); do
+                dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+                dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+        done
+        [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered" 
+        # disable lockless i/o (it is disabled by default)
+        for node in $(osts_nodes); do
+                do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0'
+        done
+        # set contention_seconds to 0 at client too, otherwise Lustre still
+        # remembers lock contention
+        lctl set_param -n llite.*.contention_seconds 0
+        clear_llite_stats
+        for i in $(seq 5); do
+                dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+                dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+        done
+        [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] ||
+                error "lockless i/o works when disabled" 
+        rm -f $DIR1/$tfile
+        restore_lustre_params <$p
+        rm -f $p
+}
+run_test 32b "lockless i/o"
+
 log "cleanup: ======================================================"
 
 check_and_cleanup_lustre
index fb824d3..feb98e3 100644 (file)
@@ -1438,3 +1438,33 @@ multiop_bg_pause() {
 
     return 0
 }
+
+# reset llite stat counters
+clear_llite_stats(){
+        lctl set_param -n llite.*.stats 0
+}
+
+# sum llite stat items
+calc_llite_stats() {
+        local res=$(lctl get_param -n llite.*.stats |
+                    awk 'BEGIN {s = 0} END {print s} /^'"$1"'/ {s += $2}')
+        echo $res
+}
+
+# save_lustre_params(node, parameter_mask)
+# generate a stream of formatted strings (<node> <param name>=<param value>)
+save_lustre_params() {
+        local s
+        do_node $1 "lctl get_param $2" | while read s; do echo "$1 $s"; done
+}
+
+# restore lustre parameters from input stream, produces by save_lustre_params
+restore_lustre_params() {
+        local node
+        local name
+        local val
+        while IFS=" =" read node name val; do
+                do_node $node "lctl set_param -n $name $val"
+        done
+}
+