Whamcloud - gitweb
added more code for reint (link/unlink/rename).
authorhuanghua <huanghua>
Mon, 26 Jun 2006 09:50:51 +0000 (09:50 +0000)
committerhuanghua <huanghua>
Mon, 26 Jun 2006 09:50:51 +0000 (09:50 +0000)
lustre/mdt/mdt_reint.c

index e3e1dcd..4062f3a 100644 (file)
@@ -177,7 +177,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
                 locked = 1;
         }
 
-        LASSERT(lu_object_assert_exists(info->mti_ctxt, &mo->mot_obj.mo_lu));
+        if (lu_object_assert_not_exists(info->mti_ctxt, &mo->mot_obj.mo_lu))
+                GOTO(out_unlock, rc = -ENOENT);
 
         rc = mo_attr_set(info->mti_ctxt, mdt_object_child(mo), attr);
         if (rc != 0)
@@ -205,14 +206,14 @@ static int mdt_reint_setattr(struct mdt_thread_info *info)
         if (rc)
                 GOTO(out_unlock, rc);
 
-        /* FIXME Please deal with logcookies here*/
-
+        /* FIXME & TODO Please deal with logcookies here*/
+        GOTO(out_unlock, rc);
 out_unlock:
         if (locked) {
                 mdt_object_unlock(info->mti_mdt->mdt_namespace, mo, lh);
         }
         mdt_object_put(info->mti_ctxt, mo);
-        RETURN(rc);
+        return (rc);
 #endif
         ENTRY;
         RETURN(-EOPNOTSUPP);
