Whamcloud - gitweb
LU-11135 mdt: LASSERT(lu_object_exists(o)) fails
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index f72289e..62308a8 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -290,82 +290,49 @@ static int mdt_remote_permission(struct mdt_thread_info *info)
 }
 
 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
-                            struct mdt_object *obj, __u64 ibits,
-                            struct mdt_lock_handle *s0_lh,
-                            struct mdt_object *s0_obj,
+                            struct mdt_object *obj,
                             struct ldlm_enqueue_info *einfo,
                             int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
+       struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
        struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int i;
-       int rc;
-       ENTRY;
-
-       if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
 
-       /* Unlock stripe 0 */
-       if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
-               LASSERT(s0_obj != NULL);
-               mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
-       }
+       LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+       LASSERT(slave_locks);
 
        memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = ibits;
-
-       if (slave_locks != NULL) {
-               LASSERT(s0_lh != NULL);
-               for (i = 1; i < slave_locks->count; i++) {
-                       /* borrow s0_lh temporarily to do mdt unlock */
-                       mdt_lock_reg_init(s0_lh, einfo->ei_mode);
-                       s0_lh->mlh_rreg_lh = slave_locks->handles[i];
-                       mdt_object_unlock(mti, NULL, s0_lh, decref);
-                       slave_locks->handles[i].cookie = 0ull;
-               }
+       policy->l_inodebits.bits = einfo->ei_inodebits;
+       mdt_lock_handle_init(lh);
+       mdt_lock_reg_init(lh, einfo->ei_mode);
+       for (i = 0; i < slave_locks->ha_count; i++) {
+               if (test_bit(i, (void *)slave_locks->ha_map))
+                       lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+               else
+                       lh->mlh_reg_lh = slave_locks->ha_handles[i];
+               mdt_object_unlock(mti, NULL, lh, decref);
+               slave_locks->ha_handles[i].cookie = 0ull;
        }
 
-       rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
-                             policy);
-       RETURN(rc);
+       return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+                               policy);
 }
 
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+                                    struct mdt_object *obj)
 {
-       struct lu_buf *buf = &mti->mti_buf;
-       struct lmv_mds_md_v1 *lmv;
        int rc;
-       ENTRY;
 
        if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
+               return 0;
 
-       buf->lb_buf = mti->mti_xattr_buf;
-       buf->lb_len = sizeof(mti->mti_xattr_buf);
-       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
                          XATTR_NAME_LMV);
-       if (rc == -ERANGE) {
-               rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
-               if (rc > 0) {
-                       buf->lb_buf = mti->mti_big_lmm;
-                       buf->lb_len = mti->mti_big_lmmsize;
-               }
-       }
-
-       if (rc == -ENODATA || rc == -ENOENT)
-               RETURN(0);
-
        if (rc <= 0)
-               RETURN(rc);
+               return rc == -ENODATA ? 0 : rc;
 
-       lmv = buf->lb_buf;
-       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
-               RETURN(-EINVAL);
-
-       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
-       RETURN(rc);
+       return 1;
 }
 
 /**
@@ -374,40 +341,12 @@ static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
  **/
 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                           enum ldlm_mode mode, __u64 ibits,
-                          struct lu_fid *s0_fid,
-                          struct mdt_lock_handle *s0_lh,
-                          struct mdt_object **s0_objp,
                           struct ldlm_enqueue_info *einfo)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
-       int rc;
-       ENTRY;
-
-       memset(einfo, 0, sizeof(*einfo));
-
-       rc = mdt_init_slaves(mti, obj, s0_fid);
-       if (rc <= 0)
-               RETURN(rc);
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
 
