Whamcloud - gitweb
LU-2739 mdt: Deny non-DNE client to access remote directory
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 171ee3e..f16da5d 100644 (file)
@@ -91,7 +91,8 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
                                 struct mdt_object *o, __u64 *version)
 {
         LASSERT(o);
-       if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
+       if (mdt_object_exists(o) && !mdt_object_remote(o) &&
+           !mdt_object_obf(o))
                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
                 *version = ENOENT_VERSION;
@@ -305,7 +306,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
-               if (mdt_object_exists(child) < 0) {
+               if (mdt_object_remote(child)) {
                        struct seq_server_site *ss;
                        struct lu_ucred *uc  = mdt_ucred(info);
 
@@ -325,6 +326,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                                       mdt2obd_dev(mdt)->obd_name);
                                GOTO(out_put_child, rc = -EPERM);
                        }
+                       if (!mdt_is_dne_client(mdt_info_req(info)->rq_export)) {
+                               /* Return -EIO for old client */
+                               GOTO(out_put_child, rc = -EIO);
+                       }
+
                }
                 ma->ma_need = MA_INODE;
                 ma->ma_valid = 0;
@@ -385,14 +391,18 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
         int rc;
         ENTRY;
 
-        /* attr shouldn't be set on remote object */
-        LASSERT(mdt_object_exists(mo) >= 0);
+       /* attr shouldn't be set on remote object */
+       LASSERT(!mdt_object_remote(mo));
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
-        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
-                lockpart |= MDS_INODELOCK_LOOKUP;
+       /* Even though the new MDT will grant PERM lock to the old
+        * client, but the old client will almost ignore that during
+        * So it needs to revoke both LOOKUP and PERM lock here, so
+        * both new and old client can cancel the dcache */
+       if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+               lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
 
         rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
         if (rc != 0)
@@ -691,7 +701,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-       if (mdt_object_exists(mp) < 0) {
+       if (mdt_object_remote(mp)) {
                mdt_lock_reg_init(parent_lh, LCK_EX);
                rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
                                            parent_lh->mlh_rreg_mode,
@@ -728,7 +738,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         child_lh = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(child_lh, LCK_EX);
-       if (mdt_object_exists(mc) < 0) {
+       if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;
 
                if (!fid_is_zero(rr->rr_fid2)) {
@@ -741,6 +751,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                       mdt2obd_dev(info->mti_mdt)->obd_name,
                       (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
 
+               if (!mdt_is_dne_client(req->rq_export))
+                       /* Return -EIO for old client */
+                       GOTO(unlock_parent, rc = -EIO);
+
                if (info->mti_spec.sp_rm_entry) {
                        struct lu_ucred *uc  = mdt_ucred(info);
 
@@ -892,6 +906,13 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
+       if (mdt_object_remote(ms)) {
+               mdt_object_put(info->mti_env, ms);
+               CERROR("Target directory "DFID" is on another MDT\n",
+                       PFID(rr->rr_fid1));
+               GOTO(out_unlock_parent, rc = -EXDEV);
+       }
+
         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
                             MDT_CROSS_LOCK);
         if (rc != 0) {
@@ -1129,20 +1150,26 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (rc)
                         GOTO(out_put_target, rc);
 
-                rc = mdt_object_exists(mtgtdir);
-                if (rc == 0) {
-                        GOTO(out_put_target, rc = -ESTALE);
-                } else if (rc > 0) {
-                        /* we lock the target dir if it is local */
-                        rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
-                                             MDS_INODELOCK_UPDATE,
-                                             MDT_LOCAL_LOCK);
-                        if (rc != 0)
-                                GOTO(out_put_target, rc);
-                        /* get and save correct version after locking */
-                        mdt_version_get_save(info, mtgtdir, 1);
-                }
-        }
+               if (unlikely(mdt_object_remote(mtgtdir))) {
+                       CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
+                              "on different MDTs\n", PFID(rr->rr_fid1),
+                              PFID(rr->rr_fid2));
+                       GOTO(out_put_target, rc = -EXDEV);
+               } else {
+                       if (likely(mdt_object_exists(mtgtdir))) {
+                               /* we lock the target dir if it is local */
+                               rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
+                                                    MDS_INODELOCK_UPDATE,
+                                                    MDT_LOCAL_LOCK);
+                               if (rc != 0)
+                                       GOTO(out_put_target, rc);
+                               /* get and save correct version after locking */
+                               mdt_version_get_save(info, mtgtdir, 1);
+                       } else {
+                               GOTO(out_put_target, rc = -ESTALE);
+                       }
+               }
+       }
 
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
@@ -1155,9 +1182,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                 GOTO(out_unlock_target, rc = -EINVAL);
 
-        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
-        if (IS_ERR(mold))
-                GOTO(out_unlock_target, rc = PTR_ERR(mold));
+       mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
+       if (IS_ERR(mold))
+               GOTO(out_unlock_target, rc = PTR_ERR(mold));
+       if (mdt_object_remote(mold)) {
+               mdt_object_put(info->mti_env, mold);
+               CDEBUG(D_INFO, "Source child "DFID" is on another MDT\n",
+                      PFID(old_fid));
+               GOTO(out_unlock_target, rc = -EXDEV);
+       }
 
        if (mdt_object_obf(mold)) {
                mdt_object_put(info->mti_env, mold);
@@ -1203,6 +1236,13 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                        GOTO(out_unlock_old, rc = -EPERM);
                }
 
+               if (mdt_object_remote(mnew)) {
+                       mdt_object_put(info->mti_env, mnew);
+                       CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
+                              PFID(new_fid));
+                       GOTO(out_unlock_old, rc = -EXDEV);
+               }
+
                 rc = mdt_object_lock(info, mnew, lh_newp,
                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
                 if (rc != 0) {