Whamcloud - gitweb
LU-11047 mdt: standardize mdt object locking 64/40764/46
authorLai Siyao <lai.siyao@whamcloud.com>
Fri, 23 Sep 2022 02:45:19 +0000 (22:45 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 11 Apr 2023 20:04:56 +0000 (20:04 +0000)
* object LOOKUP lock should be taken from parent MDT if object and
  its parent are located on different MDTs, but current mdt object
  locking doesn't handle this explicitly; instead it implicitly
  assumes the parent is on the local MDT.
* PDO locking should be put into a separate function.
* with the above changes, mdt_object_lock_internal() becomes a simple
  wrapper: it calls mdt_remote_object_lock_try() if object is remote,
  and mdt_fid_lock() otherwise.
* the MDT object locking functions are as below:
    . mdt_object_lock(): lock object; this is used in most places, and
        normally the lock ibits don't contain LOOKUP, unless the caller
        knows its parent is on the same MDT.
    . mdt_object_check_lock(): lock object with LOOKUP and other
        ibits; it needs to check whether parent is on a different MDT,
        and if so, take LOOKUP lock on parent MDT, and then lock other
        ibits on child MDT.
    . mdt_parent_lock(): take parent UPDATE lock with specific mode;
        if parent is local, take PDO lock, otherwise take regular
        lock.
    . mdt_object_stripes_lock(): lock object, which should be local,
        and if it's a striped directory, lock its stripes. This is
        called in operations which modify dir object and its stripes.
    . mdt_object_lock_try(): lock object with trybits; the trybits
        contain optional inode lock bits that can be granted. This is
        called by getattr/open to fetch more inode lock bits to
        client, and is also called by dir migration to lock link
        parent in non-blocking mode to avoid deadlock.
    . rename/migrate source objects are locked in specific functions,
        because a source object and its parent may be located on
        different remote MDTs.

Test-Parameters: mdscount=2 mdtcount=4 testlist=racer,racer,racer
Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I8225cbee4a1f5db8f77399866061f12e0a4cbb47
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40764
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Qian Yingjin <qian@ddn.com>
Reviewed-by: jsimmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/mdt/mdt_coordinator.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_hsm.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_restripe.c
lustre/mdt/mdt_xattr.c

index 2ac5853..54084a1 100644 (file)
@@ -850,9 +850,8 @@ int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
        mutex_unlock(&cdt->cdt_restore_lock);
 
        /* get the layout lock */
-       mdt_lock_reg_init(&lh, LCK_EX);
        obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
-                                  MDS_INODELOCK_LAYOUT);
+                                  MDS_INODELOCK_LAYOUT, LCK_EX);
        if (IS_ERR(obj)) {
                mutex_lock(&cdt->cdt_restore_lock);
                GOTO(out_ldel, rc = PTR_ERR(obj));
@@ -1377,8 +1376,8 @@ static int hsm_swap_layouts(struct mdt_thread_info *mti,
        /* we already have layout lock on obj so take only
         * on dfid */
        dlh = &mti->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(dlh, LCK_EX);
-       dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT);
+       dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT,
+                                   LCK_EX);
        if (IS_ERR(dobj))
                GOTO(out, rc = PTR_ERR(dobj));
 
@@ -1614,8 +1613,8 @@ static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                if (!IS_ERR_OR_NULL(obj)) {
                        /* flush UPDATE lock so attributes are upadated */
                        lh = &mti->mti_lh[MDT_LH_OLD];
-                       mdt_lock_reg_init(lh, LCK_EX);
-                       mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE);
+                       mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE,
+                                       LCK_EX, false);
                        mdt_object_unlock(mti, obj, lh, 1);
                }
        }
index 6d01604..c2acfbb 100644 (file)
@@ -188,8 +188,17 @@ void mdt_set_disposition(struct mdt_thread_info *info,
                rep->lock_policy_res1 |= op_flag;
 }
 
+/* assert lock is unlocked before reuse */
+static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh)
+{
+       LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh));
+}
+
 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_pdo_hash = 0;
        lh->mlh_reg_mode = lm;
        lh->mlh_rreg_mode = lm;
@@ -206,6 +215,7 @@ void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_reg_mode = lock_mode;
        lh->mlh_pdo_mode = LCK_MINMODE;
        lh->mlh_rreg_mode = lock_mode;
@@ -1756,9 +1766,7 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
                if (layout->mlc_opc == MD_LAYOUT_WRITE)
                        lockpart |= MDS_INODELOCK_UPDATE;
 
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_EX);
-               rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+               rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX, false);
                if (rc)
                        RETURN(rc);
        }
@@ -1856,17 +1864,14 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
                GOTO(put, rc = -EPROTO);
 
        lh1 = &info->mti_lh[MDT_LH_NEW];
-       mdt_lock_reg_init(lh1, LCK_EX);
        lh2 = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lh2, LCK_EX);
-
        rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc < 0)
                GOTO(put, rc);
 
        rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc < 0)
                GOTO(unlock1, rc);
 
@@ -2079,9 +2084,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                if (rc < 0) {
                        RETURN(rc);
                } else if (rc > 0) {
-                       mdt_lock_handle_init(lhc);
-                       mdt_lock_reg_init(lhc, LCK_PR);
-
                        /*
                         * Object's name entry is on another MDS, it will
                         * request PERM lock only because LOOKUP lock is owned
@@ -2092,8 +2094,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        child_bits &= ~(MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_LAYOUT);
                        child_bits |= MDS_INODELOCK_PERM;
-
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                       rc = mdt_object_lock(info, child, lhc, child_bits,
+                                            LCK_PR, false);
                        if (rc < 0)
                                RETURN(rc);
                }
@@ -2248,9 +2250,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                /* step 1: lock parent only if parent is a directory */
                if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
                        lhp = &info->mti_lh[MDT_LH_PARENT];
-                       mdt_lock_pdo_init(lhp, LCK_PR, lname);
-                       rc = mdt_object_lock(info, parent, lhp,
-                                            MDS_INODELOCK_UPDATE);
+                       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR,
+                                             false);
                        if (unlikely(rc != 0))
                                RETURN(rc);
                }
@@ -2301,9 +2302,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (rc < 0) {
                GOTO(out_child, rc);
        } else if (rc > 0) {
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_PR);
-
                if (!(child_bits & MDS_INODELOCK_UPDATE) &&
                    !mdt_object_remote(child)) {
                        struct md_attr *ma = &info->mti_attr;
@@ -2345,15 +2343,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        /* try layout lock, it may fail to be granted due to
                         * contention at LOOKUP or UPDATE */
                        rc = mdt_object_lock_try(info, child, lhc, &child_bits,
-                                                try_bits, false);
+                                                try_bits, LCK_PR, false);
                        if (child_bits & MDS_INODELOCK_LAYOUT)
                                ma_need |= MA_LOV;
                } else {
                        /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
                         * client will enqueue the lock to the remote MDT */
                        if (mdt_object_remote(child))
-                               child_bits &= ~MDS_INODELOCK_UPDATE;
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                               rc = mdt_object_lookup_lock(info, NULL, child,
+                                                           lhc, LCK_PR, false);
+                       else
+                               rc = mdt_object_lock(info, child, lhc,
+                                                    child_bits, LCK_PR, false);
                }
                if (unlikely(rc != 0))
                        GOTO(out_child, rc);
@@ -2502,15 +2503,14 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
        if (IS_ERR(pobj))
                GOTO(out, rc = PTR_ERR(pobj));
 
+       if (mdt_object_remote(pobj))
+               cos_incompat = true;
+
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, name);
-       rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW, cos_incompat);
        if (rc != 0)
                GOTO(put_parent, rc);
 
-       if (mdt_object_remote(pobj))
-               cos_incompat = true;
-
        rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
                        name, child_fid, &info->mti_spec);
        if (rc != 0)
@@ -2520,10 +2520,10 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
                GOTO(unlock_parent, rc = -EREMCHG);
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(child_lh, LCK_EX);
-       rc = mdt_reint_striped_lock(info, obj, child_lh,
-                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
-                                   einfo, cos_incompat);
+       rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo,
+                                    MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE,
+                                    LCK_EX, cos_incompat);
        if (rc != 0)
                GOTO(unlock_parent, rc);
 
@@ -2546,7 +2546,7 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
        mutex_unlock(&obj->mot_lov_mutex);
 
 unlock_child:
-       mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+       mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1);
 unlock_parent:
        mdt_object_unlock(info, pobj, parent_lh, 1);
 put_parent:
@@ -3727,20 +3727,18 @@ static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock)
        mdt_object_get(NULL, lock->l_ast_data);
 }
 
-int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
-                              struct mdt_object *o, const struct lu_fid *fid,
-                              struct lustre_handle *lh, enum ldlm_mode mode,
-                              __u64 *ibits, __u64 trybits, bool cache)
+static int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+                                     struct mdt_object *obj,
+                                     struct lustre_handle *lh,
+                                     enum ldlm_mode mode,
+                                     union ldlm_policy_data *policy,
+                                     struct ldlm_res_id *res_id,
+                                     bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
-       union ldlm_policy_data *policy = &mti->mti_policy;
-       struct ldlm_res_id *res_id = &mti->mti_res_id;
-       int rc = 0;
-       ENTRY;
-
-       LASSERT(mdt_object_remote(o));
+       int rc;
 
-       fid_build_reg_res_name(fid, res_id);
+       LASSERT(mdt_object_remote(obj));
 
        memset(einfo, 0, sizeof(*einfo));
        einfo->ei_type = LDLM_IBITS;
