Whamcloud - gitweb
LU-10841 ldlm: ASSERTION(lock->l_granted_mode!=lock->l_req_mode) 26/31726/4
authorAndriy Skulysh <c17819@cray.com>
Mon, 19 Feb 2018 09:02:36 +0000 (11:02 +0200)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 19 Apr 2018 04:38:34 +0000 (04:38 +0000)
Policy processors can unlock resource to send BL AST,
so cached next list entry can become invalid.

Move sending BL ASTs to ldlm_reprocess_queue()
in case of LDLM_PROCESS_RECOVERY.

Cray-bug-id: LUS-5689
Change-Id: Ib9b757576461b2f74aaa916b4b62538a9abfa0dd
Signed-off-by: Andriy Skulysh <c17819@cray.com>
Reviewed-on: https://review.whamcloud.com/31726
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_plain.c

index 623563c..97c754d 100644 (file)
@@ -786,15 +786,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
                             enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
-       struct list_head rpc_list;
        int rc, rc2;
        int contended_locks = 0;
        int rc, rc2;
        int contended_locks = 0;
+       struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
+                                                       NULL : work_list;
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
-       INIT_LIST_HEAD(&rpc_list);
        check_res_locked(res);
        *err = ELDLM_OK;
 
        check_res_locked(res);
        *err = ELDLM_OK;
 
@@ -819,49 +819,38 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
 
                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                         ldlm_extent_policy(res, lock, flags);
 
                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                         ldlm_extent_policy(res, lock, flags);
-                ldlm_grant_lock(lock, work_list);
+               ldlm_grant_lock(lock, grant_work);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
-       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
-               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
- restart:
         contended_locks = 0;
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
         contended_locks = 0;
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
-                                      &rpc_list, &contended_locks);
+                                     work_list, &contended_locks);
        if (rc < 0)
                GOTO(out_rpc_list, rc);
 
        rc2 = 0;
        if (rc != 2) {
                rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
        if (rc < 0)
                GOTO(out_rpc_list, rc);
 
        rc2 = 0;
        if (rc != 2) {
                rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                              flags, err, &rpc_list,
+                                              flags, err, work_list,
                                               &contended_locks);
                if (rc2 < 0)
                        GOTO(out_rpc_list, rc = rc2);
        }
 
                                               &contended_locks);
                if (rc2 < 0)
                        GOTO(out_rpc_list, rc = rc2);
        }
 
-       if (rc + rc2 != 2) {
-               /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to force
-                * client to wait for the lock endlessly once the lock is
-                * enqueued -bzzz */
-               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list,
-                                              LDLM_FL_NO_TIMEOUT);
-               if (rc == -ERESTART)
-                       GOTO(restart, rc);
-               *err = rc;
-       } else {
+       if (rc + rc2 == 2) {
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
-               ldlm_grant_lock(lock, work_list);
-               rc = 0;
+               ldlm_grant_lock(lock, grant_work);
+       } else {
+               /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to
+                * force client to wait for the lock endlessly once
+                * the lock is enqueued -bzzz */
+               *flags |= LDLM_FL_NO_TIMEOUT;
        }
        }
+       rc = LDLM_ITER_CONTINUE;
 
 out_rpc_list:
 
 out_rpc_list:
-       if (!list_empty(&rpc_list)) {
-               LASSERT(!ldlm_is_ast_discard_data(lock));
-               ldlm_discard_bl_list(&rpc_list);
-       }
        RETURN(rc);
 }
 #endif /* HAVE_SERVER_SUPPORT */
        RETURN(rc);
 }
 #endif /* HAVE_SERVER_SUPPORT */
index e00c359..f848a36 100644 (file)
@@ -289,6 +289,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
+       struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
+                                                       NULL : work_list;
        ENTRY;
 
        CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
        ENTRY;
 
        CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
@@ -348,7 +350,7 @@ reprocess:
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
-                                                       work_list);
+                                                       grant_work);
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                continue;
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                continue;
@@ -590,7 +592,7 @@ restart:
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, work_list);
+                       ldlm_add_ast_work_item(req, NULL, grant_work);
                 }
 #else /* !HAVE_SERVER_SUPPORT */
                 /* The only one possible case for client-side calls flock
                 }
 #else /* !HAVE_SERVER_SUPPORT */
                 /* The only one possible case for client-side calls flock
index 280efad..da561d8 100644 (file)
@@ -203,13 +203,13 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                                struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
                                struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
-       struct list_head rpc_list;
+       struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
+                                                       NULL : work_list;
        int rc;
 
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        int rc;
 
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
-       INIT_LIST_HEAD(&rpc_list);
        check_res_locked(res);
 
        if (intention == LDLM_PROCESS_RESCAN) {
        check_res_locked(res);
 
        if (intention == LDLM_PROCESS_RESCAN) {
@@ -232,30 +232,23 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                        *flags |= LDLM_FL_LOCK_CHANGED;
                }
                ldlm_resource_unlink_lock(lock);
                        *flags |= LDLM_FL_LOCK_CHANGED;
                }
                ldlm_resource_unlink_lock(lock);
-               ldlm_grant_lock(lock, work_list);
+               ldlm_grant_lock(lock, grant_work);
 
                *err = ELDLM_OK;
                RETURN(LDLM_ITER_CONTINUE);
        }
 
 
                *err = ELDLM_OK;
                RETURN(LDLM_ITER_CONTINUE);
        }
 
-       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
-               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
-restart:
-       rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
-       rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
+       rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, work_list);
+       rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, work_list);
 
        if (rc != 2) {
                /* if there were only bits to try and all are conflicting */
                if ((lock->l_policy_data.l_inodebits.bits |
                     lock->l_policy_data.l_inodebits.try_bits) == 0) {
 
        if (rc != 2) {
                /* if there were only bits to try and all are conflicting */
                if ((lock->l_policy_data.l_inodebits.bits |
                     lock->l_policy_data.l_inodebits.try_bits) == 0) {
-                       rc = ELDLM_LOCK_WOULDBLOCK;
+                       *err = ELDLM_LOCK_WOULDBLOCK;
                } else {
                } else {
-                       rc = ldlm_handle_conflict_lock(lock, flags,
-                                                      &rpc_list, 0);
-                       if (rc == -ERESTART)
-                               GOTO(restart, rc);
+                       *err = ELDLM_OK;
                }
                }
-               *err = rc;
        } else {
                /* grant also all remaining try_bits */
                if (lock->l_policy_data.l_inodebits.try_bits != 0) {
        } else {
                /* grant also all remaining try_bits */
                if (lock->l_policy_data.l_inodebits.try_bits != 0) {
@@ -266,14 +259,11 @@ restart:
                }
                LASSERT(lock->l_policy_data.l_inodebits.bits);
                ldlm_resource_unlink_lock(lock);
                }
                LASSERT(lock->l_policy_data.l_inodebits.bits);
                ldlm_resource_unlink_lock(lock);
-               ldlm_grant_lock(lock, work_list);
-               rc = 0;
+               ldlm_grant_lock(lock, grant_work);
+               *err = ELDLM_OK;
        }
 
        }
 
-       if (!list_empty(&rpc_list))
-               ldlm_discard_bl_list(&rpc_list);
-
-       RETURN(rc);
+       RETURN(LDLM_ITER_CONTINUE);
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
index 872e0ba..92f2d58 100644 (file)
@@ -158,7 +158,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
                         enum ldlm_process_intention intention);
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                         struct list_head *work_list,
                         enum ldlm_process_intention intention);
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
-                             struct list_head *rpc_list, __u64 grant_flags);
+                             struct list_head *rpc_list);
 void ldlm_discard_bl_list(struct list_head *bl_list);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
 void ldlm_discard_bl_list(struct list_head *bl_list);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
index 01d9837..b5c68b2 100644 (file)
@@ -1689,6 +1689,33 @@ out:
        RETURN(ERR_PTR(rc));
 }
 
        RETURN(ERR_PTR(rc));
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+                                            __u64 *flags)
+{
+       struct ldlm_resource *res = lock->l_resource;
+       enum ldlm_error rc = ELDLM_OK;
+       struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+       ldlm_processing_policy policy;
+       ENTRY;
+
+       policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+       if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+           res->lr_type != LDLM_FLOCK) {
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&rpc_list))
+               ldlm_discard_bl_list(&rpc_list);
+
+       RETURN(rc);
+}
+#endif
+
 /**
  * Enqueue (request) a lock.
  *
 /**
  * Enqueue (request) a lock.
  *
@@ -1706,9 +1733,6 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
-       ldlm_processing_policy policy;
-#endif
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;
@@ -1827,8 +1851,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
                /* If no flags, fall through to normal enqueue path. */
        }
 
                /* If no flags, fall through to normal enqueue path. */
        }
 
-       policy = ldlm_processing_policy_table[res->lr_type];
-       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
+       rc = ldlm_lock_enqueue_helper(lock, flags);
        GOTO(out, rc);
 #else
         } else {
        GOTO(out, rc);
 #else
         } else {
@@ -1861,6 +1884,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
+       struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
        ENTRY;
 
        check_res_locked(res);
        ENTRY;
 
        check_res_locked(res);
@@ -1870,15 +1894,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
 
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
 
+restart:
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
+               struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-               rc = policy(pending, &flags, intention, &err, work_list);
+               rc = policy(pending, &flags, intention, &err, &rpc_list);
+               if (pending->l_granted_mode == pending->l_req_mode ||
+                   res->lr_type == LDLM_FLOCK) {
+                       list_splice(&rpc_list, work_list);
+               } else {
+                       list_splice(&rpc_list, &bl_ast_list);
+               }
                /*
                 * When this is called from recovery done, we always want
                 * to scan the whole list no matter what 'rc' is returned.
                /*
                 * When this is called from recovery done, we always want
                 * to scan the whole list no matter what 'rc' is returned.
@@ -1888,6 +1920,22 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                        break;
         }
 
                        break;
         }
 
+       if (!list_empty(&bl_ast_list)) {
+               unlock_res(res);
+
+               LASSERT(intention == LDLM_PROCESS_RECOVERY);
+
+               rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+                                      LDLM_WORK_BL_AST);
+
+               lock_res(res);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&bl_ast_list))
+               ldlm_discard_bl_list(&bl_ast_list);
+
         RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
 
         RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
 
@@ -1898,7 +1946,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \param[in] lock             The lock to be enqueued.
  * \param[out] flags           Lock flags for the lock to be enqueued.
  * \param[in] rpc_list         Conflicting locks list.
  * \param[in] lock             The lock to be enqueued.
  * \param[out] flags           Lock flags for the lock to be enqueued.
  * \param[in] rpc_list         Conflicting locks list.
- * \param[in] grant_flags      extra flags when granting a lock.
  *
  * \retval -ERESTART:  Some lock was instantly canceled while sending
  *                     blocking ASTs, caller needs to re-check conflicting
  *
  * \retval -ERESTART:  Some lock was instantly canceled while sending
  *                     blocking ASTs, caller needs to re-check conflicting
@@ -1907,7 +1954,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \reval 0:           Lock is successfully added in waiting list.
  */
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
  * \reval 0:           Lock is successfully added in waiting list.
  */
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
-                             struct list_head *rpc_list, __u64 grant_flags)
+                             struct list_head *rpc_list)
 {
        struct ldlm_resource *res = lock->l_resource;
        int rc;
 {
        struct ldlm_resource *res = lock->l_resource;
        int rc;
@@ -1956,7 +2003,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 
                RETURN(rc);
        }
 
                RETURN(rc);
        }
-       *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+       *flags |= LDLM_FL_BLOCK_GRANTED;
 
        RETURN(0);
 }
 
        RETURN(0);
 }
index 247c712..aa074f5 100644 (file)
@@ -129,13 +129,14 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
                            enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
                            enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
-       struct list_head rpc_list;
+       struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
+                                                       NULL : work_list;
        int rc;
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        check_res_locked(res);
        int rc;
        ENTRY;
 
        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        check_res_locked(res);
-       INIT_LIST_HEAD(&rpc_list);
+       *err = ELDLM_OK;
 
        if (intention == LDLM_PROCESS_RESCAN) {
                 LASSERT(work_list != NULL);
 
        if (intention == LDLM_PROCESS_RESCAN) {
                 LASSERT(work_list != NULL);
@@ -147,31 +148,19 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, work_list);
+               ldlm_grant_lock(lock, grant_work);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
-       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
-               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
- restart:
-        rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
-        rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
-
-        if (rc != 2) {
-               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
-               if (rc == -ERESTART)
-                       GOTO(restart, rc);
-               *err = rc;
-       } else {
+       rc = ldlm_plain_compat_queue(&res->lr_granted, lock, work_list);
+       rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, work_list);
+
+       if (rc == 2) {
                ldlm_resource_unlink_lock(lock);
                ldlm_resource_unlink_lock(lock);
-               ldlm_grant_lock(lock, work_list);
-               rc = 0;
+               ldlm_grant_lock(lock, grant_work);
        }
 
        }
 
-       if (!list_empty(&rpc_list))
-               ldlm_discard_bl_list(&rpc_list);
-
-       RETURN(rc);
+       RETURN(LDLM_ITER_CONTINUE);
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
 }
 #endif /* HAVE_SERVER_SUPPORT */