+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ * 1. create target, append source stripes after target's if it's directory,
+ * migrate xattrs and update fid of source links.
+ * 2. update namespace: migrate dirent from source parent to target parent,
+ * update file linkea, and destroy source if it's not needed any more.
+ *
+ * \param[in] env execution environment
+ * \param[in] md_pobj parent master object
+ * \param[in] md_sobj source object
+ * \param[in] lname file name
+ * \param[in] md_tobj target object
+ * \param[in] spec target creation spec
+ * \param[in] ma used to update \a pobj mtime and ctime
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+ struct md_object *md_sobj, const struct lu_name *lname,
+ struct md_object *md_tobj, struct md_op_spec *spec,
+ struct md_attr *ma)
+{
+ struct mdd_device *mdd = mdo2mdd(md_pobj);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *pobj = md2mdd_obj(md_pobj);
+ struct mdd_object *sobj = md2mdd_obj(md_sobj);
+ struct mdd_object *tobj = md2mdd_obj(md_tobj);
+ struct mdd_object *spobj = NULL;
+ struct mdd_object *tpobj = NULL;
+ struct lu_attr *spattr = &info->mti_pattr;
+ struct lu_attr *tpattr = &info->mti_tpattr;
+ struct lu_attr *attr = &info->mti_cattr;
+ struct linkea_data *ldata = &info->mti_link_data;
+ struct dt_allocation_hint *hint = &info->mti_hint;
+ struct lu_fid *fid = &info->mti_fid2;
+ struct lu_buf pbuf = { NULL };
+ struct lu_buf sbuf = { NULL };
+ struct lmv_mds_md_v1 *plmv;
+ struct thandle *handle;
+ bool do_create = true;
+ bool do_destroy = true;
+ int rc;
+ ENTRY;
+
+ rc = mdd_la_get(env, sobj, attr);
+ if (rc)
+ RETURN(rc);
+
+ /* locate source and target stripe on pobj, which are the real parent */
+ rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+ if (rc < 0 && rc != -ENODATA)
+ RETURN(rc);
+
+ plmv = pbuf.lb_buf;
+ if (plmv) {
+ __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+ __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+ int index;
+
+ /* locate target parent stripe */
+ if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+ /*
+ * fail check here to make sure top dir migration
+ * succeed.
+ */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+ GOTO(out, rc = -EIO);
+ hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+ count = le32_to_cpu(plmv->lmv_migrate_offset);
+ }
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0)
+ GOTO(out, rc = index);
+
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ tpobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(tpobj))
+ GOTO(out, rc = PTR_ERR(tpobj));
+
+ /* locate source parent stripe */
+ if (le32_to_cpu(plmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+ count = le32_to_cpu(plmv->lmv_stripe_count) -
+ le32_to_cpu(plmv->lmv_migrate_offset);
+
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0) {
+ mdd_object_put(env, tpobj);
+ GOTO(out, rc = index);
+ }
+
+ index += le32_to_cpu(plmv->lmv_migrate_offset);
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ spobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(spobj)) {
+ mdd_object_put(env, tpobj);
+ GOTO(out, rc = PTR_ERR(spobj));
+ }
+ } else {
+ spobj = tpobj;
+ mdd_object_get(spobj);
+ }
+ } else {
+ tpobj = pobj;
+ spobj = pobj;
+ mdd_object_get(tpobj);
+ mdd_object_get(spobj);
+ }
+
+ rc = mdd_la_get(env, spobj, spattr);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_la_get(env, tpobj, tpattr);
+ if (rc)
+ GOTO(out, rc);
+
+ if (S_ISDIR(attr->la_mode)) {
+ struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
+
+ LASSERT(lmu);
+
+ /*
+ * if user use default value '0' for stripe_count, we need to
+ * adjust it to '1' to create a 1-stripe directory.
+ */
+ if (lmu->lum_stripe_count == 0) {
+ /* eadata is from request, don't alter it */
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
+
+ rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+ if (rc == -ENODATA) {
+ if (mdd_dir_is_empty(env, sobj) == 0) {
+ /*
+ * if sobj is empty, and target is not striped,
+ * create target as a normal directory.
+ */
+ if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = 0;
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
+ } else {
+ /*
+ * sobj is not striped dir, if it's not empty,
+ * it will be migrated to be a stripe of target,
+ * don't destroy it after migration.
+ */
+ do_destroy = false;
+ }
+ } else if (rc) {
+ GOTO(out, rc);
+ } else {
+ struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
+
+ if (le32_to_cpu(lmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ __u32 lum_stripe_count = lmu->lum_stripe_count;
+ __u32 lmv_hash_type = lmv->lmv_hash_type &
+ cpu_to_le32(LMV_HASH_TYPE_MASK);
+
+ if (!lum_stripe_count)
+ lum_stripe_count = cpu_to_le32(1);
+
+ /* TODO: check specific MDTs */
+ if (lmv->lmv_migrate_offset !=
+ lum_stripe_count ||
+ lmv->lmv_master_mdt_index !=
+ lmu->lum_stripe_offset ||
+ (lmv_hash_type != 0 &&
+ lmv_hash_type != lmu->lum_hash_type)) {
+ CERROR("%s: \'"DNAME"\' migration was "
+ "interrupted, run \'lfs migrate "
+ "-m %d -c %d -H %d "DNAME"\' to "
+ "finish migration.\n",
+ mdd2obd_dev(mdd)->obd_name,
+ PNAME(lname),
+ le32_to_cpu(
+ lmv->lmv_master_mdt_index),
+ le32_to_cpu(
+ lmv->lmv_migrate_offset),
+ le32_to_cpu(lmv_hash_type),
+ PNAME(lname));
+ GOTO(out, rc = -EPERM);
+ }
+ GOTO(out, rc = -EALREADY);
+ }
+ }
+ } else if (!mdd_object_remote(tpobj)) {
+ /*
+ * if source is already on MDT where target parent is located,
+ * no need to create, just update namespace.
+ */
+ do_create = false;
+ } else if (S_ISLNK(attr->la_mode)) {
+ lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+ if (!sbuf.lb_buf)
+ GOTO(out, rc = -ENOMEM);
+ rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+ if (rc <= 0) {
+ rc = rc ?: -EFAULT;
+ CERROR("%s: "DFID" readlink failed: rc = %d\n",
+ mdd2obd_dev(mdd)->obd_name,
+ PFID(mdo2fid(sobj)), rc);
+ GOTO(out, rc);
+ }
+ spec->u.sp_symname = sbuf.lb_buf;
+ } else if (S_ISREG(attr->la_mode)) {
+ spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+ spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
+ }
+
+ /*
+ * if sobj has link on the same MDT, no need to create, just update
+ * namespace, and it will be a remote file on target parent, which is
+ * similar to rename.
+ */
+ rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
+ ldata);
+ if (rc > 0)
+ do_create = false;
+ else if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+ spattr, tpattr, attr);
+ if (rc)
+ GOTO(out, rc);
+
+ mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
+
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ if (do_create) {
+ rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
+ attr, &sbuf, ldata, spec, hint,
+ handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+ }
+
+ rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
+ attr, spattr, tpattr, ldata, do_create,
+ do_destroy, ma, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+ handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ if (do_create) {
+ rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
+ &sbuf, ldata, spec, hint, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+ }
+
+ rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
+ spattr, tpattr, ldata, do_create, do_destroy,
+ ma, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+ mdo2fid(spobj), mdo2fid(sobj),
+ mdo2fid(tpobj), lname, lname, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ EXIT;
+stop_trans:
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+ if (spobj && !IS_ERR(spobj))
+ mdd_object_put(env, spobj);
+ if (tpobj && !IS_ERR(tpobj))
+ mdd_object_put(env, tpobj);
+ lu_buf_free(&sbuf);
+ lu_buf_free(&pbuf);
+ return rc;