From ebba68f378f72107fa51a8002369d1acef7dbedd Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Tue, 12 Dec 2017 14:17:21 +0300 Subject: [PATCH] LU-10175 ldlm: remove obsoleted lock convert code Patch removes lock mode convert mechanics from Lustre, it is obsoleted and not functional at the moment. Also there are no plans to restore it and use again. Signed-off-by: Mikhail Pershin Change-Id: I477caf24927768dfcdc15888e59a7d5e62d5b577 Reviewed-on: https://review.whamcloud.com/30491 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Bobi Jam Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 25 ++--- lustre/include/lustre_dlm_flags.h | 3 +- lustre/ldlm/ldlm_extent.c | 1 - lustre/ldlm/ldlm_inodebits.c | 1 - lustre/ldlm/ldlm_lock.c | 197 ++++++-------------------------------- lustre/ldlm/ldlm_lockd.c | 91 +----------------- lustre/ldlm/ldlm_plain.c | 1 - lustre/ldlm/ldlm_request.c | 177 +++++++--------------------------- lustre/ldlm/ldlm_resource.c | 51 ++++------ lustre/mdt/mdt_handler.c | 2 +- lustre/ptlrpc/service.c | 2 +- 11 files changed, 91 insertions(+), 460 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index d0aeed9..9e833cc 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -722,7 +722,7 @@ struct ldlm_lock { struct list_head l_lru; /** * Linkage to resource's lock queues according to current lock state. - * (could be granted, waiting or converting) + * (could be granted or waiting) * Protected by lr_lock in struct ldlm_resource. */ struct list_head l_res_link; @@ -962,8 +962,6 @@ struct ldlm_resource { * @{ */ /** List of locks in granted state */ struct list_head lr_granted; - /** List of locks waiting to change their granted mode (converted) */ - struct list_head lr_converting; /** * List of locks that could not be granted due to conflicts and * that are waiting for conflicts to go away */ @@ -1205,21 +1203,21 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, * LDLM_PROCESS_RESCAN: * * It's used when policy functions are called from ldlm_reprocess_queue() to - * reprocess the wait & convert list and try to grant locks, blocking ASTs + * reprocess the wait list and try to grant locks, blocking ASTs * have already been sent in this situation, completion ASTs need be sent for * the locks being granted. * * LDLM_PROCESS_ENQUEUE: * * It's used when policy functions are called from ldlm_lock_enqueue() to - * process the wait & convert list for handling an enqueue request, blocking + * process the wait list for handling an enqueue request, blocking * ASTs have not been sent yet, so list of conflicting locks would be * collected and ASTs sent. * * LDLM_PROCESS_RECOVERY: * * It's used when policy functions are called from ldlm_reprocess_queue() to - * reprocess the wait & convert list when recovery done. In case of blocking + * reprocess the wait list when recovery done. In case of blocking * ASTs are lost before recovery, it needs not only to grant locks if * available, but also send blocking ASTs to the locks doesn't have AST sent * flag. Completion ASTs need be sent for the locks being granted. @@ -1305,14 +1303,11 @@ int ldlm_glimpse_locks(struct ldlm_resource *res, * MDT or OST to pass through LDLM requests to LDLM for handling * @{ */ -int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, - ldlm_blocking_callback, ldlm_glimpse_callback); int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, - const struct ldlm_request *dlm_req, - const struct ldlm_callback_suite *cbs); -int ldlm_handle_convert(struct ptlrpc_request *req); + const struct ldlm_request *dlm_req, + const struct ldlm_callback_suite *cbs); int ldlm_handle_convert0(struct ptlrpc_request *req, - const struct ldlm_request *dlm_req); + const struct ldlm_request *dlm_req); int ldlm_handle_cancel(struct ptlrpc_request *req); int ldlm_request_cancel(struct ptlrpc_request *req, const struct ldlm_request *dlm_req, @@ -1462,9 +1457,7 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags, struct lustre_handle *, int unref); enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh, __u64 *bits); -struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, - enum ldlm_mode new_mode, __u32 *flags); -void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode); +void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode); void ldlm_lock_cancel(struct ldlm_lock *lock); void ldlm_reprocess_all(struct ldlm_resource *res); void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns); @@ -1606,8 +1599,6 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, void *data, __u32 lvb_len, enum lvb_type lvb_type, const __u64 *client_cookie, struct lustre_handle *lockh); -int ldlm_cli_convert(const struct lustre_handle *lockh, int new_mode, - __u32 *flags); int ldlm_cli_update_pool(struct ptlrpc_request *req); int ldlm_cli_cancel(const struct lustre_handle *lockh, enum ldlm_cancel_flags cancel_flags); diff --git a/lustre/include/lustre_dlm_flags.h b/lustre/include/lustre_dlm_flags.h index 43c24b5..dc978d4 100644 --- a/lustre/include/lustre_dlm_flags.h +++ b/lustre/include/lustre_dlm_flags.h @@ -44,7 +44,7 @@ /** * Server placed lock on conv list, or a recovering client wants the lock - * added to the conv list, no questions asked. */ + * added to the conv list, no questions asked. (obsoleted) */ #define LDLM_FL_BLOCK_CONV 0x0000000000000004ULL // bit 2 #define ldlm_is_block_conv(_l) LDLM_TEST_FLAG(( _l), 1ULL << 2) #define ldlm_set_block_conv(_l) LDLM_SET_FLAG(( _l), 1ULL << 2) @@ -403,7 +403,6 @@ /** l_flags bits marked as "blocked" bits */ #define LDLM_FL_BLOCKED_MASK (LDLM_FL_BLOCK_GRANTED |\ - LDLM_FL_BLOCK_CONV |\ LDLM_FL_BLOCK_WAIT) /** l_flags bits marked as "gone" bits */ diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index bee0c0c..35a4c46 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -790,7 +790,6 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags, ENTRY; LASSERT(lock->l_granted_mode != lock->l_req_mode); - LASSERT(list_empty(&res->lr_converting)); LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) || !ldlm_is_ast_discard_data(lock)); INIT_LIST_HEAD(&rpc_list); diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index a0814dd..280efad 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -209,7 +209,6 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, ENTRY; LASSERT(lock->l_granted_mode != lock->l_req_mode); - LASSERT(list_empty(&res->lr_converting)); INIT_LIST_HEAD(&rpc_list); check_res_locked(res); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index c078734..61324cc 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1089,7 +1089,6 @@ void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock) * NOTE: called by * - ldlm_lock_enqueue * - ldlm_reprocess_queue - * - ldlm_lock_convert * * must be called with lr_lock held */ @@ -1412,9 +1411,6 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags, GOTO(out, rc = 1); if (flags & LDLM_FL_BLOCK_GRANTED) GOTO(out, rc = 0); - lock = search_queue(&res->lr_converting, &data); - if (lock != NULL) - GOTO(out, rc = 1); lock = search_queue(&res->lr_waiting, &data); if (lock != NULL) GOTO(out, rc = 1); @@ -1812,33 +1808,28 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns, * more or less trusting the clients not to lie. * * FIXME (bug 268): Detect obvious lies by checking compatibility in - * granted/converting queues. */ + * granted queue. */ if (local) { - if (*flags & LDLM_FL_BLOCK_CONV) - ldlm_resource_add_lock(res, &res->lr_converting, lock); - else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) - ldlm_resource_add_lock(res, &res->lr_waiting, lock); - else - ldlm_grant_lock(lock, NULL); + if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) + ldlm_resource_add_lock(res, &res->lr_waiting, lock); + else + ldlm_grant_lock(lock, NULL); GOTO(out, rc = ELDLM_OK); #ifdef HAVE_SERVER_SUPPORT - } else if (*flags & LDLM_FL_REPLAY) { - if (*flags & LDLM_FL_BLOCK_CONV) { - ldlm_resource_add_lock(res, &res->lr_converting, lock); + } else if (*flags & LDLM_FL_REPLAY) { + if (*flags & LDLM_FL_BLOCK_WAIT) { + ldlm_resource_add_lock(res, &res->lr_waiting, lock); GOTO(out, rc = ELDLM_OK); - } else if (*flags & LDLM_FL_BLOCK_WAIT) { - ldlm_resource_add_lock(res, &res->lr_waiting, lock); + } else if (*flags & LDLM_FL_BLOCK_GRANTED) { + ldlm_grant_lock(lock, NULL); GOTO(out, rc = ELDLM_OK); - } else if (*flags & LDLM_FL_BLOCK_GRANTED) { - ldlm_grant_lock(lock, NULL); - GOTO(out, rc = ELDLM_OK); - } - /* If no flags, fall through to normal enqueue path. */ - } + } + /* If no flags, fall through to normal enqueue path. */ + } - policy = ldlm_processing_policy_table[res->lr_type]; + policy = ldlm_processing_policy_table[res->lr_type]; policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL); - GOTO(out, rc); + GOTO(out, rc); #else } else { CERROR("This is client-side-only module, cannot handle " @@ -2214,7 +2205,7 @@ out: /** * Try to grant all waiting locks on a resource. * - * Calls ldlm_reprocess_queue on converting and waiting queues. + * Calls ldlm_reprocess_queue on waiting queue. * * Typically called after some resource locks are cancelled to see * if anything could be granted as a result of the cancellation. @@ -2244,19 +2235,15 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res, RETURN_EXIT; restart: lock_res(res); - rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list, - intention); - if (rc == LDLM_ITER_CONTINUE) - ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, - intention); + ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention); unlock_res(res); - rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, - LDLM_WORK_CP_AST); - if (rc == -ERESTART) { + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_CP_AST); + if (rc == -ERESTART) { LASSERT(list_empty(&rpc_list)); - goto restart; - } + goto restart; + } #else ENTRY; @@ -2527,18 +2514,18 @@ int ldlm_export_cancel_locks(struct obd_export *exp) } /** - * Downgrade an exclusive lock. + * Downgrade an PW/EX lock to COS mode. * - * A fast variant of ldlm_lock_convert for convertion of exclusive locks. The + * A lock mode convertion from PW/EX mode to less conflict mode. The * convertion may fail if lock was canceled before downgrade, but it doesn't * indicate any problem, because such lock has no reader or writer, and will * be released soon. - * Used by Commit on Sharing (COS) code. + * Used by Commit on Sharing (COS) code only for now. * * \param lock A lock to convert * \param new_mode new lock mode */ -void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) +void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) { ENTRY; @@ -2569,137 +2556,7 @@ void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) EXIT; } -EXPORT_SYMBOL(ldlm_lock_downgrade); - -/** - * Attempt to convert already granted lock to a different mode. - * - * While lock conversion is not currently used, future client-side - * optimizations could take advantage of it to avoid discarding cached - * pages on a file. - */ -struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, - enum ldlm_mode new_mode, __u32 *flags) -{ - struct list_head rpc_list; - struct ldlm_resource *res; - struct ldlm_namespace *ns; - int granted = 0; -#ifdef HAVE_SERVER_SUPPORT - int old_mode; - struct sl_insert_point prev; -#endif - struct ldlm_interval *node; - ENTRY; - - INIT_LIST_HEAD(&rpc_list); - /* Just return if mode is unchanged. */ - if (new_mode == lock->l_granted_mode) { - *flags |= LDLM_FL_BLOCK_GRANTED; - RETURN(lock->l_resource); - } - - /* I can't check the type of lock here because the bitlock of lock - * is not held here, so do the allocation blindly. -jay */ - OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS); - if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */ - RETURN(NULL); - - LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR), - "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode); - - lock_res_and_lock(lock); - - res = lock->l_resource; - ns = ldlm_res_to_ns(res); - -#ifdef HAVE_SERVER_SUPPORT - old_mode = lock->l_req_mode; -#endif - lock->l_req_mode = new_mode; - if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) { -#ifdef HAVE_SERVER_SUPPORT - /* remember the lock position where the lock might be - * added back to the granted list later and also - * remember the join mode for skiplist fixing. */ - prev.res_link = lock->l_res_link.prev; - prev.mode_link = lock->l_sl_mode.prev; - prev.policy_link = lock->l_sl_policy.prev; -#endif - ldlm_resource_unlink_lock(lock); - } else { - ldlm_resource_unlink_lock(lock); - if (res->lr_type == LDLM_EXTENT) { - /* FIXME: ugly code, I have to attach the lock to a - * interval node again since perhaps it will be granted - * soon */ - INIT_LIST_HEAD(&node->li_group); - ldlm_interval_attach(node, lock); - node = NULL; - } - } - - /* - * Remove old lock from the pool before adding the lock with new - * mode below in ->policy() - */ - ldlm_pool_del(&ns->ns_pool, lock); - - /* If this is a local resource, put it on the appropriate list. */ - if (ns_is_client(ldlm_res_to_ns(res))) { - if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) { - ldlm_resource_add_lock(res, &res->lr_converting, lock); - } else { - /* This should never happen, because of the way the - * server handles conversions. */ - LDLM_ERROR(lock, "Erroneous flags %x on local lock\n", - *flags); - LBUG(); - - ldlm_grant_lock(lock, &rpc_list); - granted = 1; - /* FIXME: completion handling not with lr_lock held ! */ - if (lock->l_completion_ast) - lock->l_completion_ast(lock, 0, NULL); - } -#ifdef HAVE_SERVER_SUPPORT - } else { - int rc; - enum ldlm_error err; - __u64 pflags = 0; - ldlm_processing_policy policy; - - policy = ldlm_processing_policy_table[res->lr_type]; - rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err, - &rpc_list); - if (rc == LDLM_ITER_STOP) { - lock->l_req_mode = old_mode; - if (res->lr_type == LDLM_EXTENT) - ldlm_extent_add_lock(res, lock); - else - ldlm_granted_list_add_lock(lock, &prev); - - res = NULL; - } else { - *flags |= LDLM_FL_BLOCK_GRANTED; - granted = 1; - } - } -#else - } else { - CERROR("This is client-side-only module, cannot handle " - "LDLM_NAMESPACE_SERVER resource type lock.\n"); - LBUG(); - } -#endif - unlock_res_and_lock(lock); - - if (granted) - ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); - if (node) - OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); - RETURN(res); -} +EXPORT_SYMBOL(ldlm_lock_mode_downgrade); /** * Print lock with lock handle \a lockh description into debug log. diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 839349b..7f6191b 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1461,101 +1461,12 @@ existing_lock: } /** - * Old-style LDLM main entry point for server code enqueue. - */ -int ldlm_handle_enqueue(struct ptlrpc_request *req, - ldlm_completion_callback completion_callback, - ldlm_blocking_callback blocking_callback, - ldlm_glimpse_callback glimpse_callback) -{ - struct ldlm_request *dlm_req; - struct ldlm_callback_suite cbs = { - .lcs_completion = completion_callback, - .lcs_blocking = blocking_callback, - .lcs_glimpse = glimpse_callback - }; - int rc; - - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req != NULL) { - rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace, - req, dlm_req, &cbs); - } else { - rc = -EFAULT; - } - return rc; -} - -/** * Main LDLM entry point for server code to process lock conversion requests. */ int ldlm_handle_convert0(struct ptlrpc_request *req, const struct ldlm_request *dlm_req) { - struct ldlm_reply *dlm_rep; - struct ldlm_lock *lock; - int rc; - ENTRY; - - if (req->rq_export && req->rq_export->exp_nid_stats && - req->rq_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats, - LDLM_CONVERT - LDLM_FIRST_OPC); - - rc = req_capsule_server_pack(&req->rq_pill); - if (rc) - RETURN(rc); - - dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - dlm_rep->lock_flags = dlm_req->lock_flags; - - lock = ldlm_handle2lock(&dlm_req->lock_handle[0]); - if (!lock) { - req->rq_status = LUSTRE_EINVAL; - } else { - void *res = NULL; - - LDLM_DEBUG(lock, "server-side convert handler START"); - - res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, - &dlm_rep->lock_flags); - if (res) { - if (ldlm_del_waiting_lock(lock)) - LDLM_DEBUG(lock, "converted waiting lock"); - req->rq_status = 0; - } else { - req->rq_status = LUSTRE_EDEADLK; - } - } - - if (lock) { - if (!req->rq_status) - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "server-side convert handler END"); - LDLM_LOCK_PUT(lock); - } else - LDLM_DEBUG_NOLOCK("server-side convert handler END"); - - RETURN(0); -} - -/** - * Old-style main LDLM entry point for server code to process lock conversion - * requests. - */ -int ldlm_handle_convert(struct ptlrpc_request *req) -{ - int rc; - struct ldlm_request *dlm_req; - - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req != NULL) { - rc = ldlm_handle_convert0(req, dlm_req); - } else { - CERROR ("Can't unpack dlm_req\n"); - rc = -EFAULT; - } - return rc; + RETURN(-ENOTSUPP); } /** diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 7cdf870..247c712 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -135,7 +135,6 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags, LASSERT(lock->l_granted_mode != lock->l_req_mode); check_res_locked(res); - LASSERT(list_empty(&res->lr_converting)); INIT_LIST_HEAD(&rpc_list); if (intention == LDLM_PROCESS_RESCAN) { diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index c0117d2..1fc6b1a 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -242,8 +242,6 @@ EXPORT_SYMBOL(ldlm_completion_ast_async); * * - to force all locks when resource is destroyed (cleanup_resource()); * - * - during lock conversion (not used currently). - * * If lock is not granted in the first case, this function waits until second * or penultimate cases happen in some other thread. * @@ -1038,105 +1036,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, } EXPORT_SYMBOL(ldlm_cli_enqueue); -static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, - __u32 *flags) -{ - struct ldlm_resource *res; - int rc; - ENTRY; - if (ns_is_client(ldlm_lock_to_ns(lock))) { - CERROR("Trying to cancel local lock\n"); - LBUG(); - } - LDLM_DEBUG(lock, "client-side local convert"); - - res = ldlm_lock_convert(lock, new_mode, flags); - if (res) { - ldlm_reprocess_all(res); - rc = 0; - } else { - rc = LUSTRE_EDEADLK; - } - LDLM_DEBUG(lock, "client-side local convert handler END"); - LDLM_LOCK_PUT(lock); - RETURN(rc); -} - -/* FIXME: one of ldlm_cli_convert or the server side should reject attempted - * conversion of locks which are on the waiting or converting queue */ -/* Caller of this code is supposed to take care of lock readers/writers - accounting */ -int ldlm_cli_convert(const struct lustre_handle *lockh, int new_mode, - __u32 *flags) -{ - struct ldlm_request *body; - struct ldlm_reply *reply; - struct ldlm_lock *lock; - struct ldlm_resource *res; - struct ptlrpc_request *req; - int rc; - ENTRY; - - lock = ldlm_handle2lock(lockh); - if (!lock) { - LBUG(); - RETURN(-EINVAL); - } - *flags = 0; - - if (lock->l_conn_export == NULL) - RETURN(ldlm_cli_convert_local(lock, new_mode, flags)); - - LDLM_DEBUG(lock, "client-side convert"); - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export), - &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION, - LDLM_CONVERT); - if (req == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } - - body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - body->lock_handle[0] = lock->l_remote_handle; - - body->lock_desc.l_req_mode = new_mode; - body->lock_flags = ldlm_flags_to_wire(*flags); - - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc != ELDLM_OK) - GOTO(out, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - if (reply == NULL) - GOTO(out, rc = -EPROTO); - - if (req->rq_status) - GOTO(out, rc = req->rq_status); - - res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); - if (res != NULL) { - ldlm_reprocess_all(res); - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - if (lock->l_completion_ast) { - rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, - NULL); - if (rc) - GOTO(out, rc); - } - } else { - rc = LUSTRE_EDEADLK; - } - EXIT; - out: - LDLM_LOCK_PUT(lock); - ptlrpc_req_finished(req); - return rc; -} - /** * Cancel locks locally. * Returns: @@ -2126,41 +2025,34 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, /* Lock iterators. */ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, - void *closure) + void *closure) { struct list_head *tmp, *next; - struct ldlm_lock *lock; - int rc = LDLM_ITER_CONTINUE; + struct ldlm_lock *lock; + int rc = LDLM_ITER_CONTINUE; - ENTRY; + ENTRY; - if (!res) - RETURN(LDLM_ITER_CONTINUE); + if (!res) + RETURN(LDLM_ITER_CONTINUE); - lock_res(res); + lock_res(res); list_for_each_safe(tmp, next, &res->lr_granted) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (iter(lock, closure) == LDLM_ITER_STOP) - GOTO(out, rc = LDLM_ITER_STOP); - } - - list_for_each_safe(tmp, next, &res->lr_converting) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - - if (iter(lock, closure) == LDLM_ITER_STOP) - GOTO(out, rc = LDLM_ITER_STOP); - } + if (iter(lock, closure) == LDLM_ITER_STOP) + GOTO(out, rc = LDLM_ITER_STOP); + } list_for_each_safe(tmp, next, &res->lr_waiting) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (iter(lock, closure) == LDLM_ITER_STOP) - GOTO(out, rc = LDLM_ITER_STOP); - } - out: - unlock_res(res); - RETURN(rc); + if (iter(lock, closure) == LDLM_ITER_STOP) + GOTO(out, rc = LDLM_ITER_STOP); + } +out: + unlock_res(res); + RETURN(rc); } struct iter_helper_data { @@ -2319,28 +2211,23 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) RETURN(0); } - /* - * If granted mode matches the requested mode, this lock is granted. - * - * If they differ, but we have a granted mode, then we were granted - * one mode and now want another: ergo, converting. - * - * If we haven't been granted anything and are on a resource list, - * then we're blocked/waiting. - * - * If we haven't been granted anything and we're NOT on a resource list, - * then we haven't got a reply yet and don't have a known disposition. - * This happens whenever a lock enqueue is the request that triggers - * recovery. - */ - if (lock->l_granted_mode == lock->l_req_mode) - flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED; - else if (lock->l_granted_mode) - flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV; + /* + * If granted mode matches the requested mode, this lock is granted. + * + * If we haven't been granted anything and are on a resource list, + * then we're blocked/waiting. + * + * If we haven't been granted anything and we're NOT on a resource list, + * then we haven't got a reply yet and don't have a known disposition. + * This happens whenever a lock enqueue is the request that triggers + * recovery. + */ + if (lock->l_granted_mode == lock->l_req_mode) + flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED; else if (!list_empty(&lock->l_res_link)) - flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT; - else - flags = LDLM_FL_REPLAY; + flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT; + else + flags = LDLM_FL_REPLAY; req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index d0e6a29..8241dc8 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -1072,14 +1072,13 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, static int ldlm_resource_clean(struct cfs_hash *hs, struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg) { - struct ldlm_resource *res = cfs_hash_object(hs, hnode); + struct ldlm_resource *res = cfs_hash_object(hs, hnode); __u64 flags = *(__u64 *)arg; - cleanup_resource(res, &res->lr_granted, flags); - cleanup_resource(res, &res->lr_converting, flags); - cleanup_resource(res, &res->lr_waiting, flags); + cleanup_resource(res, &res->lr_granted, flags); + cleanup_resource(res, &res->lr_waiting, flags); - return 0; + return 0; } static int ldlm_resource_complain(struct cfs_hash *hs, struct cfs_hash_bd *bd, @@ -1371,7 +1370,6 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type) } INIT_LIST_HEAD(&res->lr_granted); - INIT_LIST_HEAD(&res->lr_converting); INIT_LIST_HEAD(&res->lr_waiting); atomic_set(&res->lr_refcount, 1); @@ -1484,28 +1482,23 @@ struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res) static void __ldlm_resource_putref_final(struct cfs_hash_bd *bd, struct ldlm_resource *res) { - struct ldlm_ns_bucket *nsb = res->lr_ns_bucket; + struct ldlm_ns_bucket *nsb = res->lr_ns_bucket; if (!list_empty(&res->lr_granted)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } - - if (!list_empty(&res->lr_converting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } if (!list_empty(&res->lr_waiting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } - cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash, - bd, &res->lr_hash); - lu_ref_fini(&res->lr_reference); - if (cfs_hash_bd_count_get(bd) == 0) - ldlm_namespace_put(nsb->nsb_namespace); + cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash, + bd, &res->lr_hash); + lu_ref_fini(&res->lr_reference); + if (cfs_hash_bd_count_get(bd) == 0) + ldlm_namespace_put(nsb->nsb_namespace); } /* Returns 1 if the resource was freed, 0 if it remains. */ @@ -1688,15 +1681,11 @@ void ldlm_resource_dump(int level, struct ldlm_resource *res) } } } - if (!list_empty(&res->lr_converting)) { - CDEBUG(level, "Converting locks:\n"); - list_for_each_entry(lock, &res->lr_converting, l_res_link) - LDLM_DEBUG_LIMIT(level, lock, "###"); - } + if (!list_empty(&res->lr_waiting)) { - CDEBUG(level, "Waiting locks:\n"); + CDEBUG(level, "Waiting locks:\n"); list_for_each_entry(lock, &res->lr_waiting, l_res_link) - LDLM_DEBUG_LIMIT(level, lock, "###"); - } + LDLM_DEBUG_LIMIT(level, lock, "###"); + } } EXPORT_SYMBOL(ldlm_resource_dump); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index b07cdc1..2c1c530 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3023,7 +3023,7 @@ void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, "state %p transno %lld\n", req, req->rq_reply_state, req->rq_transno); if (cos) { - ldlm_lock_downgrade(lock, LCK_COS); + ldlm_lock_mode_downgrade(lock, LCK_COS); mode = LCK_COS; } if (req->rq_export->exp_disconnected) diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 707f079..5c94c4f 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -2249,7 +2249,7 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs) while (nlocks-- > 0) { lock = ack_locks[nlocks]; - ldlm_lock_downgrade(lock, LCK_COS); + ldlm_lock_mode_downgrade(lock, LCK_COS); LDLM_LOCK_PUT(lock); } RETURN(0); -- 1.8.3.1