Whamcloud - gitweb
added some partial operation support.
authorhuanghua <huanghua>
Mon, 26 Jun 2006 17:27:36 +0000 (17:27 +0000)
committerhuanghua <huanghua>
Mon, 26 Jun 2006 17:27:36 +0000 (17:27 +0000)
lustre/mdt/mdt_reint.c

index 86e0d81..3f7422a 100644 (file)
@@ -288,7 +288,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
 
         rc = req_capsule_pack(pill);
         if (rc)
-                RETURN(rc);        
+                RETURN(rc);
 
         if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 RETURN(-EROFS);
@@ -301,6 +301,12 @@ static int mdt_reint_unlink(struct mdt_thread_info *info)
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
+        if (strlen(rr->rr_name) == 0) {
+                /* remote partial operation */
+                rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp));
+                GOTO(out_unlock_parent, rc);
+        }
+
         /*step 2: find & lock the child */
         lhc = &info->mti_lh[MDT_LH_CHILD];
         lhc->mlh_mode = LCK_EX;
@@ -416,6 +422,11 @@ static int mdt_reint_link(struct mdt_thread_info *info)
         if (IS_ERR(ms))
                 RETURN(PTR_ERR(ms));
 
+        if (strlen(rr->rr_name) == 0) {
+                /* remote partial operation */
+                rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
+                GOTO(out_unlock_source, rc);
+        }
         /*step 2: find & lock the target */
         lht = &info->mti_lh[MDT_LH_CHILD];
         lht->mlh_mode = LCK_EX;
@@ -446,6 +457,79 @@ out_unlock_source:
         RETURN(-EOPNOTSUPP);
 }
 
+/* partial operation for rename */
+static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
+{
+#ifdef MDT_CODE
+        struct lu_attr *attr = &info->mti_attr;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct req_capsule *pill = &info->mti_pill;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_object *mtgtdir;
+        struct mdt_object *mtgt = NULL;
+        struct mdt_lock_handle *lh_tgtdir;
+        struct mdt_lock_handle *lh_tgt;
+        struct mdt_body *repbody;
+        struct lu_fid tgt_fid;
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+        int rc;
+
+        ENTRY;
+
+        DEBUG_REQ(D_INODE, req, "rename_tgr "DFID3" to "DFID3" %s",
+                  PFID3(rr->rr_fid2),
+                  PFID3(rr->rr_fid1), rr->rr_tgt);
+
+        /* step 1: lookup & lock the tgt dir */
+        lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
+        lh_tgtdir->mlh_mode = LCK_PW;
+        mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                       rr->rr_fid1, lh_tgtdir, 
+                                       MDS_INODELOCK_UPDATE);
+        if (IS_ERR(mtgtdir))
+                GOTO(out, rc = PTR_ERR(mtgtdir));
+
+        /*step 2: find & lock the target object if exists*/
+        rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
+                        rr->rr_tgt, &tgt_fid);
+        if (rc && rc != -ENOENT)
+                GOTO(out_unlock_tgtdir, rc);
+        
+        if (rc == 0) {
+                lh_tgt = &info->mti_lh[MDT_LH_CHILD];
+                lh_tgt->mlh_mode = LCK_EX;
+                mtgt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                            &tgt_fid, lh_tgt, 
+                                            MDS_INODELOCK_LOOKUP);
+                if (IS_ERR(mold))
+                        GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
+        }
+        
+        /* step 3: rename_tgt or name_insert */
+        if (mtgt)
+                rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
+                                    mdt_object_child(mtgt), 
+                                    &rr->rr_fid2, rr->rr_tgt);
+        else
+                rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
+                                     rr->rr_tgt, &rr->rr_fid2);
+                                     
+        GOTO(out_unlock_tgt, rc);
+out_unlock_tgt:
+        if (mtgt) {
+                mdt_object_unlock(ns, mtgt, lh_tgt);
+                mdt_object_put(info->mti_ctxt, mtgt);
+        }
+out_unlock_tgtdir:
+        mdt_object_unlock(ns, msrcdir, lh_tgtdir);
+        mdt_object_put(info->mti_ctxt, mtgtdir);
+out:
+        return rc;
+#endif
+}
+
+
 
 static int mdt_reint_rename(struct mdt_thread_info *info)
 {
@@ -457,7 +541,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         struct mdt_object *msrcdir;
         struct mdt_object *mtgtdir;
         struct mdt_object *mold;
-        struct mdt_object *mnew;
+        struct mdt_object *mnew = NULL;
         struct mdt_lock_handle *lh_srcdirp;
         struct mdt_lock_handle *lh_tgtdirp;
         struct mdt_lock_handle lh_old;
@@ -483,9 +567,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                 RETURN(-EROFS);
 
-        /*FIXME: The lock order is important. Which algorithm should we take?
-         * parent/child order? lock resource order? or others?
-         */
+        rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+        if (rc == 1) {
+        /* if (strlen(rr->rr_name) == 0) {*/
+                RETURN(mdt_reint_rename_tgt(info));
+        }
 
         /* step 1: lock the source dir */
         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
@@ -509,9 +595,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /*step 3: find & lock the old object*/
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
                         rr->rr_name, &old_fid);
-        if (rc) {
+        if (rc) 
                 GOTO(out_unlock_target, rc);
-        }
         lh_old.mlh_mode = LCK_EX;
         mold = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
                                     &old_fid, &lh_old, 
@@ -520,20 +606,21 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
         
         /*step 4: find & lock the new object*/
-        /* new object may not exist now */
+        /* new target object may not exist now */
         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
                         rr->rr_tgt, &new_fid);
-        if (rc && rc != -ENOENT) {
+        if (rc && rc != -ENOENT)
                 GOTO(out_unlock_old, rc);
-        }
+
         /* NB: the new_fid may be zero at this moment*/
-        lh_new.mlh_mode = LCK_EX;
-        mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
-                                    &new_fid, &lh_new, 
-                                    MDS_INODELOCK_FULL);
-        if (IS_ERR(mnew))
-                GOTO(out_unlock_old, rc = PTR_ERR(mnew));
-        
+        if (rc == 0) { 
+                lh_new.mlh_mode = LCK_EX;
+                mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                            &new_fid, &lh_new, 
+                                            MDS_INODELOCK_FULL);
+                if (IS_ERR(mnew))
+                        GOTO(out_unlock_old, rc = PTR_ERR(mnew));
+        }
 
         /* step 5:TODO orphan handling on new object*/
         /* if isorphan(mnew) {...}
@@ -542,12 +629,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         /* step 6: rename it */
         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
                         mdt_object_child(mtgtdir), &old_fid,
-                        rr->rr_name, mdt_object_child(mnew),
+                        rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
                         rr->rr_tgt);
         GOTO(out_unlock_new, rc);
 out_unlock_new:
-        mdt_object_unlock(ns, mnew, &lh_new);
-        mdt_object_put(info->mti_ctxt, mnew);
+        if (mnew) {
+                mdt_object_unlock(ns, mnew, &lh_new);
+                mdt_object_put(info->mti_ctxt, mnew);
+        }
 out_unlock_old:
         mdt_object_unlock(ns, mold, &lh_old);
         mdt_object_put(info->mti_ctxt, mold);