Whamcloud - gitweb
LU-11047 mdt: standardize mdt object locking
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 6d01604..c2acfbb 100644 (file)
@@ -188,8 +188,17 @@ void mdt_set_disposition(struct mdt_thread_info *info,
                rep->lock_policy_res1 |= op_flag;
 }
 
+/* assert lock is unlocked before reuse */
+static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh)
+{
+       LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh));
+}
+
 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_pdo_hash = 0;
        lh->mlh_reg_mode = lm;
        lh->mlh_rreg_mode = lm;
@@ -206,6 +215,7 @@ void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_reg_mode = lock_mode;
        lh->mlh_pdo_mode = LCK_MINMODE;
        lh->mlh_rreg_mode = lock_mode;
@@ -1756,9 +1766,7 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
                if (layout->mlc_opc == MD_LAYOUT_WRITE)
                        lockpart |= MDS_INODELOCK_UPDATE;
 
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_EX);
-               rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+               rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX, false);
                if (rc)
                        RETURN(rc);
        }
@@ -1856,17 +1864,14 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
                GOTO(put, rc = -EPROTO);
 
        lh1 = &info->mti_lh[MDT_LH_NEW];
-       mdt_lock_reg_init(lh1, LCK_EX);
        lh2 = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lh2, LCK_EX);
-
        rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc < 0)
                GOTO(put, rc);
 
        rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc < 0)
                GOTO(unlock1, rc);
 
@@ -2079,9 +2084,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                if (rc < 0) {
                        RETURN(rc);
                } else if (rc > 0) {
-                       mdt_lock_handle_init(lhc);
-                       mdt_lock_reg_init(lhc, LCK_PR);
-
                        /*
                         * Object's name entry is on another MDS, it will
                         * request PERM lock only because LOOKUP lock is owned
@@ -2092,8 +2094,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        child_bits &= ~(MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_LAYOUT);
                        child_bits |= MDS_INODELOCK_PERM;
-
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                       rc = mdt_object_lock(info, child, lhc, child_bits,
+                                            LCK_PR, false);
                        if (rc < 0)
                                RETURN(rc);
                }
@@ -2248,9 +2250,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                /* step 1: lock parent only if parent is a directory */
                if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
                        lhp = &info->mti_lh[MDT_LH_PARENT];
-                       mdt_lock_pdo_init(lhp, LCK_PR, lname);
-                       rc = mdt_object_lock(info, parent, lhp,
-                                            MDS_INODELOCK_UPDATE);
+                       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR,
+                                             false);
                        if (unlikely(rc != 0))
                                RETURN(rc);
                }
@@ -2301,9 +2302,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (rc < 0) {
                GOTO(out_child, rc);
        } else if (rc > 0) {
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_PR);
-
                if (!(child_bits & MDS_INODELOCK_UPDATE) &&
                    !mdt_object_remote(child)) {
                        struct md_attr *ma = &info->mti_attr;
@@ -2345,15 +2343,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        /* try layout lock, it may fail to be granted due to
                         * contention at LOOKUP or UPDATE */
                        rc = mdt_object_lock_try(info, child, lhc, &child_bits,
-                                                try_bits, false);
+                                                try_bits, LCK_PR, false);
                        if (child_bits & MDS_INODELOCK_LAYOUT)
                                ma_need |= MA_LOV;
                } else {
                        /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
                         * client will enqueue the lock to the remote MDT */
                        if (mdt_object_remote(child))
-                               child_bits &= ~MDS_INODELOCK_UPDATE;
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                               rc = mdt_object_lookup_lock(info, NULL, child,
+                                                           lhc, LCK_PR, false);
+                       else
+                               rc = mdt_object_lock(info, child, lhc,
+                                                    child_bits, LCK_PR, false);
                }
                if (unlikely(rc != 0))
                        GOTO(out_child, rc);
@@ -2502,15 +2503,14 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
        if (IS_ERR(pobj))
                GOTO(out, rc = PTR_ERR(pobj));
 
+       if (mdt_object_remote(pobj))
+               cos_incompat = true;
+
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, name);
-       rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW, cos_incompat);
        if (rc != 0)
                GOTO(put_parent, rc);
 
-       if (mdt_object_remote(pobj))
-               cos_incompat = true;
-
        rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
                        name, child_fid, &info->mti_spec);
        if (rc != 0)
