Whamcloud - gitweb
LU-1187 out: add resend check for update.
authorWang Di <di.wang@whamcloud.com>
Mon, 11 Jun 2012 22:13:55 +0000 (15:13 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 19 Mar 2013 16:25:38 +0000 (12:25 -0400)
1. Add update resend check between MDTs.

2. Even during creating the new object, osp_init_object
   still needs to get remote object attrs, because the
   object might be created already by some partial failed
   operation.

3. During resend handling, OSP needs to delete the existing
   orphan objects first, then do remote create.

4. MDT will check whether the name has been deleted during
   the resend of remote unlink, and only delete the local
   directory if the name on the remote MDT has been delete.

5. Fix the fail_id assignment location to only fail real
   update RPC.

6. Some fixes for replay DNE test sctipts.

Change-Id: Ia9ba8091b6622b0e2fd1f1b4fd355b5ff3eb9758
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/4343
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
20 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_update.h
lustre/include/md_object.h
lustre/include/obd_support.h
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/out_handler.c
lustre/obdecho/echo_client.c
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/ptlrpc/wiretest.c
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 2cde448..602af33 100644 (file)
@@ -3485,7 +3485,7 @@ enum object_update_op {
 
 struct update {
        __u32           u_type;
-       __u32           u_padding;
+       __u32           u_batchid;
        struct lu_fid   u_fid;
        __u32           u_lens[UPDATE_BUF_COUNT];
        __u32           u_bufs[0];
index d79cd70..03a4b63 100644 (file)
 #ifndef _LUSTRE_UPDATE_H
 #define _LUSTRE_UPDATE_H
 
-#define UPDATE_BUFFER_SIZE     8192
+#define UPDATE_BUFFER_SIZE     4096
 struct update_request {
        struct dt_device        *ur_dt;
        cfs_list_t              ur_list;    /* attached itself to thandle */
        int                     ur_flags;
-       int                     ur_rc;
+       int                     ur_rc;      /* request result */
+       int                     ur_batchid; /* Current batch(trans) id */
        struct update_buf       *ur_buf;   /* Holding the update req */
 };
 
index 80722a5..a4af8b4 100644 (file)
@@ -325,9 +325,9 @@ struct md_dir_operations {
                         struct md_object *src_obj, const struct lu_name *lname,
                         struct md_attr *ma);
 
-        int (*mdo_unlink)(const struct lu_env *env, struct md_object *pobj,
-                          struct md_object *cobj, const struct lu_name *lname,
-                          struct md_attr *ma);
+       int (*mdo_unlink)(const struct lu_env *env, struct md_object *pobj,
+                         struct md_object *cobj, const struct lu_name *lname,
+                         struct md_attr *ma, int no_name);
 
         /** This method is used to compare a requested layout to an existing
          * layout (struct lov_mds_md_v1/3 vs struct lov_mds_md_v1/3) */
@@ -800,13 +800,13 @@ static inline int mdo_link(const struct lu_env *env,
 }
 
 static inline int mdo_unlink(const struct lu_env *env,
-                             struct md_object *p,
-                             struct md_object *c,
-                             const struct lu_name *lname,
-                             struct md_attr *ma)
+                            struct md_object *p,
+                            struct md_object *c,
+                            const struct lu_name *lname,
+                            struct md_attr *ma, int no_name)
 {
        LASSERT(p->mo_dir_ops->mdo_unlink);
-       return p->mo_dir_ops->mdo_unlink(env, p, c, lname, ma);
+       return p->mo_dir_ops->mdo_unlink(env, p, c, lname, ma, no_name);
 }
 
 static inline int mdo_lum_lmm_cmp(const struct lu_env *env,
index c0419ba..06396a0 100644 (file)
@@ -465,8 +465,6 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LOV_INIT                          0x1403
 #define OBD_FAIL_GLIMPSE_DELAY                     0x1404
 
-#define OBD_FAIL_UPDATE_OBJ_NET                                0x1500
-
 #define OBD_FAIL_FID_INDIR     0x1501
 #define OBD_FAIL_FID_INLMA     0x1502
 #define OBD_FAIL_FID_IGIF      0x1504
@@ -485,6 +483,11 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_NO_AUTO         0x160b
 #define OBD_FAIL_LFSCK_NO_DOUBLESCAN   0x160c
 
+/* UPDATE */
+#define OBD_FAIL_UPDATE_OBJ_NET                        0x1700
+#define OBD_FAIL_UPDATE_OBJ_NET_REP            0x1701
+
+
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   CFS_FAIL_PRECHECK(id)
 #define OBD_FAIL_CHECK(id)                      CFS_FAIL_CHECK(id)
index 55f2069..930d5ce 100644 (file)
@@ -800,12 +800,12 @@ static int dot_lustre_mdd_link(const struct lu_env *env,
 }
 
 static int dot_lustre_mdd_unlink(const struct lu_env *env,
-                                 struct md_object *pobj,
-                                 struct md_object *cobj,
-                                 const struct lu_name *lname,
-                                 struct md_attr *ma)
+                                struct md_object *pobj,
+                                struct md_object *cobj,
+                                const struct lu_name *lname,
+                                struct md_attr *ma, int no_name)
 {
-        return -EPERM;
+       return -EPERM;
 }
 
 static int dot_lustre_mdd_name_insert(const struct lu_env *env,
@@ -1032,10 +1032,10 @@ static int obf_link(const struct lu_env *env, struct md_object *tgt_obj,
 }
 
 static int obf_unlink(const struct lu_env *env, struct md_object *pobj,
-                      struct md_object *cobj, const struct lu_name *lname,
-                      struct md_attr *ma)
+                     struct md_object *cobj, const struct lu_name *lname,
+                     struct md_attr *ma, int no_name)
 {
-        return -EPERM;
+       return -EPERM;
 }
 
 static struct md_dir_operations mdd_obf_dir_ops = {
index 9f5b6f3..59051e1 100644 (file)
@@ -1094,16 +1094,18 @@ static inline int mdd_declare_links_del(const struct lu_env *env,
 }
 
 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
-                              struct mdd_object *p, struct mdd_object *c,
-                              const struct lu_name *name, struct md_attr *ma,
-                              struct thandle *handle)
+                             struct mdd_object *p, struct mdd_object *c,
+                             const struct lu_name *name, struct md_attr *ma,
+                             struct thandle *handle, int no_name)
 {
        struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
         int rc;
 
-        rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
-        if (rc)
-                return rc;
+       if (likely(no_name == 0)) {
+               rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
+               if (rc)
+                       return rc;
+       }
 
         rc = mdo_declare_ref_del(env, p, handle);
         if (rc)
@@ -1144,9 +1146,18 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
        return rc;
 }
 
+/**
+ * Delete name entry and the object.
+ * Note: no_name == 1 means it only destory the object, i.e. name_entry
+ * does not exist for this object, and it could only happen during resending
+ * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
+ * is also needed in this case(needed by changelog), so we have to add another
+ * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
+ * the ENOENT failure should be able to be fixed by redo mechanism.
+ */
 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
-                      struct md_object *cobj, const struct lu_name *lname,
-                      struct md_attr *ma)
+                     struct md_object *cobj, const struct lu_name *lname,
+                     struct md_attr *ma, int no_name)
 {
         const char *name = lname->ln_name;
        struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
@@ -1174,7 +1185,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                 RETURN(PTR_ERR(handle));
 
        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
-                               lname, ma, handle);
+                               lname, ma, handle, no_name);
        if (rc)
                GOTO(stop, rc);
 
@@ -1203,10 +1214,12 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        if (rc)
                GOTO(cleanup, rc);
 
-       rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
-                               mdd_object_capa(env, mdd_pobj));
-       if (rc)
-               GOTO(cleanup, rc);
+       if (likely(no_name == 0)) {
+               rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
+                                       mdd_object_capa(env, mdd_pobj));
+               if (rc)
+                       GOTO(cleanup, rc);
+       }
 
        if (likely(mdd_cobj != NULL)) {
                rc = mdo_ref_del(env, mdd_cobj, handle);
index 02dcdb8..151680e 100644 (file)
@@ -3122,6 +3122,7 @@ static int mdt_filter_recovery_request(struct ptlrpc_request *req,
         case MDS_SYNC: /* used in unmounting */
         case OBD_PING:
         case MDS_REINT:
+       case UPDATE_OBJ:
         case SEQ_QUERY:
         case FLD_QUERY:
         case LDLM_ENQUEUE:
index d8f869d..5878ef2 100644 (file)
@@ -283,7 +283,7 @@ enum mdt_reint_flag {
 
 struct mdt_thread_info;
 struct tx_arg;
-typedef int (*tx_exec_func_t)(struct mdt_thread_info *, struct thandle *,
+typedef int (*tx_exec_func_t)(const struct lu_env *, struct thandle *,
                              struct tx_arg *);
 
 struct tx_arg {
@@ -291,8 +291,8 @@ struct tx_arg {
        tx_exec_func_t          undo_fn;
        struct dt_object        *object;
        char                    *file;
-       int                     line;
        struct update_reply     *reply;
+       int                     line;
        int                     index;
        union {
                struct {
@@ -1123,6 +1123,8 @@ int out_handle(struct mdt_thread_info *info);
        __out_tx_index_delete(info, obj, th, name, reply, idx, \
                              __FILE__, __LINE__)
 
+#define out_tx_destroy(info, obj, th, reply, idx) \
+       __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
 
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index bfb10f8..cd14179 100644 (file)
@@ -1687,27 +1687,27 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                }
        }
 
-        /* Try to open it now. */
-        rc = mdt_finish_open(info, parent, child, create_flags,
-                             created, ldlm_rep);
-        if (rc) {
-                result = rc;
+       /* Try to open it now. */
+       rc = mdt_finish_open(info, parent, child, create_flags,
+                            created, ldlm_rep);
+       if (rc) {
+               result = rc;
                /* openlock will be released if mdt_finish_open failed */
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-                if (created) {
-                        ma->ma_need = 0;
-                        ma->ma_valid = 0;
-                        ma->ma_cookie_size = 0;
-                        rc = mdo_unlink(info->mti_env,
-                                        mdt_object_child(parent),
-                                        mdt_object_child(child),
-                                        lname,
-                                        &info->mti_attr);
-                        if (rc != 0)
-                                CERROR("Error in cleanup of open\n");
+               if (created) {
+                       ma->ma_need = 0;
+                       ma->ma_valid = 0;
+                       ma->ma_cookie_size = 0;
+                       rc = mdo_unlink(info->mti_env,
+                                       mdt_object_child(parent),
+                                       mdt_object_child(child),
+                                       lname,
+                                       &info->mti_attr, 0);
+                       if (rc != 0)
+                               CERROR("Error in cleanup of open\n");
                        mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
-                }
-        }
+               }
+       }
         EXIT;
 out_child:
        mdt_object_open_unlock(info, child, lhc, ibits, result);
index bed481a..18bea87 100644 (file)
@@ -683,7 +683,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         struct mdt_lock_handle  *child_lh;
         struct lu_name          *lname;
         int                      rc;
-        ENTRY;
+       int                      no_name = 0;
+       ENTRY;
 
         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
                   rr->rr_name);
@@ -732,8 +733,30 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        /* lookup child object along with version checking */
        fid_zero(child_fid);
        rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
-       if (rc != 0)
-               GOTO(unlock_parent, rc);
+       if (rc != 0) {
+               /* Name might not be able to find during resend of
+                * remote unlink, considering following case.
+                * dir_A is a remote directory, the name entry of
+                * dir_A is on MDT0, the directory is on MDT1,
+                *
+                * 1. client sends unlink req to MDT1.
+                * 2. MDT1 sends name delete update to MDT0.
+                * 3. name entry is being deleted in MDT0 synchronously.
+                * 4. MDT1 is restarted.
+                * 5. client resends unlink req to MDT1. So it can not
+                *    find the name entry on MDT0 anymore.
+                * In this case, MDT1 only needs to destory the local
+                * directory.
+                * */
+               if (mdt_object_remote(mp) && rc == -ENOENT &&
+                   !fid_is_zero(rr->rr_fid2) &&
+                   lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+                       no_name = 1;
+                       *child_fid = *rr->rr_fid2;
+                } else {
+                       GOTO(unlock_parent, rc);
+                }
+       }
 
        if (fid_is_obf(child_fid) || fid_is_dot_lustre(child_fid))
                GOTO(unlock_parent, rc = -EPERM);
@@ -779,7 +802,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                        ma->ma_valid = 0;
                        mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
                        rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
-                                       NULL, lname, ma);
+                                       NULL, lname, ma, no_name);
                        GOTO(put_child, rc);
                }
                /* Revoke the LOOKUP lock of the remote object granted by
@@ -801,7 +824,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                GOTO(put_child, rc = -EPERM);
        }
 
-        rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
+       rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
                              MDT_CROSS_LOCK);
        if (rc != 0) {
                GOTO(put_child, rc);
@@ -818,12 +841,13 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
-        rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
-                        mdt_object_child(mc), lname, ma);
+
+       rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
+                       mdt_object_child(mc), lname, ma, no_name);
        if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
                rc = mdt_attr_get_complex(info, mc, ma);
-        if (rc == 0)
-                mdt_handle_last_unlink(info, mc, ma);
+       if (rc == 0)
+               mdt_handle_last_unlink(info, mc, ma);
 
         if (ma->ma_valid & MA_INODE) {
                 switch (ma->ma_attr.la_mode & S_IFMT) {
index 83671d5..0f4424d 100644 (file)
  */
 /*
  * Copyright (c) 2013, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdt/out_handler.c
  *
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
+/* Current out and mdt shared the same thread info, but in the future,
+ * this should be decoupled with MDT XXX*/
+#define out_thread_info                mdt_thread_info
+#define out_thread_key         mdt_thread_key
+
+struct out_thread_info *out_env_info(const struct lu_env *env)
+{
+       struct out_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &out_thread_key);
+       LASSERT(info != NULL);
+       return info;
+}
+
+static inline char *dt_obd_name(struct dt_device *dt)
+{
+       return dt->dd_lu_dev.ld_obd->obd_name;
+}
+
 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
                           tx_exec_func_t undo, char *file, int line)
 {
@@ -65,16 +80,14 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
        return &ta->ta_args[i];
 }
 
-static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
                        struct thandle_exec_args *th)
 {
-       struct dt_device *dt = mdt->mdt_bottom;
-
        memset(th, 0, sizeof(*th));
        th->ta_handle = dt_trans_create(env, dt);
        if (IS_ERR(th->ta_handle)) {
                CERROR("%s: start handle error: rc = %ld\n",
-                      mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
+                      dt_obd_name(dt), PTR_ERR(th->ta_handle));
                return PTR_ERR(th->ta_handle);
        }
        th->ta_dev = dt;
@@ -83,12 +96,12 @@ static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
        return 0;
 }
 
-static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
-                          struct thandle *th)
+static int out_trans_start(const struct lu_env *env,
+                          struct thandle_exec_args *th)
 {
        /* Always do sync commit for Phase I */
-       LASSERT(th->th_sync != 0);
-       return dt_trans_start(env, dt, th);
+       LASSERT(th->ta_handle->th_sync != 0);
+       return dt_trans_start(env, th->ta_dev, th->ta_handle);
 }
 
 static int out_trans_stop(const struct lu_env *env,
@@ -110,24 +123,23 @@ static int out_trans_stop(const struct lu_env *env,
        return rc;
 }
 
-int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
 {
-       struct thandle_exec_args *_th = &info->mti_handle;
+       struct out_thread_info *info = out_env_info(env);
        int i = 0, rc;
 
-       LASSERT(th == _th);
        LASSERT(th->ta_dev);
        LASSERT(th->ta_handle);
 
        if (th->ta_err != 0 || th->ta_argno == 0)
                GOTO(stop, rc = th->ta_err);
 
-       rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
+       rc = out_trans_start(env, th);
        if (unlikely(rc))
                GOTO(stop, rc);
 
        for (i = 0; i < th->ta_argno; i++) {
-               rc = th->ta_args[i].exec_fn(info, th->ta_handle,
+               rc = th->ta_args[i].exec_fn(env, th->ta_handle,
                                            &th->ta_args[i]);
                if (unlikely(rc)) {
                        CDEBUG(D_INFO, "error during execution of #%u from"
@@ -136,16 +148,19 @@ int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
                        while (--i >= 0) {
                                LASSERTF(th->ta_args[i].undo_fn != NULL,
                                    "can't undo changes, hope for failover!\n");
-                               th->ta_args[i].undo_fn(info, th->ta_handle,
+                               th->ta_args[i].undo_fn(env, th->ta_handle,
                                                       &th->ta_args[i]);
                        }
                        break;
                }
        }
+
+       /* Only fail for real update */
+       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
 stop:
        CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
-       out_trans_stop(info->mti_env, th, rc);
+              dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
+       out_trans_stop(env, th, rc);
        th->ta_handle = NULL;
        th->ta_argno = 0;
        th->ta_err = 0;
@@ -153,41 +168,56 @@ stop:
        RETURN(rc);
 }
 
-static struct dt_object *out_get_dt_obj(struct lu_object *obj)
+static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
+                           struct dt_object *obj, struct update_reply *reply,
+                           int index)
 {
-       struct mdt_device *mdt;
-       struct dt_device *dt;
-       struct lu_object *bottom_obj;
+       CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
+              dt_obd_name(dt), reply, index, 0);
 
-       mdt = lu2mdt_dev(obj->lo_dev);
-       dt = mdt->mdt_bottom;
+       update_insert_reply(reply, NULL, 0, index, 0);
+       return;
+}
 
-       bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
-       if (bottom_obj == NULL)
-               return ERR_PTR(-ENOENT);
+typedef void (*out_reconstruct_t)(const struct lu_env *env,
+                                 struct dt_device *dt,
+                                 struct dt_object *obj,
+                                 struct update_reply *reply,
+                                 int index);
+
+static inline int out_check_resent(const struct lu_env *env,
+                                  struct dt_device *dt,
+                                  struct dt_object *obj,
+                                  struct ptlrpc_request *req,
+                                  out_reconstruct_t reconstruct,
+                                  struct update_reply *reply,
+                                  int index)
+{
+       if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+               return 0;
 
-       return lu2dt_obj(bottom_obj);
+       if (req_xid_is_last(req)) {
+               reconstruct(env, dt, obj, reply, index);
+               return 1;
+       }
+       DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
+                req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
+       return 0;
 }
 
-static struct dt_object *out_object_find(struct mdt_thread_info *info,
-                                        struct lu_fid *fid)
+static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+                          struct thandle *th)
 {
-       struct lu_object        *obj;
-       struct dt_object        *dt_obj;
-
-       obj = lu_object_find(info->mti_env,
-                            &info->mti_mdt->mdt_md_dev.md_lu_dev,
-                            fid, NULL);
-       if (IS_ERR(obj))
-               return (struct dt_object *)obj;
-
-       dt_obj = out_get_dt_obj(obj);
-       if (IS_ERR(dt_obj)) {
-               lu_object_put(info->mti_env, obj);
-               return ERR_PTR(-ENOENT);
-       }
+       int rc;
 
-       return dt_obj;
+       CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
+              PFID(lu_object_fid(&dt_obj->do_lu)));
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_destroy(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
+
+       return rc;
 }
 
 /**
@@ -196,53 +226,46 @@ static struct dt_object *out_object_find(struct mdt_thread_info *info,
  * declare phase, i.e. if declare succeed, it should make sure
  * the following executing phase succeed in anyway, so these undo
  * should be useless for most of the time in Phase I
- **/
-int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
+ */
+int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
                       struct tx_arg *arg)
 {
-       struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_destroy(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
-
-       /* we don't like double failures */
+       rc = out_obj_destroy(env, arg->object, th);
        if (rc != 0)
                CERROR("%s: undo failure, we are doomed!: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), rc);
+                      dt_obd_name(th->th_dev), rc);
        return rc;
 }
 
-int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
+int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
                       struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
+       CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
+              dt_obd_name(th->th_dev),
               PFID(lu_object_fid(&arg->object->do_lu)),
               arg->u.create.dof.dof_type,
               arg->u.create.attr.la_mode & S_IFMT);
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_create(env, dt_obj, &arg->u.create.attr,
                       &arg->u.create.hint, &arg->u.create.dof, th);
 
-       dt_write_unlock(info->mti_env, dt_obj);
-       CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
-              arg->u.create.attr.la_mode, arg->index);
+       dt_write_unlock(env, dt_obj);
+
+       CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
+static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
                           struct lu_attr *attr, struct lu_fid *parent_fid,
                           struct dt_object_format *dof,
                           struct thandle_exec_args *th,
@@ -252,7 +275,7 @@ static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
        struct tx_arg *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
+       th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
                                       th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
@@ -275,7 +298,7 @@ static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
        return 0;
 }
 
-static int out_create(struct mdt_thread_info *info)
+static int out_create(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -319,45 +342,50 @@ static int out_create(struct mdt_thread_info *info)
                }
        }
 
-       rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
+       if (lu_object_exists(&obj->do_lu))
+               RETURN(-EEXIST);
+
+       rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
+                          &info->mti_handle,
                           info->mti_u.update.mti_update_reply,
                           info->mti_u.update.mti_update_reply_index);
 
        RETURN(rc);
 }
 
-static int out_tx_attr_set_undo(struct mdt_thread_info *info,
+static int out_tx_attr_set_undo(const struct lu_env *env,
                                struct thandle *th, struct tx_arg *arg)
 {
        CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
-              mdt_obd_name(info->mti_mdt),
+              dt_obd_name(th->th_dev),
               PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
 
        return -ENOTSUPP;
 }
 
-static int out_tx_attr_set_exec(struct mdt_thread_info *info,
-                               struct thandle *th, struct tx_arg *arg)
+static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
+                               struct tx_arg *arg)
 {
        struct dt_object        *dt_obj = arg->object;
        int                     rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "attr set "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
+              PFID(lu_object_fid(&dt_obj->do_lu)));
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
                         th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_unlock(env, dt_obj);
+
+       CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_attr_set(struct mdt_thread_info *info,
+static int __out_tx_attr_set(const struct lu_env *env,
                             struct dt_object *dt_obj,
                             const struct lu_attr *attr,
                             struct thandle_exec_args *th,
@@ -367,8 +395,7 @@ static int __out_tx_attr_set(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
-                                        th->ta_handle);
+       th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -383,7 +410,7 @@ static int __out_tx_attr_set(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_attr_set(struct mdt_thread_info *info)
+static int out_attr_set(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct lu_attr          *attr = &info->mti_attr.ma_attr;
@@ -408,14 +435,14 @@ static int out_attr_set(struct mdt_thread_info *info)
        lustre_get_wire_obdo(lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
-       rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
+       rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
                             info->mti_u.update.mti_update_reply,
                             info->mti_u.update.mti_update_reply_index);
 
        RETURN(rc);
 }
 
-static int out_attr_get(struct mdt_thread_info *info)
+static int out_attr_get(struct out_thread_info *info)
 {
        struct obdo             *obdo = &info->mti_u.update.mti_obdo;
        const struct lu_env     *env = info->mti_env;
@@ -476,12 +503,17 @@ static int out_attr_get(struct mdt_thread_info *info)
 
 out_unlock:
        dt_read_unlock(env, obj);
+
+       CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
+              mdt_obd_name(info->mti_mdt),
+              info->mti_u.update.mti_update_reply, 0, rc);
+
        update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
                            sizeof(*obdo), 0, rc);
        RETURN(rc);
 }
 
-static int out_xattr_get(struct mdt_thread_info *info)
+static int out_xattr_get(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        const struct lu_env     *env = info->mti_env;
@@ -529,7 +561,7 @@ out:
        RETURN(rc);
 }
 
-static int out_index_lookup(struct mdt_thread_info *info)
+static int out_index_lookup(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        const struct lu_env     *env = info->mti_env;
@@ -569,29 +601,33 @@ static int out_index_lookup(struct mdt_thread_info *info)
 
 out_unlock:
        dt_read_unlock(env, obj);
+
+       CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
+              mdt_obd_name(info->mti_mdt),
+              info->mti_u.update.mti_update_reply, 0, rc);
+
        update_insert_reply(info->mti_u.update.mti_update_reply,
                            &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
                            0, rc);
        RETURN(rc);
 }
 
-static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
+static int out_tx_xattr_set_exec(const struct lu_env *env,
                                 struct thandle *th,
                                 struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
+              dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
+              arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
-       CDEBUG(D_OTHER, "attr set "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
                          arg->u.xattr_set.name, arg->u.xattr_set.flags,
                          th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_unlock(env, dt_obj);
        /**
         * Ignore errors if this is LINK EA
         **/
@@ -599,16 +635,15 @@ static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
                                    strlen(XATTR_NAME_LINK))))
                rc = 0;
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
-       CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
-              arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
-              arg->u.xattr_set.flags);
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_xattr_set(struct mdt_thread_info *info,
+static int __out_tx_xattr_set(const struct lu_env *env,
                              struct dt_object *dt_obj,
                              const struct lu_buf *buf,
                              const char *name, int flags,
@@ -619,7 +654,7 @@ static int __out_tx_xattr_set(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
+       th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
                                          flags, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
@@ -637,7 +672,7 @@ static int __out_tx_xattr_set(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_xattr_set(struct mdt_thread_info *info)
+static int out_xattr_set(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -676,49 +711,61 @@ static int out_xattr_set(struct mdt_thread_info *info)
 
        flag = le32_to_cpu(*(int *)tmp);
 
-       rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
-                        info->mti_u.update.mti_update_reply,
-                        info->mti_u.update.mti_update_reply_index);
-
+       rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
+                             &info->mti_handle,
+                             info->mti_u.update.mti_update_reply,
+                             info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
-                              struct tx_arg *arg)
+static int out_obj_ref_add(const struct lu_env *env,
+                          struct dt_object *dt_obj,
+                          struct thandle *th)
 {
-       struct dt_object *dt_obj = arg->object;
+       int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_ref_add(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
 
-       CDEBUG(D_OTHER, "ref add "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       return rc;
+}
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       dt_ref_add(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
+static int out_obj_ref_del(const struct lu_env *env,
+                          struct dt_object *dt_obj,
+                          struct thandle *th)
+{
+       int rc;
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
-       return 0;
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_ref_del(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
+
+       return rc;
 }
 
-static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
+       int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       rc = out_obj_ref_add(env, dt_obj, th);
 
-       CDEBUG(D_OTHER, "ref del "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       dt_ref_del(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       return rc;
+}
 
-       return 0;
+static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       return out_obj_ref_del(env, arg->object, th);
 }
 
-static int __out_tx_ref_add(struct mdt_thread_info *info,
+static int __out_tx_ref_add(const struct lu_env *env,
                            struct dt_object *dt_obj,
                            struct thandle_exec_args *th,
                            struct update_reply *reply,
@@ -727,8 +774,7 @@ static int __out_tx_ref_add(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
-                                       th->ta_handle);
+       th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -745,34 +791,42 @@ static int __out_tx_ref_add(struct mdt_thread_info *info,
 /**
  * increase ref of the object
  **/
-static int out_ref_add(struct mdt_thread_info *info)
+static int out_ref_add(struct out_thread_info *info)
 {
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
        int               rc;
 
        ENTRY;
 
-       rc = out_tx_ref_add(info, obj, &info->mti_handle,
+       rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
                            info->mti_u.update.mti_update_reply,
                            info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
-       out_tx_ref_add_undo(info, th, arg);
-       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
-       return 0;
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       rc = out_obj_ref_del(env, dt_obj, th);
+
+       CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
 }
 
-static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
-       return out_tx_ref_add_exec(info, th, arg);
+       return out_obj_ref_add(env, arg->object, th);
 }
 
-static int __out_tx_ref_del(struct mdt_thread_info *info,
+static int __out_tx_ref_del(const struct lu_env *env,
                            struct dt_object *dt_obj,
                            struct thandle_exec_args *th,
                            struct update_reply *reply,
@@ -781,7 +835,7 @@ static int __out_tx_ref_del(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
+       th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -795,7 +849,7 @@ static int __out_tx_ref_del(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_ref_del(struct mdt_thread_info *info)
+static int out_ref_del(struct out_thread_info *info)
 {
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
        int               rc;
@@ -805,67 +859,79 @@ static int out_ref_del(struct mdt_thread_info *info)
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
 
-       rc = out_tx_ref_del(info, obj, &info->mti_handle,
+       rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
                            info->mti_u.update.mti_update_reply,
                            info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_index_insert_exec(struct mdt_thread_info *info,
-                                   struct thandle *th, struct tx_arg *arg)
+static int out_obj_index_insert(const struct lu_env *env,
+                               struct dt_object *dt_obj,
+                               const struct dt_rec *rec,
+                               const struct dt_key *key,
+                               struct thandle *th)
 {
-       struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)),
-              (char *)arg->u.insert.key,
-              PFID((struct lu_fid *)arg->u.insert.rec));
+       CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
+              dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+              (char *)key, PFID((struct lu_fid *)rec));
 
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
+       if (dt_try_as_dir(env, dt_obj) == 0)
                return -ENOTDIR;
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
-                      arg->u.insert.key, th, NULL, 0);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
+       dt_write_unlock(env, dt_obj);
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       return rc;
+}
+
+static int out_obj_index_delete(const struct lu_env *env,
+                               struct dt_object *dt_obj,
+                               const struct dt_key *key,
+                               struct thandle *th)
+{
+       int rc;
+
+       CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
+              dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+              (char *)key);
+
+       if (dt_try_as_dir(env, dt_obj) == 0)
+               return -ENOTDIR;
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_delete(env, dt_obj, key, th, NULL);
+       dt_write_unlock(env, dt_obj);
 
        return rc;
 }
 
-static int out_tx_index_insert_undo(struct mdt_thread_info *info,
+static int out_tx_index_insert_exec(const struct lu_env *env,
                                    struct thandle *th, struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
+                                 arg->u.insert.key, th);
 
-       CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
-              PFID(lu_object_fid(&arg->object->do_lu)),
-              (char *)arg->u.insert.key);
-
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
-               CERROR("%s: "DFID" is not directory: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
-               return -ENOTDIR;
-       }
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_index_insert(struct mdt_thread_info *info,
+static int out_tx_index_insert_undo(const struct lu_env *env,
+                                   struct thandle *th, struct tx_arg *arg)
+{
+       return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+}
+
+static int __out_tx_index_insert(const struct lu_env *env,
                                 struct dt_object *dt_obj,
                                 char *name, struct lu_fid *fid,
                                 struct thandle_exec_args *th,
@@ -877,11 +943,11 @@ static int __out_tx_index_insert(struct mdt_thread_info *info,
        LASSERT(th->ta_handle != NULL);
 
        if (lu_object_exists(&dt_obj->do_lu)) {
-               if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+               if (dt_try_as_dir(env, dt_obj) == 0) {
                        th->ta_err = -ENOTDIR;
                        return th->ta_err;
                }
-               th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
+               th->ta_err = dt_declare_insert(env, dt_obj,
                                               (struct dt_rec *)fid,
                                               (struct dt_key *)name,
                                               th->ta_handle);
@@ -904,7 +970,7 @@ static int __out_tx_index_insert(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_index_insert(struct mdt_thread_info *info)
+static int out_index_insert(struct out_thread_info *info)
 {
        struct update     *update = info->mti_u.update.mti_update;
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
@@ -936,27 +1002,39 @@ static int out_index_insert(struct mdt_thread_info *info)
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
+       rc = out_tx_index_insert(info->mti_env, obj, name, fid,
+                                &info->mti_handle,
                                 info->mti_u.update.mti_update_reply,
                                 info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_index_delete_exec(struct mdt_thread_info *info,
+static int out_tx_index_delete_exec(const struct lu_env *env,
                                    struct thandle *th,
                                    struct tx_arg *arg)
 {
-       return out_tx_index_insert_undo(info, th, arg);
+       int rc;
+
+       rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+
+       CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
 }
 
-static int out_tx_index_delete_undo(struct mdt_thread_info *info,
+static int out_tx_index_delete_undo(const struct lu_env *env,
                                    struct thandle *th,
                                    struct tx_arg *arg)
 {
-       return out_tx_index_insert_exec(info, th, arg);
+       CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
+              dt_obd_name(th->th_dev), -ENOTSUPP);
+       return -ENOTSUPP;
 }
 
-static int __out_tx_index_delete(struct mdt_thread_info *info,
+static int __out_tx_index_delete(const struct lu_env *env,
                                 struct dt_object *dt_obj, char *name,
                                 struct thandle_exec_args *th,
                                 struct update_reply *reply,
@@ -964,13 +1042,13 @@ static int __out_tx_index_delete(struct mdt_thread_info *info,
 {
        struct tx_arg *arg;
 
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+       if (dt_try_as_dir(env, dt_obj) == 0) {
                th->ta_err = -ENOTDIR;
                return th->ta_err;
        }
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
+       th->ta_err = dt_declare_delete(env, dt_obj,
                                       (struct dt_key *)name,
                                       th->ta_handle);
        if (th->ta_err != 0)
@@ -988,7 +1066,7 @@ static int __out_tx_index_delete(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_index_delete(struct mdt_thread_info *info)
+static int out_index_delete(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -1004,12 +1082,84 @@ static int out_index_delete(struct mdt_thread_info *info)
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
+       rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
                                 info->mti_u.update.mti_update_reply,
                                 info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
+static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       rc = out_obj_destroy(env, dt_obj, th);
+
+       CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       RETURN(rc);
+}
+
+static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       CERROR("%s: not support destroy undo yet!: rc = %d\n",
+              dt_obd_name(th->th_dev), -ENOTSUPP);
+       return -ENOTSUPP;
+}
+
+static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+                            struct thandle_exec_args *th,
+                            struct update_reply *reply,
+                            int index, char *file, int line)
+{
+       struct tx_arg *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
+       if (th->ta_err)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
+                         file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_destroy(struct out_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct lu_fid           *fid;
+       int                     rc;
+       ENTRY;
+
+       fid = &update->u_fid;
+       fid_le_to_cpu(fid, fid);
+       if (!fid_is_sane(fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+
+       rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
+                           info->mti_u.update.mti_update_reply,
+                           info->mti_u.update.mti_update_reply_index);
+
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
 [opc - OBJ_CREATE] = {                                 \
        .mh_name    = name,                             \
@@ -1024,6 +1174,8 @@ static int out_index_delete(struct mdt_thread_info *info)
 static struct out_handler out_update_ops[] = {
        DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
                     out_create),
+       DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
+                    out_destroy),
        DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
                     out_ref_add),
        DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
@@ -1036,8 +1188,8 @@ static struct out_handler out_update_ops[] = {
                     out_xattr_set),
        DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
                     out_xattr_get),
-       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
-                    HABEO_REFERO, out_index_lookup),
+       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
+                    out_index_lookup),
        DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
                     MUTABOR | HABEO_REFERO, out_index_insert),
        DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
@@ -1065,38 +1217,43 @@ static struct out_opc_slice out_handlers[] = {
  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
  * format.
  */
-int out_handle(struct mdt_thread_info *info)
+int out_handle(struct out_thread_info *info)
 {
-       struct req_capsule        *pill = info->mti_pill;
-       struct update_buf         *ubuf;
-       struct update             *update;
-       struct thandle_exec_args  *th = &info->mti_handle;
-       int                       bufsize;
-       int                       count;
-       unsigned                  off;
-       int                       i;
-       int                       rc = 0;
-       int                       rc1 = 0;
+       struct thandle_exec_args        *th = &info->mti_handle;
+       struct req_capsule              *pill = info->mti_pill;
+       struct mdt_device               *mdt = info->mti_mdt;
+       struct dt_device                *dt = mdt->mdt_bottom;
+       const struct lu_env             *env = info->mti_env;
+       struct update_buf               *ubuf;
+       struct update                   *update;
+       struct update_reply             *update_reply;
+       int                             bufsize;
+       int                             count;
+       int                             old_batchid = -1;
+       unsigned                        off;
+       int                             i;
+       int                             rc = 0;
+       int                             rc1 = 0;
        ENTRY;
 
        req_capsule_set(pill, &RQF_UPDATE_OBJ);
        bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
        if (bufsize != UPDATE_BUFFER_SIZE) {
                CERROR("%s: invalid bufsize %d: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
+                      mdt_obd_name(mdt), bufsize, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
        if (ubuf == NULL) {
-               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
+               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
                       -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
                CERROR("%s: invalid magic %x expect %x: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
+                      mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
                       UPDATE_BUFFER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
@@ -1104,7 +1261,7 @@ int out_handle(struct mdt_thread_info *info)
        count = le32_to_cpu(ubuf->ub_count);
        if (count <= 0) {
                CERROR("%s: No update!: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      mdt_obd_name(mdt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -1113,16 +1270,16 @@ int out_handle(struct mdt_thread_info *info)
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
                CERROR("%s: Can't pack response: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), rc);
+                      mdt_obd_name(mdt), rc);
                RETURN(rc);
        }
 
        /* Prepare the update reply buffer */
-       info->mti_u.update.mti_update_reply =
-                       req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
-       update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
+       update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
+       update_init_reply_buf(update_reply, count);
+       info->mti_u.update.mti_update_reply = update_reply;
 
-       rc = out_tx_start(info->mti_env, info->mti_mdt, th);
+       rc = out_tx_start(env, dt, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -1133,15 +1290,30 @@ int out_handle(struct mdt_thread_info *info)
                struct dt_object   *dt_obj;
 
                update = (struct update *)((char *)ubuf + off);
+               if (old_batchid == -1) {
+                       old_batchid = update->u_batchid;
+               } else if (old_batchid != update->u_batchid) {
+                       /* Stop the current update transaction,
+                        * create a new one */
+                       rc = out_tx_end(env, th);
+                       if (rc != 0)
+                               RETURN(rc);
+
+                       rc = out_tx_start(env, dt, th);
+                       if (rc != 0)
+                               RETURN(rc);
+                       old_batchid = update->u_batchid;
+               }
 
                fid_le_to_cpu(&update->u_fid, &update->u_fid);
                if (!fid_is_sane(&update->u_fid)) {
                        CERROR("%s: invalid FID "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(&update->u_fid), -EPROTO);
+                              mdt_obd_name(mdt), PFID(&update->u_fid),
+                              -EPROTO);
                        GOTO(out, rc = err_serious(-EPROTO));
                }
-               dt_obj = out_object_find(info, &update->u_fid);
+
+               dt_obj = dt_locate(env, dt, &update->u_fid);
                if (IS_ERR(dt_obj))
                        GOTO(out, rc = PTR_ERR(dt_obj));
 
@@ -1151,22 +1323,32 @@ int out_handle(struct mdt_thread_info *info)
 
                h = mdt_handler_find(update->u_type, out_handlers);
                if (likely(h != NULL)) {
+                       /* For real modification RPC, check if the update
+                        * has been executed */
+                       if (h->mh_flags & MUTABOR) {
+                               struct ptlrpc_request *req = mdt_info_req(info);
+
+                               if (out_check_resent(env, dt, dt_obj, req,
+                                                    out_reconstruct,
+                                                    update_reply, i))
+                                       GOTO(next, rc);
+                       }
+
                        rc = h->mh_act(info);
                } else {
                        CERROR("%s: The unsupported opc: 0x%x\n",
-                              mdt_obd_name(info->mti_mdt), update->u_type);
-                       lu_object_put(info->mti_env, &dt_obj->do_lu);
+                              mdt_obd_name(mdt), update->u_type);
+                       lu_object_put(env, &dt_obj->do_lu);
                        GOTO(out, rc = -ENOTSUPP);
                }
-               lu_object_put(info->mti_env, &dt_obj->do_lu);
+next:
+               lu_object_put(env, &dt_obj->do_lu);
                if (rc < 0)
                        GOTO(out, rc);
                off += cfs_size_round(update_size(update));
        }
-
 out:
-       rc1 = out_tx_end(info, th);
+       rc1 = out_tx_end(env, th);
        rc = rc == 0 ? rc1 : rc;
-       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;
        RETURN(rc);
 }
index 6df84fc..5137315 100644 (file)
@@ -1956,12 +1956,12 @@ static int echo_md_destroy_internal(const struct lu_env *env,
         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
-        rc = mdo_unlink(env, parent, lu2md(child), lname, ma);
-        if (rc) {
-                CERROR("Can not unlink child %s: rc = %d\n",
-                        lname->ln_name, rc);
-                GOTO(out_put, rc);
-        }
+       rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
+       if (rc) {
+               CERROR("Can not unlink child %s: rc = %d\n",
+                       lname->ln_name, rc);
+               GOTO(out_put, rc);
+       }
         CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 out_put:
index f4493d0..f9b8aaa 100644 (file)
@@ -90,8 +90,8 @@ static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
        int                     rc;
        ENTRY;
 
-       rc = osp_prep_update_req(env, osp, update->ur_buf,
-                                UPDATE_BUFFER_SIZE, &req);
+       rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
+                                &req);
        if (rc)
                RETURN(rc);
 
@@ -136,7 +136,7 @@ static struct update_request
 
        CFS_INIT_LIST_HEAD(&update->ur_list);
        update->ur_dt = dt;
-
+       update->ur_batchid = 0;
        update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
        update->ur_buf->ub_count = 0;
 
@@ -197,8 +197,8 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
  */
 static int osp_insert_update(const struct lu_env *env,
                             struct update_request *update, int op,
-                            struct lu_fid *fid, int count, int *lens,
-                            char **bufs)
+                            struct lu_fid *fid, int count,
+                            int *lens, char **bufs)
 {
        struct update_buf    *ubuf = update->ur_buf;
        struct update        *obj_update;
@@ -236,6 +236,7 @@ static int osp_insert_update(const struct lu_env *env,
        /* fill the update into the update buffer */
        fid_cpu_to_le(&obj_update->u_fid, fid);
        obj_update->u_type = cpu_to_le32(op);
+       obj_update->u_batchid = update->ur_batchid;
        for (i = 0; i < count; i++)
                obj_update->u_lens[i] = cpu_to_le32(lens[i]);
 
@@ -270,6 +271,11 @@ static struct update_request
        return NULL;
 }
 
+static inline void osp_md_add_update_batchid(struct update_request *update)
+{
+       update->ur_batchid++;
+}
+
 /**
  * Find one loc in th_dev/dev_obj_update for the update,
  * Because only one thread can access this thandle, no need
@@ -337,6 +343,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        int                     buf_count;
        int                     rc;
 
+
        update = osp_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
@@ -365,12 +372,53 @@ static int osp_md_declare_object_create(const struct lu_env *env,
                buf_count++;
        }
 
+       if (lu_object_exists(&dt->do_lu)) {
+               /* If the object already exists, we needs to destroy
+                * this orphan object first.
+                *
+                * The scenario might happen in this case
+                *
+                * 1. client send remote create to MDT0.
+                * 2. MDT0 send create update to MDT1.
+                * 3. MDT1 finished create synchronously.
+                * 4. MDT0 failed and reboot.
+                * 5. client resend remote create to MDT0.
+                * 6. MDT0 tries to resend create update to MDT1,
+                *    but find the object already exists
+                */
+               CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
+
+               rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+                                      NULL, NULL);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
+                       /* decrease for ".." */
+                       rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+                                              0, NULL, NULL);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+
+               rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+                                      NULL);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+               /* Increase batchid to add this orphan object deletion
+                * to separate transaction */
+               osp_md_add_update_batchid(update);
+       }
+
        rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
                               bufs);
-       if (rc) {
+out:
+       if (rc)
                CERROR("%s: Insert update error: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name, rc);
-       }
 
        return rc;
 }
index 2426be1..7cb1d9a 100644 (file)
@@ -370,16 +370,13 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
                po->opo_obj.do_ops = &osp_md_obj_ops;
                o->lo_header->loh_attr |= LOHA_REMOTE;
                po->opo_obj.do_lock_ops = &osp_md_lock_ops;
-               /* Do not need get attr for new object */
-               if (!(conf != NULL && (conf->loc_flags & LOC_F_NEW) != 0)) {
-                       rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
-                                                            la, NULL);
-                       if (rc == 0)
-                               o->lo_header->loh_attr |=
-                                       LOHA_EXISTS | (la->la_mode & S_IFMT);
-                       if (rc == -ENOENT)
-                               rc = 0;
-               }
+               rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
+                                                    la, NULL);
+               if (rc == 0)
+                       o->lo_header->loh_attr |=
+                               LOHA_EXISTS | (la->la_mode & S_IFMT);
+               if (rc == -ENOENT)
+                       rc = 0;
        }
        RETURN(rc);
 }
index 8d8fd5c..e5f9e70 100644 (file)
@@ -4496,10 +4496,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct update, u_type));
        LASSERTF((int)sizeof(((struct update *)0)->u_type) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct update *)0)->u_type));
-       LASSERTF((int)offsetof(struct update, u_padding) == 4, "found %lld\n",
-                (long long)(int)offsetof(struct update, u_padding));
-       LASSERTF((int)sizeof(((struct update *)0)->u_padding) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct update *)0)->u_padding));
+       LASSERTF((int)offsetof(struct update, u_batchid) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_batchid));
+       LASSERTF((int)sizeof(((struct update *)0)->u_batchid) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_batchid));
        LASSERTF((int)offsetof(struct update, u_fid) == 8, "found %lld\n",
                 (long long)(int)offsetof(struct update, u_fid));
        LASSERTF((int)sizeof(((struct update *)0)->u_fid) == 16, "found %lld\n",
index c765d2e..148766a 100755 (executable)
@@ -620,7 +620,6 @@ test_22a () {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        wait $CLIENT_PID || error "lfs mkdir failed"
@@ -647,7 +646,6 @@ test_22b () {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
        wait $CLIENT_PID || error "lfs mkdir failed"
@@ -674,8 +672,8 @@ test_22c () {
 
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       # OBD_FAIL_UPDATE_OBJ_NET_REP    0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -701,8 +699,8 @@ test_22d () {
 
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       # OBD_FAIL_UPDATE_OBJ_NET_REP    0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -808,8 +806,8 @@ test_23c () {
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir ||
                        error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       # OBD_FAIL_UPDATE_OBJ_NET_REP    0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        do_node $CLIENT1 rmdir $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -837,8 +835,8 @@ test_23d () {
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir ||
                        error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        do_node $CLIENT1 rmdir $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
index 8816fab..abc0c5e 100755 (executable)
@@ -2014,11 +2014,10 @@ test_80a() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds$((MDTIDX + 1))
 
@@ -2042,11 +2041,10 @@ test_80b() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
 
@@ -2070,11 +2068,10 @@ test_80c() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
@@ -2094,11 +2091,10 @@ test_80d() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
+       #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
@@ -2126,7 +2122,6 @@ test_80e() {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
 
@@ -2153,7 +2148,6 @@ test_80f() {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds$((MDTIDX + 1))
 
@@ -2181,7 +2175,6 @@ test_80g() {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
@@ -2205,7 +2198,6 @@ test_80h() {
        do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
@@ -2231,11 +2223,11 @@ test_81a() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       touch $remote_dir
+       # OBD_FAIL_OBJ_UPDATE_NET_REP       0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds$((MDTIDX + 1))
 
@@ -2261,11 +2253,10 @@ test_81b() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       # OBD_FAIL_OBJ_UPDATE_NET_REP       0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
 
@@ -2292,11 +2283,10 @@ test_81c() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       # OBD_FAIL_OBJ_UPDATE_NET_REP       0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
@@ -2319,11 +2309,10 @@ test_81d() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
+       # OBD_FAIL_OBJ_UPDATE_NET_REP       0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
@@ -2385,7 +2374,6 @@ test_81f() {
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds$((MDTIDX + 1))
 
@@ -2416,7 +2404,6 @@ test_81g() {
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
@@ -2443,7 +2430,6 @@ test_81h() {
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
index 3b5214e..42c34d5 100644 (file)
@@ -3848,11 +3848,11 @@ drop_reint_reply() {
 }
 
 drop_update_reply() {
-# OBD_FAIL_UPDATE_OBJ_NET
+# OBD_FAIL_UPDATE_OBJ_NET_REP
        local index=$1
        shift 1
        RC=0
-       do_facet mds${index} lctl set_param fail_loc=0x1500
+       do_facet mds${index} lctl set_param fail_loc=0x1701
        do_facet client "$@" || RC=$?
        do_facet mds${index} lctl set_param fail_loc=0
        return $RC
index 0bac42c..842e374 100644 (file)
@@ -2023,7 +2023,7 @@ static void check_update(void)
        BLANK_LINE();
        CHECK_STRUCT(update);
        CHECK_MEMBER(update, u_type);
-       CHECK_MEMBER(update, u_padding);
+       CHECK_MEMBER(update, u_batchid);
        CHECK_MEMBER(update, u_fid);
        CHECK_MEMBER(update, u_lens);
        CHECK_MEMBER(update, u_bufs);
index 7977902..d426a82 100644 (file)
@@ -4504,10 +4504,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct update, u_type));
        LASSERTF((int)sizeof(((struct update *)0)->u_type) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct update *)0)->u_type));
-       LASSERTF((int)offsetof(struct update, u_padding) == 4, "found %lld\n",
-                (long long)(int)offsetof(struct update, u_padding));
-       LASSERTF((int)sizeof(((struct update *)0)->u_padding) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct update *)0)->u_padding));
+       LASSERTF((int)offsetof(struct update, u_batchid) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_batchid));
+       LASSERTF((int)sizeof(((struct update *)0)->u_batchid) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_batchid));
        LASSERTF((int)offsetof(struct update, u_fid) == 8, "found %lld\n",
                 (long long)(int)offsetof(struct update, u_fid));
        LASSERTF((int)sizeof(((struct update *)0)->u_fid) == 16, "found %lld\n",