Whamcloud - gitweb
b=18143 Make VBR compatible with pdirops.
authorMikhail Pershin <tappro@sun.com>
Sun, 9 May 2010 09:09:23 +0000 (13:09 +0400)
committerRobert Read <robert.read@oracle.com>
Mon, 10 May 2010 18:13:30 +0000 (11:13 -0700)
i=zam
i=bzzz

12 files changed:
lustre/include/lustre_disk.h
lustre/ldlm/ldlm_request.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/tests/replay-dual.sh
lustre/tests/replay-vbr.sh

index 04caa81..268d531 100644 (file)
@@ -204,6 +204,7 @@ struct lustre_mount_data {
 #define LR_EPOCH_BITS   32
 #define lr_epoch(a) ((a) >> LR_EPOCH_BITS)
 #define LR_EXPIRE_INTERVALS 16 /**< number of intervals to track transno */
 #define LR_EPOCH_BITS   32
 #define lr_epoch(a) ((a) >> LR_EPOCH_BITS)
 #define LR_EXPIRE_INTERVALS 16 /**< number of intervals to track transno */
+#define ENOENT_VERSION 1 /** 'virtual' version of non-existent object */
 
 #define LR_SERVER_SIZE   512
 #define LR_CLIENT_START 8192
 
 #define LR_SERVER_SIZE   512
 #define LR_CLIENT_START 8192
index c5f8b75..a723f42 100644 (file)
@@ -2106,6 +2106,10 @@ int ldlm_replay_locks(struct obd_import *imp)
 
         LASSERT(cfs_atomic_read(&imp->imp_replay_inflight) == 0);
 
 
         LASSERT(cfs_atomic_read(&imp->imp_replay_inflight) == 0);
 
+        /* don't replay locks if import failed recovery */
+        if (imp->imp_vbr_failed)
+                RETURN(0);
+
         /* ensure this doesn't fall to 0 before all have been queued */
         cfs_atomic_inc(&imp->imp_replay_inflight);
 
         /* ensure this doesn't fall to 0 before all have been queued */
         cfs_atomic_inc(&imp->imp_replay_inflight);
 
index d4c7750..7c37c2f 100644 (file)
@@ -2363,7 +2363,7 @@ static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
         LASSERT(mdd_object_exists(mdd_obj));
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
         LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_set(env, mdd_object_child(mdd_obj), version);
+        do_version_set(env, mdd_object_child(mdd_obj), version);
 }
 
 const struct md_object_operations mdd_obj_ops = {
 }
 
 const struct md_object_operations mdd_obj_ops = {
index 5a1076e..bce519f 100644 (file)
@@ -2630,7 +2630,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         if (likely(rc == 0 && req->rq_export && h->mh_opc != MDS_DISCONNECT))
                 target_committed_to_req(req);
 
         if (likely(rc == 0 && req->rq_export && h->mh_opc != MDS_DISCONNECT))
                 target_committed_to_req(req);
 
-        if (unlikely((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+        if (unlikely(req_is_replay(req) &&
                      lustre_msg_get_transno(req->rq_reqmsg) == 0)) {
                 DEBUG_REQ(D_ERROR, req, "transno is 0 during REPLAY");
                 LBUG();
                      lustre_msg_get_transno(req->rq_reqmsg) == 0)) {
                 DEBUG_REQ(D_ERROR, req, "transno is 0 during REPLAY");
                 LBUG();
@@ -2694,10 +2694,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
 
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
-        info->mti_mos[0] = NULL;
-        info->mti_mos[1] = NULL;
-        info->mti_mos[2] = NULL;
-        info->mti_mos[3] = NULL;
+        info->mti_mos = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
         info->mti_body = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
         info->mti_body = NULL;
@@ -5476,17 +5473,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  * fid, this is error to find remote object here
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
                  * fid, this is error to find remote object here
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
-        } else if (rc == 0) {
-                rc = -ENOENT;
-                CDEBUG(D_IOCTL, "no such object: "DFID"\n", PFID(fid));
         } else {
                 version = mo_version_get(mti->mti_env, mdt_object_child(obj));
         } else {
                 version = mo_version_get(mti->mti_env, mdt_object_child(obj));
-                if (version < 0) {
-                        rc = (int)version;
-                } else {
-                        *(__u64 *)data->ioc_inlbuf2 = version;
-                        rc = 0;
-                }
+               *(__u64 *)data->ioc_inlbuf2 = version;
+                rc = 0;
         }
         mdt_object_unlock_put(mti, obj, lh, 1);
         RETURN(rc);
         }
         mdt_object_unlock_put(mti, obj, lh, 1);
         RETURN(rc);
index 5b3f836..9a68d9e 100644 (file)
@@ -337,8 +337,8 @@ struct mdt_thread_info {
         struct mdt_reint_record    mti_rr;
 
         /** md objects included in operation */
         struct mdt_reint_record    mti_rr;
 
         /** md objects included in operation */
-        struct mdt_object         *mti_mos[PTLRPC_NUM_VERSIONS];
-
+        struct mdt_object         *mti_mos;
+        __u64                      mti_ver[PTLRPC_NUM_VERSIONS];
         /*
          * Operation specification (currently create and lookup)
          */
         /*
          * Operation specification (currently create and lookup)
          */
@@ -393,11 +393,6 @@ struct mdt_thread_info {
         struct md_attr             mti_tmp_attr;
 };
 
         struct md_attr             mti_tmp_attr;
 };
 
-#define mti_parent      mti_mos[0]
-#define mti_child       mti_mos[1]
-#define mti_parent1     mti_mos[2]
-#define mti_child1      mti_mos[3]
-
 typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno,
                          void *data, int err);
 struct mdt_commit_cb {
 typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno,
                          void *data, int err);
 struct mdt_commit_cb {
@@ -454,6 +449,12 @@ static inline struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info)
          return info->mti_pill ? info->mti_pill->rc_req : NULL;
 }
 
          return info->mti_pill ? info->mti_pill->rc_req : NULL;
 }
 
+static inline int req_is_replay(struct ptlrpc_request *req)
+{
+        LASSERT(req->rq_reqmsg);
+        return !!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+}
+
 static inline __u64 mdt_conn_flags(struct mdt_thread_info *info)
 {
         LASSERT(info->mti_exp);
 static inline __u64 mdt_conn_flags(struct mdt_thread_info *info)
 {
         LASSERT(info->mti_exp);
@@ -644,7 +645,10 @@ int mdt_check_ucred(struct mdt_thread_info *);
 int mdt_init_ucred(struct mdt_thread_info *, struct mdt_body *);
 int mdt_init_ucred_reint(struct mdt_thread_info *);
 void mdt_exit_ucred(struct mdt_thread_info *);
 int mdt_init_ucred(struct mdt_thread_info *, struct mdt_body *);
 int mdt_init_ucred_reint(struct mdt_thread_info *);
 void mdt_exit_ucred(struct mdt_thread_info *);
-int mdt_version_get_check(struct mdt_thread_info *, int);
+int mdt_version_get_check(struct mdt_thread_info *, struct mdt_object *, int);
+void mdt_version_get_save(struct mdt_thread_info *, struct mdt_object *, int);
+int mdt_version_get_check_save(struct mdt_thread_info *, struct mdt_object *,
+                               int);
 
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
 
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
index de23917..26715a3 100644 (file)
@@ -1071,8 +1071,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
-        if (lustre_msg_get_flags(mdt_info_req(info)->rq_reqmsg) & MSG_REPLAY)
-                info->mti_spec.no_create = 1;
+        info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
@@ -1132,8 +1131,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
-        if (lustre_msg_get_flags(mdt_info_req(info)->rq_reqmsg) & MSG_REPLAY)
-                info->mti_spec.no_create = 1;
+        info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
@@ -1183,8 +1181,8 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
                                  req_capsule_client_get(pill, &RMF_CAPA1));
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
                                  req_capsule_client_get(pill, &RMF_CAPA1));
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
-            (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))) {
+        if (req_is_replay(req) &&
+            req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)) {
 #if 0
                 mdt_set_capainfo(info, 1, rr->rr_fid2,
                                  req_capsule_client_get(pill, &RMF_CAPA2));
 #if 0
                 mdt_set_capainfo(info, 1, rr->rr_fid2,
                                  req_capsule_client_get(pill, &RMF_CAPA2));
@@ -1207,8 +1205,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
                                                      RCL_CLIENT);
         if (sp->u.sp_ea.eadatalen) {
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
                                                      RCL_CLIENT);
         if (sp->u.sp_ea.eadatalen) {
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
-                        sp->no_create = 1;
+                sp->no_create = !!req_is_replay(req);
         }
 
         RETURN(0);
         }
 
         RETURN(0);
index e98a567..801925a 100644 (file)
@@ -85,8 +85,7 @@ struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
         LASSERT(handle != NULL);
         mfd = class_handle2object(handle->cookie);
         /* during dw/setattr replay the mfd can be found by old handle */
         LASSERT(handle != NULL);
         mfd = class_handle2object(handle->cookie);
         /* during dw/setattr replay the mfd can be found by old handle */
