Whamcloud - gitweb
LU-3157 llite: A not locked mutex can be unlocked.
[fs/lustre-release.git] / lustre / mdt / out_handler.c
index 83671d5..f0b0daa 100644 (file)
  */
 /*
  * Copyright (c) 2013, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdt/out_handler.c
  *
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
+/* Current out and mdt shared the same thread info, but in the future,
+ * this should be decoupled with MDT XXX*/
+#define out_thread_info                mdt_thread_info
+#define out_thread_key         mdt_thread_key
+
+struct out_thread_info *out_env_info(const struct lu_env *env)
+{
+       struct out_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &out_thread_key);
+       LASSERT(info != NULL);
+       return info;
+}
+
+static inline char *dt_obd_name(struct dt_device *dt)
+{
+       return dt->dd_lu_dev.ld_obd->obd_name;
+}
+
 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
                           tx_exec_func_t undo, char *file, int line)
 {
@@ -65,16 +80,14 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
        return &ta->ta_args[i];
 }
 
-static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
                        struct thandle_exec_args *th)
 {
-       struct dt_device *dt = mdt->mdt_bottom;
-
        memset(th, 0, sizeof(*th));
        th->ta_handle = dt_trans_create(env, dt);
        if (IS_ERR(th->ta_handle)) {
                CERROR("%s: start handle error: rc = %ld\n",
-                      mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle));
+                      dt_obd_name(dt), PTR_ERR(th->ta_handle));
                return PTR_ERR(th->ta_handle);
        }
        th->ta_dev = dt;
@@ -83,12 +96,12 @@ static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt,
        return 0;
 }
 
-static int out_trans_start(const struct lu_env *env, struct dt_device *dt,
-                          struct thandle *th)
+static int out_trans_start(const struct lu_env *env,
+                          struct thandle_exec_args *th)
 {
        /* Always do sync commit for Phase I */
-       LASSERT(th->th_sync != 0);
-       return dt_trans_start(env, dt, th);
+       LASSERT(th->ta_handle->th_sync != 0);
+       return dt_trans_start(env, th->ta_dev, th->ta_handle);
 }
 
 static int out_trans_stop(const struct lu_env *env,
@@ -110,24 +123,23 @@ static int out_trans_stop(const struct lu_env *env,
        return rc;
 }
 
-int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
 {
-       struct thandle_exec_args *_th = &info->mti_handle;
+       struct out_thread_info *info = out_env_info(env);
        int i = 0, rc;
 
-       LASSERT(th == _th);
        LASSERT(th->ta_dev);
        LASSERT(th->ta_handle);
 
        if (th->ta_err != 0 || th->ta_argno == 0)
                GOTO(stop, rc = th->ta_err);
 
-       rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle);
+       rc = out_trans_start(env, th);
        if (unlikely(rc))
                GOTO(stop, rc);
 
        for (i = 0; i < th->ta_argno; i++) {
-               rc = th->ta_args[i].exec_fn(info, th->ta_handle,
+               rc = th->ta_args[i].exec_fn(env, th->ta_handle,
                                            &th->ta_args[i]);
                if (unlikely(rc)) {
                        CDEBUG(D_INFO, "error during execution of #%u from"
@@ -136,16 +148,19 @@ int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th)
                        while (--i >= 0) {
                                LASSERTF(th->ta_args[i].undo_fn != NULL,
                                    "can't undo changes, hope for failover!\n");
-                               th->ta_args[i].undo_fn(info, th->ta_handle,
+                               th->ta_args[i].undo_fn(env, th->ta_handle,
                                                       &th->ta_args[i]);
                        }
                        break;
                }
        }
+
+       /* Only fail for real update */
+       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
 stop:
        CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc);
-       out_trans_stop(info->mti_env, th, rc);
+              dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
+       out_trans_stop(env, th, rc);
        th->ta_handle = NULL;
        th->ta_argno = 0;
        th->ta_err = 0;
@@ -153,41 +168,56 @@ stop:
        RETURN(rc);
 }
 
-static struct dt_object *out_get_dt_obj(struct lu_object *obj)
+static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
+                           struct dt_object *obj, struct update_reply *reply,
+                           int index)
 {
-       struct mdt_device *mdt;
-       struct dt_device *dt;
-       struct lu_object *bottom_obj;
+       CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
+              dt_obd_name(dt), reply, index, 0);
 
