+ if (!fid_is_md_operative(rr->rr_fid1) ||
+ !fid_is_md_operative(rr->rr_fid2))
+ RETURN(-EPERM);
+
+ /* don't allow migrate . or .. */
+ if (lu_name_is_dot_or_dotdot(&rr->rr_name))
+ RETURN(-EBUSY);
+
+ if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
+ RETURN(-EPERM);
+
+ if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
+ uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
+ mdt->mdt_enable_remote_dir_gid != -1)
+ RETURN(-EPERM);
+
+ /*
+ * Note: do not enqueue rename lock for replay request, because
+ * if other MDT holds rename lock, but being blocked to wait for
+ * this MDT to finish its recovery, and the failover MDT can not
+ * get rename lock, which will cause deadlock.
+ */
+ if (!req_is_replay(req)) {
+ rc = mdt_rename_lock(info, &rename_lh);
+ if (rc != 0) {
+ CERROR("%s: can't lock FS for rename: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), rc);
+ RETURN(rc);
+ }
+ }
+
+ /* pobj is master object of parent */
+ pobj = mdt_parent_find_check(info, rr->rr_fid1, 0);
+ if (IS_ERR(pobj))
+ GOTO(unlock_rename, rc = PTR_ERR(pobj));
+
+ if (unlikely(!info->mti_big_lmm)) {
+ info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+ OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+ if (!info->mti_big_lmm)
+ GOTO(put_parent, rc = -ENOMEM);
+ }
+
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ GOTO(put_parent, rc);
+
+lock_parent:
+ /* lock parent object */
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_reg_init(lhp, LCK_PW);
+ rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
+ &parent_slave_locks);
+ if (rc)
+ GOTO(put_parent, rc);
+
+ /*
+ * spobj is the corresponding stripe against name if pobj is striped
+ * directory, which is the real parent, and no need to lock, because
+ * we've taken full lock of pobj.
+ */
+ rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
+ if (rc)
+ GOTO(unlock_parent, rc);
+
+ /* lock parents of source links, and revoke LOOKUP lock of links */
+ rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
+ &parent_slave_locks, &link_locks);
+ if (rc == -EBUSY && lock_retries-- > 0) {
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+ goto lock_parent;
+ }
+
+ if (rc < 0)
+ GOTO(put_source, rc);
+
+ /*
+ * RS_MAX_LOCKS is the limit of number of locks that can be saved along
+ * with one request, if total lock count exceeds this limit, we will
+ * drop all locks after migration, and synchronous device in the end.
+ */
+ do_sync = rc;
+
+ /* TODO: DoM migration is not supported yet */
+ if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
+ ma->ma_lmm = info->mti_big_lmm;
+ ma->ma_lmm_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
+ if (rc)
+ GOTO(put_source, rc);
+
+ if (ma->ma_valid & MA_LOV &&
+ mdt_lmm_dom_entry(ma->ma_lmm) != LMM_NO_DOM)
+ GOTO(put_source, rc = -EOPNOTSUPP);
+ }
+
+ /* if migration HSM is allowed */
+ if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
+ ma->ma_need = MA_HSM;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, sobj, ma);
+ if (rc)
+ GOTO(unlock_links, rc);
+
+ if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
+ GOTO(unlock_links, rc = -EOPNOTSUPP);
+ }
+
+ /* end lease and close file for regular file */
+ if (info->mti_spec.sp_migrate_close) {
+ /* try to hold open_sem so that nobody else can open the file */
+ if (!down_write_trylock(&sobj->mot_open_sem)) {
+ /* close anyway */
+ mdd_migrate_close(info, sobj);
+ GOTO(unlock_links, rc = -EBUSY);
+ } else {
+ open_sem_locked = true;
+ rc = mdd_migrate_close(info, sobj);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+ }
+ }
+
+ /* lock source */
+ lhs = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lhs, LCK_EX);
+ rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
+ &child_slave_locks);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+
+ /* lock target */
+ tobj = mdt_object_find(env, mdt, rr->rr_fid2);
+ if (IS_ERR(tobj))
+ GOTO(unlock_source, rc = PTR_ERR(tobj));
+
+ lht = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lht, LCK_EX);
+ rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+ if (rc)
+ GOTO(put_target, rc);
+
+ /* Don't do lookup sanity check. We know name doesn't exist. */
+ info->mti_spec.sp_cr_lookup = 0;
+ info->mti_spec.sp_feat = &dt_directory_features;
+
+ rc = mdo_migrate(env, mdt_object_child(pobj),
+ mdt_object_child(sobj), &rr->rr_name,
+ mdt_object_child(tobj), &info->mti_spec, ma);
+ EXIT;
+
+ mdt_object_unlock(info, tobj, lht, rc);
+put_target:
+ mdt_object_put(env, tobj);
+unlock_source:
+ mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
+ &child_slave_locks, rc);
+unlock_open_sem:
+ if (open_sem_locked)
+ up_write(&sobj->mot_open_sem);
+unlock_links:
+ mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
+put_source:
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+unlock_parent:
+ mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
+ &parent_slave_locks, rc);
+put_parent:
+ mdt_object_put(env, pobj);
+unlock_rename:
+ if (lustre_handle_is_used(&rename_lh))
+ mdt_rename_unlock(&rename_lh);
+
+ if (!rc && do_sync)
+ mdt_device_sync(env, mdt);
+
+ return rc;
+}
+
+static int mdt_object_lock_save(struct mdt_thread_info *info,
+ struct mdt_object *dir,
+ struct mdt_lock_handle *lh,
+ int idx, bool cos_incompat)
+{
+ int rc;
+
+ /* we lock the target dir if it is local */
+ rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
+ cos_incompat);
+ if (rc != 0)
+ return rc;
+
+ /* get and save correct version after locking */
+ mdt_version_get_save(info, dir, idx);
+ return 0;
+}
+
+/*
+ * determine lock order of sobj and tobj
+ *
+ * there are two situations we need to lock tobj before sobj:
+ * 1. sobj is child of tobj
+ * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
+ * larger than that of tobj
+ *
+ * \retval 1 lock tobj before sobj
+ * \retval 0 lock sobj before tobj
+ * \retval -ev negative errno upon error
+ */
+static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
+ struct mdt_object *sobj,
+ struct mdt_object *tobj)
+{
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_fid *spfid = &info->mti_tmp_fid1;
+ struct lu_fid *tpfid = &info->mti_tmp_fid2;
+ struct lmv_mds_md_v1 *lmv;
+ __u32 sindex;
+ __u32 tindex;
+ int rc;
+
+ /* sobj and tobj are the same */
+ if (sobj == tobj)
+ return 0;
+
+ if (fid_is_root(mdt_object_fid(sobj)))
+ return 0;
+
+ if (fid_is_root(mdt_object_fid(tobj)))
+ return 1;
+
+ /* check whether sobj is child of tobj */
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
+ mdt_object_fid(tobj));
+ if (rc < 0)
+ return rc;
+
+ if (rc == 1)
+ return 1;
+
+ /* check whether sobj and tobj are children of the same parent */
+ rc = mdt_attr_get_pfid(info, sobj, spfid);
+ if (rc)
+ return rc;
+
+ rc = mdt_attr_get_pfid(info, tobj, tpfid);
+ if (rc)
+ return rc;
+
+ if (!lu_fid_eq(spfid, tpfid))
+ return 0;
+
+ /* check whether sobj and tobj are sibling stripes */
+ ma->ma_need = MA_LMV;
+ ma->ma_valid = 0;
+ ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+ ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+ rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return 0;
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return 0;
+ sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return -ENODATA;
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return -EINVAL;
+ tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ /* check stripe index of sobj and tobj */
+ if (sindex == tindex)
+ return -EINVAL;
+
+ return sindex < tindex ? 0 : 1;
+}
+
+/*
+ * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
+ * 2 - srcdir child; 3 - tgtdir child.
+ * Update on disk version of srcdir child.
+ */
+static int mdt_reint_rename(struct mdt_thread_info *info,
+ struct mdt_lock_handle *unused)
+{
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct md_attr *ma = &info->mti_attr;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_object *msrcdir = NULL;
+ struct mdt_object *mtgtdir = NULL;
+ struct mdt_object *mold;
+ struct mdt_object *mnew = NULL;
+ struct lustre_handle rename_lh = { 0 };
+ struct mdt_lock_handle *lh_srcdirp;
+ struct mdt_lock_handle *lh_tgtdirp;
+ struct mdt_lock_handle *lh_oldp = NULL;
+ struct mdt_lock_handle *lh_newp = NULL;
+ struct lu_fid *old_fid = &info->mti_tmp_fid1;
+ struct lu_fid *new_fid = &info->mti_tmp_fid2;
+ __u64 lock_ibits;
+ bool reverse = false;
+ bool cos_incompat, discard = false;
+ int rc;
+ ENTRY;