@@ -2520,10 +2520,10 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
                GOTO(unlock_parent, rc = -EREMCHG);
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(child_lh, LCK_EX);
-       rc = mdt_reint_striped_lock(info, obj, child_lh,
-                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
-                                   einfo, cos_incompat);
+       rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo,
+                                    MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE,
+                                    LCK_EX, cos_incompat);
        if (rc != 0)
                GOTO(unlock_parent, rc);
 
@@ -2546,7 +2546,7 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
        mutex_unlock(&obj->mot_lov_mutex);
 
 unlock_child:
-       mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+       mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1);
 unlock_parent:
        mdt_object_unlock(info, pobj, parent_lh, 1);
 put_parent:
@@ -3727,20 +3727,18 @@ static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock)
        mdt_object_get(NULL, lock->l_ast_data);
 }
 
-int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
-                              struct mdt_object *o, const struct lu_fid *fid,
-                              struct lustre_handle *lh, enum ldlm_mode mode,
-                              __u64 *ibits, __u64 trybits, bool cache)
+static int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+                                     struct mdt_object *obj,
+                                     struct lustre_handle *lh,
+                                     enum ldlm_mode mode,
+                                     union ldlm_policy_data *policy,
+                                     struct ldlm_res_id *res_id,
+                                     bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
-       union ldlm_policy_data *policy = &mti->mti_policy;
-       struct ldlm_res_id *res_id = &mti->mti_res_id;
-       int rc = 0;
-       ENTRY;
-
-       LASSERT(mdt_object_remote(o));
+       int rc;
 
-       fid_build_reg_res_name(fid, res_id);
+       LASSERT(mdt_object_remote(obj));
 
        memset(einfo, 0, sizeof(*einfo));
        einfo->ei_type = LDLM_IBITS;
@@ -3750,252 +3748,380 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
        einfo->ei_enq_slave = 0;
        einfo->ei_res_id = res_id;
        einfo->ei_req_slot = 1;
-
        if (cache) {
                /*
                 * if we cache lock, couple lock with mdt_object, so that object
                 * can be easily found in lock ASTs.
                 */
-               einfo->ei_cbdata = o;
+               einfo->ei_cbdata = obj;
                einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
        }
 
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = *ibits;
-       policy->l_inodebits.try_bits = trybits;
-
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo,
                            policy);
-
-       /* Return successfully acquired bits to a caller */
-       if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(lh);
-
-               LASSERT(lock);
-               *ibits = lock->l_policy_data.l_inodebits.bits;
-               LDLM_LOCK_PUT(lock);
+       if (rc) {
+               lh->cookie = 0ull;
+               return rc;
        }
-       RETURN(rc);
-}
 
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
-                          const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool cache)
-{
-       return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
-                                         cache);
+       /* other components like LFSCK can use lockless access
+        * and populate cache, so we better invalidate it
+        */
+       if (policy->l_inodebits.bits &
+           (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR))
+               mo_invalidate(mti->mti_env, mdt_object_child(obj));
+
+       return 0;
 }
 
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 *ibits,
-                         __u64 trybits, bool cos_incompat)
+/*
+ * Helper function to take PDO and hash lock.
+ *
+ * if \a pdo_lock is false, don't take PDO lock, this is case in rename.
+ */
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                       struct mdt_lock_handle *lh, const struct lu_name *name,
+                       enum ldlm_mode mode, bool pdo_lock, bool cos_incompat)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
-       __u64 dlmflags = 0, *cookie = NULL;
+       /*
+        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to
+        * be sent to client and we do not want it slowed down due to possible
+        * cancels.
+        */
+       __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+       __u64 *cookie = NULL;
        int rc;
-       ENTRY;
 
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-        LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
-        LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+       LASSERT(obj);
+
+       /* check for exists after object is locked */
+       if (!mdt_object_exists(obj))
+               /* Non-existent object shouldn't have PDO lock */
+               return -ESTALE;
+
+       /* Non-dir object shouldn't have PDO lock */
+       if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               return -ENOTDIR;
 
        if (cos_incompat) {
-               LASSERT(lh->mlh_reg_mode == LCK_PW ||
-                       lh->mlh_reg_mode == LCK_EX);
+               LASSERT(mode == LCK_PW || mode == LCK_EX);
                dlmflags |= LDLM_FL_COS_INCOMPAT;
        } else if (mdt_cos_is_enabled(info->mti_mdt)) {
                dlmflags |= LDLM_FL_COS_ENABLED;
        }
 
-       /* Only enqueue LOOKUP lock for remote object */
-       LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
-
-       /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
-       if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
-           *ibits == MDS_INODELOCK_OPEN)
-               dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
-
-       if (lh->mlh_type == MDT_PDO_LOCK) {
-                /* check for exists after object is locked */
-                if (mdt_object_exists(o) == 0) {
-                        /* Non-existent object shouldn't have PDO lock */
-                        RETURN(-ESTALE);
-                } else {
-                        /* Non-dir object shouldn't have PDO lock */
-                       if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
-                               RETURN(-ENOTDIR);
-               }
-       }
-
-       fid_build_reg_res_name(mdt_object_fid(o), res_id);
-       dlmflags |= LDLM_FL_ATOMIC_CB;
-
+       policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+       policy->l_inodebits.try_bits = 0;
+       policy->l_inodebits.li_gid = 0;
+       fid_build_reg_res_name(mdt_object_fid(obj), res_id);
        if (info->mti_exp)
                cookie = &info->mti_exp->exp_handle.h_cookie;
 
-       /*
-        * Take PDO lock on whole directory and build correct @res_id for lock
-        * on part of directory.
-        */
-       if (lh->mlh_pdo_hash != 0) {
-               LASSERT(lh->mlh_type == MDT_PDO_LOCK);
-               mdt_lock_pdo_mode(info, o, lh);
-               if (lh->mlh_pdo_mode != LCK_NL) {
-                       /*
-                        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
-                        * is never going to be sent to client and we do not
-                        * want it slowed down due to possible cancels.
-                        */
-                       policy->l_inodebits.bits =
-                               *ibits & MDS_INODELOCK_UPDATE;
-                       policy->l_inodebits.try_bits =
-                               trybits & MDS_INODELOCK_UPDATE;
-                       /* at least one of them should be set */
-                       LASSERT(policy->l_inodebits.bits |
-                               policy->l_inodebits.try_bits);
+       mdt_lock_pdo_init(lh, mode, name);
+       mdt_lock_pdo_mode(info, obj, lh);
+       if (lh->mlh_pdo_mode != LCK_NL) {
+               if (pdo_lock) {
                        rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
                                          lh->mlh_pdo_mode, policy, res_id,
                                          dlmflags, cookie);
-                       if (unlikely(rc != 0))
-                               GOTO(out_unlock, rc);
-                }
+                       if (rc) {
+                               mdt_object_unlock(info, obj, lh, 1);
+                               return rc;
+                       }
+               }
+               res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
+       }
 
-                /*
-                 * Finish res_id initializing by name hash marking part of
-                 * directory which is taking modification.
-                 */
-                res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
-        }
+       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+                         policy, res_id, dlmflags, cookie);
+       if (rc)
+               mdt_object_unlock(info, obj, lh, 1);
+       else if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) &&
+                  lh->mlh_pdo_hash != 0 &&
+                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+
+       return rc;
+}
+
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+                            struct mdt_object *obj, const struct lu_fid *fid,
+                            struct mdt_lock_handle *lh, __u64 *ibits,
+                            __u64 trybits, bool cache, bool cos_incompat)
+{
+       union ldlm_policy_data *policy = &info->mti_policy;
+       struct ldlm_res_id *res_id = &info->mti_res_id;
+       struct lustre_handle *handle;
+       int rc;
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
        policy->l_inodebits.li_gid = lh->mlh_gid;
+       fid_build_reg_res_name(fid, res_id);
 
-        /*
-         * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
-         * going to be sent to client. If it is - mdt_intent_policy() path will
-         * fix it up and turn FL_LOCAL flag off.
-         */
-       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
-                         policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
-                         cookie);
-out_unlock:
-       if (rc != 0)
-               mdt_object_unlock(info, o, lh, 1);
-       else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
-                  lh->mlh_pdo_hash != 0 &&
-                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+       if (obj && mdt_object_remote(obj)) {
+               handle = &lh->mlh_rreg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_rreg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+               rc = mdt_remote_object_lock_try(info, obj, handle,
+                                               lh->mlh_rreg_mode, policy,
+                                               res_id, cache);
+       } else {
+               struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+               /*
+                * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if
+                * it is going to be sent to client. If it is -
+                * mdt_intent_policy() path will fix it up and turn FL_LOCAL
+                * flag off.
+                */
+               __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY;
+               __u64 *cookie = NULL;
+
+               handle = &lh->mlh_reg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+
+               if (cos_incompat) {
+                       LASSERT(lh->mlh_reg_mode == LCK_PW ||
+                               lh->mlh_reg_mode == LCK_EX);
+                       dlmflags |= LDLM_FL_COS_INCOMPAT;
+               } else if (mdt_cos_is_enabled(info->mti_mdt)) {
+                       dlmflags |= LDLM_FL_COS_ENABLED;
+               }
+
+               /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
+               if (lh->mlh_type == MDT_REG_LOCK &&
+                   lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN)
+                       dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
+
+
+               if (info->mti_exp)
+                       cookie = &info->mti_exp->exp_handle.h_cookie;
+
+               rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode,
+                                 policy, res_id, dlmflags, cookie);
+               if (rc)
+                       mdt_object_unlock(info, obj, lh, 1);
+       }
 
-       /* Return successfully acquired bits to a caller */
        if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+               struct ldlm_lock *lock;
 
+               /* Return successfully acquired bits to a caller */
+               lock = ldlm_handle2lock(handle);
                LASSERT(lock);
                *ibits = lock->l_policy_data.l_inodebits.bits;
                LDLM_LOCK_PUT(lock);
        }
-       RETURN(rc);
+
+       return rc;
 }
 
-static int
-mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
-                        struct mdt_lock_handle *lh, __u64 *ibits,
-                        __u64 trybits, bool cos_incompat)
+/*
+ * MDT object locking functions:
+ * mdt_object_lock(): lock object, this is used in most places, and normally
+ *     lock ibits doesn't contain LOOKUP, unless the caller knows it's not
+ *     remote object.
+ * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs
+ *     to check whether parent is on remote MDT, if so, take LOOKUP on parent
+ *     MDT separately, and then lock other ibits on child object.
+ * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is
+ *     local, take PDO lock by name hash, otherwise take regular lock.
+ * mdt_object_stripes_lock(): lock object which should be local, and if it's a
+ *     striped directory, lock its stripes, this is called in operations which
+ *     modify both object and stripes.
+ * mdt_object_lock_try(): lock object with trybits, the trybits contains
+ *     optional inode lock bits that can be granted. This is called by
+ *     getattr/open to fetch more inode lock bits to client, and is also called
+ *     by dir migration to lock link parent in non-block mode to avoid
+ *     deadlock.
+ */
+
+/**
+ * lock object
+ *
+ * this is used to lock object in most places, and normally lock ibits doesn't
+ * contain LOOKUP, unless the caller knows it's not remote object.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, __u64 ibits,
+                   enum ldlm_mode mode, bool cos_incompat)
 {
-       struct mdt_lock_handle *local_lh = NULL;
        int rc;
-       ENTRY;
-
-       if (!mdt_object_remote(o)) {
-               rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
-                                          cos_incompat);
-               RETURN(rc);
-       }
 
-       /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
-       *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
-                   MDS_INODELOCK_XATTR);
+       ENTRY;
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false, cos_incompat);
+       RETURN(rc);
+}
 
-       /* Only enqueue LOOKUP lock for remote object */
-       if (*ibits & MDS_INODELOCK_LOOKUP) {
-               __u64 local = MDS_INODELOCK_LOOKUP;
+/**
+ * lock object with LOOKUP and other ibits
+ *
+ * it will check whether parent and child are on different MDTs, if so, take
+ * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock
+ * all ibits on child MDT. Note, parent and child shouldn't be both on remote
+ * MDTs, in which case specific lock function should be used, and it's in
+ * rename and migrate only.
+ *
+ * \param info         struct mdt_thread_info
+ * \param parent       parent object
+ * \param child                child object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_check_lock(struct mdt_thread_info *info,
+                         struct mdt_object *parent, struct mdt_object *child,
+                         struct mdt_lock_handle *lh, __u64 ibits,
+                         enum ldlm_mode mode, bool cos_incompat)
+{
+       int rc;
 
-               rc = mdt_object_local_lock(info, o, lh, &local, 0,
-                                          cos_incompat);
-               if (rc != ELDLM_OK)
+       ENTRY;
+       /* if LOOKUP ibit is not set, use mdt_object_lock() */
+       LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+       /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */
+       LASSERT(ibits != MDS_INODELOCK_LOOKUP);
+       LASSERT(parent);
+       /* @parent and @child shouldn't both be on remote MDTs */
+       LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child)));
+
+       mdt_lock_reg_init(lh, mode);
+       if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+               __u64 lookup_ibits = MDS_INODELOCK_LOOKUP;
+
+               rc = mdt_object_lock_internal(info, parent,
+                                             mdt_object_fid(child), lh,
+                                             &lookup_ibits, 0, false,
+                                             cos_incompat);
+               if (rc)
                        RETURN(rc);
 
