From: yury Date: Tue, 19 Sep 2006 18:11:44 +0000 (+0000) Subject: - added resent support for getatr_name. X-Git-Tag: v1_8_0_110~486^2~899 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=e1a6dd0a28530899b9fa23c0a23022ce22892988;p=fs%2Flustre-release.git - added resent support for getatr_name. --- diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 155f385..b4a2f0d 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -423,15 +423,21 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, __u64 child_bits, struct ldlm_reply *ldlm_rep) { - struct mdt_object *parent = info->mti_object; - struct mdt_object *child; - struct md_object *next = mdt_object_child(info->mti_object); - struct lu_fid *child_fid = &info->mti_tmp_fid1; - const char *name; - int rc; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *parent = info->mti_object; + struct mdt_object *child; + struct md_object *next = mdt_object_child(info->mti_object); + struct lu_fid *child_fid = &info->mti_tmp_fid1; + int is_resent, rc; + const char *name; struct mdt_lock_handle *lhp; + struct ldlm_lock *lock; ENTRY; + is_resent = lustre_handle_is_used(&lhc->mlh_lh); + if (is_resent) + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); + LASSERT(info->mti_object != NULL); name = req_capsule_client_get(&info->mti_pill, &RMF_NAME); if (name == NULL) @@ -446,12 +452,25 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); child = parent; CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID - ", ldlm_rep=%p\n", - PFID(mdt_object_fid(child)), ldlm_rep); - - mdt_lock_handle_init(lhc); - lhc->mlh_mode = LCK_CR; - rc = mdt_object_lock(info, child, lhc, child_bits); + ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep); + + if (is_resent) { + /* Do not take lock for resent case. */ + lock = ldlm_handle2lock(&lhc->mlh_lh); + if (!lock) { + CERROR("Invalid lock handle "LPX64"\n", + lhc->mlh_lh.cookie); + LBUG(); + } + LASSERT(fid_res_name_eq(mdt_object_fid(child), + &lock->l_resource->lr_name)); + LDLM_LOCK_PUT(lock); + rc = 0; + } else { + mdt_lock_handle_init(lhc); + lhc->mlh_mode = LCK_CR; + rc = mdt_object_lock(info, child, lhc, child_bits); + } if (rc == 0) { /* finally, we can get attr for child. */ rc = mdt_getattr_internal(info, child); @@ -480,17 +499,32 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, *step 3: find the child object by fid & lock it. * regardless if it is local or remote. */ - mdt_lock_handle_init(lhc); - lhc->mlh_mode = LCK_CR; - child = mdt_object_find_lock(info, child_fid, lhc, child_bits); + if (is_resent) { + /* Do not take lock for resent case. */ + lock = ldlm_handle2lock(&lhc->mlh_lh); + if (!lock) { + CERROR("Invalid lock handle "LPX64"\n", + lhc->mlh_lh.cookie); + LBUG(); + } + LASSERT(fid_res_name_eq(mdt_object_fid(child), + &lock->l_resource->lr_name)); + LDLM_LOCK_PUT(lock); + child = mdt_object_find(info->mti_ctxt, info->mti_mdt, + child_fid); + } else { + mdt_lock_handle_init(lhc); + lhc->mlh_mode = LCK_CR; + child = mdt_object_find_lock(info, child_fid, lhc, child_bits); + } if (IS_ERR(child)) GOTO(out_parent, rc = PTR_ERR(child)); /* finally, we can get attr for child. */ rc = mdt_getattr_internal(info, child); - if (rc != 0) + if (rc != 0) { mdt_object_unlock(info, child, lhc, 1); - else { + } else { struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh); if (lock) { struct ldlm_res_id *res_id; @@ -1084,7 +1118,7 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, ldlm_policy_data_t *policy, struct ldlm_res_id *res_id) { - int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/ + int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; int rc; LASSERT(ns != NULL); @@ -1098,22 +1132,26 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, return rc == ELDLM_OK ? 0 : -EIO; } -/* just call ldlm_lock_decref() if decref, - * else we only call ptlrpc_save_lock() to save this lock in req. - * when transaction committed, req will be released, and lock will, too */ +/* + * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock() + * to save this lock in req. when transaction committed, req will be released, + * and lock will, too. + */ void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f, struct lustre_handle *lh, ldlm_mode_t mode, int decref) { { - /* FIXME: this is debug stuff, remove it later. */ + /* FIXME: this is debug stuff, remove it later. */ struct ldlm_lock *lock = ldlm_handle2lock(lh); if (!lock) { - CERROR("invalid lock handle "LPX64, lh->cookie); + CERROR("Invalid lock handle "LPX64"\n", + lh->cookie); LBUG(); } LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name)); LDLM_LOCK_PUT(lock); } + if (decref) ldlm_lock_decref(lh, mode); else @@ -1819,15 +1857,73 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, RETURN(ELDLM_LOCK_REPLACED); } +static void mdt_fixup_resent(struct req_capsule *pill, + struct ldlm_lock *new_lock, + struct mdt_lock_handle *lh) +{ + struct ptlrpc_request *req = pill->rc_req; + struct obd_export *exp = req->rq_export; + struct lustre_handle remote_hdl; + struct ldlm_request *dlmreq; + struct list_head *iter; + + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + return; + + dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ); + remote_hdl = dlmreq->lock_handle1; + + spin_lock(&exp->exp_ldlm_data.led_lock); + list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { + struct ldlm_lock *lock; + lock = list_entry(iter, struct ldlm_lock, l_export_chain); + if (lock == new_lock) + continue; + if (lock->l_remote_handle.cookie == remote_hdl.cookie) { + lh->mlh_lh.cookie = lock->l_handle.h_cookie; + lh->mlh_mode = lock->l_granted_mode; + + LDLM_DEBUG(lock, "restoring lock cookie"); + DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64, + lh->mlh_lh.cookie); + break; + } + } + spin_unlock(&exp->exp_ldlm_data.led_lock); + + /* + * If the xid matches, then we know this is a resent request, and allow + * it. (It's probably an OPEN, for which we don't send a lock. + */ + if (req->rq_xid == + le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_xid)) + return; + + if (req->rq_xid == + le64_to_cpu(exp->exp_mdt_data.med_mcd->mcd_last_close_xid)) + return; + + /* + * This remote handle isn't enqueued, so we never received or processed + * this request. Clear MSG_RESENT, because it can be handled like any + * normal request now. + */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + + DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64, + remote_hdl.cookie); +} + static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, int flags) { struct ldlm_reply *ldlm_rep; - struct mdt_lock_handle tmp_lock; + struct mdt_lock_handle tmp_lock = { 0 }; struct mdt_lock_handle *lhc = &tmp_lock; __u64 child_bits; + struct ptlrpc_request *req; ENTRY; @@ -1843,9 +1939,13 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, RETURN(-EINVAL); } + req = info->mti_pill.rc_req; ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); + if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + mdt_fixup_resent(&info->mti_pill, *lockp, lhc); + ldlm_rep->lock_policy_res2 = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);