Whamcloud - gitweb
LU-3534 osp: send updates by separate thread 25/12825/37
authorWang Di <di.wang@intel.com>
Sun, 23 Nov 2014 04:06:55 +0000 (20:06 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 11 Jun 2015 16:11:24 +0000 (16:11 +0000)
Sending updates to other MDT by separate thread, so

1. The thread will sends the update by version number, i.e.
lower version of update will be sent first.

2. If one operation includes updates on several MDTs, these
updates can be sent on different MDTs in parallel. Though in
transaction stop (top_trans_stop()), the top transaction needs
to wait all sub transaction stop their transaction (i.e. updates
have been executed on all MDTs), then return.

3. Move dt_update_request to osp layer, since it is only OSP layer
thing, and each OSP transaction will create the osp_update_request
and attached to the list of OSP sending thread.

4. Change lgh_hdr_lock from spinlock to rw_semaphore, because if
the llog object is remote object, it will go into osp_md_write_obj,
where it might be blocked by memory allocation.

5. a few cleanups for the pervious DNE patches
http://review.whamcloud.com/11572
http://review.whamcloud.com/11737
see LU-6691, LU-6692, LU-6693.

Change-Id: I2dde892e4ca4bf74c2cd56e16bb1125920365334
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/12825
Tested-by: Jenkins
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
19 files changed:
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lu_target.h
lustre/include/lustre_log.h
lustre/include/lustre_update.h
lustre/ldlm/ldlm_lib.c
lustre/lod/lod_dev.c
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/target/out_handler.c
lustre/target/update_recovery.c
lustre/target/update_trans.c

index 4b98848..dd3f40b 100644 (file)
@@ -104,7 +104,7 @@ typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th,
 #define TRANS_COMMIT_CB_MAGIC  0xa0a00a0a
 #define MAX_COMMIT_CB_STR_LEN  32
 
-#define DCB_TRANS_NOT_COMMITTED        0x1
+#define DCB_TRANS_STOP         0x1
 struct dt_txn_commit_cb {
        struct list_head        dcb_linkage;
        dt_cb_t                 dcb_func;
index 39d9d22..c11a2a7 100644 (file)
@@ -439,44 +439,6 @@ struct lu_attr {
         __u64          la_valid;
 };
 
-static inline void lu_attr_cpu_to_le(struct lu_attr *dst_attr,
-                                    struct lu_attr *src_attr)
-{
-       dst_attr->la_size = cpu_to_le64(src_attr->la_size);
-       dst_attr->la_mtime = cpu_to_le64(src_attr->la_mtime);
-       dst_attr->la_atime = cpu_to_le64(src_attr->la_atime);
-       dst_attr->la_ctime = cpu_to_le64(src_attr->la_ctime);
-       dst_attr->la_blocks = cpu_to_le64(src_attr->la_blocks);
-       dst_attr->la_mode = cpu_to_le32(src_attr->la_mode);
-       dst_attr->la_uid = cpu_to_le32(src_attr->la_uid);
-       dst_attr->la_gid = cpu_to_le32(src_attr->la_gid);
-       dst_attr->la_flags = cpu_to_le32(src_attr->la_flags);
-       dst_attr->la_nlink = cpu_to_le32(src_attr->la_nlink);
-       dst_attr->la_blkbits = cpu_to_le32(src_attr->la_blkbits);
-       dst_attr->la_blksize = cpu_to_le32(src_attr->la_blksize);
-       dst_attr->la_rdev = cpu_to_le32(src_attr->la_rdev);
-       dst_attr->la_valid = cpu_to_le64(src_attr->la_valid);
-}
-
-static inline void lu_attr_le_to_cpu(struct lu_attr *dst_attr,
-                                    struct lu_attr *src_attr)
-{
-       dst_attr->la_size = le64_to_cpu(src_attr->la_size);
-       dst_attr->la_mtime = le64_to_cpu(src_attr->la_mtime);
-       dst_attr->la_atime = le64_to_cpu(src_attr->la_atime);
-       dst_attr->la_ctime = le64_to_cpu(src_attr->la_ctime);
-       dst_attr->la_blocks = le64_to_cpu(src_attr->la_blocks);
-       dst_attr->la_mode = le32_to_cpu(src_attr->la_mode);
-       dst_attr->la_uid = le32_to_cpu(src_attr->la_uid);
-       dst_attr->la_gid = le32_to_cpu(src_attr->la_gid);
-       dst_attr->la_flags = le32_to_cpu(src_attr->la_flags);
-       dst_attr->la_nlink = le32_to_cpu(src_attr->la_nlink);
-       dst_attr->la_blkbits = le32_to_cpu(src_attr->la_blkbits);
-       dst_attr->la_blksize = le32_to_cpu(src_attr->la_blksize);
-       dst_attr->la_rdev = le32_to_cpu(src_attr->la_rdev);
-       dst_attr->la_valid = le64_to_cpu(src_attr->la_valid);
-}
-
 /** Bit-mask of valid attributes */
 enum la_valid {
         LA_ATIME = 1 << 0,
index 6c4c384..4b5b7d7 100644 (file)
@@ -417,7 +417,7 @@ int distribute_txn_replay_handle(struct lu_env *env,
 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd);
 struct distribute_txn_replay_req *
 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd);
-void dtrq_destory(struct distribute_txn_replay_req *dtrq);
+void dtrq_destroy(struct distribute_txn_replay_req *dtrq);
 struct distribute_txn_replay_req_sub *
 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index);
 
index 52bc7be..18e9a3a 100644 (file)
@@ -292,7 +292,7 @@ struct llog_operations {
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
        struct rw_semaphore      lgh_lock;
-       spinlock_t               lgh_hdr_lock; /* protect lgh_hdr data */
+       struct rw_semaphore      lgh_hdr_lock; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
        struct dt_object        *lgh_obj;
index 1417804..27d56ee 100644 (file)
@@ -41,27 +41,6 @@ struct dt_key;
 struct dt_rec;
 struct object_update_param;
 
-struct update_buffer {
-       struct object_update_request    *ub_req;
-       size_t                          ub_req_size;
-};
-
-/**
- * Tracking the updates being executed on this dt_device.
- */
-struct dt_update_request {
-       struct dt_device                *dur_dt;
-       /* attached itself to thandle */
-       int                             dur_flags;
-       /* update request result */
-       int                             dur_rc;
-       /* Current batch(transaction) id */
-       __u64                           dur_batchid;
-       /* Holding object updates */
-       struct update_buffer            dur_buf;
-       struct list_head                dur_cb_items;
-};
-
 struct update_params {
        struct object_update_param      up_params[0];
 };
@@ -116,7 +95,7 @@ update_params_get_param_buf(const struct update_params *params, __u16 index,
        if (size != NULL)
                *size = param->oup_len;
 
-       return &param->oup_buf[0];
+       return param->oup_buf;
 }
 
 struct update_op {
@@ -360,6 +339,7 @@ struct top_multiple_thandle {
        /* All of update records will packed here */
        struct thandle_update_records *tmt_update_records;
 
+       wait_queue_head_t       tmt_stop_waitq;
        __u64                   tmt_batchid;
        int                     tmt_result;
        __u32                   tmt_magic;
@@ -382,23 +362,24 @@ struct top_thandle {
 struct sub_thandle {
        struct thandle          *st_sub_th;
        struct dt_device        *st_dt;
-       struct list_head        st_list;
        struct llog_cookie      st_cookie;
        struct dt_txn_commit_cb st_commit_dcb;
+       struct dt_txn_commit_cb st_stop_dcb;
        int                     st_result;
 
        /* linked to top_thandle */
        struct list_head        st_sub_list;
 
        /* If this sub thandle is committed */
-       bool                    st_committed:1;
+       bool                    st_committed:1,
+                               st_stopped:1;
 };
 
 struct tx_arg;
 typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
                              struct tx_arg *ta);
 
-/* Structure for holding one update executation */
+/* Structure for holding one update execution */
 struct tx_arg {
        tx_exec_func_t           exec_fn;
        tx_exec_func_t           undo_fn;
index 2c288c2..cb80676 100644 (file)
@@ -1732,7 +1732,6 @@ static int check_for_next_transno(struct lu_target *lut)
 
        if (lut->lut_tdtd != NULL) {
                struct target_distribute_txn_data *tdtd;
-               __u64 update_transno;
 
                tdtd = lut->lut_tdtd;
                update_transno = distribute_txn_get_next_transno(lut->lut_tdtd);
@@ -1754,8 +1753,8 @@ static int check_for_next_transno(struct lu_target *lut)
        } else if (obd->obd_recovery_expired) {
                CDEBUG(D_HA, "waking for expired recovery\n");
                wake_up = 1;
-       } else if (req_transno == next_transno || (update_transno != 0 &&
-                                          update_transno <= next_transno)) {
+       } else if (req_transno == next_transno ||
+                  (update_transno != 0 && update_transno <= next_transno)) {
                CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
                wake_up = 1;
        } else if (queue_len > 0 &&
@@ -2263,7 +2262,7 @@ static void replay_request_or_update(struct lu_env *env,
                                obd->obd_next_recovery_transno = transno;
                        spin_unlock(&obd->obd_recovery_task_lock);
                        target_update_lcd(env, lut, dtrq);
-                       dtrq_destory(dtrq);
+                       dtrq_destroy(dtrq);
                } else {
                        spin_unlock(&obd->obd_recovery_task_lock);
                        LASSERT(list_empty(&obd->obd_req_replay_queue));
index 7c7a1cb..02b41d6 100644 (file)
@@ -316,8 +316,8 @@ static int lod_process_recovery_updates(const struct lu_env *env,
 
        if (rec->lrh_len !=
                llog_update_record_size((struct llog_update_record *)rec)) {
-               CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n",
-                      lod2obd(lrd->lrd_lod)->obd_name, index,
+               CERROR("%s broken update record! index %u "DOSTID":%u :"
+                      " rc = %d\n", lod2obd(lrd->lrd_lod)->obd_name, index,
                       POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO);
                return -EIO;
        }
index 4e3ae10..33d2ea3 100644 (file)
@@ -65,7 +65,7 @@ static struct llog_handle *llog_alloc_handle(void)
                return NULL;
 
        init_rwsem(&loghandle->lgh_lock);
-       spin_lock_init(&loghandle->lgh_hdr_lock);
+       init_rwsem(&loghandle->lgh_hdr_lock);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        atomic_set(&loghandle->lgh_refcount, 1);
 
@@ -121,9 +121,9 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                 RETURN(-EINVAL);
         }
 
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        if (!ext2_clear_bit(index, llh->llh_bitmap)) {
-               spin_unlock(&loghandle->lgh_hdr_lock);
+               up_write(&loghandle->lgh_hdr_lock);
                CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
                RETURN(-ENOENT);
        }
@@ -133,7 +133,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
            (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
-               spin_unlock(&loghandle->lgh_hdr_lock);
+               up_write(&loghandle->lgh_hdr_lock);
                rc = llog_destroy(env, loghandle);
                if (rc < 0) {
                        CERROR("%s: can't destroy empty llog #"DOSTID
@@ -145,7 +145,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                }
                RETURN(LLOG_DEL_PLAIN);
        }
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
 
        rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX);
        if (rc < 0) {
@@ -158,10 +158,10 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        }
        RETURN(0);
 out_err:
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        ext2_set_bit(index, llh->llh_bitmap);
        llh->llh_count++;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
        return rc;
 }
 
index e81fb9e..ad843e9 100644 (file)
@@ -507,15 +507,24 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
        /* the lgh_hdr_lock protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               spin_unlock(&loghandle->lgh_hdr_lock);
+               up_write(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+
+       /* XXX It is a bit tricky here, if the log object is local,
+        * we do not need lock during write here, because if there is
+        * race, the transaction(jbd2, what about ZFS?) will make sure the
+        * conflicts will all committed in the same transaction group.
+        * But for remote object, we need lock the whole process, so to
+        * set the version of the remote transaction to make sure they
+        * are being sent in order. (see osp_md_write()) */
+       if (!dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
 
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
@@ -523,7 +532,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = &llh->llh_hdr;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
        } else {
                /* Note: If this is not initialization (size == 0), then do not
                 * write the whole header (8k bytes), only update header/tail
@@ -538,7 +547,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = &llh->llh_count;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
 
                lgi->lgi_off = offsetof(typeof(*llh),
                        llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
@@ -547,16 +556,23 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
 
                lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
                lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
                lgi->lgi_buf.lb_buf = &llh->llh_tail;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
        }
 
+out_remote_unlock:
+       /* unlock here for remote object */
+       if (dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
+       if (rc)
+               GOTO(out, rc);
+
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
@@ -586,10 +602,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
        RETURN(rc);
 out:
        /* cleanup llog for error case */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        ext2_clear_bit(index, llh->llh_bitmap);
        llh->llh_count--;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
 
        /* restore llog last_idx */
        loghandle->lgh_last_idx--;
index 3ceb854..1815db2 100644 (file)
@@ -831,7 +831,8 @@ static void osd_trans_commit_cb(struct super_block *sb,
         dt_txn_hook_commit(th);
 
        /* call per-transaction callbacks if any */
-       list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
+       list_for_each_entry_safe(dcb, tmp, &oh->ot_commit_dcb_list,
+                                dcb_linkage) {
                LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
                         "commit callback entry: magic=%x name='%s'\n",
                         dcb->dcb_magic, dcb->dcb_name);
@@ -870,7 +871,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
                th->th_result = 0;
                th->th_tags = LCT_TX_HANDLE;
                oh->ot_credits = 0;
-               INIT_LIST_HEAD(&oh->ot_dcb_list);
+               INIT_LIST_HEAD(&oh->ot_commit_dcb_list);
+               INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
                osd_th_alloced(oh);
 
                memset(oti->oti_declare_ops, 0,
@@ -1006,6 +1008,22 @@ static int osd_seq_exists(const struct lu_env *env,
        RETURN(ss->ss_node_id == range->lsr_index);
 }
 
+static void osd_trans_stop_cb(struct osd_thandle *oth, int result)
+{
+       struct dt_txn_commit_cb *dcb;
+       struct dt_txn_commit_cb *tmp;
+
+       /* call per-transaction stop callbacks if any */
+       list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
+                                dcb_linkage) {
+               LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+                        "commit callback entry: magic=%x name='%s'\n",
+                        dcb->dcb_magic, dcb->dcb_name);
+               list_del_init(&dcb->dcb_linkage);
+               dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+       }
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
@@ -1045,6 +1063,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                        CERROR("%s: failed in transaction hook: rc = %d\n",
                               osd_name(osd), rc);
 
+               osd_trans_stop_cb(oh, rc);
                /* hook functions might modify th_sync */
                hdl->h_sync = th->th_sync;
 
@@ -1054,6 +1073,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                        CERROR("%s: failed to stop transaction: rc = %d\n",
                               osd_name(osd), rc);
        } else {
+               osd_trans_stop_cb(oh, th->th_result);
                OBD_FREE_PTR(oh);
        }
 
@@ -1085,7 +1105,10 @@ static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 
        LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
        LASSERT(&dcb->dcb_func != NULL);
-       list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+       if (dcb->dcb_flags & DCB_TRANS_STOP)
+               list_add(&dcb->dcb_linkage, &oh->ot_stop_dcb_list);
+       else
+               list_add(&dcb->dcb_linkage, &oh->ot_commit_dcb_list);
 
        return 0;
 }
index 2cc5ca6..683deb8 100644 (file)
@@ -330,7 +330,8 @@ struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
         struct ldiskfs_journal_cb_entry ot_jcb;
-       struct list_head              ot_dcb_list;
+       struct list_head       ot_commit_dcb_list;
+       struct list_head       ot_stop_dcb_list;
        /* Link to the device, for debugging. */
        struct lu_ref_link      ot_dev_link;
         unsigned short          ot_credits;
index 73efec3..b9ea13b 100644 (file)
@@ -74,6 +74,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <linux/kthread.h>
 #include <obd_class.h>
 #include <lustre_ioctl.h>
 #include <lustre_param.h>
@@ -483,6 +484,95 @@ static int osp_disconnect(struct osp_device *d)
 }
 
 /**
+ * Initialize the osp_update structure in OSP device
+ *
+ * Allocate osp update structure and start update thread.
+ *
+ * \param[in] osp      OSP device
+ *
+ * \retval             0 if initialization succeeds.
+ * \retval             negative errno if initialization fails.
+ */
+static int osp_update_init(struct osp_device *osp)
+{
+       struct l_wait_info      lwi = { 0 };
+       struct task_struct      *task;
+
+       ENTRY;
+
+       LASSERT(osp->opd_connect_mdt);
+
+       OBD_ALLOC_PTR(osp->opd_update);
+       if (osp->opd_update == NULL)
+               RETURN(-ENOMEM);
+
+       init_waitqueue_head(&osp->opd_update_thread.t_ctl_waitq);
+       init_waitqueue_head(&osp->opd_update->ou_waitq);
+       spin_lock_init(&osp->opd_update->ou_lock);
+       INIT_LIST_HEAD(&osp->opd_update->ou_list);
+       osp->opd_update->ou_rpc_version = 1;
+       osp->opd_update->ou_version = 1;
+
+       /* start thread handling sending updates to the remote MDT */
+       task = kthread_run(osp_send_update_thread, osp,
+                          "osp_up%u-%u", osp->opd_index, osp->opd_group);
+       if (IS_ERR(task)) {
+               int rc = PTR_ERR(task);
+
+               OBD_FREE_PTR(osp->opd_update);
+               osp->opd_update = NULL;
+               CERROR("%s: can't start precreate thread: rc = %d\n",
+                      osp->opd_obd->obd_name, rc);
+               RETURN(rc);
+       }
+
+       l_wait_event(osp->opd_update_thread.t_ctl_waitq,
+                    osp_send_update_thread_running(osp) ||
+                    osp_send_update_thread_stopped(osp), &lwi);
+
+       RETURN(0);
+}
+
+/**
+ * Finialize osp_update structure in OSP device
+ *
+ * Stop the OSP update sending thread, then delete the left
+ * osp thandle in the sending list.
+ *
+ * \param [in] osp     OSP device.
+ */
+static void osp_update_fini(const struct lu_env *env, struct osp_device *osp)
+{
+       struct osp_update_request *our;
+       struct osp_update_request *tmp;
+       struct osp_updates *ou = osp->opd_update;
+
+       if (ou == NULL)
+               return;
+
+       osp->opd_update_thread.t_flags = SVC_STOPPING;
+       wake_up(&ou->ou_waitq);
+
+       wait_event(osp->opd_update_thread.t_ctl_waitq,
+                  osp->opd_update_thread.t_flags & SVC_STOPPED);
+
+       /* Remove the left osp thandle from the list */
+       spin_lock(&ou->ou_lock);
+       list_for_each_entry_safe(our, tmp, &ou->ou_list,
+                                our_list) {
+               list_del_init(&our->our_list);
+               LASSERT(our->our_th != NULL);
+               osp_trans_callback(env, our->our_th, -EIO);
+               /* our will be destroyed in osp_thandle_put() */
+               osp_thandle_put(our->our_th);
+       }
+       spin_unlock(&ou->ou_lock);
+
+       OBD_FREE_PTR(ou);
+       osp->opd_update = NULL;
+}
+
+/**
  * Cleanup OSP, which includes disconnect import, cleanup unlink log, stop
  * precreate threads etc.
  *
@@ -543,6 +633,7 @@ static int osp_process_config(const struct lu_env *env,
        switch (lcfg->lcfg_command) {
        case LCFG_PRE_CLEANUP:
                rc = osp_disconnect(d);
+               osp_update_fini(env, d);
                break;
        case LCFG_CLEANUP:
                lu_dev_del_linkage(dev->ld_site, dev);
@@ -1082,6 +1173,10 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp,
                rc = osp_sync_init(env, osp);
                if (rc < 0)
                        GOTO(out_precreat, rc);
+       } else {
+               rc = osp_update_init(osp);
+               if (rc != 0)
+                       GOTO(out_fid, rc);
        }
 
        ns_register_cancel(obd->obd_namespace, osp_cancel_weight);
@@ -1109,6 +1204,8 @@ out_precreat:
        /* stop precreate thread */
        if (!osp->opd_connect_mdt)
                osp_precreate_fini(osp);
+       else
+               osp_update_fini(env, osp);
 out_last_used:
        if (!osp->opd_connect_mdt)
                osp_last_used_fini(env, osp);
@@ -1222,7 +1319,7 @@ static struct lu_device *osp_device_fini(const struct lu_env *env,
        ENTRY;
 
        if (osp->opd_async_requests != NULL) {
-               dt_update_request_destroy(osp->opd_async_requests);
+               osp_update_request_destroy(osp->opd_async_requests);
                osp->opd_async_requests = NULL;
        }
 
index 4198f45..d6cf452 100644 (file)
@@ -94,6 +94,36 @@ struct osp_precreate {
        int                              osp_pre_recovering;
 };
 
+/**
+ * Tracking the updates being executed on this dt_device.
+ */
+struct osp_update_request {
+       int                             our_flags;
+       /* update request result */
+       int                             our_rc;
+
+       /* Holding object updates sent to the remote target */
+       struct object_update_request    *our_req;
+       size_t                          our_req_size;
+
+       struct list_head                our_cb_items;
+
+       /* points to thandle if this update request belongs to one */
+       struct osp_thandle              *our_th;
+       /* linked to the list(ou_list) in osp_updates */
+       struct list_head                our_list;
+       __u32                           our_req_sent:1;
+};
+
+struct osp_updates {
+       struct list_head        ou_list;
+       spinlock_t              ou_lock;
+       wait_queue_head_t       ou_waitq;
+       /* wait for next updates */
+       __u64                   ou_rpc_version;
+       __u64                   ou_version;
+};
+
 struct osp_device {
        struct dt_device                 opd_dt_dev;
        /* corresponded OST index */
@@ -141,6 +171,11 @@ struct osp_device {
        /* thread waits for signals about pool going empty */
        wait_queue_head_t                opd_pre_waitq;
 
+       /* send update thread */
+       struct osp_updates              *opd_update;
+       /* dedicate update thread */
+       struct ptlrpc_thread             opd_update_thread;
+
        /*
         * OST synchronization
         */
@@ -196,7 +231,7 @@ struct osp_device {
         * osp_device::opd_async_requests via declare() functions, these
         * requests can be packed together and sent to the remote server
         * via single OUT RPC later. */
-       struct dt_update_request        *opd_async_requests;
+       struct osp_update_request       *opd_async_requests;
        /* Protect current operations on opd_async_requests. */
        struct mutex                     opd_async_requests_mutex;
        struct list_head                 opd_async_updates;
@@ -296,14 +331,18 @@ struct osp_it {
        struct page              **ooi_pages;
 };
 
+#define OSP_THANDLE_MAGIC      0x20141214
 struct osp_thandle {
        struct thandle           ot_super;
-       struct dt_update_request *ot_dur;
 
        /* OSP will use this thandle to update last oid*/
        struct thandle          *ot_storage_th;
-       struct list_head         ot_dcb_list;
+       __u32                    ot_magic;
+       struct list_head         ot_commit_dcb_list;
+       struct list_head         ot_stop_dcb_list;
+       struct osp_update_request *ot_our;
        atomic_t                 ot_refcount;
+       __u64                    ot_version;
 };
 
 static inline struct osp_thandle *
@@ -312,13 +351,13 @@ thandle_to_osp_thandle(struct thandle *th)
        return container_of(th, struct osp_thandle, ot_super);
 }
 
-static inline struct dt_update_request *
-thandle_to_dt_update_request(struct thandle *th)
+static inline struct osp_update_request *
+thandle_to_osp_update_request(struct thandle *th)
 {
        struct osp_thandle *oth;
 
        oth = thandle_to_osp_thandle(th);
-       return oth->ot_dur;
+       return oth->ot_our;
 }
 
 /* The transaction only include the updates on the remote node, and
@@ -542,7 +581,7 @@ update_buffer_get_update(struct object_update_request *request,
                         unsigned int index);
 
 int osp_extend_update_buffer(const struct lu_env *env,
-                            struct update_buffer *ubuf);
+                            struct osp_update_request *our);
 
 #define osp_update_rpc_pack(env, name, update, op, ...)                \
 ({                                                             \
@@ -552,25 +591,27 @@ int osp_extend_update_buffer(const struct lu_env *env,
        int                     ret;                            \
                                                                \
        while (1) {                                                     \
-               ureq = update->dur_buf.ub_req;                          \
-               max_update_length = update->dur_buf.ub_req_size -       \
+               ureq = update->our_req;                                 \
+               max_update_length = update->our_req_size -              \
                                    object_update_request_size(ureq);   \
                                                                        \
                object_update = update_buffer_get_update(ureq,          \
-                                                        ureq->ourq_count);    \
-               ret = out_##name##_pack(env, object_update, max_update_length, \
+                                                ureq->ourq_count);     \
+               ret = out_##name##_pack(env, object_update,             \
+                                       max_update_length,              \
                                       __VA_ARGS__);                    \
                if (ret == -E2BIG) {                                    \
                        int rc1;                                        \
                        /* extend the buffer and retry */               \
-                       rc1 = osp_extend_update_buffer(env, &update->dur_buf); \
+                       rc1 = osp_extend_update_buffer(env, update);    \
                        if (rc1 != 0) {                                 \
                                ret = rc1;                              \
                                break;                                  \
                        }                                               \
                } else {                                                \
                        if (ret == 0) {                                 \
-                               object_update->ou_flags |= update->dur_flags; \
+                               object_update->ou_flags |=              \
+                                                    update->our_flags; \
                                ureq->ourq_count++;                     \
                        }                                               \
                        break;                                          \
@@ -579,6 +620,16 @@ int osp_extend_update_buffer(const struct lu_env *env,
        ret;                                                            \
 })
 
+static inline bool osp_send_update_thread_running(struct osp_device *osp)
+{
+       return osp->opd_update_thread.t_flags & SVC_RUNNING;
+}
+
+static inline bool osp_send_update_thread_stopped(struct osp_device *osp)
+{
+       return osp->opd_update_thread.t_flags & SVC_STOPPED;
+}
+
 typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
                                        struct object_update_reply *rep,
                                        struct ptlrpc_request *req,
@@ -597,32 +648,47 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
 
 int osp_unplug_async_request(const struct lu_env *env,
                             struct osp_device *osp,
-                            struct dt_update_request *update);
+                            struct osp_update_request *update);
 int osp_trans_update_request_create(struct thandle *th);
 struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
 int osp_insert_update_callback(const struct lu_env *env,
-                              struct dt_update_request *update,
+                              struct osp_update_request *update,
                               struct osp_object *obj, void *data,
                               osp_update_interpreter_t interpreter);
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
-                       struct ptlrpc_request **reqp);
-struct dt_update_request *dt_update_request_create(struct dt_device *dt);
-void dt_update_request_destroy(struct dt_update_request *dt_update);
+
+struct osp_update_request *osp_update_request_create(struct dt_device *dt);
+void osp_update_request_destroy(struct osp_update_request *update);
+
+int osp_send_update_thread(void *arg);
+int osp_check_and_set_rpc_version(struct osp_thandle *oth);
+
+void osp_thandle_destroy(struct osp_thandle *oth);
+static inline void osp_thandle_get(struct osp_thandle *oth)
+{
+       atomic_inc(&oth->ot_refcount);
+}
+
+static inline void osp_thandle_put(struct osp_thandle *oth)
+{
+       if (atomic_dec_and_test(&oth->ot_refcount))
+               osp_thandle_destroy(oth);
+}
 
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
-                   struct dt_update_request *update,
+                   struct osp_update_request *update,
                    struct ptlrpc_request **reqp);
 
 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
                                        struct thandle *th,
                                        struct osp_device *osp);
+void osp_trans_callback(const struct lu_env *env,
+                       struct osp_thandle *oth, int rc);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
index 3acbb16..46de68b 100644 (file)
@@ -104,7 +104,7 @@ static int osp_object_create_interpreter(const struct lu_env *env,
 /**
  * Implementation of dt_object_operations::do_declare_create
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -154,25 +154,25 @@ update_buffer_get_update(struct object_update_request *request,
 }
 
 int osp_extend_update_buffer(const struct lu_env *env,
-                            struct update_buffer *ubuf)
+                            struct osp_update_request *our)
 {
-       struct object_update_request *ureq;
-       size_t  new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
+       struct object_update_request *obj_update_req;
+       size_t new_size = our->our_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
 
        /* enlarge object update request size */
        if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
                return -E2BIG;
 
-       OBD_ALLOC_LARGE(ureq, new_size);
-       if (ureq == NULL)
+       OBD_ALLOC_LARGE(obj_update_req, new_size);
+       if (obj_update_req == NULL)
                return -ENOMEM;
 
-       memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
+       memcpy(obj_update_req, our->our_req, our->our_req_size);
 
-       OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
+       OBD_FREE_LARGE(our->our_req, our->our_req_size);
 
-       ubuf->ub_req = ureq;
-       ubuf->ub_req_size = new_size;
+       our->our_req = obj_update_req;
+       our->our_req_size = new_size;
 
        return 0;
 }
@@ -197,11 +197,11 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
                         struct lu_attr *attr, struct dt_allocation_hint *hint,
                         struct dt_object_format *dof, struct thandle *th)
 {
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        struct osp_object               *obj = dt2osp_obj(dt);
        int                             rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        LASSERT(attr->la_valid & LA_TYPE);
@@ -228,7 +228,7 @@ out:
 /**
  * Implementation of dt_object_operations::do_declare_ref_del
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -260,10 +260,10 @@ static int osp_md_declare_ref_del(const struct lu_env *env,
 static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        int                             rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL,
@@ -274,7 +274,7 @@ static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
 /**
  * Implementation of dt_object_operations::do_declare_ref_del
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -306,10 +306,10 @@ static int osp_md_declare_ref_add(const struct lu_env *env,
 static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        int                             rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD,
@@ -346,7 +346,7 @@ static void osp_md_ah_init(const struct lu_env *env,
 /**
  * Implementation of dt_object_operations::do_declare_attr_get
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -382,10 +382,10 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
                    const struct lu_attr *attr, struct thandle *th)
 {
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        int                             rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET,
@@ -506,7 +506,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
        struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
        struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
        struct dt_device        *dt_dev = &osp->opd_dt_dev;
-       struct dt_update_request   *update;
+       struct osp_update_request   *update;
        struct object_update_reply *reply;
        struct ptlrpc_request      *req = NULL;
        struct lu_fid              *fid;
@@ -517,7 +517,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
         * just create an update buffer, instead of attaching the
         * update_remote list of the thandle.
         */
-       update = dt_update_request_create(dt_dev);
+       update = osp_update_request_create(dt_dev);
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
@@ -573,7 +573,7 @@ out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       dt_update_request_destroy(update);
+       osp_update_request_destroy(update);
 
        return rc;
 }
@@ -581,7 +581,7 @@ out:
 /**
  * Implementation of dt_index_operations::dio_declare_insert
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -625,10 +625,12 @@ static int osp_md_index_insert(const struct lu_env *env,
                               struct thandle *th,
                               int ignore_quota)
 {
-       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
-       struct dt_update_request *update = oth->ot_dur;
+       struct osp_update_request *update;
        int                      rc;
 
+       update = thandle_to_osp_update_request(th);
+       LASSERT(update != NULL);
+
        rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT,
                                 lu_object_fid(&dt->do_lu), rec, key);
        return rc;
@@ -637,7 +639,7 @@ static int osp_md_index_insert(const struct lu_env *env,
 /**
  * Implementation of dt_index_operations::dio_declare_delete
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
  *
  * \param[in] env      execution environment
@@ -675,10 +677,10 @@ static int osp_md_index_delete(const struct lu_env *env,
                               const struct dt_key *key,
                               struct thandle *th)
 {
-       struct dt_update_request *update;
+       struct osp_update_request *update;
        int                      rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE,
@@ -1026,14 +1028,14 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt,
 {
        struct osp_object               *o = dt2osp_obj(dt);
        struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        int                             rc = 0;
 
        ENTRY;
        o->opo_non_exist = 1;
 
        LASSERT(osp->opd_connect_mdt);
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY,
@@ -1077,7 +1079,7 @@ struct dt_object_operations osp_md_obj_ops = {
 /**
  * Implementation of dt_body_operations::dbo_declare_write
  *
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
  * in the transaction.
   *
  * \param[in] env      execution environment
@@ -1117,12 +1119,13 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
                            const struct lu_buf *buf, loff_t *pos,
                            struct thandle *th, int ignore_quota)
 {
-       struct osp_object *obj = dt2osp_obj(dt);
-       struct dt_update_request  *update;
+       struct osp_object         *obj = dt2osp_obj(dt);
+       struct osp_update_request  *update;
+       struct osp_thandle        *oth = thandle_to_osp_thandle(th);
        ssize_t                   rc;
        ENTRY;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
@@ -1141,6 +1144,10 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
            obj->opo_ooa->ooa_attr.la_size < *pos)
                obj->opo_ooa->ooa_attr.la_size = *pos;
 
+       rc = osp_check_and_set_rpc_version(oth);
+       if (rc < 0)
+               RETURN(rc);
+
        RETURN(buf->lb_len);
 }
 
@@ -1150,7 +1157,7 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
        struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
        struct dt_device        *dt_dev = &osp->opd_dt_dev;
        struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
-       struct dt_update_request   *update;
+       struct osp_update_request   *update;
        struct object_update_reply *reply;
        struct out_read_reply      *orr;
        struct ptlrpc_request      *req = NULL;
@@ -1159,8 +1166,8 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
 
        /* Because it needs send the update buffer right away,
         * just create an update buffer, instead of attaching the
-        * update_remote list of the thandle. */
-       update = dt_update_request_create(dt_dev);
+        * update_remote list of the thandle.  */
+       update = osp_update_request_create(dt_dev);
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
@@ -1211,7 +1218,7 @@ out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       dt_update_request_destroy(update);
+       osp_update_request_destroy(update);
 
        return rc;
 }
index bd51a8e..ff6107e 100644 (file)
@@ -541,7 +541,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object               *obj = dt2osp_obj(dt);
        struct dt_device                *dev = &osp->opd_dt_dev;
-       struct dt_update_request        *update;
+       struct osp_update_request       *update;
        struct object_update_reply      *reply;
        struct ptlrpc_request           *req = NULL;
        int                             rc = 0;
@@ -561,7 +561,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                spin_unlock(&obj->opo_lock);
        }
 
-       update = dt_update_request_create(dev);
+       update = osp_update_request_create(dev);
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
@@ -607,7 +607,7 @@ out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       dt_update_request_destroy(update);
+       osp_update_request_destroy(update);
 
        return rc;
 }
