From 8920bafecfc9047006c7d125dd3550ec836b57f3 Mon Sep 17 00:00:00 2001 From: huanghua Date: Mon, 26 Jun 2006 17:27:36 +0000 Subject: [PATCH] added some partial operation support. --- lustre/mdt/mdt_reint.c | 129 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 20 deletions(-) diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 86e0d81..3f7422a 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -288,7 +288,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info) rc = req_capsule_pack(pill); if (rc) - RETURN(rc); + RETURN(rc); if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) RETURN(-EROFS); @@ -301,6 +301,12 @@ static int mdt_reint_unlink(struct mdt_thread_info *info) if (IS_ERR(mp)) RETURN(PTR_ERR(mp)); + if (strlen(rr->rr_name) == 0) { + /* remote partial operation */ + rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp)); + GOTO(out_unlock_parent, rc); + } + /*step 2: find & lock the child */ lhc = &info->mti_lh[MDT_LH_CHILD]; lhc->mlh_mode = LCK_EX; @@ -416,6 +422,11 @@ static int mdt_reint_link(struct mdt_thread_info *info) if (IS_ERR(ms)) RETURN(PTR_ERR(ms)); + if (strlen(rr->rr_name) == 0) { + /* remote partial operation */ + rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms)); + GOTO(out_unlock_source, rc); + } /*step 2: find & lock the target */ lht = &info->mti_lh[MDT_LH_CHILD]; lht->mlh_mode = LCK_EX; @@ -446,6 +457,79 @@ out_unlock_source: RETURN(-EOPNOTSUPP); } +/* partial operation for rename */ +static int mdt_reint_rename_tgt(struct mdt_thread_info *info) +{ +#ifdef MDT_CODE + struct lu_attr *attr = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + struct req_capsule *pill = &info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *mtgtdir; + struct mdt_object *mtgt = NULL; + struct mdt_lock_handle *lh_tgtdir; + struct mdt_lock_handle *lh_tgt; + struct mdt_body *repbody; + struct lu_fid tgt_fid; + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; + int rc; + + ENTRY; + + DEBUG_REQ(D_INODE, req, "rename_tgr "DFID3" to "DFID3" %s", + PFID3(rr->rr_fid2), + PFID3(rr->rr_fid1), rr->rr_tgt); + + /* step 1: lookup & lock the tgt dir */ + lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; + lh_tgtdir->mlh_mode = LCK_PW; + mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid1, lh_tgtdir, + MDS_INODELOCK_UPDATE); + if (IS_ERR(mtgtdir)) + GOTO(out, rc = PTR_ERR(mtgtdir)); + + /*step 2: find & lock the target object if exists*/ + rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir), + rr->rr_tgt, &tgt_fid); + if (rc && rc != -ENOENT) + GOTO(out_unlock_tgtdir, rc); + + if (rc == 0) { + lh_tgt = &info->mti_lh[MDT_LH_CHILD]; + lh_tgt->mlh_mode = LCK_EX; + + mtgt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + &tgt_fid, lh_tgt, + MDS_INODELOCK_LOOKUP); + if (IS_ERR(mold)) + GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt)); + } + + /* step 3: rename_tgt or name_insert */ + if (mtgt) + rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir), + mdt_object_child(mtgt), + &rr->rr_fid2, rr->rr_tgt); + else + rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir), + rr->rr_tgt, &rr->rr_fid2); + + GOTO(out_unlock_tgt, rc); +out_unlock_tgt: + if (mtgt) { + mdt_object_unlock(ns, mtgt, lh_tgt); + mdt_object_put(info->mti_ctxt, mtgt); + } +out_unlock_tgtdir: + mdt_object_unlock(ns, msrcdir, lh_tgtdir); + mdt_object_put(info->mti_ctxt, mtgtdir); +out: + return rc; +#endif +} + + static int mdt_reint_rename(struct mdt_thread_info *info) { @@ -457,7 +541,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info) struct mdt_object *msrcdir; struct mdt_object *mtgtdir; struct mdt_object *mold; - struct mdt_object *mnew; + struct mdt_object *mnew = NULL; struct mdt_lock_handle *lh_srcdirp; struct mdt_lock_handle *lh_tgtdirp; struct mdt_lock_handle lh_old; @@ -483,9 +567,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info) if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) RETURN(-EROFS); - /*FIXME: The lock order is important. Which algorithm should we take? - * parent/child order? lock resource order? or others? - */ + rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); + if (rc == 1) { + /* if (strlen(rr->rr_name) == 0) {*/ + RETURN(mdt_reint_rename_tgt(info)); + } /* step 1: lock the source dir */ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; @@ -509,9 +595,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info) /*step 3: find & lock the old object*/ rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir), rr->rr_name, &old_fid); - if (rc) { + if (rc) GOTO(out_unlock_target, rc); - } + lh_old.mlh_mode = LCK_EX; mold = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, &old_fid, &lh_old, @@ -520,20 +606,21 @@ static int mdt_reint_rename(struct mdt_thread_info *info) GOTO(out_unlock_target, rc = PTR_ERR(mold)); /*step 4: find & lock the new object*/ - /* new object may not exist now */ + /* new target object may not exist now */ rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir), rr->rr_tgt, &new_fid); - if (rc && rc != -ENOENT) { + if (rc && rc != -ENOENT) GOTO(out_unlock_old, rc); - } + /* NB: the new_fid may be zero at this moment*/ - lh_new.mlh_mode = LCK_EX; - mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, - &new_fid, &lh_new, - MDS_INODELOCK_FULL); - if (IS_ERR(mnew)) - GOTO(out_unlock_old, rc = PTR_ERR(mnew)); - + if (rc == 0) { + lh_new.mlh_mode = LCK_EX; + mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + &new_fid, &lh_new, + MDS_INODELOCK_FULL); + if (IS_ERR(mnew)) + GOTO(out_unlock_old, rc = PTR_ERR(mnew)); + } /* step 5:TODO orphan handling on new object*/ /* if isorphan(mnew) {...} @@ -542,12 +629,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info) /* step 6: rename it */ rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir), mdt_object_child(mtgtdir), &old_fid, - rr->rr_name, mdt_object_child(mnew), + rr->rr_name, mnew ? mdt_object_child(mnew): NULL, rr->rr_tgt); GOTO(out_unlock_new, rc); out_unlock_new: - mdt_object_unlock(ns, mnew, &lh_new); - mdt_object_put(info->mti_ctxt, mnew); + if (mnew) { + mdt_object_unlock(ns, mnew, &lh_new); + mdt_object_put(info->mti_ctxt, mnew); + } out_unlock_old: mdt_object_unlock(ns, mold, &lh_old); mdt_object_put(info->mti_ctxt, mold); -- 1.8.3.1