X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lock.c;h=5e7afe6d271a6669238c669391d1b598eb662e87;hb=75a417fa0065d52a31215daaaaf41c0fa9751a89;hp=61324cc80115941db180ff2cff696269f5a5a359;hpb=ebba68f378f72107fa51a8002369d1acef7dbedd;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 61324cc..5e7afe6 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -125,8 +125,6 @@ const char *ldlm_it2str(enum ldlm_intent_flags it)
 		return "getattr";
 	case IT_LOOKUP:
 		return "lookup";
-	case IT_UNLINK:
-		return "unlink";
 	case IT_GETXATTR:
 		return "getxattr";
 	case IT_LAYOUT:
@@ -236,6 +234,8 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 		struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
 		LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
+		if (ns->ns_last_pos == &lock->l_lru)
+			ns->ns_last_pos = lock->l_lru.prev;
 		list_del_init(&lock->l_lru);
 		LASSERT(ns->ns_nr_unused > 0);
 		ns->ns_nr_unused--;
@@ -286,7 +286,6 @@ void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
 	LASSERT(list_empty(&lock->l_lru));
 	LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
 	list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-	ldlm_clear_skipped(lock);
 	LASSERT(ns->ns_nr_unused >= 0);
 	ns->ns_nr_unused++;
 }
@@ -479,12 +478,13 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 		lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
 				     LDLM_NSS_LOCKS);
 
-	INIT_LIST_HEAD(&lock->l_handle.h_link);
+	INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
 	class_handle_hash(&lock->l_handle, &lock_handle_ops);
 	lu_ref_init(&lock->l_reference);
 	lu_ref_add(&lock->l_reference, "hash", lock);
 	lock->l_callback_timeout = 0;
+	lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
 	INIT_LIST_HEAD(&lock->l_exp_refs_link);
@@ -665,12 +665,19 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
 		 * discard dirty data, rather than writing back. */
 		if (ldlm_is_ast_discard_data(new))
 			ldlm_set_discard_data(lock);
-		LASSERT(list_empty(&lock->l_bl_ast));
-		list_add(&lock->l_bl_ast, work_list);
-		LDLM_LOCK_GET(lock);
-		LASSERT(lock->l_blocking_lock == NULL);
-		lock->l_blocking_lock = LDLM_LOCK_GET(new);
-	}
+
+		/* A lock can be converted from a blocking state back to
+		 * granted after a lock convert or COS downgrade, but still
+		 * sit on an older bl_list because that list is controlled
+		 * only by ldlm_work_bl_ast_lock(); let it be processed there.
+		 */
+		if (list_empty(&lock->l_bl_ast)) {
+			list_add(&lock->l_bl_ast, work_list);
+			LDLM_LOCK_GET(lock);
+		}
+		LASSERT(lock->l_blocking_lock == NULL);
+		lock->l_blocking_lock = LDLM_LOCK_GET(new);
+	}
 }
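The hunk above makes ldlm_add_bl_work_item() tolerate a lock that is already on a bl_list after a convert or COS downgrade: the lock is queued, and a reference is taken, only on the first add. A minimal compilable sketch of that queue-once idiom, with plain stand-in fields (queued, refs) rather than the real ldlm types:

#include <stdio.h>

struct item {
        int queued;     /* stands in for !list_empty(&lock->l_bl_ast) */
        int refs;       /* stands in for the LDLM_LOCK_GET() refcount */
};

static void queue_once(struct item *it)
{
        if (!it->queued) {
                it->queued = 1;
                it->refs++;     /* reference taken only on first queue */
        }
}

int main(void)
{
        struct item it = { 0, 0 };

        queue_once(&it);
        queue_once(&it);        /* second call is a no-op */
        printf("queued=%d refs=%d\n", it.queued, it.refs);     /* 1 1 */
        return 0;
}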
 
 /**
@@ -868,7 +875,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 	} else if (ns_is_client(ns) &&
 		   !lock->l_readers && !lock->l_writers &&
 		   !ldlm_is_no_lru(lock) &&
-		   !ldlm_is_bl_ast(lock)) {
+		   !ldlm_is_bl_ast(lock) &&
+		   !ldlm_is_converting(lock)) {
 
 		LDLM_DEBUG(lock, "add lock into lru list");
 
@@ -1689,6 +1697,33 @@ out:
 	RETURN(ERR_PTR(rc));
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+						__u64 *flags)
+{
+	struct ldlm_resource *res = lock->l_resource;
+	enum ldlm_error rc = ELDLM_OK;
+	struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+	ldlm_processing_policy policy;
+	ENTRY;
+
+	policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+	policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+	if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+	    res->lr_type != LDLM_FLOCK) {
+		rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+		if (rc == -ERESTART)
+			GOTO(restart, rc);
+	}
+
+	if (!list_empty(&rpc_list))
+		ldlm_discard_bl_list(&rpc_list);
+
+	RETURN(rc);
+}
+#endif
+
 /**
  * Enqueue (request) a lock.
  *
@@ -1699,16 +1734,14 @@ out:
  * set, skip all the enqueueing and delegate lock processing to intent policy
  * function.
  */
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+				  struct ldlm_namespace *ns,
 				  struct ldlm_lock **lockp,
 				  void *cookie, __u64 *flags)
 {
 	struct ldlm_lock *lock = *lockp;
 	struct ldlm_resource *res = lock->l_resource;
 	int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
-	ldlm_processing_policy policy;
-#endif
 	enum ldlm_error rc = ELDLM_OK;
 	struct ldlm_interval *node = NULL;
 	ENTRY;
@@ -1716,8 +1749,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
 	/* policies are not executed on the client or during replay */
 	if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
 	    && !local && ns->ns_policy) {
-		rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
-				   NULL);
+		rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
+				   *flags, NULL);
 		if (rc == ELDLM_LOCK_REPLACED) {
 			/* The lock that was returned has already been granted,
 			 * and placed into lockp. If it's not the same as the
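The new ldlm_lock_enqueue_helper() above retries the whole policy pass whenever conflict handling reports -ERESTART. A compilable toy model of that retry loop; handle_conflicts() and the ERESTART value are stand-ins, not the Lustre API:

#include <stdio.h>

#ifndef ERESTART
#define ERESTART 85     /* assumed value of the "restart" errno */
#endif

static int handle_conflicts(int *conflicts)
{
        /* Pretend a conflicting lock was cancelled mid-send once or twice. */
        return (*conflicts)-- > 0 ? -ERESTART : 0;
}

static int enqueue_helper(void)
{
        int conflicts = 2;
        int rc;

restart:
        /* a per-type processing policy would run here ... */
        rc = handle_conflicts(&conflicts);
        if (rc == -ERESTART)
                goto restart;   /* conflict list changed, re-evaluate */
        return rc;
}

int main(void)
{
        printf("enqueue rc=%d\n", enqueue_helper());
        return 0;
}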
@@ -1827,8 +1860,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
 		/* If no flags, fall through to normal enqueue path. */
 	}
 
-	policy = ldlm_processing_policy_table[res->lr_type];
-	policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
+	rc = ldlm_lock_enqueue_helper(lock, flags);
 	GOTO(out, rc);
 #else
 	} else {
@@ -1861,6 +1893,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 	__u64 flags;
 	int rc = LDLM_ITER_CONTINUE;
 	enum ldlm_error err;
+	struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
 	ENTRY;
 
 	check_res_locked(res);
@@ -1870,15 +1903,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 	LASSERT(intention == LDLM_PROCESS_RESCAN ||
 		intention == LDLM_PROCESS_RECOVERY);
 
+restart:
 	list_for_each_safe(tmp, pos, queue) {
 		struct ldlm_lock *pending;
+		struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
 
 		pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
 		CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
 		flags = 0;
-		rc = policy(pending, &flags, intention, &err, work_list);
+		rc = policy(pending, &flags, intention, &err, &rpc_list);
+		if (pending->l_granted_mode == pending->l_req_mode ||
+		    res->lr_type == LDLM_FLOCK) {
+			list_splice(&rpc_list, work_list);
+		} else {
+			list_splice(&rpc_list, &bl_ast_list);
+		}
 		/*
 		 * When this is called from recovery done, we always want
 		 * to scan the whole list no matter what 'rc' is returned.
@@ -1888,6 +1929,20 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 			break;
 	}
 
+	if (!list_empty(&bl_ast_list)) {
+		unlock_res(res);
+
+		rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+				       LDLM_WORK_BL_AST);
+
+		lock_res(res);
+		if (rc == -ERESTART)
+			GOTO(restart, rc);
+	}
+
+	if (!list_empty(&bl_ast_list))
+		ldlm_discard_bl_list(&bl_ast_list);
+
 	RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
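The reprocess loop above now splits each policy pass two ways: results for fully granted locks (or FLOCK resources) are spliced to the caller's work_list, everything else is deferred to a local bl_ast_list that is sent after dropping the resource lock, restarting the scan on -ERESTART. A tiny compilable sketch of just the partition step, with counters standing in for the two list splices:

#include <stdio.h>

int main(void)
{
        /* 1 = granted mode already equals requested mode */
        int granted[] = { 1, 0, 1, 0 };
        int work = 0, bl_ast = 0;

        for (int i = 0; i < 4; i++) {
                if (granted[i])
                        work++;         /* splice to caller's work_list */
                else
                        bl_ast++;       /* splice to local bl_ast_list */
        }
        printf("work=%d bl_ast=%d\n", work, bl_ast);
        return 0;
}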
@@ -1898,7 +1953,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \param[in] lock	The lock to be enqueued.
  * \param[out] flags	Lock flags for the lock to be enqueued.
  * \param[in] rpc_list	Conflicting locks list.
- * \param[in] grant_flags	extra flags when granting a lock.
  *
  * \retval -ERESTART:	Some lock was instantly canceled while sending
  *			blocking ASTs, caller needs to re-check conflicting
@@ -1907,7 +1961,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \retval 0:		Lock is successfully added to the waiting list.
  */
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
-			      struct list_head *rpc_list, __u64 grant_flags)
+			      struct list_head *rpc_list)
 {
 	struct ldlm_resource *res = lock->l_resource;
 	int rc;
@@ -1956,7 +2010,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 		RETURN(rc);
 	}
 
-	*flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+	*flags |= LDLM_FL_BLOCK_GRANTED;
 	RETURN(0);
 }
 
@@ -1969,27 +2023,21 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
  */
 void ldlm_discard_bl_list(struct list_head *bl_list)
 {
-	struct list_head *tmp, *pos;
-	ENTRY;
+	struct ldlm_lock *lock, *tmp;
 
-	list_for_each_safe(pos, tmp, bl_list) {
-		struct ldlm_lock *lock =
-			list_entry(pos, struct ldlm_lock, l_bl_ast);
+	ENTRY;
+	list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast) {
+		LASSERT(!list_empty(&lock->l_bl_ast));
 		list_del_init(&lock->l_bl_ast);
-		LASSERT(ldlm_is_ast_sent(lock));
 		ldlm_clear_ast_sent(lock);
 		LASSERT(lock->l_bl_ast_run == 0);
-		LASSERT(lock->l_blocking_lock);
-		LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-		lock->l_blocking_lock = NULL;
+		ldlm_clear_blocking_lock(lock);
 		LDLM_LOCK_RELEASE(lock);
 	}
 	EXIT;
 }
 
-#endif
-
 /**
  * Process a call to blocking AST callback for a lock in ast_work list
  */
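ldlm_discard_bl_list() above switches to list_for_each_entry_safe(), which caches the successor before the body runs so the current entry can be unlinked and released mid-walk. The same discard-while-iterating shape in plain C, using a hypothetical singly linked node type in place of the kernel list macros:

#include <stdlib.h>
#include <stdio.h>

struct node {
        int val;
        struct node *next;
};

/* Free every node; 'tmp' plays the role of the _safe cursor that keeps
 * the walk valid after the current node is released. */
static void discard_all(struct node **head)
{
        struct node *cur = *head, *tmp;

        while (cur != NULL) {
                tmp = cur->next;        /* grab successor before freeing */
                free(cur);
                cur = tmp;
        }
        *head = NULL;
}

int main(void)
{
        struct node *head = NULL;

        for (int i = 0; i < 3; i++) {
                struct node *n = malloc(sizeof(*n));

                if (n == NULL)
                        return 1;
                n->val = i;
                n->next = head;
                head = n;
        }
        discard_all(&head);
        printf("list discarded, head=%p\n", (void *)head);
        return 0;
}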
@@ -1997,9 +2045,11 @@ static int
 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
 	struct ldlm_cb_set_arg *arg = opaq;
-	struct ldlm_lock_desc d;
-	int rc;
-	struct ldlm_lock *lock;
+	struct ldlm_lock *lock;
+	struct ldlm_lock_desc d;
+	struct ldlm_bl_desc bld;
+	int rc;
+
 	ENTRY;
 
 	if (list_empty(arg->list))
@@ -2007,66 +2057,49 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 		RETURN(-ENOENT);
 
 	lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
-	/* nobody should touch l_bl_ast */
+	/* nobody should touch l_bl_ast, but some locks in the list may have
+	 * become granted after a lock convert or COS downgrade; such locks
+	 * should just be skipped here and removed from the list.
+	 */
 	lock_res_and_lock(lock);
 	list_del_init(&lock->l_bl_ast);
 
+	/* The lock is no longer a blocking lock, but it was kept in the list
+	 * because it can be managed only here.
+	 */
+	if (!ldlm_is_ast_sent(lock)) {
+		unlock_res_and_lock(lock);
+		LDLM_LOCK_RELEASE(lock);
+		RETURN(0);
+	}
+
+	LASSERT(lock->l_blocking_lock);
+	ldlm_lock2desc(lock->l_blocking_lock, &d);
+	/* Copy the blocking lock's ibits into cancel_bits as well; a new
+	 * client may use them for lock convert, and it is important to use
+	 * the new field to convert locks from new servers only.
+	 */
+	d.l_policy_data.l_inodebits.cancel_bits =
+		lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
+
+	/* The blocking lock is being destroyed here, but some information
+	 * about it may be needed inside the l_blocking_ast() function below,
+	 * e.g. in mdt_blocking_ast(). So save the needed data in bl_desc.
+	 */
+	bld.bl_same_client = lock->l_client_cookie ==
+			     lock->l_blocking_lock->l_client_cookie;
+	bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+	arg->bl_desc = &bld;
+
 	LASSERT(ldlm_is_ast_sent(lock));
 	LASSERT(lock->l_bl_ast_run == 0);
-	LASSERT(lock->l_blocking_lock);
 	lock->l_bl_ast_run++;
+	ldlm_clear_blocking_lock(lock);
 	unlock_res_and_lock(lock);
 
-	ldlm_lock2desc(lock->l_blocking_lock, &d);
-
 	rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
-	LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-	lock->l_blocking_lock = NULL;
-	LDLM_LOCK_RELEASE(lock);
-
-	RETURN(rc);
-}
-
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
-static int
-ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
-{
-	struct ldlm_cb_set_arg *arg = opaq;
-	int rc = 0;
-	struct ldlm_lock *lock;
-	ldlm_completion_callback completion_callback;
-	ENTRY;
-
-	if (list_empty(arg->list))
-		RETURN(-ENOENT);
-
-	lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
-
-	/* It's possible to receive a completion AST before we've set
-	 * the l_completion_ast pointer: either because the AST arrived
-	 * before the reply, or simply because there's a small race
-	 * window between receiving the reply and finishing the local
-	 * enqueue. (bug 842)
-	 *
-	 * This can't happen with the blocking_ast, however, because we
-	 * will never call the local blocking_ast until we drop our
-	 * reader/writer reference, which we won't do until we get the
-	 * reply and finish enqueueing. */
-
-	/* nobody should touch l_cp_ast */
-	lock_res_and_lock(lock);
-	list_del_init(&lock->l_cp_ast);
-	LASSERT(ldlm_is_cp_reqd(lock));
-	/* save l_completion_ast since it can be changed by
-	 * mds_intent_policy(), see bug 14225 */
-	completion_callback = lock->l_completion_ast;
-	ldlm_clear_cp_reqd(lock);
-	unlock_res_and_lock(lock);
-
-	if (completion_callback != NULL)
-		rc = completion_callback(lock, 0, (void *)arg);
 	LDLM_LOCK_RELEASE(lock);
 
 	RETURN(rc);
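The new struct ldlm_bl_desc saves the facts about the blocking lock that the blocking AST callback may still need after ldlm_clear_blocking_lock() has dropped it. A sketch of that copy-before-release idiom; the struct and field names here are illustrative, not the ldlm ones:

#include <stdio.h>
#include <stdlib.h>

struct blocking_info {
        int same_client;
        int cos_incompat;
};

static void callback(const struct blocking_info *bld)
{
        printf("same_client=%d cos_incompat=%d\n",
               bld->same_client, bld->cos_incompat);
}

int main(void)
{
        struct blocking_info bld;
        int *blocker = malloc(sizeof(*blocker));  /* stands in for l_blocking_lock */

        if (blocker == NULL)
                return 1;
        *blocker = 1;

        bld.same_client = (*blocker == 1);      /* copy what the callback needs */
        bld.cos_incompat = 0;
        free(blocker);                          /* source object may be gone now */

        callback(&bld);                         /* uses only the saved copy */
        return 0;
}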
@@ -2138,6 +2171,53 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 	RETURN(rc);
 }
 
+#endif
+
+/**
+ * Process a call to completion AST callback for a lock in ast_work list
+ */
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+	struct ldlm_cb_set_arg *arg = opaq;
+	struct ldlm_lock *lock;
+	ldlm_completion_callback completion_callback;
+	int rc = 0;
+
+	ENTRY;
+
+	if (list_empty(arg->list))
+		RETURN(-ENOENT);
+
+	lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+	/* It's possible to receive a completion AST before we've set
+	 * the l_completion_ast pointer: either because the AST arrived
+	 * before the reply, or simply because there's a small race
+	 * window between receiving the reply and finishing the local
+	 * enqueue. (bug 842)
+	 *
+	 * This can't happen with the blocking_ast, however, because we
+	 * will never call the local blocking_ast until we drop our
+	 * reader/writer reference, which we won't do until we get the
+	 * reply and finish enqueueing. */
+
+	/* nobody should touch l_cp_ast */
+	lock_res_and_lock(lock);
+	list_del_init(&lock->l_cp_ast);
+	LASSERT(ldlm_is_cp_reqd(lock));
+	/* save l_completion_ast since it can be changed by
+	 * mds_intent_policy(), see bug 14225 */
+	completion_callback = lock->l_completion_ast;
+	ldlm_clear_cp_reqd(lock);
+	unlock_res_and_lock(lock);
+
+	if (completion_callback != NULL)
+		rc = completion_callback(lock, 0, (void *)arg);
+	LDLM_LOCK_RELEASE(lock);
+
+	RETURN(rc);
+}
 
 /**
  * Process list of locks in need of ASTs being sent.
@@ -2146,11 +2226,11 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
  * one.
  */
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
-		      ldlm_desc_ast_t ast_type)
+		      ldlm_desc_ast_t ast_type)
 {
 	struct ldlm_cb_set_arg *arg;
-	set_producer_func work_ast_lock;
-	int rc;
+	set_producer_func	work_ast_lock;
+	int			rc;
 
 	if (list_empty(rpc_list))
 		RETURN(0);
@@ -2163,24 +2243,26 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
 	arg->list = rpc_list;
 
 	switch (ast_type) {
-	case LDLM_WORK_BL_AST:
-		arg->type = LDLM_BL_CALLBACK;
-		work_ast_lock = ldlm_work_bl_ast_lock;
-		break;
-	case LDLM_WORK_CP_AST:
-		arg->type = LDLM_CP_CALLBACK;
-		work_ast_lock = ldlm_work_cp_ast_lock;
-		break;
-	case LDLM_WORK_REVOKE_AST:
-		arg->type = LDLM_BL_CALLBACK;
-		work_ast_lock = ldlm_work_revoke_ast_lock;
-		break;
-	case LDLM_WORK_GL_AST:
-		arg->type = LDLM_GL_CALLBACK;
-		work_ast_lock = ldlm_work_gl_ast_lock;
-		break;
-	default:
-		LBUG();
+	case LDLM_WORK_CP_AST:
+		arg->type = LDLM_CP_CALLBACK;
+		work_ast_lock = ldlm_work_cp_ast_lock;
+		break;
+#ifdef HAVE_SERVER_SUPPORT
+	case LDLM_WORK_BL_AST:
+		arg->type = LDLM_BL_CALLBACK;
+		work_ast_lock = ldlm_work_bl_ast_lock;
+		break;
+	case LDLM_WORK_REVOKE_AST:
+		arg->type = LDLM_BL_CALLBACK;
+		work_ast_lock = ldlm_work_revoke_ast_lock;
+		break;
+	case LDLM_WORK_GL_AST:
+		arg->type = LDLM_GL_CALLBACK;
+		work_ast_lock = ldlm_work_gl_ast_lock;
+		break;
+#endif
+	default:
+		LBUG();
 	}
 
 	/* We create a ptlrpc request set with flow control extension.
@@ -2192,7 +2274,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
 	if (arg->set == NULL)
 		GOTO(out, rc = -ENOMEM);
 
-	ptlrpc_set_wait(arg->set);
+	ptlrpc_set_wait(NULL, arg->set);
 	ptlrpc_set_destroy(arg->set);
 
 	rc = atomic_read(&arg->restart) ? -ERESTART : 0;
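ldlm_run_ast_work() sends the whole batch through a flow-controlled request set and collapses the per-lock outcomes into a single -ERESTART via the arg->restart flag. A compilable miniature of that batch-and-restart contract (the ERESTART value is an assumption, and send_one() stands in for the per-lock AST sender):

#include <stdio.h>

#ifndef ERESTART
#define ERESTART 85
#endif

static int send_one(int i)
{
        return i == 1;  /* pretend item 1 cancelled a lock */
}

static int run_work(int n)
{
        int restart = 0;

        for (int i = 0; i < n; i++)
                restart |= send_one(i); /* like atomic_read(&arg->restart) */
        return restart ? -ERESTART : 0;
}

int main(void)
{
        printf("rc=%d\n", run_work(3)); /* -85: caller re-scans the queue */
        return 0;
}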
@@ -2351,6 +2433,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
 	 * talking to me first. -phik */
 	if (lock->l_readers || lock->l_writers) {
 		LDLM_ERROR(lock, "lock still has references");
+		unlock_res_and_lock(lock);
 		LBUG();
 	}
 
@@ -2401,6 +2484,7 @@ int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
 EXPORT_SYMBOL(ldlm_lock_set_data);
 
 struct export_cl_data {
+	const struct lu_env	*ecl_env;
 	struct obd_export	*ecl_exp;
 	int			ecl_loop;
 };
@@ -2413,7 +2497,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
 
 	res = ldlm_resource_getref(lock->l_resource);
 
-	ldlm_lvbo_update(res, lock, NULL, 1);
+	ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1);
 	ldlm_lock_cancel(lock);
 	if (!exp->exp_obd->obd_stopping)
 		ldlm_reprocess_all(res);
@@ -2453,10 +2537,17 @@ ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 */
 int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 {
+	struct lu_env env;
 	struct export_cl_data	ecl = {
 		.ecl_exp	= exp,
 		.ecl_loop	= 0,
 	};
+	int rc;
+
+	rc = lu_env_init(&env, LCT_DT_THREAD);
+	if (rc)
+		RETURN(rc);
+	ecl.ecl_env = &env;
 
 	while (!list_empty(&exp->exp_bl_list)) {
 		struct ldlm_lock *lock;
@@ -2479,6 +2570,8 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 		LDLM_LOCK_RELEASE(lock);
 	}
 
+	lu_env_fini(&env);
+
 	CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
 	       "left on hash table %d.\n", exp, ecl.ecl_loop,
 	       atomic_read(&exp->exp_lock_hash->hs_count));
@@ -2493,10 +2586,16 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 */
 int ldlm_export_cancel_locks(struct obd_export *exp)
 {
-	struct export_cl_data	ecl = {
-		.ecl_exp	= exp,
-		.ecl_loop	= 0,
-	};
+	struct export_cl_data ecl;
+	struct lu_env env;
+	int rc;
+
+	rc = lu_env_init(&env, LCT_DT_THREAD);
+	if (rc)
+		RETURN(rc);
+	ecl.ecl_env = &env;
+	ecl.ecl_exp = exp;
+	ecl.ecl_loop = 0;
 
 	cfs_hash_for_each_empty(exp->exp_lock_hash,
 				ldlm_cancel_locks_for_export_cb, &ecl);
@@ -2510,6 +2609,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 	    exp->exp_obd->obd_stopping)
 		ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
 
+	lu_env_fini(&env);
+
 	return ecl.ecl_loop;
 }
 
@@ -2520,13 +2621,18 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 * conversion may fail if lock was canceled before downgrade, but it doesn't
 * indicate any problem, because such lock has no reader or writer, and will
 * be released soon.
- * Used by Commit on Sharing (COS) code only for now.
+ *
+ * Used by Commit on Sharing (COS) code to force a commit of object changes
+ * in case of conflict. A converted lock is considered a new lock, and all
+ * blocking AST state is cleared, so any pending or new blocked lock on that
+ * lock will cause a new call to blocking_ast and force a resource object
+ * commit.
 *
 * \param lock A lock to convert
 * \param new_mode new lock mode
 */
 void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 {
+#ifdef HAVE_SERVER_SUPPORT
 	ENTRY;
 
 	LASSERT(new_mode == LCK_COS);
@@ -2547,14 +2653,20 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 	 * ldlm_grant_lock() called below.
 	 */
 	ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+
+	/* Consider the downgraded lock as a new lock and clear all state
+	 * related to previous blocking AST processing.
+	 */
+	ldlm_clear_blocking_data(lock);
+
 	lock->l_req_mode = new_mode;
 	ldlm_grant_lock(lock, NULL);
-
 	unlock_res_and_lock(lock);
+
 	ldlm_reprocess_all(lock->l_resource);
 
 	EXIT;
+#endif
 }
 EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
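ldlm_lock_mode_downgrade() now strips the blocking-AST bookkeeping before regranting in LCK_COS mode, so the downgraded lock behaves like a brand-new grant and any later conflict triggers a fresh blocking AST. A stand-in sketch of that clear-then-regrant ordering; the field names are illustrative, not the ldlm ones:

#include <stdio.h>

struct lock {
        int mode;       /* e.g. PW = 4, COS = 2 in this toy encoding */
        int ast_sent;   /* blocking-AST state to clear */
};

static void mode_downgrade(struct lock *lk, int new_mode)
{
        lk->ast_sent = 0;       /* like ldlm_clear_blocking_data() */
        lk->mode = new_mode;    /* regrant in the weaker mode */
        /* ... then reprocess waiting locks on the resource ... */
}

int main(void)
{
        struct lock lk = { 4, 1 };

        mode_downgrade(&lk, 2);
        printf("mode=%d ast_sent=%d\n", lk.mode, lk.ast_sent);
        return 0;
}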
@@ -2606,10 +2718,10 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 	va_start(args, fmt);
 
 	if (exp && exp->exp_connection) {
-		nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
+		nid = obd_export_nid2str(exp);
 	} else if (exp && exp->exp_obd != NULL) {
 		struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
-		nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
+		nid = obd_import_nid2str(imp);
 	}
 
 	if (resource == NULL) {
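The export-cancel paths earlier in this patch now bracket their work with lu_env_init()/lu_env_fini(). A minimal sketch of that init/use/fini bracketing; setup() and teardown() are stand-ins for the lu_env API, not the real calls:

#include <stdio.h>

static int setup(int *env)
{
        *env = 1;       /* pretend context initialization succeeded */
        return 0;
}

static void teardown(int *env)
{
        *env = 0;
}

static int cancel_all(void)
{
        int env;
        int rc = setup(&env);

        if (rc)
                return rc;      /* nothing to undo on init failure */

        /* ... the cancel loop would pass &env to each helper here ... */

        teardown(&env);         /* always paired with a successful setup */
        return 0;
}

int main(void)
{
        printf("rc=%d\n", cancel_all());
        return 0;
}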