-        if (mfd == NULL &&
-            lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+        if (mfd == NULL && req_is_replay(req)) {
                 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
                 cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                         if (mfd->mfd_old_handle.cookie == handle->cookie)
                 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
                 cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                         if (mfd->mfd_old_handle.cookie == handle->cookie)
@@ -667,7 +666,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 mfd->mfd_xid = req->rq_xid;
 
                 /* replay handle */
                 mfd->mfd_xid = req->rq_xid;
 
                 /* replay handle */
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                if (req_is_replay(req)) {
                         struct mdt_file_data *old_mfd;
                         /* Check wheather old cookie already exist in
                          * the list, becasue when do recovery, client
                         struct mdt_file_data *old_mfd;
                         /* Check wheather old cookie already exist in
                          * the list, becasue when do recovery, client
@@ -1194,7 +1193,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                PFID(rr->rr_fid2), create_flags,
                ma->ma_attr.la_mode, msg_flags);
 
                PFID(rr->rr_fid2), create_flags,
                ma->ma_attr.la_mode, msg_flags);
 
-        if (msg_flags & MSG_REPLAY ||
+        if (req_is_replay(req) ||
             (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
                 /* This is a replay request or from liblustre with ea. */
                 result = mdt_open_by_fid(info, ldlm_rep);
             (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
                 /* This is a replay request or from liblustre with ea. */
                 result = mdt_open_by_fid(info, ldlm_rep);
@@ -1244,10 +1243,14 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
 
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
 
+        /* get and check version of parent */
+        result = mdt_version_get_check(info, parent, 0);
+        if (result)
+                GOTO(out_parent, result);
+
         fid_zero(child_fid);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         fid_zero(child_fid);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-
         result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                             lname, child_fid, &info->mti_spec);
         LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
         result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                             lname, child_fid, &info->mti_spec);
         LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
@@ -1284,17 +1287,23 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
+        /** check version of child  */
+        rc = mdt_version_get_check(info, child, 1);
+        if (rc)
+                GOTO(out_child, result = rc);
+
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
+                /* save versions in reply */
+                mdt_version_get_save(info, parent, 0);
+                mdt_version_get_save(info, child, 1);
+
+                /* version of child will be changed */
+                info->mti_mos = child;
+
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
-                info->mti_mos[0] = parent;
-                info->mti_mos[1] = child;
-                result = mdt_version_get_check(info, 0);
-                if (result)
-                        GOTO(out_child, result);
-
                 /* Let lower layers know what is lock mode on directory. */
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
                 /* Let lower layers know what is lock mode on directory. */
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
@@ -1369,7 +1378,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
 
         /* get openlock if this is not replay and if a client requested it */
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
 
         /* get openlock if this is not replay and if a client requested it */
-        if (!(msg_flags & MSG_REPLAY) && create_flags & MDS_OPEN_LOCK) {
+        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
                 ldlm_mode_t lm;
 
                 if (create_flags & FMODE_WRITE)
                 ldlm_mode_t lm;
 
                 if (create_flags & FMODE_WRITE)
index bce0088..b92727b 100644 (file)
@@ -799,14 +799,13 @@ static int mdt_txn_start_cb(const struct lu_env *env,
 }
 
 /* Set new object versions */
 }
 
 /* Set new object versions */
-static void mdt_versions_set(struct mdt_thread_info *info)
+static void mdt_version_set(struct mdt_thread_info *info)
 {
 {
-        int i;
-        for (i = 0; i < PTLRPC_NUM_VERSIONS; i++)
-                if (info->mti_mos[i] != NULL)
-                        mo_version_set(info->mti_env,
-                                       mdt_object_child(info->mti_mos[i]),
-                                       info->mti_transno);
+        if (info->mti_mos != NULL) {
+                mo_version_set(info->mti_env, mdt_object_child(info->mti_mos),
+                               info->mti_transno);
+                info->mti_mos = NULL;
+        }
 }
 
 /* Update last_rcvd records with latests transaction data */
 }
 
 /* Update last_rcvd records with latests transaction data */
@@ -857,7 +856,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
 
         /** VBR: set new versions */
         if (txn->th_result == 0)
 
         /** VBR: set new versions */
         if (txn->th_result == 0)
-                mdt_versions_set(mti);
+                mdt_version_set(mti);
 
         /* filling reply data */
         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
 
         /* filling reply data */
         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
index f5f8166..3ffd2e6 100644 (file)
@@ -96,52 +96,176 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-int mdt_version_get_check(struct mdt_thread_info *info, int index)
+/**
+ * Get version of object by fid.
+ *
+ * Return real version or ENOENT_VERSION if object doesn't exist
+ */
+static void mdt_obj_version_get(struct mdt_thread_info *info,
+                                struct mdt_object *o, __u64 *version)
+{
+        LASSERT(o);
+        LASSERT(mdt_object_exists(o) >= 0);
+        if (mdt_object_exists(o) > 0)
+                *version = mo_version_get(info->mti_env, mdt_object_child(o));
+        else
+                *version = ENOENT_VERSION;
+        CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
+               PFID(mdt_object_fid(o)), *version);
+}
+
+/**
+ * Check version is correct.
+ *
+ * Should be called only during replay.
+ */
+static int mdt_version_check(struct ptlrpc_request *req,
+                             __u64 version, int idx)
 {
 {
-        /** version recovery */
-        struct md_object *mo;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        __u64 curr_version, *pre_versions;
+        __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
         ENTRY;
 
         if (!exp_connect_vbr(req->rq_export))
                 RETURN(0);
 
         ENTRY;
 
         if (!exp_connect_vbr(req->rq_export))
                 RETURN(0);
 
-        LASSERT(info->mti_mos[index]);
-        if (mdt_object_exists(info->mti_mos[index]) == 0)
-                RETURN(-ESTALE);
-        mo = mdt_object_child(info->mti_mos[index]);
-
-        curr_version = mo_version_get(info->mti_env, mo);
-        CDEBUG(D_INODE, "Version is "LPX64"\n", curr_version);
+        LASSERT(req_is_replay(req));
         /** VBR: version is checked always because costs nothing */
         /** VBR: version is checked always because costs nothing */
-        if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
-                pre_versions = lustre_msg_get_versions(req->rq_reqmsg);
-                LASSERT(index < PTLRPC_NUM_VERSIONS);
-                /** Sanity check for malformed buffers */
-                if (pre_versions == NULL) {
-                        CERROR("No versions in request buffer\n");
-                        cfs_spin_lock(&req->rq_export->exp_lock);
-                        req->rq_export->exp_vbr_failed = 1;
-                        cfs_spin_unlock(&req->rq_export->exp_lock);
-                        RETURN(-EOVERFLOW);
-                } else if (pre_versions[index] != curr_version) {
-                        CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
-                               pre_versions[index], curr_version);
-                        cfs_spin_lock(&req->rq_export->exp_lock);
-                        req->rq_export->exp_vbr_failed = 1;
-                        cfs_spin_unlock(&req->rq_export->exp_lock);
-                        RETURN(-EOVERFLOW);
-                }
+        LASSERT(idx < PTLRPC_NUM_VERSIONS);
+        /** Sanity check for malformed buffers */
+        if (pre_ver == NULL) {
+                CERROR("No versions in request buffer\n");
+                cfs_spin_lock(&req->rq_export->exp_lock);
+                req->rq_export->exp_vbr_failed = 1;
+                cfs_spin_unlock(&req->rq_export->exp_lock);
+                RETURN(-EOVERFLOW);
+        } else if (pre_ver[idx] != version) {
+                CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+                       pre_ver[idx], version);
+                cfs_spin_lock(&req->rq_export->exp_lock);
+                req->rq_export->exp_vbr_failed = 1;
+                cfs_spin_unlock(&req->rq_export->exp_lock);
+                RETURN(-EOVERFLOW);
         }
         }
-        /** save pre-versions in reply */
-        LASSERT(req->rq_repmsg != NULL);
-        pre_versions = lustre_msg_get_versions(req->rq_repmsg);
-        if (pre_versions)
-                pre_versions[index] = curr_version;
         RETURN(0);
 }
 
         RETURN(0);
 }
 
+/**
+ * Save pre-versions in reply.
+ */
+static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
+                             int idx)
+{
+        __u64 *reply_ver;
+
+        if (!exp_connect_vbr(req->rq_export))
+                return;
+
+        LASSERT(!req_is_replay(req));
+        LASSERT(req->rq_repmsg != NULL);
+        reply_ver = lustre_msg_get_versions(req->rq_repmsg);
+        if (reply_ver)
+                reply_ver[idx] = version;
+}
+
+/**
+ * Save enoent version, it is needed when it is obvious that object doesn't
+ * exist, e.g. child during create.
+ */
+static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
+{
+        /* save version of file name for replay, it must be ENOENT here */
+        if (!req_is_replay(mdt_info_req(info))) {
+                info->mti_ver[idx] = ENOENT_VERSION;
+                mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
+        }
+}
+
+/**
+ * Get version from disk and save in reply buffer.
+ *
+ * Versions are saved in reply only during normal operations not replays.
+ */
+void mdt_version_get_save(struct mdt_thread_info *info,
+                          struct mdt_object *mto, int idx)
+{
+        /* don't save versions during replay */
+        if (!req_is_replay(mdt_info_req(info))) {
+                mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
+                mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
+        }
+}
+
+/**
+ * Get version from disk and check it, no save in reply.
+ */
+int mdt_version_get_check(struct mdt_thread_info *info,
+                          struct mdt_object *mto, int idx)
+{
+        /* only check versions during replay */
+        if (!req_is_replay(mdt_info_req(info)))
+                return 0;
+
+        mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
+        return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
+}
+
+/**
+ * Get version from disk and check if recovery or just save.
+ */
+int mdt_version_get_check_save(struct mdt_thread_info *info,
+                               struct mdt_object *mto, int idx)
+{
+        int rc = 0;
+
+        mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
+        if (req_is_replay(mdt_info_req(info)))
+                rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
+                                       idx);
+        else
+                mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
+        return rc;
+}
+
+/**
+ * Lookup with version checking.
+ *
+ * This checks version of 'name'. Many reint functions uses 'name' for child not
+ * FID, therefore we need to get object by name and check its version.
+ */
+int mdt_lookup_version_check(struct mdt_thread_info *info,
+                             struct mdt_object *p, struct lu_name *lname,
+                             struct lu_fid *fid, int idx)
+{
+        int rc, vbrc;
+
+        rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
+                        &info->mti_spec);
+        /* Check version only during replay */
+        if (!req_is_replay(mdt_info_req(info)))
+                return rc;
+
+        info->mti_ver[idx] = ENOENT_VERSION;
+        if (rc == 0) {
+                struct mdt_object *child;
+                child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+                if (likely(!IS_ERR(child))) {
+                        mdt_obj_version_get(info, child, &info->mti_ver[idx]);
+                        mdt_object_put(info->mti_env, child);
+                }
+        }
+        vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
+        return vbrc ? vbrc : rc;
+
+}
+
+/*
+ * VBR: we save three versions in reply:
+ * 0 - parent. Check that parent version is the same during replay.
+ * 1 - name. Version of 'name' if file exists with the same name or
+ * ENOENT_VERSION, it is needed because file may appear due to missed replays.
+ * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
+ * check.
+ */
 static int mdt_md_create(struct mdt_thread_info *info)
 {
         struct mdt_device       *mdt = info->mti_mdt;
 static int mdt_md_create(struct mdt_thread_info *info)
 {
         struct mdt_device       *mdt = info->mti_mdt;
@@ -168,6 +292,24 @@ static int mdt_md_create(struct mdt_thread_info *info)
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
+        rc = mdt_version_get_check_save(info, parent, 0);
+        if (rc)
+                GOTO(out_put_parent, rc);
+
+        /*
+         * Check child name version during replay.
+         * During create replay a file may exist with same name.
+         */
+        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
+        rc = mdt_lookup_version_check(info, parent, lname,
+                                      &info->mti_tmp_fid1, 1);
+        /* -ENOENT is expected here */
+        if (rc != 0 && rc != -ENOENT)
+                GOTO(out_put_parent, rc);
+
+        /* save version of file name for replay, it must be ENOENT here */
+        mdt_enoent_version_save(info, 1);
+
         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
@@ -182,9 +324,9 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
-                info->mti_mos[0] = parent;
-                info->mti_mos[1] = child;
-                rc = mdt_version_get_check(info, 0);
+                /* Version of child will be updated on disk. */
+                info->mti_mos = child;
+                rc = mdt_version_get_check_save(info, child, 2);
                 if (rc)
                         GOTO(out_put_child, rc);
 
                 if (rc)
                         GOTO(out_put_child, rc);
 
@@ -199,8 +341,6 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 info->mti_spec.sp_cr_lookup = 1;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
                 info->mti_spec.sp_cr_lookup = 1;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
-                lname = mdt_name(info->mti_env, (char *)rr->rr_name,
-                                 rr->rr_namelen);
                 rc = mdo_create(info->mti_env, next, lname,
                                 mdt_object_child(child),
                                 &info->mti_spec, ma);
                 rc = mdo_create(info->mti_env, next, lname,
                                 mdt_object_child(child),
                                 &info->mti_spec, ma);
@@ -212,10 +352,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 }
 out_put_child:
                 mdt_object_put(info->mti_env, child);
                 }
 out_put_child:
                 mdt_object_put(info->mti_env, child);
-        } else
+        } else {
                 rc = PTR_ERR(child);
                 rc = PTR_ERR(child);
-
+        }
         mdt_create_pack_capa(info, rc, child, repbody);
         mdt_create_pack_capa(info, rc, child, repbody);
+out_put_parent:
         mdt_object_unlock_put(info, parent, lh, rc);
         RETURN(rc);
 }
         mdt_object_unlock_put(info, parent, lh, rc);
         RETURN(rc);
 }
@@ -312,8 +453,9 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
 
         /* VBR: update version if attr changed are important for recovery */
         if (do_vbr) {
 
         /* VBR: update version if attr changed are important for recovery */
         if (do_vbr) {
-                info->mti_mos[0] = mo;
-                rc = mdt_version_get_check(info, 0);
+                /* update on-disk version of changed object */
+                info->mti_mos = mo;
+                rc = mdt_version_get_check_save(info, mo, 0);
                 if (rc)
                         GOTO(out_unlock, rc);
         }
                 if (rc)
                         GOTO(out_unlock, rc);
         }
@@ -497,6 +639,10 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/*
+ * VBR: save parent version in reply and child version getting by its name.
+ * Version of child is getting and checking during its lookup. If 
+ */
 static int mdt_reint_unlink(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
 static int mdt_reint_unlink(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
@@ -546,8 +692,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
 
                 GOTO(out, rc);
         }
 
-        info->mti_mos[0] = mp;
-        rc = mdt_version_get_check(info, 0);
+        rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
 
         if (rc)
                 GOTO(out_unlock_parent, rc);
 
@@ -575,8 +720,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         /* step 2: find & lock the child */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
 
         /* step 2: find & lock the child */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
-                        lname, child_fid, &info->mti_spec);
+        /* lookup child object along with version checking */
+        rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
 
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
 
@@ -595,12 +740,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
-
-        info->mti_mos[1] = mc;
-        rc = mdt_version_get_check(info, 1);
-        if (rc)
-                GOTO(out_unlock_child, rc);
-
+        /* save version when object is locked */
+        mdt_version_get_save(info, mc, 1);
         /*
          * Now we can only make sure we need MA_INODE, in mdd layer, will check
          * whether need MA_LOV and MA_COOKIE.
         /*
          * Now we can only make sure we need MA_INODE, in mdd layer, will check
          * whether need MA_LOV and MA_COOKIE.
@@ -614,7 +755,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 mdt_handle_last_unlink(info, mc, ma);
 
         EXIT;
                 mdt_handle_last_unlink(info, mc, ma);
 
         EXIT;
-out_unlock_child:
+
         mdt_object_unlock_put(info, mc, child_lh, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, parent_lh, rc);
         mdt_object_unlock_put(info, mc, child_lh, rc);
 out_unlock_parent:
         mdt_object_unlock_put(info, mp, parent_lh, rc);
@@ -622,6 +763,10 @@ out:
         return rc;
 }
 
         return rc;
 }
 
+/*
+ * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
+ * name.
+ */
 static int mdt_reint_link(struct mdt_thread_info *info,
                           struct mdt_lock_handle *lhc)
 {
 static int mdt_reint_link(struct mdt_thread_info *info,
                           struct mdt_lock_handle *lhc)
 {
@@ -674,8 +819,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
         if (IS_ERR(mp))
                 RETURN(PTR_ERR(mp));
 
-        info->mti_mos[0] = mp;
-        rc = mdt_version_get_check(info, 0);
+        rc = mdt_version_get_check_save(info, mp, 0);
         if (rc)
                 GOTO(out_unlock_parent, rc);
 
         if (rc)
                 GOTO(out_unlock_parent, rc);
 
@@ -698,12 +842,22 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
-        info->mti_mos[1] = ms;
-        rc = mdt_version_get_check(info, 1);
+        info->mti_mos = ms;
+        rc = mdt_version_get_check_save(info, ms, 1);
         if (rc)
                 GOTO(out_unlock_child, rc);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         if (rc)
                 GOTO(out_unlock_child, rc);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
+        /** check target version by name during replay */
+        rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2);
+        if (rc != 0 && rc != -ENOENT)
+                GOTO(out_unlock_child, rc);
+        /* save version of file name for replay, it must be ENOENT here */
+        if (!req_is_replay(mdt_info_req(info))) {
+                info->mti_ver[2] = ENOENT_VERSION;
+                mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
+        }
+
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), lname, ma);
 
         rc = mdo_link(info->mti_env, mdt_object_child(mp),
                       mdt_object_child(ms), lname, ma);
 
@@ -909,6 +1063,11 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/*
+ * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
+ * 2 - src child; 3 - tgt child.
+ * Update on disk version of src child.
+ */
 static int mdt_reint_rename(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
 static int mdt_reint_rename(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
 {
@@ -932,7 +1091,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         ENTRY;
 
         if (info->mti_dlm_req)
         ENTRY;
 
         if (info->mti_dlm_req)
-                ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+                ldlm_request_cancel(req, info->mti_dlm_req, 0);
 
         if (info->mti_cross_ref) {
                 rc = mdt_reint_rename_tgt(info);
 
         if (info->mti_cross_ref) {
                 rc = mdt_reint_rename_tgt(info);
@@ -960,8 +1119,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
-        info->mti_mos[0] = msrcdir;
-        rc = mdt_version_get_check(info, 0);
+        rc = mdt_version_get_check_save(info, msrcdir, 0);
         if (rc)
                 GOTO(out_unlock_source, rc);
 
         if (rc)
                 GOTO(out_unlock_source, rc);
 
@@ -985,31 +1143,30 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
+                /* check early, the real version will be saved after locking */
+                rc = mdt_version_get_check(info, mtgtdir, 1);
+                if (rc)
+                        GOTO(out_put_target, rc);
+
                 rc = mdt_object_exists(mtgtdir);
                 rc = mdt_object_exists(mtgtdir);
-                if (rc == 0)
-                        GOTO(out_unlock_target, rc = -ESTALE);
-                else if (rc > 0) {
+                if (rc == 0) {
+                        GOTO(out_put_target, rc = -ESTALE);
+                else if (rc > 0) {
                         /* we lock the target dir if it is local */
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
                                              MDS_INODELOCK_UPDATE,
                                              MDT_LOCAL_LOCK);
                         /* we lock the target dir if it is local */
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
                                              MDS_INODELOCK_UPDATE,
                                              MDT_LOCAL_LOCK);
-                        if (rc != 0) {
-                                mdt_object_put(info->mti_env, mtgtdir);
-                                GOTO(out_unlock_source, rc);
-                        }
-
-                        info->mti_mos[1] = mtgtdir;
-                        rc = mdt_version_get_check(info, 1);
-                        if (rc)
-                                GOTO(out_unlock_target, rc);
+                        if (rc != 0)
+                                GOTO(out_put_target, rc);
+                        /* get and save correct version after locking */
+                        mdt_version_get_save(info, mtgtdir, 1);
                 }
         }
 
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         mdt_name_copy(&slname, lname);
                 }
         }
 
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         mdt_name_copy(&slname, lname);
-        rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
-                        &slname, old_fid, &info->mti_spec);
+        rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2);
         if (rc != 0)
                 GOTO(out_unlock_target, rc);
 
         if (rc != 0)
                 GOTO(out_unlock_target, rc);
 
@@ -1029,18 +1186,16 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 GOTO(out_unlock_target, rc);
         }
 
                 GOTO(out_unlock_target, rc);
         }
 
-        info->mti_mos[2] = mold;
-        rc = mdt_version_get_check(info, 2);
-        if (rc)
-                GOTO(out_unlock_old, rc);
-
+        info->mti_mos = mold;
+        /* save version after locking */
+        mdt_version_get_save(info, mold, 2);
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
 
         /* step 4: find & lock the new object. */
         /* new target object may not exist now */
         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
 
         /* step 4: find & lock the new object. */
         /* new target object may not exist now */
         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
-        rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
-                        lname, new_fid, &info->mti_spec);
+        /* lookup with version checking */
+        rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3);
         if (rc == 0) {
                 /* the new_fid should have been filled at this moment */
                 if (lu_fid_eq(old_fid, new_fid))
         if (rc == 0) {
                 /* the new_fid should have been filled at this moment */
                 if (lu_fid_eq(old_fid, new_fid))
@@ -1061,15 +1216,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         mdt_object_put(info->mti_env, mnew);
                         GOTO(out_unlock_old, rc);
                 }
                         mdt_object_put(info->mti_env, mnew);
                         GOTO(out_unlock_old, rc);
                 }
-
-                info->mti_mos[3] = mnew;
-                rc = mdt_version_get_check(info, 3);
-                if (rc)
-                        GOTO(out_unlock_new, rc);
-
+                /* get and save version after locking */
+                mdt_version_get_save(info, mnew, 3);
                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
