From 3e79bcd6cad78e4e7980152b03e7e4adebbab480 Mon Sep 17 00:00:00 2001 From: huanghua Date: Tue, 11 Jul 2006 11:06:45 +0000 Subject: [PATCH] fixed a bug in mdt_intent_getattr(): use reply message before packing it. --- lustre/mdt/mdt_handler.c | 83 +++++++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 2373acc..7ca9202 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -191,7 +191,8 @@ static inline int mdt_body_has_lov(const struct lu_attr *la, } static int mdt_getattr_internal(struct mdt_thread_info *info, - struct mdt_object *o) + struct mdt_object *o, + int need_pack_reply) { struct md_object *next = mdt_object_child(o); const struct mdt_body *reqbody = info->mti_body; @@ -211,21 +212,20 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, rc = mo_attr_get(ctxt, next, la); if (rc == -EREMOTE) { - /* This object is located on remote node */ - req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, - RCL_SERVER, 0); -#ifdef CONFIG_FS_POSIX_ACL - req_capsule_set_size(&info->mti_pill, &RMF_EADATA, - RCL_SERVER, 0); -#endif - rc = req_capsule_pack(&info->mti_pill); - if (rc == 0) { - repbody = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_BODY); - repbody->fid1 = *mdt_object_fid(o); - repbody->valid |= OBD_MD_FLID; + /* FIXME: This object is located on remote node. + * What value should we return to client? + */ + if (need_pack_reply) { + rc = req_capsule_pack(&info->mti_pill); + if (rc) + RETURN(rc); } - RETURN(rc); + + repbody = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_BODY); + repbody->fid1 = *mdt_object_fid(o); + repbody->valid |= OBD_MD_FLID; + RETURN(0); } if (rc){ CERROR("getattr error for "DFID3": %d\n", @@ -233,6 +233,9 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, RETURN(rc); } + if ( !need_pack_reply) + goto skip_packing; + /* pre-getattr: to guess how many bytes we need. */ if (mdt_body_has_lov(la, reqbody)) { /* this should return the total length, or error */ @@ -276,6 +279,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, RETURN(rc); } +skip_packing: repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); mdt_pack_attr2body(repbody, la, mdt_object_fid(o)); @@ -357,7 +361,7 @@ static int mdt_getattr(struct mdt_thread_info *info) if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { result = -ENOMEM; } else { - result = mdt_getattr_internal(info, info->mti_object); + result = mdt_getattr_internal(info, info->mti_object, 1); } RETURN(result); } @@ -370,7 +374,8 @@ static int mdt_getattr(struct mdt_thread_info *info) */ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct mdt_lock_handle *lhc, - __u64 child_bits) + __u64 child_bits, + struct ldlm_reply *ldlm_rep) { struct mdt_object *parent = info->mti_object; struct mdt_object *child; @@ -387,15 +392,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (name == NULL) RETURN(-EFAULT); + intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD); if (strlen(name) == 0) { /* only open the child. parent is on another node. */ + intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); child = parent; mdt_lock_handle_init(lhc); lhc->mlh_mode = LCK_CR; result = mdt_object_lock(info, child, lhc, child_bits); if (result != 0) { /* finally, we can get attr for child. */ - result = mdt_getattr_internal(info, child); + result = mdt_getattr_internal(info, child, + ldlm_rep ? 0 : 1); if (result != 0) mdt_object_unlock(info, child, lhc); } @@ -411,9 +419,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /*step 2: lookup child's fid by name */ result = mdo_lookup(info->mti_ctxt, next, name, child_fid); - if (result != 0) + if (result != 0) { + if (result == -ENOENT) + intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG); GOTO(out_parent, result); - + } else + intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); /* *step 3: find the child object by fid & lock it. * regardless if it is local or remote. @@ -425,7 +436,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out_parent, result = PTR_ERR(child)); /* finally, we can get attr for child. */ - result = mdt_getattr_internal(info, child); + result = mdt_getattr_internal(info, child, ldlm_rep ? 0 : 1); if (result != 0) mdt_object_unlock(info, child, lhc); mdt_object_put(info->mti_ctxt, child); @@ -444,7 +455,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info) ENTRY; - rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE); + rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); if (lustre_handle_is_used(&lhc->mlh_lh)) { ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode); lhc->mlh_lh.cookie = 0; @@ -1427,6 +1438,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct ldlm_reply *ldlm_rep; struct mdt_lock_handle tmp_lock; struct mdt_lock_handle *lhc = &tmp_lock; + struct mdt_device *mdt = info->mti_mdt; __u64 child_bits; int rc; @@ -1444,23 +1456,28 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, RETURN(-EINVAL); } - rc = mdt_getattr_name_lock(info, lhc, child_bits); + req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, + RCL_SERVER, mdt->mdt_max_mdsize); +#ifdef CONFIG_FS_POSIX_ACL + req_capsule_set_size(&info->mti_pill, &RMF_EADATA, + RCL_SERVER, mdt->mdt_max_cookiesize); +#endif + rc = req_capsule_pack(&info->mti_pill); + if (rc) + RETURN(rc); ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); intent_set_disposition(ldlm_rep, DISP_IT_EXECD); - intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD); - if (rc) { - if (rc == -EREMOTE) - intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); - else - intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG); - if (ldlm_rep) - ldlm_rep->lock_policy_res2 = 0; + ldlm_rep->lock_policy_res2 = + mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); + + if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG)) + ldlm_rep->lock_policy_res2 = 0; + if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { RETURN(ELDLM_LOCK_ABORTED); } - intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); - new_lock = ldlm_handle2lock(&lhc->mlh_lh); if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) RETURN(0); -- 1.8.3.1