-               local_lh = lh;
-       }
-
-       if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
-               /* Sigh, PDO needs to enqueue 2 locks right now, but
-                * enqueue RPC can only request 1 lock, to avoid extra
-                * RPC, so it will instead enqueue EX lock for remote
-                * object anyway XXX*/
-               if (lh->mlh_type == MDT_PDO_LOCK &&
-                   lh->mlh_pdo_hash != 0) {
-                       CDEBUG(D_INFO,
-                              "%s: "DFID" convert PDO lock to EX lock.\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)));
-                       lh->mlh_pdo_hash = 0;
-                       lh->mlh_rreg_mode = LCK_EX;
-                       lh->mlh_type = MDT_REG_LOCK;
-               }
-
-               rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
-                                               &lh->mlh_rreg_lh,
-                                               lh->mlh_rreg_mode,
-                                               ibits, trybits, false);
-               if (rc != ELDLM_OK) {
-                       if (local_lh != NULL)
-                               mdt_object_unlock(info, o, local_lh, rc);
-                       RETURN(rc);
-               }
+               ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       /* other components like LFSCK can use lockless access
-        * and populate cache, so we better invalidate it */
-       mo_invalidate(info->mti_env, mdt_object_child(o));
+       rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh,
+                                     &ibits, 0, false, cos_incompat);
+       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+               mdt_object_unlock(info, NULL, lh, 1);
 
-       RETURN(0);
+       RETURN(rc);
 }
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                   struct mdt_lock_handle *lh, __u64 ibits)
+/**
+ * take parent UPDATE lock
+ *
+ * if parent is local, take PDO lock by name hash, otherwise take regular lock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj  parent object
+ * \param lh   lock handle
+ * \param lname        child name
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval     0 on success, -ev on error.
+ */
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, const struct lu_name *lname,
+                   enum ldlm_mode mode, bool cos_incompat)
 {
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
-}
+       int rc;
 
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 ibits,
-                         bool cos_incompat)
-{
-       LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0,
-                                       cos_incompat);
+       ENTRY;
+       LASSERT(obj && lname);
+       if (mdt_object_remote(obj)) {
+               __u64 ibits = MDS_INODELOCK_UPDATE;
+
+               mdt_lock_reg_init(lh, mode);
+               rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj),
+                                             lh, &ibits, 0, false,
+                                             cos_incompat);
+       } else {
+               rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true,
+                                        cos_incompat);
+       }
+       RETURN(rc);
 }
 
-int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+/**
+ * lock object with trybits
+ *
+ * the trybits contains optional inode lock bits that can be granted. This is
+ * called by getattr/open to fetch more inode lock bits to client, and is also
+ * called by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param trybits      optional inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj,
                        struct mdt_lock_handle *lh, __u64 *ibits,
-                       __u64 trybits, bool cos_incompat)
+                       __u64 trybits, enum ldlm_mode mode, bool cos_incompat)
 {
        bool trylock_only = *ibits == 0;
        int rc;
 
+       ENTRY;
        LASSERT(!(*ibits & trybits));
-       rc = mdt_object_lock_internal(info, o, lh, ibits, trybits,
-                                     cos_incompat);
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits,
+                                     trybits, false, cos_incompat);
        if (rc && trylock_only) { /* clear error for try ibits lock only */
                LASSERT(*ibits == 0);
                rc = 0;
        }
