From a69b9dc6332c16cda0ffcf8bfc049828b81ed6bf Mon Sep 17 00:00:00 2001 From: tappro Date: Fri, 15 Sep 2006 16:56:29 +0000 Subject: [PATCH] - take a lock for cross-ref object while open as lmv required - separate the lock fixup after intent handling into the mdt_intent_lock_replace() method --- lustre/mdt/mdt_handler.c | 108 ++++++++++++++++++++++++++--------------------- lustre/mdt/mdt_open.c | 15 +++++-- 2 files changed, 72 insertions(+), 51 deletions(-) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index cadc02e..f7d4541 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1690,53 +1690,20 @@ static struct mdt_it_flavor { } }; -static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - int flags) +int mdt_intent_lock_replace(struct mdt_thread_info *info, + struct ldlm_lock **lockp, + struct mdt_lock_handle *lh, int flags) { + struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_lock *old_lock = *lockp; struct ldlm_lock *new_lock = NULL; - struct ptlrpc_request *req = mdt_info_req(info); - struct ldlm_reply *ldlm_rep; - struct mdt_lock_handle tmp_lock; - struct mdt_lock_handle *lhc = &tmp_lock; - __u64 child_bits; - - ENTRY; - switch (opcode) { - case MDT_IT_LOOKUP: - child_bits = MDS_INODELOCK_LOOKUP; - break; - case MDT_IT_GETATTR: - child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; - break; - default: - CERROR("Unhandled till now"); - RETURN(-EINVAL); - } + new_lock = ldlm_handle2lock(&lh->mlh_lh); - ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); - mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); - - ldlm_rep->lock_policy_res2 = - mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); - mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1); - - if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG)) - ldlm_rep->lock_policy_res2 = 0; - if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || - ldlm_rep->lock_policy_res2) { - RETURN(ELDLM_LOCK_ABORTED); - } - - new_lock = ldlm_handle2lock(&lhc->mlh_lh); if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) RETURN(0); - LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n", - opcode, lhc->mlh_lh.cookie); + LASSERTF(new_lock != NULL, "lockh "LPX64"\n", lh->mlh_lh.cookie); *lockp = new_lock; @@ -1749,11 +1716,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, RETURN(ELDLM_LOCK_REPLACED); } - /* TODO: - * These are copied from mds/hander.c, and should be factored into - * ldlm module in order to share these code, and be easy for merge. - */ - /* Fixup the lock to be given to the client */ lock_res_and_lock(new_lock); new_lock->l_readers = 0; @@ -1772,10 +1734,53 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, unlock_res_and_lock(new_lock); LDLM_LOCK_PUT(new_lock); + lh->mlh_lh.cookie = 0; RETURN(ELDLM_LOCK_REPLACED); } +static int mdt_intent_getattr(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + int flags) +{ + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_reply *ldlm_rep; + struct mdt_lock_handle tmp_lock; + struct mdt_lock_handle *lhc = &tmp_lock; + __u64 child_bits; + + ENTRY; + + switch (opcode) { + case MDT_IT_LOOKUP: + child_bits = MDS_INODELOCK_LOOKUP; + break; + case MDT_IT_GETATTR: + child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; + break; + default: + CERROR("Unhandled till now"); + RETURN(-EINVAL); + } + + ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); + + ldlm_rep->lock_policy_res2 = + mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); + mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1); + + if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG)) + ldlm_rep->lock_policy_res2 = 0; + if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { + RETURN(ELDLM_LOCK_ABORTED); + } + + return mdt_intent_lock_replace(info, lockp, lhc, flags); +} + static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, @@ -1807,12 +1812,19 @@ static int mdt_intent_reint(enum mdt_it_code opcode, rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); if (rep == NULL) RETURN(-EFAULT); - rep->lock_policy_res2 = rc; + rep->lock_policy_res2 = rc; mdt_set_disposition(info, rep, DISP_IT_EXECD); -#if 0 - mdt_finish_reply(info, rc); -#endif + + /* cross-ref case, the lock should be returned to the client */ + if (rc == -EREMOTE) { + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_NEW]; + LASSERT(lhc->mlh_lh.cookie != 0); + rep->lock_policy_res2 = 0; + return mdt_intent_lock_replace(info, lockp, lhc, flags); + } + rep->lock_policy_res2 = rc; + RETURN(ELDLM_LOCK_ABORTED); } diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index c2a1fdf..4d90132 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -626,8 +626,8 @@ int mdt_open(struct mdt_thread_info *info) GOTO(out, result = -EOPNOTSUPP); } - CDEBUG(D_INODE, "I am going to create "DFID"/("DFID":%s) " - "cr_flag=%o mode=%06o replay=%d\n", + CDEBUG(D_INODE, "I am going to open "DFID"/("DFID":%s) " + "cr_flag=0%o mode=%06o replay=%d\n", PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name, create_flags, la->la_mode, lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY); @@ -720,9 +720,18 @@ int mdt_open(struct mdt_thread_info *info) if (result == -EREMOTE) { /* the object is on remote node * return its FID for remote open */ + struct mdt_lock_handle *lhc; + int rc; + lhc = &info->mti_lh[MDT_LH_NEW]; + mdt_lock_handle_init(lhc); + lhc->mlh_mode = LCK_CR; + rc = mdt_object_lock(info, child, lhc, + MDS_INODELOCK_LOOKUP); repbody->fid1 = *mdt_object_fid(child); repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); - GOTO(out_child, result = 0); + if (rc != 0) + result = rc; + GOTO(out_child, result); } } /* Try to open it now. */ -- 1.8.3.1