-        } else if (rc != -EREMOTE && rc != -ENOENT)
+        } else if (rc != -EREMOTE && rc != -ENOENT) {
                 GOTO(out_unlock_old, rc);
                 GOTO(out_unlock_old, rc);
+        } else {
+                mdt_enoent_version_save(info, 3);
+        }
 
         /* step 5: rename it */
         mdt_reint_init_ma(info, ma);
 
         /* step 5: rename it */
         mdt_reint_init_ma(info, ma);
@@ -1101,7 +1255,9 @@ out_unlock_new:
 out_unlock_old:
         mdt_object_unlock_put(info, mold, lh_oldp, rc);
 out_unlock_target:
 out_unlock_old:
         mdt_object_unlock_put(info, mold, lh_oldp, rc);
 out_unlock_target:
-        mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
+        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
+out_put_target:
+        mdt_object_put(info->mti_env, mtgtdir);
 out_unlock_source:
         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
 out_rename_lock:
 out_unlock_source:
         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
 out_rename_lock:
index c268b14..27fc03a 100644 (file)
@@ -352,8 +352,8 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
-        info->mti_mos[0] = obj;
-        rc = mdt_version_get_check(info, 0);
+        info->mti_mos = obj;
+        rc = mdt_version_get_check_save(info, obj, 0);
         if (rc)
                 GOTO(out_unlock, rc);
 
         if (rc)
                 GOTO(out_unlock, rc);
 
index 9131e9d..e3f7c5d 100755 (executable)
@@ -250,25 +250,8 @@ test_13() {
 }
 run_test 13 "close resend timeout"
 
 }
 run_test 13 "close resend timeout"
 
-test_14a() {
-    replay_barrier $SINGLEMDS
-    createmany -o $MOUNT1/$tfile- 25
-    createmany -o $MOUNT2/$tfile-2- 1
-    createmany -o $MOUNT1/$tfile-3- 25
-    umount $MOUNT2
-
-    facet_failover $SINGLEMDS
-    # expect failover to fail due to missing client 2
-    client_evicted || return 1
-    sleep 1
-
-    # first 25 files should have been replayed 
-    unlinkmany $MOUNT1/$tfile- 25 || return 2
-
-    zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail" 
-    return 0
-}
-run_test 14a "timeouts waiting for lost client during replay"
+# test 14a removed after 18143 because it shouldn't fail anymore and do the same
+# as test_15a
 
 test_14b() {
     BEFOREUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
 
 test_14b() {
     BEFOREUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
index ada1c2c..d6ddb84 100644 (file)
@@ -3,37 +3,41 @@
 set -e
 
 # bug number:  16356
 set -e
 
 # bug number:  16356
-ALWAYS_EXCEPT="2     $REPLAY_VBR_EXCEPT"
+ALWAYS_EXCEPT="12a   $REPLAY_VBR_EXCEPT"
 
 SAVE_PWD=$PWD
 
 SAVE_PWD=$PWD
-PTLDEBUG=${PTLDEBUG:--1}
-LUSTRE=${LUSTRE:-`dirname $0`/..}
-SETUP=${SETUP:-""}
-CLEANUP=${CLEANUP:-""}
+LUSTRE=${LUSTRE:-$(cd $(dirname $0)/..; echo $PWD)}
+SETUP=${SETUP:-}
+CLEANUP=${CLEANUP:-}
+MOUNT_2=${MOUNT_2:-"yes"}
 . $LUSTRE/tests/test-framework.sh
 . $LUSTRE/tests/test-framework.sh
-
 init_test_env $@
 init_test_env $@
-
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 init_logging
 
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 init_logging
 
-[ "$SLOW" = "no" ] && EXCEPT_SLOW=""
+remote_mds_nodsh && log "SKIP: remote MDS with nodsh" && exit 0
 
 
-[ -n "$CLIENTS" ] || { skip_env "Need two or more clients" && exit 0; }
-[ $CLIENTCOUNT -ge 2 ] || \
-    { skip_env "Need two or more clients, have $CLIENTCOUNT" && exit 0; }
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="7"
 
 
-remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0
-[ ! "$NAME" = "ncli" ] && ALWAYS_EXCEPT="$ALWAYS_EXCEPT"
-[ "$NAME" = "ncli" ] && MOUNT_2=""
-MOUNT_2=""
 build_test_filter
 
 check_and_setup_lustre
 build_test_filter
 
 check_and_setup_lustre
+
+assert_DIR
 rm -rf $DIR/[df][0-9]*
 
 [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
 
 rm -rf $DIR/[df][0-9]*
 
 [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
 
+# if there is no CLIENT1 defined, some tests can be ran on localhost
+CLIENT1=${CLIENT1:-$HOSTNAME}
+# if CLIENT2 doesn't exist then use CLIENT1 instead
+# All tests should use CLIENT2 with MOUNT2 only therefore it will work if
+# $CLIENT2 == CLIENT1
+# Exception is the test which need two separate nodes
+CLIENT2=${CLIENT2:-$CLIENT1}
+
+is_mounted $MOUNT2 || error "MOUNT2 is not mounted"
+
 rmultiop_start() {
     local client=$1
     local file=$2
 rmultiop_start() {
     local client=$1
     local file=$2
@@ -51,7 +55,7 @@ rmultiop_start() {
     # /tmp/multiop_bg.pid file
 
     local pid_file=$TMP/multiop_bg.pid.$$
     # /tmp/multiop_bg.pid file
 
     local pid_file=$TMP/multiop_bg.pid.$$
-    do_node $client "rm -f $pid_file && MULTIOP_PID_FILE=$pid_file LUSTRE= runmultiop_bg_pause $file $cmds" &
+    do_node $client "MULTIOP_PID_FILE=$pid_file LUSTRE= sh runmultiop_bg_pause $file $cmds" &
     local pid=$!
     sleep 3
     local multiop_pid
     local pid=$!
     sleep 3
     local multiop_pid
@@ -85,7 +89,8 @@ get_version() {
     do_facet $SINGLEMDS $LCTL --device ${!var} getobjversion $fid
 }
 
     do_facet $SINGLEMDS $LCTL --device ${!var} getobjversion $fid
 }
 
-test_0a() {
+# test set #1: OPEN
+test_1a() { # former test_0a
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -98,29 +103,29 @@ test_0a() {
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
-run_test 0a "open and close do not change versions"
+run_test 1a "open and close do not change versions"
 
 
-test_0b() {
+test_1b() { # former test_0b
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
-    do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
+    do_node $CLIENT1 mkdir -p -m 755 $MOUNT/$tdir
 
     replay_barrier $SINGLEMDS
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
-    do_node $CLIENT1 openfile -f O_RDWR:O_CREAT $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    do_node $CLIENT2 chmod 777 $MOUNT2/$tdir
+    do_node $CLIENT1 openfile -f O_RDWR:O_CREAT $MOUNT/$tdir/$tfile
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "open succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "open succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0b "open (O_CREAT) checks version of parent"
+run_test 1b "open (O_CREAT) checks version of parent"
 
 
-test_0c() {
+test_1c() { # former test_0c
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -128,52 +133,82 @@ test_0c() {
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tdir/$tfile
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tdir/$tfile
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
-    do_node $CLIENT2 chmod 666 $DIR/$tdir/$tfile
+    do_node $CLIENT2 chmod 0777 $MOUNT2/$tdir
+    do_node $CLIENT2 chmod 0666 $MOUNT2/$tdir/$tfile
     rmultiop_start $CLIENT1 $DIR/$tdir/$tfile o_c
     rmultiop_start $CLIENT1 $DIR/$tdir/$tfile o_c
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
     facet_failover $SINGLEMDS
-    client_up $CLIENT1 || error "$CLIENT1 evicted"
 
 
+    client_up $CLIENT1 || error "$CLIENT1 evicted"
     rmultiop_stop $CLIENT1 || error "close failed"
     rmultiop_stop $CLIENT1 || error "close failed"
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0c "open (non O_CREAT) does not checks versions"
+run_test 1c "open (non O_CREAT) does not checks versions"
 
 
-test_0d() {
+# test set #2: CREAT (not open)
+# - version of parent is not changed but checked
+# - pre-version should be -1
+# - post-version should be valid
+test_2a() {  # extended former test_0d
     local pre
     local post
 
     local pre
     local post
 
+    # fifo
     pre=$(get_version $CLIENT1 $DIR)
     pre=$(get_version $CLIENT1 $DIR)
-    do_node $CLIENT1 mkfifo $DIR/$tfile
+    do_node $CLIENT1 mkfifo $DIR/$tfile-fifo
     post=$(get_version $CLIENT1 $DIR)
     post=$(get_version $CLIENT1 $DIR)
-    if (($pre == $post)); then
-        error "version not changed: pre $pre, post $post"
+    if (($pre != $post)); then
+        error "version was changed: pre $pre, post $post"
     fi
     fi
+    # mkdir
+    pre=$(get_version $CLIENT1 $DIR)
+    do_node $CLIENT1 mkdir $DIR/$tfile-dir
+    post=$(get_version $CLIENT1 $DIR)
+    if (($pre != $post)); then
+        error "version was changed: pre $pre, post $post"
+    fi
+    do_node $CLIENT1 rmdir $DIR/$tfile-dir
+
+    # mknod
+    pre=$(get_version $CLIENT1 $DIR)
+    do_node $CLIENT1 mkfifo $DIR/$tfile-nod
+    post=$(get_version $CLIENT1 $DIR)
+    if (($pre != $post)); then
+        error "version was changed: pre $pre, post $post"
+    fi
+    # symlink
+    pre=$(get_version $CLIENT1 $DIR)
+    do_node $CLIENT1 mkfifo $DIR/$tfile-symlink
+    post=$(get_version $CLIENT1 $DIR)
+    if (($pre != $post)); then
+        error "version was changed: pre $pre, post $post"
+    fi
+    do_node $CLIENT1 rm $DIR/$tfile-*
+
 }
 }
-run_test 0d "create changes version of parent"
+run_test 2a "create operations doesn't change version of parent"
 
 
-test_0e() {
+test_2b() { # former test_0e
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
+    do_node $CLIENT2 chmod 777 $MOUNT2/$tdir
     do_node $CLIENT1 mkfifo $DIR/$tdir/$tfile
     do_node $CLIENT1 mkfifo $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "create succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "create succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0e "create checks version of parent"
+run_test 2b "create checks version of parent"
 
 
-test_0f() {
+test_3a() { # former test_0f
     local pre
     local post
 
     local pre
     local post
 
@@ -181,13 +216,13 @@ test_0f() {
     pre=$(get_version $CLIENT1 $DIR)
     do_node $CLIENT1 rm $DIR/$tfile
     post=$(get_version $CLIENT1 $DIR)
     pre=$(get_version $CLIENT1 $DIR)
     do_node $CLIENT1 rm $DIR/$tfile
     post=$(get_version $CLIENT1 $DIR)
-    if (($pre == $post)); then
-        error "version not changed: pre $pre, post $post"
+    if (($pre != $post)); then
+        error "version was changed: pre $pre, post $post"
     fi
 }
     fi
 }
-run_test 0f "unlink changes version of parent"
+run_test 3a "unlink doesn't change version of parent"
 
 
-test_0g() {
+test_3b() { # former test_0g
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -195,20 +230,20 @@ test_0g() {
     do_node $CLIENT1 mcreate $DIR/$tdir/$tfile
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mcreate $DIR/$tdir/$tfile
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
+    do_node $CLIENT2 chmod 777 $MOUNT2/$tdir
     do_node $CLIENT1 rm $DIR/$tdir/$tfile
     do_node $CLIENT1 rm $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "unlink succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "unlink succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0g "unlink checks version of parent"
+run_test 3b "unlink checks version of parent"
 
 
-test_0h() {
+test_4a() { # former test_0h
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -221,9 +256,9 @@ test_0h() {
         error "version not changed: pre $pre, post $post"
     fi
 }
         error "version not changed: pre $pre, post $post"
     fi
 }
-run_test 0h "setattr of UID changes versions"
+run_test 4a "setattr of UID changes versions"
 
 
-test_0i() {
+test_4b() { # former test_0i
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -236,9 +271,9 @@ test_0i() {
         error "version not changed: pre $pre, post $post"
     fi
 }
         error "version not changed: pre $pre, post $post"
     fi
 }
-run_test 0i "setattr of GID changes versions"
+run_test 4b "setattr of GID changes versions"
 
 
-test_0j() {
+test_4c() { # former test_0j
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
@@ -246,20 +281,20 @@ test_0j() {
     do_node $CLIENT1 mcreate $file
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mcreate $file
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chown :$RUNAS_ID $file
+    do_node $CLIENT2 chown :$RUNAS_ID $MOUNT2/$tfile
     do_node $CLIENT1 chown $RUNAS_ID $file
     do_node $CLIENT1 chown $RUNAS_ID $file
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -u \\\#$UID $file; then
         error "setattr of UID succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -u \\\#$UID $file; then
         error "setattr of UID succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0j "setattr of UID checks versions"
+run_test 4c "setattr of UID checks versions"
 
 
-test_0k() {
+test_4d() { # former test_0k
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
@@ -267,20 +302,20 @@ test_0k() {
     do_node $CLIENT1 mcreate $file
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mcreate $file
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chown $RUNAS_ID $file
+    do_node $CLIENT2 chown $RUNAS_ID $MOUNT2/$tfile
     do_node $CLIENT1 chown :$RUNAS_ID $file
     do_node $CLIENT1 chown :$RUNAS_ID $file
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -g \\\#$UID $file; then
         error "setattr of GID succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -g \\\#$UID $file; then
         error "setattr of GID succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0k "setattr of GID checks versions"
+run_test 4d "setattr of GID checks versions"
 
 
-test_0l() {
+test_4e() { # former test_0l
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -293,9 +328,9 @@ test_0l() {
         error "version not changed: pre $pre, post $post"
     fi
 }
         error "version not changed: pre $pre, post $post"
     fi
 }
-run_test 0l "setattr of permission changes versions"
+run_test 4e "setattr of permission changes versions"
 
 
-test_0m() {
+test_4f() { # former test_0m
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
     local file=$DIR/$tfile
     local var=${SINGLEMDS}_svc
 
@@ -303,20 +338,20 @@ test_0m() {
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chown :$RUNAS_ID $file
+    do_node $CLIENT2 chown :$RUNAS_ID $MOUNT2/$tfile
     do_node $CLIENT1 chmod 666 $file
     do_node $CLIENT1 chmod 666 $file
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -p 0644 $file; then
         error "setattr of permission succeeded unexpectedly"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -p 0644 $file; then
         error "setattr of permission succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0m "setattr of permission checks versions"
+run_test 4f "setattr of permission checks versions"
 
 
-test_0n() {
+test_4g() { # former test_0n
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -330,7 +365,7 @@ test_0n() {
         error "version not changed: pre $pre, post $post"
     fi
 }
         error "version not changed: pre $pre, post $post"
     fi
 }
-run_test 0n "setattr of flags changes versions"
+run_test 4g "setattr of flags changes versions"
 
 checkattr() {
     local client=$1
 
 checkattr() {
     local client=$1
@@ -344,7 +379,7 @@ checkattr() {
     do_node $client lsattr $file | cut -d ' ' -f 1 | grep -q $attr
 }
 
     do_node $client lsattr $file | cut -d ' ' -f 1 | grep -q $attr
 }
 
-test_0o() {
+test_4h() { # former test_0o
     local file=$DIR/$tfile
     local rc
     local var=${SINGLEMDS}_svc
     local file=$DIR/$tfile
     local rc
     local var=${SINGLEMDS}_svc
@@ -353,9 +388,9 @@ test_0o() {
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 666 $file
+    do_node $CLIENT2 chmod 666 $MOUNT2/$tfile
     do_node $CLIENT1 chattr +i $file
     do_node $CLIENT1 chattr +i $file
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
@@ -365,11 +400,11 @@ test_0o() {
     if [ $rc -eq 0 ]; then
         error "setattr of flags succeeded unexpectedly"
     fi
     if [ $rc -eq 0 ]; then
         error "setattr of flags succeeded unexpectedly"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0o "setattr of flags checks versions"
+run_test 4h "setattr of flags checks versions"
 
 
-test_0p() {
+test_4i() { # former test_0p
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -391,9 +426,9 @@ test_0p() {
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
-run_test 0p "setattr of times does not change versions"
+run_test 4i "setattr of times does not change versions"
 
 
-test_0q() {
+test_4j() { # former test_0q
     local file=$DIR/$tfile
     local pre
     local post
     local file=$DIR/$tfile
     local pre
     local post
@@ -406,9 +441,9 @@ test_0q() {
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
         error "version changed unexpectedly: pre $pre, post $post"
     fi
 }
-run_test 0q "setattr of size does not change versions"
+run_test 4j "setattr of size does not change versions"
 
 
-test_0r() {
+test_4k() { # former test_0r
     local file=$DIR/$tfile
     local mtime_pre
     local mtime_post
     local file=$DIR/$tfile
     local mtime_pre
     local mtime_post
@@ -420,14 +455,14 @@ test_0r() {
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $file
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 666 $file
+    do_node $CLIENT2 chmod 666 $MOUNT2/$tfile
     do_node $CLIENT1 truncate $file 1
     sleep 1
     mtime_pre=$(do_node $CLIENT1 stat --format=%Y $file)
     do_node $CLIENT1 touch $file
     sleep 1 # avoid stat caching
     mtime_post=$(do_node $CLIENT1 stat --format=%Y $file)
     do_node $CLIENT1 truncate $file 1
     sleep 1
     mtime_pre=$(do_node $CLIENT1 stat --format=%Y $file)
     do_node $CLIENT1 touch $file
     sleep 1 # avoid stat caching
     mtime_post=$(do_node $CLIENT1 stat --format=%Y $file)
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_up $CLIENT1 || error "$CLIENT1 evicted"
     facet_failover $SINGLEMDS
 
     client_up $CLIENT1 || error "$CLIENT1 evicted"
@@ -441,11 +476,11 @@ test_0r() {
     if (($mtime != $mtime_post)); then
         error "setattr of times failed: expected $mtime_post, got $mtime"
     fi
     if (($mtime != $mtime_post)); then
         error "setattr of times failed: expected $mtime_post, got $mtime"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0r "setattr of times and size does not check versions"
+run_test 4k "setattr of times and size does not check versions"
 
 
-test_0s() {
+test_5a() { # former test_0s
     local pre
     local post
     local tp_pre
     local pre
     local post
     local tp_pre
@@ -461,13 +496,13 @@ test_0s() {
     if (($pre == $post)); then
         error "version of source not changed: pre $pre, post $post"
     fi
     if (($pre == $post)); then
         error "version of source not changed: pre $pre, post $post"
     fi
-    if (($tp_pre == $tp_post)); then
-        error "version of target parent not changed: pre $tp_pre, post $tp_post"
+    if (($tp_pre != $tp_post)); then
+        error "version of target parent was changed: pre $tp_pre, post $tp_post"
     fi
 }
     fi
 }
-run_test 0s "link changes versions of source and target parent"
+run_test 5a "link changes versions of source but not target parent"
 
 
-test_0t() {
+test_5b() { # former test_0t
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -475,20 +510,20 @@ test_0t() {
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
+    do_node $CLIENT2 chmod 777 $MOUNT2/$tdir
     do_node $CLIENT1 link $DIR/$tfile $DIR/$tdir/$tfile
     do_node $CLIENT1 link $DIR/$tfile $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "link should fail"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "link should fail"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0t "link checks version of target parent"
+run_test 5b "link checks version of target parent"
 
 
-test_0u() {
+test_5c() { # former test_0u
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -496,20 +531,20 @@ test_0u() {
     do_node $CLIENT1 mkdir -p $DIR/$tdir
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mkdir -p $DIR/$tdir
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 666 $DIR/$tfile
+    do_node $CLIENT2 chmod 666 $MOUNT2/$tfile
     do_node $CLIENT1 link $DIR/$tfile $DIR/$tdir/$tfile
     do_node $CLIENT1 link $DIR/$tfile $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "link should fail"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if ! do_node $CLIENT1 $CHECKSTAT -a $DIR/$tdir/$tfile; then
         error "link should fail"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0u "link checks version of source"
+run_test 5c "link checks version of source"
 
 
-test_0v() {
+test_6a() { # former test_0v
     local sp_pre
     local tp_pre
     local sp_post
     local sp_pre
     local tp_pre
     local sp_post
@@ -522,16 +557,16 @@ test_0v() {
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
     sp_post=$(get_version $CLIENT1 $DIR)
     tp_post=$(get_version $CLIENT1 $DIR/$tdir)
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
     sp_post=$(get_version $CLIENT1 $DIR)
     tp_post=$(get_version $CLIENT1 $DIR/$tdir)
-    if (($sp_pre == $sp_post)); then
-        error "version of source parent not changed: pre $sp_pre, post $sp_post"
+    if (($sp_pre != $sp_post)); then
+        error "version of source parent was changed: pre $sp_pre, post $sp_post"
     fi
     fi
-    if (($tp_pre == $tp_post)); then
-        error "version of target parent not changed: pre $tp_pre, post $tp_post"
+    if (($tp_pre != $tp_post)); then
+        error "version of target parent was changed: pre $tp_pre, post $tp_post"
     fi
 }
     fi
 }
-run_test 0v "rename changes versions of source parent and target parent"
+run_test 6a "rename doesn't change versions of source parent and target parent"
 
 
-test_0w() {
+test_6b() { # former test_0w
     local pre
     local post
 
     local pre
     local post
 
@@ -539,13 +574,13 @@ test_0w() {
     pre=$(get_version $CLIENT1 $DIR)
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tfile-new
     post=$(get_version $CLIENT1 $DIR)
     pre=$(get_version $CLIENT1 $DIR)
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tfile-new
     post=$(get_version $CLIENT1 $DIR)
-    if (($pre == $post)); then
-        error "version of parent not changed: pre $pre, post $post"
+    if (($pre != $post)); then
+        error "version of parent was changed: pre $pre, post $post"
     fi
 }
     fi
 }
-run_test 0w "rename within same dir changes version of parent"
+run_test 6b "rename within same dir doesn't change version of parent"
 
 
-test_0x() {
+test_6c() { # former test_0x
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -553,20 +588,20 @@ test_0x() {
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR
+    do_node $CLIENT2 chmod 777 $MOUNT2
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tfile; then
         error "rename should fail"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tfile; then
         error "rename should fail"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0x "rename checks version of source parent"
+run_test 6c "rename checks version of source parent"
 
 
-test_0y() {
+test_6d() { # former test_0y
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     local var=${SINGLEMDS}_svc
 
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
@@ -574,77 +609,313 @@ test_0y() {
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
     do_node $CLIENT1 mkdir -p -m 755 $DIR/$tdir
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 chmod 777 $DIR/$tdir
+    do_node $CLIENT2 chmod 777 $MOUNT2/$tdir
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
     do_node $CLIENT1 mv $DIR/$tfile $DIR/$tdir/$tfile
-    zconf_umount $CLIENT2 $MOUNT
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tfile; then
         error "rename should fail"
     fi
     facet_failover $SINGLEMDS
 
     client_evicted $CLIENT1 || error "$CLIENT1 not evicted"
     if do_node $CLIENT1 $CHECKSTAT -a $DIR/$tfile; then
         error "rename should fail"
     fi
-    zconf_mount $CLIENT2 $MOUNT
+    zconf_mount $CLIENT2 $MOUNT2
 }
 }
-run_test 0y "rename checks version of target parent"
+run_test 6d "rename checks version of target parent"
 
 
-[ "$CLIENTS" ] && zconf_umount_clients $CLIENTS $DIR
-
-test_1a() {
-    echo "mount client $CLIENT1,$CLIENT2..."
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
+# pdirops tests, bug 18143
+test_7_cycle() {
+    local first=$1
+    local lost=$2
+    local last=$3
+    local rc=0
 
 
-    do_node $CLIENT2 mkdir -p $DIR/$tdir
+    do_node $CLIENT1 mkdir -p $DIR/$tdir
     replay_barrier $SINGLEMDS
     replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 1
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-    zconf_umount $CLIENT2 $DIR
-
+    # first operation
+    do_node $CLIENT1 $first || error "Cannot do first operation"
+    # client2 operations that will be lost
+    do_node $CLIENT2 $lost || error "Cannot do 'lost' operations"
+    # second operation
+    do_node $CLIENT1 $last || error "Cannot do last operation"
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
     facet_failover $SINGLEMDS
-    # recovery shouldn't fail due to missing client 2
-    client_up $CLIENT1 || return 1
+    # should fail as conflict expected
+    client_evicted $CLIENT1 || rc=1
 
 
-    # All 50 files should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+    wait_recovery_complete $SINGLEMDS
+    wait_mds_ost_sync $SINGLEMDS
 
 
-    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
-    [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists"
+    zconf_mount $CLIENT2 $MOUNT2
+    rm -rf $DIR/$tdir
 
 
-    zconf_umount_clients $CLIENTS $DIR
+    return $rc
+}
+
+test_7a() {
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="rm $MOUNT2/$tdir/$tfile-0"
+    last="createmany -o $DIR/$tdir/$tfile- 1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7a.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="rm $MOUNT2/$tdir/$tfile-0"
+    last="mkdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7a.2 failed"
+
+    first="mkdir $DIR/$tdir/$tfile-0"
+    lost="mv $MOUNT2/$tdir/$tfile-0 $MOUNT2/$tdir/$tfile-1"
+    last="createmany -o $DIR/$tdir/$tfile- 1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7a.3 failed"
+    return 0
+}
+run_test 7a "create, {lost}, create"
+
+test_7b() {
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="rm $MOUNT2/$tdir/$tfile-0; createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7b.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="touch $MOUNT2/$tdir/$tfile; mv $MOUNT2/$tdir/$tfile $MOUNT2/$tdir/$tfile-0"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7b.2 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="rm $MOUNT2/$tdir/$tfile-0; mkdir $MOUNT2/$tdir/$tfile-0"
+    last="rmdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7b.3 failed"
+    return 0
+}
+run_test 7b "create, {lost}, unlink"
+
+test_7c() {
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="rm $MOUNT2/$tdir/$tfile-0; createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7c.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2"
+    lost="rm $MOUNT2/$tdir/$tfile-0; mkdir $MOUNT2/$tdir/$tfile-0"
+    last="mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7c.2 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; mkdir $DIR/$tdir/$tfile-1-0"
+    lost="rmdir $MOUNT2/$tdir/$tfile-1-0; createmany -o $MOUNT2/$tdir/$tfile-1- 1"
+    last="mv $DIR/$tdir/$tfile-1-0 $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7c.3 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1"
+    lost="mv $MOUNT2/$tdir/$tfile-0 $MOUNT2/$tdir/$tfile"
+    last="mv $DIR/$tdir/$tfile $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7c.4 failed"
+    return 0
+}
+run_test 7c "create, {lost}, rename"
+
+test_7d() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; rm $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1; rm $MOUNT2/$tdir/$tfile-0"
+    last="createmany -o $DIR/$tdir/$tfile- 1"
+    test_7_cycle "$first" "$lost" "$last" && error "Test 7d.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; rm $DIR/$tdir/$tfile-0"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0; rmdir $MOUNT2/$tdir/$tfile-0"
+    last="mkdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" && error "Test 7d.2 failed"
+
+    first="mkdir $DIR/$tdir/$tfile-0; rmdir $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1; mv $MOUNT2/$tdir/$tfile-0 $MOUNT2/$tdir/$tfile-1"
+    last="createmany -o $DIR/$tdir/$tfile- 1"
+    test_7_cycle "$first" "$lost" "$last" && error "Test 7d.3 failed"
+    return 0
+}
+run_test 7d "unlink, {lost}, create"
+
+test_7e() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; rm $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1; rm $MOUNT2/$tdir/$tfile-0;createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7e.1 failed"
+
+    first="mkdir $DIR/$tdir/$tfile-0; rmdir $DIR/$tdir/$tfile-0"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0; rmdir $MOUNT2/$tdir/$tfile-0; mkdir $MOUNT2/$tdir/$tfile-0"
+    last="rmdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7e.2 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; rm $DIR/$tdir/$tfile-0"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0"
+    last="rmdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7e.3 failed"
+
+    first="mkdir $DIR/$tdir/$tfile-0; rmdir $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7e.4 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; rm $DIR/$tdir/$tfile-0"
+    lost="mv $MOUNT2/$tdir/$tfile-1 $MOUNT2/$tdir/$tfile-0"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7e.5 failed"
+    return 0
+}
+run_test 7e "unlink, {lost}, unlink"
+
+test_7f() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; rm $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7f.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; rm $DIR/$tdir/$tfile-0"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7f.2 failed"
+
+    first="mkdir $DIR/$tdir/$tfile; createmany -o $DIR/$tdir/$tfile- 1; rmdir $DIR/$tdir/$tfile"
+    lost="mkdir $MOUNT2/$tdir/$tfile"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7f.3 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; rm $DIR/$tdir/$tfile-0"
+    lost="mv $MOUNT2/$tdir/$tfile-1 $MOUNT2/$tdir/$tfile-0"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7f.4 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; rm $DIR/$tdir/$tfile-0"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0"
+    last="mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7f.5 failed"
+    return 0
+}
+run_test 7f "unlink, {lost}, rename"
+
+test_7g() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0;rmdir $MOUNT2/$tdir/$tfile-0"
+    last="createmany -o $DIR/$tdir/$tfile- 1"
+    test_7_cycle "$first" "$lost" "$last" && error "Test 7g.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1; rm $MOUNT2/$tdir/$tfile-0"
+    last="mkdir $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" && error "Test 7g.2 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="link $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7g.3 failed"
+    return 0
+}
+run_test 7g "rename, {lost}, create"
+
+test_7h() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7h.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    lost="rm $MOUNT2/$tdir/$tfile-0; createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="rm $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7h.2 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; mkdir  $DIR/$tdir/$tfile; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile"
+    lost="rm $MOUNT2/$tdir/$tfile/$tfile-0"
+    last="rmdir $DIR/$tdir/$tfile"
+    #test_7_cycle "$first" "$lost" "$last" || error "Test 7h.3 failed"
+    return 0
+}
+run_test 7h "rename, {lost}, unlink"
+
+test_7i() {
+    first="createmany -o $DIR/$tdir/$tfile- 1; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    lost="createmany -o $MOUNT2/$tdir/$tfile- 1"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7i.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 1; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-1"
+    lost="mkdir $MOUNT2/$tdir/$tfile-0"
+    last="mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7i.1 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 3; mv $DIR/$tdir/$tfile-1 $DIR/$tdir/$tfile-0"
+    lost="mv $MOUNT2/$tdir/$tfile-2 $MOUNT2/$tdir/$tfile-0"
+    last="mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile-2"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7i.3 failed"
+
+    first="createmany -o $DIR/$tdir/$tfile- 2; mv $DIR/$tdir/$tfile-0 $DIR/$tdir/$tfile"
+    lost="rm $MOUNT2/$tdir/$tfile-1"
+    last="mv $DIR/$tdir/$tfile $DIR/$tdir/$tfile-1"
+    test_7_cycle "$first" "$lost" "$last" || error "Test 7i.4 failed"
     return 0
 }
     return 0
 }
-run_test 1a "client during replay doesn't affect another one"
+run_test 7i "rename, {lost}, rename"
 
 
-test_2a() {
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
+# test set #8: orphan handling bug 15392.
+# Unlink during recovery creates orphan always just in case some late open may
+# arrive. These orphans will be removed after recovery anyway.
+# Tests check that valid create,unlink,create sequence will work in this case
+# too but not fail on second create due to orphan found.
 
 
-    do_node $CLIENT2 mkdir -p $DIR/$tdir
+test_8a() {
+    do_node $CLIENT1 mcreate $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
     replay_barrier $SINGLEMDS
     replay_barrier $SINGLEMDS
-    do_node $CLIENT2 mcreate $DIR/$tdir/$tfile
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    #client1 read data from client2 which will be lost
-    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-    zconf_umount $CLIENT2 $DIR
+    # missed replay from client2 will lead to recovery by versions
+    do_node $CLIENT2 touch $MOUNT2/$tfile-2/$tfile
+    do_node $CLIENT1 rm $DIR/$tfile || return 1
+    do_node $CLIENT1 touch $DIR/$tfile || return 2
 
 
+    zconf_umount $CLIENT2 $MOUNT2
     facet_failover $SINGLEMDS
     facet_failover $SINGLEMDS
-    # recovery shouldn't fail due to missing client 2
-    client_up $CLIENT1 || return 1
+    client_up $CLIENT1 || return 6
 
 
-    # All 50 files should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
-    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile && return 4
+    do_node $CLIENT1 rm $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_mount $CLIENT2 $MOUNT2
+    return 0
+}
+run_test 8a "create | unlink, create shouldn't fail"
+
+test_8b() {
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client2 will lead to recovery by versions
+    do_node $CLIENT2 touch $MOUNT2/$tfile-2/$tfile
+    do_node $CLIENT1 rm -f $MOUNT1/$tfile || return 1
+    do_node $CLIENT1 mcreate $MOUNT1/$tfile || return 2
+
+    zconf_umount $CLIENT2 $MOUNT2
+    facet_failover $SINGLEMDS
+    client_up $CLIENT1 || return 6
+
+    do_node $CLIENT1 rm $MOUNT1/$tfile || error "$tfile doesn't exists"
+    zconf_mount $CLIENT2 $MOUNT2
+    return 0
+}
+run_test 8b "create | unlink, create shouldn't fail"
+
+test_8c() {
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client2 will lead to recovery by versions
+    do_node $CLIENT2 touch $MOUNT2/$tfile-2/$tfile
+    do_node $CLIENT1 rm -f $MOUNT1/$tfile || return 1
+    do_node $CLIENT1 mkdir $MOUNT1/$tfile || return 2
 
 
-    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+    zconf_umount $CLIENT2 $MOUNT2
+    facet_failover $SINGLEMDS
+    client_up $CLIENT1 || return 6
 
 
-    zconf_umount_clients $CLIENTS $DIR
+    do_node $CLIENT1 rmdir $MOUNT1/$tfile || error "$tfile doesn't exists"
+    zconf_mount $CLIENT2 $MOUNT2
     return 0
 }
     return 0
 }
-run_test 2a "lost data due to missed REMOTE client during replay"
+run_test 8c "create | unlink, create shouldn't fail"
+
+[ "$CLIENTS" ] && zconf_umount_clients $CLIENTS $DIR
 
 #
 # This test uses three Lustre clients on two hosts.
 
 #
 # This test uses three Lustre clients on two hosts.
@@ -653,14 +924,16 @@ run_test 2a "lost data due to missed REMOTE client during replay"
 #   Lustre Client 2:    $CLIENT2:$MOUNT2    ($DIR2)
 #   Lustre Client 3:    $CLIENT2:$MOUNT1    ($DIR1)
 #
 #   Lustre Client 2:    $CLIENT2:$MOUNT2    ($DIR2)
 #   Lustre Client 3:    $CLIENT2:$MOUNT1    ($DIR1)
 #
-test_2b() {
+test_10b() { # former test_2b
     local pre
     local post
     local var=${SINGLEMDS}_svc
 
     local pre
     local post
     local var=${SINGLEMDS}_svc
 
+    [ -n "$CLIENTS" ] || { skip "Need two or more clients" && exit 0; }
+    [ $CLIENTCOUNT -ge 2 ] || \
+        { skip "Need two or more clients, have $CLIENTCOUNT" && exit 0; }
+
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
     do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.sync_permission=0"
-    zconf_mount $CLIENT1 $MOUNT
-    zconf_mount $CLIENT2 $MOUNT2
     zconf_mount $CLIENT2 $MOUNT1
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tfile-a
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tfile-b
     zconf_mount $CLIENT2 $MOUNT1
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tfile-a
     do_node $CLIENT1 openfile -f O_RDWR:O_CREAT -m 0644 $DIR/$tfile-b
@@ -713,548 +986,79 @@ test_2b() {
     do_node $CLIENT2 $CHECKSTAT -p 0666 -u \\\#$RUNAS_ID -g \\\#$RUNAS_ID \
             $DIR1/$tfile-b || error "$DIR/$tfile-b: unexpected state"
 
     do_node $CLIENT2 $CHECKSTAT -p 0666 -u \\\#$RUNAS_ID -g \\\#$RUNAS_ID \
             $DIR1/$tfile-b || error "$DIR/$tfile-b: unexpected state"
 
+    zconf_mount $CLIENT2 $MOUNT2
     zconf_umount $CLIENT2 $MOUNT1
     zconf_umount $CLIENT2 $MOUNT1
-    zconf_umount $CLIENT1 $MOUNT
-}
-run_test 2b "3 clients: some, none, and all reqs replayed"
-
-test_3a() {
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    #make sure the time will change
-    local var=${SINGLEMDS}_svc
-    do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
-    do_node $CLIENT1 touch $DIR/$tfile
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
-    sleep 1
-    replay_barrier $SINGLEMDS
-    #change time
-    do_node $CLIENT2 touch $DIR/$tfile
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
-    #another change
-    do_node $CLIENT1 touch $DIR/$tfile
-    #remove file
-    do_node $CLIENT1 rm $DIR/$tfile
-    zconf_umount $CLIENT2 $DIR
-
-    facet_failover $SINGLEMDS
-    # recovery shouldn't fail due to missing client 2
-    client_up $CLIENT1 || return 1
-    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile && return 2
-
-    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
-
-    zconf_umount_clients $CLIENTS $DIR
-
-    return 0
 }
 }
-run_test 3a "setattr of time/size doesn't change version"
-
-test_3b() {
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
+run_test 10b "3 clients: some, none, and all reqs replayed"
 
 
-    #make sure the time will change
-    local var=${SINGLEMDS}_svc
-    do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
-
-    do_node $CLIENT1 touch $DIR/$tfile
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
-    sleep 1
+# test set #11: operations in single directory
+test_11a() {
     replay_barrier $SINGLEMDS
     replay_barrier $SINGLEMDS
-    #change mode
-    do_node $CLIENT2 chmod +x $DIR/$tfile
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
-    #abother chmod
-    do_node $CLIENT1 chmod -x $DIR/$tfile
-    zconf_umount $CLIENT2 $DIR
-
-    facet_failover $SINGLEMDS
-    # recovery should fail due to missing client 2
-    client_evicted $CLIENT1 || return 1
-
-    do_node $CLIENT1 $CHECKSTAT -p 0755 $DIR/$tfile && return 2
-    zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
-
-    zconf_umount_clients $CLIENTS $DIR
-
-    return 0
-}
-run_test 3b "setattr of permissions changes version"
-
-vbr_deactivate_client() {
-    local client=$1
-    echo "Deactivating client $client";
-    do_node $client "sysctl -w lustre.fail_loc=0x50d"
-}
 
 
-vbr_activate_client() {
-    local client=$1
-    echo "Activating client $client";
-    do_node $client "sysctl -w lustre.fail_loc=0x0"
-}
-
-remote_server ()
-{
-    local client=$1
-    [ -z "$(do_node $client lctl dl | grep mdt)" ] && \
-    [ -z "$(do_node $client lctl dl | grep ost)" ]
-}
-
-test_4a() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    do_node $CLIENT2 mkdir -p $DIR/$tdir
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-    vbr_deactivate_client $CLIENT2
+    do_node $CLIENT1 createmany -o $DIR/$tfile-1- 100 &
+    PID=$!
+    do_node $CLIENT2 createmany -o $MOUNT2/$tfile-2- 100
+    zconf_umount $CLIENT2 $MOUNT2
+    wait $PID
 
     facet_failover $SINGLEMDS
 
     facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
     client_up $CLIENT1 || return 1
     client_up $CLIENT1 || return 1
+    # All files from client1 should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-1- 100 || return 2
 
 
-    # All 50 files should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
-
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 4
-    # All 25 files from client2 should have been replayed
-    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
-
-    zconf_umount_clients $CLIENTS $DIR
+    zconf_mount $CLIENT2 $MOUNT2 || error "mount $CLIENT2 $MOUNT2 fail"
+    [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists"
     return 0
 }
     return 0
 }
-run_test 4a "fail MDS, delayed recovery"
+run_test 11a "concurrent creates don't affect each other"
 
 
-test_4b(){
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
+test_11b() {
+    do_node $CLIENT2 createmany -o $MOUNT2/$tfile-2- 100
 
     replay_barrier $SINGLEMDS
 
     replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 1
-
-    # create another set of files
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 2
-
-    # All files from should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
-    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
-
-    zconf_umount_clients $CLIENTS $DIR
-}
-run_test 4b "fail MDS, normal operation, delayed open recovery"
-
-test_4c() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -m $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -m $DIR/$tdir/$tfile-2- 25
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 1
-
-    # create another set of files
-    do_node $CLIENT1 createmany -m $DIR/$tfile-3- 25
-
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 2
-
-    # All files from should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
-    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
-
-    zconf_umount_clients $CLIENTS $DIR
-}
-run_test 4c "fail MDS, normal operation, delayed recovery"
-
-test_5a() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 1
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    client_evicted $CLIENT1 || return 1
-
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 2
-
-    # First 25 files should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
-    # Third file is failed due to missed client2
-    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-3-0 && error "$tfile-3-0 exists"
-    # file from client2 should exists
-    do_node $CLIENT2 unlinkmany $DIR/$tfile-2- 1 || return 4
-
-    zconf_umount_clients $CLIENTS $DIR
-}
-run_test 5a "fail MDS, delayed recovery should fail"
-
-test_5b() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 1
-    do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
-
-    # create another set of files
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-
-    vbr_activate_client $CLIENT2
-    client_evicted $CLIENT2 || return 4
-    # file from client2 should fail
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
-
-    # All 50 files from client 1 should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
-
-    zconf_umount_clients $CLIENTS $DIR
-}
-run_test 5b "fail MDS, normal operation, delayed recovery should fail"
-
-test_6a() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    do_node $CLIENT2 mkdir -p $DIR/$tdir
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
-    do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    # replay only 5 requests
-    do_node $CLIENT2 "sysctl -w lustre.fail_val=5"
-#define OBD_FAIL_PTLRPC_REPLAY        0x50e
-    do_node $CLIENT2 "sysctl -w lustre.fail_loc=0x2000050e"
-    client_up $CLIENT2
-    # vbr_activate_client $CLIENT2
-    # need way to know that client stops replays
-    sleep 5
+    do_node $CLIENT1 createmany -o $DIR/$tfile-1- 100 &
+    PID=$!
+    do_node $CLIENT2 unlinkmany -o $MOUNT2/$tfile-2- 100
+    zconf_umount $CLIENT2 $MOUNT2
+    wait $PID
 
     facet_failover $SINGLEMDS
 
     facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
     client_up $CLIENT1 || return 1
     client_up $CLIENT1 || return 1
+    # All files from client1 should have been replayed
+    do_node $CLIENT1 unlinkmany $DIR/$tfile-1- 100 || return 2
 
 
-    # All files should have been replayed
-    do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
-    do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
-    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
-
-    zconf_umount_clients $CLIENTS $DIR
+    zconf_mount $CLIENT2 $MOUNT2 || error "mount $CLIENT2 $MOUNT2 fail"
+    [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists"
     return 0
 }
     return 0
 }
-run_test 6a "fail MDS, delayed recovery, fail MDS"
+run_test 11b "concurrent creates and unlinks don't affect each other"
 
 
-test_7a() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    do_node $CLIENT2 mkdir -p $DIR/$tdir
+# test set #12: lock replay with VBR, bug 16356
+test_12a() { # former test_2a
+    do_node $CLIENT2 mkdir -p $MOUNT2/$tdir
     replay_barrier $SINGLEMDS
     replay_barrier $SINGLEMDS
+    do_node $CLIENT2 mcreate $MOUNT2/$tdir/$tfile
     do_node $CLIENT1 createmany -o $DIR/$tfile- 25
     do_node $CLIENT1 createmany -o $DIR/$tfile- 25
-    do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+    #client1 read data from client2 which will be lost
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile
     do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
     do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
-    vbr_deactivate_client $CLIENT2
-
-    facet_failover $SINGLEMDS
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 4
+    zconf_umount $CLIENT2 $MOUNT2
 
     facet_failover $SINGLEMDS
 
     facet_failover $SINGLEMDS
+    # recovery shouldn't fail due to missing client 2
     client_up $CLIENT1 || return 1
 
     client_up $CLIENT1 || return 1
 
-    # All files should have been replayed
+    # All 50 files should have been replayed
     do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
     do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
     do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
     do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
-    do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
-
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 7a "fail MDS, delayed recovery, fail MDS"
-
-test_8a() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    rmultiop_start $CLIENT2 $DIR/$tfile O_tSc || return 1
-    do_node $CLIENT2 rm -f $DIR/$tfile
-    replay_barrier $SINGLEMDS
-    rmultiop_stop $CLIENT2 || return 2
-
-    vbr_deactivate_client $CLIENT2
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 3
-    #client1 is back and will try to open orphan
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 4
-
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 8a "orphans are kept until delayed recovery"
-
-test_8b() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    rmultiop_start $CLIENT2 $DIR/$tfile O_tSc|| return 1
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 rm -f $DIR/$tfile
-
-    vbr_deactivate_client $CLIENT2
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 2
-    #client1 is back and will try to open orphan
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 3
-
-    rmultiop_stop $CLIENT2 || return 1
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 8b "open1 | unlink2 X delayed_replay1, close1"
-
-test_8c() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    rmultiop_start $CLIENT2 $DIR/$tfile O_tSc|| return 1
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 rm -f $DIR/$tfile
-    rmultiop_stop $CLIENT2 || return 2
-
-    vbr_deactivate_client $CLIENT2
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 3
-    #client1 is back and will try to open orphan
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 4
-
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 8c "open1 | unlink2, close1 X delayed_replay1"
-
-test_8d() {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    remote_server $CLIENT2 || \
-        { skip_env "Client $CLIENT2 is on the server node" && return 0; }
-
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    rmultiop_start $CLIENT1 $DIR/$tfile O_tSc|| return 1
-    rmultiop_start $CLIENT2 $DIR/$tfile O_tSc|| return 2
-    replay_barrier $SINGLEMDS
-    do_node $CLIENT1 rm -f $DIR/$tfile
-    rmultiop_stop $CLIENT2 || return 3
-    rmultiop_stop $CLIENT1 || return 4
-
-    vbr_deactivate_client $CLIENT2
-    facet_failover $SINGLEMDS
-    client_up $CLIENT1 || return 6
-
-    #client1 is back and will try to open orphan
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 8
-
-    do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 8d "open1, open2 | unlink2, close1, close2 X delayed_replay1"
-
-test_8e() {
-    zconf_mount $CLIENT1 $DIR
-    zconf_mount $CLIENT2 $DIR
-
-    do_node $CLIENT1 mcreate $DIR/$tfile
-    do_node $CLIENT1 mkdir $DIR/$tfile-2
-    replay_barrier $SINGLEMDS
-    # missed replay from client1 will lead to recovery by versions
-    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
-    do_node $CLIENT2 rm $DIR/$tfile || return 1
-    do_node $CLIENT2 touch $DIR/$tfile || return 2
-
-    zconf_umount $CLIENT1 $DIR
-    facet_failover $SINGLEMDS
-    client_up $CLIENT2 || return 6
-
-    do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists"
-    zconf_umount_clients $CLIENTS $DIR
-    return 0
-}
-run_test 8e "create | unlink, create shouldn't fail"
-
-test_8f() {
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    do_node $CLIENT1 touch $DIR/$tfile
-    do_node $CLIENT1 mkdir $DIR/$tfile-2
-    replay_barrier $SINGLEMDS
-    # missed replay from client1 will lead to recovery by versions
-    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
-    do_node $CLIENT2 rm -f $DIR/$tfile || return 1
-    do_node $CLIENT2 mcreate $DIR/$tfile || return 2
-
-    zconf_umount $CLIENT1 $DIR
-    facet_failover $SINGLEMDS
-    client_up $CLIENT2 || return 6
-
-    do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists"
-    zconf_umount $CLIENT2 $DIR
-    return 0
-}
-run_test 8f "create | unlink, create shouldn't fail"
-
-test_8g() {
-    zconf_mount_clients $CLIENT1 $DIR
-    zconf_mount_clients $CLIENT2 $DIR
-
-    do_node $CLIENT1 touch $DIR/$tfile
-    do_node $CLIENT1 mkdir $DIR/$tfile-2
-    replay_barrier $SINGLEMDS
-    # missed replay from client1 will lead to recovery by versions
-    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
-    do_node $CLIENT2 rm -f $DIR/$tfile || return 1
-    do_node $CLIENT2 mkdir $DIR/$tfile || return 2
-
-    zconf_umount $CLIENT1 $DIR
-    facet_failover $SINGLEMDS
-    client_up $CLIENT2 || return 6
+    do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile && return 4
 
 
-    do_node $CLIENT2 rmdir $DIR/$tfile || error "$tfile doesn't exists"
-    zconf_umount $CLIENT2 $DIR
+    zconf_mount $CLIENT2 $MOUNT2 || error "mount $CLIENT2 $DIR fail"
     return 0
 }
     return 0
 }
-run_test 8g "create | unlink, create shouldn't fail"
-
-test_10 () {
-    delayed_recovery_enabled || { skip "No delayed recovery support"; return 0; }
-
-    [ -z "$DBENCH_LIB" ] && skip_env "DBENCH_LIB is not set" && return 0
-
-    zconf_mount_clients $CLIENTS $DIR
-
-    local duration="-t 60"
-    local cmd="rundbench 1 $duration "
-    local PID=""
-    for CLIENT in ${CLIENTS//,/ }; do
-        $PDSH $CLIENT "set -x; PATH=:$PATH:$LUSTRE/utils:$LUSTRE/tests/:${DBENCH_LIB} DBENCH_LIB=${DBENCH_LIB} $cmd" &
-        PID=$!
-        echo $PID >pid.$CLIENT
-        echo "Started load PID=`cat pid.$CLIENT`"
-    done
-
-    replay_barrier $SINGLEMDS
-    sleep 3 # give clients a time to do operations
-
-    vbr_deactivate_client $CLIENT2
-
-    log "$TESTNAME fail $SINGLEMDS 1"
-    fail $SINGLEMDS
-
-# wait for client to reconnect to MDS
-    sleep $TIMEOUT
-
-    vbr_activate_client $CLIENT2
-    client_up $CLIENT2 || return 4
-
-    for CLIENT in ${CLIENTS//,/ }; do
-        PID=`cat pid.$CLIENT`
-        wait $PID
-        rc=$?
-        echo "load on ${CLIENT} returned $rc"
-    done
-
-    zconf_umount_clients $CLIENTS $DIR
-}
-run_test 10 "mds version recovery; $CLIENTCOUNT clients"
+run_test 12a "lost data due to missed REMOTE client during replay"
 
 [ "$CLIENTS" ] && zconf_mount_clients $CLIENTS $DIR
 
 
 [ "$CLIENTS" ] && zconf_mount_clients $CLIENTS $DIR