@@ -3750,252 +3748,380 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
        einfo->ei_enq_slave = 0;
        einfo->ei_res_id = res_id;
        einfo->ei_req_slot = 1;
-
        if (cache) {
                /*
                 * if we cache lock, couple lock with mdt_object, so that object
                 * can be easily found in lock ASTs.
                 */
-               einfo->ei_cbdata = o;
+               einfo->ei_cbdata = obj;
                einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
        }
 
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = *ibits;
-       policy->l_inodebits.try_bits = trybits;
-
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo,
                            policy);
-
-       /* Return successfully acquired bits to a caller */
-       if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(lh);
-
-               LASSERT(lock);
-               *ibits = lock->l_policy_data.l_inodebits.bits;
-               LDLM_LOCK_PUT(lock);
+       if (rc) {
+               lh->cookie = 0ull;
+               return rc;
        }
-       RETURN(rc);
-}
 
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
-                          const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool cache)
-{
-       return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
-                                         cache);
+       /* other components like LFSCK can use lockless access
+        * and populate cache, so we better invalidate it
+        */
+       if (policy->l_inodebits.bits &
+           (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR))
+               mo_invalidate(mti->mti_env, mdt_object_child(obj));
+
+       return 0;
 }
 
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 *ibits,
-                         __u64 trybits, bool cos_incompat)
+/*
+ * Helper function to take PDO and hash lock.
+ *
+ * if \a pdo_lock is false, don't take PDO lock, this is case in rename.
+ */
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                       struct mdt_lock_handle *lh, const struct lu_name *name,
+                       enum ldlm_mode mode, bool pdo_lock, bool cos_incompat)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
-       __u64 dlmflags = 0, *cookie = NULL;
+       /*
+        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to
+        * be sent to client and we do not want it slowed down due to possible
+        * cancels.
+        */
+       __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+       __u64 *cookie = NULL;
        int rc;
-       ENTRY;
 
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-        LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
-        LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+       LASSERT(obj);
+
+       /* check for exists after object is locked */
+       if (!mdt_object_exists(obj))
+               /* Non-existent object shouldn't have PDO lock */
+               return -ESTALE;
+
+       /* Non-dir object shouldn't have PDO lock */
+       if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               return -ENOTDIR;
 
        if (cos_incompat) {
-               LASSERT(lh->mlh_reg_mode == LCK_PW ||
-                       lh->mlh_reg_mode == LCK_EX);
+               LASSERT(mode == LCK_PW || mode == LCK_EX);
                dlmflags |= LDLM_FL_COS_INCOMPAT;
        } else if (mdt_cos_is_enabled(info->mti_mdt)) {
                dlmflags |= LDLM_FL_COS_ENABLED;
        }
 
-       /* Only enqueue LOOKUP lock for remote object */
-       LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
-
-       /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
-       if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
-           *ibits == MDS_INODELOCK_OPEN)
-               dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
-
-       if (lh->mlh_type == MDT_PDO_LOCK) {
-                /* check for exists after object is locked */
-                if (mdt_object_exists(o) == 0) {
-                        /* Non-existent object shouldn't have PDO lock */
-                        RETURN(-ESTALE);
-                } else {
-                        /* Non-dir object shouldn't have PDO lock */
-                       if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
-                               RETURN(-ENOTDIR);
-               }
-       }
-
-       fid_build_reg_res_name(mdt_object_fid(o), res_id);
-       dlmflags |= LDLM_FL_ATOMIC_CB;
-
+       policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+       policy->l_inodebits.try_bits = 0;
+       policy->l_inodebits.li_gid = 0;
+       fid_build_reg_res_name(mdt_object_fid(obj), res_id);
        if (info->mti_exp)
                cookie = &info->mti_exp->exp_handle.h_cookie;
 
-       /*
-        * Take PDO lock on whole directory and build correct @res_id for lock
-        * on part of directory.
-        */
-       if (lh->mlh_pdo_hash != 0) {
-               LASSERT(lh->mlh_type == MDT_PDO_LOCK);
-               mdt_lock_pdo_mode(info, o, lh);
-               if (lh->mlh_pdo_mode != LCK_NL) {
-                       /*
-                        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
-                        * is never going to be sent to client and we do not
-                        * want it slowed down due to possible cancels.
-                        */
-                       policy->l_inodebits.bits =
-                               *ibits & MDS_INODELOCK_UPDATE;
-                       policy->l_inodebits.try_bits =
-                               trybits & MDS_INODELOCK_UPDATE;
-                       /* at least one of them should be set */
-                       LASSERT(policy->l_inodebits.bits |
-                               policy->l_inodebits.try_bits);
+       mdt_lock_pdo_init(lh, mode, name);
+       mdt_lock_pdo_mode(info, obj, lh);
+       if (lh->mlh_pdo_mode != LCK_NL) {
+               if (pdo_lock) {
                        rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
                                          lh->mlh_pdo_mode, policy, res_id,
                                          dlmflags, cookie);
-                       if (unlikely(rc != 0))
-                               GOTO(out_unlock, rc);
-                }
+                       if (rc) {
+                               mdt_object_unlock(info, obj, lh, 1);
+                               return rc;
+                       }
+               }
+               res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
+       }
 
-                /*
-                 * Finish res_id initializing by name hash marking part of
-                 * directory which is taking modification.
-                 */
-                res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
-        }
+       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+                         policy, res_id, dlmflags, cookie);
+       if (rc)
+               mdt_object_unlock(info, obj, lh, 1);
+       else if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) &&
+                  lh->mlh_pdo_hash != 0 &&
+                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+
+       return rc;
+}
+
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+                            struct mdt_object *obj, const struct lu_fid *fid,
+                            struct mdt_lock_handle *lh, __u64 *ibits,
+                            __u64 trybits, bool cache, bool cos_incompat)
+{
+       union ldlm_policy_data *policy = &info->mti_policy;
+       struct ldlm_res_id *res_id = &info->mti_res_id;
+       struct lustre_handle *handle;
+       int rc;
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
        policy->l_inodebits.li_gid = lh->mlh_gid;
+       fid_build_reg_res_name(fid, res_id);
 
-        /*
-         * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
-         * going to be sent to client. If it is - mdt_intent_policy() path will
-         * fix it up and turn FL_LOCAL flag off.
-         */
-       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
-                         policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
-                         cookie);
-out_unlock:
-       if (rc != 0)
-               mdt_object_unlock(info, o, lh, 1);
-       else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
-                  lh->mlh_pdo_hash != 0 &&
-                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+       if (obj && mdt_object_remote(obj)) {
+               handle = &lh->mlh_rreg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_rreg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+               rc = mdt_remote_object_lock_try(info, obj, handle,
+                                               lh->mlh_rreg_mode, policy,
+                                               res_id, cache);
+       } else {
+               struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+               /*
+                * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if
+                * it is going to be sent to client. If it is -
+                * mdt_intent_policy() path will fix it up and turn FL_LOCAL
+                * flag off.
+                */
+               __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY;
+               __u64 *cookie = NULL;
+
+               handle = &lh->mlh_reg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+
+               if (cos_incompat) {
+                       LASSERT(lh->mlh_reg_mode == LCK_PW ||
+                               lh->mlh_reg_mode == LCK_EX);
+                       dlmflags |= LDLM_FL_COS_INCOMPAT;
+               } else if (mdt_cos_is_enabled(info->mti_mdt)) {
+                       dlmflags |= LDLM_FL_COS_ENABLED;
+               }
+
+               /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
+               if (lh->mlh_type == MDT_REG_LOCK &&
+                   lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN)
+                       dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
+
+
+               if (info->mti_exp)
+                       cookie = &info->mti_exp->exp_handle.h_cookie;
+
+               rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode,
+                                 policy, res_id, dlmflags, cookie);
+               if (rc)
+                       mdt_object_unlock(info, obj, lh, 1);
+       }
 
-       /* Return successfully acquired bits to a caller */
        if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+               struct ldlm_lock *lock;
 
+               /* Return successfully acquired bits to a caller */
+               lock = ldlm_handle2lock(handle);
                LASSERT(lock);
                *ibits = lock->l_policy_data.l_inodebits.bits;
                LDLM_LOCK_PUT(lock);
        }
-       RETURN(rc);
+
+       return rc;
 }
 
-static int
-mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
-                        struct mdt_lock_handle *lh, __u64 *ibits,
-                        __u64 trybits, bool cos_incompat)
+/*
+ * MDT object locking functions:
+ * mdt_object_lock(): lock object, this is used in most places, and normally
+ *     lock ibits doesn't contain LOOKUP, unless the caller knows it's not
+ *     remote object.
+ * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs
+ *     to check whether parent is on remote MDT, if so, take LOOKUP on parent
+ *     MDT separately, and then lock other ibits on child object.
+ * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is
+ *     local, take PDO lock by name hash, otherwise take regular lock.
+ * mdt_object_stripes_lock(): lock object which should be local, and if it's a
+ *     striped directory, lock its stripes, this is called in operations which
+ *     modify both object and stripes.
+ * mdt_object_lock_try(): lock object with trybits, the trybits contains
+ *     optional inode lock bits that can be granted. This is called by
+ *     getattr/open to fetch more inode lock bits to client, and is also called
+ *     by dir migration to lock link parent in non-block mode to avoid
+ *     deadlock.
+ */
+
+/**
+ * lock object
+ *
+ * this is used to lock object in most places, and normally lock ibits doesn't
+ * contain LOOKUP, unless the caller knows it's not remote object.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, __u64 ibits,
+                   enum ldlm_mode mode, bool cos_incompat)
 {
-       struct mdt_lock_handle *local_lh = NULL;
        int rc;
-       ENTRY;
-
-       if (!mdt_object_remote(o)) {
-               rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
-                                          cos_incompat);
-               RETURN(rc);
-       }
 
-       /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
-       *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
-                   MDS_INODELOCK_XATTR);
+       ENTRY;
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false, cos_incompat);
+       RETURN(rc);
+}
 
-       /* Only enqueue LOOKUP lock for remote object */
-       if (*ibits & MDS_INODELOCK_LOOKUP) {
-               __u64 local = MDS_INODELOCK_LOOKUP;
+/**
+ * lock object with LOOKUP and other ibits
+ *
+ * it will check whether parent and child are on different MDTs, if so, take
+ * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock
+ * all ibits on child MDT. Note, parent and child shouldn't be both on remote
+ * MDTs, in which case specific lock function should be used, and it's in
+ * rename and migrate only.
+ *
+ * \param info         struct mdt_thread_info
+ * \param parent       parent object
+ * \param child                child object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_check_lock(struct mdt_thread_info *info,
+                         struct mdt_object *parent, struct mdt_object *child,
+                         struct mdt_lock_handle *lh, __u64 ibits,
+                         enum ldlm_mode mode, bool cos_incompat)
+{
+       int rc;
 
-               rc = mdt_object_local_lock(info, o, lh, &local, 0,
-                                          cos_incompat);
-               if (rc != ELDLM_OK)
+       ENTRY;
+       /* if LOOKUP ibit is not set, use mdt_object_lock() */
+       LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+       /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */
+       LASSERT(ibits != MDS_INODELOCK_LOOKUP);
+       LASSERT(parent);
+       /* @parent and @child shouldn't both be on remote MDTs */
+       LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child)));
+
+       mdt_lock_reg_init(lh, mode);
+       if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+               __u64 lookup_ibits = MDS_INODELOCK_LOOKUP;
+
+               rc = mdt_object_lock_internal(info, parent,
+                                             mdt_object_fid(child), lh,
+                                             &lookup_ibits, 0, false,
+                                             cos_incompat);
+               if (rc)
                        RETURN(rc);
 
-               local_lh = lh;
-       }
-
-       if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
-               /* Sigh, PDO needs to enqueue 2 locks right now, but
-                * enqueue RPC can only request 1 lock, to avoid extra
-                * RPC, so it will instead enqueue EX lock for remote
-                * object anyway XXX*/
-               if (lh->mlh_type == MDT_PDO_LOCK &&
-                   lh->mlh_pdo_hash != 0) {
-                       CDEBUG(D_INFO,
-                              "%s: "DFID" convert PDO lock to EX lock.\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)));
-                       lh->mlh_pdo_hash = 0;
-                       lh->mlh_rreg_mode = LCK_EX;
-                       lh->mlh_type = MDT_REG_LOCK;
-               }
-
-               rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
-                                               &lh->mlh_rreg_lh,
-                                               lh->mlh_rreg_mode,
-                                               ibits, trybits, false);
-               if (rc != ELDLM_OK) {
-                       if (local_lh != NULL)
-                               mdt_object_unlock(info, o, local_lh, rc);
-                       RETURN(rc);
-               }
+               ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       /* other components like LFSCK can use lockless access
-        * and populate cache, so we better invalidate it */
-       mo_invalidate(info->mti_env, mdt_object_child(o));
+       rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh,
+                                     &ibits, 0, false, cos_incompat);
+       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+               mdt_object_unlock(info, NULL, lh, 1);
 
-       RETURN(0);
+       RETURN(rc);
 }
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                   struct mdt_lock_handle *lh, __u64 ibits)
+/**
+ * take parent UPDATE lock
+ *
+ * if parent is local, take PDO lock by name hash, otherwise take regular lock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj  parent object
+ * \param lh   lock handle
+ * \param lname        child name
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval     0 on success, negative value on error.
+ */
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, const struct lu_name *lname,
+                   enum ldlm_mode mode, bool cos_incompat)
 {
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
-}
+       int rc;
 
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 ibits,
-                         bool cos_incompat)
-{
-       LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0,
-                                       cos_incompat);
+       ENTRY;
+       LASSERT(obj && lname);
+       if (mdt_object_remote(obj)) {
+               __u64 ibits = MDS_INODELOCK_UPDATE;
+
+               mdt_lock_reg_init(lh, mode);
+               rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj),
+                                             lh, &ibits, 0, false,
+                                             cos_incompat);
+       } else {
+               rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true,
+                                        cos_incompat);
+       }
+       RETURN(rc);
 }
 
-int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+/**
+ * lock object with trybits
+ *
+ * trybits contains optional inode lock bits that can be granted. This is
+ * called by getattr/open to fetch more inode lock bits to client, and is also
+ * called by dir migration to lock link parent in non-blocking mode to avoid
+ * deadlock.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param trybits      optional inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, negative value on error.
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj,
                        struct mdt_lock_handle *lh, __u64 *ibits,
-                       __u64 trybits, bool cos_incompat)
+                       __u64 trybits, enum ldlm_mode mode, bool cos_incompat)
 {
        bool trylock_only = *ibits == 0;
        int rc;
 
+       ENTRY;
        LASSERT(!(*ibits & trybits));
-       rc = mdt_object_lock_internal(info, o, lh, ibits, trybits,
-                                     cos_incompat);
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits,
+                                     trybits, false, cos_incompat);
        if (rc && trylock_only) { /* clear error for try ibits lock only */
                LASSERT(*ibits == 0);
                rc = 0;
        }
-       return rc;
+       RETURN(rc);
+}
+
+/*
+ * Helper function to take \a obj LOOKUP lock.
+ *
+ * Both \a pobj and \a obj may be located on remote MDTs.
+ */
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+                          struct mdt_object *pobj, struct mdt_object *obj,
+                          struct mdt_lock_handle *lh, enum ldlm_mode mode,
+                          bool cos_incompat)
+{
+       __u64 ibits = MDS_INODELOCK_LOOKUP;
+       int rc;
+
+       ENTRY;
+       /* if @pobj is NULL, it's on local MDT, and @obj is remote,
+        * this is the case in getattr/unlink/open by name.
+        */
+       LASSERT(ergo(!pobj, mdt_object_remote(obj)));
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false, cos_incompat);
+       RETURN(rc);
 }
 
 /**
@@ -4011,8 +4137,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
  * \param mode lock mode
  * \param decref force immediate lock releasing
  */
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
-                  enum ldlm_mode mode, int decref)
+static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+                         enum ldlm_mode mode, int decref)
 {
        struct tgt_session_info *tsi = info->mti_env->le_ses ?
                                       tgt_ses_info(info->mti_env) : NULL;
@@ -4134,32 +4260,32 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 }
 
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
-                                        const struct lu_fid *f,
-                                        struct mdt_lock_handle *lh,
-                                        __u64 ibits)
+                                       const struct lu_fid *f,
+                                       struct mdt_lock_handle *lh,
+                                       __u64 ibits, enum ldlm_mode mode)
 {
-        struct mdt_object *o;
+       struct mdt_object *o;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, f);
-        if (!IS_ERR(o)) {
-                int rc;
+       o = mdt_object_find(info->mti_env, info->mti_mdt, f);
+       if (!IS_ERR(o)) {
+               int rc;
 
-               rc = mdt_object_lock(info, o, lh, ibits);
-                if (rc != 0) {
-                        mdt_object_put(info->mti_env, o);
-                        o = ERR_PTR(rc);
-                }
-        }
-        return o;
+               rc = mdt_object_lock(info, o, lh, ibits, mode, false);
+               if (rc != 0) {
+                       mdt_object_put(info->mti_env, o);
+                       o = ERR_PTR(rc);
+               }
+       }
+       return o;
 }
 
 void mdt_object_unlock_put(struct mdt_thread_info * info,
-                           struct mdt_object * o,
-                           struct mdt_lock_handle *lh,
-                           int decref)
+                          struct mdt_object *o,
+                          struct mdt_lock_handle *lh,
+                          int decref)
 {
-        mdt_object_unlock(info, o, lh, decref);
-        mdt_object_put(info->mti_env, o);
+       mdt_object_unlock(info, o, lh, decref);
+       mdt_object_put(info->mti_env, o);
 }
 
 /*
@@ -4256,23 +4382,6 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
        RETURN(rc);
 }
 
-void mdt_lock_handle_init(struct mdt_lock_handle *lh)
-{
-        lh->mlh_type = MDT_NUL_LOCK;
-        lh->mlh_reg_lh.cookie = 0ull;
-        lh->mlh_reg_mode = LCK_MINMODE;
-        lh->mlh_pdo_lh.cookie = 0ull;
-        lh->mlh_pdo_mode = LCK_MINMODE;
-       lh->mlh_rreg_lh.cookie = 0ull;
-       lh->mlh_rreg_mode = LCK_MINMODE;
-}
-
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
-{
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-}
-
 void mdt_thread_info_reset(struct mdt_thread_info *info)
 {
        memset(&info->mti_attr, 0, sizeof(info->mti_attr));
@@ -4305,14 +4414,8 @@ void mdt_thread_info_reset(struct mdt_thread_info *info)
 void mdt_thread_info_init(struct ptlrpc_request *req,
                          struct mdt_thread_info *info)
 {
-        int i;
-
         info->mti_pill = &req->rq_pill;
 
-        /* lock handle */
-        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-                mdt_lock_handle_init(&info->mti_lh[i]);
-
         /* mdt device: it can be NULL while CONNECT */
         if (req->rq_export) {
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
@@ -4339,7 +4442,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info)
        }
 
        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-               mdt_lock_handle_fini(&info->mti_lh[i]);
