Whamcloud - gitweb
LU-4585 out: remove OBJECT_UPDATE_PER_RPC_MAX limit. 90/9590/4
authorwang di <di.wang@intel.com>
Tue, 11 Mar 2014 15:38:24 +0000 (08:38 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 25 Mar 2014 15:30:15 +0000 (15:30 +0000)
Extend tx_args if the update inside 1 RPC is more than
the count of tx_args count in tgt_thread_info.

And also for create remote object, the remote RPC might
happen in trans stop, so we need to add return value
for mdd_trans_stop to track the error.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Ibb87181d87dfb6c3ed7e6757592bccd4a836a98a
Reviewed-on: http://review.whamcloud.com/9590
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_trans.c
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/target/tgt_internal.h
lustre/target/tgt_main.c

index 537151c..7fdaa2d 100644 (file)
@@ -3935,11 +3935,6 @@ extern void lustre_swab_hsm_request(struct hsm_request *hr);
  */
 
 /**
- * Maximum number of updates per UPDATE_OBJ RPC
- */
-#define        OUT_UPDATE_PER_TRANS_MAX        10
-
-/**
  * Type of each update
  */
 enum update_type {
index aeccd3b..1bc386a 100644 (file)
@@ -2159,6 +2159,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        const char              *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        int                      rc;
+       int                      rc2;
        ENTRY;
 
         /*
@@ -2309,7 +2310,9 @@ err_created:
                        S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
                        0, son, mdd_pobj, lname, handle);
 out_stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       rc2 = mdd_trans_stop(env, mdd, rc, handle);
+       if (rc == 0)
+               rc = rc2;
 out_free:
        if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
                /* if we vmalloced a large buffer drop it */
index 9c95279..b536821 100644 (file)
@@ -419,8 +419,8 @@ struct thandle *mdd_trans_create(const struct lu_env *env,
                                  struct mdd_device *mdd);
 int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
                     struct thandle *th);
-void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
-                    int rc, struct thandle *handle);
+int mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
+                  int rc, struct thandle *handle);
 int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                     void *cookie);
 int mdd_txn_start_cb(const struct lu_env *env, struct thandle *,
index 8c2b74e..96d7c88 100644 (file)
@@ -60,9 +60,9 @@ int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
         return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, th);
 }
 
-void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
-                    int result, struct thandle *handle)
+int mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
+                  int result, struct thandle *handle)
 {
        handle->th_result = result;
-       mdd_child_ops(mdd)->dt_trans_stop(env, mdd->mdd_child, handle);
+       return mdd_child_ops(mdd)->dt_trans_stop(env, mdd->mdd_child, handle);
 }
index 48bb61c..3cb6e83 100644 (file)
@@ -22,7 +22,7 @@
 /*
  * Copyright (c) 2013, Intel Corporation.
  *
- * lustre/mdt/out_handler.c
+ * lustre/target/out_handler.c
  *
  * Object update handler between targets.
  *
 #include "tgt_internal.h"
 #include <lustre_update.h>
 
-struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
-                          tx_exec_func_t undo, char *file, int line)
+static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
 {
+       struct tx_arg   **new_ta;
+       int             i;
+       int             rc = 0;
+
+       if (ta->ta_alloc_args >= new_alloc_ta)
+               return 0;
+
+       OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
+       if (new_ta == NULL)
+               return -ENOMEM;
+
+       for (i = 0; i < new_alloc_ta; i++) {
+               if (i < ta->ta_alloc_args) {
+                       /* copy the old args to new one */
+                       new_ta[i] = ta->ta_args[i];
+               } else {
+                       OBD_ALLOC_PTR(new_ta[i]);
+                       if (new_ta[i] == NULL)
+                               GOTO(out, rc = -ENOMEM);
+               }
+       }
+
+       /* free the old args */
+       if (ta->ta_args != NULL)
+               OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
+                                     ta->ta_alloc_args);
+
+       ta->ta_args = new_ta;
+       ta->ta_alloc_args = new_alloc_ta;
+out:
+       if (rc != 0) {
+               for (i = 0; i < new_alloc_ta; i++) {
+                       if (new_ta[i] != NULL)
+                               OBD_FREE_PTR(new_ta[i]);
+               }
+               OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
+       }
+       return rc;
+}
+
+#define TX_ALLOC_STEP  8
+static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
+                                 tx_exec_func_t func, tx_exec_func_t undo,
+                                 char *file, int line)
+{
+       int rc;
        int i;
 
-       LASSERT(ta);
-       LASSERT(func);
+       LASSERT(ta != NULL);
+       LASSERT(func != NULL);
+
+       if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
+               rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
+               if (rc != 0)
+                       return ERR_PTR(rc);
+       }
 
