X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_inodebits.c;h=095c6e5615d22710e2a7e1ca604dd68505df0695;hb=7c99f67d9d39e8a037e830cf08a9df305e6d8da2;hp=d6ededf3249e3f152f1507d0695ac9dc578722f1;hpb=954cc6754b19a5eb4b9f717f79037c40baa87f3f;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index d6ededf..095c6e5 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -57,40 +57,87 @@ #include "ldlm_internal.h" #ifdef HAVE_SERVER_SUPPORT -/* - * local lock will be canceled after use, and it should run blocking ast only - * when it should trigger Commit-on-Sharing, otherwise if the blocking ast - * is run and does nothing, it will prevent subsequent operations to trigger - * Commit-on-Sharing, see LU-11102. + +/** + * It should iterate through all waiting locks on a given resource queue and + * attempt to grant them. An optimization is to check only heads waitintg + * locks for each inodebit type. + * + * Must be called with resource lock held. */ -static bool ldlm_should_run_bl_ast(const struct ldlm_lock *lock, - const struct ldlm_lock *req) +int ldlm_reprocess_inodebits_queue(struct ldlm_resource *res, + struct list_head *queue, + struct list_head *work_list, + enum ldlm_process_intention intention, + struct ldlm_lock *hint) { - /* no blocking ast */ - if (!lock->l_blocking_ast) - return false; + __u64 flags; + int rc = LDLM_ITER_CONTINUE; + enum ldlm_error err; + LIST_HEAD(bl_ast_list); + struct ldlm_ibits_queues *queues = res->lr_ibits_queues; + int i; + + ENTRY; + + check_res_locked(res); + + LASSERT(res->lr_type == LDLM_IBITS); + LASSERT(intention == LDLM_PROCESS_RESCAN || + intention == LDLM_PROCESS_RECOVERY); + + if (intention == LDLM_PROCESS_RECOVERY) + return ldlm_reprocess_queue(res, queue, work_list, intention, + NULL); - /* not local lock */ - if (!ldlm_is_local(lock)) - return true; +restart: + CDEBUG(D_DLMTRACE, "--- Reprocess resource "DLDLMRES" (%p)\n", + PLDLMRES(res), res); + + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) { + LIST_HEAD(rpc_list); + struct list_head *head = &queues->liq_waiting[i]; + struct ldlm_lock *pending; + struct ldlm_ibits_node *node; + + if (list_empty(head)) + continue; + if (hint && !(hint->l_policy_data.l_inodebits.bits & (1 << i))) + continue; + + node = list_entry(head->next, struct ldlm_ibits_node, + lin_link[i]); + + pending = node->lock; + LDLM_DEBUG(pending, "Reprocessing lock from queue %d", i); + + flags = 0; + rc = ldlm_process_inodebits_lock(pending, &flags, intention, + &err, &rpc_list); + if (ldlm_is_granted(pending)) { + list_splice(&rpc_list, work_list); + /* Try to grant more locks from current queue */ + i--; + } else { + list_splice(&rpc_list, &bl_ast_list); + } + } - /* should trigger Commit-on-Sharing */ - if ((lock->l_req_mode & LCK_COS)) - return true; + if (!list_empty(&bl_ast_list)) { + unlock_res(res); - /* local read lock will be canceld after use */ - if (!(lock->l_req_mode & (LCK_PW | LCK_EX))) - return false; + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list, + LDLM_WORK_BL_AST); - /* if CoS enabled, check if @req is from different client */ - if (ldlm_is_cos_enabled(req)) - return lock->l_client_cookie != req->l_client_cookie; + lock_res(res); + if (rc == -ERESTART) + GOTO(restart, rc); + } - /* check if @req is COS incompatible */ - if (ldlm_is_cos_incompat(req)) - return true; + if (!list_empty(&bl_ast_list)) + ldlm_discard_bl_list(&bl_ast_list); - return false; + RETURN(rc); } /** @@ -204,12 +251,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* Add locks of the policy group to @work_list * as blocking locks for @req */ - if (ldlm_should_run_bl_ast(lock, req)) + if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, req, work_list); head = &lock->l_sl_policy; list_for_each_entry(lock, head, l_sl_policy) - if (ldlm_should_run_bl_ast(lock, req)) + if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, req, work_list); } @@ -245,7 +292,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, ENTRY; - LASSERT(lock->l_granted_mode != lock->l_req_mode); + LASSERT(!ldlm_is_granted(lock)); check_res_locked(res); if (intention == LDLM_PROCESS_RESCAN) { @@ -335,7 +382,10 @@ void ldlm_ibits_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy, union ldlm_policy_data *lpolicy) { lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits; - lpolicy->l_inodebits.try_bits = wpolicy->l_inodebits.try_bits; + /** + * try_bits are to be handled outside of generic write_to_local due + * to different behavior on a server and client. + */ } void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy, @@ -377,89 +427,151 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop) EXPORT_SYMBOL(ldlm_inodebits_drop); /* convert single lock */ -int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits) +int ldlm_cli_inodebits_convert(struct ldlm_lock *lock, + enum ldlm_cancel_flags cancel_flags) { - struct lustre_handle lockh; + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + struct ldlm_lock_desc ld = { { 0 } }; + __u64 drop_bits, new_bits; __u32 flags = 0; int rc; ENTRY; - LASSERT(drop_bits); - LASSERT(!lock->l_readers && !lock->l_writers); + check_res_locked(lock->l_resource); - LDLM_DEBUG(lock, "client lock convert START"); + /* Lock is being converted already */ + if (ldlm_is_converting(lock)) { + if (!(cancel_flags & LCF_ASYNC)) { + struct l_wait_info lwi = { 0 }; - ldlm_lock2handle(lock, &lockh); - lock_res_and_lock(lock); - /* check if all bits are blocked */ - if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) { - unlock_res_and_lock(lock); - /* return error to continue with cancel */ - GOTO(exit, rc = -EINVAL); + unlock_res_and_lock(lock); + l_wait_event(lock->l_waitq, + is_lock_converted(lock), &lwi); + lock_res_and_lock(lock); + } + RETURN(0); } - /* check if no common bits, consider this as successful convert */ - if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) { - unlock_res_and_lock(lock); - GOTO(exit, rc = 0); - } + /* lru_cancel may happen in parallel and call ldlm_cli_cancel_list() + * independently. + */ + if (ldlm_is_canceling(lock)) + RETURN(-EINVAL); - /* check if there is race with cancel */ - if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) { - unlock_res_and_lock(lock); - GOTO(exit, rc = -EINVAL); - } + /* no need in only local convert */ + if (lock->l_flags & (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK)) + RETURN(-EINVAL); - /* clear cbpending flag early, it is safe to match lock right after - * client convert because it is downgrade always. - */ - ldlm_clear_cbpending(lock); - ldlm_clear_bl_ast(lock); + drop_bits = lock->l_policy_data.l_inodebits.cancel_bits; + /* no cancel bits - means that caller needs full cancel */ + if (drop_bits == 0) + RETURN(-EINVAL); + + new_bits = lock->l_policy_data.l_inodebits.bits & ~drop_bits; + /* check if all lock bits are dropped, proceed with cancel */ + if (!new_bits) + RETURN(-EINVAL); + + /* check if no dropped bits, consider this as successful convert */ + if (lock->l_policy_data.l_inodebits.bits == new_bits) + RETURN(0); - /* If lock is being converted already, check drop bits first */ - if (ldlm_is_converting(lock)) { - /* raced lock convert, lock inodebits are remaining bits - * so check if they are conflicting with new convert or not. - */ - if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) { - unlock_res_and_lock(lock); - GOTO(exit, rc = 0); - } - /* Otherwise drop new conflicting bits in new convert */ - } ldlm_set_converting(lock); - /* from all bits of blocking lock leave only conflicting */ - drop_bits &= lock->l_policy_data.l_inodebits.bits; - /* save them in cancel_bits, so l_blocking_ast will know - * which bits from the current lock were dropped. */ - lock->l_policy_data.l_inodebits.cancel_bits = drop_bits; - /* Finally clear these bits in lock ibits */ - ldlm_inodebits_drop(lock, drop_bits); - unlock_res_and_lock(lock); /* Finally call cancel callback for remaining bits only. * It is important to have converting flag during that * so blocking_ast callback can distinguish convert from * cancels. */ - if (lock->l_blocking_ast) - lock->l_blocking_ast(lock, NULL, lock->l_ast_data, - LDLM_CB_CANCELING); - + ld.l_policy_data.l_inodebits.cancel_bits = drop_bits; + unlock_res_and_lock(lock); + lock->l_blocking_ast(lock, &ld, lock->l_ast_data, LDLM_CB_CANCELING); /* now notify server about convert */ - rc = ldlm_cli_convert(lock, &flags); - if (rc) { - lock_res_and_lock(lock); - if (ldlm_is_converting(lock)) { - ldlm_clear_converting(lock); - ldlm_set_cbpending(lock); - ldlm_set_bl_ast(lock); + rc = ldlm_cli_convert_req(lock, &flags, new_bits); + lock_res_and_lock(lock); + if (rc) + GOTO(full_cancel, rc); + + /* Finally clear these bits in lock ibits */ + ldlm_inodebits_drop(lock, drop_bits); + + /* Being locked again check if lock was canceled, it is important + * to do and don't drop cbpending below + */ + if (ldlm_is_canceling(lock)) + GOTO(full_cancel, rc = -EINVAL); + + /* also check again if more bits to be cancelled appeared */ + if (drop_bits != lock->l_policy_data.l_inodebits.cancel_bits) + GOTO(clear_converting, rc = -EAGAIN); + + /* clear cbpending flag early, it is safe to match lock right after + * client convert because it is downgrade always. + */ + ldlm_clear_cbpending(lock); + ldlm_clear_bl_ast(lock); + spin_lock(&ns->ns_lock); + if (list_empty(&lock->l_lru)) + ldlm_lock_add_to_lru_nolock(lock); + spin_unlock(&ns->ns_lock); + + /* the job is done, zero the cancel_bits. If more conflicts appear, + * it will result in another cycle of ldlm_cli_inodebits_convert(). + */ +full_cancel: + lock->l_policy_data.l_inodebits.cancel_bits = 0; +clear_converting: + ldlm_clear_converting(lock); + RETURN(rc); +} + +int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock) +{ + if (ldlm_is_ns_srv(lock)) { + int i; + + OBD_SLAB_ALLOC_PTR(lock->l_ibits_node, ldlm_inodebits_slab); + if (lock->l_ibits_node == NULL) + return -ENOMEM; + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + INIT_LIST_HEAD(&lock->l_ibits_node->lin_link[i]); + lock->l_ibits_node->lock = lock; + } else { + lock->l_ibits_node = NULL; + } + return 0; +} + +void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head, + struct ldlm_lock *lock) +{ + int i; + + if (!ldlm_is_ns_srv(lock)) + return; + + if (head == &res->lr_waiting) { + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) { + if (lock->l_policy_data.l_inodebits.bits & (1 << i)) + list_add_tail(&lock->l_ibits_node->lin_link[i], + &res->lr_ibits_queues->liq_waiting[i]); } - unlock_res_and_lock(lock); - GOTO(exit, rc); + } else if (head == &res->lr_granted && lock->l_ibits_node != NULL) { + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + LASSERT(list_empty(&lock->l_ibits_node->lin_link[i])); + OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab); + lock->l_ibits_node = NULL; } - EXIT; -exit: - LDLM_DEBUG(lock, "client lock convert END"); - return rc; +} + +void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock) +{ + int i; + + ldlm_unlink_lock_skiplist(lock); + if (!ldlm_is_ns_srv(lock)) + return; + + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + list_del_init(&lock->l_ibits_node->lin_link[i]); }