From 541902a3f934d0e68368d9698cef38d44c473527 Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Mon, 7 May 2018 23:36:55 +0300 Subject: [PATCH] LU-10175 ldlm: handle lock converts in cancel handler - Use cancel portals and high-priority handling for lock converts. Update ldlm_cancel_handler to understand LDLM_CONVERT RPC for that. - Use ns_dirty_age_limit for lock convert - don't convert too old locks. - Check for empty converts and skip such Signed-off-by: Mikhail Pershin Change-Id: I767626acd974ad88bbbf0bb3b0a46744c45b7897 Reviewed-on: https://review.whamcloud.com/32314 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Fan Yong Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/include/lustre_export.h | 5 + lustre/include/uapi/linux/lustre/lustre_idl.h | 3 +- lustre/ldlm/ldlm_inodebits.c | 18 +- lustre/ldlm/ldlm_lockd.c | 237 ++++++++++++++------------ lustre/ldlm/ldlm_request.c | 38 ++++- lustre/llite/llite_lib.c | 2 +- lustre/llite/namei.c | 7 +- 7 files changed, 188 insertions(+), 122 deletions(-) diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 6c5a08a..5ead593 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -450,6 +450,11 @@ static inline int exp_connect_flr(struct obd_export *exp) return !!(exp_connect_flags2(exp) & OBD_CONNECT2_FLR); } +static inline int exp_connect_lock_convert(struct obd_export *exp) +{ + return !!(exp_connect_flags2(exp) & OBD_CONNECT2_LOCK_CONVERT); +} + extern struct obd_export *class_conn2export(struct lustre_handle *conn); extern struct obd_device *class_conn2obd(struct lustre_handle *conn); diff --git a/lustre/include/uapi/linux/lustre/lustre_idl.h b/lustre/include/uapi/linux/lustre/lustre_idl.h index 1c821a0..7999816 100644 --- a/lustre/include/uapi/linux/lustre/lustre_idl.h +++ b/lustre/include/uapi/linux/lustre/lustre_idl.h @@ -859,7 +859,8 @@ struct ptlrpc_body_v2 { OBD_CONNECT_GRANT_PARAM | \ OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2) -#define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR) +#define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR | \ ++ OBD_CONNECT2_LOCK_CONVERT) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 626c624..c5d5d5a 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -325,7 +325,7 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop) /* Just return if there are no conflicting bits */ if ((lock->l_policy_data.l_inodebits.bits & to_drop) == 0) { - LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx\n", + LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx", lock->l_policy_data.l_inodebits.bits, to_drop); /* nothing to do */ RETURN(0); @@ -356,13 +356,19 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits) ldlm_lock2handle(lock, &lockh); lock_res_and_lock(lock); - /* check if all bits are cancelled */ + /* check if all bits are blocked */ if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) { unlock_res_and_lock(lock); /* return error to continue with cancel */ GOTO(exit, rc = -EINVAL); } + /* check if no common bits, consider this as successful convert */ + if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) { + unlock_res_and_lock(lock); + GOTO(exit, rc = 0); + } + /* check if there is race with cancel */ if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) { unlock_res_and_lock(lock); @@ -408,9 +414,11 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits) rc = ldlm_cli_convert(lock, &flags); if (rc) { lock_res_and_lock(lock); - ldlm_clear_converting(lock); - ldlm_set_cbpending(lock); - ldlm_set_bl_ast(lock); + if (ldlm_is_converting(lock)) { + ldlm_clear_converting(lock); + ldlm_set_cbpending(lock); + ldlm_set_bl_ast(lock); + } unlock_res_and_lock(lock); GOTO(exit, rc); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 92ee0c8..3273fdc 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1494,16 +1494,16 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, if (ldlm_is_cancel(lock)) { LDLM_ERROR(lock, "convert on canceled lock!"); - req->rq_status = LUSTRE_EINVAL; + rc = ELDLM_NO_LOCK_DATA; } else if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) { LDLM_ERROR(lock, "lock mode differs!"); - req->rq_status = LUSTRE_EINVAL; + rc = ELDLM_NO_LOCK_DATA; } else if (bits == new) { /* This can be valid situation if CONVERT RPCs are - * re-ordered. Just finish silently*/ + * re-ordered. Just finish silently */ LDLM_DEBUG(lock, "lock is converted already!"); - req->rq_status = 0; + rc = ELDLM_OK; } else { lock_res_and_lock(lock); if (ldlm_is_waited(lock)) @@ -1515,21 +1515,28 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, lock->l_bl_ast_run = 0; ldlm_clear_ast_sent(lock); unlock_res_and_lock(lock); - req->rq_status = 0; + ldlm_reprocess_all(lock->l_resource); + rc = ELDLM_OK; + } + + if (rc == ELDLM_OK) { dlm_rep->lock_handle = lock->l_remote_handle; ldlm_ibits_policy_local_to_wire(&lock->l_policy_data, &dlm_rep->lock_desc.l_policy_data); - - ldlm_reprocess_all(lock->l_resource); } - LDLM_DEBUG(lock, "server-side convert handler END"); + + LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", + rc); LDLM_LOCK_PUT(lock); } else { - req->rq_status = LUSTRE_EINVAL; - LDLM_DEBUG_NOLOCK("server-side convert handler END"); + rc = ELDLM_NO_LOCK_DATA; + LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d", + rc); } + req->rq_status = rc; + RETURN(0); } @@ -2234,145 +2241,163 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) */ static int ldlm_cancel_handler(struct ptlrpc_request *req) { - int rc; - ENTRY; + int rc; - /* Requests arrive in sender's byte order. The ptlrpc service - * handler has already checked and, if necessary, byte-swapped the - * incoming request message body, but I am responsible for the - * message buffers. */ + ENTRY; - req_capsule_init(&req->rq_pill, req, RCL_SERVER); + /* Requests arrive in sender's byte order. The ptlrpc service + * handler has already checked and, if necessary, byte-swapped the + * incoming request message body, but I am responsible for the + * message buffers. */ - if (req->rq_export == NULL) { - struct ldlm_request *dlm_req; + req_capsule_init(&req->rq_pill, req, RCL_SERVER); - CERROR("%s from %s arrived at %lu with bad export cookie " - "%llu\n", - ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)), - libcfs_nid2str(req->rq_peer.nid), - req->rq_arrival_time.tv_sec, - lustre_msg_get_handle(req->rq_reqmsg)->cookie); - - if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) { - req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); - dlm_req = req_capsule_client_get(&req->rq_pill, - &RMF_DLM_REQ); - if (dlm_req != NULL) - ldlm_lock_dump_handle(D_ERROR, - &dlm_req->lock_handle[0]); - } - ldlm_callback_reply(req, -ENOTCONN); - RETURN(0); - } + if (req->rq_export == NULL) { + struct ldlm_request *dlm_req; - switch (lustre_msg_get_opc(req->rq_reqmsg)) { + CERROR("%s from %s arrived at %lu with bad export cookie " + "%llu\n", + ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)), + libcfs_nid2str(req->rq_peer.nid), + req->rq_arrival_time.tv_sec, + lustre_msg_get_handle(req->rq_reqmsg)->cookie); + + if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) { + req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); + dlm_req = req_capsule_client_get(&req->rq_pill, + &RMF_DLM_REQ); + if (dlm_req != NULL) + ldlm_lock_dump_handle(D_ERROR, + &dlm_req->lock_handle[0]); + } + ldlm_callback_reply(req, -ENOTCONN); + RETURN(0); + } - /* XXX FIXME move this back to mds/handler.c, bug 249 */ - case LDLM_CANCEL: - req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); - CDEBUG(D_INODE, "cancel\n"); + switch (lustre_msg_get_opc(req->rq_reqmsg)) { + /* XXX FIXME move this back to mds/handler.c, bug 249 */ + case LDLM_CANCEL: + req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); + CDEBUG(D_INODE, "cancel\n"); if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) || CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) || CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT)) RETURN(0); - rc = ldlm_handle_cancel(req); - if (rc) - break; - RETURN(0); - default: - CERROR("invalid opcode %d\n", - lustre_msg_get_opc(req->rq_reqmsg)); - req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); - ldlm_callback_reply(req, -EINVAL); - } + rc = ldlm_handle_cancel(req); + break; + case LDLM_CONVERT: + { + struct ldlm_request *dlm_req; - RETURN(0); + req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT); + CDEBUG(D_INODE, "convert\n"); + + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) { + CDEBUG(D_INFO, "bad request buffer for cancel\n"); + rc = ldlm_callback_reply(req, -EPROTO); + } else { + req->rq_status = ldlm_handle_convert0(req, dlm_req); + rc = ptlrpc_reply(req); + } + break; + } + default: + CERROR("invalid opcode %d\n", + lustre_msg_get_opc(req->rq_reqmsg)); + req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); + rc = ldlm_callback_reply(req, -EINVAL); + } + + RETURN(rc); } static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req, - struct ldlm_lock *lock) + struct ldlm_lock *lock) { - struct ldlm_request *dlm_req; - struct lustre_handle lockh; - int rc = 0; - int i; - ENTRY; - - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req == NULL) - RETURN(0); + struct ldlm_request *dlm_req; + struct lustre_handle lockh; + int rc = 0; + int i; - ldlm_lock2handle(lock, &lockh); - for (i = 0; i < dlm_req->lock_count; i++) { - if (lustre_handle_equal(&dlm_req->lock_handle[i], - &lockh)) { - DEBUG_REQ(D_RPCTRACE, req, - "Prio raised by lock %#llx.", lockh.cookie); + ENTRY; - rc = 1; - break; - } - } + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) + RETURN(0); - RETURN(rc); + ldlm_lock2handle(lock, &lockh); + for (i = 0; i < dlm_req->lock_count; i++) { + if (lustre_handle_equal(&dlm_req->lock_handle[i], + &lockh)) { + DEBUG_REQ(D_RPCTRACE, req, + "Prio raised by lock %#llx.", lockh.cookie); + rc = 1; + break; + } + } + RETURN(rc); } static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req) { - struct ldlm_request *dlm_req; - int rc = 0; - int i; - ENTRY; + struct ldlm_request *dlm_req; + int rc = 0; + int i; - /* no prolong in recovery */ - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) - RETURN(0); + ENTRY; - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req == NULL) - RETURN(-EFAULT); + /* no prolong in recovery */ + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + RETURN(0); - for (i = 0; i < dlm_req->lock_count; i++) { - struct ldlm_lock *lock; + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) + RETURN(-EFAULT); - lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); - if (lock == NULL) - continue; + for (i = 0; i < dlm_req->lock_count; i++) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); + if (lock == NULL) + continue; rc = ldlm_is_ast_sent(lock) ? 1 : 0; - if (rc) - LDLM_DEBUG(lock, "hpreq cancel lock"); - LDLM_LOCK_PUT(lock); + if (rc) + LDLM_DEBUG(lock, "hpreq cancel/convert lock"); + LDLM_LOCK_PUT(lock); - if (rc) - break; - } + if (rc) + break; + } - RETURN(rc); + RETURN(rc); } static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = { - .hpreq_lock_match = ldlm_cancel_hpreq_lock_match, + .hpreq_lock_match = ldlm_cancel_hpreq_lock_match, .hpreq_check = ldlm_cancel_hpreq_check, .hpreq_fini = NULL, }; static int ldlm_hpreq_handler(struct ptlrpc_request *req) { - ENTRY; + ENTRY; - req_capsule_init(&req->rq_pill, req, RCL_SERVER); + req_capsule_init(&req->rq_pill, req, RCL_SERVER); - if (req->rq_export == NULL) - RETURN(0); + if (req->rq_export == NULL) + RETURN(0); - if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) { - req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); - req->rq_ops = &ldlm_cancel_hpreq_ops; - } - RETURN(0); + if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) { + req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); + req->rq_ops = &ldlm_cancel_hpreq_ops; + } else if (LDLM_CONVERT == lustre_msg_get_opc(req->rq_reqmsg)) { + req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT); + req->rq_ops = &ldlm_cancel_hpreq_ops; + } + RETURN(0); } static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 07520c33..36bc26e 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1073,7 +1073,7 @@ static int lock_convert_interpret(const struct lu_env *env, aa->lock_handle.cookie, reply->lock_handle.cookie, req->rq_export->exp_client_uuid.uuid, libcfs_id2str(req->rq_peer)); - GOTO(out, rc = -ESTALE); + GOTO(out, rc = ELDLM_NO_LOCK_DATA); } lock_res_and_lock(lock); @@ -1122,15 +1122,29 @@ static int lock_convert_interpret(const struct lu_env *env, unlock_res_and_lock(lock); out: if (rc) { + int flag; + lock_res_and_lock(lock); if (ldlm_is_converting(lock)) { ldlm_clear_converting(lock); ldlm_set_cbpending(lock); ldlm_set_bl_ast(lock); + lock->l_policy_data.l_inodebits.cancel_bits = 0; } unlock_res_and_lock(lock); - } + /* fallback to normal lock cancel. If rc means there is no + * valid lock on server, do only local cancel */ + if (rc == ELDLM_NO_LOCK_DATA) + flag = LCF_LOCAL; + else + flag = LCF_ASYNC; + + rc = ldlm_cli_cancel(&aa->lock_handle, flag); + if (rc < 0) + LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n", + rc); + } LDLM_LOCK_PUT(lock); RETURN(rc); } @@ -1161,6 +1175,15 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags) RETURN(-EINVAL); } + /* this is better to check earlier and it is done so already, + * but this check is kept too as final one to issue an error + * if any new code will miss such check. + */ + if (!exp_connect_lock_convert(exp)) { + LDLM_ERROR(lock, "server doesn't support lock convert\n"); + RETURN(-EPROTO); + } + if (lock->l_resource->lr_type != LDLM_IBITS) { LDLM_ERROR(lock, "convert works with IBITS locks only."); RETURN(-EINVAL); @@ -1189,13 +1212,12 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags) ptlrpc_request_set_replen(req); - /* That could be useful to use cancel portals for convert as well - * as high-priority handling. This will require changes in - * ldlm_cancel_handler to understand convert RPC as well. - * - * req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; - * req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; + /* + * Use cancel portals for convert as well as high-priority handling. */ + req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; + req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; + ptlrpc_at_set_req_timeout(req); if (exp->exp_obd->obd_svc_stats != NULL) diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index a56ffb5..5d06edc 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -224,7 +224,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_GRANT_PARAM | OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2; - data->ocd_connect_flags2 = OBD_CONNECT2_FLR; + data->ocd_connect_flags2 = OBD_CONNECT2_FLR | OBD_CONNECT2_LOCK_CONVERT; #ifdef HAVE_LRU_RESIZE_SUPPORT if (sbi->ll_flags & LL_SBI_LRU_RESIZE) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 0e098a3..8118a92 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -374,11 +374,16 @@ void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel) * that convert is really needed. */ int ll_md_need_convert(struct ldlm_lock *lock) { + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); struct inode *inode; __u64 wanted = lock->l_policy_data.l_inodebits.cancel_bits; __u64 bits = lock->l_policy_data.l_inodebits.bits & ~wanted; enum ldlm_mode mode = LCK_MINMODE; + if (!lock->l_conn_export || + !exp_connect_lock_convert(lock->l_conn_export)) + return 0; + if (!wanted || !bits || ldlm_is_cancel(lock)) return 0; @@ -411,7 +416,7 @@ int ll_md_need_convert(struct ldlm_lock *lock) lock_res_and_lock(lock); if (ktime_after(ktime_get(), ktime_add(lock->l_last_used, - ktime_set(10, 0)))) { + ktime_set(ns->ns_dirty_age_limit, 0)))) { unlock_res_and_lock(lock); return 0; } -- 1.8.3.1