-       LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
-                "Too many updates(%d) in one trans\n", ta->ta_argno);
        i = ta->ta_argno;
 
        ta->ta_argno++;
 
-       ta->ta_args[i].exec_fn = func;
-       ta->ta_args[i].undo_fn = undo;
-       ta->ta_args[i].file    = file;
-       ta->ta_args[i].line    = line;
+       ta->ta_args[i]->exec_fn = func;
+       ta->ta_args[i]->undo_fn = undo;
+       ta->ta_args[i]->file    = file;
+       ta->ta_args[i]->line    = line;
 
-       return &ta->ta_args[i];
+       return ta->ta_args[i];
 }
 
 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
@@ -164,16 +213,18 @@ static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
                           int index, char *file, int line)
 {
        struct tx_arg *arg;
+       int rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
+       rc = dt_declare_create(env, obj, attr, NULL, dof,
                                       ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
                          line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
 
        /* release the object in out_trans_stop */
        lu_object_get(&obj->do_lu);
@@ -284,16 +335,19 @@ static int __out_tx_attr_set(const struct lu_env *env,
                             struct object_update_reply *reply,
                             int index, char *file, int line)
 {
-       struct tx_arg           *arg;
+       struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+       rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
                          file, line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->u.attr_set.attr = *attr;
@@ -514,13 +568,13 @@ static int out_index_lookup(struct tgt_session_info *tsi)
        if (rc == 0)
                rc += 1;
 
+out_unlock:
+       dt_read_unlock(env, obj);
+
        CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
               PFID(lu_object_fid(&obj->do_lu)), name,
               PFID(&tti->tti_fid1), rc);
 
-out_unlock:
-       dt_read_unlock(env, obj);
-
        CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
               tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
               0, rc);
@@ -572,16 +626,18 @@ static int __out_tx_xattr_set(const struct lu_env *env,
                              struct object_update_reply *reply,
                              int index, char *file, int line)
 {
-       struct tx_arg           *arg;
+       struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
-                                         flags, ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->u.xattr_set.name = name;
@@ -697,15 +753,18 @@ static int __out_tx_ref_add(const struct lu_env *env,
                            int index, char *file, int line)
 {
        struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
                          line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->reply = reply;
@@ -759,15 +818,18 @@ static int __out_tx_ref_del(const struct lu_env *env,
                            int index, char *file, int line)
 {
        struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
                          line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->reply = reply;
@@ -865,27 +927,25 @@ static int __out_tx_index_insert(const struct lu_env *env,
                                 struct object_update_reply *reply,
                                 int index, char *file, int line)
 {
-       struct tx_arg *arg;
+       struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-
        if (dt_try_as_dir(env, dt_obj) == 0) {
-               ta->ta_err = -ENOTDIR;
-               return ta->ta_err;
+               rc = -ENOTDIR;
+               return rc;
        }
 
-       ta->ta_err = dt_declare_insert(env, dt_obj,
-                                      (struct dt_rec *)fid,
-                                      (struct dt_key *)name,
-                                      ta->ta_handle);
-
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_insert(env, dt_obj, (struct dt_rec *)fid,
+                              (struct dt_key *)name, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_index_insert_exec,
-                         out_tx_index_insert_undo, file,
-                         line);
-       LASSERT(arg);
+                         out_tx_index_insert_undo, file, line);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->reply = reply;
@@ -969,24 +1029,25 @@ static int __out_tx_index_delete(const struct lu_env *env,
                                 struct object_update_reply *reply,
                                 int index, char *file, int line)
 {
-       struct tx_arg *arg;
+       struct tx_arg   *arg;
+       int             rc;
 
        if (dt_try_as_dir(env, dt_obj) == 0) {
-               ta->ta_err = -ENOTDIR;
-               return ta->ta_err;
+               rc = -ENOTDIR;
+               return rc;
        }
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_delete(env, dt_obj,
-                                      (struct dt_key *)name,
-                                      ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_delete(env, dt_obj, (struct dt_key *)name,
+                              ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_index_delete_exec,
-                         out_tx_index_delete_undo, file,
-                         line);
-       LASSERT(arg);
+                         out_tx_index_delete_undo, file, line);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->reply = reply;
@@ -1048,16 +1109,19 @@ static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
                             struct object_update_reply *reply,
                             int index, char *file, int line)
 {
-       struct tx_arg *arg;
+       struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
-       if (ta->ta_err)
-               return ta->ta_err;
+       rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
                          file, line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->reply = reply;
@@ -1118,15 +1182,17 @@ static int __out_tx_write(const struct lu_env *env,
                          int index, char *file, int line)
 {
        struct tx_arg   *arg;
+       int             rc;
 
        LASSERT(ta->ta_handle != NULL);
-       ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
-                                            ta->ta_handle);
-       if (ta->ta_err != 0)
-               return ta->ta_err;
+       rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
+       if (rc != 0)
+               return rc;
 
        arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
-       LASSERT(arg);
+       if (IS_ERR(arg))
+               return PTR_ERR(arg);
+
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
        arg->u.write.buf = *buf;
@@ -1240,11 +1306,10 @@ static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
 
                rc = PTR_ERR(ta->ta_handle);
                ta->ta_handle = NULL;
-               CERROR("%s: start handle error: rc = %d\n",
-                      dt_obd_name(dt), rc);
+               CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
+                      rc);
                return rc;
        }
-       ta->ta_dev = dt;
        if (exp->exp_need_sync)
                ta->ta_handle->th_sync = 1;
 
@@ -1254,7 +1319,7 @@ static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
 static int out_trans_start(const struct lu_env *env,
                           struct thandle_exec_args *ta)
 {
-       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
+       return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
 }
 
 static int out_trans_stop(const struct lu_env *env,
@@ -1264,28 +1329,29 @@ static int out_trans_stop(const struct lu_env *env,
        int rc;
 
        ta->ta_handle->th_result = err;
-       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
+       rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
        for (i = 0; i < ta->ta_argno; i++) {
-               if (ta->ta_args[i].object != NULL) {
-                       struct dt_object *obj = ta->ta_args[i].object;
+               if (ta->ta_args[i]->object != NULL) {
+                       struct dt_object *obj = ta->ta_args[i]->object;
 
                        /* If the object is being created during this
                         * transaction, we need to remove them from the
                         * cache immediately, because a few layers are
                         * missing in OUT handler, i.e. the object might
                         * not be initialized in all layers */
-                       if (ta->ta_args[i].exec_fn == out_tx_create_exec)
+                       if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
                                set_bit(LU_OBJECT_HEARD_BANSHEE,
                                        &obj->do_lu.lo_header->loh_flags);
-                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
-                       ta->ta_args[i].object = NULL;
+                       lu_object_put(env, &ta->ta_args[i]->object->do_lu);
+                       ta->ta_args[i]->object = NULL;
                }
        }
 
        return rc;
 }
 
-int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
+              int declare_ret)
 {
        struct tgt_session_info *tsi = tgt_ses_info(env);
        int                     i;
@@ -1296,36 +1362,36 @@ int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
        if (ta->ta_handle == NULL)
                RETURN(0);
 
-       if (ta->ta_err != 0 || ta->ta_argno == 0)
-               GOTO(stop, rc = ta->ta_err);
+       if (declare_ret != 0 || ta->ta_argno == 0)
+               GOTO(stop, rc = declare_ret);
 
-       LASSERT(ta->ta_dev != NULL);
+       LASSERT(ta->ta_handle->th_dev != NULL);
        rc = out_trans_start(env, ta);
        if (unlikely(rc != 0))
                GOTO(stop, rc);
 
        for (i = 0; i < ta->ta_argno; i++) {
-               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
-                                           &ta->ta_args[i]);
+               rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
+                                            ta->ta_args[i]);
                if (unlikely(rc != 0)) {
                        CDEBUG(D_INFO, "error during execution of #%u from"
-                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
-                              ta->ta_args[i].line, rc);
+                              " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
+                              ta->ta_args[i]->line, rc);
                        while (--i >= 0) {
-                               if (ta->ta_args[i].undo_fn != NULL)
-                                       ta->ta_args[i].undo_fn(env,
+                               if (ta->ta_args[i]->undo_fn != NULL)
+                                       ta->ta_args[i]->undo_fn(env,
                                                               ta->ta_handle,
-                                                             &ta->ta_args[i]);
+                                                              ta->ta_args[i]);
                                else
                                        CERROR("%s: undo for %s:%d: rc = %d\n",
-                                              dt_obd_name(ta->ta_dev),
-                                              ta->ta_args[i].file,
-                                              ta->ta_args[i].line, -ENOTSUPP);
+                                            dt_obd_name(ta->ta_handle->th_dev),
+                                              ta->ta_args[i]->file,
+                                              ta->ta_args[i]->line, -ENOTSUPP);
                        }
                        break;
                }
                CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-                      dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
+                      dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
        }
 
        /* Only fail for real update */
@@ -1337,7 +1403,6 @@ stop:
 
        ta->ta_handle = NULL;
        ta->ta_argno = 0;
-       ta->ta_err = 0;
 
        RETURN(rc);
 }