-       mdt = lu2mdt_dev(obj->lo_dev);
-       dt = mdt->mdt_bottom;
+       update_insert_reply(reply, NULL, 0, index, 0);
+       return;
+}
 
-       bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type);
-       if (bottom_obj == NULL)
-               return ERR_PTR(-ENOENT);
+typedef void (*out_reconstruct_t)(const struct lu_env *env,
+                                 struct dt_device *dt,
+                                 struct dt_object *obj,
+                                 struct update_reply *reply,
+                                 int index);
+
+static inline int out_check_resent(const struct lu_env *env,
+                                  struct dt_device *dt,
+                                  struct dt_object *obj,
+                                  struct ptlrpc_request *req,
+                                  out_reconstruct_t reconstruct,
+                                  struct update_reply *reply,
+                                  int index)
+{
+       if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+               return 0;
 
-       return lu2dt_obj(bottom_obj);
+       if (req_xid_is_last(req)) {
+               reconstruct(env, dt, obj, reply, index);
+               return 1;
+       }
+       DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
+                req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
+       return 0;
 }
 
-static struct dt_object *out_object_find(struct mdt_thread_info *info,
-                                        struct lu_fid *fid)
+static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+                          struct thandle *th)
 {
-       struct lu_object        *obj;
-       struct dt_object        *dt_obj;
-
-       obj = lu_object_find(info->mti_env,
-                            &info->mti_mdt->mdt_md_dev.md_lu_dev,
-                            fid, NULL);
-       if (IS_ERR(obj))
-               return (struct dt_object *)obj;
-
-       dt_obj = out_get_dt_obj(obj);
-       if (IS_ERR(dt_obj)) {
-               lu_object_put(info->mti_env, obj);
-               return ERR_PTR(-ENOENT);
-       }
+       int rc;
 
-       return dt_obj;
+       CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
+              PFID(lu_object_fid(&dt_obj->do_lu)));
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_destroy(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
+
+       return rc;
 }
 
 /**
@@ -196,53 +226,46 @@ static struct dt_object *out_object_find(struct mdt_thread_info *info,
  * declare phase, i.e. if declare succeed, it should make sure
  * the following executing phase succeed in anyway, so these undo
  * should be useless for most of the time in Phase I
- **/
-int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th,
+ */
+int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
                       struct tx_arg *arg)
 {
-       struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_destroy(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
-
-       /* we don't like double failures */
+       rc = out_obj_destroy(env, arg->object, th);
        if (rc != 0)
                CERROR("%s: undo failure, we are doomed!: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), rc);
+                      dt_obd_name(th->th_dev), rc);
        return rc;
 }
 
-int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th,
+int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
                       struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n",
+       CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
+              dt_obd_name(th->th_dev),
               PFID(lu_object_fid(&arg->object->do_lu)),
               arg->u.create.dof.dof_type,
               arg->u.create.attr.la_mode & S_IFMT);
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_create(env, dt_obj, &arg->u.create.attr,
                       &arg->u.create.hint, &arg->u.create.dof, th);
 
-       dt_write_unlock(info->mti_env, dt_obj);
-       CDEBUG(D_INFO, "insert create reply mode %o index %d\n",
-              arg->u.create.attr.la_mode, arg->index);
+       dt_write_unlock(env, dt_obj);
+
+       CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
+static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
                           struct lu_attr *attr, struct lu_fid *parent_fid,
                           struct dt_object_format *dof,
                           struct thandle_exec_args *th,
@@ -252,7 +275,7 @@ static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
        struct tx_arg *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof,
+       th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
                                       th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
@@ -275,7 +298,7 @@ static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj,
        return 0;
 }
 
-static int out_create(struct mdt_thread_info *info)
+static int out_create(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -297,7 +320,7 @@ static int out_create(struct mdt_thread_info *info)
        }
 
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        dof->dof_type = dt_mode_to_dft(attr->la_mode);
@@ -319,45 +342,50 @@ static int out_create(struct mdt_thread_info *info)
                }
        }
 
-       rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle,
+       if (lu_object_exists(&obj->do_lu))
+               RETURN(-EEXIST);
+
+       rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
+                          &info->mti_handle,
                           info->mti_u.update.mti_update_reply,
                           info->mti_u.update.mti_update_reply_index);
 
        RETURN(rc);
 }
 
