From 0716f5a9d98a4fa299b2cfc7cfee236313e3dbcc Mon Sep 17 00:00:00 2001 From: Andreas Dilger Date: Fri, 13 Mar 2020 20:22:01 -0600 Subject: [PATCH] LU-12506 mdc: clean up code style for mdc_locks.c Clean up code style in mdc_locks.c (whitespace, strings) Run a useful test while we are at it. Test-Parameters: trivial testlist=runtests mdscount=10 mdtcount=80 Signed-off-by: Andreas Dilger Change-Id: I3e3e0a6a4712e8bc85c93c0a182afa00433ebbe5 Reviewed-on: https://review.whamcloud.com/37917 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Arshad Hussain Reviewed-by: Shaun Tancheff Reviewed-by: Hongchao Zhang Reviewed-by: Oleg Drokin --- lustre/mdc/mdc_locks.c | 518 +++++++++++++++++++++++++------------------------ 1 file changed, 264 insertions(+), 254 deletions(-) diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index b6b1c1c..10c1f2c 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -91,7 +91,7 @@ int it_open_error(int phase, struct lookup_intent *it) CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status); LBUG(); - return 0; + return 0; } EXPORT_SYMBOL(it_open_error); @@ -101,36 +101,36 @@ int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh, { struct ldlm_lock *lock; struct inode *new_inode = data; - ENTRY; - if(bits) - *bits = 0; + ENTRY; + if (bits) + *bits = 0; if (!lustre_handle_is_used(lockh)) RETURN(0); lock = ldlm_handle2lock(lockh); - LASSERT(lock != NULL); - lock_res_and_lock(lock); + LASSERT(lock != NULL); + lock_res_and_lock(lock); if (lock->l_resource->lr_lvb_inode && lock->l_resource->lr_lvb_inode != data) { struct inode *old_inode = lock->l_resource->lr_lvb_inode; + LASSERTF(old_inode->i_state & I_FREEING, - "Found existing inode %p/%lu/%u state %lu in lock: " - "setting data to %p/%lu/%u\n", old_inode, - old_inode->i_ino, old_inode->i_generation, + "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n", + old_inode, old_inode->i_ino, old_inode->i_generation, old_inode->i_state, new_inode, new_inode->i_ino, new_inode->i_generation); } lock->l_resource->lr_lvb_inode = new_inode; - if (bits) - *bits = lock->l_policy_data.l_inodebits.bits; + if (bits) + *bits = lock->l_policy_data.l_inodebits.bits; - unlock_res_and_lock(lock); - LDLM_LOCK_PUT(lock); + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); - RETURN(0); + RETURN(0); } enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags, @@ -140,8 +140,8 @@ enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags, { struct ldlm_res_id res_id; enum ldlm_mode rc; - ENTRY; + ENTRY; fid_build_reg_res_name(fid, &res_id); /* LU-4405: Clear bits not supported by server */ policy->l_inodebits.bits &= exp_connect_ibits(exp); @@ -159,7 +159,6 @@ int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, int rc; ENTRY; - fid_build_reg_res_name(fid, &res_id); rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, policy, mode, flags, opaque); @@ -172,8 +171,8 @@ int mdc_null_inode(struct obd_export *exp, struct ldlm_res_id res_id; struct ldlm_resource *res; struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace; - ENTRY; + ENTRY; LASSERTF(ns != NULL, "no namespace passed\n"); fid_build_reg_res_name(fid, &res_id); @@ -210,15 +209,15 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * original request doesn't need this buffer (at most it sends just the * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty * buffer and may also be difficult to allocate and save a very large - * request buffer for each open. (bug 5707) + * request buffer for each open. (b=5707) * * OOM here may cause recovery failure if lmm is needed (only for the * original open if the MDS crashed just when this client also OOM'd) * but this is incredibly unlikely, and questionable whether the client - * could do MDS recovery under OOM anyways... */ + * could do MDS recovery under OOM anyways... + */ int mdc_save_lovea(struct ptlrpc_request *req, - const struct req_msg_field *field, - void *data, u32 size) + const struct req_msg_field *field, void *data, u32 size) { struct req_capsule *pill = &req->rq_pill; struct lov_user_md *lmm; @@ -261,8 +260,8 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, LIST_HEAD(cancels); int count = 0; enum ldlm_mode mode; - int rc; int repsize, repsize_estimate; + int rc; ENTRY; @@ -293,24 +292,24 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, MDS_INODELOCK_OPEN); } - /* If CREATE, cancel parent's UPDATE lock. */ - if (it->it_op & IT_CREAT) - mode = LCK_EX; - else - mode = LCK_CR; - count += mdc_resource_get_unused(exp, &op_data->op_fid1, - &cancels, mode, - MDS_INODELOCK_UPDATE); - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_INTENT_OPEN); - if (req == NULL) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); - RETURN(ERR_PTR(-ENOMEM)); - } - - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - op_data->op_namelen + 1); + /* If CREATE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, mode, + MDS_INODELOCK_UPDATE); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_OPEN); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(ERR_PTR(-ENOMEM)); + } + + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); if (cl_is_lov_delay_create(it->it_flags)) { /* open(O_LOV_DELAY_CREATE) won't pack lmm */ LASSERT(lmmsize == 0); @@ -347,13 +346,13 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, req->rq_replay = req->rq_import->imp_replayable; spin_unlock(&req->rq_lock); - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = (__u64)it->it_op; + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; - /* pack the intended request */ - mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm, - lmmsize); + /* pack the intended request */ + mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm, + lmmsize); req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, mdt_md_capsule_size); @@ -424,23 +423,21 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it, RETURN(req); } -#define GA_DEFAULT_EA_NAME_LEN 20 -#define GA_DEFAULT_EA_VAL_LEN 250 -#define GA_DEFAULT_EA_NUM 10 +#define GA_DEFAULT_EA_NAME_LEN 20 +#define GA_DEFAULT_EA_VAL_LEN 250 +#define GA_DEFAULT_EA_NUM 10 static struct ptlrpc_request * -mdc_intent_getxattr_pack(struct obd_export *exp, - struct lookup_intent *it, +mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it, struct md_op_data *op_data) { - struct ptlrpc_request *req; - struct ldlm_intent *lit; - int rc, count = 0; + struct ptlrpc_request *req; + struct ldlm_intent *lit; + int rc, count = 0; LIST_HEAD(cancels); u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM; ENTRY; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT_GETXATTR); if (req == NULL) @@ -469,14 +466,14 @@ mdc_intent_getxattr_pack(struct obd_export *exp, exp->exp_obd->obd_name, PFID(&op_data->op_fid1)); #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) - /* If the supplied buffer is too small then the server will - * return -ERANGE and llite will fallback to using non cached - * xattr operations. On servers before 2.10.1 a (non-cached) - * listxattr RPC for an orphan or dead file causes an oops. So - * let's try to avoid sending too small a buffer to too old a - * server. This is effectively undoing the memory conservation - * of LU-9417 when it would be *more* likely to crash the - * server. See LU-9856. */ + /* If the supplied buffer is too small then the server will return + * -ERANGE and llite will fallback to using non cached xattr + * operations. On servers before 2.10.1 a (non-cached) listxattr RPC + * for an orphan or dead file causes an oops. So let's try to avoid + * sending too small a buffer to too old a server. This is effectively + * undoing the memory conservation of LU-9417 when it would be *more* + * likely to crash the server. See LU-9856. + */ if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0)) ea_vals_buf_size = max_t(u32, ea_vals_buf_size, exp->exp_connect_data.ocd_max_easize); @@ -520,7 +517,6 @@ mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, int rc; ENTRY; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT_GETATTR); if (req == NULL) @@ -547,7 +543,7 @@ mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, RETURN(ERR_PTR(rc)); } - /* pack the intent */ + /* pack the intent */ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = (__u64)it->it_op; @@ -588,14 +584,14 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct lookup_intent *it, struct md_op_data *op_data) { - struct obd_device *obd = class_exp2obd(exp); - LIST_HEAD(cancels); + struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; - struct ldlm_intent *lit; - struct layout_intent *layout; + struct ldlm_intent *lit; + struct layout_intent *layout; + LIST_HEAD(cancels); int count = 0, rc; - ENTRY; + ENTRY; req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT_LAYOUT); if (req == NULL) @@ -631,68 +627,66 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, RETURN(req); } -static struct ptlrpc_request * -mdc_enqueue_pack(struct obd_export *exp, int lvb_len) +static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp, + int lvb_len) { - struct ptlrpc_request *req; - int rc; - ENTRY; + struct ptlrpc_request *req; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); - if (req == NULL) - RETURN(ERR_PTR(-ENOMEM)); + ENTRY; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(ERR_PTR(rc)); - } + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); - ptlrpc_request_set_replen(req); - RETURN(req); + ptlrpc_request_set_replen(req); + RETURN(req); } static int mdc_finish_enqueue(struct obd_export *exp, - struct ptlrpc_request *req, - struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, - struct lustre_handle *lockh, - int rc) + struct ptlrpc_request *req, + struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, + struct lustre_handle *lockh, int rc) { - struct req_capsule *pill = &req->rq_pill; + struct req_capsule *pill = &req->rq_pill; struct ldlm_request *lockreq; - struct ldlm_reply *lockrep; - struct ldlm_lock *lock; - struct mdt_body *body = NULL; - void *lvb_data = NULL; - __u32 lvb_len = 0; - - ENTRY; - - LASSERT(rc >= 0); - /* Similarly, if we're going to replay this request, we don't want to - * actually get a lock, just perform the intent. */ - if (req->rq_transno || req->rq_replay) { - lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); + struct ldlm_reply *lockrep; + struct ldlm_lock *lock; + struct mdt_body *body = NULL; + void *lvb_data = NULL; + __u32 lvb_len = 0; + + ENTRY; + LASSERT(rc >= 0); + /* Similarly, if we're going to replay this request, we don't want to + * actually get a lock, just perform the intent. + */ + if (req->rq_transno || req->rq_replay) { + lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY); - } + } - if (rc == ELDLM_LOCK_ABORTED) { - einfo->ei_mode = 0; - memset(lockh, 0, sizeof(*lockh)); - rc = 0; - } else { /* rc = 0 */ + if (rc == ELDLM_LOCK_ABORTED) { + einfo->ei_mode = 0; + memset(lockh, 0, sizeof(*lockh)); + rc = 0; + } else { /* rc = 0 */ lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - /* If the server gave us back a different lock mode, we should - * fix up our variables. */ - if (lock->l_req_mode != einfo->ei_mode) { - ldlm_lock_addref(lockh, lock->l_req_mode); - ldlm_lock_decref(lockh, einfo->ei_mode); - einfo->ei_mode = lock->l_req_mode; - } + /* If server returned a different lock mode, fix up variables */ + if (lock->l_req_mode != einfo->ei_mode) { + ldlm_lock_addref(lockh, lock->l_req_mode); + ldlm_lock_decref(lockh, einfo->ei_mode); + einfo->ei_mode = lock->l_req_mode; + } LDLM_LOCK_PUT(lock); } @@ -706,17 +700,19 @@ static int mdc_finish_enqueue(struct obd_export *exp, it->it_request = req; /* Technically speaking rq_transno must already be zero if - * it_status is in error, so the check is a bit redundant */ + * it_status is in error, so the check is a bit redundant. + */ if ((!req->rq_transno || it->it_status < 0) && req->rq_replay) mdc_clear_replay_flag(req, it->it_status); - /* If we're doing an IT_OPEN which did not result in an actual - * successful open, then we need to remove the bit which saves - * this request for unconditional replay. - * - * It's important that we do this first! Otherwise we might exit the - * function without doing so, and try to replay a failed create - * (bug 3440) */ + /* If we're doing an IT_OPEN which did not result in an actual + * successful open, then we need to remove the bit which saves + * this request for unconditional replay. + * + * It's important that we do this first! Otherwise we might exit the + * function without doing so, and try to replay a failed create. + * (b=3440) + */ if (it->it_op & IT_OPEN && req->rq_replay && (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0)) mdc_clear_replay_flag(req, it->it_status); @@ -726,20 +722,22 @@ static int mdc_finish_enqueue(struct obd_export *exp, /* We know what to expect, so we do any byte flipping required here */ if (it_has_reply_body(it)) { - body = req_capsule_server_get(pill, &RMF_MDT_BODY); - if (body == NULL) { - CERROR ("Can't swab mdt_body\n"); - RETURN (-EPROTO); - } - - if (it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - /* - * If this is a successful OPEN request, we need to set - * replay handler and data early, so that if replay - * happens immediately after swabbing below, new reply - * is swabbed by that handler correctly. - */ + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (body == NULL) { + rc = -EPROTO; + CERROR("%s: cannot swab mdt_body: rc = %d\n", + exp->exp_obd->obd_name, rc); + RETURN(rc); + } + + if (it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + /* + * If this is a successful OPEN request, we need to set + * replay handler and data early, so that if replay + * happens immediately after swabbing below, new reply + * is swabbed by that handler correctly. + */ mdc_set_open_replay_data(NULL, NULL, it); } @@ -750,34 +748,33 @@ static int mdc_finish_enqueue(struct obd_export *exp, } if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) { - void *eadata; + void *eadata; mdc_update_max_ea_from_body(exp, body); - /* - * The eadata is opaque; just check that it is there. - * Eventually, obd_unpackmd() will check the contents. - */ - eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + /* + * The eadata is opaque; just check that it is there. + * Eventually, obd_unpackmd() will check the contents. + */ + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, body->mbo_eadatasize); if (eadata == NULL) RETURN(-EPROTO); - /* save lvb data and length in case this is for layout - * lock */ + /* save LVB data and length if for layout lock */ lvb_data = eadata; lvb_len = body->mbo_eadatasize; - /* - * We save the reply LOV EA in case we have to replay a - * create for recovery. If we didn't allocate a large - * enough request buffer above we need to reallocate it - * here to hold the actual LOV EA. - * - * To not save LOV EA if request is not going to replay - * (for example error one). - */ - if ((it->it_op & IT_OPEN) && req->rq_replay) { + /* + * We save the reply LOV EA in case we have to replay a + * create for recovery. If we didn't allocate a large + * enough request buffer above we need to reallocate it + * here to hold the actual LOV EA. + * + * To not save LOV EA if request is not going to replay + * (for example error one). + */ + if ((it->it_op & IT_OPEN) && req->rq_replay) { rc = mdc_save_lovea(req, &RMF_EADATA, eadata, body->mbo_eadatasize); if (rc) { @@ -789,7 +786,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, } } else if (it->it_op & IT_LAYOUT) { /* maybe the lock was granted right away and layout - * is packed into RMF_DLM_LVB of req */ + * is packed into RMF_DLM_LVB of req + */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n", class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno); @@ -814,7 +812,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, * LU-6581: trust layout data only if layout lock is granted. The MDT * has stopped sending layout unless the layout lock is granted. The * client still does this checking in case it's talking with an old - * server. - Jinshan */ + * server. - Jinshan + */ lock = ldlm_handle2lock(lockh); if (lock == NULL) RETURN(rc); @@ -877,7 +876,8 @@ static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it) } /* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. */ + * we don't know in advance the file type. + */ static int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo, const union ldlm_policy_data *policy, @@ -904,8 +904,8 @@ static int mdc_enqueue_base(struct obd_export *exp, __u32 acl_bufsize; enum lvb_type lvb_type = 0; int rc; - ENTRY; + ENTRY; LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type); fid_build_reg_res_name(&op_data->op_fid1, &res_id); @@ -953,25 +953,26 @@ resend: } else if (it->it_op & IT_GETXATTR) { req = mdc_intent_getxattr_pack(exp, it, op_data); } else { - LBUG(); - RETURN(-EINVAL); - } + LBUG(); + RETURN(-EINVAL); + } - if (IS_ERR(req)) - RETURN(PTR_ERR(req)); + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); - if (resends) { - req->rq_generation_set = 1; - req->rq_import_generation = generation; + if (resends) { + req->rq_generation_set = 1; + req->rq_import_generation = generation; req->rq_sent = ktime_get_real_seconds() + resends; - } + } einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it); /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() * to avoid possible races. It is safe to have glimpse handler - * for non-DOM locks and costs nothing.*/ + * for non-DOM locks and costs nothing. + */ if (einfo->ei_cb_gl == NULL) einfo->ei_cb_gl = mdc_ldlm_glimpse_ast; @@ -980,13 +981,14 @@ resend: if (!it) { /* For flock requests we immediatelly return without further - delay and let caller deal with the rest, since rest of - this function metadata processing makes no sense for flock - requests anyway. But in case of problem during comms with - Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we - can not rely on caller and this mainly for F_UNLCKs - (explicits or automatically generated by Kernel to clean - current FLocks upon exit) that can't be trashed */ + * delay and let caller deal with the rest, since rest of + * this function metadata processing makes no sense for flock + * requests anyway. But in case of problem during comms with + * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR), + * we cannot rely on caller and this mainly for F_UNLCKs + * (explicits or automatically generated by kernel to clean + * current flocks upon exit) that can't be trashed. + */ ptlrpc_req_finished(req); if (((rc == -EINTR) || (rc == -ETIMEDOUT)) && (einfo->ei_type == LDLM_FLOCK) && @@ -1014,7 +1016,8 @@ resend: /* Retry infinitely when the server returns -EINPROGRESS for the * intent operation, when server returns -EINPROGRESS for acquiring - * intent lock, we'll retry in after_reply(). */ + * intent lock, we'll retry in after_reply(). + */ if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) { mdc_clear_replay_flag(req, rc); ptlrpc_req_finished(req); @@ -1070,19 +1073,19 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, } static int mdc_finish_intent_lock(struct obd_export *exp, - struct ptlrpc_request *request, - struct md_op_data *op_data, - struct lookup_intent *it, - struct lustre_handle *lockh) + struct ptlrpc_request *request, + struct md_op_data *op_data, + struct lookup_intent *it, + struct lustre_handle *lockh) { - struct lustre_handle old_lock; - struct ldlm_lock *lock; + struct lustre_handle old_lock; + struct ldlm_lock *lock; int rc = 0; - ENTRY; - LASSERT(request != NULL); - LASSERT(request != LP_POISON); - LASSERT(request->rq_repmsg != LP_POISON); + ENTRY; + LASSERT(request != NULL); + LASSERT(request != LP_POISON); + LASSERT(request->rq_repmsg != LP_POISON); if (it->it_op & IT_READDIR) RETURN(0); @@ -1124,9 +1127,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp, it_set_disposition(it, DISP_ENQ_OPEN_REF); /* balanced in ll_file_open */ ptlrpc_request_addref(request); - /* BUG 11546 - eviction in the middle of open rpc - * processing - */ + /* eviction in middle of open RPC processing b=11546 */ OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); } @@ -1144,10 +1145,12 @@ static int mdc_finish_intent_lock(struct obd_export *exp, * one. We have to set the data here instead of in * mdc_enqueue, because we need to use the child's inode as * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) */ + * intent_finish has performed the iget(). + */ lock = ldlm_handle2lock(lockh); if (lock) { union ldlm_policy_data policy = lock->l_policy_data; + LDLM_DEBUG(lock, "matching against this"); if (it_has_reply_body(it)) { @@ -1165,9 +1168,9 @@ static int mdc_finish_intent_lock(struct obd_export *exp, } LDLM_LOCK_PUT(lock); - memcpy(&old_lock, lockh, sizeof(*lockh)); - if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) { + memcpy(&old_lock, lockh, sizeof(*lockh)); + if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, + LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) { ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode); memcpy(lockh, &old_lock, sizeof(old_lock)); it->it_lock_handle = lockh->cookie; @@ -1176,31 +1179,32 @@ static int mdc_finish_intent_lock(struct obd_export *exp, EXIT; out: - CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", + CDEBUG(D_DENTRY, + "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n", (int)op_data->op_namelen, op_data->op_name, - ldlm_it2str(it->it_op), it->it_status, - it->it_disposition, rc); + ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc); + return rc; } int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, struct lu_fid *fid, __u64 *bits) { - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. */ + /* We could just return 1 immediately, but as we should only be called + * in revalidate_it if we already have a lock, let's verify that. + */ struct ldlm_res_id res_id; struct lustre_handle lockh; union ldlm_policy_data policy; enum ldlm_mode mode; - ENTRY; + ENTRY; if (it->it_lock_handle) { lockh.cookie = it->it_lock_handle; mode = ldlm_revalidate_lock_handle(&lockh, bits); - } else { - fid_build_reg_res_name(fid, &res_id); - switch (it->it_op) { + } else { + fid_build_reg_res_name(fid, &res_id); + switch (it->it_op) { case IT_GETATTR: /* File attributes are held under multiple bits: * nlink is under lookup lock, size and times are @@ -1212,10 +1216,12 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, * Unfortunately, if the bits are split across multiple * locks, there's no easy way to match all of them here, * so an extra RPC would be performed to fetch all - * of those bits at once for now. */ + * of those bits at once for now. + */ /* For new MDTs(> 2.4), UPDATE|PERM should be enough, * but for old MDTs (< 2.4), permission is covered - * by LOOKUP lock, so it needs to match all bits here.*/ + * by LOOKUP lock, so it needs to match all bits here. + */ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; @@ -1235,7 +1241,7 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, LDLM_IBITS, &policy, LCK_CR | LCK_CW | LCK_PR | LCK_PW, &lockh); - } + } if (mode) { it->it_lock_handle = lockh.cookie; @@ -1288,9 +1294,9 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, }; struct lustre_handle lockh; int rc = 0; + ENTRY; LASSERT(it); - CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID ", intent: %s flags %#llo\n", (int)op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid2), @@ -1302,11 +1308,13 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) { /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's - * verify that. */ + * verify that. + */ it->it_lock_handle = 0; rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL); /* Only return failure if it was not GETATTR by cfid - (from inode_revalidate) */ + * (from inode_revalidate()). + */ if (rc || op_data->op_namelen != 0) RETURN(rc); } @@ -1315,7 +1323,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc < 0) { - CERROR("Can't alloc new fid, rc %d\n", rc); + CERROR("%s: cannot allocate new FID: rc=%d\n", + exp->exp_obd->obd_name, rc); RETURN(rc); } } @@ -1326,37 +1335,35 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); *reqp = it->it_request; - rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); - RETURN(rc); + rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); + RETURN(rc); } static int mdc_intent_getattr_async_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *args, int rc) { - struct mdc_getattr_args *ga = args; + struct mdc_getattr_args *ga = args; struct obd_export *exp = ga->ga_exp; struct md_enqueue_info *minfo = ga->ga_minfo; struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; - struct lookup_intent *it; - struct lustre_handle *lockh; - struct ldlm_reply *lockrep; - __u64 flags = LDLM_FL_HAS_INTENT; - ENTRY; - - it = &minfo->mi_it; - lockh = &minfo->mi_lockh; + struct lookup_intent *it = &minfo->mi_it; + struct lustre_handle *lockh = &minfo->mi_lockh; + struct ldlm_reply *lockrep; + __u64 flags = LDLM_FL_HAS_INTENT; - if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) - rc = -ETIMEDOUT; + ENTRY; + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) + rc = -ETIMEDOUT; - rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, - &flags, NULL, 0, lockh, rc); - if (rc < 0) { - CERROR("ldlm_cli_enqueue_fini: %d\n", rc); - mdc_clear_replay_flag(req, rc); - GOTO(out, rc); - } + rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, + &flags, NULL, 0, lockh, rc); + if (rc < 0) { + CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + mdc_clear_replay_flag(req, rc); + GOTO(out, rc); + } lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); LASSERT(lockrep != NULL); @@ -1364,40 +1371,42 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, lockrep->lock_policy_res2 = ptlrpc_status_ntoh(lockrep->lock_policy_res2); - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - if (rc) - GOTO(out, rc); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + if (rc) + GOTO(out, rc); - rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); - EXIT; + rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); + EXIT; out: - minfo->mi_cb(req, minfo, rc); - return 0; + minfo->mi_cb(req, minfo, rc); + return 0; } int mdc_intent_getattr_async(struct obd_export *exp, struct md_enqueue_info *minfo) { - struct md_op_data *op_data = &minfo->mi_data; - struct lookup_intent *it = &minfo->mi_it; - struct ptlrpc_request *req; + struct md_op_data *op_data = &minfo->mi_data; + struct lookup_intent *it = &minfo->mi_it; + struct ptlrpc_request *req; struct mdc_getattr_args *ga; - struct ldlm_res_id res_id; + struct ldlm_res_id res_id; union ldlm_policy_data policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE } }; - int rc = 0; - __u64 flags = LDLM_FL_HAS_INTENT; - ENTRY; + .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE } + }; + __u64 flags = LDLM_FL_HAS_INTENT; + int rc = 0; - CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n", - (int)op_data->op_namelen, op_data->op_name, - PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); + ENTRY; + CDEBUG(D_DLMTRACE, + "name: %.*s in inode "DFID", intent: %s flags %#llo\n", + (int)op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); fid_build_reg_res_name(&op_data->op_fid1, &res_id); /* If the MDT return -ERANGE because of large ACL, then the sponsor - * of the async getattr RPC will handle that by itself. */ + * of the async getattr RPC will handle that by itself. + */ req = mdc_intent_getattr_pack(exp, it, op_data, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); if (IS_ERR(req)) @@ -1406,7 +1415,8 @@ int mdc_intent_getattr_async(struct obd_export *exp, /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() * to avoid possible races. It is safe to have glimpse handler - * for non-DOM locks and costs nothing.*/ + * for non-DOM locks and costs nothing. + */ if (minfo->mi_einfo.ei_cb_gl == NULL) minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast; -- 1.8.3.1