-       if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
-               /* Except migrating object, whose 0_stripe and master
-                * object are the same object, 0_stripe and master
-                * object are different, though they are in the same
-                * MDT, to avoid adding osd_object_lock here, so we
-                * will enqueue the stripe0 lock in MDT0 for now */
-               *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
-               if (IS_ERR(*s0_objp))
-                       RETURN(PTR_ERR(*s0_objp));
-
-               rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
-               if (rc < 0) {
-                       mdt_object_put(mti->mti_env, *s0_objp);
-                       RETURN(rc);
-               }
-       }
-
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -415,12 +354,56 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 1;
        einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+       einfo->ei_inodebits = ibits;
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
-                           policy);
-       RETURN(rc);
+       return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+                             policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+                                        struct mdt_object *o,
+                                        struct mdt_lock_handle *lh,
+                                        __u64 ibits,
+                                        struct ldlm_enqueue_info *einfo,
+                                        bool cos_incompat)
+{
+       int rc;
+
+       LASSERT(!mdt_object_remote(o));
+
+       memset(einfo, 0, sizeof(*einfo));
+
+       rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+       if (rc)
+               return rc;
+
+       rc = mdt_object_striped(info, o);
+       if (rc != 1) {
+               if (rc < 0)
+                       mdt_object_unlock(info, o, lh, rc);
+               return rc;
+       }
+
+       rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+       if (rc) {
+               mdt_object_unlock(info, o, lh, rc);
+               if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+                       rc = 0;
+       }
+
+       return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+                        struct mdt_lock_handle *lh,
+                        struct ldlm_enqueue_info *einfo, int decref)
+{
+       if (einfo->ei_cbdata)
+               mdt_unlock_slaves(info, o, einfo, decref);
+       mdt_object_unlock(info, o, lh, decref);
 }
 
 /*
@@ -536,15 +519,15 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
-               struct mdt_lock_handle *s0_lh;
-               struct mdt_object *s0_obj = NULL;
-               struct ldlm_enqueue_info *einfo;
-               struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-               bool cos_incompat = false;
-
-               rc = mdt_init_slaves(info, child, s0_fid);
-               if (rc > 0) {
-                       cos_incompat = true;
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+               bool cos_incompat;
+
+               rc = mdt_object_striped(info, child);
+               if (rc < 0)
+                       GOTO(put_child, rc);
+
+               cos_incompat = rc;
+               if (cos_incompat) {
                        if (!mdt_object_remote(parent)) {
                                mdt_object_unlock(info, parent, lh, 1);
                                mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
@@ -556,26 +539,16 @@ static int mdt_create(struct mdt_thread_info *info)
                        }
                }
 
-               einfo = &info->mti_einfo;
                lhc = &info->mti_lh[MDT_LH_CHILD];
                mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PW);
-               rc = mdt_reint_object_lock(info, child, lhc,
-                                          MDS_INODELOCK_UPDATE,
-                                          cos_incompat);
+               rc = mdt_reint_striped_lock(info, child, lhc,
+                                           MDS_INODELOCK_UPDATE, einfo,
+                                           cos_incompat);
                if (rc)
                        GOTO(put_child, rc);
-               mdt_object_unlock(info, child, lhc, rc);
-
-               s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-               mdt_lock_handle_init(s0_lh);
-               mdt_lock_reg_init(s0_lh, LCK_PW);
-               rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
-                                    s0_fid, s0_lh, &s0_obj, einfo);
-               mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
-                                 s0_obj, einfo, rc);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
-                       rc = 0;
+
+               mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
        }
 
        /* Return fid & attr to client. */
@@ -599,19 +572,18 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                        (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
-       bool cos_incompat = false;
+       bool cos_incompat;
        int rc;
        ENTRY;
 
-       rc = mdt_init_slaves(info, mo, s0_fid);
-       if (rc > 0)
-               cos_incompat = true;
+       rc = mdt_object_striped(info, mo);
+       if (rc < 0)
+               RETURN(rc);
 
-        lh = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_reg_init(lh, LCK_PW);
+       cos_incompat = rc;
+
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_reg_init(lh, LCK_PW);
 
        /* Even though the new MDT will grant PERM lock to the old
         * client, but the old client will almost ignore that during
@@ -620,52 +592,41 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
 
-       rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+       rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+                                   cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_PW);
-       rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
-                            einfo);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
-
-        /* all attrs are packed into mti_attr in unpack_setattr */
-        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                       OBD_FAIL_MDS_REINT_SETATTR_WRITE);
-
-        /* This is only for set ctime when rename's source is on remote MDS. */
-        if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
-                ma->ma_attr_flags |= MDS_VTX_BYPASS;
+       /* all attrs are packed into mti_attr in unpack_setattr */
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                      OBD_FAIL_MDS_REINT_SETATTR_WRITE);
 
-        /* VBR: update version if attr changed are important for recovery */
-        if (do_vbr) {
-                /* update on-disk version of changed object */
+       /* VBR: update version if attr changed are important for recovery */
+       if (do_vbr) {
+               /* update on-disk version of changed object */
                tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
-                rc = mdt_version_get_check_save(info, mo, 0);
-                if (rc)
-                        GOTO(out_unlock, rc);
-        }
+               rc = mdt_version_get_check_save(info, mo, 0);
+               if (rc)
+                       GOTO(out_unlock, rc);
+       }
 
        /* Ensure constant striping during chown(). See LU-2789. */
        if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
                mutex_lock(&mo->mot_lov_mutex);
 
-        /* all attrs are packed into mti_attr in unpack_setattr */
-        rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
+       /* all attrs are packed into mti_attr in unpack_setattr */
+       rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
 
        if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
                mutex_unlock(&mo->mot_lov_mutex);
 
-        if (rc != 0)
-                GOTO(out_unlock, rc);
-
-        EXIT;
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+       mdt_dom_obj_lvb_update(info->mti_env, mo, false);
+       EXIT;
 out_unlock:
-       mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
-        mdt_object_unlock(info, mo, lh, rc);
-        return rc;
+       mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
+       return rc;
 }
 
 /**
@@ -731,11 +692,30 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
        if (mdt_object_remote(mo))
                GOTO(out_put, rc = -EREMOTE);
 
-       if ((ma->ma_attr.la_valid & LA_SIZE) ||
-           (rr->rr_flags & MRF_OPEN_TRUNC)) {
+       if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
                /* Check write access for the O_TRUNC case */
                if (mdt_write_read(mo) < 0)
                        GOTO(out_put, rc = -ETXTBSY);
+
+               /* LU-10286: compatibility check for FLR.
+                * Please check the comment in mdt_finish_open() for details */
+               if (!exp_connect_flr(info->mti_exp)) {
+                       rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
+                       if (rc < 0 && rc != -ENODATA)
+                               GOTO(out_put, rc);
+
+                       if (rc > 0 && mdt_lmm_is_flr(info->mti_big_lmm))
+                               GOTO(out_put, rc = -EOPNOTSUPP);
+               }
+
+               /* For truncate, the file size sent from the client
+                * is believable, but the blocks are incorrect,
+                * which makes the block size in the LSOM attribute
+                * inconsistent with the real block size.
+                */
+               rc = mdt_lsom_update(info, mo, true);
+               if (rc)
+                       GOTO(out_put, rc);
        }
 
        if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
@@ -795,11 +775,11 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
        mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
 
-        EXIT;
+       EXIT;
 out_put:
-        mdt_object_put(info->mti_env, mo);
+       mdt_object_put(info->mti_env, mo);
 out:
-        if (rc == 0)
+       if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_SETATTR);
 
         mdt_client_compatibility(info);
@@ -866,13 +846,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid2;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
        __u64 lock_ibits;
-       bool cos_incompat = false;
+       bool cos_incompat = false, discard = false;
        int no_name = 0;
        int rc;
+
        ENTRY;
 
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
@@ -943,11 +921,17 @@ relock:
        if (IS_ERR(mc))
                GOTO(unlock_parent, rc = PTR_ERR(mc));
 
-       if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
-               cos_incompat = true;
-               mdt_object_put(info->mti_env, mc);
-               mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
-               goto relock;
+       if (!cos_incompat) {
+               rc = mdt_object_striped(info, mc);
+               if (rc < 0)
+                       GOTO(unlock_parent, rc = PTR_ERR(mc));
+
+               cos_incompat = rc;
+               if (cos_incompat) {
+                       mdt_object_put(info->mti_env, mc);
+                       mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+                       goto relock;
+               }
        }
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -1014,10 +998,10 @@ relock:
                lock_ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
-                                  cos_incompat);
+       rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+                                   cos_incompat);
        if (rc != 0)
-               GOTO(unlock_child, rc);
+               GOTO(put_child, rc);
 
        /*
         * Now we can only make sure we need MA_INODE, in mdd layer, will check
@@ -1026,13 +1010,6 @@ relock:
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_EX);
-       rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
-                            s0_lh, &s0_obj, einfo);
-       if (rc != 0)
-               GOTO(unlock_child, rc);
-
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_UNLINK_WRITE);
        /* save version when object is locked */
@@ -1044,38 +1021,45 @@ relock:
                        mdt_object_child(mc), &rr->rr_name, ma, no_name);
 
        mutex_unlock(&mc->mot_lov_mutex);
+       if (rc != 0)
+               GOTO(unlock_child, rc);
 
-       if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+       if (!lu_object_is_dying(&mc->mot_header)) {
                rc = mdt_attr_get_complex(info, mc, ma);
-       if (rc == 0)
-               mdt_handle_last_unlink(info, mc, ma);
+               if (rc)
+                       GOTO(out_stat, rc);
+       } else {
+               discard = true;
+       }
+       mdt_handle_last_unlink(info, mc, ma);
 
-        if (ma->ma_valid & MA_INODE) {
-                switch (ma->ma_attr.la_mode & S_IFMT) {
-                case S_IFDIR:
+out_stat:
+       if (ma->ma_valid & MA_INODE) {
+               switch (ma->ma_attr.la_mode & S_IFMT) {
+               case S_IFDIR:
                        mdt_counter_incr(req, LPROC_MDT_RMDIR);
-                        break;
-                case S_IFREG:
-                case S_IFLNK:
-                case S_IFCHR:
-                case S_IFBLK:
-                case S_IFIFO:
-                case S_IFSOCK:
+                       break;
+               case S_IFREG:
+               case S_IFLNK:
+               case S_IFCHR:
+               case S_IFBLK:
+               case S_IFIFO:
+               case S_IFSOCK:
                        mdt_counter_incr(req, LPROC_MDT_UNLINK);
-                        break;
-                default:
-                        LASSERTF(0, "bad file type %o unlinking\n",
-                                 ma->ma_attr.la_mode);
-                }
-        }
+                       break;
+               default:
+                       LASSERTF(0, "bad file type %o unlinking\n",
+                               ma->ma_attr.la_mode);
+               }
+       }
 
-        EXIT;
+       EXIT;
 
 unlock_child:
-       mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
-                         rc);
-       mdt_object_unlock(info, mc, child_lh, rc);
+       mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
 put_child:
+       if (discard)
+               mdt_dom_check_and_discard(info, mc);
        mdt_object_put(info->mti_env, mc);
 unlock_parent:
        mdt_object_unlock(info, mp, parent_lh, rc);
@@ -1293,45 +1277,6 @@ static void mdt_rename_unlock(struct lustre_handle *lh)
        EXIT;
 }
 
-/*
- * This is is_subdir() variant, it is CMD if cmm forwards it to correct
- * target. Source should not be ancestor of target dir. May be other rename
- * checks can be moved here later.
- */
-static int mdt_is_subdir(struct mdt_thread_info *info,
-                        struct mdt_object *dir,
-                        const struct lu_fid *fid)
-{
-       struct lu_fid dir_fid = dir->mot_header.loh_fid;
-        int rc = 0;
-        ENTRY;
-
-       /* If the source and target are in the same directory, they can not
-        * be parent/child relationship, so subdir check is not needed */
-       if (lu_fid_eq(&dir_fid, fid))
-               return 0;
-
-       if (!mdt_object_exists(dir))
-               RETURN(-ENOENT);
-
-       rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir),
-                          fid, &dir_fid);
-       if (rc < 0) {
-               CERROR("%s: failed subdir check in "DFID" for "DFID
-                      ": rc = %d\n", mdt_obd_name(info->mti_mdt),
-                      PFID(&dir_fid), PFID(fid), rc);
-               /* Return EINVAL only if a parent is the @fid */
-               if (rc == -EINVAL)
-                       rc = -EIO;
-       } else {
-               /* check the found fid */
-               if (lu_fid_eq(&dir_fid, fid))
-                       rc = -EINVAL;
-       }
-
-        RETURN(rc);
-}
-
 /* Update object linkEA */
 struct mdt_lock_list {
        struct mdt_object       *mll_obj;
@@ -1440,7 +1385,7 @@ again:
                rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, &ibits,
                                         MDS_INODELOCK_UPDATE, true);
                if (!(ibits & MDS_INODELOCK_UPDATE)) {
-                       mdt_unlock_list(info, lock_list, rc);
+                       mdt_unlock_list(info, lock_list, 0);
 
                        CDEBUG(D_INFO, "%s: busy lock on "DFID" %s retry %d\n",
                               mdt_obd_name(mdt), PFID(&fid), name.ln_name,
@@ -1559,6 +1504,13 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
        if (IS_ERR(mold))
                GOTO(out_unlock_parent, rc = PTR_ERR(mold));
 
+       if (!mdt_object_exists(mold)) {
+               LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+                               &mold->mot_obj,
+                               "object does not exist");
+               GOTO(out_put_child, rc = -ENOENT);
+       }
+
        if (mdt_object_remote(mold)) {
                CDEBUG(D_OTHER, "%s: source "DFID" is on the remote MDT\n",
                       mdt_obd_name(info->mti_mdt), PFID(old_fid));
@@ -1813,6 +1765,98 @@ static int mdt_object_lock_save(struct mdt_thread_info *info,
 }
 
 /*
+ * determine lock order of sobj and tobj
+ *
+ * there are two situations we need to lock tobj before sobj:
+ * 1. sobj is child of tobj
+ * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
+ *    larger than that of tobj
+ *
+ * \retval     1 lock tobj before sobj
+ * \retval     0 lock sobj before tobj
+ * \retval     -ve negative errno upon error
+ */
+static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
+                                          struct mdt_object *sobj,
+                                          struct mdt_object *tobj)
+{
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_fid *spfid = &info->mti_tmp_fid1;
+       struct lu_fid *tpfid = &info->mti_tmp_fid2;
+       struct lmv_mds_md_v1 *lmv;
+       __u32 sindex;
+       __u32 tindex;
+       int rc;
+
+       /* sobj and tobj are the same */
+       if (sobj == tobj)
+               return 0;
+
+       if (fid_is_root(mdt_object_fid(sobj)))
+               return 0;
+
+       if (fid_is_root(mdt_object_fid(tobj)))
+               return 1;
+
+       /* check whether sobj is child of tobj */
+       rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
+                          mdt_object_fid(tobj));
+       if (rc < 0)
+               return rc;
+
+       if (rc == 1)
+               return 1;
+
+       /* check whether sobj and tobj are children of the same parent */
+       rc = mdt_attr_get_pfid(info, sobj, spfid);
+       if (rc)
+               return rc;
+
+       rc = mdt_attr_get_pfid(info, tobj, tpfid);
+       if (rc)
+               return rc;
+
+       if (!lu_fid_eq(spfid, tpfid))
+               return 0;
+
+       /* check whether sobj and tobj are sibling stripes */
+       ma->ma_need = MA_LMV;
+       ma->ma_valid = 0;
+       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+       rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
+       if (rc)
+               return rc;
+
+       if (!(ma->ma_valid & MA_LMV))
+               return 0;
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+       if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+               return 0;
+       sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+       ma->ma_valid = 0;
+       rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
+       if (rc)
+               return rc;
+
+       if (!(ma->ma_valid & MA_LMV))
+               return -ENODATA;
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+       if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+               return -EINVAL;
+       tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+       /* check stripe index of sobj and tobj */
+       if (sindex == tindex)
+               return -EINVAL;
+
+       return sindex < tindex ? 0 : 1;
+}
+
+/*
  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
  * 2 - srcdir child; 3 - tgtdir child.
  * Update on disk version of srcdir child.
@@ -1846,7 +1890,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
        __u64 lock_ibits;
        bool reverse = false;
-       bool cos_incompat;
+       bool cos_incompat, discard = false;
        int rc;
        ENTRY;
 
@@ -1865,19 +1909,17 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                mtgtdir = msrcdir;
                mdt_object_get(info->mti_env, mtgtdir);
        } else {
-               /* Check if the @msrcdir is not a child of the @mtgtdir,
-                * otherwise a reverse locking must take place. */
-               rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2);
-               if (rc == -EINVAL)
-                       reverse = true;
-               else if (rc)
-                       GOTO(out_put_srcdir, rc);
-
                mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1);
                if (IS_ERR(mtgtdir))
                        GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
        }
 
+       rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
+       if (rc < 0)
+               GOTO(out_put_tgtdir, rc);
+
+       reverse = rc;
+
        /* source needs to be looked up after locking source parent, otherwise
         * this rename may race with unlink source, and cause rename hang, see
         * sanityn.sh 55b, so check parents first, if later we found source is
@@ -1952,12 +1994,23 @@ relock:
        if (IS_ERR(mold))
                GOTO(out_unlock_parents, rc = PTR_ERR(mold));
 
+       if (!mdt_object_exists(mold)) {
+               LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+                               &mold->mot_obj,
+                               "object does not exist");
+               GOTO(out_put_old, rc = -ENOENT);
+       }
+
        /* Check if @mtgtdir is subdir of @mold, before locking child
         * to avoid reverse locking. */
        if (mtgtdir != msrcdir) {
-               rc = mdt_is_subdir(info, mtgtdir, old_fid);
-               if (rc)
+               rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
+                                  old_fid);
+               if (rc) {
+                       if (rc == 1)
+                               rc = -EINVAL;
                        GOTO(out_put_old, rc);
+               }
        }
 
        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
@@ -1994,6 +2047,13 @@ relock:
                if (IS_ERR(mnew))
                        GOTO(out_put_old, rc = PTR_ERR(mnew));
 