-static int out_tx_attr_set_undo(struct mdt_thread_info *info,
+static int out_tx_attr_set_undo(const struct lu_env *env,
                                struct thandle *th, struct tx_arg *arg)
 {
        CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
-              mdt_obd_name(info->mti_mdt),
+              dt_obd_name(th->th_dev),
               PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
 
        return -ENOTSUPP;
 }
 
-static int out_tx_attr_set_exec(struct mdt_thread_info *info,
-                               struct thandle *th, struct tx_arg *arg)
+static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
+                               struct tx_arg *arg)
 {
        struct dt_object        *dt_obj = arg->object;
        int                     rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "attr set "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
+              PFID(lu_object_fid(&dt_obj->do_lu)));
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
                         th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_unlock(env, dt_obj);
+
+       CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_attr_set(struct mdt_thread_info *info,
+static int __out_tx_attr_set(const struct lu_env *env,
                             struct dt_object *dt_obj,
                             const struct lu_attr *attr,
                             struct thandle_exec_args *th,
@@ -367,8 +395,7 @@ static int __out_tx_attr_set(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr,
-                                        th->ta_handle);
+       th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -383,7 +410,7 @@ static int __out_tx_attr_set(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_attr_set(struct mdt_thread_info *info)
+static int out_attr_set(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct lu_attr          *attr = &info->mti_attr.ma_attr;
@@ -405,17 +432,17 @@ static int out_attr_set(struct mdt_thread_info *info)
        attr->la_valid = 0;
        attr->la_valid = 0;
        obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(lobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
-       rc = out_tx_attr_set(info, obj, attr, &info->mti_handle,
+       rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
                             info->mti_u.update.mti_update_reply,
                             info->mti_u.update.mti_update_reply_index);
 
        RETURN(rc);
 }
 
-static int out_attr_get(struct mdt_thread_info *info)
+static int out_attr_get(struct out_thread_info *info)
 {
        struct obdo             *obdo = &info->mti_u.update.mti_obdo;
        const struct lu_env     *env = info->mti_env;
@@ -472,16 +499,21 @@ static int out_attr_get(struct mdt_thread_info *info)
        obdo->o_valid = 0;
        obdo_from_la(obdo, la, la->la_valid);
        obdo_cpu_to_le(obdo, obdo);
-       lustre_set_wire_obdo(obdo, obdo);
+       lustre_set_wire_obdo(NULL, obdo, obdo);
 
 out_unlock:
        dt_read_unlock(env, obj);
+
+       CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
+              mdt_obd_name(info->mti_mdt),
+              info->mti_u.update.mti_update_reply, 0, rc);
+
        update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
                            sizeof(*obdo), 0, rc);
        RETURN(rc);
 }
 
-static int out_xattr_get(struct mdt_thread_info *info)
+static int out_xattr_get(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        const struct lu_env     *env = info->mti_env;
@@ -529,7 +561,7 @@ out:
        RETURN(rc);
 }
 
-static int out_index_lookup(struct mdt_thread_info *info)
+static int out_index_lookup(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        const struct lu_env     *env = info->mti_env;
@@ -569,29 +601,33 @@ static int out_index_lookup(struct mdt_thread_info *info)
 
 out_unlock:
        dt_read_unlock(env, obj);
+
+       CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
+              mdt_obd_name(info->mti_mdt),
+              info->mti_u.update.mti_update_reply, 0, rc);
+
        update_insert_reply(info->mti_u.update.mti_update_reply,
                            &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
                            0, rc);
        RETURN(rc);
 }
 
-static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
+static int out_tx_xattr_set_exec(const struct lu_env *env,
                                 struct thandle *th,
                                 struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
+              dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
+              arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
-       CDEBUG(D_OTHER, "attr set "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf,
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
                          arg->u.xattr_set.name, arg->u.xattr_set.flags,
                          th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_unlock(env, dt_obj);
        /**
         * Ignore errors if this is LINK EA
         **/
@@ -599,16 +635,15 @@ static int out_tx_xattr_set_exec(struct mdt_thread_info *info,
                                    strlen(XATTR_NAME_LINK))))
                rc = 0;
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
-       CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n",
-              arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name,
-              arg->u.xattr_set.flags);
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_xattr_set(struct mdt_thread_info *info,
+static int __out_tx_xattr_set(const struct lu_env *env,
                              struct dt_object *dt_obj,
                              const struct lu_buf *buf,
                              const char *name, int flags,
@@ -619,7 +654,7 @@ static int __out_tx_xattr_set(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name,
+       th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
                                          flags, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
@@ -637,7 +672,7 @@ static int __out_tx_xattr_set(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_xattr_set(struct mdt_thread_info *info)
+static int out_xattr_set(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -676,49 +711,61 @@ static int out_xattr_set(struct mdt_thread_info *info)
 
        flag = le32_to_cpu(*(int *)tmp);
 
-       rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle,
-                        info->mti_u.update.mti_update_reply,
-                        info->mti_u.update.mti_update_reply_index);
-
+       rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
+                             &info->mti_handle,
+                             info->mti_u.update.mti_update_reply,
+                             info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th,
-                              struct tx_arg *arg)
+static int out_obj_ref_add(const struct lu_env *env,
+                          struct dt_object *dt_obj,
+                          struct thandle *th)
 {
-       struct dt_object *dt_obj = arg->object;
+       int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_ref_add(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
 
-       CDEBUG(D_OTHER, "ref add "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       return rc;
+}
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       dt_ref_add(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
+static int out_obj_ref_del(const struct lu_env *env,
+                          struct dt_object *dt_obj,
+                          struct thandle *th)
+{
+       int rc;
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
-       return 0;
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_ref_del(env, dt_obj, th);
+       dt_write_unlock(env, dt_obj);
+
+       return rc;
 }
 
-static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
+       int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       rc = out_obj_ref_add(env, dt_obj, th);
 
-       CDEBUG(D_OTHER, "ref del "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)));
+       CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       dt_ref_del(info->mti_env, dt_obj, th);
-       dt_write_unlock(info->mti_env, dt_obj);
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       return rc;
+}
 
-       return 0;
+static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       return out_obj_ref_del(env, arg->object, th);
 }
 
-static int __out_tx_ref_add(struct mdt_thread_info *info,
+static int __out_tx_ref_add(const struct lu_env *env,
                            struct dt_object *dt_obj,
                            struct thandle_exec_args *th,
                            struct update_reply *reply,
@@ -727,8 +774,7 @@ static int __out_tx_ref_add(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj,
-                                       th->ta_handle);
+       th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -745,34 +791,42 @@ static int __out_tx_ref_add(struct mdt_thread_info *info,
 /**
  * increase ref of the object
  **/
-static int out_ref_add(struct mdt_thread_info *info)
+static int out_ref_add(struct out_thread_info *info)
 {
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
        int               rc;
 
        ENTRY;
 
-       rc = out_tx_ref_add(info, obj, &info->mti_handle,
+       rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
                            info->mti_u.update.mti_update_reply,
                            info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
-       out_tx_ref_add_undo(info, th, arg);
-       update_insert_reply(arg->reply, NULL, 0, arg->index, 0);
-       return 0;
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       rc = out_obj_ref_del(env, dt_obj, th);
+
+       CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
 }
 
-static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th,
+static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
-       return out_tx_ref_add_exec(info, th, arg);
+       return out_obj_ref_add(env, arg->object, th);
 }
 
-static int __out_tx_ref_del(struct mdt_thread_info *info,
+static int __out_tx_ref_del(const struct lu_env *env,
                            struct dt_object *dt_obj,
                            struct thandle_exec_args *th,
                            struct update_reply *reply,
@@ -781,7 +835,7 @@ static int __out_tx_ref_del(struct mdt_thread_info *info,
        struct tx_arg           *arg;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle);
+       th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
        if (th->ta_err != 0)
                return th->ta_err;
 
@@ -795,7 +849,7 @@ static int __out_tx_ref_del(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_ref_del(struct mdt_thread_info *info)
+static int out_ref_del(struct out_thread_info *info)
 {
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
        int               rc;
@@ -805,67 +859,79 @@ static int out_ref_del(struct mdt_thread_info *info)
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
 
-       rc = out_tx_ref_del(info, obj, &info->mti_handle,
+       rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
                            info->mti_u.update.mti_update_reply,
                            info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_index_insert_exec(struct mdt_thread_info *info,
-                                   struct thandle *th, struct tx_arg *arg)
+static int out_obj_index_insert(const struct lu_env *env,
+                               struct dt_object *dt_obj,
+                               const struct dt_rec *rec,
+                               const struct dt_key *key,
+                               struct thandle *th)
 {
-       struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
-
-       CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n",
-              PFID(lu_object_fid(&arg->object->do_lu)),
-              (char *)arg->u.insert.key,
-              PFID((struct lu_fid *)arg->u.insert.rec));
+       CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
+              dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+              (char *)key, PFID((struct lu_fid *)rec));
 
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0)
+       if (dt_try_as_dir(env, dt_obj) == 0)
                return -ENOTDIR;
 
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec,
-                      arg->u.insert.key, th, NULL, 0);
-       dt_write_unlock(info->mti_env, dt_obj);
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
+       dt_write_unlock(env, dt_obj);
 
-       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+       return rc;
+}
+
+static int out_obj_index_delete(const struct lu_env *env,
+                               struct dt_object *dt_obj,
+                               const struct dt_key *key,
+                               struct thandle *th)
+{
+       int rc;
+
+       CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
+              dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
+              (char *)key);
+
+       if (dt_try_as_dir(env, dt_obj) == 0)
+               return -ENOTDIR;
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_delete(env, dt_obj, key, th, NULL);
+       dt_write_unlock(env, dt_obj);
 
        return rc;
 }
 
-static int out_tx_index_insert_undo(struct mdt_thread_info *info,
+static int out_tx_index_insert_exec(const struct lu_env *env,
                                    struct thandle *th, struct tx_arg *arg)
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 
-       LASSERT(dt_obj != NULL && !IS_ERR(dt_obj));
+       rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
+                                 arg->u.insert.key, th);
 
-       CDEBUG(D_OTHER, "index delete "DFID" name: %s\n",
-              PFID(lu_object_fid(&arg->object->do_lu)),
-              (char *)arg->u.insert.key);
-
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
-               CERROR("%s: "DFID" is not directory: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR);
-               return -ENOTDIR;
-       }
-
-       dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL);
-       dt_write_unlock(info->mti_env, dt_obj);
+       CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
        update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
 
        return rc;
 }
 
-static int __out_tx_index_insert(struct mdt_thread_info *info,
+static int out_tx_index_insert_undo(const struct lu_env *env,
+                                   struct thandle *th, struct tx_arg *arg)
+{
+       return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+}
+
+static int __out_tx_index_insert(const struct lu_env *env,
                                 struct dt_object *dt_obj,
                                 char *name, struct lu_fid *fid,
                                 struct thandle_exec_args *th,
@@ -877,11 +943,11 @@ static int __out_tx_index_insert(struct mdt_thread_info *info,
        LASSERT(th->ta_handle != NULL);
 
        if (lu_object_exists(&dt_obj->do_lu)) {
-               if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+               if (dt_try_as_dir(env, dt_obj) == 0) {
                        th->ta_err = -ENOTDIR;
                        return th->ta_err;
                }
-               th->ta_err = dt_declare_insert(info->mti_env, dt_obj,
+               th->ta_err = dt_declare_insert(env, dt_obj,
                                               (struct dt_rec *)fid,
                                               (struct dt_key *)name,
                                               th->ta_handle);
@@ -904,7 +970,7 @@ static int __out_tx_index_insert(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_index_insert(struct mdt_thread_info *info)
+static int out_index_insert(struct out_thread_info *info)
 {
        struct update     *update = info->mti_u.update.mti_update;
        struct dt_object  *obj = info->mti_u.update.mti_dt_object;
@@ -936,27 +1002,39 @@ static int out_index_insert(struct mdt_thread_info *info)
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle,
+       rc = out_tx_index_insert(info->mti_env, obj, name, fid,
+                                &info->mti_handle,
                                 info->mti_u.update.mti_update_reply,
                                 info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
-static int out_tx_index_delete_exec(struct mdt_thread_info *info,
+static int out_tx_index_delete_exec(const struct lu_env *env,
                                    struct thandle *th,
                                    struct tx_arg *arg)
 {
-       return out_tx_index_insert_undo(info, th, arg);
+       int rc;
+
+       rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
+
+       CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc;
 }
 
-static int out_tx_index_delete_undo(struct mdt_thread_info *info,
+static int out_tx_index_delete_undo(const struct lu_env *env,
                                    struct thandle *th,
                                    struct tx_arg *arg)
 {
-       return out_tx_index_insert_exec(info, th, arg);
+       CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
+              dt_obd_name(th->th_dev), -ENOTSUPP);
+       return -ENOTSUPP;
 }
 
-static int __out_tx_index_delete(struct mdt_thread_info *info,
+static int __out_tx_index_delete(const struct lu_env *env,
                                 struct dt_object *dt_obj, char *name,
                                 struct thandle_exec_args *th,
                                 struct update_reply *reply,
@@ -964,13 +1042,13 @@ static int __out_tx_index_delete(struct mdt_thread_info *info,
 {
        struct tx_arg *arg;
 
-       if (dt_try_as_dir(info->mti_env, dt_obj) == 0) {
+       if (dt_try_as_dir(env, dt_obj) == 0) {
                th->ta_err = -ENOTDIR;
                return th->ta_err;
        }
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_delete(info->mti_env, dt_obj,
+       th->ta_err = dt_declare_delete(env, dt_obj,
                                       (struct dt_key *)name,
                                       th->ta_handle);
        if (th->ta_err != 0)
@@ -988,7 +1066,7 @@ static int __out_tx_index_delete(struct mdt_thread_info *info,
        return 0;
 }
 
-static int out_index_delete(struct mdt_thread_info *info)
+static int out_index_delete(struct out_thread_info *info)
 {
        struct update           *update = info->mti_u.update.mti_update;
        struct dt_object        *obj = info->mti_u.update.mti_dt_object;
@@ -1004,12 +1082,84 @@ static int out_index_delete(struct mdt_thread_info *info)
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_delete(info, obj, name, &info->mti_handle,
+       rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
                                 info->mti_u.update.mti_update_reply,
                                 info->mti_u.update.mti_update_reply_index);
        RETURN(rc);
 }
 
+static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       rc = out_obj_destroy(env, dt_obj, th);
+
+       CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
+              dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
+
+       update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
+
+       RETURN(rc);
+}
+
+static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
+                              struct tx_arg *arg)
+{
+       CERROR("%s: not support destroy undo yet!: rc = %d\n",
+              dt_obd_name(th->th_dev), -ENOTSUPP);
+       return -ENOTSUPP;
+}
+
+static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
+                            struct thandle_exec_args *th,
+                            struct update_reply *reply,
+                            int index, char *file, int line)
+{
+       struct tx_arg *arg;
+
+       LASSERT(th->ta_handle != NULL);
+       th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
+       if (th->ta_err)
+               return th->ta_err;
+
+       arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
+                         file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_destroy(struct out_thread_info *info)
+{
+       struct update           *update = info->mti_u.update.mti_update;
+       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct lu_fid           *fid;
+       int                     rc;
+       ENTRY;
+
+       fid = &update->u_fid;
+       fid_le_to_cpu(fid, fid);
+       if (!fid_is_sane(fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (!lu_object_exists(&obj->do_lu))
+               RETURN(-ENOENT);
+
+       rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
+                           info->mti_u.update.mti_update_reply,
+                           info->mti_u.update.mti_update_reply_index);
+
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
 [opc - OBJ_CREATE] = {                                 \
        .mh_name    = name,                             \
@@ -1024,6 +1174,8 @@ static int out_index_delete(struct mdt_thread_info *info)
 static struct out_handler out_update_ops[] = {
        DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
                     out_create),
+       DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
+                    out_destroy),
        DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
                     out_ref_add),
        DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
@@ -1036,8 +1188,8 @@ static struct out_handler out_update_ops[] = {
                     out_xattr_set),
        DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
                     out_xattr_get),
-       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0,
-                    HABEO_REFERO, out_index_lookup),
+       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
+                    out_index_lookup),
        DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
                     MUTABOR | HABEO_REFERO, out_index_insert),
        DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
@@ -1065,38 +1217,43 @@ static struct out_opc_slice out_handlers[] = {
  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
  * format.
  */
-int out_handle(struct mdt_thread_info *info)
+int out_handle(struct out_thread_info *info)
 {
-       struct req_capsule        *pill = info->mti_pill;
-       struct update_buf         *ubuf;
-       struct update             *update;
-       struct thandle_exec_args  *th = &info->mti_handle;
-       int                       bufsize;
-       int                       count;
-       unsigned                  off;
-       int                       i;
-       int                       rc = 0;
-       int                       rc1 = 0;
+       struct thandle_exec_args        *th = &info->mti_handle;
+       struct req_capsule              *pill = info->mti_pill;
+       struct mdt_device               *mdt = info->mti_mdt;
+       struct dt_device                *dt = mdt->mdt_bottom;
+       const struct lu_env             *env = info->mti_env;
+       struct update_buf               *ubuf;
+       struct update                   *update;
+       struct update_reply             *update_reply;
+       int                             bufsize;
+       int                             count;
+       int                             old_batchid = -1;
+       unsigned                        off;
+       int                             i;
+       int                             rc = 0;
+       int                             rc1 = 0;
        ENTRY;
 
        req_capsule_set(pill, &RQF_UPDATE_OBJ);
        bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
        if (bufsize != UPDATE_BUFFER_SIZE) {
                CERROR("%s: invalid bufsize %d: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), bufsize, -EPROTO);
+                      mdt_obd_name(mdt), bufsize, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
        if (ubuf == NULL) {
-               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt),
+               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
                       -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
                CERROR("%s: invalid magic %x expect %x: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic),
+                      mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
                       UPDATE_BUFFER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
@@ -1104,7 +1261,7 @@ int out_handle(struct mdt_thread_info *info)
        count = le32_to_cpu(ubuf->ub_count);
        if (count <= 0) {
                CERROR("%s: No update!: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      mdt_obd_name(mdt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -1113,16 +1270,16 @@ int out_handle(struct mdt_thread_info *info)
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
                CERROR("%s: Can't pack response: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), rc);
+                      mdt_obd_name(mdt), rc);
                RETURN(rc);
        }
 
        /* Prepare the update reply buffer */
-       info->mti_u.update.mti_update_reply =
-                       req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
-       update_init_reply_buf(info->mti_u.update.mti_update_reply, count);
+       update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
+       update_init_reply_buf(update_reply, count);
+       info->mti_u.update.mti_update_reply = update_reply;
 
-       rc = out_tx_start(info->mti_env, info->mti_mdt, th);
+       rc = out_tx_start(env, dt, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -1133,15 +1290,30 @@ int out_handle(struct mdt_thread_info *info)
                struct dt_object   *dt_obj;
 
                update = (struct update *)((char *)ubuf + off);
+               if (old_batchid == -1) {
+                       old_batchid = update->u_batchid;
+               } else if (old_batchid != update->u_batchid) {
+                       /* Stop the current update transaction,
+                        * create a new one */
+                       rc = out_tx_end(env, th);
+                       if (rc != 0)
+                               RETURN(rc);
+
+                       rc = out_tx_start(env, dt, th);
+                       if (rc != 0)
+                               RETURN(rc);
+                       old_batchid = update->u_batchid;
+               }
 
                fid_le_to_cpu(&update->u_fid, &update->u_fid);
                if (!fid_is_sane(&update->u_fid)) {
                        CERROR("%s: invalid FID "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(&update->u_fid), -EPROTO);
+                              mdt_obd_name(mdt), PFID(&update->u_fid),
+                              -EPROTO);
                        GOTO(out, rc = err_serious(-EPROTO));
                }
-               dt_obj = out_object_find(info, &update->u_fid);
+
+               dt_obj = dt_locate(env, dt, &update->u_fid);
                if (IS_ERR(dt_obj))
                        GOTO(out, rc = PTR_ERR(dt_obj));
 
@@ -1151,22 +1323,32 @@ int out_handle(struct mdt_thread_info *info)
 
                h = mdt_handler_find(update->u_type, out_handlers);
                if (likely(h != NULL)) {
+                       /* For real modification RPC, check if the update
+                        * has been executed */
+                       if (h->mh_flags & MUTABOR) {
+                               struct ptlrpc_request *req = mdt_info_req(info);
+
+                               if (out_check_resent(env, dt, dt_obj, req,
+                                                    out_reconstruct,
+                                                    update_reply, i))
+                                       GOTO(next, rc);
+                       }
+
                        rc = h->mh_act(info);
                } else {
                        CERROR("%s: The unsupported opc: 0x%x\n",
-                              mdt_obd_name(info->mti_mdt), update->u_type);
-                       lu_object_put(info->mti_env, &dt_obj->do_lu);
+                              mdt_obd_name(mdt), update->u_type);
+                       lu_object_put(env, &dt_obj->do_lu);
                        GOTO(out, rc = -ENOTSUPP);
                }
-               lu_object_put(info->mti_env, &dt_obj->do_lu);
+next:
+               lu_object_put(env, &dt_obj->do_lu);
                if (rc < 0)
                        GOTO(out, rc);
                off += cfs_size_round(update_size(update));
        }
-
 out:
-       rc1 = out_tx_end(info, th);
+       rc1 = out_tx_end(env, th);
        rc = rc == 0 ? rc1 : rc;
-       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET;
        RETURN(rc);
 }