@@ -857,7 +857,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
                mutex_unlock(&osp->opd_async_requests_mutex);
                osp_oac_xattr_put(oxe);
        } else {
-               struct dt_update_request *update;
+               struct osp_update_request *update;
 
                /* XXX: Currently, we trigger the batched async OUT
                 *      RPC via dt_declare_xattr_get(). It is not
@@ -865,8 +865,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
                 *
                 *      We will improve it in the future. */
                update = osp->opd_async_requests;
-               if (update != NULL && update->dur_buf.ub_req != NULL &&
-                   update->dur_buf.ub_req->ourq_count > 0) {
+               if (update != NULL && update->our_req != NULL &&
+                   update->our_req->ourq_count > 0) {
                        osp->opd_async_requests = NULL;
                        mutex_unlock(&osp->opd_async_requests_mutex);
                        rc = osp_unplug_async_request(env, osp, update);
@@ -908,7 +908,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        struct osp_object       *obj    = dt2osp_obj(dt);
        struct dt_device        *dev    = &osp->opd_dt_dev;
        struct lu_buf           *rbuf   = &osp_env_info(env)->osi_lb2;
-       struct dt_update_request *update = NULL;
+       struct osp_update_request *update = NULL;
        struct ptlrpc_request   *req    = NULL;
        struct object_update_reply *reply;
        struct osp_xattr_entry  *oxe    = NULL;
@@ -959,7 +959,7 @@ unlock:
                spin_unlock(&obj->opo_lock);
        }
 
-       update = dt_update_request_create(dev);
+       update = osp_update_request_create(dev);
        if (IS_ERR(update))
                GOTO(out, rc = PTR_ERR(update));
 
@@ -1077,7 +1077,7 @@ out:
                ptlrpc_req_finished(req);
 
        if (update != NULL && !IS_ERR(update))
-               dt_update_request_destroy(update);
+               osp_update_request_destroy(update);
 
        if (oxe != NULL)
                osp_oac_xattr_put(oxe);
@@ -1141,12 +1141,12 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
-       struct dt_update_request *update;
+       struct osp_update_request *update;
        struct osp_xattr_entry  *oxe;
        int                     rc;
        ENTRY;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
@@ -1245,13 +1245,13 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th)
 {
-       struct dt_update_request *update;
+       struct osp_update_request *update;
        const struct lu_fid      *fid = lu_object_fid(&dt->do_lu);
-       struct osp_object        *o = dt2osp_obj(dt);
+       struct osp_object        *o     = dt2osp_obj(dt);
        struct osp_xattr_entry   *oxe;
        int                       rc;
 
-       update = thandle_to_dt_update_request(th);
+       update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL,
index b6e5cf4..6d0067a 100644 (file)
@@ -72,7 +72,7 @@
  * The argument for the interpreter callback of osp request.
  */
 struct osp_update_args {
-       struct dt_update_request *oaua_update;
+       struct osp_update_request *oaua_update;
        atomic_t                 *oaua_count;
        wait_queue_head_t        *oaua_waitq;
        bool                      oaua_flow_control;
@@ -82,7 +82,7 @@ struct osp_update_args {
  * Call back for each update request.
  */
 struct osp_update_callback {
-       /* list in the dt_update_request::dur_cb_items */
+       /* list in the osp_update_request::our_cb_items */
        struct list_head                 ouc_list;
 
        /* The target of the async update request. */
@@ -117,55 +117,49 @@ static void object_update_request_free(struct object_update_request *ourq,
 }
 
 /**
- * Allocate and initialize dt_update_request
+ * Allocate and initialize osp_update_request
  *
- * dt_update_request is being used to track updates being executed on
+ * osp_update_request is being used to track updates being executed on
  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
  * and increased if needed.
  *
  * \param [in] dt      dt device
  *
- * \retval             dt_update_request being allocated if succeed
+ * \retval             osp_update_request being allocated if succeed
  * \retval             ERR_PTR(errno) if failed
  */
-struct dt_update_request *dt_update_request_create(struct dt_device *dt)
+struct osp_update_request *osp_update_request_create(struct dt_device *dt)
 {
-       struct dt_update_request *dt_update;
+       struct osp_update_request *osp_update_req;
        struct object_update_request *ourq;
 
-       OBD_ALLOC_PTR(dt_update);
-       if (dt_update == NULL)
+       OBD_ALLOC_PTR(osp_update_req);
+       if (osp_update_req == NULL)
                return ERR_PTR(-ENOMEM);
 
        ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
        if (IS_ERR(ourq)) {
-               OBD_FREE_PTR(dt_update);
+               OBD_FREE_PTR(osp_update_req);
                return ERR_CAST(ourq);
        }
 
-       dt_update->dur_buf.ub_req = ourq;
-       dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+       osp_update_req->our_req = ourq;
+       osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
 
-       dt_update->dur_dt = dt;
-       dt_update->dur_batchid = 0;
-       INIT_LIST_HEAD(&dt_update->dur_cb_items);
+       INIT_LIST_HEAD(&osp_update_req->our_cb_items);
+       INIT_LIST_HEAD(&osp_update_req->our_list);
 
-       return dt_update;
+       return osp_update_req;
 }
 
-/**
- * Destroy dt_update_request
- *
- * \param [in] dt_update       dt_update_request being destroyed
- */
-void dt_update_request_destroy(struct dt_update_request *dt_update)
+void osp_update_request_destroy(struct osp_update_request *our)
 {
-       if (dt_update == NULL)
+       if (our == NULL)
                return;
 
-       object_update_request_free(dt_update->dur_buf.ub_req,
-                                  dt_update->dur_buf.ub_req_size);
-       OBD_FREE_PTR(dt_update);
+       object_update_request_free(our->our_req,
+                                  our->our_req_size);
+       OBD_FREE_PTR(our);
 }
 
 static void
@@ -195,6 +189,22 @@ object_update_request_dump(const struct object_update_request *ourq,
               ourq->ourq_magic, ourq->ourq_count, total_size);
 }
 
+static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
+{
+       struct dt_txn_commit_cb *dcb;
+       struct dt_txn_commit_cb *tmp;
+
+       /* call per-transaction stop callbacks if any */
+       list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
+                                dcb_linkage) {
+               LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+                        "commit callback entry: magic=%x name='%s'\n",
+                        dcb->dcb_magic, dcb->dcb_name);
+               list_del_init(&dcb->dcb_linkage);
+               dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+       }
+}
+
 /**
  * Allocate an osp request and initialize it with the given parameters.
  *
@@ -258,16 +268,28 @@ static int osp_update_interpret(const struct lu_env *env,
 {
        struct object_update_reply      *reply  = NULL;
        struct osp_update_args          *oaua   = arg;
-       struct dt_update_request        *dt_update = oaua->oaua_update;
+       struct osp_update_request       *our = oaua->oaua_update;
+       struct osp_thandle              *oth;
        struct osp_update_callback      *ouc;
        struct osp_update_callback      *next;
        int                              count  = 0;
        int                              index  = 0;
        int                              rc1    = 0;
 
-       if (oaua->oaua_flow_control)
-               obd_put_request_slot(
-                               &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
+       ENTRY;
+
+       if (our == NULL)
+               RETURN(0);
+
+       oaua->oaua_update = NULL;
+       oth = our->our_th;
+       if (oaua->oaua_flow_control) {
+               struct osp_device *osp;
+
+               LASSERT(oth != NULL);
+               osp = dt2osp_dev(oth->ot_super.th_dev);
+               obd_put_request_slot(&osp->opd_obd->u.cli);
+       }
 
        /* Unpack the results from the reply message. */
        if (req->rq_repmsg != NULL) {
@@ -282,8 +304,7 @@ static int osp_update_interpret(const struct lu_env *env,
                rc1 = rc;
        }
 
-       list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
-                                ouc_list) {
+       list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
                list_del_init(&ouc->ouc_list);
 
                /* The peer may only have handled some requests (indicated
@@ -314,9 +335,16 @@ static int osp_update_interpret(const struct lu_env *env,
        if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
                wake_up_all(oaua->oaua_waitq);
 
-       dt_update_request_destroy(dt_update);
+       if (oth != NULL) {
+               /* oth and osp_update_requests will be destoryed in
+                * osp_thandle_put */
+               osp_trans_stop_cb(oth, rc);
+               osp_thandle_put(oth);
+       } else {
+               osp_update_request_destroy(our);
+       }
 
-       return 0;
+       RETURN(0);
 }
 
 /**
@@ -325,27 +353,27 @@ static int osp_update_interpret(const struct lu_env *env,
  *
  * \param[in] env      pointer to the thread context
  * \param[in] osp      pointer to the OSP device
- * \param[in] update   pointer to the shared queue
+ * \param[in] our      pointer to the shared queue
  *
  * \retval             0 for success
  * \retval             negative error number on failure
  */
 int osp_unplug_async_request(const struct lu_env *env,
                             struct osp_device *osp,
-                            struct dt_update_request *update)
+                            struct osp_update_request *our)
 {
        struct osp_update_args  *args;
        struct ptlrpc_request   *req = NULL;
        int                      rc;
 
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                update->dur_buf.ub_req, &req);
+                                our->our_req, &req);
        if (rc != 0) {
                struct osp_update_callback *ouc;
                struct osp_update_callback *next;
 
                list_for_each_entry_safe(ouc, next,
-                                        &update->dur_cb_items, ouc_list) {
+                                        &our->our_cb_items, ouc_list) {
                        list_del_init(&ouc->ouc_list);
                        if (ouc->ouc_interpreter != NULL)
                                ouc->ouc_interpreter(env, NULL, NULL,
@@ -353,10 +381,10 @@ int osp_unplug_async_request(const struct lu_env *env,
                                                     ouc->ouc_data, 0, rc);
                        osp_update_callback_fini(env, ouc);
                }
-               dt_update_request_destroy(update);
+               osp_update_request_destroy(our);
        } else {
                args = ptlrpc_req_async_args(req);
-               args->oaua_update = update;
+               args->oaua_update = our;
                args->oaua_count = NULL;
                args->oaua_waitq = NULL;
                args->oaua_flow_control = false;
@@ -372,33 +400,35 @@ int osp_unplug_async_request(const struct lu_env *env,
  * request queue - osp_device::opd_async_requests.
  *
  * If the osp_device::opd_async_requests is not NULL, then return it directly;
- * otherwise create new dt_update_request and attach it to opd_async_requests.
+ * otherwise create new osp_update_request and attach it to opd_async_requests.
  *
  * \param[in] osp      pointer to the OSP device
  *
  * \retval             pointer to the shared queue
  * \retval             negative error number on failure
  */
-static struct dt_update_request *
+static struct osp_update_request *
 osp_find_or_create_async_update_request(struct osp_device *osp)
 {
-       struct dt_update_request *update = osp->opd_async_requests;
+       struct osp_update_request *our = osp->opd_async_requests;
+
+       if (our != NULL)
+               return our;
 
-       if (update != NULL)
-               return update;
+       our = osp_update_request_create(&osp->opd_dt_dev);
+       if (IS_ERR(our))
+               return our;
 
-       update = dt_update_request_create(&osp->opd_dt_dev);
-       if (!IS_ERR(update))
-               osp->opd_async_requests = update;
+       osp->opd_async_requests = our;
 
-       return update;
+       return our;
 }
 
 /**
- * Insert an osp_update_callback into the dt_update_request.
+ * Insert an osp_update_callback into the osp_update_request.
  *
- * Insert an osp_update_callback to the dt_update_request. Usually each update
- * in the dt_update_request will have one correspondent callback, and these
+ * Insert an osp_update_callback to the osp_update_request. Usually each update
+ * in the osp_update_request will have one correspondent callback, and these
  * callbacks will be called in rq_interpret_reply.
  *
  * \param[in] env              pointer to the thread context
@@ -410,7 +440,7 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
  * \retval                     negative error number on failure
  */
 int osp_insert_update_callback(const struct lu_env *env,
-                              struct dt_update_request *update,
+                              struct osp_update_request *our,
                               struct osp_object *obj, void *data,
                               osp_update_interpreter_t interpreter)
 {
@@ -420,7 +450,7 @@ int osp_insert_update_callback(const struct lu_env *env,
        if (ouc == NULL)
                RETURN(-ENOMEM);
 
-       list_add_tail(&ouc->ouc_list, &update->dur_cb_items);
+       list_add_tail(&ouc->ouc_list, &our->our_cb_items);
 
        return 0;
 }
@@ -454,22 +484,22 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             __u16 *lens, const void **bufs, void *data,
                             osp_update_interpreter_t interpreter)
 {
-       struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
-       struct dt_update_request        *update;
+       struct osp_device               *osp;
+       struct osp_update_request       *our;
        struct object_update            *object_update;
        size_t                          max_update_size;
        struct object_update_request    *ureq;
        int                             rc = 0;
        ENTRY;
 
-       update = osp_find_or_create_async_update_request(osp);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
+       osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
+       our = osp_find_or_create_async_update_request(osp);
+       if (IS_ERR(our))
+               RETURN(PTR_ERR(our));
 
 again:
-       ureq = update->dur_buf.ub_req;
-       max_update_size = update->dur_buf.ub_req_size -
-                           object_update_request_size(ureq);
+       ureq = our->our_req;
+       max_update_size = our->our_req_size - object_update_request_size(ureq);
 
        object_update = update_buffer_get_update(ureq, ureq->ourq_count);
        rc = out_update_pack(env, object_update, max_update_size, op,
@@ -479,14 +509,14 @@ again:
                osp->opd_async_requests = NULL;
                mutex_unlock(&osp->opd_async_requests_mutex);
 
-               rc = osp_unplug_async_request(env, osp, update);
+               rc = osp_unplug_async_request(env, osp, our);
                mutex_lock(&osp->opd_async_requests_mutex);
                if (rc != 0)
                        RETURN(rc);
 
-               update = osp_find_or_create_async_update_request(osp);
-               if (IS_ERR(update))
-                       RETURN(PTR_ERR(update));
+               our = osp_find_or_create_async_update_request(osp);
+               if (IS_ERR(our))
+                       RETURN(PTR_ERR(our));
 
                goto again;
        } else {
@@ -496,7 +526,7 @@ again:
                ureq->ourq_count++;
        }
 
-       rc = osp_insert_update_callback(env, update, obj, data, interpreter);
+       rc = osp_insert_update_callback(env, our, obj, data, interpreter);
 
        RETURN(rc);
 }
@@ -504,33 +534,33 @@ again:
 int osp_trans_update_request_create(struct thandle *th)
 {
        struct osp_thandle              *oth = thandle_to_osp_thandle(th);
-       struct dt_update_request        *update;
+       struct osp_update_request       *our;
 
-       if (oth->ot_dur != NULL)
+       if (oth->ot_our != NULL)
                return 0;
 
-       update = dt_update_request_create(th->th_dev);
-       if (IS_ERR(update)) {
-               th->th_result = PTR_ERR(update);
-               return PTR_ERR(update);
+       our = osp_update_request_create(th->th_dev);
+       if (IS_ERR(our)) {
+               th->th_result = PTR_ERR(our);
+               return PTR_ERR(our);
        }
 
        if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
-               update->dur_flags = UPDATE_FL_SYNC;
+               our->our_flags = UPDATE_FL_SYNC;
 
-       oth->ot_dur = update;
+       oth->ot_our = our;
+       our->our_th = oth;
        return 0;
 }
 
-static void osp_thandle_get(struct osp_thandle *oth)
-{
-       atomic_inc(&oth->ot_refcount);
-}
-
-static void osp_thandle_put(struct osp_thandle *oth)
+void osp_thandle_destroy(struct osp_thandle *oth)
 {
-       if (atomic_dec_and_test(&oth->ot_refcount))
-               OBD_FREE_PTR(oth);
+       LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
+       LASSERT(list_empty(&oth->ot_commit_dcb_list));
+       LASSERT(list_empty(&oth->ot_stop_dcb_list));
+       if (oth->ot_our != NULL)
+               osp_update_request_destroy(oth->ot_our);
+       OBD_FREE_PTR(oth);
 }
 
 /**
@@ -570,12 +600,14 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
        if (unlikely(oth == NULL))
                RETURN(ERR_PTR(-ENOMEM));
 
+       oth->ot_magic = OSP_THANDLE_MAGIC;
        th = &oth->ot_super;
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
 
        atomic_set(&oth->ot_refcount, 1);
-       INIT_LIST_HEAD(&oth->ot_dcb_list);
+       INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
+       INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
 
        RETURN(th);
 }
@@ -586,22 +618,22 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
  * Prepare OUT update ptlrpc request, and the request usually includes
  * all of updates (stored in \param ureq) from one operation.
  *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] ureq     hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
  *
- * \retval             0 if preparation succeeds.
- * \retval             negative errno if preparation fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp)
 {
-       struct ptlrpc_request           *req;
-       struct object_update_request    *tmp;
-       int                             ureq_len;
-       int                             rc;
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       size_t                          ureq_len;
+       int                             rc;
        ENTRY;
 
        object_update_request_dump(ureq, D_INFO);
@@ -620,7 +652,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
        }
 
        req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                           RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
 
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
        memcpy(tmp, ureq, ureq_len);
@@ -634,20 +666,21 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
 }
 
 /**
-* Send update RPC.
-*
-* Send update request to the remote MDT synchronously.
-*
-* \param[in] env       execution environment
-* \param[in] imp       import on which ptlrpc request will be sent
-* \param[in] dt_update hold all of updates which will be packed into the req
-* \param[in] reqp      request to be created
-*
-* \retval              0 if RPC succeeds.
-* \retval              negative errno if RPC fails.
-*/
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] our      hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if RPC succeeds.
+ * \retval             negative errno if RPC fails.
+ */
+
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
-                   struct dt_update_request *dt_update,
+                   struct osp_update_request *our,
                    struct ptlrpc_request **reqp)
 {
        struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
@@ -655,7 +688,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
        int                     rc;
        ENTRY;
 
-       rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
+       rc = osp_prep_update_req(env, imp, our->our_req, &req);
        if (rc != 0)
                RETURN(rc);
 
@@ -669,7 +702,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
        rc = ptlrpc_queue_wait(req);
        if (rc < 0) {
                ptlrpc_req_finished(req);
-               dt_update->dur_rc = rc;
+               our->our_rc = rc;
                RETURN(rc);
        }
 
@@ -678,7 +711,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                RETURN(rc);
        }
 
-       dt_update->dur_rc = rc;
+       our->our_rc = rc;
 
        ptlrpc_req_finished(req);
 
@@ -702,8 +735,10 @@ int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 
        LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
        LASSERT(&dcb->dcb_func != NULL);
-       list_add(&dcb->dcb_linkage, &oth->ot_dcb_list);
-
+       if (dcb->dcb_flags & DCB_TRANS_STOP)
+               list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
+       else
+               list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
        return 0;
 }
 
@@ -714,7 +749,7 @@ static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
 
        LASSERT(atomic_read(&oth->ot_refcount) > 0);
        /* call per-transaction callbacks if any */
-       list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list,
+       list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
                                 dcb_linkage) {
                LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
                         "commit callback entry: magic=%x name='%s'\n",
@@ -726,12 +761,16 @@ static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
 
 static void osp_request_commit_cb(struct ptlrpc_request *req)
 {
-       struct thandle *th = req->rq_cb_data;
-       struct osp_thandle *oth = thandle_to_osp_thandle(th);
+       struct thandle          *th = req->rq_cb_data;
+       struct osp_thandle      *oth;
        __u64                   last_committed_transno = 0;
        int                     result = req->rq_status;
        ENTRY;
 
+       if (th == NULL)
+               RETURN_EXIT;
+
+       oth = thandle_to_osp_thandle(th);
        if (lustre_msg_get_last_committed(req->rq_repmsg))
                last_committed_transno =
                        lustre_msg_get_last_committed(req->rq_repmsg);
@@ -756,43 +795,91 @@ static void osp_request_commit_cb(struct ptlrpc_request *req)
 }
 
 /**
- * Trigger the request for remote updates.
+ * callback of osp transaction
  *
- * If th_sync is set, then the request will be sent synchronously,
- * otherwise, the RPC will be sent asynchronously.
+ * Call all of callbacks for this osp thandle. This will only be
+ * called in error handler path. In the normal processing path,
+ * these callback will be called in osp_request_commit_cb() and
+ * osp_update_interpret().
+ *
+ * \param [in] env     execution environment
+ * \param [in] oth     osp thandle
+ * \param [in] rc      result of the osp thandle
+ */
+void osp_trans_callback(const struct lu_env *env,
+                       struct osp_thandle *oth, int rc)
+{
+       struct osp_update_callback *ouc;
+       struct osp_update_callback *next;
+
+       if (oth->ot_our != NULL) {
+               list_for_each_entry_safe(ouc, next,
+                                        &oth->ot_our->our_cb_items, ouc_list) {
+                       list_del_init(&ouc->ouc_list);
+                       if (ouc->ouc_interpreter != NULL)
+                               ouc->ouc_interpreter(env, NULL, NULL,
+                                                    ouc->ouc_obj,
+                                                    ouc->ouc_data, 0, rc);
+                       osp_update_callback_fini(env, ouc);
+               }
+       }
+       osp_trans_stop_cb(oth, rc);
+       osp_trans_commit_cb(oth, rc);
+}
+
+/**
+ * Send the request for remote updates.
+ *
+ * Send updates to the remote MDT. Prepare the request by osp_update_req
+ * and send them to remote MDT, for sync request, it will wait
+ * until the reply return, otherwise hand it to ptlrpcd.
  *
  * Please refer to osp_trans_create() for transaction type.
  *
  * \param[in] env              pointer to the thread context
  * \param[in] osp              pointer to the OSP device
- * \param[in] dt_update                pointer to the dt_update_request
- * \param[in] th               pointer to the transaction handler
- * \param[out] sent            whether the RPC has been sent
+ * \param[in] our              pointer to the osp_update_request
  *
  * \retval                     0 for success
  * \retval                     negative error number on failure
  */
-static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
-                            struct dt_update_request *dt_update,
-                            struct thandle *th, int *sent)
+static int osp_send_update_req(const struct lu_env *env,
+                              struct osp_device *osp,
+                              struct osp_update_request *our)
 {
        struct osp_update_args  *args;
        struct ptlrpc_request   *req;
+       struct lu_device *top_device;
+       struct osp_thandle      *oth = our->our_th;
        int     rc = 0;
        ENTRY;
 
+       LASSERT(oth != NULL);
+       LASSERT(our->our_req_sent == 0);
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                dt_update->dur_buf.ub_req, &req);
-       if (rc != 0)
+                                our->our_req, &req);
+       if (rc != 0) {
+               osp_trans_callback(env, oth, rc);
                RETURN(rc);
+       }
 