+               mdt_lock_handle_assert(&info->mti_lh[i]);
        info->mti_env = NULL;
        info->mti_pill = NULL;
        info->mti_exp = NULL;
@@ -4552,9 +4655,9 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
         */
        mdt_intent_fixup_resent(info, *lockp, lhc, flags);
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-               mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
-                                    MDS_INODELOCK_XATTR);
+                                    MDS_INODELOCK_XATTR, (*lockp)->l_req_mode,
+                                    false);
                if (rc)
                        return rc;
        }
@@ -7518,12 +7621,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 
        CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
 
-        lh = &mti->mti_lh[MDT_LH_PARENT];
-        mdt_lock_reg_init(lh, LCK_CR);
-
-        obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
+       lh = &mti->mti_lh[MDT_LH_PARENT];
+       obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
 
        if (mdt_object_remote(obj)) {
                rc = -EREMOTE;
index 6617277..56d9a3a 100644 (file)
@@ -241,8 +241,8 @@ int mdt_hsm_state_get(struct tgt_session_info *tsi)
                GOTO(out, rc = err_serious(rc));
 
        lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lh, LCK_PR);
-       rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP);
+       rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP, LCK_PR,
+                            false);
        if (rc < 0)
                GOTO(out_ucred, rc);
 
@@ -301,9 +301,8 @@ int mdt_hsm_state_set(struct tgt_session_info *tsi)
                GOTO(out, rc = err_serious(rc));
 
        lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lh, LCK_PW);
        rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_PW, false);
        if (rc < 0)
                GOTO(out_ucred, rc);
 
index 8b95ff9..bf95893 100644 (file)
@@ -411,6 +411,8 @@ enum {
        MDT_LH_NEW,     /* new lockh for rename */
        MDT_LH_RMT,     /* used for return lh to caller */
        MDT_LH_LOCAL,   /* local lock never return to client */
+       MDT_LH_LOOKUP,  /* lookup lock for source object in rename/migrate if
+                        * it's remote object */
        MDT_LH_NR
 };
 
@@ -817,21 +819,43 @@ int mdt_lock_setup(struct mdt_thread_info *info, struct mdt_object *mo,
 int mdt_check_resent_lock(struct mdt_thread_info *info, struct mdt_object *mo,
                          struct mdt_lock_handle *lhc);
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *mo,
-                   struct mdt_lock_handle *lh, __u64 ibits);
-
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, __u64 ibits,
+                   enum ldlm_mode mode, bool cos_incompat);
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                   struct mdt_lock_handle *lh, const struct lu_name *lname,
+                   enum ldlm_mode mode, bool cos_incompat);
+int mdt_object_stripes_lock(struct mdt_thread_info *info,
+                           struct mdt_object *pobj, struct mdt_object *o,
+                           struct mdt_lock_handle *lh,
+                           struct ldlm_enqueue_info *einfo, __u64 ibits,
+                           enum ldlm_mode mode, bool cos_incompat);
+int mdt_object_check_lock(struct mdt_thread_info *info,
+                         struct mdt_object *parent, struct mdt_object *child,
                          struct mdt_lock_handle *lh, __u64 ibits,
-                         bool cos_incompat);
-
+                         enum ldlm_mode mode, bool cos_incompat);
 int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo,
                        struct mdt_lock_handle *lh, __u64 *ibits,
-                       __u64 trybits, bool cos_incompat);
+                       __u64 trybits, enum ldlm_mode mode, bool cos_incompat);
+
+/* below three lock functions are used internally */
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+                            struct mdt_object *obj, const struct lu_fid *fid,
+                            struct mdt_lock_handle *lh, __u64 *ibits,
+                            __u64 trybits, bool cache, bool cos_incompat);
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                       struct mdt_lock_handle *lh, const struct lu_name *name,
+                       enum ldlm_mode mode, bool pdo_lock, bool cos_incompat);
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+                          struct mdt_object *pobj, struct mdt_object *obj,
+                          struct mdt_lock_handle *lh, enum ldlm_mode mode,
+                          bool cos_incompat);
 
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
                       struct mdt_lock_handle *lh, int decref);
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
-                  enum ldlm_mode mode, int decref);
+void mdt_object_stripes_unlock(struct mdt_thread_info *info,
+                              struct mdt_object *o, struct mdt_lock_handle *lh,
+                              struct ldlm_enqueue_info *einfo, int decref);
 
 struct mdt_object *mdt_object_new(const struct lu_env *env,
                                  struct mdt_device *,
@@ -839,34 +863,16 @@ struct mdt_object *mdt_object_new(const struct lu_env *env,
 struct mdt_object *mdt_object_find(const struct lu_env *,
                                    struct mdt_device *,
                                    const struct lu_fid *);
-struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
-                                        const struct lu_fid *,
-                                        struct mdt_lock_handle *,
-                                        __u64);
-void mdt_object_unlock_put(struct mdt_thread_info *,
-                           struct mdt_object *,
-                           struct mdt_lock_handle *,
-                           int decref);
-
-void mdt_client_compatibility(struct mdt_thread_info *info);
-
-int mdt_remote_object_lock(struct mdt_thread_info *mti,
-                          struct mdt_object *o, const struct lu_fid *fid,
-                          struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool cache);
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 *ibits,
-                         __u64 trybits, bool cos_incompat);
-int mdt_reint_striped_lock(struct mdt_thread_info *info,
+struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
+                                       const struct lu_fid *f,
+                                       struct mdt_lock_handle *lh,
+                                       __u64 ibits, enum ldlm_mode mode);
+void mdt_object_unlock_put(struct mdt_thread_info *info,
                           struct mdt_object *o,
                           struct mdt_lock_handle *lh,
-                          __u64 ibits,
-                          struct ldlm_enqueue_info *einfo,
-                          bool cos_incompat);
-void mdt_reint_striped_unlock(struct mdt_thread_info *info,
-                             struct mdt_object *o,
-                             struct mdt_lock_handle *lh,
-                             struct ldlm_enqueue_info *einfo, int decref);
+                          int decref);
+
+void mdt_client_compatibility(struct mdt_thread_info *info);
 
 enum mdt_name_flags {
        MNF_FIX_ANON = 1,
@@ -892,9 +898,6 @@ int mdt_getxattr(struct mdt_thread_info *info);
 int mdt_reint_setxattr(struct mdt_thread_info *info,
                        struct mdt_lock_handle *lh);
 
-void mdt_lock_handle_init(struct mdt_lock_handle *lh);
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
-
 void mdt_reconstruct(struct mdt_thread_info *, struct mdt_lock_handle *);
 void mdt_reconstruct_generic(struct mdt_thread_info *mti,
                              struct mdt_lock_handle *lhc);
index 3e86539..8b8c990 100644 (file)
@@ -1519,7 +1519,8 @@ int mdt_brw_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
        mdt_intent_fixup_resent(mti, *lockp, lhc, flags);
        /* resent case */
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-               mdt_lock_handle_init(lhc);
+               __u64 ibits = MDS_INODELOCK_DOM;
+
                mdt_lh_reg_init(lhc, *lockp);
 
                /* This will block MDT thread but it should be fine until
@@ -1531,7 +1532,8 @@ int mdt_brw_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
                 * return ELDLM_OK here and fall back into normal lock enqueue
                 * process.
                 */
-               rc = mdt_object_lock(mti, mo, lhc, MDS_INODELOCK_DOM);
+               rc = mdt_object_lock_internal(mti, mo, mdt_object_fid(mo), lhc,
+                                             &ibits, 0, false, false);
                if (rc)
                        GOTO(out, rc);
        }
index 8a68273..ee1cc2a 100644 (file)
@@ -801,8 +801,6 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
 
        ENTRY;
        *ibits = 0;
-       mdt_lock_handle_init(lhc);
-
        if (req_is_replay(mdt_info_req(info)))
                RETURN(0);
 
@@ -893,8 +891,6 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        atomic_read(&obj->mot_lease_count), lm);
        }
 
-       mdt_lock_reg_init(lhc, lm);
-
        /* Return lookup lock to validate inode at the client side.
         * This is pretty important otherwise MDT will return layout
         * lock for each open.
@@ -908,7 +904,8 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
        }
 
        if (*ibits | trybits)
-               rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, false);
+               rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, lm,
+                                        false);
 
        CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx/%#llx"
               ", open_flags = %#llo, try_layout = %d : rc = %d\n",
@@ -933,9 +930,8 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        mdt_object_unlock(info, obj, lhc, 1);
 
                LASSERT(!try_layout);
-               mdt_lock_handle_init(ll);
-               mdt_lock_reg_init(ll, LCK_EX);
-               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT);
+               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+                                    LCK_EX, false);
 
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
        }
@@ -1250,7 +1246,8 @@ static int mdt_lock_root_xattr(struct mdt_thread_info *info,
                               struct mdt_device *mdt)
 {
        struct mdt_object *md_root = mdt->mdt_md_root;
-       struct lustre_handle lhroot;
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       __u64 ibits = MDS_INODELOCK_XATTR;
        int rc;
 
        if (md_root == NULL) {
@@ -1282,16 +1279,17 @@ static int mdt_lock_root_xattr(struct mdt_thread_info *info,
        if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
                return 0;
 
-       rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
-                                   &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
-                                   true);
+       mdt_lock_reg_init(lh, LCK_PR);
+       rc = mdt_object_lock_internal(info, md_root, mdt_object_fid(md_root),
+                                     lh, &ibits, 0, true, false);
        if (rc < 0)
                return rc;
 
        md_root->mot_cache_attr = 1;
 
        /* don't cancel this lock, so that we know the cached xattr is valid. */
-       ldlm_lock_decref(&lhroot, LCK_PR);
+       ldlm_lock_decref(&lh->mlh_rreg_lh, LCK_PR);
+       lh->mlh_rreg_lh.cookie = 0ull;
 
        return 0;
 }
@@ -1449,8 +1447,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 again_pw:
        if (lock_mode != LCK_NL) {
                lh = &info->mti_lh[MDT_LH_PARENT];
-               mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
-               result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+               result = mdt_parent_lock(info, parent, lh, &rr->rr_name,
+                                        lock_mode, false);
                if (result != 0)
                        GOTO(out_parent, result);
 
@@ -1481,7 +1479,6 @@ again_pw:
                        /* unlink vs create race: get write lock and restart */
                        mdt_object_unlock(info, parent, lh, 1);
                        mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
-                       mdt_lock_handle_init(lh);
                        lock_mode = LCK_PW;
                        goto again_pw;
                }
@@ -1562,15 +1559,11 @@ again_pw:
                        LASSERT(lhc != NULL);
 
                        rc = mdt_check_resent_lock(info, child, lhc);
-                       if (rc < 0) {
+                       if (rc < 0)
                                GOTO(out_child, result = rc);
-                       } else if (rc > 0) {
-                               mdt_lock_handle_init(lhc);
-                               mdt_lock_reg_init(lhc, LCK_PR);
-
-                               rc = mdt_object_lock(info, child, lhc,
-                                                    MDS_INODELOCK_LOOKUP);
-                       }
+                       else if (rc > 0)
+                               rc = mdt_object_lookup_lock(info, NULL, child,
+                                                           lhc, LCK_PR, false);
                        repbody->mbo_fid1 = *mdt_object_fid(child);
                        repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                        if (rc != 0)
@@ -1999,9 +1992,8 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
        ma->ma_hsm.mh_flags &= ~HS_RELEASED;
 
-       mdt_lock_reg_init(lh, LCK_EX);
        rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc != 0)
                GOTO(out_close, rc);
 
@@ -2167,16 +2159,14 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        if (lease_broken)
                GOTO(out_unlock_sem, rc = -ESTALE);
 
-       mdt_lock_reg_init(lh1, LCK_EX);
        rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX, false);
        if (rc < 0)
                GOTO(out_unlock_sem, rc);
 
        if (o2) {
-               mdt_lock_reg_init(lh2, LCK_EX);
                rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                                    MDS_INODELOCK_XATTR);
+                                    MDS_INODELOCK_XATTR, LCK_EX, false);
                if (rc < 0)
                        GOTO(out_unlock1, rc);
        }
index f45a84b..17b5e6b 100644 (file)
@@ -218,30 +218,29 @@ int mdt_lookup_version_check(struct mdt_thread_info *info,
 
 }
 
-static int mdt_unlock_slaves(struct mdt_thread_info *mti,
-                            struct mdt_object *obj,
-                            struct ldlm_enqueue_info *einfo,
-                            int decref)
+static int mdt_stripes_unlock(struct mdt_thread_info *mti,
+                             struct mdt_object *obj,
+                             struct ldlm_enqueue_info *einfo,
+                             int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
        struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
-       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       struct lustre_handle_array *locks = einfo->ei_cbdata;
        int i;
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
-       LASSERT(slave_locks);
+       LASSERT(locks);
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = einfo->ei_inodebits;
-       mdt_lock_handle_init(lh);
        mdt_lock_reg_init(lh, einfo->ei_mode);
-       for (i = 0; i < slave_locks->ha_count; i++) {
-               if (test_bit(i, (void *)slave_locks->ha_map))
-                       lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+       for (i = 0; i < locks->ha_count; i++) {
+               if (test_bit(i, (void *)locks->ha_map))
+                       lh->mlh_rreg_lh = locks->ha_handles[i];
                else
-                       lh->mlh_reg_lh = slave_locks->ha_handles[i];
+                       lh->mlh_reg_lh = locks->ha_handles[i];
                mdt_object_unlock(mti, NULL, lh, decref);
-               slave_locks->ha_handles[i].cookie = 0ull;
+               locks->ha_handles[i].cookie = 0ull;
        }
 
        return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
@@ -276,14 +275,13 @@ static inline int mdt_object_striped(struct mdt_thread_info *mti,
  * Lock slave stripes if necessary, the lock handles of slave stripes
  * will be stored in einfo->ei_cbdata.
  **/
-static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          enum ldlm_mode mode, __u64 ibits,
-                          struct ldlm_enqueue_info *einfo)
+static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
+                           enum ldlm_mode mode, __u64 ibits,
+                           struct ldlm_enqueue_info *einfo)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
-
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -300,48 +298,77 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                              policy);
 }
 
-int mdt_reint_striped_lock(struct mdt_thread_info *info,
-                          struct mdt_object *o,
-                          struct mdt_lock_handle *lh,
-                          __u64 ibits,
-                          struct ldlm_enqueue_info *einfo,
-                          bool cos_incompat)
+/** lock object, and stripes if it's a striped directory
+ *
+ * object should be local, this is called in operations which modify both object
+ * and stripes.
+ *
+ * \param info         struct mdt_thread_info
+ * \param parent       parent object, must not be NULL if ibits has LOOKUP
+ * \param child                child object
+ * \param lh           lock handle
+ * \param einfo                struct ldlm_enqueue_info
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, negative value on error.
+ */
+int mdt_object_stripes_lock(struct mdt_thread_info *info,
+                           struct mdt_object *parent,
+                           struct mdt_object *child,
+                           struct mdt_lock_handle *lh,
+                           struct ldlm_enqueue_info *einfo, __u64 ibits,
+                           enum ldlm_mode mode, bool cos_incompat)
 {
        int rc;
 
-       LASSERT(!mdt_object_remote(o));
+       ENTRY;
+       /* according to the protocol, child should be local; if it is remote,
+        * the request was presumably sent to the wrong MDT.
+        */
+       if (mdt_object_remote(child)) {
+               CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
+                      -EREMOTE);
+               RETURN(-EREMOTE);
+       }
 
        memset(einfo, 0, sizeof(*einfo));
-
-       rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
-       if (rc)
-               return rc;
-
-       rc = mdt_object_striped(info, o);
-       if (rc != 1) {
-               if (rc < 0)
-                       mdt_object_unlock(info, o, lh, rc);
-               return rc;
+       if (ibits & MDS_INODELOCK_LOOKUP) {
+               LASSERT(parent);
+               rc = mdt_object_check_lock(info, parent, child, lh, ibits,
+                                          mode, cos_incompat);
+       } else {
+               rc = mdt_object_lock(info, child, lh, ibits, mode,
+                                    cos_incompat);
        }
+       if (rc)
+               RETURN(rc);
 
-       rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
-       if (rc) {
-               mdt_object_unlock(info, o, lh, rc);
-               if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
-                       rc = 0;
+       if (S_ISDIR(child->mot_header.loh_attr)) {
+               rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
+               if (rc) {
+                       mdt_object_unlock(info, child, lh, rc);
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                           rc == -EIO)
+                               rc = 0;
+               }
        }
 
-       return rc;
+       RETURN(rc);
 }
 
-void mdt_reint_striped_unlock(struct mdt_thread_info *info,
-                             struct mdt_object *o,
+void mdt_object_stripes_unlock(struct mdt_thread_info *info,
+                             struct mdt_object *obj,
                              struct mdt_lock_handle *lh,
                              struct ldlm_enqueue_info *einfo, int decref)
 {
+       /* this is checked in mdt_object_stripes_lock() */
+       LASSERT(!mdt_object_remote(obj));
        if (einfo->ei_cbdata)
-               mdt_unlock_slaves(info, o, einfo, decref);
-       mdt_object_unlock(info, o, lh, decref);
+               mdt_stripes_unlock(info, obj, einfo, decref);
+       mdt_object_unlock(info, obj, lh, decref);
 }
 
 static int mdt_restripe(struct mdt_thread_info *info,
@@ -379,9 +406,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
                RETURN(rc);
 
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, lname);
-       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                  true);
+       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
        if (rc)
                RETURN(rc);
 
@@ -437,20 +462,8 @@ static int mdt_restripe(struct mdt_thread_info *info,
 
        /* lock object */
        lhc = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhc, LCK_EX);
-
-       /* enqueue object remote LOOKUP lock */
-       if (mdt_object_remote(parent)) {
-               rc = mdt_remote_object_lock(info, parent, fid,
-                                           &lhc->mlh_rreg_lh,
-                                           lhc->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(out_child, rc);
-       }
-
-       rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
-                                   true);
+       rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+                                    MDS_INODELOCK_FULL, LCK_PW, true);
        if (rc)
                GOTO(unlock_child, rc);
 
@@ -483,7 +496,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
 restriping_clear:
        child->mot_restriping = 0;
 unlock_child:
-       mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+       mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
 out_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
@@ -607,8 +620,7 @@ static int mdt_create(struct mdt_thread_info *info)
        OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
 
        lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-       rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
        if (rc)
                GOTO(put_parent, rc);
 
@@ -679,25 +691,22 @@ static int mdt_create(struct mdt_thread_info *info)
                if (cos_incompat) {
                        if (!mdt_object_remote(parent)) {
                                mdt_object_unlock(info, parent, lh, 1);
-                               mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-                               rc = mdt_reint_object_lock(info, parent, lh,
-                                                          MDS_INODELOCK_UPDATE,
-                                                          true);
+                               rc = mdt_parent_lock(info, parent, lh,
+                                                    &rr->rr_name, LCK_PW,
+                                                    true);
                                if (rc)
                                        GOTO(put_child, rc);
                        }
                }
 
                lhc = &info->mti_lh[MDT_LH_CHILD];
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_PW);
-               rc = mdt_reint_striped_lock(info, child, lhc,
-                                           MDS_INODELOCK_UPDATE, einfo,
-                                           cos_incompat);
+               rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            cos_incompat);
                if (rc)
                        GOTO(put_child, rc);
 
-               mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+               mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
        }
 
        /* Return fid & attr to client. */
@@ -729,27 +738,19 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        rc = mdt_object_striped(info, mo);
        if (rc < 0)
                RETURN(rc);
-
        cos_incompat = rc;
 
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lh, LCK_PW);
-
-       /* Even though the new MDT will grant PERM lock to the old
-        * client, but the old client will almost ignore that during
-        * So it needs to revoke both LOOKUP and PERM lock here, so
-        * both new and old client can cancel the dcache
-        */
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
-               lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
+               lockpart |= MDS_INODELOCK_PERM;
        /* Clear xattr cache on clients, so the virtual project ID xattr
         * can get the new project ID
         */
        if (ma->ma_attr.la_valid & LA_PROJID)
                lockpart |= MDS_INODELOCK_XATTR;
 
-       rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
-                                   cos_incompat);
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
+                                    LCK_PW, cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
@@ -781,7 +782,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
        EXIT;
 out_unlock:
-       mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
+       mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
        return rc;
 }
 
@@ -869,9 +870,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
                if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
-                       mdt_lock_reg_init(lhc, LCK_CW);
-
-                       rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
+                       rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
+                                            LCK_CW, false);
                        if (rc != 0) {
                                up_read(&mo->mot_open_sem);
                                GOTO(out_put, rc);
@@ -946,7 +946,6 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                struct lu_ucred *uc = mdt_ucred(info);
                struct mdt_lock_handle *lh;
                const char *name;
-               __u64 lockpart = MDS_INODELOCK_XATTR;
 
                /* reject if either remote or striped dir is disabled */
                if (ma->ma_valid & MA_LMV) {
@@ -971,27 +970,28 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                        GOTO(out_put, rc = -EPROTO);
 
                lh = &info->mti_lh[MDT_LH_PARENT];
-               mdt_lock_reg_init(lh, LCK_PW);
-
                if (ma->ma_valid & MA_LOV) {
                        buf->lb_buf = ma->ma_lmm;
                        buf->lb_len = ma->ma_lmm_size;
                        name = XATTR_NAME_LOV;
+                       rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
+                                            LCK_PW, false);
                } else {
-                       struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
-                       struct lu_fid *pfid = &info->mti_tmp_fid1;
-                       struct lu_name *pname = &info->mti_name;
-                       const char dotdot[] = "..";
-                       struct mdt_object *pobj;
-
-                       buf->lb_buf = lmu;
+                       buf->lb_buf = &ma->ma_lmv->lmv_user_md;
                        buf->lb_len = ma->ma_lmv_size;
                        name = XATTR_NAME_DEFAULT_LMV;
 
-                       if (fid_is_root(rr->rr_fid1)) {
-                               lockpart |= MDS_INODELOCK_LOOKUP;
+                       if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
+                               rc = mdt_object_lock(info, mo, lh,
+                                                    MDS_INODELOCK_XATTR |
+                                                    MDS_INODELOCK_LOOKUP,
+                                                    LCK_PW, false);
                        } else {
-                               /* force client to update dir default layout */
+                               struct lu_fid *pfid = &info->mti_tmp_fid1;
+                               struct lu_name *pname = &info->mti_name;
+                               const char dotdot[] = "..";
+                               struct mdt_object *pobj;
+
                                fid_zero(pfid);
                                pname->ln_name = dotdot;
                                pname->ln_namelen = sizeof(dotdot);
@@ -1001,27 +1001,19 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                                if (rc)
                                        GOTO(out_put, rc);
 
-                               pobj = mdt_object_find(info->mti_env, mdt,
-                                                      pfid);
+                               pobj = mdt_object_find(info->mti_env,
+                                                      info->mti_mdt, pfid);
                                if (IS_ERR(pobj))
                                        GOTO(out_put, rc = PTR_ERR(pobj));
 
-                               if (mdt_object_remote(pobj))
-                                       rc = mdt_remote_object_lock(info, pobj,
-                                               mdt_object_fid(mo),
-                                               &lh->mlh_rreg_lh, LCK_EX,
-                                               MDS_INODELOCK_LOOKUP, false);
-                               else
-                                       lockpart |= MDS_INODELOCK_LOOKUP;
-
+                               rc = mdt_object_check_lock(info, pobj, mo, lh,
+                                                          MDS_INODELOCK_XATTR |
+                                                          MDS_INODELOCK_LOOKUP,
+                                                          LCK_PW, false);
                                mdt_object_put(info->mti_env, pobj);
-
-                               if (rc)
-                                       GOTO(out_put, rc);
                        }
                }
 
-               rc = mdt_object_lock(info, mo, lh, lockpart);
                if (rc != 0)
                        GOTO(out_put, rc);
 
@@ -1124,7 +1116,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        struct lu_ucred *uc  = mdt_ucred(info);
-       __u64 lock_ibits;
        bool cos_incompat = false;
        int no_name = 0;
        ktime_t kstart = ktime_get();
@@ -1163,9 +1154,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
 relock:
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
-       rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
+       rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
+                            cos_incompat);
        if (rc != 0)
                GOTO(put_parent, rc);
 
@@ -1249,7 +1239,6 @@ relock:
        }
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(child_lh, LCK_EX);
        if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;
 
@@ -1273,7 +1262,11 @@ relock:
                 * would happen if another client try to grab the LOOKUP
                 * lock at the same time with unlink XXX
                 */
-               mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
+               rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
+                                           false);
+               if (rc)
+                       GOTO(put_child, rc);
+
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
                repbody->mbo_fid1 = *mdt_object_fid(mc);
@@ -1284,21 +1277,10 @@ relock:
         * this now because a running HSM restore on the child (unlink
         * victim) will hold the layout lock. See LU-4002.
         */
-       lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-       if (mdt_object_remote(mp)) {
-               /* Enqueue lookup lock from parent MDT */
-               rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
-                                           &child_lh->mlh_rreg_lh,
-                                           child_lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(put_child, rc);
-
-               lock_ibits &= ~MDS_INODELOCK_LOOKUP;
-       }
-
-       rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
-                                   cos_incompat);
+       rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
+                                    MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE,
+                                    LCK_EX, cos_incompat);
        if (rc != 0)
                GOTO(put_child, rc);
 
@@ -1357,7 +1339,7 @@ out_stat:
        EXIT;
 
 unlock_child:
-       mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
+       mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
 put_child:
        if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
            info->mti_big_buf.lb_buf)
@@ -1444,19 +1426,16 @@ static int mdt_reint_link(struct mdt_thread_info *info,
        OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
 
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
-       rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
+       rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
        if (rc != 0)
                GOTO(put_source, rc);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
 
        lhs = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhs, LCK_EX);
-       rc = mdt_reint_object_lock(info, ms, lhs,
-                                  MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
-                                  cos_incompat);
+       rc = mdt_object_lock(info, ms, lhs,
+                            MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
+                            cos_incompat);
        if (rc != 0)
                GOTO(unlock_parent, rc);
 
@@ -1503,98 +1482,37 @@ put_parent:
        mdt_object_put(info->mti_env, mp);
        return rc;
 }
-/**
- * lock the part of the directory according to the hash of the name
- * (lh->mlh_pdo_hash) in parallel directory lock.
- */
-static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
-                             struct mdt_lock_handle *lh,
-                             struct mdt_object *obj, __u64 ibits,
-                             bool cos_incompat)
-{
-       struct ldlm_res_id *res = &info->mti_res_id;
-       struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
-       union ldlm_policy_data *policy = &info->mti_policy;
-       __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-       int rc;
-
-       /*
-        * Finish res_id initializing by name hash marking part of
-        * directory which is taking modification.
-        */
-       LASSERT(lh->mlh_pdo_hash != 0);
-       fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = ibits;
-       if (cos_incompat &&
-           (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
-               dlmflags |= LDLM_FL_COS_INCOMPAT;
-       /*
-        * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
-        * going to be sent to client. If it is - mdt_intent_policy() path will
-        * fix it up and turn FL_LOCAL flag off.
-        */
-       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
-                         policy, res, dlmflags,
-                         &info->mti_exp->exp_handle.h_cookie);
-       return rc;
-}
 
 /**
  * Get BFL lock for rename or migrate process.
  **/
 static int mdt_rename_lock(struct mdt_thread_info *info,
-                          struct lustre_handle *lh)
+                          struct mdt_lock_handle *lh)
 {
-       int     rc;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_object *obj;
+       __u64 ibits = MDS_INODELOCK_UPDATE;
+       int rc;
 
        ENTRY;
-       if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
-               struct lu_fid *fid = &info->mti_tmp_fid1;
-               struct mdt_object *obj;
+       lu_root_fid(fid);
+       obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
 
-               /* XXX, right now, it has to use object API to
-                * enqueue lock cross MDT, so it will enqueue
-                * rename lock(with LUSTRE_BFL_FID) by root object
-                */
-               lu_root_fid(fid);
-               obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
-               if (IS_ERR(obj))
-                       RETURN(PTR_ERR(obj));
-
-               rc = mdt_remote_object_lock(info, obj,
-                                           &LUSTRE_BFL_FID, lh,
-                                           LCK_EX,
-                                           MDS_INODELOCK_UPDATE, false);
-               mdt_object_put(info->mti_env, obj);
-       } else {
-               struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
-               union ldlm_policy_data *policy = &info->mti_policy;
-               struct ldlm_res_id *res_id = &info->mti_res_id;
-               __u64 flags = 0;
-
-               fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
-               memset(policy, 0, sizeof(*policy));
-               policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
-               flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-               rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
-                                           LDLM_IBITS, policy, LCK_EX, &flags,
-                                           ldlm_blocking_ast,
-                                           ldlm_completion_ast, NULL, NULL, 0,
-                                           LVB_T_NONE,
-                                           &info->mti_exp->exp_handle.h_cookie,
-                                           lh);
-               RETURN(rc);
-       }
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
+                                     &ibits, 0, false, false);
+       mdt_object_put(info->mti_env, obj);
        RETURN(rc);
 }
 
-static void mdt_rename_unlock(struct lustre_handle *lh)
+static void mdt_rename_unlock(struct mdt_thread_info *info,
+                             struct mdt_lock_handle *lh)
 {
        ENTRY;
-       LASSERT(lustre_handle_is_used(lh));
        /* Cancel the single rename lock right away */
-       ldlm_lock_decref_and_cancel(lh, LCK_EX);
+       mdt_object_unlock(info, NULL, lh, 1);
        EXIT;
 }
 
@@ -1638,32 +1556,8 @@ int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
        struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
        int rc;
 
-       mdt_lock_handle_init(lh);
-       mdt_lock_reg_init(lh, LCK_EX);
-
-       if (mdt_object_remote(pobj)) {
-               /* don't bother to check if pobj and obj are on the same MDT. */
-               rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_EX,
-                                           MDS_INODELOCK_LOOKUP, false);
-       } else if (mdt_object_remote(obj)) {
-               struct ldlm_res_id *res = &info->mti_res_id;
-               union ldlm_policy_data *policy = &info->mti_policy;
-               __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
-                                LDLM_FL_COS_INCOMPAT;
-
-               fid_build_reg_res_name(mdt_object_fid(obj), res);
-               memset(policy, 0, sizeof(*policy));
-               policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
-               rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
-                                 &lh->mlh_reg_lh, LCK_EX, policy, res,
-                                 dlmflags, NULL);
-       } else {
-               /* do nothing if both are local */
-               return 0;
-       }
-
-       if (rc != ELDLM_OK)
+       rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
+       if (rc)
                return rc;
 
        /*
@@ -1709,7 +1603,7 @@ static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                mdt_unlock_list(info, slave_locks, decref);
                mdt_object_unlock(info, obj, lh, decref);
        } else {
-               mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
+               mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
        }
 }
 
@@ -1851,10 +1745,9 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
                 * one, and continue processing the remaining entries, and in
                 * the end of the loop restart from beginning.
                 */
-               mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
                ibits = 0;
                rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
-                                        MDS_INODELOCK_UPDATE, true);
+                                        MDS_INODELOCK_UPDATE, LCK_PW, true);
                if (!(ibits & MDS_INODELOCK_UPDATE)) {
 
                        CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
@@ -1870,9 +1763,9 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
 
                        blocked = true;
 
-                       mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
                        rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
-                                            MDS_INODELOCK_UPDATE);
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            true);
                        if (rc) {
                                mdt_object_put(info->mti_env, lnkp);
                                OBD_FREE_PTR(msl);
@@ -1968,9 +1861,8 @@ static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
                        GOTO(out, rc = -ENOMEM);
                }
 
-               mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
-               rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
-                                          MDS_INODELOCK_UPDATE, true);
+               rc = mdt_object_lock(info, slave, &msl->msl_lh,
+                                    MDS_INODELOCK_UPDATE, LCK_EX, true);
                if (rc) {
                        OBD_FREE_PTR(msl);
                        mdt_object_put(info->mti_env, slave);
@@ -2000,10 +1892,9 @@ static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
        int rc;
 
        if (mdt_object_remote(obj)) {
-               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_PW,
-                                           MDS_INODELOCK_UPDATE, false);
-               if (rc != ELDLM_OK)
+               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+                                    LCK_PW, true);
+               if (rc)
                        return rc;
 
                /*
@@ -2016,8 +1907,9 @@ static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
                                mdt_object_unlock(info, obj, lh, rc);
                }
        } else {
-               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
-                                           einfo, true);
+               rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            true);
        }
 
        return rc;
@@ -2042,10 +1934,9 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                if (rc)
                        return rc;
 
-               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_EX,
-                                           MDS_INODELOCK_FULL, false);
-               if (rc != ELDLM_OK)
+               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
+                                    true);
+               if (rc)
                        return rc;
 
                /*
@@ -2069,14 +1960,8 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                        }
                }
        } else {
-               if (mdt_object_remote(pobj)) {
-                       rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
-                       if (rc)
-                               return rc;
-               }
-
-               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
-                                           einfo, true);
+               rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
+                                            MDS_INODELOCK_FULL, LCK_EX, true);
        }
 
        return rc;
@@ -2258,7 +2143,7 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
        struct mdt_object *spobj = NULL;
        struct mdt_object *sobj = NULL;
        struct mdt_object *tobj;
-       struct lustre_handle rename_lh = { 0 };
+       struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
        struct mdt_lock_handle *lhp;
        struct mdt_lock_handle *lhs;
        struct mdt_lock_handle *lht;
@@ -2306,7 +2191,7 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
         * req is NULL if this is called by directory auto-split.
         */
        if (req && !req_is_replay(req)) {
-               rc = mdt_rename_lock(info, &rename_lh);
+               rc = mdt_rename_lock(info, rename_lh);
                if (rc != 0) {
                        CERROR("%s: can't lock FS for rename: rc = %d\n",
                               mdt_obd_name(info->mti_mdt), rc);
@@ -2342,7 +2227,6 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
 lock_parent:
        /* lock parent object */
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lhp, LCK_PW);
        rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
                                     &parent_slave_locks);
        if (rc)
@@ -2424,7 +2308,6 @@ lock_parent:
 
        /* lock source */
        lhs = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lhs, LCK_EX);
        rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
                                     &child_slave_locks);
        if (rc)
@@ -2436,8 +2319,7 @@ lock_parent:
                GOTO(unlock_source, rc = PTR_ERR(tobj));
 
        lht = &info->mti_lh[MDT_LH_NEW];
-       mdt_lock_reg_init(lht, LCK_EX);
-       rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+       rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
        if (rc)
                GOTO(put_target, rc);
 
@@ -2479,30 +2361,11 @@ unlock_parent:
 put_parent:
        mdt_object_put(env, pobj);
 unlock_rename:
-       if (lustre_handle_is_used(&rename_lh))
-               mdt_rename_unlock(&rename_lh);
+       mdt_rename_unlock(info, rename_lh);
 
        return rc;
 }
 
-static int mdt_object_lock_save(struct mdt_thread_info *info,
-                               struct mdt_object *dir,
-                               struct mdt_lock_handle *lh,
-                               int idx, bool cos_incompat)
-{
-       int rc;
-
-       /* we lock the target dir if it is local */
-       rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
-       if (rc != 0)
-               return rc;
-
-       /* get and save correct version after locking */
-       mdt_version_get_save(info, dir, idx);
-       return 0;
-}
-
 /*
  * determine lock order of sobj and tobj
  *
@@ -2615,43 +2478,18 @@ static int mdt_rename_source_lock(struct mdt_thread_info *info,
        if (rc < 0)
                return rc;
 
-       if (rc) {
-               /* enqueue remote LOOKUP lock from the parent MDT */
-               __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
-
-               if (mdt_object_remote(parent)) {
-                       rc = mdt_remote_object_lock(info, parent,
-                                                   mdt_object_fid(child),
-                                                   &lhr->mlh_rreg_lh,
-                                                   lhr->mlh_rreg_mode,
-                                                   rmt_ibits, false);
-                       if (rc != ELDLM_OK)
-                               return rc;
-               } else {
-                       LASSERT(mdt_object_remote(child));
-                       rc = mdt_object_local_lock(info, child, lhr,
-                                                  &rmt_ibits, 0, true);
-                       if (rc < 0)
-                               return rc;
-               }
+       if (rc == 1) {
+               rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
+                                           cos_incompat);
+               if (rc)
+                       return rc;
 
                ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       if (mdt_object_remote(child)) {
-               rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
-                                           &lhc->mlh_rreg_lh,
-                                           lhc->mlh_rreg_mode,
-                                           ibits, false);
-               if (rc == ELDLM_OK)
-                       rc = 0;
-       } else {
-               rc = mdt_reint_object_lock(info, child, lhc, ibits,
-                                          cos_incompat);
-       }
-
-       if (!rc)
-               mdt_object_unlock(info, child, lhr, rc);
+       rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
+       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+               mdt_object_unlock(info, NULL, lhr, rc);
 
        return rc;
 }
@@ -2662,30 +2500,35 @@ static int mdt_rename_source_lock(struct mdt_thread_info *info,
 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
                             struct mdt_object *mfirstdir,
                             struct mdt_lock_handle *lh_firstdirp,
+                            const struct lu_name *firstname,
                             struct mdt_object *mseconddir,
                             struct mdt_lock_handle *lh_seconddirp,
+                            const struct lu_name *secondname,
                             bool cos_incompat)
 {
        int rc;
 
-       rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
-                                 cos_incompat);
+       rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
+                            cos_incompat);
        if (rc)
                return rc;
 
+       mdt_version_get_save(info, mfirstdir, 0);
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
 
        if (mfirstdir != mseconddir) {
-               rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
-                                         cos_incompat);
-       } else if (!mdt_object_remote(mseconddir) &&
-                  lh_firstdirp->mlh_pdo_hash !=
-                  lh_seconddirp->mlh_pdo_hash) {
-               rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
-                                       MDS_INODELOCK_UPDATE,
-                                       cos_incompat);
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
+                                    secondname, LCK_PW, cos_incompat);
+       } else if (!mdt_object_remote(mseconddir)) {
+               if (lh_firstdirp->mlh_pdo_hash !=
+                   lh_seconddirp->mlh_pdo_hash) {
+                       rc = mdt_object_pdo_lock(info, mseconddir,
+                                                lh_seconddirp, secondname,
+                                                LCK_PW, false, cos_incompat);
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               }
        }
+       mdt_version_get_save(info, mseconddir, 1);
 
        if (rc != 0)
                mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
@@ -2709,7 +2552,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct mdt_object *mtgtdir = NULL;
        struct mdt_object *mold;
        struct mdt_object *mnew = NULL;
-       struct lustre_handle rename_lh = { 0 };
+       struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
        struct mdt_lock_handle *lh_srcdirp;
        struct mdt_lock_handle *lh_tgtdirp;
        struct mdt_lock_handle *lh_oldp = NULL;
@@ -2718,7 +2561,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct lu_fid *old_fid = &info->mti_tmp_fid1;
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
        struct lu_ucred *uc = mdt_ucred(info);
-       __u64 lock_ibits;
        bool reverse = false, discard = false;
        bool cos_incompat;
        ktime_t kstart = ktime_get();
@@ -2797,7 +2639,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                     !mdt->mdt_enable_parallel_rename_dir) ||
                    (!S_ISDIR(ma->ma_attr.la_mode) &&
                     !mdt->mdt_enable_parallel_rename_file)) {
-                       rc = mdt_rename_lock(info, &rename_lh);
+                       rc = mdt_rename_lock(info, rename_lh);
                        if (rc != 0) {
                                CERROR("%s: cannot lock for rename: rc = %d\n",
                                       mdt_obd_name(mdt), rc);
@@ -2851,11 +2693,13 @@ relock:
                reverse = 0;
 
        if (reverse)
-               rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
-                                      lh_srcdirp, cos_incompat);
+               rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
+                                      &rr->rr_tgt_name, msrcdir, lh_srcdirp,
+                                      &rr->rr_name, cos_incompat);
        else
-               rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
-                                      lh_tgtdirp, cos_incompat);
+               rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
+                                      mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
+                                      cos_incompat);
 
        if (rc != 0)
                GOTO(out_unlock_rename, rc);
@@ -2965,12 +2809,10 @@ relock:
                        GOTO(out_put_new, rc = -EISDIR);
 
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_RMT];
-               mdt_lock_reg_init(lh_oldp, LCK_EX);
-               mdt_lock_reg_init(lh_rmt, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, lock_ibits, cos_incompat);
+                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           MDS_INODELOCK_XATTR, cos_incompat);
                if (rc < 0)
                        GOTO(out_put_new, rc);
 
@@ -2994,21 +2836,9 @@ relock:
                 */
 
                lh_newp = &info->mti_lh[MDT_LH_NEW];
-               mdt_lock_reg_init(lh_newp, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-               if (mdt_object_remote(mtgtdir)) {
-                       rc = mdt_remote_object_lock(info, mtgtdir,
-                                                   mdt_object_fid(mnew),
-                                                   &lh_newp->mlh_rreg_lh,
-                                                   lh_newp->mlh_rreg_mode,
-                                                   MDS_INODELOCK_LOOKUP,
-                                                   false);
-                       if (rc != ELDLM_OK)
-                               GOTO(out_unlock_old, rc);
-
-                       lock_ibits &= ~MDS_INODELOCK_LOOKUP;
-               }
-               rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
+               rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
+                                          MDS_INODELOCK_LOOKUP |
+                                          MDS_INODELOCK_UPDATE, LCK_EX,
                                           cos_incompat);
                if (rc != 0)
                        GOTO(out_unlock_new, rc);
@@ -3019,12 +2849,10 @@ relock:
                GOTO(out_put_old, rc);
        } else {
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_RMT];
-               mdt_lock_reg_init(lh_oldp, LCK_EX);
-               mdt_lock_reg_init(lh_rmt, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, lock_ibits, cos_incompat);
+                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           MDS_INODELOCK_XATTR, cos_incompat);
                if (rc != 0)
                        GOTO(out_put_old, rc);
 
@@ -3075,8 +2903,7 @@ out_unlock_parents:
        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
        mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
 out_unlock_rename:
-       if (lustre_handle_is_used(&rename_lh))
-               mdt_rename_unlock(&rename_lh);
+       mdt_rename_unlock(info, rename_lh);
 out_put_tgtdir:
        mdt_object_put(info->mti_env, mtgtdir);
 out_put_srcdir:
index 2c808e9..811e8cc 100644 (file)
@@ -389,36 +389,22 @@ static int mdt_auto_split(struct mdt_thread_info *info)
                GOTO(restriping_clear, rc);
 
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, lname);
-       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                  true);
+       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
        if (rc)
                GOTO(restriping_clear, rc);
 
        lhc = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhc, LCK_EX);
-       if (mdt_object_remote(parent)) {
-               /* enqueue object remote LOOKUP lock */
-               rc = mdt_remote_object_lock(info, parent, mdt_object_fid(child),
-                                           &lhc->mlh_rreg_lh,
-                                           lhc->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(unlock_parent, rc);
-       }
-
-       rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
-                                   true);
+       rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+                                    MDS_INODELOCK_FULL, LCK_EX, true);
        if (rc)
-               GOTO(unlock_child, rc);
+               GOTO(unlock_parent, rc);
 
        mdt_auto_split_prep(info, spec, ma, lum_stripe_count);
 
        rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
        EXIT;
 
-unlock_child:
-       mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+       mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
 unlock_parent:
        mdt_object_unlock(info, parent, lhp, rc);
 restriping_clear:
@@ -472,9 +458,8 @@ static int mdt_restripe_migrate_finish(struct mdt_thread_info *info,
        buf.lb_len = sizeof(*lmv);
 
        lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lh, LCK_EX);
-       rc = mdt_reint_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR,
-                                  false);
+       rc = mdt_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR, LCK_EX,
+                            false);
        if (!rc)
                rc = mo_xattr_set(info->mti_env, mdt_object_child(stripe), &buf,
                                  XATTR_NAME_LMV, LU_XATTR_REPLACE);
index b51f9b2..72672b2 100644 (file)
@@ -368,33 +368,23 @@ int mdt_dir_layout_update(struct mdt_thread_info *info)
        if (IS_ERR(pobj))
                GOTO(put_obj, rc = PTR_ERR(pobj));
 
-       /* revoke object remote LOOKUP lock */
-       if (mdt_object_remote(pobj)) {
-               rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
-               if (rc)
-                       GOTO(put_pobj, rc);
-       }
-
        /*
         * lock parent if dir will be shrunk to 1 stripe, because dir will be
         * converted to normal directory, as will change dir FID and update
         * namespace of parent.
         */
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lhp, LCK_PW);
-
        if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
-               rc = mdt_reint_object_lock(info, pobj, lhp,
-                                          MDS_INODELOCK_UPDATE, true);
+               rc = mdt_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE,
+                                    LCK_PW, true);
                if (rc)
                        GOTO(put_pobj, rc);
        }
 
        /* lock object */
        lhc = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhc, LCK_EX);
-       rc = mdt_reint_striped_lock(info, obj, lhc, MDS_INODELOCK_FULL, einfo,
-                                   true);
+       rc = mdt_object_stripes_lock(info, pobj, obj, lhc, einfo,
+                                    MDS_INODELOCK_FULL, LCK_EX, true);
        if (rc)
                GOTO(unlock_pobj, rc);
 
@@ -513,7 +503,7 @@ int mdt_dir_layout_update(struct mdt_thread_info *info)
        GOTO(unlock_obj, rc);
 
 unlock_obj:
-       mdt_reint_striped_unlock(info, obj, lhc, einfo, rc);
+       mdt_object_stripes_unlock(info, obj, lhc, einfo, rc);
 unlock_pobj:
        mdt_object_unlock(info, pobj, lhp, rc);
 put_pobj:
@@ -615,16 +605,8 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
                lockpart |= MDS_INODELOCK_LAYOUT;
        }
 
-       /* Revoke all clients' lookup lock, since the access
-        * permissions for this inode is changed when ACL_ACCESS is
-        * set. This isn't needed for ACL_DEFAULT, since that does
-        * not change the access permissions of this inode, nor any
-        * other existing inodes. It is setting the ACLs inherited
-        * by new directories/files at create time.
-        */
-       /* We need revoke both LOOKUP|PERM lock here, see mdt_attr_set. */
        if (!strcmp(xattr_name, XATTR_NAME_ACL_ACCESS))
-               lockpart |= MDS_INODELOCK_PERM | MDS_INODELOCK_LOOKUP;
+               lockpart |= MDS_INODELOCK_PERM;
        /* We need to take the lock on behalf of old clients so that newer
         * clients flush their xattr caches
         */
@@ -635,8 +617,7 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
        /* ACLs were sent to clients under LCK_CR locks, so taking LCK_EX
         * to cancel them.
         */
-       mdt_lock_reg_init(lh, LCK_EX);
-       obj = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
+       obj = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart, LCK_EX);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));