+ OBD_ALLOC_PTR(msl);
+ if (!msl) {
+ mdt_object_put(info->mti_env, slave);
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
+ rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
+ MDS_INODELOCK_UPDATE, true);
+ if (rc) {
+ OBD_FREE_PTR(msl);
+ mdt_object_put(info->mti_env, slave);
+ GOTO(out, rc);
+ }
+
+ INIT_LIST_HEAD(&msl->msl_linkage);
+ msl->msl_obj = slave;
+ list_add_tail(&msl->msl_linkage, slave_locks);
+ }
+ EXIT;
+
+out:
+ if (rc)
+ mdt_unlock_list(info, slave_locks, rc);
+ return rc;
+}
+
+/* lock parent and its stripes */
+static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ const struct md_attr *ma,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks)
+{
+ int rc;
+
+ if (mdt_object_remote(obj)) {
+ rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+ &lh->mlh_rreg_lh, LCK_PW,
+ MDS_INODELOCK_UPDATE, false);
+ if (rc != ELDLM_OK)
+ return rc;
+
+ /*
+ * if obj is remote and striped, lock its stripes explicitly
+ * because it's not striped in LOD layer on this MDT.
+ */
+ if (ma->ma_valid & MA_LMV) {
+ rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, rc);
+ }
+ } else {
+ rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+ einfo, true);
+ }
+
+ return rc;
+}
+
+/*
+ * in migration, object may be remote, and we need take full lock of it and its
+ * stripes if it's directory, besides, object may be a remote object on its
+ * parent, revoke its LOOKUP lock on where its parent is located.
+ */
+static int mdt_migrate_object_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks)
+{
+ int rc;
+
+ if (mdt_object_remote(obj)) {
+ rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+ if (rc)
+ return rc;
+
+ rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+ &lh->mlh_rreg_lh, LCK_EX,
+ MDS_INODELOCK_FULL, false);
+ if (rc != ELDLM_OK)
+ return rc;
+
+ /*
+ * if obj is remote and striped, lock its stripes explicitly
+ * because it's not striped in LOD layer on this MDT.
+ */
+ if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
+ struct md_attr *ma = &info->mti_attr;
+
+ rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
+ if (rc) {
+ mdt_object_unlock(info, obj, lh, rc);
+ return rc;
+ }
+
+ if (ma->ma_valid & MA_LMV) {
+ rc = mdt_lock_remote_slaves(info, obj, ma,
+ slave_locks);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, rc);
+ }
+ }
+ } else {
+ if (mdt_object_remote(pobj)) {
+ rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+ if (rc)
+ return rc;
+ }
+
+ rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
+ einfo, true);
+ }
+
+ return rc;
+}
+
+/*
+ * lookup source by name, if parent is striped directory, we need to find the
+ * corresponding stripe where source is located, and then lookup there.
+ *
+ * besides, if parent is migrating too, and file is already in target stripe,
+ * this should be a redo of 'lfs migrate' on client side.
+ */
+static int mdt_migrate_lookup(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ const struct md_attr *ma,
+ const struct lu_name *lname,
+ struct mdt_object **spobj,
+ struct mdt_object **sobj)
+{
+ const struct lu_env *env = info->mti_env;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *stripe;
+ int rc;
+
+ if (ma->ma_valid & MA_LMV) {
+ /* if parent is striped, lookup on corresponding stripe */
+ struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+
+ if (!lmv_is_sane(lmv))
+ return -EBADF;
+
+ rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
+
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
+
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+ &info->mti_spec);
+ if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
+ /*
+ * if parent layout is changeing, and lookup child
+ * failed on source stripe, lookup again on target
+ * stripe, if it exists, it means previous migration
+ * was interrupted, and current file was migrated
+ * already.
+ */
+ mdt_object_put(env, stripe);
+
+ rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
+
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
+
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname,
+ fid, &info->mti_spec);
+ mdt_object_put(env, stripe);
+ return rc ?: -EALREADY;
+ } else if (rc) {
+ mdt_object_put(env, stripe);
+ return rc;
+ }
+ } else {
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
+ &info->mti_spec);
+ if (rc)
+ return rc;
+
+ stripe = pobj;
+ mdt_object_get(env, stripe);
+ }
+
+ *spobj = stripe;
+
+ *sobj = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(*sobj)) {
+ mdt_object_put(env, stripe);
+ rc = PTR_ERR(*sobj);
+ *spobj = NULL;
+ *sobj = NULL;
+ }
+
+ return rc;
+}
+
+/* end lease and close file for regular file */
+static int mdd_migrate_close(struct mdt_thread_info *info,
+ struct mdt_object *obj)
+{
+ struct close_data *data;
+ struct mdt_body *repbody;
+ struct ldlm_lock *lease;
+ int rc;
+ int rc2;
+
+ rc = -EPROTO;
+ if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
+ RCL_CLIENT) ||
+ !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
+ RCL_CLIENT))
+ goto close;
+
+ data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+ if (!data)
+ goto close;
+
+ rc = -ESTALE;
+ lease = ldlm_handle2lock(&data->cd_handle);
+ if (!lease)
+ goto close;
+
+ /* check if the lease was already canceled */
+ lock_res_and_lock(lease);
+ rc = ldlm_is_cancel(lease);
+ unlock_res_and_lock(lease);
+
+ if (rc) {
+ rc = -EAGAIN;
+ LDLM_DEBUG(lease, DFID" lease broken",
+ PFID(mdt_object_fid(obj)));
+ }
+
+ /*
+ * cancel server side lease, client side counterpart should have been
+ * cancelled, it's okay to cancel it now as we've held mot_open_sem.
+ */
+ ldlm_lock_cancel(lease);
+ ldlm_reprocess_all(lease->l_resource, lease);
+ LDLM_LOCK_PUT(lease);
+
+close:
+ rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+
+ return rc ?: rc2;
+}
+
+/*
+ * migrate file in below steps:
+ * 1. lock parent and its stripes
+ * 2. lookup source by name
+ * 3. lock parents of source links if source is not directory
+ * 4. reject if source is in HSM
+ * 5. take source open_sem and close file if source is regular file
+ * 6. lock source and its stripes if it's directory
+ * 7. lock target so subsequent change to it can trigger COS
+ * 8. migrate file
+ * 9. unlock above locks
+ * 10. sync device if source has links
+ */
+int mdt_reint_migrate(struct mdt_thread_info *info,
+ struct mdt_lock_handle *unused)
+{
+ const struct lu_env *env = info->mti_env;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct lu_ucred *uc = mdt_ucred(info);
+ struct md_attr *ma = &info->mti_attr;
+ struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
+ struct mdt_object *pobj;
+ struct mdt_object *spobj = NULL;
+ struct mdt_object *sobj = NULL;
+ struct mdt_object *tobj;
+ struct lustre_handle rename_lh = { 0 };
+ struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhs;
+ struct mdt_lock_handle *lht;
+ LIST_HEAD(parent_slave_locks);
+ LIST_HEAD(child_slave_locks);
+ LIST_HEAD(link_locks);
+ int lock_retries = 5;
+ bool open_sem_locked = false;
+ bool do_sync = false;
+ int rc;
+
+ ENTRY;