X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fldlm%2Fldlm_lock.c;h=b5c68b2c9a060453ecb8622828f1dcd8a8913e29;hb=d6e9ece60a551df832881b77f04227d0f07d6ade;hp=472dde5f0ab06529f8083f8c4ba6b57a63031b83;hpb=e920be6814512b1aa8696ea36d697d3b698c13e8;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 472dde5..b5c68b2 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -23,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2010, 2016, Intel Corporation. + * Copyright (c) 2010, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -1072,16 +1072,14 @@ static void ldlm_granted_list_add_lock(struct ldlm_lock *lock, * Add a lock to granted list on a resource maintaining skiplist * correctness. */ -static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock) +void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock) { - struct sl_insert_point prev; - ENTRY; + struct sl_insert_point prev; - LASSERT(lock->l_req_mode == lock->l_granted_mode); + LASSERT(lock->l_req_mode == lock->l_granted_mode); - search_granted_lock(&lock->l_resource->lr_granted, lock, &prev); - ldlm_granted_list_add_lock(lock, &prev); - EXIT; + search_granted_lock(&lock->l_resource->lr_granted, lock, &prev); + ldlm_granted_list_add_lock(lock, &prev); } /** @@ -1091,7 +1089,6 @@ static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock) * NOTE: called by * - ldlm_lock_enqueue * - ldlm_reprocess_queue - * - ldlm_lock_convert * * must be called with lr_lock held */ @@ -1414,9 +1411,6 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags, GOTO(out, rc = 1); if (flags & LDLM_FL_BLOCK_GRANTED) GOTO(out, rc = 0); - lock = search_queue(&res->lr_converting, &data); - if (lock != NULL) - GOTO(out, rc = 1); lock = search_queue(&res->lr_waiting, &data); if (lock != NULL) GOTO(out, rc = 1); @@ -1695,6 +1689,33 @@ out: RETURN(ERR_PTR(rc)); } +#ifdef HAVE_SERVER_SUPPORT +static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock, + __u64 *flags) +{ + struct ldlm_resource *res = lock->l_resource; + enum ldlm_error rc = ELDLM_OK; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + ldlm_processing_policy policy; + ENTRY; + + policy = ldlm_processing_policy_table[res->lr_type]; +restart: + policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list); + if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode && + res->lr_type != LDLM_FLOCK) { + rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list); + if (rc == -ERESTART) + GOTO(restart, rc); + } + + if (!list_empty(&rpc_list)) + ldlm_discard_bl_list(&rpc_list); + + RETURN(rc); +} +#endif + /** * Enqueue (request) a lock. * @@ -1712,9 +1733,6 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, struct ldlm_lock *lock = *lockp; struct ldlm_resource *res = lock->l_resource; int local = ns_is_client(ldlm_res_to_ns(res)); -#ifdef HAVE_SERVER_SUPPORT - ldlm_processing_policy policy; -#endif enum ldlm_error rc = ELDLM_OK; struct ldlm_interval *node = NULL; ENTRY; @@ -1814,33 +1832,27 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, * more or less trusting the clients not to lie. * * FIXME (bug 268): Detect obvious lies by checking compatibility in - * granted/converting queues. */ + * granted queue. 
*/ if (local) { - if (*flags & LDLM_FL_BLOCK_CONV) - ldlm_resource_add_lock(res, &res->lr_converting, lock); - else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) - ldlm_resource_add_lock(res, &res->lr_waiting, lock); - else - ldlm_grant_lock(lock, NULL); + if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) + ldlm_resource_add_lock(res, &res->lr_waiting, lock); + else + ldlm_grant_lock(lock, NULL); GOTO(out, rc = ELDLM_OK); #ifdef HAVE_SERVER_SUPPORT - } else if (*flags & LDLM_FL_REPLAY) { - if (*flags & LDLM_FL_BLOCK_CONV) { - ldlm_resource_add_lock(res, &res->lr_converting, lock); - GOTO(out, rc = ELDLM_OK); - } else if (*flags & LDLM_FL_BLOCK_WAIT) { - ldlm_resource_add_lock(res, &res->lr_waiting, lock); + } else if (*flags & LDLM_FL_REPLAY) { + if (*flags & LDLM_FL_BLOCK_WAIT) { + ldlm_resource_add_lock(res, &res->lr_waiting, lock); GOTO(out, rc = ELDLM_OK); - } else if (*flags & LDLM_FL_BLOCK_GRANTED) { - ldlm_grant_lock(lock, NULL); + } else if (*flags & LDLM_FL_BLOCK_GRANTED) { + ldlm_grant_lock(lock, NULL); GOTO(out, rc = ELDLM_OK); - } - /* If no flags, fall through to normal enqueue path. */ - } + } + /* If no flags, fall through to normal enqueue path. */ + } - policy = ldlm_processing_policy_table[res->lr_type]; - policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL); - GOTO(out, rc); + rc = ldlm_lock_enqueue_helper(lock, flags); + GOTO(out, rc); #else } else { CERROR("This is client-side-only module, cannot handle " @@ -1872,6 +1884,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, __u64 flags; int rc = LDLM_ITER_CONTINUE; enum ldlm_error err; + struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list); ENTRY; check_res_locked(res); @@ -1881,15 +1894,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, LASSERT(intention == LDLM_PROCESS_RESCAN || intention == LDLM_PROCESS_RECOVERY); +restart: list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *pending; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); pending = list_entry(tmp, struct ldlm_lock, l_res_link); CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); flags = 0; - rc = policy(pending, &flags, intention, &err, work_list); + rc = policy(pending, &flags, intention, &err, &rpc_list); + if (pending->l_granted_mode == pending->l_req_mode || + res->lr_type == LDLM_FLOCK) { + list_splice(&rpc_list, work_list); + } else { + list_splice(&rpc_list, &bl_ast_list); + } /* * When this is called from recovery done, we always want * to scan the whole list no matter what 'rc' is returned. @@ -1899,6 +1920,22 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, break; } + if (!list_empty(&bl_ast_list)) { + unlock_res(res); + + LASSERT(intention == LDLM_PROCESS_RECOVERY); + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list, + LDLM_WORK_BL_AST); + + lock_res(res); + if (rc == -ERESTART) + GOTO(restart, rc); + } + + if (!list_empty(&bl_ast_list)) + ldlm_discard_bl_list(&bl_ast_list); + RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE); } @@ -1909,7 +1946,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, * \param[in] lock The lock to be enqueued. * \param[out] flags Lock flags for the lock to be enqueued. * \param[in] rpc_list Conflicting locks list. - * \param[in] grant_flags extra flags when granting a lock. 
* * \retval -ERESTART: Some lock was instantly canceled while sending * blocking ASTs, caller needs to re-check conflicting @@ -1918,7 +1954,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, * \reval 0: Lock is successfully added in waiting list. */ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, - struct list_head *rpc_list, __u64 grant_flags) + struct list_head *rpc_list) { struct ldlm_resource *res = lock->l_resource; int rc; @@ -1967,7 +2003,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, RETURN(rc); } - *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags); + *flags |= LDLM_FL_BLOCK_GRANTED; RETURN(0); } @@ -2216,7 +2252,7 @@ out: /** * Try to grant all waiting locks on a resource. * - * Calls ldlm_reprocess_queue on converting and waiting queues. + * Calls ldlm_reprocess_queue on waiting queue. * * Typically called after some resource locks are cancelled to see * if anything could be granted as a result of the cancellation. @@ -2246,19 +2282,15 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res, RETURN_EXIT; restart: lock_res(res); - rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list, - intention); - if (rc == LDLM_ITER_CONTINUE) - ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, - intention); + ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention); unlock_res(res); - rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, - LDLM_WORK_CP_AST); - if (rc == -ERESTART) { + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_CP_AST); + if (rc == -ERESTART) { LASSERT(list_empty(&rpc_list)); - goto restart; - } + goto restart; + } #else ENTRY; @@ -2303,19 +2335,6 @@ void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns) EXIT; } -static bool is_bl_done(struct ldlm_lock *lock) -{ - bool bl_done = true; - - if (!ldlm_is_bl_done(lock)) { - lock_res_and_lock(lock); - bl_done = ldlm_is_bl_done(lock); - unlock_res_and_lock(lock); - } - - return bl_done; -} - /** * Helper function to call blocking AST for LDLM lock \a lock in a * "cancelling" mode. @@ -2441,7 +2460,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp, res = ldlm_resource_getref(lock->l_resource); - ldlm_res_lvbo_update(res, NULL, 1); + ldlm_lvbo_update(res, lock, NULL, 1); ldlm_lock_cancel(lock); if (!exp->exp_obd->obd_stopping) ldlm_reprocess_all(res); @@ -2542,18 +2561,18 @@ int ldlm_export_cancel_locks(struct obd_export *exp) } /** - * Downgrade an exclusive lock. + * Downgrade an PW/EX lock to COS mode. * - * A fast variant of ldlm_lock_convert for convertion of exclusive locks. The + * A lock mode convertion from PW/EX mode to less conflict mode. The * convertion may fail if lock was canceled before downgrade, but it doesn't * indicate any problem, because such lock has no reader or writer, and will * be released soon. - * Used by Commit on Sharing (COS) code. + * Used by Commit on Sharing (COS) code only for now. * * \param lock A lock to convert * \param new_mode new lock mode */ -void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) +void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) { ENTRY; @@ -2584,137 +2603,7 @@ void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) EXIT; } -EXPORT_SYMBOL(ldlm_lock_downgrade); - -/** - * Attempt to convert already granted lock to a different mode. 
- * - * While lock conversion is not currently used, future client-side - * optimizations could take advantage of it to avoid discarding cached - * pages on a file. - */ -struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, - enum ldlm_mode new_mode, __u32 *flags) -{ - struct list_head rpc_list; - struct ldlm_resource *res; - struct ldlm_namespace *ns; - int granted = 0; -#ifdef HAVE_SERVER_SUPPORT - int old_mode; - struct sl_insert_point prev; -#endif - struct ldlm_interval *node; - ENTRY; - - INIT_LIST_HEAD(&rpc_list); - /* Just return if mode is unchanged. */ - if (new_mode == lock->l_granted_mode) { - *flags |= LDLM_FL_BLOCK_GRANTED; - RETURN(lock->l_resource); - } - - /* I can't check the type of lock here because the bitlock of lock - * is not held here, so do the allocation blindly. -jay */ - OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS); - if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */ - RETURN(NULL); - - LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR), - "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode); - - lock_res_and_lock(lock); - - res = lock->l_resource; - ns = ldlm_res_to_ns(res); - -#ifdef HAVE_SERVER_SUPPORT - old_mode = lock->l_req_mode; -#endif - lock->l_req_mode = new_mode; - if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) { -#ifdef HAVE_SERVER_SUPPORT - /* remember the lock position where the lock might be - * added back to the granted list later and also - * remember the join mode for skiplist fixing. */ - prev.res_link = lock->l_res_link.prev; - prev.mode_link = lock->l_sl_mode.prev; - prev.policy_link = lock->l_sl_policy.prev; -#endif - ldlm_resource_unlink_lock(lock); - } else { - ldlm_resource_unlink_lock(lock); - if (res->lr_type == LDLM_EXTENT) { - /* FIXME: ugly code, I have to attach the lock to a - * interval node again since perhaps it will be granted - * soon */ - INIT_LIST_HEAD(&node->li_group); - ldlm_interval_attach(node, lock); - node = NULL; - } - } - - /* - * Remove old lock from the pool before adding the lock with new - * mode below in ->policy() - */ - ldlm_pool_del(&ns->ns_pool, lock); - - /* If this is a local resource, put it on the appropriate list. */ - if (ns_is_client(ldlm_res_to_ns(res))) { - if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) { - ldlm_resource_add_lock(res, &res->lr_converting, lock); - } else { - /* This should never happen, because of the way the - * server handles conversions. */ - LDLM_ERROR(lock, "Erroneous flags %x on local lock\n", - *flags); - LBUG(); - - ldlm_grant_lock(lock, &rpc_list); - granted = 1; - /* FIXME: completion handling not with lr_lock held ! 
*/ - if (lock->l_completion_ast) - lock->l_completion_ast(lock, 0, NULL); - } -#ifdef HAVE_SERVER_SUPPORT - } else { - int rc; - enum ldlm_error err; - __u64 pflags = 0; - ldlm_processing_policy policy; - - policy = ldlm_processing_policy_table[res->lr_type]; - rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err, - &rpc_list); - if (rc == LDLM_ITER_STOP) { - lock->l_req_mode = old_mode; - if (res->lr_type == LDLM_EXTENT) - ldlm_extent_add_lock(res, lock); - else - ldlm_granted_list_add_lock(lock, &prev); - - res = NULL; - } else { - *flags |= LDLM_FL_BLOCK_GRANTED; - granted = 1; - } - } -#else - } else { - CERROR("This is client-side-only module, cannot handle " - "LDLM_NAMESPACE_SERVER resource type lock.\n"); - LBUG(); - } -#endif - unlock_res_and_lock(lock); - - if (granted) - ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); - if (node) - OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); - RETURN(res); -} +EXPORT_SYMBOL(ldlm_lock_mode_downgrade); /** * Print lock with lock handle \a lockh description into debug log. @@ -2764,10 +2653,10 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, va_start(args, fmt); if (exp && exp->exp_connection) { - nid = libcfs_nid2str(exp->exp_connection->c_peer.nid); + nid = obd_export_nid2str(exp); } else if (exp && exp->exp_obd != NULL) { struct obd_import *imp = exp->exp_obd->u.cli.cl_import; - nid = libcfs_nid2str(imp->imp_connection->c_peer.nid); + nid = obd_import_nid2str(imp); } if (resource == NULL) {
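
The reworked ldlm_reprocess_queue() hunk above follows a collect/run/rescan pattern: blocking-AST work is gathered into a local bl_ast_list while the queue is scanned under the resource lock, the lock is dropped to run that work through ldlm_run_ast_work(..., LDLM_WORK_BL_AST), and a return of -ERESTART sends control back to the restart label to rescan the queue; anything still queued afterwards is dropped with ldlm_discard_bl_list(). Below is a minimal, self-contained sketch of just that pattern, not Lustre code: every identifier in it (work_item, defer_work, run_deferred) is an illustrative stand-in, the locking is reduced to comments, and the "queue changed" condition is simulated rather than detected.

/*
 * Toy model of the collect/run/rescan pattern used by the patched
 * ldlm_reprocess_queue(): scan, defer work, run it unlocked, rescan
 * if the run reports that the queue may have changed.
 */
#include <stdio.h>
#include <stdlib.h>

struct work_item {
	int			 id;
	struct work_item	*next;
};

/* Push a deferred item onto a singly linked work list. */
static void defer_work(struct work_item **list, int id)
{
	struct work_item *item = malloc(sizeof(*item));

	if (item == NULL)
		abort();
	item->id = id;
	item->next = *list;
	*list = item;
}

/*
 * Run and free all deferred items; return non-zero if the caller must
 * rescan.  The first pass pretends the queue changed underneath us,
 * standing in for ldlm_run_ast_work() returning -ERESTART.
 */
static int run_deferred(struct work_item **list)
{
	static int passes;
	struct work_item *item;

	while ((item = *list) != NULL) {
		*list = item->next;
		printf("running deferred item %d\n", item->id);
		free(item);
	}
	return ++passes == 1;
}

int main(void)
{
	struct work_item *deferred = NULL;
	int lock_id;

restart:
	/* Scan phase: in the kernel this runs under lock_res(res). */
	for (lock_id = 0; lock_id < 3; lock_id++)
		defer_work(&deferred, lock_id);

	/* Work phase: in the kernel the resource lock is dropped here. */
	if (deferred != NULL && run_deferred(&deferred))
		goto restart;

	/* Anything still queued after the final pass is discarded, as
	 * ldlm_discard_bl_list() does in the patch. */
	while (deferred != NULL) {
		struct work_item *item = deferred;

		deferred = item->next;
		free(item);
	}
	return 0;
}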