From: huanghua Date: Thu, 1 Jun 2006 09:50:45 +0000 (+0000) Subject: more intent lock implementation. X-Git-Tag: v1_8_0_110~486^2~1680 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=0e518826a4a87ad3bcc68b72941d994fd515c9c6;p=fs%2Flustre-release.git more intent lock implementation. --- diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 411a87c..c2e772a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -164,42 +164,105 @@ static int mdt_getattr(struct mdt_thread_info *info) RETURN(result); } -static int mdt_getattr_name(struct mdt_thread_info *info) +/* @ Huang Hua + * UPDATE lock should be taken against parent, and be release before exit; + * child_bits lock should be taken against child, and be returned back: + * (1)normal request will release child lock; + * (2)intent request will grant it to client. + */ +static int mdt_getattr_name_internal(struct mdt_thread_info *info, + int child_bits) { - struct md_object *next = mdt_object_child(info->mti_object); + struct mdt_object *parent = info->mti_object; struct mdt_object *child; + struct md_object *next = mdt_object_child(info->mti_object); struct mdt_body *body; const char *name; int result; + struct mdt_lock_handle *lhp; + struct mdt_lock_handle *lhc; + struct ldlm_reply *ldlm_rep; + + ENTRY; + + lhp = &info->mti_lh[MDT_LH_PARENT]; + lhp->mlh_mode = LCK_CR; + lhc = &info->mti_lh[MDT_LH_CHILD]; + lhc->mlh_mode = LCK_CR; + ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); LASSERT(info->mti_object != NULL); - ENTRY; + intent_set_disposition(ldlm_rep, DISP_IT_EXECD); body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); name = req_capsule_client_get(&info->mti_pill, &RMF_NAME); if (name == NULL) RETURN(-EFAULT); + /*step 1: lock parent */ + result = mdt_object_lock(info->mti_mdt->mdt_namespace, parent, + lhp, MDS_INODELOCK_UPDATE); + if (result != 0) + RETURN(result); + + /*step 2: lookup child's fid by name */ result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1); - if (result == 0) { - child = mdt_object_find(info->mti_ctxt, info->mti_mdt, - &body->fid1); - if (!IS_ERR(child)) { - result = mo_attr_get(info->mti_ctxt, next, - &info->mti_attr); - if (result == 0) { - mdt_pack_attr2body(body, &info->mti_attr); - body->valid |= OBD_MD_FLID; - } - mdt_object_put(info->mti_ctxt, child); - } else - result = PTR_ERR(child); + if (result != 0) + GOTO(out_parent, result); + + /*step 3: find the child object by fid */ + child = mdt_object_find(info->mti_ctxt, info->mti_mdt, &body->fid1); + if (IS_ERR(child)) + GOTO(out_parent, result = PTR_ERR(child)); + + if (!lu_object_exists(info->mti_ctxt, &child->mot_obj.mo_lu)) { + intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG); + GOTO(out_child, result = -ENOENT); + } else { + intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS); } + + /*step 4: lock child: this lock is returned back to caller + * if successfully get attr. + */ + result = mdt_object_lock(info->mti_mdt->mdt_namespace, child, + lhc, child_bits); + if (result != 0) + GOTO(out_child, result); + + /* finally, we can get attr for child. */ + result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr); + if (result == 0) { + mdt_pack_attr2body(body, &info->mti_attr); + body->fid1 = *mdt_object_fid(child); + body->valid |= OBD_MD_FLID; + } else + mdt_object_unlock(info->mti_mdt->mdt_namespace, child, lhc); +out_child: + mdt_object_put(info->mti_ctxt, child); +out_parent: + mdt_object_unlock(info->mti_mdt->mdt_namespace, parent, lhp); RETURN(result); } +/* see the above function */ +static int mdt_getattr_name(struct mdt_thread_info *info) +{ + struct mdt_lock_handle *lhc; + int rc; + + ENTRY; + lhc = &info->mti_lh[MDT_LH_CHILD]; + lhc->mlh_mode = LCK_CR; + + rc = mdt_getattr_name_internal(info, MDS_INODELOCK_UPDATE); + if (rc == 0 && lustre_handle_is_used(&lhc->mlh_lh)) + ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode); + RETURN(rc); +} + static struct lu_device_operations mdt_lu_ops; static int lu_device_is_mdt(struct lu_device *d) @@ -1114,15 +1177,21 @@ enum mdt_it_code { }; static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info); + struct mdt_thread_info *info, + struct ldlm_lock **, + int); static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info); + struct mdt_thread_info *info, + struct ldlm_lock **, + int); static struct mdt_it_flavor { const struct req_format *it_fmt; __u32 it_flags; int (*it_act)(enum mdt_it_code , - struct mdt_thread_info *); + struct mdt_thread_info *, + struct ldlm_lock **, + int); long it_reint; } mdt_it_flavor[] = { [MDT_IT_OPEN] = { @@ -1177,50 +1246,104 @@ static struct mdt_it_flavor { }; static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info) + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + int flags) { - int getattr_part = MDS_INODELOCK_UPDATE; + int child_bits; + struct ldlm_lock *old_lock = *lockp; + struct ldlm_lock *new_lock = NULL; + struct mdt_lock_handle *lhc; + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_reply *ldlm_rep; + + ENTRY; switch (opcode) { case MDT_IT_LOOKUP: - getattr_part = MDS_INODELOCK_LOOKUP; + child_bits = MDS_INODELOCK_LOOKUP; break; case MDT_IT_GETATTR: - getattr_part |= MDS_INODELOCK_LOOKUP; + child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; break; default: + CERROR("Unhandled till now"); + RETURN(-EINVAL); break; } - /*FIXME do something on the above flag */ + + ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); + intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD); + /* lock on child object is returned @info->mti_lh[MDT_LH_CHILD] */ + ldlm_rep->lock_policy_res2 = mdt_getattr_name_internal(info, child_bits); + + if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG)) + ldlm_rep->lock_policy_res2 = 0; + if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { + RETURN(ELDLM_LOCK_ABORTED); + } + + lhc = &info->mti_lh[MDT_LH_CHILD]; + new_lock = ldlm_handle2lock(&lhc->mlh_lh); + if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) + RETURN(0); + + *lockp = new_lock; + + /* Fixup the lock to be given to the client */ + l_lock(&new_lock->l_resource->lr_namespace->ns_lock); + new_lock->l_readers = 0; + new_lock->l_writers = 0; + + new_lock->l_export = class_export_get(req->rq_export); + list_add(&new_lock->l_export_chain, + &new_lock->l_export->exp_ldlm_data.led_held_locks); + + new_lock->l_blocking_ast = old_lock->l_blocking_ast; + new_lock->l_completion_ast = old_lock->l_completion_ast; - return mdt_getattr_name(info); + new_lock->l_remote_handle = old_lock->l_remote_handle; + + new_lock->l_flags &= ~LDLM_FL_LOCAL; + + LDLM_LOCK_PUT(old_lock); + l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); + + RETURN(ELDLM_LOCK_REPLACED); } static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info) + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + int flags) { long opc; - int rc; - + struct ldlm_reply *rep; + static const struct req_format *intent_fmts[REINT_MAX] = { [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN }; ENTRY; + rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); + if (rep == NULL) + RETURN(-EFAULT); opc = mdt_reint_opcode(info, intent_fmts); - if (opc >= 0) { - if (mdt_it_flavor[opcode].it_reint == opc) - rc = mdt_reint_internal(info, opc); - else { - CERROR("Reint code %ld doesn't match intent: %d\n", - opc, opcode); - rc = -EPROTO; - } - } else - rc = opc; - RETURN(rc); + if (opc < 0) + RETURN(opc); + + if (mdt_it_flavor[opcode].it_reint != opc) { + CERROR("Reint code %ld doesn't match intent: %d\n", + opc, opcode); + RETURN(-EPROTO); + } + rep->lock_policy_res2 = mdt_reint_internal(info, opc); + intent_set_disposition(rep, DISP_IT_EXECD); + + RETURN(ELDLM_LOCK_ABORTED); } static int mdt_intent_code(long itcode) @@ -1263,7 +1386,8 @@ static int mdt_intent_code(long itcode) return result; } -static int mdt_intent_opc(long itopc, struct mdt_thread_info *info) +static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, + struct ldlm_lock **lockp, int flags) { int opc; int rc; @@ -1274,34 +1398,26 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info) ENTRY; opc = mdt_intent_code(itopc); - if (opc >= 0) { - pill = &info->mti_pill; - flv = &mdt_it_flavor[opc]; - - if (flv->it_fmt != NULL) - req_capsule_extend(pill, flv->it_fmt); - - rc = mdt_unpack_req_pack_rep(info, flv->it_flags); - if (rc == 0) { - struct ldlm_reply *rep; - - rep = req_capsule_server_get(pill, &RMF_DLM_REP); - if (rep != NULL) { - /* execute policy */ - /*XXX LASSERT( flv->it_act) */ - if (flv->it_act) { - rc = flv->it_act(opc, info); - rep->lock_policy_res2 = rc; - intent_set_disposition(rep, - DISP_IT_EXECD); - rc = 0; - } else - rc = -EOPNOTSUPP; - } else - rc = -EFAULT; - } - } else - rc = -EINVAL; + if (opc < 0) + RETURN(-EINVAL); + + pill = &info->mti_pill; + flv = &mdt_it_flavor[opc]; + + if (flv->it_fmt != NULL) + req_capsule_extend(pill, flv->it_fmt); + + rc = mdt_unpack_req_pack_rep(info, flv->it_flags); + if (rc) + RETURN(rc); + + + /* execute policy */ + /*XXX LASSERT( flv->it_act) */ + if (flv->it_act) { + rc = flv->it_act(opc, info, lockp, flags); + } else + rc = -EOPNOTSUPP; RETURN(rc); } @@ -1333,7 +1449,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "intent policy opc: %s", ldlm_it2str(it->opc)); - rc = mdt_intent_opc(it->opc, info); + rc = mdt_intent_opc(it->opc, info, lockp, flags); if (rc == 0) rc = ELDLM_OK; } else