LU-13645 ldlm: re-process ldlm lock cleanup 05/39405/3
author Vitaly Fertman <c17818@cray.com>
Thu, 16 Jul 2020 11:28:03 +0000 (14:28 +0300)
committer Oleg Drokin <green@whamcloud.com>
Thu, 13 Aug 2020 14:50:03 +0000 (14:50 +0000)
For extent locks:
- the rescan logic is not needed for group locks; the code works
  correctly without it
- @err is not needed in ldlm_extent_compat_queue(); it is always set
  to @compat, so remove it and set it at the call site
- the LDLM_FL_NO_TIMEOUT flag can be set once outside of
  ldlm_extent_compat_queue()
- add ldlm_resource_insert_lock_before() (sketched below);
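
A rough, self-contained sketch of the new helper's idea (simplified
stand-in names, not the Lustre implementation): on a circular
doubly-linked list, inserting an entry before another one is just
adding it after that entry's predecessor, which is why
ldlm_resource_insert_lock_before() can replace the old
insert-after/unlink/re-insert sequence:

    #include <stdio.h>

    struct node { struct node *prev, *next; int id; };

    /* kernel-style list_add(): insert @new right after @head */
    static void list_add(struct node *new, struct node *head)
    {
            new->next = head->next;
            new->prev = head;
            head->next->prev = new;
            head->next = new;
    }

    /* insert @new directly in front of @pos: add it after @pos->prev */
    static void insert_before(struct node *new, struct node *pos)
    {
            list_add(new, pos->prev);
    }

    int main(void)
    {
            struct node head    = { &head, &head, 0 };
            struct node waiting = { NULL, NULL, 1 }; /* first non-GROUP waiter */
            struct node group   = { NULL, NULL, 2 }; /* GROUP lock to queue */

            list_add(&waiting, &head);
            insert_before(&group, &waiting); /* GROUP goes in front of it */

            for (struct node *n = head.next; n != &head; n = n->next)
                    printf("lock %d\n", n->id); /* prints 2, then 1 */
            return 0;
    }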

For inodebits:
- glimpse expects ELDLM_LOCK_ABORTED so that the data is filled in
  properly on the client side; do not return ELDLM_LOCK_WOULDBLOCK
  from ldlm_process_inodebits_lock()
- regular enqueue also has no handling for ELDLM_LOCK_WOULDBLOCK;
  restore the original ELDLM_LOCK_ABORTED here as well for simplicity
- check for a DOM lock in mdt_dom_client_has_lock() according to the
  open flags instead of always requiring LCK_PW (see the sketch below);
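
A minimal sketch of the last point, with stand-in constants rather
than the real Lustre definitions: the acceptable lock modes are now
derived from the open flags, so a read open is satisfied by an
existing PR or PW DOM lock while a write open still requires PW:

    #include <stdio.h>

    /* stand-in values, not the actual Lustre flag definitions */
    #define MDS_FMODE_READ   0x00000001ULL
    #define MDS_FMODE_WRITE  0x00000002ULL
    #define LCK_PW           0x2
    #define LCK_PR           0x4

    /* mirrors the mode selection added to mdt_dom_client_has_lock() */
    static unsigned int dom_match_modes(unsigned long long open_flags)
    {
            return (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR | LCK_PW;
    }

    int main(void)
    {
            printf("write open matches 0x%x\n", dom_match_modes(MDS_FMODE_WRITE));
            printf("read open matches  0x%x\n", dom_match_modes(MDS_FMODE_READ));
            return 0;
    }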

Also, move sanity test 82 to sanityn since, after LU-9964, it needs to
be run on two mount points.

Signed-off-by: Vitaly Fertman <c17818@cray.com>
Change-Id: I6d0b230f04aaa497db5b036b4ed9afe5d7f418b0
HPE-bug-id: LUS-8987
Reviewed-on: https://review.whamcloud.com/39405
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_resource.c
lustre/mdt/mdt_io.c
lustre/tests/sanity.sh
lustre/tests/sanityn.sh

diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index 710a783..56a8c73 100644
@@ -384,8 +384,8 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
  */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                        __u64 *flags, enum ldlm_error *err,
-                        struct list_head *work_list, int *contended_locks)
+                        __u64 *flags, struct list_head *work_list,
+                        int *contended_locks)
 {
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
@@ -394,7 +394,6 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
        struct ldlm_lock *lock;
        int check_contention;
        int compat = 1;
-       int scan = 0;
        ENTRY;
 
         lockmode_verify(req_mode);
@@ -439,7 +438,6 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                         goto destroylock;
                                 }
 
-                                *flags |= LDLM_FL_NO_TIMEOUT;
                                 if (!work_list)
                                         RETURN(0);
 
@@ -486,33 +484,6 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         if (req == lock)
                                 break;
 
-                        if (unlikely(scan)) {
-                                /* We only get here if we are queuing GROUP lock
-                                   and met some incompatible one. The main idea of this
-                                   code is to insert GROUP lock past compatible GROUP
-                                   lock in the waiting queue or if there is not any,
-                                   then in front of first non-GROUP lock */
-                                if (lock->l_req_mode != LCK_GROUP) {
-                                        /* Ok, we hit non-GROUP lock, there should
-                                         * be no more GROUP locks later on, queue in
-                                         * front of first non-GROUP lock */
-
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                       list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
-                                        compat = 0;
-                                        break;
-                                }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        compat = 0;
-                                        break;
-                                }
-                                continue;
-                        }
-
                         /* locks are compatible, overlap doesn't matter */
                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                 if (req_mode == LCK_PR &&
@@ -586,25 +557,18 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 
                        if (unlikely(req_mode == LCK_GROUP &&
                                     !ldlm_is_granted(lock))) {
-                                scan = 1;
                                 compat = 0;
                                 if (lock->l_req_mode != LCK_GROUP) {
                                         /* Ok, we hit non-GROUP lock, there should be no
                                            more GROUP locks later on, queue in front of
                                            first non-GROUP lock */
 
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                       list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
+                                       ldlm_resource_insert_lock_before(lock, req);
                                         break;
                                 }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        break;
-                                }
-                                continue;
+                               LASSERT(req->l_policy_data.l_extent.gid !=
+                                       lock->l_policy_data.l_extent.gid);
+                               continue;
                         }
 
                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
@@ -615,8 +579,6 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                              | LDLM_FL_SPECULATIVE)) {
                                         compat = -EWOULDBLOCK;
                                         goto destroylock;
-                                } else {
-                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                 }
                         } else if (lock->l_policy_data.l_extent.end < req_start ||
                                    lock->l_policy_data.l_extent.start > req_end) {
@@ -663,7 +625,6 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 destroylock:
        list_del_init(&req->l_res_link);
         ldlm_lock_destroy_nolock(req);
-        *err = compat;
         RETURN(compat);
 }
 
@@ -786,7 +747,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
-       int rc, rc2;
+       int rc, rc2 = 0;
        int contended_locks = 0;
        struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
                                                        NULL : work_list;
@@ -806,10 +767,10 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                 * ever stops being true, we want to find out. */
                 LASSERT(*flags == 0);
                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
-                                              err, NULL, &contended_locks);
+                                             NULL, &contended_locks);
                 if (rc == 1) {
                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                                      flags, err, NULL,
+                                                     flags, NULL,
                                                       &contended_locks);
                 }
                 if (rc == 0)
@@ -824,18 +785,17 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
         }
 
         contended_locks = 0;
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+       rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                      work_list, &contended_locks);
        if (rc < 0)
-               GOTO(out_rpc_list, rc);
+               GOTO(out, *err = rc);
 
-       rc2 = 0;
        if (rc != 2) {
                rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                              flags, err, work_list,
+                                              flags, work_list,
                                               &contended_locks);
                if (rc2 < 0)
-                       GOTO(out_rpc_list, rc = rc2);
+                       GOTO(out, *err = rc = rc2);
        }
 
        if (rc + rc2 == 2) {
@@ -848,10 +808,10 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                 * the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
-       rc = LDLM_ITER_CONTINUE;
 
-out_rpc_list:
-       RETURN(rc);
+       RETURN(LDLM_ITER_CONTINUE);
+out:
+       return rc;
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c
index 1723db4..7dd78d0 100644
@@ -289,22 +289,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
                                                        NULL : work_list;
        int rc;
-
        ENTRY;
 
+       *err = ELDLM_LOCK_ABORTED;
        LASSERT(!ldlm_is_granted(lock));
        check_res_locked(res);
 
        if (intention == LDLM_PROCESS_RESCAN) {
-               struct list_head *bl_list;
-
-               if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                       bl_list = NULL;
-                       *err = ELDLM_LOCK_WOULDBLOCK;
-               } else {
-                       bl_list = work_list;
-                       *err = ELDLM_LOCK_ABORTED;
-               }
+               struct list_head *bl_list =
+                       *flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
 
                LASSERT(lock->l_policy_data.l_inodebits.bits != 0);
 
@@ -355,9 +348,10 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        if (rc != 2) {
                /* if there were only bits to try and all are conflicting */
                if ((lock->l_policy_data.l_inodebits.bits |
-                    lock->l_policy_data.l_inodebits.try_bits) == 0) {
-                       *err = ELDLM_LOCK_WOULDBLOCK;
-               } else {
+                    lock->l_policy_data.l_inodebits.try_bits)) {
+                       /* There is no sense to set LDLM_FL_NO_TIMEOUT to @flags
+                        * for DOM lock while they are enqueued through intents,
+                        * i.e. @lock here is local which does not timeout. */
                        *err = ELDLM_OK;
                }
        } else {
diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 2a6bf95..2293c59 100644
@@ -125,6 +125,8 @@ extern struct kmem_cache *ldlm_interval_tree_slab;
 
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
+void ldlm_resource_insert_lock_before(struct ldlm_lock *original,
+                                     struct ldlm_lock *new);
 
 /* ldlm_lock.c */
 
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 26a2b08..6bd149a 100644
@@ -1553,16 +1553,13 @@ int ldlm_resource_putref(struct ldlm_resource *res)
 }
 EXPORT_SYMBOL(ldlm_resource_putref);
 
-/**
- * Add a lock into a given resource into specified lock list.
- */
-void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
-                           struct ldlm_lock *lock)
+static void __ldlm_resource_add_lock(struct ldlm_resource *res,
+                                    struct list_head *head,
+                                    struct ldlm_lock *lock,
+                                    bool tail)
 {
        check_res_locked(res);
 
-       LDLM_DEBUG(lock, "About to add this lock");
-
        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
@@ -1570,36 +1567,53 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
 
        LASSERT(list_empty(&lock->l_res_link));
 
-       list_add_tail(&lock->l_res_link, head);
+       if (tail)
+               list_add_tail(&lock->l_res_link, head);
+       else
+               list_add(&lock->l_res_link, head);
 
        if (res->lr_type == LDLM_IBITS)
                ldlm_inodebits_add_lock(res, head, lock);
+
+       ldlm_resource_dump(D_INFO, res);
+}
+
+/**
+ * Add a lock into a given resource into specified lock list.
+ */
+void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
+                           struct ldlm_lock *lock)
+{
+       LDLM_DEBUG(lock, "About to add this lock");
+
+       __ldlm_resource_add_lock(res, head, lock, true);
 }
 
 /**
  * Insert a lock into resource after specified lock.
- *
- * Obtain resource description from the lock we are inserting after.
  */
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                     struct ldlm_lock *new)
 {
-       struct ldlm_resource *res = original->l_resource;
-
-       check_res_locked(res);
+       LASSERT(!list_empty(&original->l_res_link));
 
-       ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(new, "About to insert this lock after %p: ", original);
+       __ldlm_resource_add_lock(original->l_resource,
+                                &original->l_res_link,
+                                new, false);
+}
 
-       if (ldlm_is_destroyed(new)) {
-               CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
-               goto out;
-       }
-
-       LASSERT(list_empty(&new->l_res_link));
+/**
+ * Insert a lock into resource before the specified lock.
+ */
+void ldlm_resource_insert_lock_before(struct ldlm_lock *original,
+                                      struct ldlm_lock *new)
+{
+       LASSERT(!list_empty(&original->l_res_link));
 
-       list_add(&new->l_res_link, &original->l_res_link);
- out:;
+       LDLM_DEBUG(new, "About to insert this lock before %p: ", original);
+       __ldlm_resource_add_lock(original->l_resource,
+                                original->l_res_link.prev, new, false);
 }
 
 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c
index b89d8fb..ba4c372 100644
@@ -1267,17 +1267,21 @@ bool mdt_dom_client_has_lock(struct mdt_thread_info *info,
        struct mdt_device *mdt = info->mti_mdt;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
        struct lustre_handle lockh;
        enum ldlm_mode mode;
        struct ldlm_lock *lock;
+       enum ldlm_mode lm;
        bool rc;
 
        policy->l_inodebits.bits = MDS_INODELOCK_DOM;
        fid_build_reg_res_name(fid, res_id);
 
+
+       lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR | LCK_PW;
        mode = ldlm_lock_match(mdt->mdt_namespace, LDLM_FL_BLOCK_GRANTED |
                               LDLM_FL_TEST_LOCK, res_id, LDLM_IBITS, policy,
-                              LCK_PW, &lockh, 0);
+                              lm, &lockh, 0);
 
        /* There is no other PW lock on this object; finished. */
        if (mode == 0)
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index 4b6a6f4..8578c65 100755
@@ -144,6 +144,13 @@ check_swap_layouts_support()
                skip "Does not support layout lock."
 }
 
+check_swap_layout_no_dom()
+{
+       local FOLDER=$1
+       local SUPP=$(lfs getstripe $FOLDER | grep "pattern:       mdt" | wc -l)
+       [ $SUPP -eq 0 ] || skip "layout swap does not support DOM files so far"
+}
+
 check_and_setup_lustre
 DIR=${DIR:-$MOUNT}
 assert_DIR
@@ -9116,28 +9123,6 @@ test_81b() { # LU-456
 }
 run_test 81b "OST should return -ENOSPC when retry still fails ======="
 
-test_82() { # LU-1031
-       dd if=/dev/zero of=$DIR/$tfile bs=1M count=10
-       local gid1=14091995
-       local gid2=16022000
-
-       multiop_bg_pause $DIR/$tfile OG${gid1}_g${gid1}c || return 1
-       local MULTIPID1=$!
-       multiop_bg_pause $DIR/$tfile O_G${gid2}r10g${gid2}c || return 2
-       local MULTIPID2=$!
-       kill -USR1 $MULTIPID2
-       sleep 2
-       if [[ `ps h -o comm -p $MULTIPID2` == "" ]]; then
-               error "First grouplock does not block second one"
-       else
-               echo "Second grouplock blocks first one"
-       fi
-       kill -USR1 $MULTIPID1
-       wait $MULTIPID1
-       wait $MULTIPID2
-}
-run_test 82 "Basic grouplock test"
-
 test_99() {
        [ -z "$(which cvs 2>/dev/null)" ] && skip_env "could not find cvs"
 
@@ -15916,6 +15901,7 @@ test_184c() {
        local cmpn_arg=$(cmp -n 2>&1 | grep "invalid option")
        [ -n "$cmpn_arg" ] && skip_env "cmp does not support -n"
        check_swap_layouts_support
+       check_swap_layout_no_dom $DIR
 
        local dir0=$DIR/$tdir/$testnum
        mkdir -p $dir0 || error "creating dir $dir0"
@@ -15963,6 +15949,7 @@ run_test 184c "Concurrent write and layout swap"
 
 test_184d() {
        check_swap_layouts_support
+       check_swap_layout_no_dom $DIR
        [ -z "$(which getfattr 2>/dev/null)" ] &&
                skip_env "no getfattr command"
 
@@ -15999,6 +15986,7 @@ test_184e() {
        [[ $MDS1_VERSION -ge $(version_code 2.6.94) ]] ||
                skip "Need MDS version at least 2.6.94"
        check_swap_layouts_support
+       check_swap_layout_no_dom $DIR
        [ -z "$(which getfattr 2>/dev/null)" ] &&
                skip_env "no getfattr command"
 
@@ -22192,6 +22180,7 @@ test_405() {
                        skip "Layout swap lock is not supported"
 
        check_swap_layouts_support
+       check_swap_layout_no_dom $DIR
 
        test_mkdir $DIR/$tdir
        swap_lock_test -d $DIR/$tdir ||
diff --git a/lustre/tests/sanityn.sh b/lustre/tests/sanityn.sh
index ec00a5e..c75a476 100755
@@ -5295,6 +5295,30 @@ test_106c() {
 }
 run_test 106c "Verify statx attributes mask"
 
+test_107() { # LU-1031
+       dd if=/dev/zero of=$DIR1/$tfile bs=1M count=10
+       local gid1=14091995
+       local gid2=16022000
+
+       $LFS getstripe $DIR1/$tfile
+
+       multiop_bg_pause $DIR1/$tfile OG${gid1}_g${gid1}c || return 1
+       local MULTIPID1=$!
+       multiop_bg_pause $DIR2/$tfile O_G${gid2}r10g${gid2}c || return 2
+       local MULTIPID2=$!
+       kill -USR1 $MULTIPID2
+       sleep 2
+       if [[ `ps h -o comm -p $MULTIPID2` == "" ]]; then
+               error "First grouplock does not block second one"
+       else
+               echo "First grouplock blocks second one"
+       fi
+       kill -USR1 $MULTIPID1
+       wait $MULTIPID1
+       wait $MULTIPID2
+}
+run_test 107 "Basic grouplock conflict"
+
 log "cleanup: ======================================================"
 
 # kill and wait in each test only guarentee script finish, but command in script