@@ -1459,20 +1524,28 @@ int out_handle(struct tgt_session_info *tsi)
                        GOTO(next, rc = -ENOTSUPP);
                }
 
+               /* Check resend case only for modifying RPC */
+               if (h->th_flags & MUTABOR) {
+                       struct ptlrpc_request *req = tgt_ses_req(tsi);
+
+                       if (out_check_resent(env, dt, dt_obj, req,
+                                            out_reconstruct, reply, i))
+                               GOTO(next, rc = 0);
+               }
+
                /* start transaction for modification RPC only */
                if (h->th_flags & MUTABOR && current_batchid == -1) {
                        current_batchid = update->ou_batchid;
                        rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
-                       if (rc < 0)
+                       if (rc != 0)
                                GOTO(next, rc);
                }
 
-               /* Stop the current update transaction,
-                * if it meets a different batchid, or
-                * it is read-only RPC */
+               /* Stop the current update transaction, if the update has
+                * different batchid, or read-only update */
                if (((current_batchid != update->ou_batchid) ||
                     !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
-                       rc = out_tx_end(env, ta);
+                       rc = out_tx_end(env, ta, rc);
                        current_batchid = -1;
                        if (rc != 0)
                                GOTO(next, rc);
@@ -1487,16 +1560,6 @@ int out_handle(struct tgt_session_info *tsi)
                        }
                }
 
