+ const struct lu_env *env = info->mti_env;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *stripe;
+ int rc;
+
+ if (ma->ma_valid & MA_LMV) {
+ /* if parent is striped, lookup on corresponding stripe */
+ struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
+ __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ bool is_migrating = le32_to_cpu(lmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION;
+
+ if (is_migrating) {
+ hash_type = le32_to_cpu(lmv->lmv_migrate_hash);
+ stripe_count -= le32_to_cpu(lmv->lmv_migrate_offset);
+ }
+
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
+
+ if (le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+ rc += le32_to_cpu(lmv->lmv_migrate_offset);
+
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
+
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+ &info->mti_spec);
+ if (rc == -ENOENT && is_migrating) {
+ /*
+ * if parent is migrating, and lookup child failed on
+ * source stripe, lookup again on target stripe, if it
+ * exists, it means previous migration was interrupted,
+ * and current file was migrated already.
+ */
+ mdt_object_put(env, stripe);
+
+ hash_type = le32_to_cpu(lmv->lmv_hash_type);
+ stripe_count = le32_to_cpu(lmv->lmv_migrate_offset);
+
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
+
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
+
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname,
+ fid, &info->mti_spec);
+ mdt_object_put(env, stripe);
+ return rc ?: -EALREADY;
+ } else if (rc) {
+ mdt_object_put(env, stripe);
+ return rc;
+ }
+ } else {
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
+ &info->mti_spec);
+ if (rc)
+ return rc;
+
+ stripe = pobj;
+ mdt_object_get(env, stripe);
+ }
+
+ *spobj = stripe;
+
+ *sobj = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(*sobj)) {
+ mdt_object_put(env, stripe);
+ rc = PTR_ERR(*sobj);
+ *spobj = NULL;
+ *sobj = NULL;
+ }
+
+ return rc;
+}
+
+/*
+ * End the migration lease and close the open handle of a regular file.
+ *
+ * The lease lock is resolved from the close data carried by the client
+ * request and cancelled on the server side.  The file is then closed
+ * whether or not the lease could be resolved, and the reply body is
+ * flagged with OBD_MD_CLOSE_INTENT_EXECED.
+ *
+ * \param[in] info	thread info of the request being handled
+ * \param[in] obj	source object; only its FID is used, for the
+ *			"lease broken" debug message
+ *
+ * \retval -EPROTO	request has no epoch field or no usable close data
+ * \retval -ESTALE	the close data handle does not resolve to a lock
+ * \retval -EAGAIN	the lease had already been cancelled
+ * \retval other	result of mdt_close_internal() when the lease
+ *			handling itself reported no error
+ */
+static int mdd_migrate_close(struct mdt_thread_info *info,
+			     struct mdt_object *obj)
+{
+	struct close_data *cdata = NULL;
+	struct ldlm_lock *lease_lock;
+	struct mdt_body *body;
+	int lease_rc = -EPROTO;
+	int close_rc;
+	int broken;
+
+	/* close data is only fetched when both client fields are present */
+	if (req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
+				      RCL_CLIENT) &&
+	    req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
+				      RCL_CLIENT))
+		cdata = req_capsule_client_get(info->mti_pill,
+					       &RMF_CLOSE_DATA);
+
+	if (cdata) {
+		lease_lock = ldlm_handle2lock(&cdata->cd_handle);
+		if (!lease_lock) {
+			lease_rc = -ESTALE;
+		} else {
+			/* sample the cancel flag under the resource lock */
+			lock_res_and_lock(lease_lock);
+			broken = ldlm_is_cancel(lease_lock);
+			unlock_res_and_lock(lease_lock);
+
+			if (broken) {
+				lease_rc = -EAGAIN;
+				LDLM_DEBUG(lease_lock, DFID" lease broken",
+					   PFID(mdt_object_fid(obj)));
+			} else {
+				lease_rc = 0;
+			}
+
+			/*
+			 * Cancel the server side lease.  The client side
+			 * counterpart should have been cancelled already,
+			 * and cancelling here is considered safe because
+			 * mot_open_sem is expected to be held.
+			 */
+			ldlm_lock_cancel(lease_lock);
+			ldlm_reprocess_all(lease_lock->l_resource);
+			/* presumably drops the ref taken by the handle lookup */
+			LDLM_LOCK_PUT(lease_lock);
+		}
+	}
+
+	/* the close is performed regardless of how lease handling went */
+	close_rc = mdt_close_internal(info, mdt_info_req(info), NULL);
+	body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+	body->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+
+	/* a lease error takes precedence over the close result */
+	return lease_rc ? lease_rc : close_rc;
+}
+
+/*
+ * Migrate a file in the following steps:
+ * 1. lock parent and its stripes
+ * 2. lookup source by name
+ * 3. lock parents of source links if source is not directory
+ * 4. reject if source is in HSM
+ * 5. take source open_sem and close file if source is regular file
+ * 6. lock source and its stripes if it's directory
+ * 7. lock target so subsequent change to it can trigger COS
+ * 8. migrate file
+ * 9. unlock above locks
+ * 10. sync device if source has links
+ */
+static int mdt_reint_migrate(struct mdt_thread_info *info,
+ struct mdt_lock_handle *unused)
+{
+ const struct lu_env *env = info->mti_env;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct lu_ucred *uc = mdt_ucred(info);
+ struct md_attr *ma = &info->mti_attr;
+ struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
+ struct mdt_object *pobj;
+ struct mdt_object *spobj = NULL;
+ struct mdt_object *sobj = NULL;
+ struct mdt_object *tobj;
+ struct lustre_handle rename_lh = { 0 };
+ struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhs;
+ struct mdt_lock_handle *lht;
+ LIST_HEAD(parent_slave_locks);
+ LIST_HEAD(child_slave_locks);
+ LIST_HEAD(link_locks);
+ int lock_retries = 5;
+ bool open_sem_locked = false;
+ bool do_sync = false;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+ PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
+
+ if (!fid_is_md_operative(rr->rr_fid1) ||
+ !fid_is_md_operative(rr->rr_fid2))
+ RETURN(-EPERM);
+
+ /* don't allow migrate . or .. */
+ if (lu_name_is_dot_or_dotdot(&rr->rr_name))
+ RETURN(-EBUSY);
+
+ if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
+ RETURN(-EPERM);
+
+ if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
+ uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
+ mdt->mdt_enable_remote_dir_gid != -1)
+ RETURN(-EPERM);
+
+ /*
+ * Note: do not enqueue the rename lock for a replay request. If
+ * another MDT holds the rename lock but is blocked waiting for this
+ * MDT to finish its recovery, the failover MDT cannot get the rename
+ * lock, which causes a deadlock.
+ */
+ if (!req_is_replay(req)) {
+ rc = mdt_rename_lock(info, &rename_lh);
+ if (rc != 0) {
+ CERROR("%s: can't lock FS for rename: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), rc);
+ RETURN(rc);
+ }
+ }
+
+ /* pobj is master object of parent */
+ pobj = mdt_parent_find_check(info, rr->rr_fid1, 0);
+ if (IS_ERR(pobj))
+ GOTO(unlock_rename, rc = PTR_ERR(pobj));
+
+ if (unlikely(!info->mti_big_lmm)) {
+ info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+ OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+ if (!info->mti_big_lmm)
+ GOTO(put_parent, rc = -ENOMEM);
+ }
+
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ GOTO(put_parent, rc);
+
+lock_parent:
+ /* lock parent object */
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_reg_init(lhp, LCK_PW);
+ rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
+ &parent_slave_locks);
+ if (rc)
+ GOTO(put_parent, rc);
+
+ /*
+ * spobj is the corresponding stripe against name if pobj is striped
+ * directory, which is the real parent, and no need to lock, because
+ * we've taken full lock of pobj.
+ */
+ rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
+ if (rc)
+ GOTO(unlock_parent, rc);
+
+ /* lock parents of source links, and revoke LOOKUP lock of links */
+ rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
+ &parent_slave_locks, &link_locks);
+ if (rc == -EBUSY && lock_retries-- > 0) {
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+ goto lock_parent;
+ }
+
+ if (rc < 0)
+ GOTO(put_source, rc);
+
+ /*
+ * RS_MAX_LOCKS is the limit of number of locks that can be saved along
+ * with one request, if total lock count exceeds this limit, we will
+ * drop all locks after migration, and sync the device at the end.
+ */
+ do_sync = rc;
+
+ /* TODO: DoM migration is not supported yet */
+ if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
+ ma->ma_lmm = info->mti_big_lmm;
+ ma->ma_lmm_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
+ if (rc)
+ GOTO(put_source, rc);
+
+ if (ma->ma_valid & MA_LOV &&
+ mdt_lmm_dom_entry(ma->ma_lmm) != LMM_NO_DOM)
+ GOTO(put_source, rc = -EOPNOTSUPP);
+ }
+
+ /* unless HSM migration is allowed, reject a source that has HSM flags */
+ if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
+ ma->ma_need = MA_HSM;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, sobj, ma);
+ if (rc)
+ GOTO(unlock_links, rc);
+
+ if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
+ GOTO(unlock_links, rc = -EOPNOTSUPP);
+ }
+
+ /* end lease and close file for regular file */
+ if (info->mti_spec.sp_migrate_close) {
+ /* try to hold open_sem so that nobody else can open the file */
+ if (!down_write_trylock(&sobj->mot_open_sem)) {
+ /* close anyway */
+ mdd_migrate_close(info, sobj);
+ GOTO(unlock_links, rc = -EBUSY);
+ } else {
+ open_sem_locked = true;
+ rc = mdd_migrate_close(info, sobj);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+ }
+ }
+
+ /* lock source */
+ lhs = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lhs, LCK_EX);
+ rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
+ &child_slave_locks);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+
+ /* lock target */
+ tobj = mdt_object_find(env, mdt, rr->rr_fid2);
+ if (IS_ERR(tobj))
+ GOTO(unlock_source, rc = PTR_ERR(tobj));
+
+ lht = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lht, LCK_EX);
+ rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+ if (rc)
+ GOTO(put_target, rc);
+
+ /* Don't do lookup sanity check. We know name doesn't exist. */
+ info->mti_spec.sp_cr_lookup = 0;
+ info->mti_spec.sp_feat = &dt_directory_features;
+
+ rc = mdo_migrate(env, mdt_object_child(pobj),
+ mdt_object_child(sobj), &rr->rr_name,
+ mdt_object_child(tobj), &info->mti_spec, ma);
+ EXIT;
+
+ mdt_object_unlock(info, tobj, lht, rc);
+put_target:
+ mdt_object_put(env, tobj);
+unlock_source:
+ mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
+ &child_slave_locks, rc);
+unlock_open_sem:
+ if (open_sem_locked)
+ up_write(&sobj->mot_open_sem);
+unlock_links:
+ mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
+put_source:
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+unlock_parent:
+ mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
+ &parent_slave_locks, rc);
+put_parent:
+ mdt_object_put(env, pobj);
+unlock_rename:
+ if (lustre_handle_is_used(&rename_lh))
+ mdt_rename_unlock(&rename_lh);
+
+ if (!rc && do_sync)
+ mdt_device_sync(env, mdt);