-       return rc;
+       RETURN(rc);
+}
+
+/*
+ * Helper function to take \a obj LOOKUP lock.
+ *
+ * Both \a pobj and \a obj may be located on remote MDTs.
+ */
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+                          struct mdt_object *pobj, struct mdt_object *obj,
+                          struct mdt_lock_handle *lh, enum ldlm_mode mode,
+                          bool cos_incompat)
+{
+       __u64 ibits = MDS_INODELOCK_LOOKUP;
+       int rc;
+
+       ENTRY;
+       /* if @parent is NULL, it's on local MDT, and @child is remote,
+        * this is case in getattr/unlink/open by name.
+        */
+       LASSERT(ergo(!pobj, mdt_object_remote(obj)));
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false, cos_incompat);
+       RETURN(rc);
 }
 
 /**
@@ -4011,8 +4137,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
  * \param mode lock mode
  * \param decref force immediate lock releasing
  */
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
-                  enum ldlm_mode mode, int decref)
+static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+                         enum ldlm_mode mode, int decref)
 {
        struct tgt_session_info *tsi = info->mti_env->le_ses ?
                                       tgt_ses_info(info->mti_env) : NULL;
@@ -4134,32 +4260,32 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 }
 
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
-                                        const struct lu_fid *f,
-                                        struct mdt_lock_handle *lh,
-                                        __u64 ibits)
+                                       const struct lu_fid *f,
+                                       struct mdt_lock_handle *lh,
+                                       __u64 ibits, enum ldlm_mode mode)
 {
-        struct mdt_object *o;
+       struct mdt_object *o;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, f);
-        if (!IS_ERR(o)) {
-                int rc;
+       o = mdt_object_find(info->mti_env, info->mti_mdt, f);
+       if (!IS_ERR(o)) {
+               int rc;
 
-               rc = mdt_object_lock(info, o, lh, ibits);
-                if (rc != 0) {
-                        mdt_object_put(info->mti_env, o);
-                        o = ERR_PTR(rc);
-                }
-        }
-        return o;
+               rc = mdt_object_lock(info, o, lh, ibits, mode, false);
+               if (rc != 0) {
+                       mdt_object_put(info->mti_env, o);
+                       o = ERR_PTR(rc);
+               }
+       }
+       return o;
 }
 
 void mdt_object_unlock_put(struct mdt_thread_info * info,
-                           struct mdt_object * o,
-                           struct mdt_lock_handle *lh,
-                           int decref)
+                          struct mdt_object *o,
+                          struct mdt_lock_handle *lh,
+                          int decref)
 {
-        mdt_object_unlock(info, o, lh, decref);
-        mdt_object_put(info->mti_env, o);
+       mdt_object_unlock(info, o, lh, decref);
+       mdt_object_put(info->mti_env, o);
 }
 
 /*
@@ -4256,23 +4382,6 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
        RETURN(rc);
 }
 
-void mdt_lock_handle_init(struct mdt_lock_handle *lh)
-{
-        lh->mlh_type = MDT_NUL_LOCK;
-        lh->mlh_reg_lh.cookie = 0ull;
-        lh->mlh_reg_mode = LCK_MINMODE;
-        lh->mlh_pdo_lh.cookie = 0ull;
-        lh->mlh_pdo_mode = LCK_MINMODE;
-       lh->mlh_rreg_lh.cookie = 0ull;
-       lh->mlh_rreg_mode = LCK_MINMODE;
-}
-
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
-{
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-}
-
 void mdt_thread_info_reset(struct mdt_thread_info *info)
 {
        memset(&info->mti_attr, 0, sizeof(info->mti_attr));
@@ -4305,14 +4414,8 @@ void mdt_thread_info_reset(struct mdt_thread_info *info)
 void mdt_thread_info_init(struct ptlrpc_request *req,
                          struct mdt_thread_info *info)
 {
-        int i;
-
         info->mti_pill = &req->rq_pill;
 
-        /* lock handle */
-        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-                mdt_lock_handle_init(&info->mti_lh[i]);
-
         /* mdt device: it can be NULL while CONNECT */
         if (req->rq_export) {
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
@@ -4339,7 +4442,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info)
        }
 
        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-               mdt_lock_handle_fini(&info->mti_lh[i]);
+               mdt_lock_handle_assert(&info->mti_lh[i]);
        info->mti_env = NULL;
        info->mti_pill = NULL;
        info->mti_exp = NULL;
@@ -4552,9 +4655,9 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
         */
        mdt_intent_fixup_resent(info, *lockp, lhc, flags);
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-               mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
-                                    MDS_INODELOCK_XATTR);
+                                    MDS_INODELOCK_XATTR, (*lockp)->l_req_mode,
+                                    false);
                if (rc)
                        return rc;
        }
@@ -7518,12 +7621,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 
        CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
 
-        lh = &mti->mti_lh[MDT_LH_PARENT];
-        mdt_lock_reg_init(lh, LCK_CR);
-
-        obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
+       lh = &mti->mti_lh[MDT_LH_PARENT];
+       obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
 
        if (mdt_object_remote(obj)) {
                rc = -EREMOTE;