-       *sent = 1;
-       req->rq_interpret_reply = osp_update_interpret;
        args = ptlrpc_req_async_args(req);
-       args->oaua_update = dt_update;
+       args->oaua_update = our;
+       osp_thandle_get(oth); /* hold for update interpret */
+       req->rq_interpret_reply = osp_update_interpret;
+       if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
+               if (!osp->opd_imp_active || !osp->opd_imp_connected) {
+                       osp_trans_callback(env, oth, rc);
+                       osp_thandle_put(oth);
+                       GOTO(out, rc = -ENOTCONN);
+               }
 
-       if (is_only_remote_trans(th) && !th->th_sync &&
-           !th->th_wait_submit) {
+               rc = obd_get_request_slot(&osp->opd_obd->u.cli);
+               if (rc != 0) {
+                       osp_trans_callback(env, oth, rc);
+                       osp_thandle_put(oth);
+                       GOTO(out, rc = -ENOTCONN);
+               }
                args->oaua_flow_control = true;
 
                if (!osp->opd_connect_mdt) {
@@ -804,9 +891,12 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                }
 
                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               req = NULL;
        } else {
-               struct osp_thandle *oth = thandle_to_osp_thandle(th);
-               struct lu_device *top_device;
+               osp_thandle_get(oth); /* hold for commit callback */
+               req->rq_commit_cb = osp_request_commit_cb;
+               req->rq_cb_data = &oth->ot_super;
+               args->oaua_flow_control = false;
 
                /* If the transaction is created during MDT recoverying
                 * process, it means this is an recovery update, we need
@@ -818,19 +908,28 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                if (top_device->ld_obd->obd_recovering)
                        req->rq_allow_replay = 1;
 
-               args->oaua_flow_control = false;
-               req->rq_commit_cb = osp_request_commit_cb;
-               req->rq_cb_data = th;
-               osp_thandle_get(oth); /* for commit callback */
                osp_get_rpc_lock(osp);
                rc = ptlrpc_queue_wait(req);
                osp_put_rpc_lock(osp);
-               if (req->rq_transno == 0 && !req->rq_committed)
+               if ((rc == -ENOMEM && req->rq_set == NULL) ||
+                   (req->rq_transno == 0 && !req->rq_committed)) {
+                       if (args->oaua_update != NULL) {
+                               /* If osp_update_interpret is not being called,
+                                * release the osp_thandle */
+                               args->oaua_update = NULL;
+                               osp_thandle_put(oth);
+                       }
+
+                       req->rq_cb_data = NULL;
+                       rc = rc == 0 ? req->rq_status : rc;
+                       osp_trans_callback(env, oth, rc);
                        osp_thandle_put(oth);
-               else
-                       oth->ot_dur = NULL;
-               ptlrpc_req_finished(req);
+                       GOTO(out, rc);
+               }
        }
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
 
        RETURN(rc);
 }
