Whamcloud - gitweb
LU-1187 mdt: add out handler for object update
authorwangdi <di.wang@whamcloud.com>
Sat, 16 Nov 2013 18:53:15 +0000 (10:53 -0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 31 Jan 2013 21:35:12 +0000 (16:35 -0500)
Add object update handler on remote MDT. Because the RPC
only includes object updates, (no metadata operation), the
out_handler will skip MDD layer and call osd layer directly.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I9d553145cedfffcbb26d839ed8c7d9cc0887ed86
Reviewed-on: http://review.whamcloud.com/4928
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/md_object.h
lustre/include/obd_support.h
lustre/mdd/mdd_internal.h
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_mds.c
lustre/mdt/out_handler.c [new file with mode: 0644]

index e966fce..04f5a52 100644 (file)
@@ -840,6 +840,17 @@ static inline int mdo_rename_tgt(const struct lu_env *env,
         }
 }
 
+/**
+ * Used in MDD/OUT layer for object lock rule
+ **/
+enum mdd_object_role {
+       MOR_SRC_PARENT,
+       MOR_SRC_CHILD,
+       MOR_TGT_PARENT,
+       MOR_TGT_CHILD,
+       MOR_TGT_ORPHAN
+};
+
 struct dt_device;
 /**
  * Structure to hold object information. This is used to create object
index 3a91473..b9a034d 100644 (file)
@@ -255,7 +255,6 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x185
 #define OBD_FAIL_MDS_GET_INFO_NET        0x186
 #define OBD_FAIL_MDS_DQACQ_NET           0x187
-#define OBD_FAIL_MDS_OBJ_UPDATE_NET      0x188
 
 /* OI scrub */
 #define OBD_FAIL_OSD_SCRUB_DELAY        0x190
@@ -461,6 +460,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LOV_INIT                          0x1403
 #define OBD_FAIL_GLIMPSE_DELAY                     0x1404
 
+#define OBD_FAIL_UPDATE_OBJ_NET                            0x1500
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   CFS_FAIL_PRECHECK(id)
 #define OBD_FAIL_CHECK(id)                      CFS_FAIL_CHECK(id)
index 169d05c..f2d21a4 100644 (file)
@@ -142,14 +142,6 @@ enum mod_flags {
         ORPHAN_OBJ = 1 << 3,
 };
 
-enum mdd_object_role {
-        MOR_SRC_PARENT,
-        MOR_SRC_CHILD,
-        MOR_TGT_PARENT,
-        MOR_TGT_CHILD,
-        MOR_TGT_ORPHAN
-};
-
 struct mdd_object {
         struct md_object   mod_obj;
         /* open count */
index 4865a02..3867fd4 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
-mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o out_handler.o
 
 @INCLUDE_RULES@
index 7f10bc4..2f98d85 100644 (file)
@@ -2605,8 +2605,7 @@ void mdt_object_unlock_put(struct mdt_thread_info * info,
         mdt_object_put(info->mti_env, o);
 }
 
-static struct mdt_handler *mdt_handler_find(__u32 opc,
-                                            struct mdt_opc_slice *supported)
+struct mdt_handler *mdt_handler_find(__u32 opc, struct mdt_opc_slice *supported)
 {
         struct mdt_opc_slice *s;
         struct mdt_handler   *h;
@@ -3109,6 +3108,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg)
        case MDS_HSM_ACTION:
         case MDS_QUOTACHECK:
         case MDS_QUOTACTL:
+       case UPDATE_OBJ:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
         case SEQ_QUERY:
index 9abfbe6..37d7bc5 100644 (file)
@@ -102,7 +102,7 @@ struct mdt_device {
        struct seq_server_site     mdt_seq_site;
         struct ptlrpc_service     *mdt_regular_service;
         struct ptlrpc_service     *mdt_readpage_service;
-        struct ptlrpc_service     *mdt_xmds_service;
+       struct ptlrpc_service     *mdt_out_service;
         struct ptlrpc_service     *mdt_setattr_service;
         struct ptlrpc_service     *mdt_mdsc_service;
         struct ptlrpc_service     *mdt_mdss_service;
@@ -274,6 +274,57 @@ enum mdt_reint_flag {
         MRF_OPEN_TRUNC = 1 << 0,
 };
 
+struct mdt_thread_info;
+struct tx_arg;
+typedef int (*tx_exec_func_t)(struct mdt_thread_info *, struct thandle *,
+                             struct tx_arg *);
+
+struct tx_arg {
+       tx_exec_func_t          exec_fn;
+       tx_exec_func_t          undo_fn;
+       struct dt_object        *object;
+       char                    *file;
+       int                     line;
+       struct update_reply     *reply;
+       int                     index;
+       union {
+               struct {
+                       const struct dt_rec     *rec;
+                       const struct dt_key     *key;
+               } insert;
+               struct {
+               } ref;
+               struct {
+                       struct lu_attr  attr;
+               } attr_set;
+               struct {
+                       struct lu_buf   buf;
+                       const char      *name;
+                       int             flags;
+                       __u32           csum;
+               } xattr_set;
+               struct {
+                       struct lu_attr                  attr;
+                       struct dt_allocation_hint       hint;
+                       struct dt_object_format         dof;
+                       struct lu_fid                   fid;
+               } create;
+               struct {
+                       struct lu_buf   buf;
+                       loff_t          pos;
+               } write;
+       } u;
+};
+
+#define TX_MAX_OPS       10
+struct thandle_exec_args {
+       struct thandle          *ta_handle;
+       struct dt_device        *ta_dev;
+       int                     ta_err;
+       struct tx_arg           ta_args[TX_MAX_OPS];
+       int                     ta_argno;   /* used args */
+};
+
 /*
  * Common data shared by mdt-level handlers. This is allocated per-thread to
  * reduce stack consumption.
@@ -393,6 +444,14 @@ struct mdt_thread_info {
                         struct md_attr attr;
                         struct md_som_data data;
                 } som;
+               struct {
+                       struct dt_object_format mti_update_dof;
+                       struct update_reply     *mti_update_reply;
+                       struct update           *mti_update;
+                       int                     mti_update_reply_index;
+                       struct obdo             mti_obdo;
+                       struct dt_object        *mti_dt_object;
+               } update;
         } mti_u;
 
         /* IO epoch related stuff. */
@@ -412,6 +471,8 @@ struct mdt_thread_info {
        int                        mti_big_lmm_used;
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
+       struct thandle_exec_args   mti_handle;
+       struct ldlm_enqueue_info   mti_einfo;
 };
 
 /* ptlrpc request handler for MDT. All handlers are
@@ -728,6 +789,8 @@ extern struct lprocfs_vars lprocfs_mds_obd_vars[];
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
                     struct md_hsm *mh);
 
+struct mdt_handler *mdt_handler_find(__u32 opc,
+                                    struct mdt_opc_slice *supported);
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
 int mdt_init_idmap(struct mdt_thread_info *);
@@ -1010,5 +1073,35 @@ static inline char *mdt_obd_name(struct mdt_device *mdt)
 int mds_mod_init(void);
 void mds_mod_exit(void);
 
+/* Update handlers */
+int out_handle(struct mdt_thread_info *info);
+
+#define out_tx_create(info, obj, attr, fid, dof, th, reply, idx) \
+       __out_tx_create(info, obj, attr, fid, dof, th, reply, idx, \
+                       __FILE__, __LINE__)
+
+#define out_tx_attr_set(info, obj, attr, th, reply, idx) \
+       __out_tx_attr_set(info, obj, attr, th, reply, idx, \
+                         __FILE__, __LINE__)
+
+#define out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx)     \
+       __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx,    \
+                          __FILE__, __LINE__)
+
+#define out_tx_ref_add(info, obj, th, reply, idx) \
+       __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__)
+
+#define out_tx_ref_del(info, obj, th, reply, idx) \
+       __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__)
+
+#define out_tx_index_insert(info, obj, th, name, fid, reply, idx) \
+       __out_tx_index_insert(info, obj, th, name, fid, reply, idx, \
+                             __FILE__, __LINE__)
+
+#define out_tx_index_delete(info, obj, th, name, reply, idx) \
+       __out_tx_index_delete(info, obj, th, name, reply, idx, \
+                             __FILE__, __LINE__)
+
+
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index 0a15bd3..d5d4dde 100644 (file)
@@ -63,6 +63,7 @@ struct mds_device {
        struct md_device           mds_md_dev;
        struct ptlrpc_service     *mds_regular_service;
        struct ptlrpc_service     *mds_readpage_service;
+       struct ptlrpc_service     *mds_out_service;
        struct ptlrpc_service     *mds_setattr_service;
        struct ptlrpc_service     *mds_mdsc_service;
        struct ptlrpc_service     *mds_mdss_service;
@@ -308,6 +309,46 @@ struct mdt_opc_slice mdt_fld_handlers[] = {
        }
 };
 
+/* Request with a format known in advance */
+#define DEF_UPDATE_HDL(flags, name, fn)                                        \
+       DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
+
+#define target_handler mdt_handler
+static struct target_handler out_ops[] = {
+       DEF_UPDATE_HDL(MUTABOR,         UPDATE_OBJ,     out_handle),
+};
+
+static struct mdt_opc_slice update_handlers[] = {
+       {
+               .mos_opc_start = MDS_GETATTR,
+               .mos_opc_end   = MDS_LAST_OPC,
+               .mos_hs        = mdt_mds_ops
+       },
+       {
+               .mos_opc_start = OBD_PING,
+               .mos_opc_end   = OBD_LAST_OPC,
+               .mos_hs        = mdt_obd_ops
+       },
+       {
+               .mos_opc_start = LDLM_ENQUEUE,
+               .mos_opc_end   = LDLM_LAST_OPC,
+               .mos_hs        = mdt_dlm_ops
+       },
+       {
+               .mos_opc_start = SEC_CTX_INIT,
+               .mos_opc_end   = SEC_LAST_OPC,
+               .mos_hs        = mdt_sec_ctx_ops
+       },
+       {
+               .mos_opc_start = UPDATE_OBJ,
+               .mos_opc_end   = UPDATE_LAST_OPC,
+               .mos_hs        = out_ops
+       },
+       {
+               .mos_hs        = NULL
+       }
+};
+
 static int mds_regular_handle(struct ptlrpc_request *req)
 {
        return mdt_handle_common(req, mdt_regular_handlers);
@@ -323,6 +364,11 @@ static int mds_mdsc_handle(struct ptlrpc_request *req)
        return mdt_handle_common(req, mdt_seq_handlers);
 }
 