-               /* Check resend case only for modifying RPC */
-               if (h->th_flags & MUTABOR) {
-                       struct ptlrpc_request *req = tgt_ses_req(tsi);
-
-                       if (out_check_resent(env, dt, dt_obj, req,
-                                            out_reconstruct, reply,
-                                            i))
-                               GOTO(next, rc);
-               }
-
                rc = h->th_act(tsi);
 next:
                lu_object_put(env, &dt_obj->do_lu);
@@ -1505,7 +1568,7 @@ next:
        }
 out:
        if (current_batchid != -1) {
-               rc1 = out_tx_end(env, ta);
+               rc1 = out_tx_end(env, ta, rc);
                if (rc == 0)
                        rc = rc1;
        }
index 1b70916..f915b6d 100644 (file)
@@ -249,14 +249,6 @@ int out_insert_update(const struct lu_env *env,
        int                             i;
        ENTRY;
 
-       if (ureq->ourq_count >= OUT_UPDATE_PER_TRANS_MAX) {
-               CERROR("%s: too much params %d or update %d "DFID" op %d: "
-                      "rc = %d\n",
-                      update->dur_dt->dd_lu_dev.ld_obd->obd_name,
-                      params_count, ureq->ourq_count, PFID(fid), op, -E2BIG);
-               RETURN(-E2BIG);
-       }
-
        /* Check update size to make sure it can fit into the buffer */
        ureq_len = object_update_request_size(ureq);
        update_length = offsetof(struct object_update, ou_params[0]);
index 14ce886..f0ed071 100644 (file)
@@ -91,13 +91,11 @@ struct tx_arg {
        } u;
 };
 
-#define TX_MAX_OPS       10
 struct thandle_exec_args {
        struct thandle          *ta_handle;
-       struct dt_device        *ta_dev;
-       struct tx_arg            ta_args[TX_MAX_OPS];
-       int                      ta_err;
-       int                      ta_argno;   /* used args */
+       int                     ta_argno;   /* used args */
+       int                     ta_alloc_args; /* allocated args count */
+       struct tx_arg           **ta_args;
 };
 
 /**
index 1d2a933..9191039 100644 (file)
@@ -143,7 +143,26 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut)
 EXPORT_SYMBOL(tgt_fini);
 
 /* context key constructor/destructor: tg_key_init, tg_key_fini */
-LU_KEY_INIT_FINI(tgt, struct tgt_thread_info);
+LU_KEY_INIT(tgt, struct tgt_thread_info);
+
+static void tgt_key_fini(const struct lu_context *ctx,
+                        struct lu_context_key *key, void *data)
+{
+       struct tgt_thread_info          *info = data;
+       struct thandle_exec_args        *args = &info->tti_tea;
+       int                             i;
+
+       for (i = 0; i < args->ta_alloc_args; i++) {
+               if (args->ta_args[i] != NULL)
+                       OBD_FREE_PTR(args->ta_args[i]);
+       }
+
+       if (args->ta_args != NULL)
+               OBD_FREE(args->ta_args, sizeof(args->ta_args[0]) *
+                                       args->ta_alloc_args);
+       OBD_FREE_PTR(info);
+}
+
 static void tgt_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {