From: Andriy Skulysh Date: Mon, 19 Feb 2018 09:02:36 +0000 (+0200) Subject: LU-10841 ldlm: ASSERTION(lock->l_granted_mode!=lock->l_req_mode) X-Git-Tag: 2.11.51~5 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=d6e9ece60a551df832881b77f04227d0f07d6ade LU-10841 ldlm: ASSERTION(lock->l_granted_mode!=lock->l_req_mode) Policy processors can unlock the resource to send BL ASTs, so the cached next list entry can become invalid. Move the sending of BL ASTs to ldlm_reprocess_queue() in the LDLM_PROCESS_RECOVERY case. Cray-bug-id: LUS-5689 Change-Id: Ib9b757576461b2f74aaa916b4b62538a9abfa0dd Signed-off-by: Andriy Skulysh Reviewed-on: https://review.whamcloud.com/31726 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Alexandr Boyko Reviewed-by: Vitaly Fertman Reviewed-by: Oleg Drokin --- diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 623563c..97c754d 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -786,15 +786,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags, enum ldlm_error *err, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; - struct list_head rpc_list; int rc, rc2; int contended_locks = 0; + struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ? 
+ NULL : work_list; ENTRY; LASSERT(lock->l_granted_mode != lock->l_req_mode); LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) || !ldlm_is_ast_discard_data(lock)); - INIT_LIST_HEAD(&rpc_list); check_res_locked(res); *err = ELDLM_OK; @@ -819,49 +819,38 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags, if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) ldlm_extent_policy(res, lock, flags); - ldlm_grant_lock(lock, work_list); + ldlm_grant_lock(lock, grant_work); RETURN(LDLM_ITER_CONTINUE); } - LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) || - (intention == LDLM_PROCESS_RECOVERY && work_list != NULL)); - restart: contended_locks = 0; rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, - &rpc_list, &contended_locks); + work_list, &contended_locks); if (rc < 0) GOTO(out_rpc_list, rc); rc2 = 0; if (rc != 2) { rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, - flags, err, &rpc_list, + flags, err, work_list, &contended_locks); if (rc2 < 0) GOTO(out_rpc_list, rc = rc2); } - if (rc + rc2 != 2) { - /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to force - * client to wait for the lock endlessly once the lock is - * enqueued -bzzz */ - rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, - LDLM_FL_NO_TIMEOUT); - if (rc == -ERESTART) - GOTO(restart, rc); - *err = rc; - } else { + if (rc + rc2 == 2) { ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); - rc = 0; + ldlm_grant_lock(lock, grant_work); + } else { + /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to + * force client to wait for the lock endlessly once + * the lock is enqueued -bzzz */ + *flags |= LDLM_FL_NO_TIMEOUT; } + rc = LDLM_ITER_CONTINUE; out_rpc_list: - if (!list_empty(&rpc_list)) { - LASSERT(!ldlm_is_ast_discard_data(lock)); - ldlm_discard_bl_list(&rpc_list); - } RETURN(rc); } #endif /* HAVE_SERVER_SUPPORT */ diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 
e00c359..f848a36 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -289,6 +289,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int overlaps = 0; int splitted = 0; const struct ldlm_callback_suite null_cbs = { NULL }; + struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ? + NULL : work_list; ENTRY; CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start " @@ -348,7 +350,7 @@ reprocess: reprocess_failed = 1; if (ldlm_flock_deadlock(req, lock)) { ldlm_flock_cancel_on_deadlock(req, - work_list); + grant_work); RETURN(LDLM_ITER_CONTINUE); } continue; @@ -590,7 +592,7 @@ restart: } } else { LASSERT(req->l_completion_ast); - ldlm_add_ast_work_item(req, NULL, work_list); + ldlm_add_ast_work_item(req, NULL, grant_work); } #else /* !HAVE_SERVER_SUPPORT */ /* The only one possible case for client-side calls flock diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 280efad..da561d8 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -203,13 +203,13 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; - struct list_head rpc_list; + struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ? 
+ NULL : work_list; int rc; ENTRY; LASSERT(lock->l_granted_mode != lock->l_req_mode); - INIT_LIST_HEAD(&rpc_list); check_res_locked(res); if (intention == LDLM_PROCESS_RESCAN) { @@ -232,30 +232,23 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, *flags |= LDLM_FL_LOCK_CHANGED; } ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); + ldlm_grant_lock(lock, grant_work); *err = ELDLM_OK; RETURN(LDLM_ITER_CONTINUE); } - LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) || - (intention == LDLM_PROCESS_RECOVERY && work_list != NULL)); -restart: - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list); - rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list); + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, work_list); + rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, work_list); if (rc != 2) { /* if there were only bits to try and all are conflicting */ if ((lock->l_policy_data.l_inodebits.bits | lock->l_policy_data.l_inodebits.try_bits) == 0) { - rc = ELDLM_LOCK_WOULDBLOCK; + *err = ELDLM_LOCK_WOULDBLOCK; } else { - rc = ldlm_handle_conflict_lock(lock, flags, - &rpc_list, 0); - if (rc == -ERESTART) - GOTO(restart, rc); + *err = ELDLM_OK; } - *err = rc; } else { /* grant also all remaining try_bits */ if (lock->l_policy_data.l_inodebits.try_bits != 0) { @@ -266,14 +259,11 @@ restart: } LASSERT(lock->l_policy_data.l_inodebits.bits); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); - rc = 0; + ldlm_grant_lock(lock, grant_work); + *err = ELDLM_OK; } - if (!list_empty(&rpc_list)) - ldlm_discard_bl_list(&rpc_list); - - RETURN(rc); + RETURN(LDLM_ITER_CONTINUE); } #endif /* HAVE_SERVER_SUPPORT */ diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 872e0ba..92f2d58 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -158,7 +158,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, 
struct list_head *work_list, enum ldlm_process_intention intention); int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, - struct list_head *rpc_list, __u64 grant_flags); + struct list_head *rpc_list); void ldlm_discard_bl_list(struct list_head *bl_list); #endif int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 01d9837..b5c68b2 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1689,6 +1689,33 @@ out: RETURN(ERR_PTR(rc)); } +#ifdef HAVE_SERVER_SUPPORT +static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock, + __u64 *flags) +{ + struct ldlm_resource *res = lock->l_resource; + enum ldlm_error rc = ELDLM_OK; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + ldlm_processing_policy policy; + ENTRY; + + policy = ldlm_processing_policy_table[res->lr_type]; +restart: + policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list); + if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode && + res->lr_type != LDLM_FLOCK) { + rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list); + if (rc == -ERESTART) + GOTO(restart, rc); + } + + if (!list_empty(&rpc_list)) + ldlm_discard_bl_list(&rpc_list); + + RETURN(rc); +} +#endif + /** * Enqueue (request) a lock. * @@ -1706,9 +1733,6 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, struct ldlm_lock *lock = *lockp; struct ldlm_resource *res = lock->l_resource; int local = ns_is_client(ldlm_res_to_ns(res)); -#ifdef HAVE_SERVER_SUPPORT - ldlm_processing_policy policy; -#endif enum ldlm_error rc = ELDLM_OK; struct ldlm_interval *node = NULL; ENTRY; @@ -1827,8 +1851,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, /* If no flags, fall through to normal enqueue path. 
*/ } - policy = ldlm_processing_policy_table[res->lr_type]; - policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL); + rc = ldlm_lock_enqueue_helper(lock, flags); GOTO(out, rc); #else } else { @@ -1861,6 +1884,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, __u64 flags; int rc = LDLM_ITER_CONTINUE; enum ldlm_error err; + struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list); ENTRY; check_res_locked(res); @@ -1870,15 +1894,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, LASSERT(intention == LDLM_PROCESS_RESCAN || intention == LDLM_PROCESS_RECOVERY); +restart: list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *pending; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); pending = list_entry(tmp, struct ldlm_lock, l_res_link); CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); flags = 0; - rc = policy(pending, &flags, intention, &err, work_list); + rc = policy(pending, &flags, intention, &err, &rpc_list); + if (pending->l_granted_mode == pending->l_req_mode || + res->lr_type == LDLM_FLOCK) { + list_splice(&rpc_list, work_list); + } else { + list_splice(&rpc_list, &bl_ast_list); + } /* * When this is called from recovery done, we always want * to scan the whole list no matter what 'rc' is returned. @@ -1888,6 +1920,22 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, break; } + if (!list_empty(&bl_ast_list)) { + unlock_res(res); + + LASSERT(intention == LDLM_PROCESS_RECOVERY); + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list, + LDLM_WORK_BL_AST); + + lock_res(res); + if (rc == -ERESTART) + GOTO(restart, rc); + } + + if (!list_empty(&bl_ast_list)) + ldlm_discard_bl_list(&bl_ast_list); + RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE); } @@ -1898,7 +1946,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, * \param[in] lock The lock to be enqueued. 
* \param[out] flags Lock flags for the lock to be enqueued. * \param[in] rpc_list Conflicting locks list. - * \param[in] grant_flags extra flags when granting a lock. * * \retval -ERESTART: Some lock was instantly canceled while sending * blocking ASTs, caller needs to re-check conflicting @@ -1907,7 +1954,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, * \reval 0: Lock is successfully added in waiting list. */ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, - struct list_head *rpc_list, __u64 grant_flags) + struct list_head *rpc_list) { struct ldlm_resource *res = lock->l_resource; int rc; @@ -1956,7 +2003,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, RETURN(rc); } - *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags); + *flags |= LDLM_FL_BLOCK_GRANTED; RETURN(0); } diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 247c712..aa074f5 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -129,13 +129,14 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags, enum ldlm_error *err, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; - struct list_head rpc_list; + struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ? 
+ NULL : work_list; int rc; ENTRY; LASSERT(lock->l_granted_mode != lock->l_req_mode); check_res_locked(res); - INIT_LIST_HEAD(&rpc_list); + *err = ELDLM_OK; if (intention == LDLM_PROCESS_RESCAN) { LASSERT(work_list != NULL); @@ -147,31 +148,19 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags, RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); + ldlm_grant_lock(lock, grant_work); RETURN(LDLM_ITER_CONTINUE); } - LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) || - (intention == LDLM_PROCESS_RECOVERY && work_list != NULL)); - restart: - rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list); - rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list); - - if (rc != 2) { - rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0); - if (rc == -ERESTART) - GOTO(restart, rc); - *err = rc; - } else { + rc = ldlm_plain_compat_queue(&res->lr_granted, lock, work_list); + rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, work_list); + + if (rc == 2) { ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); - rc = 0; + ldlm_grant_lock(lock, grant_work); } - if (!list_empty(&rpc_list)) - ldlm_discard_bl_list(&rpc_list); - - RETURN(rc); + RETURN(LDLM_ITER_CONTINUE); } #endif /* HAVE_SERVER_SUPPORT */