+static int mdt_out_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, update_handlers);
+}
+
 static int mds_mdss_handle(struct ptlrpc_request *req)
 {
        return mdt_handle_common(req, mdt_seq_handlers);
@@ -345,6 +391,10 @@ static void mds_stop_ptlrpc_service(struct mds_device *m)
                ptlrpc_unregister_service(m->mds_readpage_service);
                m->mds_readpage_service = NULL;
        }
+       if (m->mds_out_service != NULL) {
+               ptlrpc_unregister_service(m->mds_out_service);
+               m->mds_out_service = NULL;
+       }
        if (m->mds_setattr_service != NULL) {
                ptlrpc_unregister_service(m->mds_setattr_service);
                m->mds_setattr_service = NULL;
@@ -508,6 +558,49 @@ static int mds_start_ptlrpc_service(struct mds_device *m)
                GOTO(err_mds_svc, rc);
        }
 
+       /* Object update service */
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_out",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_MDS_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               /*
+                * We'd like to have a mechanism to set this on a per-device
+                * basis, but alas...
+                */
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_out",
+                       .tc_thr_factor          = MDS_THR_FACTOR,
+                       .tc_nthrs_init          = MDS_NTHRS_INIT,
+                       .tc_nthrs_base          = MDS_NTHRS_BASE,
+                       .tc_nthrs_max           = MDS_NTHRS_MAX,
+                       .tc_nthrs_user          = mds_num_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_cpt                = {
+                       .cc_pattern             = mds_num_cpts,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_out_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_out_service)) {
+               rc = PTR_ERR(m->mds_out_service);
+               CERROR("failed to start out service: %d\n", rc);
+               m->mds_out_service = NULL;
+               GOTO(err_mds_svc, rc);
+       }
+
        /*
         * sequence controller service configuration
         */
diff --git a/lustre/mdt/out_handler.c b/lustre/mdt/out_handler.c
new file mode 100644 (file)
index 0000000..83671d5
--- /dev/null
@@ -0,0 +1,1172 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdt/out_handler.c
+ *
+ * Object update handler between targets.
+ *
+ * Author: di.wang <di.wang@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+#include <lustre_update.h>
+
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+
+struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
+                          tx_exec_func_t undo, char *file, int line)
+{
+       int i;
+
+       LASSERT(ta);
+       LASSERT(func);
+
+       i = ta->ta_argno;
+       LASSERT(i < UPDATE_MAX_OPS);
+
+       ta->ta_argno++;
+
+       ta->ta_args[i].exec_fn = func;
+       ta->ta_args[i].undo_fn = undo;
+       ta->ta_args[i].file    = file;
+       ta->ta_args[i].line    = line;
+
+       return &ta->ta_args[i];
+}
+
+static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
+                       struct thandle_exec_args *th)
+{
+       struct dt_device *dt = mdt->mdt_bottom;
+
+       memset(th, 0, sizeof(*th));
+       th->ta_handle = dt_trans_create(env, dt);
+       if (IS_ERR(th->ta_handle)) {
+               CERROR("%s: start handle error: rc = %ld\n",
+                      mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
+               return PTR_ERR(th->ta_handle);
+       }
+       th->ta_dev = dt;
+       /*For phase I, sync for cross-ref operation*/
+       th->ta_handle->th_sync = 1;
+       return 0;
+}
+
+static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
+                          struct thandle *th)
+{
+       /* Always do sync commit for Phase I */
+       LASSERT(th->th_sync != 0);
+       return dt_trans_start(env, dt, th);
+}
+
+static int out_trans_stop(const struct lu_env *env,
+                         struct thandle_exec_args *th, int err)
+{
+       int i;
+       int rc;
+
+       th->ta_handle->th_result = err;
+       LASSERT(th->ta_handle->th_sync != 0);
+       rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
+       for (i = 0; i < th->ta_argno; i++) {
+               if (th->ta_args[i].object != NULL) {
+                       lu_object_put(env, &th->ta_args[i].object->do_lu);
+                       th->ta_args[i].object = NULL;
+               }
+       }
+
+       return rc;
+}
+
+int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
+{
+       struct thandle_exec_args *_th = &info->mti_handle;
+       int i = 0, rc;
+
+       LASSERT(th == _th);
+       LASSERT(th->ta_dev);
+       LASSERT(th->ta_handle);
+
+       if (th->ta_err != 0 || th->ta_argno == 0)
+               GOTO(stop, rc = th->ta_err);
+
+       rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
+       if (unlikely(rc))
+               GOTO(stop, rc);
+
+       for (i = 0; i < th->ta_argno; i++) {
+               rc = th->ta_args[i].exec_fn(info, th->ta_handle,
+                                           &th->ta_args[i]);
+               if (unlikely(rc)) {
+                       CDEBUG(D_INFO, "error during execution of #%u from"
+                              " %s:%d: rc = %d\n", i, th->ta_args[i].file,
+                              th->ta_args[i].line, rc);
+                       while (--i >= 0) {
+                               LASSERTF(th->ta_args[i].undo_fn != NULL,
+                                   "can't undo changes, hope for failover!\n");
+                               th->ta_args[i].undo_fn(info, th->ta_handle,
+                                                      &th->ta_args[i]);
+                       }
+                       break;
+               }
+       }
+stop:
+       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
+              mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
+       out_trans_stop(info->mti_env, th, rc);
+       th->ta_handle = NULL;
+       th->ta_argno = 0;
+       th->ta_err = 0;
+
+       RETURN(rc);
+}
+
+static struct dt_object *out_get_dt_obj(struct lu_object *obj)
+{
+       struct mdt_device *mdt;
+       struct dt_device *dt;
+       struct lu_object *bottom_obj;
+
+       mdt = lu2mdt_dev(obj->lo_dev);
+       dt = mdt->mdt_bottom;
+
+       bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
+       if (bottom_obj == NULL)
+               return ERR_PTR(-ENOENT);
+
+       return lu2dt_obj(bottom_obj);
+}
+
+static struct dt_object *out_object_find(struct mdt_thread_info *info,
+                                        struct lu_fid *fid)
+{
+       struct lu_object        *obj;
+       struct dt_object        *dt_obj;
+
+       obj = lu_object_find(info->mti_env,
+                            &info->mti_mdt->mdt_md_dev.md_lu_dev,
+                            fid, NULL);
+       if (IS_ERR(obj))
+               return (struct dt_object *)obj;
+
+       dt_obj = out_get_dt_obj(obj);
+       if (IS_ERR(dt_obj)) {
+               lu_object_put(info->mti_env, obj);
+               return ERR_PTR(-ENOENT);
+       }
+
+       return dt_obj;
+}
+
+/**
+ * All of the xxx_undo will be used once execution failed,
+ * But because all of the required resource has been reserved in
+ * declare phase, i.e. if declare succeed, it should make sure
+ * the following executing phase succeed in anyway, so these undo
+ * should be useless for most of the time in Phase I
+ **/
+int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
+                      struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_destroy(info->mti_env, dt_obj, th);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       /* we don't like double failures */
+       if (rc != 0)
+               CERROR("%s: undo failure, we are doomed!: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), rc);
+       return rc;
+}
+
+int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
+                      struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
+              PFID(lu_object_fid(&arg->object->do_lu)),
+              arg->u.create.dof.dof_type,
+              arg->u.create.attr.la_mode & S_IFMT);
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
+                      &arg->u.create.hint, &arg->u.create.dof, th);
+
+       dt_write_unlock(info->mti_env, dt_obj);
+       CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
+              arg->u.create.attr.la_mode, arg->index);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
+}
+
+static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
+                          struct lu_attr *attr, struct lu_fid *parent_fid,
+                          struct dt_object_format *dof,
+                          struct thandle_exec_args *th,
+                          struct update_reply *reply,
+                          int index, char *file, int line)
+{
+       struct tx_arg *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
+                                      th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
+                         line);
+       LASSERT(arg);
+
+       /* release the object in out_trans_stop */
+       lu_object_get(&obj->do_lu);
+       arg->object = obj;
+       arg->u.create.attr = *attr;
+       if (parent_fid)
+               arg->u.create.fid = *parent_fid;
+       memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
+       arg->u.create.dof  = *dof;
+       arg->reply = reply;
+       arg->index = index;
+
+       return 0;
+}
+
+static int out_create(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
+       struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
+       struct lu_attr          *attr = &info->mti_attr.ma_attr;
+       struct lu_fid           *fid = NULL;
+       struct obdo             *wobdo;
+       int                     size;
+       int                     rc;
+
+       ENTRY;
+
+       wobdo = update_param_buf(update, 0, &size);
+       if (wobdo == NULL || size != sizeof(*wobdo)) {
+               CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       obdo_le_to_cpu(wobdo, wobdo);
+       lustre_get_wire_obdo(lobdo, wobdo);
+       la_from_obdo(attr, lobdo, lobdo->o_valid);
+
+       dof->dof_type = dt_mode_to_dft(attr->la_mode);
+       if (S_ISDIR(attr->la_mode)) {
+               int size;
+
+               fid = update_param_buf(update, 1, &size);
+               if (fid == NULL || size != sizeof(*fid)) {
+                       CERROR("%s: invalid fid: rc = %d\n",
+                              mdt_obd_name(info->mti_mdt), -EPROTO);
+                       RETURN(err_serious(-EPROTO));
+               }
+               fid_le_to_cpu(fid, fid);
+               if (!fid_is_sane(fid)) {
+                       CERROR("%s: invalid fid "DFID": rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(fid), -EPROTO);
+                       RETURN(err_serious(-EPROTO));
+               }
+       }
+
+       rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
+                          info->mti_u.update.mti_update_reply,
+                          info->mti_u.update.mti_update_reply_index);
+
+       RETURN(rc);
+}
+
+static int out_tx_attr_set_undo(struct mdt_thread_info *info,
+                               struct thandle *th, struct tx_arg *arg)
+{
+       CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
+              mdt_obd_name(info->mti_mdt),
+              PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
+
+       return -ENOTSUPP;
+}
+
+static int out_tx_attr_set_exec(struct mdt_thread_info *info,
+                               struct thandle *th, struct tx_arg *arg)
+{
+       struct dt_object        *dt_obj = arg->object;
+       int                     rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "attr set "DFID"\n",
+              PFID(lu_object_fid(&arg->object->do_lu)));
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
+                        th, NULL);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
+}
+
+static int __out_tx_attr_set(struct mdt_thread_info *info,
+                            struct dt_object *dt_obj,
+                            const struct lu_attr *attr,
+                            struct thandle_exec_args *th,
+                            struct update_reply *reply, int index,
+                            char *file, int line)
+{
+       struct tx_arg           *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
+                                        th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
+                         file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->u.attr_set.attr = *attr;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_attr_set(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct lu_attr          *attr = &info->mti_attr.ma_attr;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
+       struct obdo             *wobdo;
+       int                     size;
+       int                     rc;
+
+       ENTRY;
+
+       wobdo = update_param_buf(update, 0, &size);
+       if (wobdo == NULL || size != sizeof(*wobdo)) {
+               CERROR("%s: empty obdo in the update: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       attr->la_valid = 0;
+       attr->la_valid = 0;
+       obdo_le_to_cpu(wobdo, wobdo);
+       lustre_get_wire_obdo(lobdo, wobdo);
+       la_from_obdo(attr, lobdo, lobdo->o_valid);
+
+       rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
+                            info->mti_u.update.mti_update_reply,
+                            info->mti_u.update.mti_update_reply_index);
+
+       RETURN(rc);
+}
+
+static int out_attr_get(struct mdt_thread_info *info)
+{
+       struct obdo             *obdo = &info->mti_u.update.mti_obdo;
+       const struct lu_env     *env = info->mti_env;
+       struct lu_attr          *la = &info->mti_attr.ma_attr;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       int                     rc;
+
+       ENTRY;
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+
+       dt_read_lock(env, obj, MOR_TGT_CHILD);
+       rc = dt_attr_get(env, obj, la, NULL);
+       if (rc)
+               GOTO(out_unlock, rc);
+       /*
+        * If it is a directory, we will also check whether the
+        * directory is empty.
+        * la_flags = 0 : Empty.
+        *          = 1 : Not empty.
+        */
+       la->la_flags = 0;
+       if (S_ISDIR(la->la_mode)) {
+               struct dt_it     *it;
+               const struct dt_it_ops *iops;
+
+               if (!dt_try_as_dir(env, obj))
+                       GOTO(out_unlock, rc = -ENOTDIR);
+
+               iops = &obj->do_index_ops->dio_it;
+               it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
+               if (!IS_ERR(it)) {
+                       int  result;
+                       result = iops->get(env, it, (const void *)"");
+                       if (result > 0) {
+                               int i;
+                               for (result = 0, i = 0; result == 0 && i < 3;
+                                    ++i)
+                                       result = iops->next(env, it);
+                               if (result == 0)
+                                       la->la_flags = 1;
+                       } else if (result == 0)
+                               /*
+                                * Huh? Index contains no zero key?
+                                */
+                               rc = -EIO;
+
+                       iops->put(env, it);
+                       iops->fini(env, it);
+               }
+       }
+
+       obdo->o_valid = 0;
+       obdo_from_la(obdo, la, la->la_valid);
+       obdo_cpu_to_le(obdo, obdo);
+       lustre_set_wire_obdo(obdo, obdo);
+
+out_unlock:
+       dt_read_unlock(env, obj);
+       update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
+                           sizeof(*obdo), 0, rc);
+       RETURN(rc);
+}
+
+static int out_xattr_get(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       const struct lu_env     *env = info->mti_env;
+       struct lu_buf           *lbuf = &info->mti_buf;
+       struct update_reply     *reply = info->mti_u.update.mti_update_reply;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       char                    *name;
+       void                    *ptr;
+       int                     rc;
+
+       ENTRY;
+
+       name = (char *)update_param_buf(update, 0, NULL);
+       if (name == NULL) {
+               CERROR("%s: empty name for xattr get: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       ptr = update_get_buf_internal(reply, 0, NULL);
+       LASSERT(ptr != NULL);
+
+       /* The first 4 bytes(int) are used to store the result */
+       lbuf->lb_buf = (char *)ptr + sizeof(int);
+       lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
+       dt_read_lock(env, obj, MOR_TGT_CHILD);
+       rc = dt_xattr_get(env, obj, lbuf, name, NULL);
+       dt_read_unlock(env, obj);
+       if (rc < 0) {
+               lbuf->lb_len = 0;
+               GOTO(out, rc);
+       }
+       if (rc == 0) {
+               lbuf->lb_len = 0;
+               GOTO(out, rc = -ENOENT);
+       }
+       lbuf->lb_len = rc;
+       rc = 0;
+       CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
+              mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
+              name, (int)lbuf->lb_len);
+out:
+       *(int *)ptr = rc;
+       reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
+       RETURN(rc);
+}
+
+static int out_index_lookup(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       const struct lu_env     *env = info->mti_env;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       char                    *name;
+       int                     rc;
+
+       ENTRY;
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+
+       name = (char *)update_param_buf(update, 0, NULL);
+       if (name == NULL) {
+               CERROR("%s: empty name for lookup: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       dt_read_lock(env, obj, MOR_TGT_CHILD);
+       if (!dt_try_as_dir(env, obj))
+               GOTO(out_unlock, rc = -ENOTDIR);
+
+       rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
+               (struct dt_key *)name, NULL);
+
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       if (rc == 0)
+               rc += 1;
+
+       CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
+              PFID(lu_object_fid(&obj->do_lu)), name,
+              PFID(&info->mti_tmp_fid1), rc);
+       fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
+
+out_unlock:
+       dt_read_unlock(env, obj);
+       update_insert_reply(info->mti_u.update.mti_update_reply,
+                           &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
+                           0, rc);
+       RETURN(rc);
+}
+
+static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
+                                struct thandle *th,
+                                struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "attr set "DFID"\n",
+              PFID(lu_object_fid(&arg->object->do_lu)));
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
+                         arg->u.xattr_set.name, arg->u.xattr_set.flags,
+                         th, NULL);
+       dt_write_unlock(info->mti_env, dt_obj);
+       /**
+        * Ignore errors if this is LINK EA
+        **/
+       if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
+                                   strlen(XATTR_NAME_LINK))))
+               rc = 0;
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
+              arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
+              arg->u.xattr_set.flags);
+
+       return rc;
+}
+
+static int __out_tx_xattr_set(struct mdt_thread_info *info,
+                             struct dt_object *dt_obj,
+                             const struct lu_buf *buf,
+                             const char *name, int flags,
+                             struct thandle_exec_args *th,
+                             struct update_reply *reply, int index,
+                             char *file, int line)
+{
+       struct tx_arg           *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
+                                         flags, th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->u.xattr_set.name = name;
+       arg->u.xattr_set.flags = flags;
+       arg->u.xattr_set.buf = *buf;
+       arg->reply = reply;
+       arg->index = index;
+       arg->u.xattr_set.csum = 0;
+       return 0;
+}
+
+static int out_xattr_set(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct lu_buf           *lbuf = &info->mti_buf;
+       char                    *name;
+       char                    *buf;
+       char                    *tmp;
+       int                     buf_len = 0;
+       int                     flag;
+       int                     rc;
+       ENTRY;
+
+       name = update_param_buf(update, 0, NULL);
+       if (name == NULL) {
+               CERROR("%s: empty name for xattr set: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       buf = (char *)update_param_buf(update, 1, &buf_len);
+       if (buf == NULL || buf_len == 0) {
+               CERROR("%s: empty buf for xattr set: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       lbuf->lb_buf = buf;
+       lbuf->lb_len = buf_len;
+
+       tmp = (char *)update_param_buf(update, 2, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty flag for xattr set: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       flag = le32_to_cpu(*(int *)tmp);
+
+       rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
+                        info->mti_u.update.mti_update_reply,
+                        info->mti_u.update.mti_update_reply_index);
+
+       RETURN(rc);
+}
+
+static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "ref add "DFID"\n",
+              PFID(lu_object_fid(&arg->object->do_lu)));
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       dt_ref_add(info->mti_env, dt_obj, th);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
+       return 0;
+}
+
+static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "ref del "DFID"\n",
+              PFID(lu_object_fid(&arg->object->do_lu)));
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       dt_ref_del(info->mti_env, dt_obj, th);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       return 0;
+}
+
+static int __out_tx_ref_add(struct mdt_thread_info *info,
+                           struct dt_object *dt_obj,
+                           struct thandle_exec_args *th,
+                           struct update_reply *reply,
+                           int index, char *file, int line)
+{
+       struct tx_arg           *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
+                                       th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
+                         line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+/**
+ * increase ref of the object
+ **/
+static int out_ref_add(struct mdt_thread_info *info)
+{
+       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
+       int               rc;
+
+       ENTRY;
+
+       rc = out_tx_ref_add(info, obj, &info->mti_handle,
+                           info->mti_u.update.mti_update_reply,
+                           info->mti_u.update.mti_update_reply_index);
+       RETURN(rc);
+}
+
+static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       out_tx_ref_add_undo(info, th, arg);
+       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
+       return 0;
+}
+
+static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       return out_tx_ref_add_exec(info, th, arg);
+}
+
+static int __out_tx_ref_del(struct mdt_thread_info *info,
+                           struct dt_object *dt_obj,
+                           struct thandle_exec_args *th,
+                           struct update_reply *reply,
+                           int index, char *file, int line)
+{
+       struct tx_arg           *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
+                         line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_ref_del(struct mdt_thread_info *info)
+{
+       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
+       int               rc;
+
+       ENTRY;
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+
+       rc = out_tx_ref_del(info, obj, &info->mti_handle,
+                           info->mti_u.update.mti_update_reply,
+                           info->mti_u.update.mti_update_reply_index);
+       RETURN(rc);
+}
+
+static int out_tx_index_insert_exec(struct mdt_thread_info *info,
+                                   struct thandle *th, struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
+              PFID(lu_object_fid(&arg->object->do_lu)),
+              (char *)arg->u.insert.key,
+              PFID((struct lu_fid *)arg->u.insert.rec));
+
+       if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
+               return -ENOTDIR;
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
+                      arg->u.insert.key, th, NULL, 0);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
+}
+
+static int out_tx_index_insert_undo(struct mdt_thread_info *info,
+                                   struct thandle *th, struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+
+       CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
+              PFID(lu_object_fid(&arg->object->do_lu)),
+              (char *)arg->u.insert.key);
+
+       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+               CERROR("%s: "DFID" is not directory: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt),
+                      PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
+               return -ENOTDIR;
+       }
+
+       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
+       dt_write_unlock(info->mti_env, dt_obj);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
+}
+
+static int __out_tx_index_insert(struct mdt_thread_info *info,
+                                struct dt_object *dt_obj,
+                                char *name, struct lu_fid *fid,
+                                struct thandle_exec_args *th,
+                                struct update_reply *reply,
+                                int index, char *file, int line)
+{
+       struct tx_arg *arg;
+
+       LASSERT(th->ta_handle != NULL);
+
+       if (lu_object_exists(&dt_obj->do_lu)) {
+               if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+                       th->ta_err = -ENOTDIR;
+                       return th->ta_err;
+               }
+               th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
+                                              (struct dt_rec *)fid,
+                                              (struct dt_key *)name,
+                                              th->ta_handle);
+       }
+
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_index_insert_exec,
+                         out_tx_index_insert_undo, file,
+                         line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       arg->u.insert.rec = (struct dt_rec *)fid;
+       arg->u.insert.key = (struct dt_key *)name;
+
+       return 0;
+}
+
+static int out_index_insert(struct mdt_thread_info *info)
+{
+       struct update     *update = info->mti_u.update.mti_update;
+       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
+       struct lu_fid     *fid;
+       char              *name;
+       int               rc = 0;
+       int               size;
+       ENTRY;
+
+       name = (char *)update_param_buf(update, 0, NULL);
+       if (name == NULL) {
+               CERROR("%s: empty name for index insert: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       fid = (struct lu_fid *)update_param_buf(update, 1, &size);
+       if (fid == NULL || size != sizeof(*fid)) {
+               CERROR("%s: invalid fid: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      RETURN(err_serious(-EPROTO));
+       }
+
+       fid_le_to_cpu(fid, fid);
+       if (!fid_is_sane(fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(fid),
+                      -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
+                                info->mti_u.update.mti_update_reply,
+                                info->mti_u.update.mti_update_reply_index);
+       RETURN(rc);
+}
+
+static int out_tx_index_delete_exec(struct mdt_thread_info *info,
+                                   struct thandle *th,
+                                   struct tx_arg *arg)
+{
+       return out_tx_index_insert_undo(info, th, arg);
+}
+
+static int out_tx_index_delete_undo(struct mdt_thread_info *info,
+                                   struct thandle *th,
+                                   struct tx_arg *arg)
+{
+       return out_tx_index_insert_exec(info, th, arg);
+}
+
+static int __out_tx_index_delete(struct mdt_thread_info *info,
+                                struct dt_object *dt_obj, char *name,
+                                struct thandle_exec_args *th,
+                                struct update_reply *reply,
+                                int index, char *file, int line)
+{
+       struct tx_arg *arg;
+
+       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+               th->ta_err = -ENOTDIR;
+               return th->ta_err;
+       }
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
+                                      (struct dt_key *)name,
+                                      th->ta_handle);
+       if (th->ta_err != 0)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_index_delete_exec,
+                         out_tx_index_delete_undo, file,
+                         line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       arg->u.insert.key = (struct dt_key *)name;
+       return 0;
+}
+
+static int out_index_delete(struct mdt_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       char                    *name;
+       int                     rc = 0;
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+       name = (char *)update_param_buf(update, 0, NULL);
+       if (name == NULL) {
+               CERROR("%s: empty name for index delete: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
+                                info->mti_u.update.mti_update_reply,
+                                info->mti_u.update.mti_update_reply_index);
+       RETURN(rc);
+}
+
+#define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
+[opc - OBJ_CREATE] = {                                 \
+       .mh_name    = name,                             \
+       .mh_fail_id = fail_id,                          \
+       .mh_opc     = opc,                              \
+       .mh_flags   = flags,                            \
+       .mh_act     = fn,                               \
+       .mh_fmt     = NULL                              \
+}
+
+#define out_handler mdt_handler
+static struct out_handler out_update_ops[] = {
+       DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
+                    out_create),
+       DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
+                    out_ref_add),
+       DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
+                    out_ref_del),
+       DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0,  MUTABOR | HABEO_REFERO,
+                    out_attr_set),
+       DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0,  HABEO_REFERO,
+                    out_attr_get),
+       DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
+                    out_xattr_set),
+       DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
+                    out_xattr_get),
+       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
+                    HABEO_REFERO, out_index_lookup),
+       DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
+                    MUTABOR | HABEO_REFERO, out_index_insert),
+       DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
+                    MUTABOR | HABEO_REFERO, out_index_delete),
+};
+
+#define out_opc_slice mdt_opc_slice
+static struct out_opc_slice out_handlers[] = {
+       {
+               .mos_opc_start = OBJ_CREATE,
+               .mos_opc_end   = OBJ_LAST,
+               .mos_hs = out_update_ops
+       },
+};
+
+/**
+ * Object updates between Targets. Because all the updates has been
+ * dis-assemblied into object updates in master MDD layer, so out
+ * will skip MDD layer, and call OSD API directly to execute these
+ * updates.
+ *
+ * In phase I, all of the updates in the request need to be executed
+ * in one transaction, and the transaction has to be synchronously.
+ *
+ * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
+ * format.
+ */
+int out_handle(struct mdt_thread_info *info)
+{
+       struct req_capsule        *pill = info->mti_pill;
+       struct update_buf         *ubuf;
+       struct update             *update;
+       struct thandle_exec_args  *th = &info->mti_handle;
+       int                       bufsize;
+       int                       count;
+       unsigned                  off;
+       int                       i;
+       int                       rc = 0;
+       int                       rc1 = 0;
+       ENTRY;
+
+       req_capsule_set(pill, &RQF_UPDATE_OBJ);
+       bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
+       if (bufsize != UPDATE_BUFFER_SIZE) {
+               CERROR("%s: invalid bufsize %d: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
+       if (ubuf == NULL) {
+               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
+                      -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
+               CERROR("%s: invalid magic %x expect %x: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
+                      UPDATE_BUFFER_MAGIC, -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       count = le32_to_cpu(ubuf->ub_count);
+       if (count <= 0) {
+               CERROR("%s: No update!: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
+                            UPDATE_BUFFER_SIZE);
+       rc = req_capsule_server_pack(pill);
+       if (rc != 0) {
+               CERROR("%s: Can't pack response: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), rc);
+               RETURN(rc);
+       }
+
+       /* Prepare the update reply buffer */
+       info->mti_u.update.mti_update_reply =
+                       req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
+       update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
+
+       rc = out_tx_start(info->mti_env, info->mti_mdt, th);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Walk through updates in the request to execute them synchronously */
+       off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
+       for (i = 0; i < count; i++) {
+               struct out_handler *h;
+               struct dt_object   *dt_obj;
+
+               update = (struct update *)((char *)ubuf + off);
+
+               fid_le_to_cpu(&update->u_fid, &update->u_fid);
+               if (!fid_is_sane(&update->u_fid)) {
+                       CERROR("%s: invalid FID "DFID": rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(&update->u_fid), -EPROTO);
+                       GOTO(out, rc = err_serious(-EPROTO));
+               }
+               dt_obj = out_object_find(info, &update->u_fid);
+               if (IS_ERR(dt_obj))
+                       GOTO(out, rc = PTR_ERR(dt_obj));
+
+               info->mti_u.update.mti_dt_object = dt_obj;
+               info->mti_u.update.mti_update = update;
+               info->mti_u.update.mti_update_reply_index = i;
+
+               h = mdt_handler_find(update->u_type, out_handlers);
+               if (likely(h != NULL)) {
+                       rc = h->mh_act(info);
+               } else {
+                       CERROR("%s: The unsupported opc: 0x%x\n",
+                              mdt_obd_name(info->mti_mdt), update->u_type);
+                       lu_object_put(info->mti_env, &dt_obj->do_lu);
+                       GOTO(out, rc = -ENOTSUPP);
+               }
+               lu_object_put(info->mti_env, &dt_obj->do_lu);
+               if (rc < 0)
+                       GOTO(out, rc);
+               off += cfs_size_round(update_size(update));
+       }
+
+out:
+       rc1 = out_tx_end(info, th);
+       rc = rc == 0 ? rc1 : rc;
+       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;
+       RETURN(rc);
+}