ENTRY;
/* according to the protocol, child should be local, is request sent to
- * wrong MDT.
+ * wrong MDT?
*/
if (mdt_object_remote(child)) {
CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
if (rc)
RETURN(rc);
- if (S_ISDIR(child->mot_header.loh_attr)) {
- rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
- if (rc) {
- mdt_object_unlock(info, child, lh, rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
- rc == -EIO)
- rc = 0;
- }
- }
+ rc = mdt_object_striped(info, child);
+ if (rc == 0)
+ return 0;
- RETURN(rc);
+ if (rc < 0)
+ goto unlock;
+
+ /* lock stripes for striped directory */
+ rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
+ if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+ rc = 0;
+
+unlock:
+ if (rc)
+ mdt_object_unlock(info, child, lh, rc);
+
+ return rc;
}
void mdt_object_stripes_unlock(struct mdt_thread_info *info,
struct mdt_lock_handle *lh,
struct ldlm_enqueue_info *einfo, int decref)
{
- /* this is checked in mdt_object_stripes_lock() */
- LASSERT(!mdt_object_remote(obj));
if (einfo->ei_cbdata)
mdt_stripes_unlock(info, obj, einfo, decref);
mdt_object_unlock(info, obj, lh, decref);
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_fid *fid = &info->mti_tmp_fid2;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
struct lmv_user_md *lum = spec->u.sp_ea.eadata;
struct lu_ucred *uc = mdt_ucred(info);
struct lmv_mds_md_v1 *lmv;
*/
if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
struct mdt_lock_handle *lhc;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
bool cos_incompat;
rc = mdt_object_striped(info, child);
int do_vbr = ma->ma_attr.la_valid &
(LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
__u64 lockpart = MDS_INODELOCK_UPDATE;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
bool cos_incompat;
int rc;
struct mdt_object *mc;
struct mdt_lock_handle *parent_lh;
struct mdt_lock_handle *child_lh;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
struct lu_ucred *uc = mdt_ucred(info);
bool cos_incompat = false;
int no_name = 0;
}
/*
- * in case obj is remote obj on its parent, revoke LOOKUP lock,
- * herein we don't really check it, just do revoke.
+ * lock rename source object.
+ *
+ * Both source and its parent object may be located on remote MDTs, and even on
+ * different MDTs, which means source object is a remote object on parent.
+ *
+ * \retval 0 on success
+ * \retval -ev negative errno upon error
*/
-int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
- struct mdt_object *pobj,
- struct mdt_object *obj)
+static int mdt_rename_source_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child,
+ struct mdt_lock_handle *lh,
+ struct mdt_lock_handle *lh_lookup,
+ __u64 ibits, bool cos_incompat)
{
- struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
int rc;
- rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
- if (rc)
+ LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+ /* if @obj is remote object, LOOKUP lock needs to be taken from
+ * parent MDT.
+ */
+ rc = mdt_is_remote_object(info, parent, child);
+ if (rc < 0)
return rc;
- /*
- * TODO, currently we don't save this lock because there is no place to
- * hold this lock handle, but to avoid race we need to save this lock.
- */
- mdt_object_unlock(info, NULL, lh, 1);
+ if (rc == 1) {
+ rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
+ LCK_EX, cos_incompat);
+ if (rc)
+ return rc;
+
+ ibits &= ~MDS_INODELOCK_LOOKUP;
+ }
+
+ rc = mdt_object_lock(info, child, lh, ibits, LCK_EX, cos_incompat);
+ if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
+ mdt_object_unlock(info, NULL, lh_lookup, rc);
return 0;
}
-/*
- * operation may takes locks of linkea, or directory stripes, group them in
- * different list.
- */
-struct mdt_sub_lock {
- struct mdt_object *msl_obj;
- struct mdt_lock_handle msl_lh;
- struct list_head msl_linkage;
+static void mdt_rename_source_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lh,
+ struct mdt_lock_handle *lh_lookup,
+ int decref)
+{
+ mdt_object_unlock(info, obj, lh, decref);
+ mdt_object_unlock(info, NULL, lh_lookup, decref);
+}
+
+/* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
+struct mdt_link_lock {
+ struct mdt_object *mll_obj;
+ struct mdt_lock_handle mll_lh;
+ struct list_head mll_linkage;
};
-static void mdt_unlock_list(struct mdt_thread_info *info,
- struct list_head *list, int decref)
+static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ struct list_head *list)
{
- struct mdt_sub_lock *msl;
- struct mdt_sub_lock *tmp;
+ struct mdt_link_lock *mll;
- list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
- mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
- list_del(&msl->msl_linkage);
- OBD_FREE_PTR(msl);
- }
+ OBD_ALLOC_PTR(mll);
+ if (mll == NULL)
+ return -ENOMEM;
+
+ INIT_LIST_HEAD(&mll->mll_linkage);
+ mdt_object_get(info->mti_env, o);
+ mll->mll_obj = o;
+ mll->mll_lh = *lh;
+ memset(lh, 0, sizeof(*lh));
+ list_add_tail(&mll->mll_linkage, list);
+
+ return 0;
}
-static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
- struct mdt_object *obj,
- struct mdt_lock_handle *lh,
- struct ldlm_enqueue_info *einfo,
- struct list_head *slave_locks,
+static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
+ struct mdt_link_lock *mll,
int decref)
{
- if (mdt_object_remote(obj)) {
- mdt_unlock_list(info, slave_locks, decref);
- mdt_object_unlock(info, obj, lh, decref);
- } else {
- mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
+ mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
+ mdt_object_put(info->mti_env, mll->mll_obj);
+ list_del(&mll->mll_linkage);
+ OBD_FREE_PTR(mll);
+}
+
+static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
+ struct list_head *list, int decref)
+{
+ struct mdt_link_lock *mll;
+ struct mdt_link_lock *tmp;
+
+ list_for_each_entry_safe(mll, tmp, list, mll_linkage)
+ mdt_migrate_link_lock_del(info, mll, decref);
+}
+
+/* take link parent UPDATE lock.
+ * \retval 0 \a lnkp is already locked, no lock taken.
+ * 1 lock taken
+ * -ev negative errno.
+ */
+static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
+ struct mdt_object *lnkp,
+ struct list_head *update_locks,
+ bool *blocked)
+{
+ const struct lu_fid *fid = mdt_object_fid(lnkp);
+ struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
+ struct mdt_link_lock *entry;
+ __u64 ibits = 0;
+ int rc;
+
+ ENTRY;
+
+ /* check if it's already locked */
+ list_for_each_entry(entry, update_locks, mll_linkage) {
+ if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
+ CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
+ RETURN(0);
+ }
+ }
+
+ /* link parent UPDATE lock */
+ CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
+
+ if (*blocked) {
+ /* revoke lock instead of take in *blocked* mode */
+ rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
+ LCK_PW, true);
+ if (rc)
+ RETURN(rc);
+
+ if (mdt_object_remote(lnkp)) {
+ struct ldlm_lock *lock;
+
+ /*
+ * for remote object, set lock cb_atomic, so lock can be
+ * released in blocking_ast() immediately, then the next
+ * lock_try will have better chance of success.
+ */
+ lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
+ LASSERT(lock != NULL);
+ lock_res_and_lock(lock);
+ ldlm_set_atomic_cb(lock);
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+
+ mdt_object_unlock(info, lnkp, lhl, 1);
+ RETURN(0);
+ }
+
+ /*
+ * we can't follow parent-child lock order like other MD
+ * operations, use lock_try here to avoid deadlock, if the lock
+ * cannot be taken, drop all locks taken, revoke the blocked
+ * one, and continue processing the remaining entries, and in
+ * the end of the loop restart from beginning.
+ *
+ * don't lock with PDO mode in case two links are under the same
+ * parent and their hash values are different.
+ */
+ rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
+ LCK_PW, true);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (!(ibits & MDS_INODELOCK_UPDATE)) {
+ CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
+ *blocked = true;
+ RETURN(-EAGAIN);
+ }
+
+ rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
+ if (rc) {
+ mdt_object_unlock(info, lnkp, lhl, 1);
+ RETURN(rc);
+ }
+
+ RETURN(1);
+}
+
+/* take link LOOKUP lock.
+ * \retval 0 \a lnkp is already locked, no lock taken.
+ * 1 lock taken.
+ * -ev negative errno.
+ */
+static int mdt_migrate_link_lock(struct mdt_thread_info *info,
+ struct mdt_object *lnkp,
+ struct mdt_object *spobj,
+ struct mdt_object *obj,
+ struct list_head *lookup_locks)
+{
+ const struct lu_fid *fid = mdt_object_fid(lnkp);
+ struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
+ struct mdt_link_lock *entry;
+ int rc;
+
+ ENTRY;
+
+ /* check if it's already locked by source */
+ rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
+ if (rc <= 0) {
+ CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
+ PFID(fid));
+ RETURN(rc);
+ }
+
+ /* check if it's already locked by other links */
+ list_for_each_entry(entry, lookup_locks, mll_linkage) {
+ rc = mdt_fids_different_target(info, fid,
+ mdt_object_fid(entry->mll_obj));
+ if (rc <= 0) {
+ CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
+ PFID(fid));
+ RETURN(rc);
+ }
}
+
+ rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX, true);
+ if (rc)
+ RETURN(rc);
+
+ /* don't take local LOOKUP lock, because later we will lock other ibits
+ * of sobj (which is on local MDT), and lock the same object twice may
+ * deadlock, just revoke this lock.
+ */
+ if (!mdt_object_remote(lnkp))
+ GOTO(unlock, rc = 0);
+
+ rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
+ if (rc)
+ GOTO(unlock, rc);
+
+ RETURN(1);
+unlock:
+ mdt_object_unlock(info, lnkp, lhl, 1);
+ return rc;
}
/*
- * lock parents of links, and also check whether total locks don't exceed
- * RS_MAX_LOCKS.
+ * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
+ * total local lock count exceeds RS_MAX_LOCKS.
*
* \retval 0 on success, and locks can be saved in ptlrpc_reply_stat
* \retval 1 on success, but total lock count may exceed RS_MAX_LOCKS
* \retval -ev negative errno upon error
*/
-static int mdt_link_parents_lock(struct mdt_thread_info *info,
- struct mdt_object *pobj,
- const struct md_attr *ma,
- struct mdt_object *obj,
- struct mdt_lock_handle *lhp,
- struct ldlm_enqueue_info *peinfo,
- struct list_head *parent_slave_locks,
- struct list_head *link_locks)
+static int mdt_migrate_links_lock(struct mdt_thread_info *info,
+ struct mdt_object *spobj,
+ struct mdt_object *tpobj,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhsp,
+ struct mdt_lock_handle *lhtp,
+ struct list_head *link_locks)
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_buf *buf = &info->mti_big_buf;
struct lu_name *lname = &info->mti_name;
struct linkea_data ldata = { NULL };
+ int local_lock_cnt = 0;
bool blocked = false;
- int local_lnkp_cnt = 0;
+ bool saved;
+ struct mdt_object *lnkp;
+ struct lu_fid fid;
+ LIST_HEAD(update_locks);
+ LIST_HEAD(lookup_locks);
int rc;
ENTRY;
for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
linkea_next_entry(&ldata)) {
- struct mdt_object *lnkp;
- struct mdt_sub_lock *msl;
- struct lu_fid fid;
- __u64 ibits;
-
linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
&fid);
- /* check if it's also linked to parent */
- if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
- CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
+ /* check if link parent is source parent too */
+ if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
+ CDEBUG(D_INFO,
+ "skip lock on source parent "DFID"/"DNAME"\n",
PFID(&fid), PNAME(lname));
- /* in case link is remote object, revoke LOOKUP lock */
- rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
continue;
}
- lnkp = NULL;
-
- /* check if it's linked to a stripe of parent */
- if (ma->ma_valid & MA_LMV) {
- struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
- struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
- int j = 0;
-
- for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
- fid_le_to_cpu(stripe_fid,
- &lmv->lmv_stripe_fids[j]);
- if (lu_fid_eq(stripe_fid, &fid)) {
- CDEBUG(D_INFO, "skip stripe "DFID
- ", reovke "DNAME"\n",
- PFID(&fid), PNAME(lname));
- lnkp = mdt_object_find(info->mti_env,
- mdt, &fid);
- if (IS_ERR(lnkp))
- GOTO(out, rc = PTR_ERR(lnkp));
- break;
- }
- }
-
- if (lnkp) {
- rc = mdt_revoke_remote_lookup_lock(info, lnkp,
- obj);
- mdt_object_put(info->mti_env, lnkp);
- continue;
- }
- }
-
- /* Check if it's already locked */
- list_for_each_entry(msl, link_locks, msl_linkage) {
- if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
- CDEBUG(D_INFO,
- DFID" was locked, revoke "DNAME"\n",
- PFID(&fid), PNAME(lname));
- lnkp = msl->msl_obj;
- break;
- }
- }
-
- if (lnkp) {
- rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+ /* check if link parent is target parent too */
+ if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
+ CDEBUG(D_INFO,
+ "skip lock on target parent "DFID"/"DNAME"\n",
+ PFID(&fid), PNAME(lname));
continue;
}
- CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
- PFID(&fid), PNAME(lname));
-
lnkp = mdt_object_find(info->mti_env, mdt, &fid);
if (IS_ERR(lnkp)) {
CWARN("%s: cannot find obj "DFID": %ld\n",
if (!mdt_object_exists(lnkp)) {
CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
- PFID(&fid), PNAME(lname));
+ PFID(&fid), PNAME(lname));
mdt_object_put(info->mti_env, lnkp);
continue;
}
-
- if (!mdt_object_remote(lnkp))
- local_lnkp_cnt++;
-
- OBD_ALLOC_PTR(msl);
- if (msl == NULL)
- GOTO(out, rc = -ENOMEM);
-
- /*
- * we can't follow parent-child lock order like other MD
- * operations, use lock_try here to avoid deadlock, if the lock
- * cannot be taken, drop all locks taken, revoke the blocked
- * one, and continue processing the remaining entries, and in
- * the end of the loop restart from beginning.
- */
- ibits = 0;
- rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
- MDS_INODELOCK_UPDATE, LCK_PW, true);
- if (!(ibits & MDS_INODELOCK_UPDATE)) {
-
- CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
- PFID(&fid), PNAME(lname));
-
- mdt_unlock_list(info, link_locks, 1);
- /* also unlock parent locks to avoid deadlock */
- if (!blocked)
- mdt_migrate_object_unlock(info, pobj, lhp,
- peinfo,
- parent_slave_locks,
- 1);
-
- blocked = true;
-
- rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
- MDS_INODELOCK_UPDATE, LCK_PW,
- true);
- if (rc) {
- mdt_object_put(info->mti_env, lnkp);
- OBD_FREE_PTR(msl);
- GOTO(out, rc);
- }
-
- if (mdt_object_remote(lnkp)) {
- struct ldlm_lock *lock;
-
- /*
- * for remote object, set lock cb_atomic,
- * so lock can be released in blocking_ast()
- * immediately, then the next lock_try will
- * have better chance of success.
- */
- lock = ldlm_handle2lock(
- &msl->msl_lh.mlh_rreg_lh);
- LASSERT(lock != NULL);
- lock_res_and_lock(lock);
- ldlm_set_atomic_cb(lock);
- unlock_res_and_lock(lock);
- LDLM_LOCK_PUT(lock);
- }
-
- mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
- OBD_FREE_PTR(msl);
- continue;
+relock:
+ saved = blocked;
+ rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
+ &blocked);
+ if (!saved && blocked) {
+ /* unlock all locks taken to avoid deadlock */
+ mdt_migrate_links_unlock(info, &update_locks, 1);
+ mdt_object_unlock(info, spobj, lhsp, 1);
+ if (tpobj != spobj)
+ mdt_object_unlock(info, tpobj, lhtp, 1);
+ goto relock;
+ }
+ if (rc < 0) {
+ mdt_object_put(info->mti_env, lnkp);
+ GOTO(out, rc);
}
- INIT_LIST_HEAD(&msl->msl_linkage);
- msl->msl_obj = lnkp;
- list_add_tail(&msl->msl_linkage, link_locks);
+ if (rc == 1 && !mdt_object_remote(lnkp))
+ local_lock_cnt++;
- rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+ rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
+ &lookup_locks);
+ if (rc < 0) {
+ mdt_object_put(info->mti_env, lnkp);
+ GOTO(out, rc);
+ }
+ if (rc == 1 && !mdt_object_remote(lnkp))
+ local_lock_cnt++;
+ mdt_object_put(info->mti_env, lnkp);
}
if (blocked)
EXIT;
out:
- if (rc) {
- mdt_unlock_list(info, link_locks, rc);
- } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
- CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
- local_lnkp_cnt);
+ list_splice(&update_locks, link_locks);
+ list_splice(&lookup_locks, link_locks);
+ if (rc < 0) {
+ mdt_migrate_links_unlock(info, link_locks, rc);
+ } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
/*
* parent may have 3 local objects: master object and 2 stripes
* (if it's being migrated too); source may have 1 local objects
* Note, source may have 2 local locks if it is directory but it
* can't have hardlinks, so it is not considered here.
*/
+ CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
+ local_lock_cnt);
rc = 1;
}
return rc;
}
-static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
- struct mdt_object *obj,
- const struct md_attr *ma,
- struct list_head *slave_locks)
-{
- struct mdt_device *mdt = info->mti_mdt;
- const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
- struct lu_fid *fid = &info->mti_tmp_fid1;
- struct mdt_object *slave;
- struct mdt_sub_lock *msl;
- int i;
- int rc;
-
- ENTRY;
- LASSERT(mdt_object_remote(obj));
- LASSERT(ma->ma_valid & MA_LMV);
- LASSERT(lmv);
-
- if (!lmv_is_sane(lmv))
- RETURN(-EINVAL);
-
- for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
-
- if (!fid_is_sane(fid))
- continue;
-
- slave = mdt_object_find(info->mti_env, mdt, fid);
- if (IS_ERR(slave))
- GOTO(out, rc = PTR_ERR(slave));
-
- OBD_ALLOC_PTR(msl);
- if (!msl) {
- mdt_object_put(info->mti_env, slave);
- GOTO(out, rc = -ENOMEM);
- }
-
- rc = mdt_object_lock(info, slave, &msl->msl_lh,
- MDS_INODELOCK_UPDATE, LCK_EX, true);
- if (rc) {
- OBD_FREE_PTR(msl);
- mdt_object_put(info->mti_env, slave);
- GOTO(out, rc);
- }
-
- INIT_LIST_HEAD(&msl->msl_linkage);
- msl->msl_obj = slave;
- list_add_tail(&msl->msl_linkage, slave_locks);
- }
- EXIT;
-
-out:
- if (rc)
- mdt_unlock_list(info, slave_locks, rc);
- return rc;
-}
-
-/* lock parent and its stripes */
-static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
- struct mdt_object *obj,
- const struct md_attr *ma,
- struct mdt_lock_handle *lh,
- struct ldlm_enqueue_info *einfo,
- struct list_head *slave_locks)
-{
- int rc;
-
- if (mdt_object_remote(obj)) {
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
- LCK_PW, true);
- if (rc)
- return rc;
-
- /*
- * if obj is remote and striped, lock its stripes explicitly
- * because it's not striped in LOD layer on this MDT.
- */
- if (ma->ma_valid & MA_LMV) {
- rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
- if (rc)
- mdt_object_unlock(info, obj, lh, rc);
- }
- } else {
- rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
- MDS_INODELOCK_UPDATE, LCK_PW,
- true);
- }
-
- return rc;
-}
-
-/*
- * in migration, object may be remote, and we need take full lock of it and its
- * stripes if it's directory, besides, object may be a remote object on its
- * parent, revoke its LOOKUP lock on where its parent is located.
- */
-static int mdt_migrate_object_lock(struct mdt_thread_info *info,
- struct mdt_object *pobj,
- struct mdt_object *obj,
- struct mdt_lock_handle *lh,
- struct ldlm_enqueue_info *einfo,
- struct list_head *slave_locks)
-{
- int rc;
-
- if (mdt_object_remote(obj)) {
- rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
- if (rc)
- return rc;
-
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
- true);
- if (rc)
- return rc;
-
- /*
- * if obj is remote and striped, lock its stripes explicitly
- * because it's not striped in LOD layer on this MDT.
- */
- if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
- struct md_attr *ma = &info->mti_attr;
-
- rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
- if (rc) {
- mdt_object_unlock(info, obj, lh, rc);
- return rc;
- }
-
- if (ma->ma_valid & MA_LMV) {
- rc = mdt_lock_remote_slaves(info, obj, ma,
- slave_locks);
- if (rc)
- mdt_object_unlock(info, obj, lh, rc);
- }
- }
- } else {
- rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
- MDS_INODELOCK_FULL, LCK_EX, true);
- }
-
- return rc;
-}
-
/*
* lookup source by name, if parent is striped directory, we need to find the
* corresponding stripe where source is located, and then lookup there.
*
* besides, if parent is migrating too, and file is already in target stripe,
* this should be a redo of 'lfs migrate' on client side.
+ *
+ * \retval 1 tpobj stripe index is less than spobj stripe index
+ * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
+ * \retval -ev negative errno upon error
*/
static int mdt_migrate_lookup(struct mdt_thread_info *info,
struct mdt_object *pobj,
const struct md_attr *ma,
const struct lu_name *lname,
struct mdt_object **spobj,
+ struct mdt_object **tpobj,
struct mdt_object **sobj)
{
const struct lu_env *env = info->mti_env;
struct lu_fid *fid = &info->mti_tmp_fid1;
- struct mdt_object *stripe;
+ int spindex = -1;
+ int tpindex = -1;
int rc;
if (ma->ma_valid & MA_LMV) {
/* if parent is striped, lookup on corresponding stripe */
struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ struct lu_fid *fid2 = &info->mti_tmp_fid2;
if (!lmv_is_sane(lmv))
return -EBADF;
- rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
- lname->ln_namelen);
- if (rc < 0)
- return rc;
+ spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
+ lname->ln_namelen);
+ if (spindex < 0)
+ return spindex;
+
+ fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+ *spobj = mdt_object_find(env, info->mti_mdt, fid2);
+ if (IS_ERR(*spobj)) {
+ rc = PTR_ERR(*spobj);
+ *spobj = NULL;
+ return rc;
+ }
- stripe = mdt_object_find(env, info->mti_mdt, fid);
- if (IS_ERR(stripe))
- return PTR_ERR(stripe);
+ if (!mdt_object_exists(*spobj))
+ GOTO(spobj_put, rc = -ENOENT);
fid_zero(fid);
- rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+ rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
&info->mti_spec);
- if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
+ if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
+ /* fail check here to let top dir migration succeed. */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+ GOTO(spobj_put, rc = -EIO);
+
/*
* if parent layout is changeing, and lookup child
* failed on source stripe, lookup again on target
* was interrupted, and current file was migrated
* already.
*/
- mdt_object_put(env, stripe);
-
- rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
- lname->ln_namelen);
- if (rc < 0)
- return rc;
-
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+ tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
+ lname->ln_namelen);
+ if (tpindex < 0)
+ GOTO(spobj_put, rc = tpindex);
+
+ fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
+
+ *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
+ if (IS_ERR(*tpobj)) {
+ rc = PTR_ERR(*tpobj);
+ *tpobj = NULL;
+ GOTO(spobj_put, rc);
+ }
- stripe = mdt_object_find(env, info->mti_mdt, fid);
- if (IS_ERR(stripe))
- return PTR_ERR(stripe);
+ if (!mdt_object_exists(*tpobj))
+ GOTO(tpobj_put, rc = -ENOENT);
- fid_zero(fid);
- rc = mdo_lookup(env, mdt_object_child(stripe), lname,
- fid, &info->mti_spec);
- mdt_object_put(env, stripe);
- return rc ?: -EALREADY;
+ if (rc == -ENOENT) {
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(*tpobj),
+ lname, fid, &info->mti_spec);
+ GOTO(tpobj_put, rc = rc ?: -EALREADY);
+ }
} else if (rc) {
- mdt_object_put(env, stripe);
- return rc;
+ GOTO(spobj_put, rc);
+ } else {
+ *tpobj = *spobj;
+ tpindex = spindex;
+ mdt_object_get(env, *tpobj);
}
} else {
fid_zero(fid);
if (rc)
return rc;
- stripe = pobj;
- mdt_object_get(env, stripe);
+ *spobj = pobj;
+ *tpobj = pobj;
+ mdt_object_get(env, pobj);
+ mdt_object_get(env, pobj);
}
- *spobj = stripe;
-
*sobj = mdt_object_find(env, info->mti_mdt, fid);
if (IS_ERR(*sobj)) {
- mdt_object_put(env, stripe);
rc = PTR_ERR(*sobj);
- *spobj = NULL;
*sobj = NULL;
+ GOTO(tpobj_put, rc);
}
+ if (!mdt_object_exists(*sobj))
+ GOTO(sobj_put, rc = -ENOENT);
+
+ return (tpindex < spindex);
+
+sobj_put:
+ mdt_object_put(env, *sobj);
+ *sobj = NULL;
+tpobj_put:
+ mdt_object_put(env, *tpobj);
+ *tpobj = NULL;
+spobj_put:
+ mdt_object_put(env, *spobj);
+ *spobj = NULL;
+
return rc;
}
/*
* migrate file in below steps:
- * 1. lock parent and its stripes
+ * 1. lock source and target stripes
* 2. lookup source by name
* 3. lock parents of source links if source is not directory
* 4. reject if source is in HSM
* 5. take source open_sem and close file if source is regular file
- * 6. lock source and its stripes if it's directory
- * 7. lock target so subsequent change to it can trigger COS
- * 8. migrate file
+ * 6. lock source, and its stripes if it's directory
+ * 7. migrate file
+ * 8. lock target so subsequent change to it can trigger COS
* 9. unlock above locks
- * 10. sync device if source has links
+ * 10. sync device if source has too many links
*/
int mdt_reint_migrate(struct mdt_thread_info *info,
struct mdt_lock_handle *unused)
struct mdt_reint_record *rr = &info->mti_rr;
struct lu_ucred *uc = mdt_ucred(info);
struct md_attr *ma = &info->mti_attr;
- struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
- struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
struct mdt_object *pobj;
- struct mdt_object *spobj = NULL;
- struct mdt_object *sobj = NULL;
+ struct mdt_object *spobj;
+ struct mdt_object *tpobj;
+ struct mdt_object *sobj;
struct mdt_object *tobj;
struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
- struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhsp;
+ struct mdt_lock_handle *lhtp;
struct mdt_lock_handle *lhs;
- struct mdt_lock_handle *lht;
- LIST_HEAD(parent_slave_locks);
- LIST_HEAD(child_slave_locks);
+ struct mdt_lock_handle *lhl;
LIST_HEAD(link_locks);
int lock_retries = 5;
+ bool reverse = false;
bool open_sem_locked = false;
bool do_sync = false;
+ bool is_plain_dir = false;
int rc;
ENTRY;
if (rc)
GOTO(put_parent, rc);
-lock_parent:
- /* lock parent object */
- lhp = &info->mti_lh[MDT_LH_PARENT];
- rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
- &parent_slave_locks);
- if (rc)
+ /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
+ * if @pobj is migrating too, tpobj is the target parent stripe.
+ */
+ rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
+ &sobj);
+ if (rc < 0)
GOTO(put_parent, rc);
+ reverse = rc;
- /*
- * spobj is the corresponding stripe against name if pobj is striped
- * directory, which is the real parent, and no need to lock, because
- * we've taken full lock of pobj.
- */
- rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
- if (rc)
- GOTO(unlock_parent, rc);
+ /* parent unchanged, this happens in dir restripe */
+ if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
+ GOTO(put_source, rc = -EALREADY);
+
+lock_parent:
+ LASSERT(spobj);
+ LASSERT(tpobj);
+ lhsp = &info->mti_lh[MDT_LH_PARENT];
+ lhtp = &info->mti_lh[MDT_LH_CHILD];
+ /* lock spobj and tpobj in stripe index order */
+ if (reverse) {
+ rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW,
+ true);
+ if (rc)
+ GOTO(put_source, rc);
+
+ LASSERT(spobj != tpobj);
+ rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
+ true);
+ if (rc)
+ GOTO(unlock_parent, rc);
+ } else {
+ rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
+ true);
+ if (rc)
+ GOTO(put_source, rc);
- /* lock parents of source links, and revoke LOOKUP lock of links */
- rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
- &parent_slave_locks, &link_locks);
- if (rc == -EBUSY && lock_retries-- > 0) {
- mdt_object_put(env, sobj);
- mdt_object_put(env, spobj);
- goto lock_parent;
+ if (tpobj != spobj) {
+ rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
+ LCK_PW, true);
+ if (rc)
+ GOTO(unlock_parent, rc);
+ }
}
- if (rc < 0)
- GOTO(put_source, rc);
+ /* if inode is not migrated, or is dir, no need to lock links */
+ if (!info->mti_spec.sp_migrate_nsonly &&
+ !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
+ /* lock link parents, and take LOOKUP lock of links */
+ rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
+ lhtp, &link_locks);
+ if (rc == -EBUSY && lock_retries-- > 0) {
+ LASSERT(list_empty(&link_locks));
+ goto lock_parent;
+ }
- /*
- * RS_MAX_LOCKS is the limit of number of locks that can be saved along
- * with one request, if total lock count exceeds this limit, we will
- * drop all locks after migration, and synchronous device in the end.
- */
- do_sync = rc;
+ if (rc < 0)
+ GOTO(put_source, rc);
+
+ /*
+ * RS_MAX_LOCKS is the limit of number of locks that can be
+ * saved along with one request, if total lock count exceeds
+ * this limit, we will drop all locks after migration, and
+ * trigger commit in the end.
+ */
+ do_sync = rc;
+ }
+
+ /* lock source */
+ lhs = &info->mti_lh[MDT_LH_OLD];
+ lhl = &info->mti_lh[MDT_LH_LOOKUP];
+ rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
+ MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
+ MDS_INODELOCK_OPEN, true);
+ if (rc)
+ GOTO(unlock_links, rc);
- /* TODO: DoM migration is not supported, migrate dirent only */
if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
+ /* TODO: DoM migration is not supported, migrate dirent only */
rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
if (rc)
- GOTO(unlock_links, rc);
+ GOTO(unlock_source, rc);
if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
info->mti_spec.sp_migrate_nsonly = 1;
} else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
if (rc)
- GOTO(unlock_links, rc);
+ GOTO(unlock_source, rc);
- /* race with restripe/auto-split? */
- if ((ma->ma_valid & MA_LMV) &&
- lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
- GOTO(unlock_links, rc = -EBUSY);
+ if (!(ma->ma_valid & MA_LMV))
+ is_plain_dir = true;
+ else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
+ /* race with restripe/auto-split */
+ GOTO(unlock_source, rc = -EBUSY);
}
/* if migration HSM is allowed */
ma->ma_valid = 0;
rc = mdt_attr_get_complex(info, sobj, ma);
if (rc)
- GOTO(unlock_links, rc);
+ GOTO(unlock_source, rc);
if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
- GOTO(unlock_links, rc = -EOPNOTSUPP);
+ GOTO(unlock_source, rc = -EOPNOTSUPP);
}
/* end lease and close file for regular file */
if (!down_write_trylock(&sobj->mot_open_sem)) {
/* close anyway */
mdd_migrate_close(info, sobj);
- GOTO(unlock_links, rc = -EBUSY);
+ GOTO(unlock_source, rc = -EBUSY);
} else {
open_sem_locked = true;
rc = mdd_migrate_close(info, sobj);
- if (rc)
+ if (rc && rc != -ESTALE)
GOTO(unlock_open_sem, rc);
}
}
- /* lock source */
- lhs = &info->mti_lh[MDT_LH_OLD];
- rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
- &child_slave_locks);
- if (rc)
- GOTO(unlock_open_sem, rc);
-
- /* lock target */
tobj = mdt_object_find(env, mdt, rr->rr_fid2);
if (IS_ERR(tobj))
- GOTO(unlock_source, rc = PTR_ERR(tobj));
-
- lht = &info->mti_lh[MDT_LH_NEW];
- rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
- if (rc)
- GOTO(put_target, rc);
+ GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
/* Don't do lookup sanity check. We know name doesn't exist. */
info->mti_spec.sp_cr_lookup = 0;
info->mti_spec.sp_feat = &dt_directory_features;
- rc = mdo_migrate(env, mdt_object_child(pobj),
- mdt_object_child(sobj), &rr->rr_name,
- mdt_object_child(tobj),
+ rc = mdo_migrate(env, mdt_object_child(spobj),
+ mdt_object_child(tpobj), mdt_object_child(sobj),
+ mdt_object_child(tobj), &rr->rr_name,
&info->mti_spec, ma);
- if (!rc)
- lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
- LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
- EXIT;
+ if (rc)
+ GOTO(put_target, rc);
+
+ /* save target locks for directory */
+ if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
+ !info->mti_spec.sp_migrate_nsonly) {
+ struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+
+ /* in case sobj becomes a stripe of tobj, unlock sobj here,
+ * otherwise stripes lock may deadlock.
+ */
+ if (is_plain_dir)
+ mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
- mdt_object_unlock(info, tobj, lht, rc);
+ rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
+ MDS_INODELOCK_UPDATE, LCK_PW,
+ true);
+ if (rc)
+ GOTO(put_target, rc);
+
+ mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
+ }
+
+ lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
+ LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
+
+ EXIT;
put_target:
mdt_object_put(env, tobj);
-unlock_source:
- mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
- &child_slave_locks, rc);
unlock_open_sem:
if (open_sem_locked)
up_write(&sobj->mot_open_sem);
+unlock_source:
+ mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
unlock_links:
/* if we've got too many locks to save into RPC,
* then just commit before the locks are released
*/
if (!rc && do_sync)
mdt_device_sync(env, mdt);
- mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
+ mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
+unlock_parent:
+ mdt_object_unlock(info, spobj, lhsp, rc);
+ mdt_object_unlock(info, tpobj, lhtp, rc);
put_source:
mdt_object_put(env, sobj);
mdt_object_put(env, spobj);
-unlock_parent:
- mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
- &parent_slave_locks, rc);
+ mdt_object_put(env, tpobj);
put_parent:
+ mo_invalidate(env, mdt_object_child(pobj));
mdt_object_put(env, pobj);
unlock_rename:
mdt_rename_unlock(info, rename_lh);
+ if (rc)
+ CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+ PNAME(&rr->rr_name), rc);
+
return rc;
}
return sindex < tindex ? 0 : 1;
}
-/*
- * lock rename source object.
- *
- * Both source and source parent may be remote, and source may be a remote
- * object on source parent, to avoid overriding lock handle, store remote
- * LOOKUP lock separately in @lhr.
- *
- * \retval 0 on success
- * \retval -ev negative errno upon error
- */
-static int mdt_rename_source_lock(struct mdt_thread_info *info,
- struct mdt_object *parent,
- struct mdt_object *child,
- struct mdt_lock_handle *lhc,
- struct mdt_lock_handle *lhr,
- __u64 ibits,
- bool cos_incompat)
-{
- int rc;
-
- rc = mdt_is_remote_object(info, parent, child);
- if (rc < 0)
- return rc;
-
- if (rc == 1) {
- rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
- cos_incompat);
- if (rc)
- return rc;
-
- ibits &= ~MDS_INODELOCK_LOOKUP;
- }
-
- rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
- if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
- mdt_object_unlock(info, NULL, lhr, rc);
-
- return rc;
-}
-
/* Helper function for mdt_reint_rename so we don't need to opencode
* two different order lockings
*/
struct mdt_lock_handle *lh_srcdirp;
struct mdt_lock_handle *lh_tgtdirp;
struct mdt_lock_handle *lh_oldp = NULL;
- struct mdt_lock_handle *lh_rmt = NULL;
+ struct mdt_lock_handle *lh_lookup = NULL;
struct mdt_lock_handle *lh_newp = NULL;
struct lu_fid *old_fid = &info->mti_tmp_fid1;
struct lu_fid *new_fid = &info->mti_tmp_fid2;
GOTO(out_put_new, rc = -EISDIR);
lh_oldp = &info->mti_lh[MDT_LH_OLD];
- lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
+ lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
- lh_rmt, MDS_INODELOCK_LOOKUP |
+ lh_lookup,
+ MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_XATTR, cos_incompat);
if (rc < 0)
GOTO(out_put_new, rc);
GOTO(out_put_old, rc);
} else {
lh_oldp = &info->mti_lh[MDT_LH_OLD];
- lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
+ lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
- lh_rmt, MDS_INODELOCK_LOOKUP |
+ lh_lookup,
+ MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_XATTR, cos_incompat);
if (rc != 0)
GOTO(out_put_old, rc);
if (mnew != NULL)
mdt_object_unlock(info, mnew, lh_newp, rc);
out_unlock_old:
- mdt_object_unlock(info, NULL, lh_rmt, rc);
+ mdt_object_unlock(info, NULL, lh_lookup, rc);
mdt_object_unlock(info, mold, lh_oldp, rc);
out_put_new:
if (mnew && !discard)