From 4b766fe25322a18d56f5c3a2276d62b742c7b2a9 Mon Sep 17 00:00:00 2001 From: huanghua Date: Mon, 26 Jun 2006 09:50:51 +0000 Subject: [PATCH] added more code for reint (link/unlink/rename). --- lustre/mdt/mdt_reint.c | 305 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 294 insertions(+), 11 deletions(-) diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index e3e1dcd..4062f3a 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -177,7 +177,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info) locked = 1; } - LASSERT(lu_object_assert_exists(info->mti_ctxt, &mo->mot_obj.mo_lu)); + if (lu_object_assert_not_exists(info->mti_ctxt, &mo->mot_obj.mo_lu)) + GOTO(out_unlock, rc = -ENOENT); rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), attr); if (rc != 0) @@ -205,14 +206,14 @@ static int mdt_reint_setattr(struct mdt_thread_info *info) if (rc) GOTO(out_unlock, rc); - /* FIXME Please deal with logcookies here*/ - + /* FIXME & TODO Please deal with logcookies here*/ + GOTO(out_unlock, rc); out_unlock: if (locked) { mdt_object_unlock(info->mti_mdt->mdt_namespace, mo, lh); } mdt_object_put(info->mti_ctxt, mo); - RETURN(rc); + return (rc); #endif ENTRY; RETURN(-EOPNOTSUPP); @@ -265,12 +266,182 @@ static int mdt_reint_create(struct mdt_thread_info *info) static int mdt_reint_unlink(struct mdt_thread_info *info) { +#ifdef MDT_CODE + struct lu_attr *attr = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + struct req_capsule *pill = &info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *mp; + struct mdt_object *mc; + struct mdt_lock_handle *lhp; + struct mdt_lock_handle *lhc; + struct mdt_body *repbody; + struct lu_fid child_fid; + int rc; + + ENTRY; + + DEBUG_REQ(D_INODE, req, "unlink "DFID3"/"DFID3, PFID3(rr->rr_fid1), + PFID3(rr->rr_fid2)); + + /* MDS_CHECK_RESENT here */ + + rc = req_capsule_pack(pill); + if (rc) + RETURN(rc); + + if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) + RETURN(-EROFS); + + /* step 1: lock the parent */ + lhp = &info->mti_lh[MDT_LH_PARENT]; + lhp->mlh_mode = LCK_EX; + mp = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE); + if (IS_ERR(mp)) + RETURN(PTR_ERR(mp)); + + /*step 2: find & lock the child */ + lhc = &info->mti_lh[MDT_LH_CHILD]; + lhc->mlh_mode = LCK_EX; + rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp), + rr->rr_name, &child_fid); + if (rc) { + GOTO(out_unlock_parent, rc); + } + + mc = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, &child_fid, + lhc, MDS_INODELOCK_FULL); + if (IS_ERR(mc)) + GOTO(out_unlock_parent, rc = PTR_ERR(mc)); + + if (lu_object_assert_not_exists(info->mti_ctxt, &mc->mot_obj.mo_lu)) + GOTO(out_unlock_child, rc = -ENOENT); + + /* NB: Be aware of Bug 2029 */ + + /*step 3: deal with orphan */ + + /* If this is potentially the last reference to this inode, get the + * OBD EA data first so the client can destroy OST objects. We + * only do the object removal later if no open files/links remain. */ + + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + rc = mo_attr_get(info->mti_ctxt, mdt_object_child(mc), attr); + if (rc != 0) + GOTO(out_unlock_child, rc); + + if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1)) + /* && if opencount == 0*/ + /* see mds_reint */ + void * lmm = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_MD); + int len = req_capsule_get_size(&info->mti_pill, + &RMF_MDT_MD, + RCL_SERVER); + mdt_pack_attr2body(repbody, attr); + repbody->fid1 = *mdt_object_fid(mc); + repbody->valid |= OBD_MD_FLID; + + rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(mc), + lmm, len, "lov"); + if (rc < 0) + GOTO(out_unlock_child, rc); + + if (S_ISDIR(attr->la_mode)) + repbody->valid |= OBD_MD_FLDIREA; + else + repbody->valid |= OBD_MD_FLEASIZE; + repbody->eadatasize = rc; + rc = 0; + } + + /* step 4: delete it */ + rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mc), + mdt_object_child(mc), rr->rr_name); + + /*step 5: orphan handling & recovery issue */ + if (rc == 0) { + /* FIXME & TODO: + * 1. orphan handling here + * 2. Please deal with logcookies here */ + } + GOTO(out_unlock_child, rc); +out_unlock_child: + mdt_object_unlock(info->mti_mdt->mdt_namespace, mc, lhc); + mdt_object_put(info->mti_ctxt, mc); +out_unlock_parent: + mdt_object_unlock(info->mti_mdt->mdt_namespace, mp, lhp); + mdt_object_put(info->mti_ctxt, mp); + return rc; +#endif + ENTRY; RETURN(-EOPNOTSUPP); } static int mdt_reint_link(struct mdt_thread_info *info) { +#ifdef MDT_CODE + struct lu_attr *attr = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + struct req_capsule *pill = &info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *ms; + struct mdt_object *mt; + struct mdt_lock_handle *lhs; + struct mdt_lock_handle *lht; + struct mdt_body *repbody; + int rc; + + ENTRY; + + DEBUG_REQ(D_INODE, req, "link original "DFID3" to "DFID3" %s", + PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), rr->rr_name); + + /* MDS_CHECK_RESENT here */ + + rc = req_capsule_pack(pill); + if (rc) + RETURN(rc); + + if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) + RETURN(-EROFS); + + /* step 1: lock the source */ + lhs = &info->mti_lh[MDT_LH_PARENT]; + lhs->mlh_mode = LCK_EX; + ms = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE); + if (IS_ERR(ms)) + RETURN(PTR_ERR(ms)); + + /*step 2: find & lock the target */ + lht = &info->mti_lh[MDT_LH_CHILD]; + lht->mlh_mode = LCK_EX; + mt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid2, lht, MDS_INODELOCK_UPDATE); + if (IS_ERR(mt)) + GOTO(out_unlock_source, rc = PTR_ERR(mt)); + + /* step 3: do some checking :TODO*/ + /* if isorphan(ms) + * return -ENOENT + */ + + /* step 4: link it */ + rc = mdo_link(info->mti_ctxt, mdt_object_child(mt), + mdt_object_child(ms), rr->rr_name); + GOTO(out_unlock_target, rc); +out_unlock_target: + mdt_object_unlock(info->mti_mdt->mdt_namespace, mt, lht); + mdt_object_put(info->mti_ctxt, mt); +out_unlock_source: + mdt_object_unlock(info->mti_mdt->mdt_namespace, ms, lhs); + mdt_object_put(info->mti_ctxt, ms); + return rc; +#endif + ENTRY; RETURN(-EOPNOTSUPP); } @@ -278,6 +449,119 @@ static int mdt_reint_link(struct mdt_thread_info *info) static int mdt_reint_rename(struct mdt_thread_info *info) { +#ifdef MDT_CODE + struct lu_attr *attr = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + struct req_capsule *pill = &info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *msrcdir; + struct mdt_object *mtgtdir; + struct mdt_object *mold; + struct mdt_object *mnew; + struct mdt_lock_handle *lh_srcdirp; + struct mdt_lock_handle *lh_tgtdirp; + struct mdt_lock_handle lh_old; + struct mdt_lock_handle lh_new; + struct lu_fid old_fid = {0}; + struct lu_fid new_fid = {0}; + struct mdt_body *repbody; + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; + int rc; + + ENTRY; + + DEBUG_REQ(D_INODE, req, "rename "DFID3"/%s to "DFID3" %s", + PFID3(rr->rr_fid1), rr->rr_tgt, + PFID3(rr->rr_fid2), rr->rr_name); + + /* MDS_CHECK_RESENT here */ + + rc = req_capsule_pack(pill); + if (rc) + RETURN(rc); + + if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) + RETURN(-EROFS); + + /*FIXME: The lock order is important. Which algorithm should we take? + * parent/child order? lock resource order? or others? + */ + + /* step 1: lock the source dir */ + lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; + lh_srcdirp->mlh_mode = LCK_EX; + msrcdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid1, lh_srcdirp, + MDS_INODELOCK_UPDATE); + if (IS_ERR(msrcdir)) + GOTO(out, rc = PTR_ERR(msrcdir)); + + /*step 2: find & lock the target dir*/ + lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; + lh_tgtdirp->mlh_mode = LCK_EX; + mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + rr->rr_fid2, lh_tgtdirp, + MDS_INODELOCK_UPDATE); + if (IS_ERR(mtgtdir)) + GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir)); + + + /*step 3: find & lock the old object*/ + rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir), + rr->rr_name, &old_fid); + if (rc) { + GOTO(out_unlock_target, rc); + } + lh_old.mlh_mode = LCK_EX; + mold = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + &old_fid, &lh_old, + MDS_INODELOCK_LOOKUP); + if (IS_ERR(mold)) + GOTO(out_unlock_target, rc = PTR_ERR(mold)); + + /*step 4: find & lock the new object*/ + /* new object may not exist now */ + rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir), + rr->rr_tgt, &new_fid); + if (rc && rc != -ENOENT) { + GOTO(out_unlock_old, rc); + } + /* NB: the new_fid may be zero at this moment*/ + lh_new.mlh_mode = LCK_EX; + mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, + &new_fid, &lh_new, + MDS_INODELOCK_FULL); + if (IS_ERR(mnew)) + GOTO(out_unlock_old, rc = PTR_ERR(mnew)); + + + /* step 5:TODO orphan handling on new object*/ + /* if isorphan(mnew) {...} + */ + + /* step 6: rename it */ + rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir), + mdt_object_child(mtgtdir), &old_fid, + rr->rr_name, mdt_object_child(mnew), + rr->rr_tgt); + GOTO(out_unlock_new, rc); +out_unlock_new: + mdt_object_unlock(ns, mnew, &lh_new); + mdt_object_put(info->mti_ctxt, mnew); +out_unlock_old: + mdt_object_unlock(ns, mold, &lh_old); + mdt_object_put(info->mti_ctxt, mold); +out_unlock_target: + mdt_object_unlock(ns, mtgtdir, lh_tgtdirp); + mdt_object_put(info->mti_ctxt, mtgtdir); +out_unlock_source: + mdt_object_unlock(ns, msrcdir, lh_srcdirp); + mdt_object_put(info->mti_ctxt, msrcdir); +out: + return rc; +#endif + + ENTRY; RETURN(-EOPNOTSUPP); } @@ -293,6 +577,7 @@ static int mdt_reint_open(struct mdt_thread_info *info) struct ptlrpc_request *req = mdt_info_req(info); struct mdt_body *body = info->mti_reint_rep.mrr_body; struct lov_mds_md *lmm = info->mti_reint_rep.mrr_md; + struct mdt_reint_record *rr = &info->mti_rr; int created = 0; struct lu_fid child_fid; ENTRY; @@ -303,15 +588,13 @@ static int mdt_reint_open(struct mdt_thread_info *info) req_capsule_pack(&info->mti_pill); ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP); - parent = mdt_object_find_lock(info->mti_ctxt, - mdt, info->mti_rr.rr_fid1, - lh, - MDS_INODELOCK_UPDATE); + parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1, + lh, MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) RETURN(PTR_ERR(parent)); result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent), - info->mti_rr.rr_name, &child_fid); + rr->rr_name, &child_fid); if (result && result != -ENOENT) { GOTO(out_parent, result); } @@ -338,7 +621,7 @@ static int mdt_reint_open(struct mdt_thread_info *info) /* not found and with MDS_OPEN_CREAT: let's create something */ result = mdo_create(info->mti_ctxt, mdt_object_child(parent), - info->mti_rr.rr_name, + rr->rr_name, mdt_object_child(child), &info->mti_attr); intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE); @@ -399,7 +682,7 @@ static int mdt_reint_open(struct mdt_thread_info *info) destroy_child: if (result != 0 && created) mdo_unlink(info->mti_ctxt, mdt_object_child(parent), - mdt_object_child(child), info->mti_rr.rr_name); + mdt_object_child(child), rr->rr_name); out_child: mdt_object_put(info->mti_ctxt, child); out_parent: -- 1.8.3.1