@@ -265,12 +266,182 @@ static int mdt_reint_create(struct mdt_thread_info *info)
 
 static int mdt_reint_unlink(struct mdt_thread_info *info)
 {
+#ifdef MDT_CODE
+        struct lu_attr *attr = &info->mti_attr;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct req_capsule *pill = &info->mti_pill;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_object *mp;
+        struct mdt_object *mc;
+        struct mdt_lock_handle *lhp;
+        struct mdt_lock_handle *lhc;
+        struct mdt_body *repbody;
+        struct lu_fid child_fid;
+        int rc;
+
+        ENTRY;
+
+        DEBUG_REQ(D_INODE, req, "unlink "DFID3"/"DFID3, PFID3(rr->rr_fid1),
+                  PFID3(rr->rr_fid2));
+
+        /* MDS_CHECK_RESENT here */
+
+        rc = req_capsule_pack(pill);
+        if (rc)
+                RETURN(rc);        
+
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                RETURN(-EROFS);
+
+        /* step 1: lock the parent */
+        lhp = &info->mti_lh[MDT_LH_PARENT];
+        lhp->mlh_mode = LCK_EX;
+        mp = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                  rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(mp))
+                RETURN(PTR_ERR(mp));
+
+        /*step 2: find & lock the child */
+        lhc = &info->mti_lh[MDT_LH_CHILD];
+        lhc->mlh_mode = LCK_EX;
+        rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
+                        rr->rr_name, &child_fid);
+        if (rc) {
+                GOTO(out_unlock_parent, rc);
+        }
+
+        mc = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt, &child_fid,
+                                  lhc, MDS_INODELOCK_FULL);
+        if (IS_ERR(mc))
+                GOTO(out_unlock_parent, rc = PTR_ERR(mc));
+
+        if (lu_object_assert_not_exists(info->mti_ctxt, &mc->mot_obj.mo_lu))
+                GOTO(out_unlock_child, rc = -ENOENT);
+
+        /* NB: Be aware of Bug 2029 */
+        
+        /*step 3: deal with orphan */
+
+        /* If this is potentially the last reference to this inode, get the
+         * OBD EA data first so the client can destroy OST objects.  We
+         * only do the object removal later if no open files/links remain. */
+
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        rc = mo_attr_get(info->mti_ctxt, mdt_object_child(mc), attr);
+        if (rc != 0)
+                GOTO(out_unlock_child, rc);
+
+        if ((S_ISREG(attr->la_mode) && attr->la_nlink == 1)) 
+                /* && if opencount == 0*/
+                /* see mds_reint */
+                void * lmm = req_capsule_server_get(&info->mti_pill,
+                                     &RMF_MDT_MD);
+                int len = req_capsule_get_size(&info->mti_pill,
+                                               &RMF_MDT_MD,
+                                               RCL_SERVER);
+                mdt_pack_attr2body(repbody, attr);
+                repbody->fid1 = *mdt_object_fid(mc);
+                repbody->valid |= OBD_MD_FLID;
+
+                rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(mc),
+                                  lmm, len, "lov");
+                if (rc < 0)
+                        GOTO(out_unlock_child, rc);
+
+                if (S_ISDIR(attr->la_mode))
+                        repbody->valid |= OBD_MD_FLDIREA;
+                else
+                        repbody->valid |= OBD_MD_FLEASIZE;
+                repbody->eadatasize = rc;
+                rc = 0;
+        }
+
+        /* step 4: delete it */
+        rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mc),
+                        mdt_object_child(mc), rr->rr_name);
+
+        /*step 5: orphan handling & recovery issue */
+        if (rc == 0) {
+                /* FIXME & TODO:
+                 * 1. orphan handling here
+                 * 2. Please deal with logcookies here */ 
+        }
+        GOTO(out_unlock_child, rc);
+out_unlock_child:
+        mdt_object_unlock(info->mti_mdt->mdt_namespace, mc, lhc);
+        mdt_object_put(info->mti_ctxt, mc);
+out_unlock_parent:
+        mdt_object_unlock(info->mti_mdt->mdt_namespace, mp, lhp);
+        mdt_object_put(info->mti_ctxt, mp);
+        return rc;
+#endif
+
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
 
 static int mdt_reint_link(struct mdt_thread_info *info)
 {
+#ifdef MDT_CODE
+        struct lu_attr *attr = &info->mti_attr;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct req_capsule *pill = &info->mti_pill;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_object *ms;
+        struct mdt_object *mt;
+        struct mdt_lock_handle *lhs;
+        struct mdt_lock_handle *lht;
+        struct mdt_body *repbody;
+        int rc;
+
+        ENTRY;
+
+        DEBUG_REQ(D_INODE, req, "link original "DFID3" to "DFID3" %s",
+                  PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), rr->rr_name);
+
+        /* MDS_CHECK_RESENT here */
+
+        rc = req_capsule_pack(pill);
+        if (rc)
+                RETURN(rc);        
+
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                RETURN(-EROFS);
+
+        /* step 1: lock the source */
+        lhs = &info->mti_lh[MDT_LH_PARENT];
+        lhs->mlh_mode = LCK_EX;
+        ms = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                  rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(ms))
+                RETURN(PTR_ERR(ms));
+
+        /*step 2: find & lock the target */
+        lht = &info->mti_lh[MDT_LH_CHILD];
+        lht->mlh_mode = LCK_EX;
+        mt = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                  rr->rr_fid2, lht, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(mt))
+                GOTO(out_unlock_source, rc = PTR_ERR(mt));
+
+        /* step 3: do some checking :TODO*/
+        /* if isorphan(ms)
+         *      return -ENOENT
+         */
+
+        /* step 4: link it */
+        rc = mdo_link(info->mti_ctxt, mdt_object_child(mt),
+                      mdt_object_child(ms), rr->rr_name);
+        GOTO(out_unlock_target, rc);
+out_unlock_target:
+        mdt_object_unlock(info->mti_mdt->mdt_namespace, mt, lht);
+        mdt_object_put(info->mti_ctxt, mt);
+out_unlock_source:
+        mdt_object_unlock(info->mti_mdt->mdt_namespace, ms, lhs);
+        mdt_object_put(info->mti_ctxt, ms);
+        return rc;
+#endif
+
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
@@ -278,6 +449,119 @@ static int mdt_reint_link(struct mdt_thread_info *info)
 
 static int mdt_reint_rename(struct mdt_thread_info *info)
 {
+#ifdef MDT_CODE
+        struct lu_attr *attr = &info->mti_attr;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct req_capsule *pill = &info->mti_pill;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_object *msrcdir;
+        struct mdt_object *mtgtdir;
+        struct mdt_object *mold;
+        struct mdt_object *mnew;
+        struct mdt_lock_handle *lh_srcdirp;
+        struct mdt_lock_handle *lh_tgtdirp;
+        struct mdt_lock_handle lh_old;
+        struct mdt_lock_handle lh_new;
+        struct lu_fid old_fid = {0};
+        struct lu_fid new_fid = {0};
+        struct mdt_body *repbody;
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+        int rc;
+
+        ENTRY;
+
+        DEBUG_REQ(D_INODE, req, "rename "DFID3"/%s to "DFID3" %s",
+                  PFID3(rr->rr_fid1), rr->rr_tgt,
+                  PFID3(rr->rr_fid2), rr->rr_name);
+
+        /* MDS_CHECK_RESENT here */
+
+        rc = req_capsule_pack(pill);
+        if (rc)
+                RETURN(rc);
+
+        if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                RETURN(-EROFS);
+
+        /*FIXME: The lock order is important. Which algorithm should we take?
+         * parent/child order? lock resource order? or others?
+         */
+
+        /* step 1: lock the source dir */
+        lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
+        lh_srcdirp->mlh_mode = LCK_EX;
+        msrcdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                       rr->rr_fid1, lh_srcdirp, 
+                                       MDS_INODELOCK_UPDATE);
+        if (IS_ERR(msrcdir))
+                GOTO(out, rc = PTR_ERR(msrcdir));
+
+        /*step 2: find & lock the target dir*/
+        lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
+        lh_tgtdirp->mlh_mode = LCK_EX;
+        mtgtdir = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                       rr->rr_fid2, lh_tgtdirp, 
+                                       MDS_INODELOCK_UPDATE);
+        if (IS_ERR(mtgtdir))
+                GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
+
+        
+        /*step 3: find & lock the old object*/
+        rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
+                        rr->rr_name, &old_fid);
+        if (rc) {
+                GOTO(out_unlock_target, rc);
+        }
+        lh_old.mlh_mode = LCK_EX;
+        mold = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                    &old_fid, &lh_old, 
+                                    MDS_INODELOCK_LOOKUP);
+        if (IS_ERR(mold))
+                GOTO(out_unlock_target, rc = PTR_ERR(mold));
+        
+        /*step 4: find & lock the new object*/
+        /* new object may not exist now */
+        rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
+                        rr->rr_tgt, &new_fid);
+        if (rc && rc != -ENOENT) {
+                GOTO(out_unlock_old, rc);
+        }
+        /* NB: the new_fid may be zero at this moment*/
+        lh_new.mlh_mode = LCK_EX;
+        mnew = mdt_object_find_lock(info->mti_ctxt, info->mti_mdt,
+                                    &new_fid, &lh_new, 
+                                    MDS_INODELOCK_FULL);
+        if (IS_ERR(mnew))
+                GOTO(out_unlock_old, rc = PTR_ERR(mnew));
+        
+
+        /* step 5:TODO orphan handling on new object*/
+        /* if isorphan(mnew) {...}
+         */
+
+        /* step 6: rename it */
+        rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
+                        mdt_object_child(mtgtdir), &old_fid,
+                        rr->rr_name, mdt_object_child(mnew),
+                        rr->rr_tgt);
+        GOTO(out_unlock_new, rc);
+out_unlock_new:
+        mdt_object_unlock(ns, mnew, &lh_new);
+        mdt_object_put(info->mti_ctxt, mnew);
+out_unlock_old:
+        mdt_object_unlock(ns, mold, &lh_old);
+        mdt_object_put(info->mti_ctxt, mold);
+out_unlock_target:
+        mdt_object_unlock(ns, mtgtdir, lh_tgtdirp);
+        mdt_object_put(info->mti_ctxt, mtgtdir);
+out_unlock_source:
+        mdt_object_unlock(ns, msrcdir, lh_srcdirp);
+        mdt_object_put(info->mti_ctxt, msrcdir);
+out:
+        return rc;
+#endif
+
+
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
@@ -293,6 +577,7 @@ static int mdt_reint_open(struct mdt_thread_info *info)
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct mdt_body        *body = info->mti_reint_rep.mrr_body;
         struct lov_mds_md      *lmm  = info->mti_reint_rep.mrr_md;