@@ -885,6 +984,199 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env,
 }
 
 /**
+ * Set version for the transaction
+ *
+ * Set the version for the transaction, then the osp RPC will be
+ * sent in the order of version, i.e. the transaction with lower
+ * version will be sent first.
+ *
+ * \param [in] oth     osp thandle to be set version.
+ *
+ * \retval             0 if set version succeeds
+ *                      negative errno if set version fails.
+ */
+int osp_check_and_set_rpc_version(struct osp_thandle *oth)
+{
+       struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
+       struct osp_updates *ou = osp->opd_update;
+
+       if (ou == NULL)
+               return -EIO;
+
+       if (oth->ot_version != 0)
+               return 0;
+
+       spin_lock(&ou->ou_lock);
+       oth->ot_version = ou->ou_version++;
+       spin_unlock(&ou->ou_lock);
+
+       CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
+              osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
+
+       return 0;
+}
+
+/**
+ * Get next OSP update request in the sending list
+ * Get next OSP update request in the sending list by version number, next
+ * request will be
+ * 1. transaction which does not have a version number.
+ * 2. transaction whose version == opd_rpc_version.
+ *
+ * \param [in] ou      osp update structure.
+ * \param [out] ourp   the pointer holding the next update request.
+ *
+ * \retval             true if getting the next transaction.
+ * \retval             false if not getting the next transaction.
+ */
+static bool
+osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
+{
+       struct osp_update_request *our;
+       struct osp_update_request *tmp;
+       bool                    got_req = false;
+
+       spin_lock(&ou->ou_lock);
+       list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
+               LASSERT(our->our_th != NULL);
+               CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
+                      our, our->our_th->ot_version, ou->ou_rpc_version);
+               if (our->our_th->ot_version == 0) {
+                       list_del_init(&our->our_list);
+                       *ourp = our;
+                       got_req = true;
+                       break;
+               }
+
+               /* Find next osp_update_request in the list */
+               if (our->our_th->ot_version == ou->ou_rpc_version) {
+                       list_del_init(&our->our_list);
+                       *ourp = our;
+                       got_req = true;
+                       break;
+               }
+       }
+       spin_unlock(&ou->ou_lock);
+
+       return got_req;
+}
+
+static void osp_update_rpc_version(struct osp_updates *ou,
+                                  struct osp_thandle *oth)
+{
+       if (oth->ot_version == 0)
+               return;
+
+       LASSERT(oth->ot_version == ou->ou_rpc_version);
+       spin_lock(&ou->ou_lock);
+       ou->ou_rpc_version++;
+       spin_unlock(&ou->ou_lock);
+}
+
+/**
+ * Sending update thread
+ *
+ * Create thread to send update request to other MDTs, this thread will pull
+ * out update request from the list in OSP by version number, i.e. it will
+ * make sure the update request with lower version number will be sent first.
+ *
+ * \param[in] arg      hold the OSP device.
+ *
+ * \retval             0 if the thread is created successfully.
+ * \retal              negative error if the thread is not created
+ *                      successfully.
+ */
+int osp_send_update_thread(void *arg)
+{
+       struct lu_env           env;
+       struct osp_device       *osp = arg;
+       struct l_wait_info       lwi = { 0 };
+       struct osp_updates      *ou = osp->opd_update;
+       struct ptlrpc_thread    *thread = &osp->opd_update_thread;
+       struct osp_update_request *our = NULL;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(ou != NULL);
+       rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+       if (rc < 0) {
+               CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+                      rc);
+               RETURN(rc);
+       }
+
+       thread->t_flags = SVC_RUNNING;
+       wake_up(&thread->t_ctl_waitq);
+       while (1) {
+               our = NULL;
+               l_wait_event(ou->ou_waitq,
+                            !osp_send_update_thread_running(osp) ||
+                            osp_get_next_request(ou, &our),
+                            &lwi);
+
+               if (!osp_send_update_thread_running(osp)) {
+                       if (our != NULL && our->our_th != NULL) {
+                               osp_trans_callback(&env, our->our_th, -EINTR);
+                               osp_thandle_put(our->our_th);
+                       }
+                       break;
+               }
+
+               if (our->our_req_sent == 0) {
+                       if (our->our_th != NULL &&
+                           our->our_th->ot_super.th_result != 0)
+                               osp_trans_callback(&env, our->our_th,
+                                       our->our_th->ot_super.th_result);
+                       else
+                               rc = osp_send_update_req(&env, osp, our);
+               }
+
+               if (our->our_th != NULL) {
+                       /* Update the rpc version */
+                       osp_update_rpc_version(ou, our->our_th);
+                       /* Balanced for thandle_get in osp_trans_trigger() */
+                       osp_thandle_put(our->our_th);
+               }
+       }
+
+       thread->t_flags = SVC_STOPPED;
+       lu_env_fini(&env);
+       wake_up(&thread->t_ctl_waitq);
+
+       RETURN(0);
+}
+
+/**
+ * Trigger the request for remote updates.
+ *
+ * Add the request to the sending list, and wake up osp update
+ * sending thread.
+ *
+ * \param[in] env              pointer to the thread context
+ * \param[in] osp              pointer to the OSP device
+ * \param[in] oth              pointer to the transaction handler
+ *
+ */
+static void osp_trans_trigger(const struct lu_env *env,
+                            struct osp_device *osp,
+                            struct osp_thandle *oth)
+{
+
+       CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
+              osp->opd_obd->obd_name, oth, oth->ot_version);
+
+       LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
+       osp_thandle_get(oth);
+       LASSERT(oth->ot_our != NULL);
+       spin_lock(&osp->opd_update->ou_lock);
+       list_add_tail(&oth->ot_our->our_list,
+                     &osp->opd_update->ou_list);
+       spin_unlock(&osp->opd_update->ou_lock);
+
+       wake_up(&osp->opd_update->ou_waitq);
+}
+
+/**
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
@@ -934,9 +1226,9 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
        struct osp_thandle       *oth = thandle_to_osp_thandle(th);
-       struct dt_update_request *dt_update;
+       struct osp_update_request *our = oth->ot_our;
+       struct osp_device        *osp = dt2osp_dev(dt);
        int                      rc = 0;
-       int                      sent = 0;
        ENTRY;
 
        /* For remote transaction, if there is local storage thandle,
@@ -947,67 +1239,35 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                oth->ot_storage_th = NULL;
        }
 
-       dt_update = oth->ot_dur;
-       if (dt_update == NULL || th->th_result != 0) {
-               rc = th->th_result;
-               GOTO(out, rc);
+       if (our == NULL || our->our_req == NULL ||
+           our->our_req->ourq_count == 0) {
+               osp_trans_callback(env, oth, th->th_result);
+               GOTO(out, rc = th->th_result);
        }
 
-       LASSERT(dt_update != LP_POISON);
-
-       /* If there are no updates, destroy dt_update and thandle */
-       if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0)
+       if (!osp->opd_connect_mdt) {
+               rc = osp_send_update_req(env, osp, oth->ot_our);
                GOTO(out, rc);
-
-       if (is_only_remote_trans(th) && !th->th_sync &&
-           !th->th_wait_submit) {
-               struct osp_device *osp = dt2osp_dev(th->th_dev);
-               struct client_obd *cli = &osp->opd_obd->u.cli;
-
-               rc = obd_get_request_slot(cli);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               if (!osp->opd_imp_active || !osp->opd_imp_connected) {
-                       obd_put_request_slot(cli);
-                       GOTO(out, rc = -ENOTCONN);
-               }
-
-               rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                      dt_update, th, &sent);
-               if (rc != 0)
-                       obd_put_request_slot(cli);
-       } else {
-               rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
-                                      th, &sent);
        }
 
-out:
-       /* If RPC is triggered successfully, dt_update will be freed in
-        * osp_update_interpreter() */
-       if (sent == 0) {
-               struct osp_update_callback *ouc;
-               struct osp_update_callback *next;
+       if (osp->opd_update == NULL ||
+           !osp_send_update_thread_running(osp)) {
+               osp_trans_callback(env, oth, -EIO);
+               GOTO(out, rc = -EIO);
+       }
 
-               if (dt_update != NULL) {
-                       list_for_each_entry_safe(ouc, next,
-                                                &dt_update->dur_cb_items,
-                                                ouc_list) {
-                               list_del_init(&ouc->ouc_list);
-                               if (ouc->ouc_interpreter != NULL)
-                                       ouc->ouc_interpreter(env, NULL, NULL,
-                                                          ouc->ouc_obj,
-                                                          ouc->ouc_data, 0,
-                                                          rc);
-                               osp_update_callback_fini(env, ouc);
-                       }
-               }
-               osp_trans_commit_cb(oth, rc);
-               dt_update_request_destroy(dt_update);
-               oth->ot_dur = NULL;
+       if (th->th_sync) {
+               /* if th_sync is set, then it needs to be sent
+                * right away. Note: even thought the RPC has been
+                * sent, it still needs to be added to the sending
+                * list (see osp_trans_trigger()), so ou_rpc_version
+                * can be updated correctly. */
+               rc = osp_send_update_req(env, osp, our);
+               our->our_req_sent = 1;
        }
 
+       osp_trans_trigger(env, osp, oth);
+out:
        osp_thandle_put(oth);
 
        RETURN(rc);
index 11dc864..9e63594 100644 (file)
@@ -365,7 +365,7 @@ static int out_xattr_set(struct tgt_session_info *tsi)
        lbuf->lb_len = buf_len;
 
        tmp = object_update_param_get(update, 2, &size);
-       if (tmp == NULL || size != sizeof(*tmp)) {
+       if (tmp == NULL || IS_ERR(tmp) || size != sizeof(*tmp)) {
                CERROR("%s: emptry or wrong size %zu flag: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), size, -EPROTO);
                RETURN(err_serious(-EPROTO));
@@ -616,7 +616,7 @@ static int out_read(struct tgt_session_info *tsi)
                GOTO(out, rc = -ENOENT);
 
        tmp = object_update_param_get(update, 0, NULL);
-       if (tmp == NULL) {
+       if (tmp == NULL || IS_ERR(tmp)) {
                CERROR("%s: empty size for read: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), -EPROTO);
                GOTO(out, rc = err_serious(-EPROTO));
@@ -624,7 +624,7 @@ static int out_read(struct tgt_session_info *tsi)
        size = le64_to_cpu(*(size_t *)(tmp));
 
        tmp = object_update_param_get(update, 1, NULL);
-       if (tmp == NULL) {
+       if (tmp == NULL || IS_ERR(tmp)) {
                CERROR("%s: empty pos for read: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), -EPROTO);
                GOTO(out, rc = err_serious(-EPROTO));
index 1b5ebca..2fd87d7 100644 (file)
@@ -302,7 +302,7 @@ again:
        }
 
        if (rc == -EEXIST) {
-               dtrq_destory(dtrq);
+               dtrq_destroy(dtrq);
                rc = 0;
                goto again;
        }
@@ -342,7 +342,7 @@ EXPORT_SYMBOL(dtrq_list_dump);
  *
  * \param[in] dtrq     distribute txn replqy req to be destroyed.
  */
-void dtrq_destory(struct distribute_txn_replay_req *dtrq)
+void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
 {
        struct distribute_txn_replay_req_sub    *dtrqs;
        struct distribute_txn_replay_req_sub    *tmp;
@@ -360,7 +360,7 @@ void dtrq_destory(struct distribute_txn_replay_req *dtrq)
 
        OBD_FREE_PTR(dtrq);
 }
-EXPORT_SYMBOL(dtrq_destory);
+EXPORT_SYMBOL(dtrq_destroy);
 
 /**
  * Destroy all of replay req.
@@ -378,7 +378,7 @@ void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
        list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
                                 dtrq_list) {
                list_del_init(&dtrq->dtrq_list);
-               dtrq_destory(dtrq);
+               dtrq_destroy(dtrq);
        }
        spin_unlock(&tdtd->tdtd_replay_list_lock);
 }
@@ -1015,7 +1015,7 @@ int distribute_txn_replay_handle(struct lu_env *env,
        struct top_thandle      *top_th;
        struct top_multiple_thandle *tmt;
        struct thandle_update_records *tur = NULL;
-       unsigned int            i;
+       int                     i;
        int                     rc = 0;
        ENTRY;
 
@@ -1098,7 +1098,7 @@ int distribute_txn_replay_handle(struct lu_env *env,
                        CDEBUG(D_HA, "error during execution of #%u from"
                               " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
                               ta->ta_args[i]->line, rc);
-                       while (--i >= 0) {
+                       while (--i > 0) {
                                if (ta->ta_args[i]->undo_fn != NULL) {
                                        dt_obj = ta->ta_args[i]->object;
                                        sub_dt =
index e56c1ec..0d2d995 100644 (file)
@@ -320,7 +320,10 @@ struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt,
  * Mark the sub thandle to be committed and if all sub thandle are committed
  * notify the top thandle.
  *
- * \param[in] sub_th   sub thandle being committed.
+ * \param[in] env      execution environment
+ * \param[in] sub_th   sub thandle being committed
+ * \param[in] cb       commit callback
+ * \param[in] err      trans result
  */
 static void sub_trans_commit_cb(struct lu_env *env,
                                struct thandle *sub_th,
@@ -364,6 +367,48 @@ static void sub_thandle_register_commit_cb(struct sub_thandle *st,
 }
 
 /**
+ * Sub thandle stop call back
+ *
+ * After sub thandle is stopped, it will call this callback to notify
+ * the top thandle.
+ *
+ * \param[in] th       sub thandle to be stopped
+ * \param[in] rc       result of sub trans
+ */
+static void sub_trans_stop_cb(struct lu_env *env,
+                             struct thandle *sub_th,
+                             struct dt_txn_commit_cb *cb, int err)
+{
+       struct sub_thandle              *st;
+       struct top_multiple_thandle     *tmt = cb->dcb_data;
+       ENTRY;
+
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st->st_stopped)
+                       continue;
+
+               if (st->st_dt == sub_th->th_dev) {
+                       st->st_stopped = 1;
+                       st->st_result = err;
+                       break;
+               }
+       }
+
+       wake_up(&tmt->tmt_stop_waitq);
+       RETURN_EXIT;
+}
+
+static void sub_thandle_register_stop_cb(struct sub_thandle *st,
+                                        struct top_multiple_thandle *tmt)
+{
+       st->st_stop_dcb.dcb_func = sub_trans_stop_cb;
+       st->st_stop_dcb.dcb_data = tmt;
+       st->st_stop_dcb.dcb_flags = DCB_TRANS_STOP;
+       INIT_LIST_HEAD(&st->st_stop_dcb.dcb_linkage);
+       dt_trans_cb_add(st->st_sub_th, &st->st_stop_dcb);
+}
+
+/**
  * Create sub thandle
  *
  * Create transaction handle for sub_thandle
@@ -392,7 +437,6 @@ int sub_thandle_trans_create(const struct lu_env *env,
        return 0;
 }
 
-
 /**
  * Create the top transaction.
  *
@@ -611,6 +655,7 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
                if (rc != 0)
                        GOTO(out, rc);
 
+               sub_thandle_register_stop_cb(st, tmt);
                sub_thandle_register_commit_cb(st, tmt);
        }
 out:
@@ -659,6 +704,57 @@ static bool top_check_write_updates(struct top_thandle *top_th)
 }
 
 /**
+ * Check if top transaction is stopped
+ *
+ * Check if top transaction is stopped, only if all sub transaction
+ * is stopped, then the top transaction is stopped.
+ *
+ * \param [in] top_th  top thandle
+ *
+ * \retval             true if the top transaction is stopped.
+ * \retval             false if the top transaction is not stopped.
+ */
+static bool top_trans_is_stopped(struct top_thandle *top_th)
+{
+       struct top_multiple_thandle     *tmt;
+       struct sub_thandle              *st;
+       bool                    all_stopped = true;
+
+       tmt = top_th->tt_multiple_thandle;
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (!st->st_stopped && st->st_sub_th != NULL) {
+                       all_stopped = false;
+                       break;
+               }
+
+               if (st->st_result != 0 &&
+                   top_th->tt_super.th_result == 0)
+                       top_th->tt_super.th_result = st->st_result;
+       }
+
+       return all_stopped;
+}
+
+/**
+ * Wait result of top transaction
+ *
+ * Wait until all sub transaction get its result.
+ *
+ * \param [in] top_th  top thandle.
+ *
+ * \retval             the result of top thandle.
+ */
+static int top_trans_wait_result(struct top_thandle *top_th)
+{
+       struct l_wait_info      lwi = {0};
+
+       l_wait_event(top_th->tt_multiple_thandle->tmt_stop_waitq,
+                    top_trans_is_stopped(top_th), &lwi);
+
+       RETURN(top_th->tt_super.th_result);
+}
+
+/**
  * Stop the top transaction.
  *
  * Stop the transaction on the master device first, then stop transactions
@@ -806,7 +902,10 @@ stop_other_trans:
                        th->th_result = rc;
        }
 
+       rc = top_trans_wait_result(top_th);
+
        tmt->tmt_result = rc;
+
        /* Balance for the refcount in top_trans_create, Note: if it is NOT
         * multiple node transaction, the top transaction will be destroyed. */
        top_multiple_thandle_put(tmt);
@@ -842,6 +941,7 @@ int top_trans_create_tmt(const struct lu_env *env,
        INIT_LIST_HEAD(&tmt->tmt_commit_list);
        atomic_set(&tmt->tmt_refcount, 1);
 
+       init_waitqueue_head(&tmt->tmt_stop_waitq);
        top_th->tt_multiple_thandle = tmt;
 
        return 0;
@@ -860,6 +960,7 @@ create_sub_thandle_with_thandle(struct top_thandle *top_th,
                return st;
 
        st->st_sub_th = sub_th;
+
        sub_th->th_top = &top_th->tt_super;
        return st;
 }
@@ -1135,7 +1236,7 @@ distribute_txn_commit_batchid_update(const struct lu_env *env,
        if (rc < 0)
                GOTO(stop, rc);
 
-       dt_trans_cb_add(th, &dtbd->dtbd_cb);
+       rc = dt_trans_cb_add(th, &dtbd->dtbd_cb);
        if (rc < 0)
                GOTO(stop, rc);
 
@@ -1188,8 +1289,11 @@ distribute_txn_commit_batchid_init(const struct lu_env *env,
 
        dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof,
                                   attr);
-       if (IS_ERR(dt_obj))
-               GOTO(out_put, rc = PTR_ERR(dt_obj));
+       if (IS_ERR(dt_obj)) {
+               rc = PTR_ERR(dt_obj);
+               dt_obj = NULL;
+               GOTO(out_put, rc);
+       }
 
        tdtd->tdtd_batchid_obj = dt_obj;