+ /* Note: lname might miss \0 at the end */
+ snprintf(filename, sizeof(info->mti_name), "%.*s",
+ lname.ln_namelen, lname.ln_name);
+ lname.ln_name = filename;
+
+ CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
+
+ rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+ handle);
+ }
+
+ return rc;
+}
+
+/**
+ * Prepare linkea, and check whether file needs migrate: if source still has
+ * link on source MDT, no need to migrate, just update namespace on source and
+ * target parents.
+ *
+ * \a ldata is zeroed and then filled by mdd_linkea_prepare(), which is given
+ * the FIDs of \a spobj and \a tpobj, each paired with \a lname. Directories
+ * and files with fewer than two links are reported as "do migrate" right
+ * away; for any other file the links in \a ldata are walked with
+ * mdd_is_link_on_source_mdt() and the result of that walk is returned.
+ *
+ * \param[in] env	execution environment
+ * \param[in] mdd	MDD device, used for the FLD lookup of \a sobj
+ * \param[in] spobj	source parent object
+ * \param[in] tpobj	target parent object
+ * \param[in] sobj	source object
+ * \param[in] lname	file name
+ * \param[in] attr	source attributes (la_mode and la_nlink are read)
+ * \param[out] ldata	linkea data to prepare
+ *
+ * \retval 0		do migrate
+ * \retval 1		don't migrate
+ * \retval -EOVERFLOW	linkEA of a multi-link file has overflowed
+ * \retval -errno	on failure
+ */
+static int migrate_linkea_prepare(const struct lu_env *env,
+				  struct mdd_device *mdd,
+				  struct mdd_object *spobj,
+				  struct mdd_object *tpobj,
+				  struct mdd_object *sobj,
+				  const struct lu_name *lname,
+				  const struct lu_attr *attr,
+				  struct linkea_data *ldata)
+{
+	__u32 src_mdt_idx;
+	int rc;
+
+	ENTRY;
+
+	memset(ldata, 0, sizeof(*ldata));
+	rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
+				mdo2fid(tpobj), lname, 1, 0, ldata);
+	if (rc != 0)
+		RETURN(rc);
+
+	/*
+	 * Now decide whether the file should be migrated. Only a file with
+	 * multiple links needs the per-link check below: it is migrated only
+	 * if all of its entries have been migrated to the remote MDT.
+	 */
+	if (S_ISDIR(attr->la_mode))
+		RETURN(0);
+
+	if (attr->la_nlink < 2)
+		RETURN(0);
+
+	/* linkEA header must be present once mdd_linkea_prepare() succeeded */
+	LASSERT(ldata->ld_leh != NULL);
+
+	/*
+	 * If linkEA is overflow, it means there are some unknown name entries
+	 * under unknown parents, which will prevent the migration.
+	 */
+	if (unlikely(ldata->ld_leh->leh_overflow_time))
+		RETURN(-EOVERFLOW);
+
+	rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &src_mdt_idx);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* If there are still links locally, don't migrate this file */
+	rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
+				&src_mdt_idx, NULL,
+				mdd_is_link_on_source_mdt);
+	RETURN(rc);
+}
+
+/**
+ * Declare the layout deletion that mdd_dir_layout_delete() performs on \a obj.
+ *
+ * Exactly one of three declarations is made:
+ * - \a lmv_buf has no buffer: deletion of the "dotdot" index entry;
+ * - \a lmv_buf has a buffer and \a obj is remote: one
+ *   mdd_dir_declare_delete_stripe() call per stripe, via
+ *   mdd_dir_iterate_stripes();
+ * - otherwise: setting the XATTR_NAME_LMV".del" xattr from \a lmu_buf.
+ *
+ * \param[in] env	execution environment
+ * \param[in] obj	directory whose layout is to be deleted
+ * \param[in] lmv_buf	LMV buf of \a obj, lb_buf is NULL if there is none
+ * \param[in] lmu_buf	buf passed on to the stripe iterator / xattr set
+ * \param[in] handle	transaction handle
+ *
+ * \retval		result of the single declaration that was made
+ */
+static int mdd_dir_declare_layout_delete(const struct lu_env *env,
+					 struct mdd_object *obj,
+					 const struct lu_buf *lmv_buf,
+					 const struct lu_buf *lmu_buf,
+					 struct thandle *handle)
+{
+	if (lmv_buf->lb_buf == NULL)
+		return mdo_declare_index_delete(env, obj, dotdot, handle);
+
+	if (mdd_object_remote(obj))
+		return mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf,
+					       handle,
+					       mdd_dir_declare_delete_stripe);
+
+	return mdo_declare_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+				     handle);
+}
+
+/**
+ * Delete the layout of directory \a obj while holding its write lock.
+ *
+ * Depending on \a lmv_buf and on whether \a obj is remote, this deletes the
+ * "dotdot" index entry, runs mdd_dir_delete_stripe() on every stripe, or sets
+ * the XATTR_NAME_LMV".del" xattr from \a lmu_buf.
+ *
+ * \param[in] env	execution environment
+ * \param[in] obj	directory whose layout is to be deleted
+ * \param[in] lmv_buf	LMV buf of \a obj, lb_buf is NULL if there is none
+ * \param[in] lmu_buf	buf passed on to the stripe iterator / xattr set
+ * \param[in] handle	transaction handle
+ *
+ * \retval		result of the single operation that was performed
+ */
+static int mdd_dir_layout_delete(const struct lu_env *env,
+				 struct mdd_object *obj,
+				 const struct lu_buf *lmv_buf,
+				 const struct lu_buf *lmu_buf,
+				 struct thandle *handle)
+{
+	int rc;
+
+	ENTRY;
+
+	mdd_write_lock(env, obj, DT_SRC_PARENT);
+	if (lmv_buf->lb_buf == NULL) {
+		/* normal dir */
+		rc = __mdd_index_delete_only(env, obj, dotdot, handle);
+	} else if (!mdd_object_remote(obj)) {
+		/* has LMV and is not remote */
+		rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+				   handle);
+	} else {
+		/* striped, but remote */
+		rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+					     mdd_dir_delete_stripe);
+	}
+	mdd_write_unlock(env, obj);
+
+	RETURN(rc);
+}
+
+/**
+ * Declare the updates made when creating the migration target.
+ *
+ * Declarations are made in this order:
+ * 1. for a directory: layout deletion on \a sobj and, if \a sbuf carries an
+ *    LMV, deletion of XATTR_NAME_LMV from \a sobj;
+ * 2. creation of \a tobj under \a tpobj;
+ * 3. for a directory for which mdd_dir_is_empty() returns non-zero: setting
+ *    XATTR_NAME_LMV".add" on \a tobj, using a faked 1-stripe LMV if \a sbuf
+ *    carries none;
+ * 4. copying xattrs from \a sobj to \a tobj;
+ * 5. for a regular file: setting XATTR_NAME_FID and deleting XATTR_NAME_LOV
+ *    on \a sobj;
+ * 6. for a non-directory: link updates and, if \a sbuf carries an LMV,
+ *    deletion of XATTR_NAME_LMV from \a sobj.
+ *
+ * \param[in] env	execution environment
+ * \param[in] tpobj	target parent object
+ * \param[in] sobj	source object
+ * \param[in] tobj	target object
+ * \param[in] lname	file name
+ * \param[in] attr	source attributes
+ * \param[in] sbuf	source LMV buf; changed temporarily in step 3 and
+ *			reset to an empty buf afterwards if it was empty
+ * \param[in] ldata	source linkea
+ * \param[in] spec	migrate create spec
+ * \param[in] hint	target creation hint
+ * \param[in] handle	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -errno	on failure
+ */
+static int mdd_declare_migrate_create(const struct lu_env *env,
+				      struct mdd_object *tpobj,
+				      struct mdd_object *sobj,
+				      struct mdd_object *tobj,
+				      const struct lu_name *lname,
+				      struct lu_attr *attr,
+				      struct lu_buf *sbuf,
+				      struct linkea_data *ldata,
+				      struct md_op_spec *spec,
+				      struct dt_allocation_hint *hint,
+				      struct thandle *handle)
+{
+	struct mdd_thread_info *info = mdd_env_info(env);
+	struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
+	int rc;
+
+	if (S_ISDIR(attr->la_mode)) {
+		struct lu_buf del_buf = { NULL };
+
+		if (lmv != NULL) {
+			struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
+
+			lmu->lum_stripe_count = 0;
+			del_buf.lb_buf = lmu;
+			del_buf.lb_len = sizeof(*lmu);
+		}
+
+		rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &del_buf,
+						   handle);
+		if (rc == 0 && lmv != NULL)
+			rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+						   handle);
+		if (rc != 0)
+			return rc;
+	}
+
+	rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+				lname, attr, handle, spec, ldata, NULL, NULL,
+				NULL, hint);
+	if (rc != 0)
+		return rc;
+
+	if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
+		const bool fake_lmv = (lmv == NULL);
+
+		if (fake_lmv) {
+			/*
+			 * if sobj is not striped, fake a 1-stripe LMV, which
+			 * will be used to generate a compound LMV for tobj.
+			 */
+			LASSERT(sizeof(info->mti_key) >
+				lmv_mds_md_size(1, LMV_MAGIC_V1));
+			lmv = (typeof(lmv))info->mti_key;
+			memset(lmv, 0, sizeof(*lmv));
+			lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+			lmv->lmv_stripe_count = cpu_to_le32(1);
+			/*
+			 * NOTE(review): the other fields of this LMV are
+			 * stored with cpu_to_le32(), so the FID presumably
+			 * wants a cpu-to-le conversion as well; the helper
+			 * name here reads as the opposite direction -
+			 * confirm.
+			 */
+			fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
+			sbuf->lb_buf = lmv;
+			sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+		}
+
+		rc = mdo_declare_xattr_set(env, tobj, sbuf,
+					   XATTR_NAME_LMV".add", 0, handle);
+
+		/* the faked LMV is for this declaration only */
+		if (fake_lmv) {
+			sbuf->lb_buf = NULL;
+			sbuf->lb_len = 0;
+		}
+
+		if (rc != 0)
+			return rc;
+	}
+
+	/*
+	 * tobj mode will be used in lod_declare_xattr_set(), but it's not
+	 * created yet, copy from sobj.
+	 */
+	tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+	tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+		sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
+	rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+				mdo_declare_xattr_set);
+	if (rc != 0)
+		return rc;
+
+	if (S_ISREG(attr->la_mode)) {
+		struct lu_buf fid_buf;
+
+		handle->th_complex = 1;
+
+		/* target may be remote, update PFID via sobj. */
+		fid_buf.lb_buf = (void *)mdo2fid(tobj);
+		fid_buf.lb_len = sizeof(struct lu_fid);
+		rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
+					   0, handle);
+		if (rc == 0)
+			rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV,
+						   handle);
+		if (rc != 0)
+			return rc;
+	}
+
+	/* link updates are declared for non-directories only */
+	if (S_ISDIR(attr->la_mode))
+		return rc;
+
+	rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+				ldata, NULL, handle,
+				mdd_declare_update_link);
+	if (rc == 0 && lmv != NULL)
+		rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+					   handle);
+
+	return rc;
+}
+
+/**
+ * Create target, migrate xattrs and update links.
+ *
+ * Create target according to \a spec, and then migrate xattrs, if it's
+ * directory, migrate source stripes to target, else update fid to target
+ * for links.
+ *
+ * \param[in] env	execution environment
+ * \param[in] tpobj	target parent object
+ * \param[in] sobj	source object
+ * \param[in] tobj	target object
+ * \param[in] lname	file name
+ * \param[in] attr	source attributes; LA_NLINK is cleared from la_valid
+ * \param[in] sbuf	source LMV buf
+ * \param[in] ldata	source linkea
+ * \param[in] spec	migrate create spec
+ * \param[in] hint	target creation hint
+ * \param[in] handle	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -errno	on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+			      struct mdd_object *tpobj,
+			      struct mdd_object *sobj,
+			      struct mdd_object *tobj,
+			      const struct lu_name *lname,
+			      struct lu_attr *attr,
+			      const struct lu_buf *sbuf,
+			      struct linkea_data *ldata,
+			      struct md_op_spec *spec,
+			      struct dt_allocation_hint *hint,
+			      struct thandle *handle)
+{
+	int rc;
+
+	ENTRY;
+
+	/*
+	 * directory will migrate sobj stripes to tobj:
+	 * 1. delete stripes from sobj.
+	 * 2. add stripes to tobj, see lod_dir_declare_layout_add().
+	 * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
+	 */
+	if (S_ISDIR(attr->la_mode)) {
+		const bool has_lmv = (sbuf->lb_buf != NULL);
+		struct lu_buf del_buf = { NULL };
+
+		if (has_lmv) {
+			struct lmv_user_md *lmu;
+
+			lmu = &mdd_env_info(env)->mti_lmv.lmv_user_md;
+			lmu->lum_stripe_count = 0;
+			del_buf.lb_buf = lmu;
+			del_buf.lb_len = sizeof(*lmu);
+		}
+
+		rc = mdd_dir_layout_delete(env, sobj, sbuf, &del_buf, handle);
+		if (rc != 0)
+			RETURN(rc);
+
+		/*
+		 * delete LMV so that later when destroying sobj it won't delete
+		 * stripes again.
+		 */
+		if (has_lmv) {
+			mdd_write_lock(env, sobj, DT_SRC_CHILD);
+			rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
+			mdd_write_unlock(env, sobj);
+			if (rc != 0)
+				RETURN(rc);
+		}
+	}
+
+	/* don't set nlink from sobj */
+	attr->la_valid &= ~LA_NLINK;
+
+	rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
+			       hint, handle, false);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* copy xattrs from sobj to tobj under tobj write lock */
+	mdd_write_lock(env, tobj, DT_TGT_CHILD);
+	rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+	mdd_write_unlock(env, tobj);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* for regular file, update OST objects XATTR_NAME_FID */
+	if (S_ISREG(attr->la_mode)) {
+		struct lu_buf fid_buf;
+
+		/* target may be remote, update PFID via sobj. */
+		fid_buf.lb_buf = (void *)mdo2fid(tobj);
+		fid_buf.lb_len = sizeof(struct lu_fid);
+		rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
+				   handle);
+		if (rc != 0)
+			RETURN(rc);
+
+		/* delete LOV to avoid deleting OST objs when destroying sobj */
+		mdd_write_lock(env, sobj, DT_SRC_CHILD);
+		rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+		mdd_write_unlock(env, sobj);
+		if (rc != 0)
+			RETURN(rc);
+	}
+
+	/* links are updated for non-directories only */
+	if (S_ISDIR(attr->la_mode))
+		RETURN(rc);
+
+	rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+				ldata, NULL, handle, mdd_update_link);
+	RETURN(rc);
+}
+
+/**
+ * Declare the namespace updates of a migration.
+ *
+ * Declarations are made in this order: deletion of \a lname from \a spobj
+ * (plus a reference drop on \a spobj for a directory), insertion of \a lname
+ * into \a tpobj, the linkea write on \a tobj (if \a do_create) or \a sobj,
+ * a reference add on \a tpobj for a directory, ctime/mtime updates on both
+ * parents, and, when both \a do_create and \a do_destroy are set, a reference
+ * drop on and the destroy of \a sobj.
+ *
+ * \param[in] env	execution environment
+ * \param[in] spobj	source parent object
+ * \param[in] tpobj	target parent object
+ * \param[in] sobj	source object
+ * \param[in] tobj	target object
+ * \param[in] lname	file name
+ * \param[in] attr	source attributes (la_mode is read)
+ * \param[in] spattr	not used by the declaration
+ * \param[in] tpattr	not used by the declaration
+ * \param[in] ldata	linkea to be written
+ * \param[in] do_create	whether \a tobj is created
+ * \param[in] do_destroy whether \a sobj may be destroyed
+ * \param[in] ma	not used by the declaration
+ * \param[in] handle	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -errno	on failure
+ */
+static int mdd_declare_migrate_update(const struct lu_env *env,
+				      struct mdd_object *spobj,
+				      struct mdd_object *tpobj,
+				      struct mdd_object *sobj,
+				      struct mdd_object *tobj,
+				      const struct lu_name *lname,
+				      struct lu_attr *attr,
+				      struct lu_attr *spattr,
+				      struct lu_attr *tpattr,
+				      struct linkea_data *ldata,
+				      bool do_create,
+				      bool do_destroy,
+				      struct md_attr *ma,
+				      struct thandle *handle)
+{
+	struct mdd_thread_info *info = mdd_env_info(env);
+	/* object the new dirent and the linkea refer to */
+	struct mdd_object *link_obj = do_create ? tobj : sobj;
+	const struct lu_fid *fid = mdo2fid(link_obj);
+	struct lu_attr *la = &info->mti_la_for_fix;
+	const bool is_dir = S_ISDIR(attr->la_mode);
+	int rc;
+
+	rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+	if (rc != 0)
+		return rc;
+
+	if (is_dir) {
+		rc = mdo_declare_ref_del(env, spobj, handle);
+		if (rc != 0)
+			return rc;
+	}
+
+	rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+				      lname->ln_name, handle);
+	if (rc != 0)
+		return rc;
+
+	rc = mdd_declare_links_add(env, link_obj, handle, ldata);
+	if (rc != 0)
+		return rc;
+
+	if (is_dir) {
+		rc = mdo_declare_ref_add(env, tpobj, handle);
+		if (rc != 0)
+			return rc;
+	}
+
+	la->la_valid = LA_CTIME | LA_MTIME;
+	rc = mdo_declare_attr_set(env, spobj, la, handle);
+	if (rc != 0)
+		return rc;
+
+	if (tpobj != spobj) {
+		rc = mdo_declare_attr_set(env, tpobj, la, handle);
+		if (rc != 0)
+			return rc;
+	}
+
+	/* source is destroyed only if target is created and destroy allowed */
+	if (!do_create || !do_destroy)
+		return rc;
+
+	rc = mdo_declare_ref_del(env, sobj, handle);
+	if (rc == 0)
+		rc = mdo_declare_destroy(env, sobj, handle);
+
+	return rc;
+}
+
+/**
+ * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
+ *
+ * The entry \a lname is deleted from \a spobj and inserted into \a tpobj
+ * pointing at \a tobj (if \a do_create) or \a sobj, the linkea in \a ldata is
+ * written to that same object, ctime/mtime of both parents are updated from
+ * \a ma, and finally \a sobj is unreferenced and destroyed if both
+ * \a do_create and \a do_destroy are set.
+ *
+ * \param[in] env	execution environment
+ * \param[in] spobj	source parent object
+ * \param[in] tpobj	target parent object
+ * \param[in] sobj	source object
+ * \param[in] tobj	target object
+ * \param[in] lname	file name
+ * \param[in] attr	source attributes (la_mode is read)
+ * \param[in] spattr	source parent attributes, passed to mdd_update_time()
+ * \param[in] tpattr	target parent attributes, passed to mdd_update_time()
+ * \param[in] ldata	linkea to write
+ * \param[in] do_create	whether \a tobj was created
+ * \param[in] do_destroy whether \a sobj may be destroyed
+ * \param[in] ma	its ma_attr.la_ctime is the new parent ctime/mtime
+ * \param[in] handle	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -errno	on failure
+ **/
+static int mdd_migrate_update(const struct lu_env *env,
+			      struct mdd_object *spobj,
+			      struct mdd_object *tpobj,
+			      struct mdd_object *sobj,
+			      struct mdd_object *tobj,
+			      const struct lu_name *lname,
+			      struct lu_attr *attr,
+			      struct lu_attr *spattr,
+			      struct lu_attr *tpattr,
+			      struct linkea_data *ldata,
+			      bool do_create,
+			      bool do_destroy,
+			      struct md_attr *ma,
+			      struct thandle *handle)
+{
+	struct mdd_thread_info *info = mdd_env_info(env);
+	const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+	struct lu_attr *la = &info->mti_la_for_fix;
+	int rc;
+
+	ENTRY;
+
+	CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
+	       lname->ln_name, PFID(mdo2fid(spobj)),
+	       PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
+	       PFID(fid));
+
+	rc = __mdd_index_delete(env, spobj, lname->ln_name,
+				S_ISDIR(attr->la_mode), handle);
+	if (rc)
+		RETURN(rc);
+
+	rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+				lname->ln_name, handle);
+	if (rc)
+		RETURN(rc);
+
+	rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
+	if (rc)
+		RETURN(rc);
+
+	la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+	la->la_valid = LA_CTIME | LA_MTIME;
+	mdd_write_lock(env, spobj, DT_SRC_PARENT);
+	rc = mdd_update_time(env, spobj, spattr, la, handle);
+	mdd_write_unlock(env, spobj);
+	if (rc)
+		RETURN(rc);
+
+	if (tpobj != spobj) {
+		/* la_valid is set again before the second update */
+		la->la_valid = LA_CTIME | LA_MTIME;
+		mdd_write_lock(env, tpobj, DT_TGT_PARENT);
+		rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+		mdd_write_unlock(env, tpobj);
+		if (rc)
+			RETURN(rc);
+	}
+
+	/*
+	 * there are three situations we shouldn't destroy source:
+	 * 1. if source is not dir, and it happens to be located on the same MDT
+	 *    as target parent.
+	 * 2. if source is not dir, and has link on the same MDT where source is
+	 *    located.
+	 * 3. if source is dir, and it's a normal, non-empty dir.
+	 *
+	 * the first two situations equals to !do_create, and the 3rd equals to
+	 * !do_destroy, so the below condition is actually
+	 * !(!do_create || !do_destroy).
+	 *
+	 * NB, if user has opened source dir before migration, he will get
+	 * -ENOENT error when close it later, because source is likely to be
+	 * remote, which can't be moved to orphan list, but except this error
+	 * message, this won't cause any inconsistency or trouble.
+	 */
+	if (do_create && do_destroy) {
+		mdd_write_lock(env, sobj, DT_SRC_CHILD);
+		/*
+		 * don't destroy the source if dropping its reference failed;
+		 * report that failure to the caller instead of hiding it.
+		 */
+		rc = mdo_ref_del(env, sobj, handle);
+		if (rc == 0)
+			rc = mdo_destroy(env, sobj, handle);
+		mdd_write_unlock(env, sobj);
+	}
+
+	RETURN(rc);
+}
+
+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ * 1. create target, append source stripes after target's if it's directory,
+ * migrate xattrs and update fid of source links.
+ * 2. update namespace: migrate dirent from source parent to target parent,
+ * update file linkea, and destroy source if it's not needed any more.
+ *
+ * \param[in] env execution environment
+ * \param[in] md_pobj parent master object
+ * \param[in] md_sobj source object
+ * \param[in] lname file name
+ * \param[in] md_tobj target object
+ * \param[in] spec target creation spec
+ * \param[in] ma used to update \a pobj mtime and ctime
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+ struct md_object *md_sobj, const struct lu_name *lname,
+ struct md_object *md_tobj, struct md_op_spec *spec,
+ struct md_attr *ma)
+{
+ struct mdd_device *mdd = mdo2mdd(md_pobj);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *pobj = md2mdd_obj(md_pobj);
+ struct mdd_object *sobj = md2mdd_obj(md_sobj);
+ struct mdd_object *tobj = md2mdd_obj(md_tobj);
+ struct mdd_object *spobj = NULL;
+ struct mdd_object *tpobj = NULL;
+ struct lu_attr *spattr = &info->mti_pattr;
+ struct lu_attr *tpattr = &info->mti_tpattr;
+ struct lu_attr *attr = &info->mti_cattr;
+ struct linkea_data *ldata = &info->mti_link_data;
+ struct dt_allocation_hint *hint = &info->mti_hint;
+ struct lu_fid *fid = &info->mti_fid2;
+ struct lu_buf pbuf = { NULL };
+ struct lu_buf sbuf = { NULL };
+ struct lmv_mds_md_v1 *plmv;
+ struct thandle *handle;
+ bool do_create = true;
+ bool do_destroy = true;
+ int rc;
+ ENTRY;
+
+ rc = mdd_la_get(env, sobj, attr);
+ if (rc)
+ RETURN(rc);
+
+ /* locate source and target stripe on pobj, which are the real parent */
+ rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+ if (rc < 0 && rc != -ENODATA)
+ RETURN(rc);
+
+ plmv = pbuf.lb_buf;
+ if (plmv) {
+ __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+ __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+ int index;
+
+ /* locate target parent stripe */
+ if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+ /*
+ * fail check here to make sure top dir migration
+ * succeed.
+ */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+ GOTO(out, rc = -EIO);
+ hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+ count = le32_to_cpu(plmv->lmv_migrate_offset);
+ }
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0)
+ GOTO(out, rc = index);
+
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ tpobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(tpobj))
+ GOTO(out, rc = PTR_ERR(tpobj));
+
+ /* locate source parent stripe */
+ if (le32_to_cpu(plmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+ count = le32_to_cpu(plmv->lmv_stripe_count) -
+ le32_to_cpu(plmv->lmv_migrate_offset);
+
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0) {
+ mdd_object_put(env, tpobj);
+ GOTO(out, rc = index);
+ }
+
+ index += le32_to_cpu(plmv->lmv_migrate_offset);
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ spobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(spobj)) {
+ mdd_object_put(env, tpobj);
+ GOTO(out, rc = PTR_ERR(spobj));