+               if (!mdt_object_exists(mnew)) {
+                       LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+                                       &mnew->mot_obj,
+                                       "object does not exist");
+                       GOTO(out_put_new, rc = -ENOENT);
+               }
+
                if (mdt_object_remote(mnew)) {
                        struct mdt_body  *repbody;
 
@@ -2038,9 +2098,13 @@ relock:
                /* Check if @msrcdir is subdir of @mnew, before locking child
                 * to avoid reverse locking. */
                if (mtgtdir != msrcdir) {
-                       rc = mdt_is_subdir(info, msrcdir, new_fid);
-                       if (rc)
+                       rc = mdo_is_subdir(info->mti_env,
+                                          mdt_object_child(msrcdir), new_fid);
+                       if (rc) {
+                               if (rc == 1)
+                                       rc = -EINVAL;
                                GOTO(out_unlock_old, rc);
+                       }
                }
 
                /* We used to acquire MDS_INODELOCK_FULL here but we
@@ -2107,8 +2171,10 @@ relock:
        /* handle last link of tgt object */
        if (rc == 0) {
                mdt_counter_incr(req, LPROC_MDT_RENAME);
-               if (mnew)
+               if (mnew) {
                        mdt_handle_last_unlink(info, mnew, ma);
+                       discard = true;
+               }
 
                mdt_rename_counter_tally(info, info->mti_mdt, req,
                                         msrcdir, mtgtdir);
@@ -2120,8 +2186,11 @@ relock:
 out_unlock_old:
        mdt_object_unlock(info, mold, lh_oldp, rc);
 out_put_new:
-       if (mnew != NULL)
+       if (mnew != NULL) {
+               if (discard)
+                       mdt_dom_check_and_discard(info, mnew);
                mdt_object_put(info->mti_env, mnew);
+       }
 out_put_old:
        mdt_object_put(info->mti_env, mold);
 out_unlock_parents:
@@ -2186,6 +2255,85 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        return mdt_reint_rename_or_migrate(info, lhc, false);
 }
 
+static int mdt_reint_resync(struct mdt_thread_info *info,
+                           struct mdt_lock_handle *lhc)
+{
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_object       *mo;
+       struct ldlm_lock        *lease;
+       struct mdt_body         *repbody;
+       struct md_layout_change  layout = { 0 };
+       bool                     lease_broken;
+       int                      rc, rc2;
+       ENTRY;
+
+       DEBUG_REQ(D_INODE, req, DFID": FLR file resync\n", PFID(rr->rr_fid1));
+
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
+
+       mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(mo))
+               GOTO(out, rc = PTR_ERR(mo));
+
+       if (!mdt_object_exists(mo))
+               GOTO(out_obj, rc = -ENOENT);
+
+       if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
+               GOTO(out_obj, rc = -EINVAL);
+
+       if (mdt_object_remote(mo))
+               GOTO(out_obj, rc = -EREMOTE);
+
+       lease = ldlm_handle2lock(rr->rr_handle);
+       if (lease == NULL)
+               GOTO(out_obj, rc = -ESTALE);
+
+       /* It is necessary to grab open_sem and check whether the lease lock
+        * has been lost. Otherwise a concurrent writer could come in and
+        * generate dirty data in memory cache, whose writeback would fail
+        * after the layout version is increased by MDS_REINT_RESYNC RPC. */
+       if (!down_write_trylock(&mo->mot_open_sem))
+               GOTO(out_put_lease, rc = -EBUSY);
+
+       lock_res_and_lock(lease);
+       lease_broken = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+       if (lease_broken)
+               GOTO(out_unlock, rc = -EBUSY);
+
+       /* nobody else has opened the file since we took the lease. */
+       layout.mlc_opc = MD_LAYOUT_RESYNC;
+       rc = mdt_layout_change(info, mo, &layout);
+       if (rc)
+               GOTO(out_unlock, rc);
+
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
+       rc = mdt_attr_get_complex(info, mo, ma);
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
+
+       EXIT;
+out_unlock:
+       up_write(&mo->mot_open_sem);
+out_put_lease:
+       LDLM_LOCK_PUT(lease);
+out_obj:
+       mdt_object_put(info->mti_env, mo);
+out:
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
+       return rc;
+}
+
 struct mdt_reinter {
        int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
        enum lprocfs_extra_opc mr_extra_opc;
@@ -2228,6 +2376,10 @@ static const struct mdt_reinter mdt_reinters[] = {
                .mr_handler = &mdt_reint_migrate,
                .mr_extra_opc = MDS_REINT_RENAME,
        },
+       [REINT_RESYNC] = {
+               .mr_handler = &mdt_reint_resync,
+               .mr_extra_opc = MDS_REINT_RESYNC,
+       },
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,