+        struct mdt_reint_record *rr = &info->mti_rr;
         int                     created = 0;
         struct lu_fid           child_fid;
         ENTRY;
@@ -303,15 +588,13 @@ static int mdt_reint_open(struct mdt_thread_info *info)
         req_capsule_pack(&info->mti_pill);
         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
 
-        parent = mdt_object_find_lock(info->mti_ctxt,
-                                      mdt, info->mti_rr.rr_fid1,
-                                      lh,
-                                      MDS_INODELOCK_UPDATE);
+        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
+                                      lh, MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
-                            info->mti_rr.rr_name, &child_fid);
+                            rr->rr_name, &child_fid);
         if (result && result != -ENOENT) {
                 GOTO(out_parent, result);
         }
@@ -338,7 +621,7 @@ static int mdt_reint_open(struct mdt_thread_info *info)
                 /* not found and with MDS_OPEN_CREAT: let's create something */
                 result = mdo_create(info->mti_ctxt,
                                     mdt_object_child(parent),
-                                    info->mti_rr.rr_name,
+                                    rr->rr_name,
                                     mdt_object_child(child),
                                     &info->mti_attr);
                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
@@ -399,7 +682,7 @@ static int mdt_reint_open(struct mdt_thread_info *info)
 destroy_child:
         if (result != 0 && created)
                 mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
-                           mdt_object_child(child), info->mti_rr.rr_name);
+                           mdt_object_child(child), rr->rr_name);
 out_child:
         mdt_object_put(info->mti_ctxt, child);
 out_parent: