Whamcloud - gitweb
LU-3538 dne: Commit-on-Sharing for DNE 30/12530/48
authorLai Siyao <lai.siyao@intel.com>
Sun, 2 Nov 2014 15:30:23 +0000 (23:30 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 28 Jan 2016 18:01:08 +0000 (18:01 +0000)
This patch contains three parts:
1. Sync-on-Cancel for cross-MDT lock, which eleminates dependency
   between transactions and distributed transaction which modified
   remote object, this can guarantee the change of the distributed
   transaction will not be lost.
2. enable Commit-on-Sharing for DNE, PW/EX locks will be converted
   to COS locks, but by default they are ignored, when operation
   finds itself a distributed transaction, it will lock with
   LDLM_FL_COS_INCOMPAT flag to check against existed COS locks.
   This will eliminate dependency between distributed transaction
   and transactions which modify the same local object, and it
   guarantees distributed transaction can always be recovered.
3. striped directory creation needs to ensure its parent permanent
   on disk, to ensure this, cache child locks in mkdir.

Sync-on-Cancel for cross-MDT lock

When two operations have dependency on an object, and the first
operation has a PW/EX cross-MDT lock on this object, trigger
transaction commit on the MDT where the object resides to
eliminate dependency, in short, this patch eliminates dependency
between locks and existed PW/EX cross-MDT lock.

This patch contains following changes:
* enable Sync on Cancel for DNE by default.
* save cross-MDT lock into tgt_uncommitted_soc_locks after use,
  and it will be released upon transaction commit, note, just
  a lock refcount is taken when lock is saved, the read/write
  count is released in mdt_object_unlock().
* the saved cross-MDT lock will be discarded upon BAST,
  because the MDT where the object resides will do sync on lock
  cancel.
* use existed BLOCKING_SYNC_ON_CANCEL mechanism to commit
  transaction upon cross-MDT lock cancel.

Commit-on-Sharing for DNE

On DNE, Commit-on-Sharing is disabled by default, but MDT local
PW/EX lock will be saved as COS lock, and such lock will be
ignored in compatilibity check by default, unless it's required,
there are two situations:
1. when distributed transaction locks local object, it will
   conflict with COS locks.
2. when distributed transaction enqueues cross-MDT lock, it will
   conflict with COS locks.

This patch contains following changes:
* on DNE, local PW/EX lock is converted to COS and saved like
  before even when COS is not enabled.
* above COS locks will be ignored in lock compatibility check by
  default, so for local operations COS won't take effect. But if
  operation finds itself may modify remote MDT object, it will lock
  all local locks with COS checked.
* cross-MDT lock will always conflict with COS locks.
* if operation is reint, it will check whether it's a distributed
  operation (involved objects are remote or striped) if so, check
  against COS locks when enqueing locks.

Eliminate dependency in dir creation

Mkdir needs to take a lock on child, so that any subsequent
distributed operation using that directory would observe a conflict
and ensure that the original mkdir is committed.

Benchmark result with createmany/unlinkmany is as follows:
        mkdir rmdir open unlink mknod unlink (ops/sec)
2.6     1194 1310 1314 1185 2242 1396
master   978 1166  937 1028 1681 1202
current  930 1161  918 1018 1691 1202

* 10 createmany/unlinkmany processes running on local client
  (on MDS), 4M dirs/files created/unlinked, and the numbers are
  average of 10 processes.
* for 2.6, each process is running on a separate mountpoint.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I91928d097cbb26bd1e1089c3f8851ac6a6440a69
Reviewed-on: http://review.whamcloud.com/12530
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
21 files changed:
lustre/contrib/wireshark/lustre_dlm_flags_wshark.c
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_dlm_flags.h
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/lod/lod_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_reint.c
lustre/osp/osp_md_object.c
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/target/tgt_main.c
lustre/tests/conf-sanity.sh
lustre/tests/sanityn.sh
lustre/utils/liblustreapi.c

index 0428d8b..eb091fb 100644 (file)
@@ -21,6 +21,7 @@ static int hf_lustre_ldlm_fl_no_timeout          = -1;
 static int hf_lustre_ldlm_fl_block_nowait        = -1;
 static int hf_lustre_ldlm_fl_test_lock           = -1;
 static int hf_lustre_ldlm_fl_cancel_on_block     = -1;
+static int hf_lustre_ldlm_fl_cos_incompat        = -1;
 static int hf_lustre_ldlm_fl_deny_on_contention  = -1;
 static int hf_lustre_ldlm_fl_ast_discard_data    = -1;
 
@@ -39,6 +40,7 @@ const value_string lustre_ldlm_flags_vals[] = {
   {LDLM_FL_BLOCK_NOWAIT,        "LDLM_FL_BLOCK_NOWAIT"},
   {LDLM_FL_TEST_LOCK,           "LDLM_FL_TEST_LOCK"},
   {LDLM_FL_CANCEL_ON_BLOCK,     "LDLM_FL_CANCEL_ON_BLOCK"},
+  {LDLM_FL_COS_INCOMPAT,        "LDLM_FL_COS_INCOMPAT"},
   {LDLM_FL_DENY_ON_CONTENTION,  "LDLM_FL_DENY_ON_CONTENTION"},
   {LDLM_FL_AST_DISCARD_DATA,    "LDLM_FL_AST_DISCARD_DATA"},
   { 0, NULL }
@@ -81,6 +83,7 @@ lustre_dissect_element_ldlm_lock_flags(
   dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_block_nowait);
   dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_test_lock);
   dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_cancel_on_block);
+  dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_cos_incompat);
   dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_deny_on_contention);
   return
     dissect_uint32(tvb, offset, pinfo, tree, hf_lustre_ldlm_fl_ast_discard_data);
@@ -279,6 +282,22 @@ lustre_dissect_element_ldlm_lock_flags(
     }
   },
   {
+    /* p_id    */ &hf_lustre_ldlm_fl_cos_incompat,
+    /* hfinfo  */ {
+      /* name    */ "LDLM_FL_COS_INCOMPAT",
+      /* abbrev  */ "lustre.ldlm_fl_cos_incompat",
+      /* type    */ FT_BOOLEAN,
+      /* display */ 32,
+      /* strings */ TFS(&lnet_flags_set_truth),
+      /* bitmask */ LDLM_FL_COS_INCOMPAT,
+      /* blurb   */ "Flag whether a lock is enqueued from a distributed transaction, and the\n"
+       "requesting lock mode is PW/EX, if so, it will check compatibility with COS\n"
+       "locks, and different from original COS semantic, transactions from the same\n"
+       "client is also treated as lock conflict.",
+      /* id      */ HFILL
+    }
+  },
+  {
     /* p_id    */ &hf_lustre_ldlm_fl_deny_on_contention,
     /* hfinfo  */ {
       /* name    */ "LDLM_FL_DENY_ON_CONTENTION",
index 5da2544..e831975 100644 (file)
@@ -159,6 +159,8 @@ struct lu_target {
        struct dt_object        *lut_reply_data;
        /** Bitmap of used slots in the reply data file */
        unsigned long           **lut_reply_bitmap;
+       /** target sync count, used for debug & test */
+       atomic_t                 lut_sync_count;
 };
 
 /* number of slots in reply bitmap */
@@ -425,6 +427,8 @@ int tgt_hpreq_handler(struct ptlrpc_request *req);
 
 /* target/tgt_main.c */
 void tgt_boot_epoch_update(struct lu_target *lut);
+void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno);
+void tgt_discard_slc_lock(struct ldlm_lock *lock);
 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
                           struct obd_export *exp, __u64 transno);
 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp);
index 354b6aa..b0fa434 100644 (file)
@@ -1032,6 +1032,11 @@ static inline void lustre_handle_copy(struct lustre_handle *tgt,
        tgt->cookie = src->cookie;
 }
 
+struct lustre_handle_array {
+       unsigned int            count;
+       struct lustre_handle    handles[0];
+};
+
 /* flags for lm_flags */
 #define MSGHDR_AT_SUPPORT               0x1
 #define MSGHDR_CKSUM_INCOMPAT18         0x2
index 89a029a..61728ea 100644 (file)
@@ -900,6 +900,13 @@ struct ldlm_lock {
        struct list_head        l_exp_list;
 };
 
+/** For uncommitted cross-MDT lock, store transno this lock belongs to */
+#define l_transno l_client_cookie
+
+/** For uncommitted cross-MDT lock, which is client lock, share with l_rk_ast
+ *  which is for server. */
+#define l_slc_link l_rk_ast
+
 /**
  * LDLM resource description.
  * Basically, resource is a representation for a single object.
index 5f7206d..9b7037e 100644 (file)
 #define ldlm_set_cancel_on_block(_l)    LDLM_SET_FLAG((  _l), 1ULL << 23)
 #define ldlm_clear_cancel_on_block(_l)  LDLM_CLEAR_FLAG((_l), 1ULL << 23)
 
+/** Flag whether a lock is enqueued from a distributed transaction, and the
+ *  requesting lock mode is PW/EX, if so, it will check compatibility with COS
+ *  locks, and different from original COS semantic, transactions from the same
+ *  client is also treated as lock conflict. */
+#define LDLM_FL_COS_INCOMPAT           0x0000000001000000ULL /* bit  24 */
+#define ldlm_is_cos_incompat(_l)       LDLM_TEST_FLAG((_l), 1ULL << 24)
+#define ldlm_set_cos_incompat(_l)      LDLM_SET_FLAG((_l), 1ULL << 24)
+#define ldlm_clear_cos_incompat(_l)    LDLM_CLEAR_FLAG((_l), 1ULL << 24)
+
 /**
  * measure lock contention and return -EUSERS if locking contention is high */
 #define LDLM_FL_DENY_ON_CONTENTION        0x0000000040000000ULL // bit  30
 /** Flag whether a lock is found on server for re-sent RPC. */
 #define LDLM_FL_RESENT                   0x0100000000000000ULL // bit  56
 
+/** Flag whether Commit-on-Sharing is enabled, if LDLM_FL_COS_INCOMPAT is set
+ *  this flag may not be set because once the former is set this flag won't be
+ *  checked, and for cross-MDT lock COS_INCOMPAT is always set but ast handle is
+ *  in ldlm context which doesn't know whether COS is enabled or not. */
+#define LDLM_FL_COS_ENABLED              0x0200000000000000ULL /* bit  57 */
+#define ldlm_is_cos_enabled(_l)          LDLM_TEST_FLAG((_l), 1ULL << 57)
+#define ldlm_set_cos_enabled(_l)         LDLM_SET_FLAG((_l), 1ULL << 57)
+
 /** l_flags bits marked as "ast" bits */
 #define LDLM_FL_AST_MASK                (LDLM_FL_FLOCK_DEADLOCK                |\
                                         LDLM_FL_AST_DISCARD_DATA)
index 0c31481..74119fb 100644 (file)
@@ -56,6 +56,7 @@
 #include <lustre_dlm.h>
 #include <obd_support.h>
 #include <lustre_lib.h>
+#include <obd_class.h>
 
 #include "ldlm_internal.h"
 
@@ -81,7 +82,6 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 {
        struct list_head *tmp;
        struct ldlm_lock *lock;
-       enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_bits = req->l_policy_data.l_inodebits.bits;
        int compat = 1;
        ENTRY;
@@ -98,23 +98,33 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                /* We stop walking the queue if we hit ourselves so we don't
                 * take conflicting locks enqueued after us into account,
                 * or we'd wait forever. */
-                if (req == lock)
-                        RETURN(compat);
+               if (req == lock)
+                       RETURN(compat);
 
-                /* last lock in mode group */
-                LASSERT(lock->l_sl_mode.prev != NULL);
+               /* last lock in mode group */
+               LASSERT(lock->l_sl_mode.prev != NULL);
                mode_tail = &list_entry(lock->l_sl_mode.prev,
-                                            struct ldlm_lock,
-                                            l_sl_mode)->l_res_link;
+                                       struct ldlm_lock,
+                                       l_sl_mode)->l_res_link;
 
-                /* locks are compatible, bits don't matter */
-                if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                        /* jump to last lock in mode group */
-                        tmp = mode_tail;
-                        continue;
-                }
+               /* if reqest lock is not COS_INCOMPAT and COS is disabled,
+                * they are compatible, IOW this request is from a local
+                * transaction on a DNE system. */
+               if (lock->l_req_mode == LCK_COS && !ldlm_is_cos_incompat(req) &&
+                   !ldlm_is_cos_enabled(req)) {
+                       /* jump to last lock in mode group */
+                       tmp = mode_tail;
+                       continue;
+               }
 
-                for (;;) {
+               /* locks' mode are compatible, bits don't matter */
+               if (lockmode_compat(lock->l_req_mode, req->l_req_mode)) {
+                       /* jump to last lock in mode group */
+                       tmp = mode_tail;
+                       continue;
+               }
+
+               for (;;) {
                        struct list_head *head;
 
                        /* Advance loop cursor to last lock in policy group. */
@@ -128,6 +138,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                 * requirement: it is only compatible with
                                 * locks from the same client. */
                                if (lock->l_req_mode == LCK_COS &&
+                                   !ldlm_is_cos_incompat(req) &&
+                                   ldlm_is_cos_enabled(req) &&
                                    lock->l_client_cookie == req->l_client_cookie)
                                        goto not_conflicting;
                                /* Found a conflicting policy group. */
@@ -206,8 +218,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                 ldlm_grant_lock(lock, work_list);
 
                *err = ELDLM_OK;
-                RETURN(LDLM_ITER_CONTINUE);
-        }
+               RETURN(LDLM_ITER_CONTINUE);
+       }
 
  restart:
         rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
index 484f0b4..90580b0 100644 (file)
@@ -1799,6 +1799,10 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
                ldlm_set_ast_discard_data(lock);
        if (*flags & LDLM_FL_TEST_LOCK)
                ldlm_set_test_lock(lock);
+       if (*flags & LDLM_FL_COS_INCOMPAT)
+               ldlm_set_cos_incompat(lock);
+       if (*flags & LDLM_FL_COS_ENABLED)
+               ldlm_set_cos_enabled(lock);
 
        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
index 5060265..a0586af 100644 (file)
@@ -903,6 +903,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LDLM_DEBUG(lock, "server preparing blocking AST");
 
         ptlrpc_request_set_replen(req);
+       ldlm_set_cbpending(lock);
        if (instant_cancel) {
                unlock_res_and_lock(lock);
                ldlm_lock_cancel(lock);
index 5118151..ccf79ec 100644 (file)
@@ -3890,11 +3890,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
        return dt_object_sync(env, dt_object_child(dt), start, end);
 }
 
-struct lod_slave_locks {
-       int                     lsl_lock_count;
-       struct lustre_handle    lsl_handle[0];
-};
-
 /**
  * Release LDLM locks on the stripes of a striped directory.
  *
@@ -3914,7 +3909,7 @@ static int lod_object_unlock_internal(const struct lu_env *env,
                                      struct ldlm_enqueue_info *einfo,
                                      union ldlm_policy_data *policy)
 {
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int                     rc = 0;
        int                     i;
        ENTRY;
@@ -3922,9 +3917,9 @@ static int lod_object_unlock_internal(const struct lu_env *env,
        if (slave_locks == NULL)
                RETURN(0);
 
-       for (i = 1; i < slave_locks->lsl_lock_count; i++) {
-               if (lustre_handle_is_used(&slave_locks->lsl_handle[i]))
-                       ldlm_lock_decref(&slave_locks->lsl_handle[i],
+       for (i = 1; i < slave_locks->count; i++) {
+               if (lustre_handle_is_used(&slave_locks->handles[i]))
+                       ldlm_lock_decref(&slave_locks->handles[i],
                                         einfo->ei_mode);
        }
 
@@ -3943,32 +3938,31 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
                             struct ldlm_enqueue_info *einfo,
                             union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
-       int                     slave_locks_size;
-       int                     rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       int slave_locks_size;
+       int i;
        ENTRY;
 
        if (slave_locks == NULL)
                RETURN(0);
 
-       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               RETURN(-ENOTDIR);
-
+       LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+       LASSERT(lo->ldo_stripenr > 1);
        /* Note: for remote lock for single stripe dir, MDT will cancel
         * the lock by lockh directly */
-       if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
-               RETURN(0);
+       LASSERT(!dt_object_remote(dt_object_child(dt)));
 
-       /* Only cancel slave lock for striped dir */
-       rc = lod_object_unlock_internal(env, dt, einfo, policy);
+       /* locks were unlocked in MDT layer */
+       for (i = 1; i < slave_locks->count; i++)
+               LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
 
-       slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
-                          sizeof(slave_locks->lsl_handle[0]);
+       slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+                          sizeof(slave_locks->handles[0]);
        OBD_FREE(slave_locks, slave_locks_size);
        einfo->ei_cbdata = NULL;
 
-       RETURN(rc);
+       RETURN(0);
 }
 
 /**
@@ -3989,7 +3983,7 @@ static int lod_object_lock(const struct lu_env *env,
        int                     rc = 0;
        int                     i;
        int                     slave_locks_size;
-       struct lod_slave_locks  *slave_locks = NULL;
+       struct lustre_handle_array *slave_locks = NULL;
        ENTRY;
 
        /* remote object lock */
@@ -4011,12 +4005,12 @@ static int lod_object_lock(const struct lu_env *env,
                RETURN(0);
 
        slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
-                          sizeof(slave_locks->lsl_handle[0]);
+                          sizeof(slave_locks->handles[0]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
        if (slave_locks == NULL)
                RETURN(-ENOMEM);
-       slave_locks->lsl_lock_count = lo->ldo_stripenr;
+       slave_locks->count = lo->ldo_stripenr;
 
        /* striped directory lock */
        for (i = 1; i < lo->ldo_stripenr; i++) {
@@ -4038,6 +4032,10 @@ static int lod_object_lock(const struct lu_env *env,
                        ldlm_completion_callback completion = einfo->ei_cb_cp;
                        __u64   dlmflags = LDLM_FL_ATOMIC_CB;
 
+                       if (einfo->ei_mode == LCK_PW ||
+                           einfo->ei_mode == LCK_EX)
+                               dlmflags |= LDLM_FL_COS_INCOMPAT;
+
                        /* This only happens if there are mulitple stripes
                         * on the master MDT, i.e. except stripe0, there are
                         * other stripes on the Master MDT as well, Only
@@ -4052,7 +4050,7 @@ static int lod_object_lock(const struct lu_env *env,
                }
                if (rc != 0)
                        GOTO(out, rc);
-               slave_locks->lsl_handle[i] = lockh;
+               slave_locks->handles[i] = lockh;
        }
 
        einfo->ei_cbdata = slave_locks;
index 18ab7b6..3035202 100644 (file)
@@ -170,6 +170,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
        lh->mlh_reg_mode = lock_mode;
+       lh->mlh_pdo_mode = LCK_MINMODE;
        lh->mlh_rreg_mode = lock_mode;
        lh->mlh_type = MDT_PDO_LOCK;
 
@@ -2155,11 +2156,14 @@ static void mdt_device_commit_async(const struct lu_env *env,
 {
        struct dt_device *dt = mdt->mdt_bottom;
        int rc;
+       ENTRY;
 
        rc = dt->dd_ops->dt_commit_async(env, dt);
        if (unlikely(rc != 0))
                CWARN("%s: async commit start failed: rc = %d\n",
                      mdt_obd_name(mdt), rc);
+       atomic_inc(&mdt->mdt_async_commit_count);
+       EXIT;
 }
 
 /**
@@ -2215,17 +2219,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
         if (flag == LDLM_CB_CANCELING)
                 RETURN(0);
+
         lock_res_and_lock(lock);
         if (lock->l_blocking_ast != mdt_blocking_ast) {
                 unlock_res_and_lock(lock);
                 RETURN(0);
         }
-        if (mdt_cos_is_enabled(mdt) &&
-            lock->l_req_mode & (LCK_PW | LCK_EX) &&
-            lock->l_blocking_lock != NULL &&
-            lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
-                mdt_set_lock_sync(lock);
-        }
+       if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
+           lock->l_blocking_lock != NULL) {
+               if (mdt_cos_is_enabled(mdt) &&
+                   lock->l_client_cookie !=
+                   lock->l_blocking_lock->l_client_cookie)
+                       mdt_set_lock_sync(lock);
+               else if (mdt_slc_is_enabled(mdt) &&
+                        ldlm_is_cos_incompat(lock->l_blocking_lock))
+                       mdt_set_lock_sync(lock);
+       }
         rc = ldlm_blocking_ast_nocheck(lock);
 
         /* There is no lock conflict if l_blocking_lock == NULL,
@@ -2246,12 +2255,24 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(rc);
 }
 
-/* Used for cross-MDT lock */
+/*
+ * Blocking AST for cross-MDT lock
+ *
+ * Discard lock from uncommitted_slc_locks and cancel it.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this cancelling or blocking callback
+ * \retval     0 on success
+ * \retval     negative number on error
+ */
 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
        struct lustre_handle lockh;
        int               rc;
+       ENTRY;
 
        switch (flag) {
        case LDLM_CB_BLOCKING:
@@ -2265,10 +2286,14 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                break;
        case LDLM_CB_CANCELING:
                LDLM_DEBUG(lock, "Revoke remote lock\n");
+               /* discard slc lock here so that it can be cleaned anytime,
+                * especially for cleanup_resource() */
+               tgt_discard_slc_lock(lock);
                break;
        default:
                LBUG();
        }
+
        RETURN(0);
 }
 
@@ -2342,12 +2367,12 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
 static int mdt_object_local_lock(struct mdt_thread_info *info,
                                 struct mdt_object *o,
                                 struct mdt_lock_handle *lh, __u64 ibits,
-                                bool nonblock)
+                                bool nonblock, bool cos_incompat)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
-       __u64 dlmflags;
+       __u64 dlmflags = 0;
        int rc;
        ENTRY;
 
@@ -2356,6 +2381,14 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
         LASSERT(lh->mlh_type != MDT_NUL_LOCK);
 
+       if (cos_incompat) {
+               LASSERT(lh->mlh_reg_mode == LCK_PW ||
+                       lh->mlh_reg_mode == LCK_EX);
+               dlmflags |= LDLM_FL_COS_INCOMPAT;
+       } else if (mdt_cos_is_enabled(info->mti_mdt)) {
+               dlmflags |= LDLM_FL_COS_ENABLED;
+       }
+
        /* Only enqueue LOOKUP lock for remote object */
        if (mdt_object_remote(o))
                LASSERT(ibits == MDS_INODELOCK_LOOKUP);
@@ -2375,7 +2408,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
         memset(policy, 0, sizeof(*policy));
         fid_build_reg_res_name(mdt_object_fid(o), res_id);
 
-       dlmflags = LDLM_FL_ATOMIC_CB;
+       dlmflags |= LDLM_FL_ATOMIC_CB;
        if (nonblock)
                dlmflags |= LDLM_FL_BLOCK_NOWAIT;
 
@@ -2432,15 +2465,18 @@ out_unlock:
 
 static int
 mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
-                        struct mdt_lock_handle *lh, __u64 ibits,
-                        bool nonblock)
+                        struct mdt_lock_handle *lh, __u64 ibits, bool nonblock,
+                        bool cos_incompat)
 {
        struct mdt_lock_handle *local_lh = NULL;
        int rc;
        ENTRY;
 
-       if (!mdt_object_remote(o))
-               return mdt_object_local_lock(info, o, lh, ibits, nonblock);
+       if (!mdt_object_remote(o)) {
+               rc = mdt_object_local_lock(info, o, lh, ibits, nonblock,
+                                          cos_incompat);
+               RETURN(rc);
+       }
 
        /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
        ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
@@ -2448,9 +2484,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
 
        /* Only enqueue LOOKUP lock for remote object */
        if (ibits & MDS_INODELOCK_LOOKUP) {
-               rc = mdt_object_local_lock(info, o, lh,
-                                          MDS_INODELOCK_LOOKUP,
-                                          nonblock);
+               rc = mdt_object_local_lock(info, o, lh, MDS_INODELOCK_LOOKUP,
+                                          nonblock, cos_incompat);
                if (rc != ELDLM_OK)
                        RETURN(rc);
 
@@ -2488,7 +2523,16 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                    struct mdt_lock_handle *lh, __u64 ibits)
 {
-       return mdt_object_lock_internal(info, o, lh, ibits, false);
+       return mdt_object_lock_internal(info, o, lh, ibits, false, false);
+}
+
+int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                         struct mdt_lock_handle *lh, __u64 ibits,
+                         bool cos_incompat)
+{
+       LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
+       return mdt_object_lock_internal(info, o, lh, ibits, false,
+                                       cos_incompat);
 }
 
 int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
@@ -2497,7 +2541,22 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
        struct mdt_lock_handle tmp = *lh;
        int rc;
 
-       rc = mdt_object_lock_internal(info, o, &tmp, ibits, true);
+       rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, false);
+       if (rc == 0)
+               *lh = tmp;
+
+       return rc == 0;
+}
+
+int mdt_reint_object_lock_try(struct mdt_thread_info *info,
+                             struct mdt_object *o, struct mdt_lock_handle *lh,
+                             __u64 ibits, bool cos_incompat)
+{
+       struct mdt_lock_handle tmp = *lh;
+       int rc;
+
+       LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
+       rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, cos_incompat);
        if (rc == 0)
                *lh = tmp;
 
@@ -2530,24 +2589,27 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       int no_ack = 0;
+                       int cos;
+
+                       cos = (mdt_cos_is_enabled(mdt) ||
+                              mdt_slc_is_enabled(mdt));
 
                        LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
                                 h->cookie);
+
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
                                CDEBUG(D_HA, "request = %p reply state = %p"
                                       " transno = "LPD64"\n", req,
                                       req->rq_reply_state, req->rq_transno);
-                               if (mdt_cos_is_enabled(mdt)) {
-                                       no_ack = 1;
+                               if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, no_ack);
+                               ptlrpc_save_lock(req, h, mode, cos);
                        } else {
-                               ldlm_lock_decref(h, mode);
+                               mdt_fid_unlock(h, mode);
                        }
                         if (mdt_is_lock_sync(lock)) {
                                 CDEBUG(D_HA, "found sync-lock,"
@@ -2564,6 +2626,41 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
 }
 
 /**
+ * Save cross-MDT lock in uncommitted_slc_locks
+ *
+ * Keep the lock referenced until transaction commit happens or release the lock
+ * immediately depending on input parameters.
+ *
+ * \param info thead info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static void mdt_save_remote_lock(struct mdt_thread_info *info,
+                                struct lustre_handle *h, enum ldlm_mode mode,
+                                int decref)
+{
+       ENTRY;
+
+       if (lustre_handle_is_used(h)) {
+               if (decref || !info->mti_has_trans ||
+                   !(mode & (LCK_PW | LCK_EX))) {
+                       ldlm_lock_decref_and_cancel(h, mode);
+               } else {
+                       struct ldlm_lock *lock = ldlm_handle2lock(h);
+                       struct ptlrpc_request *req = mdt_info_req(info);
+
+                       LASSERT(req != NULL);
+                       tgt_save_slc_lock(lock, req->rq_transno);
+                       ldlm_lock_decref(h, mode);
+               }
+               h->cookie = 0ull;
+       }
+
+       EXIT;
+}
+
+/**
  * Unlock mdt object.
  *
  * Immeditely release the regular lock and the PDO lock or save the
@@ -2576,17 +2673,15 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
  * \param decref force immediate lock releasing
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
-                       struct mdt_lock_handle *lh, int decref)
+                      struct mdt_lock_handle *lh, int decref)
 {
-        ENTRY;
-
-        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
-        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+       ENTRY;
 
-       if (lustre_handle_is_used(&lh->mlh_rreg_lh))
-               ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode);
+       mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+       mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+       mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref);
 
-        EXIT;
+       EXIT;
 }
 
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
@@ -4375,6 +4470,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_remote_dir = 0;
        m->mdt_enable_remote_dir_gid = 0;
 
+       atomic_set(&m->mdt_mds_mds_conns, 0);
+       atomic_set(&m->mdt_async_commit_count, 0);
+
        m->mdt_lu_dev.ld_ops = &mdt_lu_ops;
        m->mdt_lu_dev.ld_obd = obd;
        /* Set this lu_device to obd for error handling purposes. */
@@ -5004,6 +5102,18 @@ static int mdt_export_cleanup(struct obd_export *exp)
         RETURN(rc);
 }
 
+static inline void mdt_enable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
+               mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL;
+}
+
+static inline void mdt_disable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL)
+               mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+}
+
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
         int rc;
@@ -5012,6 +5122,14 @@ static int mdt_obd_disconnect(struct obd_export *exp)
         LASSERT(exp);
         class_export_get(exp);
 
+       if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
+           !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) {
+               struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+
+               if (atomic_dec_and_test(&mdt->mdt_mds_mds_conns))
+                       mdt_disable_slc(mdt);
+       }
+
        rc = server_disconnect_export(exp);
        if (rc != 0)
                CDEBUG(D_IOCTL, "server disconnect error: rc = %d\n", rc);
@@ -5042,6 +5160,12 @@ static int mdt_obd_connect(const struct lu_env *env,
 
        mdt = mdt_dev(obd->obd_lu_dev);
 
+       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+               atomic_inc(&mdt->mdt_mds_mds_conns);
+               mdt_enable_slc(mdt);
+       }
+
        /*
         * first, check whether the stack is ready to handle requests
         * XXX: probably not very appropriate method is used now
index 178868f..95c8a24 100644 (file)
@@ -216,6 +216,12 @@ struct mdt_device {
        struct lu_device          *mdt_qmt_dev;
 
        struct coordinator         mdt_coordinator;
+
+       /* inter-MDT connection count */
+       atomic_t                   mdt_mds_mds_conns;
+
+       /* MDT device async commit count, used for debug and sanity test */
+       atomic_t                   mdt_async_commit_count;
 };
 
 #define MDT_SERVICE_WATCHDOG_FACTOR    (2)
@@ -586,9 +592,17 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, struct mdt_object *mo,
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *mo,
                    struct mdt_lock_handle *lh, __u64 ibits);
 
+int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                         struct mdt_lock_handle *lh, __u64 ibits,
+                         bool cos_incompat);
+
 int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo,
                        struct mdt_lock_handle *lh, __u64 ibits);
 
+int mdt_reint_object_lock_try(struct mdt_thread_info *info,
+                             struct mdt_object *o, struct mdt_lock_handle *lh,
+                             __u64 ibits, bool cos_incompat);
+
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
                       struct mdt_lock_handle *lh, int decref);
 
@@ -965,6 +979,11 @@ static inline void mdt_fid_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
        ldlm_lock_decref(lh, mode);
 }
 
+static inline bool mdt_slc_is_enabled(struct mdt_device *mdt)
+{
+       return mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL;
+}
+
 extern mdl_mode_t mdt_mdl_lock_modes[];
 extern enum ldlm_mode mdt_dlm_lock_modes[];
 
index 9f6ca37..58728e8 100644 (file)
@@ -677,6 +677,100 @@ mdt_enable_remote_dir_gid_seq_write(struct file *file,
 }
 LPROC_SEQ_FOPS(mdt_enable_remote_dir_gid);
 
+/**
+ * Show MDT policy for handling dirty metadata under a lock being cancelled.
+ *
+ * \param[in] m                seq_file handle
+ * \param[in] data     unused for single entry
+ *
+ * \retval             0 on success
+ * \retval             negative value on error
+ */
+static int mdt_slc_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device *obd = m->private;
+       struct lu_target *tgt = obd->u.obt.obt_lut;
+       char *slc_states[] = {"never", "blocking", "always" };
+
+       return seq_printf(m, "%s\n", slc_states[tgt->lut_sync_lock_cancel]);
+}
+LPROC_SEQ_FOPS_RO(mdt_slc);
+
+/**
+ * Show MDT async commit count.
+ *
+ * \param[in] m                seq_file handle
+ * \param[in] data     unused for single entry
+ *
+ * \retval             0 on success
+ * \retval             negative value on error
+ */
+static int mdt_async_commit_count_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device *obd = m->private;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return seq_printf(m, "%d\n", atomic_read(&mdt->mdt_async_commit_count));
+}
+
+static ssize_t
+mdt_async_commit_count_seq_write(struct file *file, const char __user *buffer,
+                                size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct obd_device *obd = m->private;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       int val;
+       int rc;
+
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       atomic_set(&mdt->mdt_async_commit_count, val);
+
+       return count;
+}
+LPROC_SEQ_FOPS(mdt_async_commit_count);
+
+/**
+ * Show MDT sync count.
+ *
+ * \param[in] m                seq_file handle
+ * \param[in] data     unused for single entry
+ *
+ * \retval             0 on success
+ * \retval             negative value on error
+ */
+static int mdt_sync_count_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device *obd = m->private;
+       struct lu_target *tgt = obd->u.obt.obt_lut;
+
+       return seq_printf(m, "%d\n", atomic_read(&tgt->lut_sync_count));
+}
+
+static ssize_t
+mdt_sync_count_seq_write(struct file *file, const char __user *buffer,
+                        size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct obd_device *obd = m->private;
+       struct lu_target *tgt = obd->u.obt.obt_lut;
+       int val;
+       int rc;
+
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       atomic_set(&tgt->lut_sync_count, val);
+
+       return count;
+}
+LPROC_SEQ_FOPS(mdt_sync_count);
+
+
 LPROC_SEQ_FOPS_RO_TYPE(mdt, uuid);
 LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status);
 LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports);
@@ -740,6 +834,12 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
          .fops =       &mdt_recovery_time_hard_fops    },
        { .name =       "recovery_time_soft",
          .fops =       &mdt_recovery_time_soft_fops    },
+       { .name =       "sync_lock_cancel",
+         .fops =       &mdt_slc_fops                           },
+       { .name =       "async_commit_count",
+         .fops =       &mdt_async_commit_count_fops            },
+       { .name =       "sync_count",
+         .fops =       &mdt_sync_count_fops                    },
        { NULL }
 };
 
index d18019b..5aac290 100644 (file)
@@ -292,131 +292,16 @@ static int mdt_remote_permission(struct mdt_thread_info *info)
        return 0;
 }
 
-/*
- * VBR: we save three versions in reply:
- * 0 - parent. Check that parent version is the same during replay.
- * 1 - name. Version of 'name' if file exists with the same name or
- * ENOENT_VERSION, it is needed because file may appear due to missed replays.
- * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
- * check.
- */
-static int mdt_md_create(struct mdt_thread_info *info)
-{
-        struct mdt_device       *mdt = info->mti_mdt;
-        struct mdt_object       *parent;
-        struct mdt_object       *child;
-        struct mdt_lock_handle  *lh;
-        struct mdt_body         *repbody;
-        struct md_attr          *ma = &info->mti_attr;
-        struct mdt_reint_record *rr = &info->mti_rr;
-        int rc;
-        ENTRY;
-
-       DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  ("DNAME"->"DFID") "
-                 "in "DFID,
-                 PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
-
-       if (!fid_is_md_operative(rr->rr_fid1))
-               RETURN(-EPERM);
-
-       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-
-       parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
-       if (IS_ERR(parent))
-               RETURN(PTR_ERR(parent));
-
-       if (!mdt_object_exists(parent))
-               GOTO(put_parent, rc = -ENOENT);
-
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-       rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
-       if (rc)
-               GOTO(put_parent, rc);
-
-       if (!mdt_object_remote(parent)) {
-               rc = mdt_version_get_check_save(info, parent, 0);
-               if (rc)
-                       GOTO(unlock_parent, rc);
-       }
-
-       /*
-        * Check child name version during replay.
-        * During create replay a file may exist with same name.
-        */
-       rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
-                                     &info->mti_tmp_fid1, 1);
-       if (rc == 0)
-               GOTO(unlock_parent, rc = -EEXIST);
-
-       /* -ENOENT is expected here */
-       if (rc != -ENOENT)
-               GOTO(unlock_parent, rc);
-
-       /* save version of file name for replay, it must be ENOENT here */
-       mdt_enoent_version_save(info, 1);
-
-       child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
-        if (likely(!IS_ERR(child))) {
-                struct md_object *next = mdt_object_child(parent);
-
-               rc = mdt_remote_permission(info);
-               if (rc != 0)
-                       GOTO(out_put_child, rc);
-
-               ma->ma_need = MA_INODE;
-                ma->ma_valid = 0;
-
-                mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                               OBD_FAIL_MDS_REINT_CREATE_WRITE);
-
-                /* Version of child will be updated on disk. */
-               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
-                rc = mdt_version_get_check_save(info, child, 2);
-                if (rc)
-                        GOTO(out_put_child, rc);
-
-                /* Let lower layer know current lock mode. */
-                info->mti_spec.sp_cr_mode =
-                        mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
-
-               /*
-                * Do not perform lookup sanity check. We know that name does
-                * not exist.
-                */
-               info->mti_spec.sp_cr_lookup = 0;
-                info->mti_spec.sp_feat = &dt_directory_features;
-
-               rc = mdo_create(info->mti_env, next, &rr->rr_name,
-                               mdt_object_child(child), &info->mti_spec, ma);
-               if (rc == 0)
-                       rc = mdt_attr_get_complex(info, child, ma);
-
-               if (rc == 0) {
-                       /* Return fid & attr to client. */
-                       if (ma->ma_valid & MA_INODE)
-                               mdt_pack_attr2body(info, repbody, &ma->ma_attr,
-                                                  mdt_object_fid(child));
-               }
-out_put_child:
-               mdt_object_put(info->mti_env, child);
-       } else {
-               rc = PTR_ERR(child);
-       }
-unlock_parent:
-       mdt_object_unlock(info, parent, lh, rc);
-put_parent:
-       mdt_object_put(info->mti_env, parent);
-       RETURN(rc);
-}
-
 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
                             struct mdt_object *obj, __u64 ibits,
                             struct mdt_lock_handle *s0_lh,
                             struct mdt_object *s0_obj,
-                            struct ldlm_enqueue_info *einfo)
+                            struct ldlm_enqueue_info *einfo,
+                            int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       int i;
        int rc;
        ENTRY;
 
@@ -426,31 +311,33 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti,
        /* Unlock stripe 0 */
        if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
                LASSERT(s0_obj != NULL);
-               mdt_object_unlock_put(mti, s0_obj, s0_lh, 1);
+               mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
        }
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
+       if (slave_locks != NULL) {
+               LASSERT(s0_lh != NULL);
+               for (i = 1; i < slave_locks->count; i++) {
+                       /* borrow s0_lh temporarily to do mdt unlock */
+                       mdt_lock_reg_init(s0_lh, einfo->ei_mode);
+                       s0_lh->mlh_rreg_lh = slave_locks->handles[i];
+                       mdt_object_unlock(mti, obj, s0_lh, decref);
+                       slave_locks->handles[i].cookie = 0ull;
+               }
+       }
+
        rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
                              policy);
        RETURN(rc);
 }
 
-/**
- * Lock slave stripes if necessary, the lock handles of slave stripes
- * will be stored in einfo->ei_cbdata.
- **/
-static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          enum ldlm_mode mode, __u64 ibits,
-                          struct mdt_lock_handle *s0_lh,
-                          struct mdt_object **s0_objp,
-                          struct ldlm_enqueue_info *einfo)
+static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
+                          struct lu_fid *fid)
 {
-       union ldlm_policy_data *policy = &mti->mti_policy;
        struct lu_buf *buf = &mti->mti_buf;
        struct lmv_mds_md_v1 *lmv;
-       struct lu_fid *fid = &mti->mti_tmp_fid1;
        int rc;
        ENTRY;
 
@@ -480,15 +367,46 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                RETURN(-EINVAL);
 
        fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-       if (!lu_fid_eq(fid, mdt_object_fid(obj))) {
+
+       RETURN(rc);
+}
+
+/**
+ * Lock slave stripes if necessary, the lock handles of slave stripes
+ * will be stored in einfo->ei_cbdata.
+ **/
+static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
+                          enum ldlm_mode mode, __u64 ibits,
+                          struct lu_fid *s0_fid,
+                          struct mdt_lock_handle *s0_lh,
+                          struct mdt_object **s0_objp,
+                          struct ldlm_enqueue_info *einfo)
+{
+       union ldlm_policy_data *policy = &mti->mti_policy;
+       int rc;
+       ENTRY;
+
+       rc = mdt_init_slaves(mti, obj, s0_fid);
+       if (rc <= 0)
+               RETURN(rc);
+
+       LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+
+       if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
                /* Except migrating object, whose 0_stripe and master
                 * object are the same object, 0_stripe and master
                 * object are different, though they are in the same
                 * MDT, to avoid adding osd_object_lock here, so we
                 * will enqueue the stripe0 lock in MDT0 for now */
-               *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits);
+               *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
                if (IS_ERR(*s0_objp))
                        RETURN(PTR_ERR(*s0_objp));
+
+               rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
+               if (rc < 0) {
+                       mdt_object_put(mti->mti_env, *s0_objp);
+                       RETURN(rc);
+               }
        }
 
        memset(einfo, 0, sizeof(*einfo));
@@ -507,6 +425,174 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        RETURN(rc);
 }
 
+/*
+ * VBR: we save three versions in reply:
+ * 0 - parent. Check that parent version is the same during replay.
+ * 1 - name. Version of 'name' if file exists with the same name or
+ * ENOENT_VERSION, it is needed because file may appear due to missed replays.
+ * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
+ * check.
+ */
+static int mdt_md_create(struct mdt_thread_info *info)
+{
+       struct mdt_device       *mdt = info->mti_mdt;
+       struct mdt_object       *parent;
+       struct mdt_object       *child;
+       struct mdt_lock_handle  *lh;
+       struct mdt_body         *repbody;
+       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       int rc;
+       ENTRY;
+
+       DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  ("DNAME"->"DFID") "
+                 "in "DFID,
+                 PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
+
+       if (!fid_is_md_operative(rr->rr_fid1))
+               RETURN(-EPERM);
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+
+       parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(parent))
+               RETURN(PTR_ERR(parent));
+
+       if (!mdt_object_exists(parent))
+               GOTO(put_parent, rc = -ENOENT);
+
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+       rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+       if (rc)
+               GOTO(put_parent, rc);
+
+       if (!mdt_object_remote(parent)) {
+               rc = mdt_version_get_check_save(info, parent, 0);
+               if (rc)
+                       GOTO(unlock_parent, rc);
+       }
+
+       /*
+        * Check child name version during replay.
+        * During create replay a file may exist with same name.
+        */
+       rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
+                                     &info->mti_tmp_fid1, 1);
+       if (rc == 0)
+               GOTO(unlock_parent, rc = -EEXIST);
+
+       /* -ENOENT is expected here */
+       if (rc != -ENOENT)
+               GOTO(unlock_parent, rc);
+
+       /* save version of file name for replay, it must be ENOENT here */
+       mdt_enoent_version_save(info, 1);
+
+       child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
+       if (unlikely(IS_ERR(child)))
+               GOTO(unlock_parent, rc = PTR_ERR(child));
+
+       rc = mdt_remote_permission(info);
+       if (rc != 0)
+               GOTO(put_child, rc);
+
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
+
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                       OBD_FAIL_MDS_REINT_CREATE_WRITE);
+
+       /* Version of child will be updated on disk. */
+       tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
+       rc = mdt_version_get_check_save(info, child, 2);
+       if (rc)
+               GOTO(put_child, rc);
+
+       /* Let lower layer know current lock mode. */
+       info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
+
+       /*
+        * Do not perform lookup sanity check. We know that name does
+        * not exist.
+        */
+       info->mti_spec.sp_cr_lookup = 0;
+       info->mti_spec.sp_feat = &dt_directory_features;
+
+       rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
+                       mdt_object_child(child), &info->mti_spec, ma);
+       if (rc == 0)
+               rc = mdt_attr_get_complex(info, child, ma);
+
+       if (rc < 0)
+               GOTO(put_child, rc);
+
+       /*
+        * On DNE, we need to eliminate dependey between 'mkdir a' and
+        * 'mkdir a/b' if b is a striped directory, to achieve this, two
+        * things are done below:
+        * 1. save child and slaves lock.
+        * 2. if the child is a striped directory, relock parent so to
+        *    compare against with COS locks to ensure parent was
+        *    committed to disk.
+        */
+       if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
+               struct mdt_lock_handle *lhc;
+               struct mdt_lock_handle *s0_lh;
+               struct mdt_object *s0_obj = NULL;
+               struct ldlm_enqueue_info *einfo;
+               struct lu_fid *s0_fid = &info->mti_tmp_fid1;
+               bool cos_incompat = false;
+
+               rc = mdt_init_slaves(info, child, s0_fid);
+               if (rc > 0) {
+                       cos_incompat = true;
+                       if (!mdt_object_remote(parent)) {
+                               mdt_object_unlock(info, parent, lh, 1);
+                               mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+                               rc = mdt_reint_object_lock(info, parent, lh,
+                                                          MDS_INODELOCK_UPDATE,
+                                                          true);
+                               if (rc)
+                                       GOTO(put_child, rc);
+                       }
+               }
+
+               einfo = &info->mti_einfo;
+               lhc = &info->mti_lh[MDT_LH_CHILD];
+               mdt_lock_handle_init(lhc);
+               mdt_lock_reg_init(lhc, LCK_PW);
+               rc = mdt_reint_object_lock(info, child, lhc,
+                                          MDS_INODELOCK_UPDATE,
+                                          cos_incompat);
+               if (rc)
+                       GOTO(put_child, rc);
+               mdt_object_unlock(info, child, lhc, rc);
+
+               s0_lh = &info->mti_lh[MDT_LH_LOCAL];
+               mdt_lock_handle_init(s0_lh);
+               mdt_lock_reg_init(s0_lh, LCK_PW);
+               rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
+                                    s0_fid, s0_lh, &s0_obj, einfo);
+               mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
+                                 s0_obj, einfo, rc);
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
+                       rc = 0;
+       }
+
+       /* Return fid & attr to client. */
+       if (ma->ma_valid & MA_INODE)
+               mdt_pack_attr2body(info, repbody, &ma->ma_attr,
+                                  mdt_object_fid(child));
+put_child:
+       mdt_object_put(info->mti_env, child);
+unlock_parent:
+       mdt_object_unlock(info, parent, lh, rc);
+put_parent:
+       mdt_object_put(info->mti_env, parent);
+       RETURN(rc);
+}
+
 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                        struct md_attr *ma)
 {
@@ -514,11 +600,17 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct mdt_lock_handle  *s0_lh;
-       struct mdt_object       *s0_obj = NULL;
+       struct lu_fid *s0_fid = &info->mti_tmp_fid1;
+       struct mdt_lock_handle *s0_lh = NULL;
+       struct mdt_object *s0_obj = NULL;
+       bool cos_incompat = false;
        int rc;
        ENTRY;
 
+       rc = mdt_init_slaves(info, mo, s0_fid);
+       if (rc > 0)
+               cos_incompat = true;
+
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
 
@@ -529,13 +621,14 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
 
-       rc = mdt_object_lock(info, mo, lh, lockpart);
+       rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
        s0_lh = &info->mti_lh[MDT_LH_LOCAL];
        mdt_lock_reg_init(s0_lh, LCK_PW);
-       rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo);
+       rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
+                            einfo);
        if (rc != 0)
                GOTO(out_unlock, rc);
 
@@ -571,7 +664,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
 
         EXIT;
 out_unlock:
-       mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo);
+       mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
         mdt_object_unlock(info, mo, lh, rc);
         return rc;
 }
@@ -776,22 +869,24 @@ static int mdt_reint_create(struct mdt_thread_info *info,
  * Version of child is getting and checking during its lookup. If
  */
 static int mdt_reint_unlink(struct mdt_thread_info *info,
-                            struct mdt_lock_handle *lhc)
+                           struct mdt_lock_handle *lhc)
 {
-        struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct md_attr          *ma = &info->mti_attr;
-        struct lu_fid           *child_fid = &info->mti_tmp_fid1;
-        struct mdt_object       *mp;
-        struct mdt_object       *mc;
-        struct mdt_lock_handle  *parent_lh;
-        struct mdt_lock_handle  *child_lh;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct mdt_object *mp;
+       struct mdt_object *mc;
+       struct mdt_lock_handle *parent_lh;
+       struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct mdt_lock_handle  *s0_lh = NULL;
-       struct mdt_object       *s0_obj = NULL;
-       __u64                   lock_ibits;
-       int                     rc;
-       int                     no_name = 0;
+       struct lu_fid *s0_fid = &info->mti_tmp_fid2;
+       struct mdt_lock_handle *s0_lh = NULL;
+       struct mdt_object *s0_obj = NULL;
+       __u64 lock_ibits;
+       bool cos_incompat = false;
+       int no_name = 0;
+       int rc;
        ENTRY;
 
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
@@ -800,34 +895,22 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
-                RETURN(err_serious(-ENOENT));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
+               RETURN(err_serious(-ENOENT));
 
        if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);
 
-        /*
-        * step 1: Found the parent.
-         */
        mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
-       if (IS_ERR(mp)) {
-               rc = PTR_ERR(mp);
-               GOTO(out, rc);
-       }
-
-       parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
-       rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE);
-       if (rc != 0)
-               GOTO(put_parent, rc);
+       if (IS_ERR(mp))
+               RETURN(PTR_ERR(mp));
 
        if (!mdt_object_remote(mp)) {
                rc = mdt_version_get_check_save(info, mp, 0);
                if (rc)
-                       GOTO(unlock_parent, rc);
+                       GOTO(put_parent, rc);
        }
 
-       /* step 2: find & lock the child */
        /* lookup child object along with version checking */
        fid_zero(child_fid);
        rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
@@ -852,35 +935,45 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                        no_name = 1;
                        *child_fid = *rr->rr_fid2;
                 } else {
-                       GOTO(unlock_parent, rc);
+                       GOTO(put_parent, rc);
                 }
        }
 
        if (!fid_is_md_operative(child_fid))
-               GOTO(unlock_parent, rc = -EPERM);
+               GOTO(put_parent, rc = -EPERM);
 
        /* We will lock the child regardless it is local or remote. No harm. */
        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
        if (IS_ERR(mc))
-               GOTO(unlock_parent, rc = PTR_ERR(mc));
+               GOTO(put_parent, rc = PTR_ERR(mc));
+
+       rc = mdt_init_slaves(info, mc, s0_fid);
+       cos_incompat = (mdt_object_remote(mp) || (rc > 0));
+
+       parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
+       rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+                                  cos_incompat);
+       if (rc != 0)
+               GOTO(put_child, rc);
 
-        child_lh = &info->mti_lh[MDT_LH_CHILD];
-        mdt_lock_reg_init(child_lh, LCK_EX);
+       child_lh = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(child_lh, LCK_EX);
        if (info->mti_spec.sp_rm_entry) {
                struct lu_ucred *uc  = mdt_ucred(info);
 
                if (!mdt_is_dne_client(req->rq_export))
                        /* Return -ENOTSUPP for old client */
-                       GOTO(put_child, rc = -ENOTSUPP);
+                       GOTO(unlock_parent, rc = -ENOTSUPP);
 
                if (!md_capable(uc, CFS_CAP_SYS_ADMIN))
-                       GOTO(put_child, rc = -EPERM);
+                       GOTO(unlock_parent, rc = -EPERM);
 
                ma->ma_need = MA_INODE;
                ma->ma_valid = 0;
                rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                                NULL, &rr->rr_name, ma, no_name);
-               GOTO(put_child, rc);
+               GOTO(unlock_parent, rc);
        }
 
        if (mdt_object_remote(mc)) {
@@ -890,7 +983,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                        CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
                               mdt_obd_name(info->mti_mdt),
                               PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
-                       GOTO(put_child, rc = -ENOENT);
+                       GOTO(unlock_parent, rc = -ENOENT);
                }
                CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
                       mdt_obd_name(info->mti_mdt),
@@ -898,7 +991,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
                if (!mdt_is_dne_client(req->rq_export))
                        /* Return -ENOTSUPP for old client */
-                       GOTO(put_child, rc = -ENOTSUPP);
+                       GOTO(unlock_parent, rc = -ENOTSUPP);
 
                /* Revoke the LOOKUP lock of the remote object granted by
                 * this MDT. Since the unlink will happen on another MDT,
@@ -923,14 +1016,16 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                                            child_lh->mlh_rreg_mode,
                                            MDS_INODELOCK_LOOKUP, false);
                if (rc != ELDLM_OK)
-                       GOTO(put_child, rc);
+                       GOTO(unlock_parent, rc);
 
                lock_ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       rc = mdt_object_lock(info, mc, child_lh, lock_ibits);
+       rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
+                                  cos_incompat);
        if (rc != 0)
-               GOTO(put_child, rc);
+               GOTO(unlock_child, rc);
+
        /*
         * Now we can only make sure we need MA_INODE, in mdd layer, will check
         * whether need MA_LOV and MA_COOKIE.
@@ -940,8 +1035,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
        s0_lh = &info->mti_lh[MDT_LH_LOCAL];
        mdt_lock_reg_init(s0_lh, LCK_EX);
-       rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh,
-                            &s0_obj, einfo);
+       rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
+                            s0_lh, &s0_obj, einfo);
        if (rc != 0)
                GOTO(unlock_child, rc);
 
@@ -984,15 +1079,15 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         EXIT;
 
 unlock_child:
-       mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo);
+       mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
+                         rc);
        mdt_object_unlock(info, mc, child_lh, rc);
-put_child:
-       mdt_object_put(info->mti_env, mc);
 unlock_parent:
        mdt_object_unlock(info, mp, parent_lh, rc);
+put_child:
+       mdt_object_put(info->mti_env, mc);
 put_parent:
        mdt_object_put(info->mti_env, mp);
-out:
         return rc;
 }
 
@@ -1001,109 +1096,117 @@ out:
  * name.
  */
 static int mdt_reint_link(struct mdt_thread_info *info,
-                          struct mdt_lock_handle *lhc)
+                         struct mdt_lock_handle *lhc)
 {
-        struct mdt_reint_record *rr = &info->mti_rr;
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct md_attr          *ma = &info->mti_attr;
-        struct mdt_object       *ms;
-        struct mdt_object       *mp;
-        struct mdt_lock_handle  *lhs;
-        struct mdt_lock_handle  *lhp;
-        int rc;
-        ENTRY;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_object       *ms;
+       struct mdt_object       *mp;
+       struct mdt_lock_handle  *lhs;
+       struct mdt_lock_handle  *lhp;
+       bool cos_incompat;
+       int rc;
+       ENTRY;
 
        DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
                  PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
-                RETURN(err_serious(-ENOENT));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
+               RETURN(err_serious(-ENOENT));
 
        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
-        /* Invalid case so return error immediately instead of
-         * processing it */
-        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
-                RETURN(-EPERM);
+       /* Invalid case so return error immediately instead of
+        * processing it */
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
+               RETURN(-EPERM);
 
        if (!fid_is_md_operative(rr->rr_fid1) ||
            !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);
 
-        /* step 1: find & lock the target parent dir */
-        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
-        mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
-                                  MDS_INODELOCK_UPDATE);
-        if (IS_ERR(mp))
-                RETURN(PTR_ERR(mp));
-
-        rc = mdt_version_get_check_save(info, mp, 0);
-        if (rc)
-                GOTO(out_unlock_parent, rc);
-
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
+       /* step 1: find target parent dir */
+       mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
+       if (IS_ERR(mp))
+               RETURN(PTR_ERR(mp));
 
-        /* step 2: find & lock the source */
-        lhs = &info->mti_lh[MDT_LH_CHILD];
-        mdt_lock_reg_init(lhs, LCK_EX);
+       rc = mdt_version_get_check_save(info, mp, 0);
+       if (rc)
+               GOTO(put_parent, rc);
 
-        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
-        if (IS_ERR(ms))
-                GOTO(out_unlock_parent, rc = PTR_ERR(ms));
+       /* step 2: find source */
+       ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(ms))
+               GOTO(put_parent, rc = PTR_ERR(ms));
 
        if (!mdt_object_exists(ms)) {
-               mdt_object_put(info->mti_env, ms);
                CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
                       mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
-               GOTO(out_unlock_parent, rc = -ENOENT);
+               GOTO(put_source, rc = -ENOENT);
        }
 
-       rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE |
-                            MDS_INODELOCK_XATTR);
-        if (rc != 0) {
-                mdt_object_put(info->mti_env, ms);
-                GOTO(out_unlock_parent, rc);
-        }
+       cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
 
-        /* step 3: link it */
-        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                       OBD_FAIL_MDS_REINT_LINK_WRITE);
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
+       rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
+                                  cos_incompat);
+       if (rc != 0)
+               GOTO(put_source, rc);
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
+
+       lhs = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(lhs, LCK_EX);
+       rc = mdt_reint_object_lock(info, ms, lhs,
+                                  MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
+                                  cos_incompat);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
+
+       /* step 3: link it */
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                       OBD_FAIL_MDS_REINT_LINK_WRITE);
 
        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
-        rc = mdt_version_get_check_save(info, ms, 1);
-        if (rc)
-                GOTO(out_unlock_child, rc);
+       rc = mdt_version_get_check_save(info, ms, 1);
+       if (rc)
+               GOTO(unlock_source, rc);
 
-        /** check target version by name during replay */
+       /** check target version by name during replay */
        rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
                                      &info->mti_tmp_fid1, 2);
-        if (rc != 0 && rc != -ENOENT)
-                GOTO(out_unlock_child, rc);
-        /* save version of file name for replay, it must be ENOENT here */
-        if (!req_is_replay(mdt_info_req(info))) {
+       if (rc != 0 && rc != -ENOENT)
+               GOTO(unlock_source, rc);
+       /* save version of file name for replay, it must be ENOENT here */
+       if (!req_is_replay(mdt_info_req(info))) {
                if (rc != -ENOENT) {
                        CDEBUG(D_INFO, "link target "DNAME" existed!\n",
                               PNAME(&rr->rr_name));
-                       GOTO(out_unlock_child, rc = -EEXIST);
+                       GOTO(unlock_source, rc = -EEXIST);
                }
-                info->mti_ver[2] = ENOENT_VERSION;
-                mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
-        }
+               info->mti_ver[2] = ENOENT_VERSION;
+               mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
+       }
 
        rc = mdo_link(info->mti_env, mdt_object_child(mp),
-             mdt_object_child(ms), &rr->rr_name, ma);
+                     mdt_object_child(ms), &rr->rr_name, ma);
 
-        if (rc == 0)
+       if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_LINK);
 
-        EXIT;
-out_unlock_child:
-        mdt_object_unlock_put(info, ms, lhs, rc);
-out_unlock_parent:
-        mdt_object_unlock_put(info, mp, lhp, rc);
-        return rc;
+       EXIT;
+unlock_source:
+       mdt_object_unlock(info, ms, lhs, rc);
+unlock_parent:
+       mdt_object_unlock(info, mp, lhp, rc);
+put_source:
+       mdt_object_put(info->mti_env, ms);
+put_parent:
+       mdt_object_put(info->mti_env, mp);
+       return rc;
 }
 /**
  * lock the part of the directory according to the hash of the name
@@ -1111,11 +1214,13 @@ out_unlock_parent:
  */
 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
                              struct mdt_lock_handle *lh,
-                             struct mdt_object *obj, __u64 ibits)
+                             struct mdt_object *obj, __u64 ibits,
+                             bool cos_incompat)
 {
        struct ldlm_res_id *res = &info->mti_res_id;
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
+       __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
        int rc;
 
        /*
@@ -1126,14 +1231,16 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
        fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
+       if (cos_incompat &&
+           (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+               dlmflags |= LDLM_FL_COS_INCOMPAT;
        /*
         * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
         * going to be sent to client. If it is - mdt_intent_policy() path will
         * fix it up and turn FL_LOCAL flag off.
         */
        rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                         res, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
-                         &info->mti_exp->exp_handle.h_cookie);
+                         res, dlmflags, &info->mti_exp->exp_handle.h_cookie);
        return rc;
 }
 
@@ -1335,8 +1442,8 @@ again:
                 * cannot be gotten because of conflicting locks, then drop all
                 * current locks, send an AST to the client, and start again. */
                mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
-               rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh,
-                                        MDS_INODELOCK_UPDATE);
+               rc = mdt_reint_object_lock_try(info, mdt_pobj, &mll->mll_lh,
+                                               MDS_INODELOCK_UPDATE, true);
                if (rc == 0) {
                        mdt_unlock_list(info, lock_list, rc);
 
@@ -1426,8 +1533,8 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
 
        lh_dirp = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
-       rc = mdt_object_lock(info, msrcdir, lh_dirp,
-                            MDS_INODELOCK_UPDATE);
+       rc = mdt_reint_object_lock(info, msrcdir, lh_dirp, MDS_INODELOCK_UPDATE,
+                                  true);
        if (rc)
                GOTO(out_put_parent, rc);
 
@@ -1547,7 +1654,7 @@ out_lease:
                lock_ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       rc = mdt_object_lock(info, mold, lh_childp, lock_ibits);
+       rc = mdt_reint_object_lock(info, mold, lh_childp, lock_ibits, true);
        if (rc != 0)
                GOTO(out_unlock_child, rc);
 
@@ -1645,7 +1752,10 @@ out_put_new:
 out_unlock_child:
        mdt_object_unlock(info, mold, lh_childp, rc);
 out_unlock_list:
-       mdt_unlock_list(info, &lock_list, rc);
+       /* we don't really modify linkea objects, so we can safely decref these
+        * locks, and this can avoid saving them as COS locks, which may prevent
+        * subsequent migrate. */
+       mdt_unlock_list(info, &lock_list, 1);
        if (lease != NULL) {
                ldlm_reprocess_all(lease->l_resource);
                LDLM_LOCK_PUT(lease);
@@ -1689,12 +1799,13 @@ out_put:
 static int mdt_object_lock_save(struct mdt_thread_info *info,
                                struct mdt_object *dir,
                                struct mdt_lock_handle *lh,
-                               int idx)
+                               int idx, bool cos_incompat)
 {
        int rc;
 
        /* we lock the target dir if it is local */
-       rc = mdt_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
+                                  cos_incompat);
        if (rc != 0)
                return rc;
 
@@ -1703,146 +1814,125 @@ static int mdt_object_lock_save(struct mdt_thread_info *info,
        return 0;
 }
 
-
-static int mdt_rename_parents_lock(struct mdt_thread_info *info,
-                                  struct mdt_object **srcp,
-                                  struct mdt_object **tgtp)
+/*
+ * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
+ * 2 - srcdir child; 3 - tgtdir child.
+ * Update on disk version of srcdir child.
+ */
+/**
+ * For DNE phase I, only these renames are allowed
+ *     mv src_p/src_c tgt_p/tgt_c
+ * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT.
+ * 2. src_p and tgt_p are same directory, and tgt_c does not
+ *    exists. In this case, all of modification will happen
+ *    in the MDT where ithesource parent is, only one remote
+ *    update is needed, i.e. set c_time/m_time on the child.
+ *    And tgt_c will be still in the same MDT as the original
+ *    src_c.
+ */
+static int mdt_reint_rename_internal(struct mdt_thread_info *info,
+                                    struct mdt_lock_handle *lhc)
 {
        struct mdt_reint_record *rr = &info->mti_rr;
-       const struct lu_fid     *fid_src = rr->rr_fid1;
-       const struct lu_fid     *fid_tgt = rr->rr_fid2;
-       struct mdt_lock_handle  *lh_src = &info->mti_lh[MDT_LH_PARENT];
-       struct mdt_lock_handle  *lh_tgt = &info->mti_lh[MDT_LH_CHILD];
-       struct mdt_object       *src;
-       struct mdt_object       *tgt;
-       int                      reverse = 0;
-       int                      rc;
+       struct md_attr *ma = &info->mti_attr;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_object *msrcdir = NULL;
+       struct mdt_object *mtgtdir = NULL;
+       struct mdt_object *mold;
+       struct mdt_object *mnew = NULL;
+       struct mdt_lock_handle *lh_srcdirp;
+       struct mdt_lock_handle *lh_tgtdirp;
+       struct mdt_lock_handle *lh_oldp = NULL;
+       struct mdt_lock_handle *lh_newp = NULL;
+       struct lu_fid *old_fid = &info->mti_tmp_fid1;
+       struct lu_fid *new_fid = &info->mti_tmp_fid2;
+       __u64 lock_ibits;
+       bool reverse = false;
+       bool cos_incompat;
+       int rc;
        ENTRY;
 
+       DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
+                 PFID(rr->rr_fid1), PNAME(&rr->rr_name),
+                 PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
+
        /* find both parents. */
-       src = mdt_object_find_check(info, fid_src, 0);
-       if (IS_ERR(src))
-               RETURN(PTR_ERR(src));
+       msrcdir = mdt_object_find_check(info, rr->rr_fid1, 0);
+       if (IS_ERR(msrcdir))
+               RETURN(PTR_ERR(msrcdir));
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
 
-       if (lu_fid_eq(fid_src, fid_tgt)) {
-               tgt = src;
-               mdt_object_get(info->mti_env, tgt);
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
+               mtgtdir = msrcdir;
+               mdt_object_get(info->mti_env, mtgtdir);
        } else {
-               /* Check if the @src is not a child of the @tgt, otherwise a
-                * reverse locking must take place. */
-               rc = mdt_is_subdir(info, src, fid_tgt);
+               /* Check if the @msrcdir is not a child of the @mtgtdir,
+                * otherwise a reverse locking must take place. */
+               rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2);
                if (rc == -EINVAL)
-                       reverse = 1;
+                       reverse = true;
                else if (rc)
-                       GOTO(err_src_put, rc);
+                       GOTO(out_put_srcdir, rc);
 
-               tgt = mdt_object_find_check(info, fid_tgt, 1);
-               if (IS_ERR(tgt))
-                       GOTO(err_src_put, rc = PTR_ERR(tgt));
+               mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1);
+               if (IS_ERR(mtgtdir))
+                       GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
        }
 
+       /* source needs to be looked up after locking source parent, otherwise
+        * this rename may race with unlink source, and cause rename hang, see
+        * sanityn.sh 55b, so check parents first, if later we found source is
+        * remote, relock parents. */
+       cos_incompat = (mdt_object_remote(msrcdir) ||
+                       mdt_object_remote(mtgtdir));
+
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
 
        /* lock parents in the proper order. */
+       lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
+       lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
+
+relock:
+       mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
+       mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
+
        if (reverse) {
-               rc = mdt_object_lock_save(info, tgt, lh_tgt, 1);
+               rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
+                                         cos_incompat);
                if (rc)
-                       GOTO(err_tgt_put, rc);
+                       GOTO(out_put_tgtdir, rc);
 
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
 
-               rc = mdt_object_lock_save(info, src, lh_src, 0);
+               rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
+                                         cos_incompat);
        } else {
-               rc = mdt_object_lock_save(info, src, lh_src, 0);
+               rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
+                                         cos_incompat);
                if (rc)
-                       GOTO(err_tgt_put, rc);
+                       GOTO(out_put_tgtdir, rc);
 
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
 
-               if (tgt != src)
-                       rc = mdt_object_lock_save(info, tgt, lh_tgt, 1);
-               else if (lh_src->mlh_pdo_hash != lh_tgt->mlh_pdo_hash) {
-                       rc = mdt_pdir_hash_lock(info, lh_tgt, tgt,
-                                               MDS_INODELOCK_UPDATE);
+               if (mtgtdir != msrcdir) {
+                       rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
+                                                 cos_incompat);
+               } else if (lh_srcdirp->mlh_pdo_hash !=
+                          lh_tgtdirp->mlh_pdo_hash) {
+                       rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+                                               MDS_INODELOCK_UPDATE,
+                                               cos_incompat);
                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
                }
        }
        if (rc)
-               GOTO(err_unlock, rc);
+               GOTO(out_unlock_parents, rc);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
-
-       *srcp = src;
-       *tgtp = tgt;
-       RETURN(0);
-
-err_unlock:
-       /* The order does not matter as the handle is checked inside,
-        * as well as not used handle. */
-       mdt_object_unlock(info, src, lh_src, rc);
-       mdt_object_unlock(info, tgt, lh_tgt, rc);
-err_tgt_put:
-       mdt_object_put(info->mti_env, tgt);
-err_src_put:
-       mdt_object_put(info->mti_env, src);
-       RETURN(rc);
-}
-
-/*
- * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
- * 2 - src child; 3 - tgt child.
- * Update on disk version of src child.
- */
-/**
- * For DNE phase I, only these renames are allowed
- *     mv src_p/src_c tgt_p/tgt_c
- * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT.
- * 2. src_p and tgt_p are same directory, and tgt_c does not
- *    exists. In this case, all of modification will happen
- *    in the MDT where ithesource parent is, only one remote
- *    update is needed, i.e. set c_time/m_time on the child.
- *    And tgt_c will be still in the same MDT as the original
- *    src_c.
- */
-static int mdt_reint_rename_internal(struct mdt_thread_info *info,
-                                    struct mdt_lock_handle *lhc)
-{
-       struct mdt_reint_record *rr = &info->mti_rr;
-       struct md_attr          *ma = &info->mti_attr;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct mdt_object       *msrcdir = NULL;
-       struct mdt_object       *mtgtdir = NULL;
-       struct mdt_object       *mold;
-       struct mdt_object       *mnew = NULL;
-       struct mdt_lock_handle  *lh_srcdirp;
-       struct mdt_lock_handle  *lh_tgtdirp;
-       struct mdt_lock_handle  *lh_oldp = NULL;
-       struct mdt_lock_handle  *lh_newp = NULL;
-       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
-       struct lu_fid           *new_fid = &info->mti_tmp_fid2;
-       __u64                   lock_ibits;
-       int                      rc;
-       ENTRY;
-
-       DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
-                 PFID(rr->rr_fid1), PNAME(&rr->rr_name),
-                 PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
-
-       lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
-       lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
-
-       /* step 1&2: lock the source and target dirs. */
-       rc = mdt_rename_parents_lock(info, &msrcdir, &mtgtdir);
-       if (rc)
-               RETURN(rc);
-
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
 
-       /* step 3: find & lock the old object. */
+       /* find mold object. */
        fid_zero(old_fid);
        rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
        if (rc != 0)
@@ -1868,9 +1958,17 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
        /* save version after locking */
        mdt_version_get_save(info, mold, 2);
 
-       /* step 4: find & lock the new object. */
-       /* new target object may not exist now */
-       /* lookup with version checking */
+       if (!cos_incompat && mdt_object_remote(mold)) {
+               cos_incompat = true;
+               mdt_object_put(info->mti_env, mold);
+               mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
+               mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
+               goto relock;
+       }
+
+       /* find mnew object:
+        * mnew target object may not exist now
+        * lookup with version checking */
        fid_zero(new_fid);
        rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
                                      3);
@@ -1911,7 +2009,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
 
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
                mdt_lock_reg_init(lh_oldp, LCK_EX);
-
                lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
                if (mdt_object_remote(msrcdir)) {
                        /* Enqueue lookup lock from the parent MDT */
@@ -1927,7 +2024,8 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                        lock_ibits &= ~MDS_INODELOCK_LOOKUP;
                }
 
-               rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits);
+               rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
+                                          cos_incompat);
                if (rc != 0)
                        GOTO(out_unlock_old, rc);
 
@@ -1944,9 +2042,10 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
 
                lh_newp = &info->mti_lh[MDT_LH_NEW];
                mdt_lock_reg_init(lh_newp, LCK_EX);
-               rc = mdt_object_lock(info, mnew, lh_newp,
-                                    MDS_INODELOCK_LOOKUP |
-                                    MDS_INODELOCK_UPDATE);
+               rc = mdt_reint_object_lock(info, mnew, lh_newp,
+                                          MDS_INODELOCK_LOOKUP |
+                                          MDS_INODELOCK_UPDATE,
+                                          cos_incompat);
                if (rc != 0)
                        GOTO(out_unlock_old, rc);
 
@@ -1957,7 +2056,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
        } else {
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
                mdt_lock_reg_init(lh_oldp, LCK_EX);
-
                lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
                if (mdt_object_remote(msrcdir)) {
                        /* Enqueue lookup lock from the parent MDT */
@@ -1968,14 +2066,15 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                                                    MDS_INODELOCK_LOOKUP,
                                                    false);
                        if (rc != ELDLM_OK)
-                               GOTO(out_put_new, rc);
+                               GOTO(out_put_old, rc);
 
                        lock_ibits &= ~MDS_INODELOCK_LOOKUP;
                }
 
-               rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits);
+               rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
+                                          cos_incompat);
                if (rc != 0)
-                       GOTO(out_put_old, rc);
+                       GOTO(out_unlock_old, rc);
 
                mdt_enoent_version_save(info, 3);
        }
@@ -2018,8 +2117,12 @@ out_put_new:
 out_put_old:
        mdt_object_put(info->mti_env, mold);
 out_unlock_parents:
-       mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
-       mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
+       mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
+       mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
+out_put_tgtdir:
+       mdt_object_put(info->mti_env, mtgtdir);
+out_put_srcdir:
+       mdt_object_put(info->mti_env, msrcdir);
        return rc;
 }
 
index 26996f6..65efd68 100644 (file)
@@ -904,6 +904,8 @@ static int osp_md_object_lock(const struct lu_env *env,
 
        if (einfo->ei_nonblock)
                flags |= LDLM_FL_BLOCK_NOWAIT;
+       if (einfo->ei_mode & (LCK_EX | LCK_PW))
+               flags |= LDLM_FL_COS_INCOMPAT;
 
        req = ldlm_enqueue_pack(osp->opd_exp, 0);
        if (IS_ERR(req))
@@ -923,13 +925,6 @@ static int osp_md_object_lock(const struct lu_env *env,
                              &flags, NULL, 0, LVB_T_NONE, lh, 0);
 
        ptlrpc_req_finished(req);
-       if (rc == ELDLM_OK) {
-               struct ldlm_lock *lock;
-
-               lock = __ldlm_handle2lock(lh, 0);
-               ldlm_set_cbpending(lock);
-               LDLM_LOCK_PUT(lock);
-       }
 
        return rc == ELDLM_OK ? 0 : -EIO;
 }
index f7a1171..ac96517 100644 (file)
@@ -1284,6 +1284,7 @@ int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
                   tgt->lut_obd->obd_last_committed) {
                rc = dt_object_sync(env, obj, start, end);
        }
+       atomic_inc(&tgt->lut_sync_count);
 
        RETURN(rc);
 }
@@ -1292,14 +1293,27 @@ EXPORT_SYMBOL(tgt_sync);
  * Unified target DLM handlers.
  */
 
-/* Ensure that data and metadata are synced to the disk when lock is cancelled
- * (if requested) */
+/**
+ * Unified target BAST
+ *
+ * Ensure data and metadata are synced to disk when lock is canceled if Sync on
+ * Cancel (SOC) is enabled. If it's extent lock, normally sync obj is enough,
+ * but if it's cross-MDT lock, because remote object version is not set, a
+ * filesystem sync is needed.
+ *
+ * \param lock server side lock
+ * \param desc lock desc
+ * \param data ldlm_cb_set_arg
+ * \param flag indicates whether this cancelling or blocking callback
+ * \retval     0 on success
+ * \retval     negative number on error
+ */
 static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
        struct lu_env            env;
        struct lu_target        *tgt;
-       struct dt_object        *obj;
+       struct dt_object        *obj = NULL;
        struct lu_fid            fid;
        int                      rc = 0;
 
@@ -1314,10 +1328,12 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
        }
 
        if (flag == LDLM_CB_CANCELING &&
-           (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) &&
+           (lock->l_granted_mode & (LCK_EX | LCK_PW | LCK_GROUP)) &&
            (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
             (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
-             lock->l_flags & LDLM_FL_CBPENDING))) {
+             ldlm_is_cbpending(lock))) &&
+           ((exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) ||
+            lock->l_resource->lr_type == LDLM_EXTENT)) {
                __u64 start = 0;
                __u64 end = OBD_OBJECT_EOF;
 
@@ -1327,14 +1343,15 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                ost_fid_from_resid(&fid, &lock->l_resource->lr_name,
                                   tgt->lut_lsd.lsd_osd_index);
-               obj = dt_locate(&env, tgt->lut_bottom, &fid);
-               if (IS_ERR(obj))
-                       GOTO(err_env, rc = PTR_ERR(obj));
-
-               if (!dt_object_exists(obj))
-                       GOTO(err_put, rc = -ENOENT);
 
                if (lock->l_resource->lr_type == LDLM_EXTENT) {
+                       obj = dt_locate(&env, tgt->lut_bottom, &fid);
+                       if (IS_ERR(obj))
+                               GOTO(err_env, rc = PTR_ERR(obj));
+
+                       if (!dt_object_exists(obj))
+                               GOTO(err_put, rc = -ENOENT);
+
                        start = lock->l_policy_data.l_extent.start;
                        end = lock->l_policy_data.l_extent.end;
                }
@@ -1348,7 +1365,8 @@ static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                               lock->l_policy_data.l_extent.end, rc);
                }
 err_put:
-               lu_object_put(&env, &obj->do_lu);
+               if (obj != NULL)
+                       lu_object_put(&env, &obj->do_lu);
 err_env:
                lu_env_fini(&env);
        }
index 824a9c7..8d2fc37 100644 (file)
@@ -287,4 +287,6 @@ int sub_thandle_trans_create(const struct lu_env *env,
 void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new);
 int top_trans_create_tmt(const struct lu_env *env,
                         struct top_thandle *top_th);
+
+void tgt_cancel_slc_locks(__u64 transno);
 #endif /* _TG_INTERNAL_H */
index 1c8f206..bf44695 100644 (file)
@@ -781,7 +781,9 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
                spin_unlock(&ccb->llcc_tgt->lut_translock);
+
                ptlrpc_commit_replies(ccb->llcc_exp);
+               tgt_cancel_slc_locks(ccb->llcc_transno);
        } else {
                spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
index 0f0fcd3..391f295 100644 (file)
 #include "tgt_internal.h"
 #include "../ptlrpc/ptlrpc_internal.h"
 
+static spinlock_t uncommitted_slc_locks_guard;
+static struct list_head uncommitted_slc_locks;
+
+/*
+ * Save cross-MDT lock in uncommitted_slc_locks.
+ *
+ * Lock R/W count is not saved, but released in unlock (not canceled remotely),
+ * instead only a refcount is taken, so that the remote MDT where the object
+ * resides can detect conflict with this lock there.
+ *
+ * \param lock cross-MDT lock to save
+ * \param transno when the transaction with this transno is committed, this lock
+ *               can be canceled.
+ */
+void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno)
+{
+       spin_lock(&uncommitted_slc_locks_guard);
+       lock_res_and_lock(lock);
+       if (ldlm_is_cbpending(lock)) {
+               /* if it was canceld by server, don't save, because remote MDT
+                * will do Sync-on-Cancel. */
+               LDLM_LOCK_PUT(lock);
+       } else {
+               lock->l_transno = transno;
+               /* if this lock is in the list already, there are two operations
+                * both use this lock, and save it after use, so for the second
+                * one, just put the refcount. */
+               if (list_empty(&lock->l_slc_link))
+                       list_add_tail(&lock->l_slc_link,
+                                     &uncommitted_slc_locks);
+               else
+                       LDLM_LOCK_PUT(lock);
+       }
+       unlock_res_and_lock(lock);
+       spin_unlock(&uncommitted_slc_locks_guard);
+}
+EXPORT_SYMBOL(tgt_save_slc_lock);
+
+/*
+ * Discard cross-MDT lock from uncommitted_slc_locks.
+ *
+ * This is called upon BAST, just remove lock from uncommitted_slc_locks and put
+ * lock refcount. The BAST will cancel this lock.
+ *
+ * \param lock cross-MDT lock to discard
+ */
+void tgt_discard_slc_lock(struct ldlm_lock *lock)
+{
+       spin_lock(&uncommitted_slc_locks_guard);
+       lock_res_and_lock(lock);
+       /* may race with tgt_cancel_slc_locks() */
+       if (lock->l_transno != 0) {
+               LASSERT(!list_empty(&lock->l_slc_link));
+               LASSERT(ldlm_is_cbpending(lock));
+               list_del_init(&lock->l_slc_link);
+               lock->l_transno = 0;
+               LDLM_LOCK_PUT(lock);
+       }
+       unlock_res_and_lock(lock);
+       spin_unlock(&uncommitted_slc_locks_guard);
+}
+EXPORT_SYMBOL(tgt_discard_slc_lock);
+
+/*
+ * Cancel cross-MDT locks upon transaction commit.
+ *
+ * Remove cross-MDT locks from uncommitted_slc_locks, cancel them and put lock
+ * refcount.
+ *
+ * \param transno transaction with this number was committed.
+ */
+void tgt_cancel_slc_locks(__u64 transno)
+{
+       struct ldlm_lock *lock, *next;
+       LIST_HEAD(list);
+       struct lustre_handle lockh;
+
+       spin_lock(&uncommitted_slc_locks_guard);
+       list_for_each_entry_safe(lock, next, &uncommitted_slc_locks,
+                                l_slc_link) {
+               lock_res_and_lock(lock);
+               LASSERT(lock->l_transno != 0);
+               if (lock->l_transno > transno) {
+                       unlock_res_and_lock(lock);
+                       continue;
+               }
+               /* ouch, another operation is using it after it's saved */
+               if (lock->l_readers != 0 || lock->l_writers != 0) {
+                       unlock_res_and_lock(lock);
+                       continue;
+               }
+               /* set CBPENDING so that this lock won't be used again */
+               ldlm_set_cbpending(lock);
+               lock->l_transno = 0;
+               list_move(&lock->l_slc_link, &list);
+               unlock_res_and_lock(lock);
+       }
+       spin_unlock(&uncommitted_slc_locks_guard);
+
+       list_for_each_entry_safe(lock, next, &list, l_slc_link) {
+               list_del_init(&lock->l_slc_link);
+               ldlm_lock2handle(lock, &lockh);
+               ldlm_cli_cancel(&lockh, LCF_ASYNC);
+               LDLM_LOCK_PUT(lock);
+       }
+}
+
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
             struct obd_device *obd, struct dt_device *dt,
             struct tgt_opc_slice *slice, int request_fail_id,
@@ -146,6 +253,8 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        if (rc < 0)
                GOTO(out, rc);
 
+       atomic_set(&lut->lut_sync_count, 0);
+
        RETURN(0);
 
 out:
@@ -295,6 +404,9 @@ int tgt_mod_init(void)
 
        update_info_init();
 
+       spin_lock_init(&uncommitted_slc_locks_guard);
+       INIT_LIST_HEAD(&uncommitted_slc_locks);
+
        RETURN(0);
 }
 
index 5fe683c..7a6d88e 100755 (executable)
@@ -4753,6 +4753,59 @@ test_70d() {
 }
 run_test 70d "stop MDT1, mkdir succeed, create remote dir fail"
 
+test_70e() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+
+       [ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.7.62) ] ||
+               { skip "Need MDS version at least 2.7.62"; return 0; }
+
+       cleanup || error "cleanup failed with $?"
+
+       local mdsdev=$(mdsdevname 1)
+       local ostdev=$(ostdevname 1)
+       local mdsvdev=$(mdsvdevname 1)
+       local ostvdev=$(ostvdevname 1)
+       local opts_mds="$(mkfs_opts mds1 $mdsdev) --reformat $mdsdev $mdsvdev"
+       local opts_ost="$(mkfs_opts ost1 $ostdev) --reformat $ostdev $ostvdev"
+
+       add mds1 $opts_mds || error "add mds1 failed"
+       start_mdt 1 || error "start mdt1 failed"
+       add ost1 $opts_ost || error "add ost1 failed"
+       start_ost || error "start ost failed"
+       mount_client $MOUNT > /dev/null || error "mount client $MOUNT failed"
+
+       local soc=$(do_facet mds1 "$LCTL get_param -n \
+                   mdt.*MDT0000.sync_lock_cancel")
+       [ $soc == "never" ] || error "SoC enabled on single MDS"
+
+       for i in $(seq 2 $MDSCOUNT); do
+               mdsdev=$(mdsdevname $i)
+               mdsvdev=$(mdsvdevname $i)
+               opts_mds="$(mkfs_opts mds$i $mdsdev) --reformat $mdsdev \
+                         $mdsvdev"
+               add mds$i $opts_mds || error "add mds$i failed"
+               start_mdt $i || error "start mdt$i fail"
+       done
+
+       wait_dne_interconnect
+
+       for i in $(seq $MDSCOUNT); do
+               soc=$(do_facet mds$i "$LCTL get_param -n \
+                       mdt.*MDT000$((i - 1)).sync_lock_cancel")
+               [ $soc == "blocking" ] || error "SoC not enabled on DNE"
+       done
+
+       for i in $(seq 2 $MDSCOUNT); do
+               stop_mdt $i || error "stop mdt$i fail"
+       done
+       soc=$(do_facet mds1 "$LCTL get_param -n \
+               mdt.*MDT0000.sync_lock_cancel")
+       [ $soc == "never" ] || error "SoC enabled on single MDS"
+
+       cleanup || error "cleanup failed with $?"
+}
+run_test 70e "Sync-on-Cancel will be enabled by default on DNE"
+
 test_71a() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
        if combined_mgs_mds; then
index dbaffb0..5bb7c8a 100644 (file)
@@ -925,7 +925,7 @@ test_33b() {
                avgjbd=0
                avgtime=0
                for i in 1 2 3; do
-                       do_node $CLIENT1 "$LFS mkdir -i $MDTIDX -p \
+                       do_node $CLIENT1 "$LFS mkdir -i $MDTIDX \
                                          $DIR1/$tdir-\\\$(hostname)-$i"
 
                        jbdold=$(print_jbd_stat)
@@ -958,6 +958,118 @@ test_33b() {
 }
 run_test 33b "COS: cross create/delete, 2 clients, benchmark under remote dir"
 
+test_33c() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] &&
+               skip "DNE CoS not supported" && return
+
+       sync
+
+       mkdir $DIR/$tdir
+       # remote mkdir is done on MDT2, which enqueued lock of $tdir on MDT1
+       $LFS mkdir -i 1 $DIR/$tdir/d1
+       do_facet mds1 "lctl set_param -n mdt.*.sync_count=0"
+       mkdir $DIR/$tdir/d2
+       local sync_count=$(do_facet mds1 \
+               "lctl get_param -n mdt.*MDT0000.sync_count")
+       [ $sync_count -eq 1 ] || error "Sync-Lock-Cancel not triggered"
+
+       $LFS mkdir -i 1 $DIR/$tdir/d3
+       do_facet mds1 "lctl set_param -n mdt.*.sync_count=0"
+       # during sleep remote mkdir should have been committed and canceled
+       # remote lock spontaneously, which shouldn't trigger sync
+       sleep 6
+       mkdir $DIR/$tdir/d4
+       local sync_count=$(do_facet mds1 \
+               "lctl get_param -n mdt.*MDT0000.sync_count")
+       [ $sync_count -eq 0 ] || error "Sync-Lock-Cancel triggered"
+}
+run_test 33c "Cancel cross-MDT lock should trigger Sync-Lock-Cancel"
+
+ops_do_cos() {
+       local nodes=$(comma_list $(mdts_nodes))
+       do_nodes $nodes "lctl set_param -n mdt.*.async_commit_count=0"
+       sh -c "$@"
+       local async_commit_count=$(do_nodes $nodes \
+               "lctl get_param -n mdt.*.async_commit_count" | calc_sum)
+       [ $async_commit_count -gt 0 ] || error "CoS not triggerred"
+
+       rm -rf $DIR/$tdir
+       sync
+}
+
+test_33d() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] &&
+               skip "DNE CoS not supported" && return
+
+       sync
+       # remote directory create
+       mkdir $DIR/$tdir
+       ops_do_cos "$LFS mkdir -i 1 $DIR/$tdir/subdir"
+       # remote directory unlink
+       $LFS mkdir -i 1 $DIR/$tdir
+       ops_do_cos "rmdir $DIR/$tdir"
+       # striped directory create
+       mkdir $DIR/$tdir
+       ops_do_cos "$LFS mkdir -c 2 $DIR/$tdir/subdir"
+       # striped directory setattr
+       $LFS mkdir -c 2 $DIR/$tdir
+       touch $DIR/$tdir
+       ops_do_cos "chmod 713 $DIR/$tdir"
+       # striped directory unlink
+       $LFS mkdir -c 2 $DIR/$tdir
+       touch $DIR/$tdir
+       ops_do_cos "rmdir $DIR/$tdir"
+       # cross-MDT link
+       $LFS mkdir -c 2 $DIR/$tdir
+       $LFS mkdir -i 0 $DIR/$tdir/d1
+       $LFS mkdir -i 1 $DIR/$tdir/d2
+       touch $DIR/$tdir/d1/tgt
+       ops_do_cos "ln $DIR/$tdir/d1/tgt $DIR/$tdir/d2/src"
+       # cross-MDT rename
+       $LFS mkdir -c 2 $DIR/$tdir
+       $LFS mkdir -i 0 $DIR/$tdir/d1
+       $LFS mkdir -i 1 $DIR/$tdir/d2
+       touch $DIR/$tdir/d1/src
+       ops_do_cos "mv $DIR/$tdir/d1/src $DIR/$tdir/d2/tgt"
+       # migrate
+       $LFS mkdir -i 0 $DIR/$tdir
+       ops_do_cos "$LFS migrate -m 1 $DIR/$tdir"
+       return 0
+}
+run_test 33d "DNE distributed operation should trigger COS"
+
+test_33e() {
+       [ -n "$CLIENTS" ] || { skip "Need two or more clients" && return 0; }
+       [ $CLIENTCOUNT -ge 2 ] ||
+               { skip "Need two or more clients, have $CLIENTCOUNT" &&
+                                                               return 0; }
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.63) ] &&
+               skip "DNE CoS not supported" && return
+
+       local client2=${CLIENT2:-$(hostname)}
+
+       sync
+
+       local nodes=$(comma_list $(mdts_nodes))
+       do_nodes $nodes "lctl set_param -n mdt.*.async_commit_count=0"
+
+       $LFS mkdir -c 2 $DIR/$tdir
+       mkdir $DIR/$tdir/subdir
+       echo abc > $DIR/$tdir/$tfile
+       do_node $client2 echo dfg >> $DIR/$tdir/$tfile
+       do_node $client2 touch $DIR/$tdir/subdir
+
+       local async_commit_count=$(do_nodes $nodes \
+               "lctl get_param -n mdt.*.async_commit_count" | calc_sum)
+       [ $async_commit_count -gt 0 ] && error "CoS triggerred"
+
+       return 0
+}
+run_test 33e "DNE local operation shouldn't trigger COS"
+
 # End commit on sharing tests
 
 get_ost_lock_timeouts() {
@@ -3434,6 +3546,11 @@ run_test 91 "chmod and unlink striped directory"
 
 log "cleanup: ======================================================"
 
+# kill and wait in each test only guarentee script finish, but command in script
+# like 'rm' 'chmod' may still be running, wait for all commands to finish
+# otherwise umount below will fail
+wait_update $HOSTNAME "fuser -m $MOUNT2" "" || true
+
 [ "$(mount | grep $MOUNT2)" ] && umount $MOUNT2
 
 complete $SECONDS
index f7feed1..2e5f9bd 100644 (file)
@@ -3017,6 +3017,7 @@ static int cb_migrate_mdt_init(char *path, DIR *parent, DIR **dirp,
        int                     fd;
        int                     ret;
        char                    *filename;
+       bool                    retry = false;
 
        LASSERT(parent != NULL || dirp != NULL);
        if (dirp != NULL)
@@ -3047,8 +3048,19 @@ static int cb_migrate_mdt_init(char *path, DIR *parent, DIR **dirp,
                goto out;
        }
 
+migrate:
        ret = ioctl(fd, LL_IOC_MIGRATE, rawbuf);
        if (ret != 0) {
+               if (errno == EBUSY && !retry) {
+                       /* because migrate may not be able to lock all involved
+                        * objects in order, for some of them it try lock, while
+                        * there may be conflicting COS locks and cause migrate
+                        * fail with EBUSY, hope a sync() could cause
+                        * transaction commit and release these COS locks. */
+                       sync();
+                       retry = true;
+                       goto migrate;
+               }
                ret = -errno;
                fprintf(stderr, "%s migrate failed: %s (%d)\n",
                        path, strerror(-ret), ret);