Whamcloud - gitweb
LU-3536 lod: cancel update log after all committed 72/11572/52
authorWang Di <di.wang@intel.com>
Thu, 21 Aug 2014 22:39:19 +0000 (15:39 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 1 Jun 2015 20:32:38 +0000 (20:32 +0000)
Cancel the log cookies after all of updates have been
committed to disk.

In DNE2, the distributed operation will write update log in all
of involve MDTs, when these the updates are committed on all of
MDT, these update records will be cancelled. If one of these MDTs
restart during the operation, it will replay the updates based
on these update records.

The distribute operation will be represent as distribute transaction
(top_multiple_thandle), and all of distribute transaction will be
linked into a list, which is attached to lu_target (see
struct target_distribute_txn_data), after distribute thandle is
committed, the commit thread (distribute_txn_commit_thread) will
update the commit batchid, after it is committed, it will then
cancel the update records.

Change-Id: I3023f95515b2c15307479f7f85c3f13926d4bcc9
Signed-off-by: Wang Di <di.wang@intel.com>
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/11572
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
27 files changed:
lustre/fid/fid_handler.c
lustre/fld/fld_handler.c
lustre/include/dt_object.h
lustre/include/lu_target.h
lustre/include/lustre_fid.h
lustre/include/lustre_update.h
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_sub_object.c
lustre/mdt/mdt_handler.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_oi.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/ptlrpc/wiretest.c
lustre/target/tgt_internal.h
lustre/target/update_records.c
lustre/target/update_trans.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index fe94281..1046b3c 100644 (file)
@@ -258,17 +258,27 @@ static int range_alloc_set(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_seq_range *out,
-                                   const struct lu_env *env)
+/**
+ * Check if the sequence server has sequence avaible
+ *
+ * Check if the sequence server has sequence avaible, if not, then
+ * allocating super sequence from sequence manager (MDT0).
+ *
+ * \param[in] env      execution environment
+ * \param[in] seq      server sequence
+ *
+ * \retval             negative errno if allocating new sequence fails
+ * \retval             0 if there is enough sequence or allocating
+ *                      new sequence succeeds
+ */
+int seq_server_check_and_alloc_super(const struct lu_env *env,
+                                    struct lu_server_seq *seq)
 {
        struct lu_seq_range *space = &seq->lss_space;
        int rc = 0;
 
        ENTRY;
 
-       LASSERT(range_is_sane(space));
-
        /* Check if available space ends and allocate new super seq */
        if (range_is_exhausted(space)) {
                if (!seq->lss_cli) {
@@ -279,8 +289,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 
                rc = seq_client_alloc_super(seq->lss_cli, env);
                if (rc) {
-                       CERROR("%s: Can't allocate super-sequence, rc %d\n",
-                              seq->lss_name, rc);
+                       CDEBUG(D_HA, "%s: Can't allocate super-sequence:"
+                             " rc %d\n", seq->lss_name, rc);
                        RETURN(rc);
                }
 
@@ -298,6 +308,31 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                }
        }
 
+       if (range_is_zero(&seq->lss_lowater_set))
+               __seq_set_init(env, seq);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(seq_server_check_and_alloc_super);
+
+static int __seq_server_alloc_meta(struct lu_server_seq *seq,
+                                  struct lu_seq_range *out,
+                                  const struct lu_env *env)
+{
+       struct lu_seq_range *space = &seq->lss_space;
+       int rc = 0;
+
+       ENTRY;
+
+       LASSERT(range_is_sane(space));
+
+       rc = seq_server_check_and_alloc_super(env, seq);
+       if (rc < 0) {
+               CERROR("%s: Allocated super-sequence failed: rc = %d\n",
+                       seq->lss_name, rc);
+               RETURN(rc);
+       }
+
        rc = range_alloc_set(env, out, seq);
        if (rc != 0) {
                CERROR("%s: Allocated meta-sequence failed: rc = %d\n",
@@ -324,6 +359,7 @@ int seq_server_alloc_meta(struct lu_server_seq *seq,
 
         RETURN(rc);
 }
+EXPORT_SYMBOL(seq_server_alloc_meta);
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_env *env,
index 417f103..b38f306 100644 (file)
@@ -474,6 +474,7 @@ int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
        fld->lsf_control_exp = NULL;
        fld->lsf_seq_lookup = fld_server_lookup;
 
+       fld->lsf_seq_lookup = fld_server_lookup;
        RETURN(0);
 out_index:
        fld_index_fini(env, fld);
index e5f782c..4921873 100644 (file)
@@ -104,10 +104,13 @@ typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th,
 #define TRANS_COMMIT_CB_MAGIC  0xa0a00a0a
 #define MAX_COMMIT_CB_STR_LEN  32
 
+#define DCB_TRANS_NOT_COMMITTED        0x1
 struct dt_txn_commit_cb {
        struct list_head        dcb_linkage;
        dt_cb_t                 dcb_func;
+       void                    *dcb_data;
        __u32                   dcb_magic;
+       __u32                   dcb_flags;
        char                    dcb_name[MAX_COMMIT_CB_STR_LEN];
 };
 
@@ -1794,9 +1797,13 @@ struct thandle {
        /** the dt device on which the transactions are executed */
        struct dt_device *th_dev;
 
-       /* In some callback function, it needs to access the top_th directly */
-       struct thandle *th_top;
-
+       /* point to the top thandle, XXX this is a bit hacky right now,
+        * but normal device trans callback triggered by the bottom
+        * device (OSP/OSD == sub thandle layer) needs to get the
+        * top_thandle (see dt_txn_hook_start/stop()), so we put the
+        * top thandle here for now, will fix it when we have better
+        * callback mechanism */
+       struct thandle  *th_top;
        /** context for this transaction, tag is LCT_TX_HANDLE */
        struct lu_context th_ctx;
 
@@ -1810,7 +1817,9 @@ struct thandle {
        /** whether we need sync commit */
        unsigned int            th_sync:1,
        /* local transation, no need to inform other layers */
-                               th_local:1;
+                               th_local:1,
+       /* Whether we need wait the transaction to be submitted */
+                               th_wait_submit:1;
 };
 
 /**
index d0580c2..2e56849 100644 (file)
 #include <lustre_disk.h>
 #include <lustre_lfsck.h>
 
+struct target_distribute_txn_data {
+       /* Distribution ID is used to identify updates log on different
+        * MDTs for one operation */
+       spinlock_t              tdtd_batchid_lock;
+       __u64                   tdtd_batchid;
+       struct lu_target        *tdtd_lut;
+       struct dt_object        *tdtd_batchid_obj;
+
+       /* Committed batchid for distribute transaction */
+       __u64                   tdtd_committed_batchid;
+
+       /* List for distribute transaction */
+       struct list_head        tdtd_list;
+
+       /* Threads to manage distribute transaction */
+       wait_queue_head_t       tdtd_commit_thread_waitq;
+       atomic_t                tdtd_refcount;
+};
+
 struct lu_target {
        struct obd_device       *lut_obd;
        struct dt_device        *lut_bottom;
 
+       struct target_distribute_txn_data *lut_tdtd;
+       struct ptlrpc_thread    lut_tdtd_commit_thread;
+
        /* supported opcodes and handlers for this target */
        struct tgt_opc_slice    *lut_slice;
        __u32                    lut_reply_fail_id;
@@ -333,6 +355,15 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
 
+/* target/update_trans.c */
+int distribute_txn_init(const struct lu_env *env,
+                       struct lu_target *lut,
+                       struct target_distribute_txn_data *tdtd,
+                       __u32 index);
+void distribute_txn_fini(const struct lu_env *env,
+                        struct target_distribute_txn_data *tdtd);
+
+
 enum {
        ESERIOUS = 0x0001000
 };
index f325849..e5ef46a 100644 (file)
@@ -239,6 +239,7 @@ enum local_oid {
        /* This definition is obsolete
         * SLAVE_LLOG_CATALOGS_OID      = 4124UL,
         */
+       BATCHID_COMMITTED_OID   = 4125UL,
 };
 
 static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
@@ -501,6 +502,8 @@ int seq_server_set_cli(const struct lu_env *env,
                       struct lu_server_seq *seq,
                       struct lu_client_seq *cli);
 
+int seq_server_check_and_alloc_super(const struct lu_env *env,
+                                    struct lu_server_seq *seq);
 /* Client methods */
 int seq_client_init(struct lu_client_seq *seq,
                     struct obd_export *exp,
index 4cb87e0..8a42fb8 100644 (file)
@@ -333,36 +333,50 @@ struct thandle_update_records {
 };
 
 #define TOP_THANDLE_MAGIC      0x20140917
+struct top_multiple_thandle {
+       struct dt_device        *tmt_master_sub_dt;
+       atomic_t                tmt_refcount;
+       /* Other sub transactions will be listed here. */
+       struct list_head        tmt_sub_thandle_list;
+
+       struct list_head        tmt_commit_list;
+       /* All of update records will packed here */
+       struct thandle_update_records *tmt_update_records;
+
+       __u64                   tmt_batchid;
+       int                     tmt_result;
+       __u32                   tmt_magic;
+       __u32                   tmt_committed:1;
+};
+
 /* {top,sub}_thandle are used to manage distributed transactions which
  * include updates on several nodes. A top_handle represents the
  * whole operation, and sub_thandle represents updates on each node. */
 struct top_thandle {
        struct thandle          tt_super;
-       __u32                   tt_magic;
-       atomic_t                tt_refcount;
        /* The master sub transaction. */
        struct thandle          *tt_master_sub_thandle;
 
-       /* Other sub thandle will be listed here. */
-       struct list_head        tt_sub_thandle_list;
-
-       /* All of update records will packed here */
-       struct thandle_update_records *tt_update_records;
+       struct top_multiple_thandle *tt_multiple_thandle;
 };
 
+/* Sub thandle is used to track multiple sub thandles under one parent
+ * thandle */
 struct sub_thandle {
-       /* point to the osd/osp_thandle */
        struct thandle          *st_sub_th;
+       struct dt_device        *st_dt;
+       struct list_head        st_list;
+       struct llog_cookie      st_cookie;
+       struct dt_txn_commit_cb st_commit_dcb;
+       int                     st_result;
 
        /* linked to top_thandle */
        struct list_head        st_sub_list;
 
        /* If this sub thandle is committed */
-       bool                    st_committed:1,
-                               st_record_update:1;
+       bool                    st_committed:1;
 };
 
-
 /* target/out_lib.c */
 int out_update_pack(const struct lu_env *env, struct object_update *update,
                    size_t max_update_size, enum update_type op,
@@ -433,14 +447,25 @@ thandle_get_sub(const struct lu_env *env, struct thandle *th,
 
 struct thandle *
 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
-
 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
                    struct thandle *th);
-
 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
                   struct thandle *th);
+void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
+
+static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
+{
+       atomic_inc(&tmt->tmt_refcount);
+}
+
+static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
+{
+       if (atomic_dec_and_test(&tmt->tmt_refcount))
+               top_multiple_thandle_destroy(tmt);
+}
 
-void top_thandle_destroy(struct top_thandle *top_th);
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+                                      struct dt_device *dt_dev);
 
 /* update_records.c */
 int update_records_create_pack(const struct lu_env *env,
@@ -549,16 +574,13 @@ int tur_update_records_extend(struct thandle_update_records *tur,
                              size_t new_size);
 int tur_update_params_extend(struct thandle_update_records *tur,
                             size_t new_size);
-int check_and_prepare_update_record(const struct lu_env *env,
-                                   struct thandle *th);
-int merge_params_updates_buf(const struct lu_env *env,
-                            struct thandle_update_records *tur);
 int tur_update_extend(struct thandle_update_records *tur,
                      size_t new_op_size, size_t new_param_size);
 
 #define update_record_pack(name, th, ...)                              \
 ({                                                                     \
        struct top_thandle *top_th;                                     \
+       struct top_multiple_thandle *tmt;                               \
        struct thandle_update_records *tur;                             \
        struct llog_update_record     *lur;                             \
        size_t          avail_param_size;                               \
@@ -567,7 +589,8 @@ int tur_update_extend(struct thandle_update_records *tur,
                                                                        \
        while (1) {                                                     \
                top_th = container_of(th, struct top_thandle, tt_super);\
-               tur = top_th->tt_update_records;                        \
+               tmt = top_th->tt_multiple_thandle;                      \
+               tur = tmt->tmt_update_records;                          \
                lur = tur->tur_update_records;                          \
                avail_param_size = tur->tur_update_params_buf_size -    \
                             update_params_size(tur->tur_update_params, \
index cd7a97d..88d5800 100644 (file)
@@ -153,6 +153,9 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
        }
 
        server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
+       if (server_fld == NULL)
+               RETURN(-EIO);
+
        fld_range_set_type(&range, *type);
        rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
        if (rc != 0)
@@ -170,6 +173,8 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
 /* Slab for OSD object allocation */
 struct kmem_cache *lod_object_kmem;
 
+/* Slab for dt_txn_callback */
+struct kmem_cache *lod_txn_callback_kmem;
 static struct lu_kmem_descr lod_caches[] = {
        {
                .ckd_cache = &lod_object_kmem,
@@ -177,6 +182,11 @@ static struct lu_kmem_descr lod_caches[] = {
                .ckd_size  = sizeof(struct lod_object)
        },
        {
+               .ckd_cache = &lod_txn_callback_kmem,
+               .ckd_name  = "lod_txn_callback",
+               .ckd_size  = sizeof(struct dt_txn_callback)
+       },
+       {
                .ckd_cache = NULL
        }
 };
@@ -212,7 +222,7 @@ static struct lu_object *lod_object_alloc(const struct lu_env *env,
 }
 
 /**
- * Cleanup table of target's descriptors.
+ * Process the config log for all sub device.
  *
  * The function goes through all the targets in the given table
  * and apply given configuration command on to the targets.
@@ -318,6 +328,44 @@ out:
 }
 
 /**
+ * finish sub llog context
+ *
+ * Stop update recovery thread for the sub device, then cleanup the
+ * correspondent llog ctxt.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lod      lod device to do update recovery
+ * \param[in] thread   recovery thread on this sub device
+ */
+void lod_sub_fini_llog(const struct lu_env *env,
+                      struct dt_device *dt, struct ptlrpc_thread *thread)
+{
+       struct obd_device       *obd;
+       struct llog_ctxt        *ctxt;
+       ENTRY;
+
+       obd = dt->dd_lu_dev.ld_obd;
+       CDEBUG(D_INFO, "%s: finish sub llog\n", obd->obd_name);
+       /* Stop recovery thread first */
+       if (thread != NULL && thread->t_flags & SVC_RUNNING) {
+               thread->t_flags = SVC_STOPPING;
+               wake_up(&thread->t_ctl_waitq);
+               wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+       }
+
+       ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+       if (ctxt == NULL)
+               RETURN_EXIT;
+
+       if (ctxt->loc_handle != NULL)
+               llog_cat_close(env, ctxt->loc_handle);
+
+       llog_cleanup(env, ctxt);
+
+       RETURN_EXIT;
+}
+
+/**
  * Extract MDT target index from a device name.
  *
  * a helper function to extract index from the given device name
@@ -405,16 +453,13 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
        if (lod->lod_child == dt) {
                thread = &lod->lod_child_recovery_thread;
                rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index);
-               if (rc != 0) {
-                       OBD_FREE_PTR(lrd);
-                       RETURN(rc);
-               }
+               if (rc != 0)
+                       GOTO(free_lrd, rc);
        } else {
                struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
                struct lod_tgt_desc     *tgt = NULL;
                unsigned int            i;
 
-               mutex_lock(&ltd->ltd_mutex);
                cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
                        tgt = LTD_TGT(ltd, i);
                        if (tgt->ltd_tgt == dt) {
@@ -423,13 +468,12 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
                                break;
                        }
                }
-               mutex_unlock(&ltd->ltd_mutex);
-               OBD_ALLOC_PTR(tgt->ltd_recovery_thread);
-               if (tgt->ltd_recovery_thread == NULL) {
-                       OBD_FREE_PTR(lrd);
-                       RETURN(-ENOMEM);
-               }
-               thread = tgt->ltd_recovery_thread;
+               LASSERT(sub_ltd != NULL);
+               OBD_ALLOC_PTR(sub_ltd->ltd_recovery_thread);
+               if (sub_ltd->ltd_recovery_thread == NULL)
+                       GOTO(free_lrd, rc = -ENOMEM);
+
+               thread = sub_ltd->ltd_recovery_thread;
        }
 
        lrd->lrd_lod = lod;
@@ -445,8 +489,7 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
        if (rc < 0) {
                CERROR("%s: cannot setup updatelog llog: rc = %d\n",
                       obd->obd_name, rc);
-               OBD_FREE_PTR(lrd);
-               RETURN(rc);
+               GOTO(free_thread, rc);
        }
 
        /* Start the recovery thread */
@@ -454,7 +497,6 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
                           index);
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               OBD_FREE_PTR(lrd);
                CERROR("%s: cannot start recovery thread: rc = %d\n",
                       obd->obd_name, rc);
                GOTO(out_llog, rc);
@@ -462,46 +504,18 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
 
        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING ||
                                          thread->t_flags & SVC_STOPPED, &lwi);
-out_llog:
-       if (rc != 0)
-               lod_sub_fini_llog(env, dt, thread);
-
-       RETURN(rc);
-}
 
-/**
- * finish sub llog context
- *
- * Stop update recovery thread for the sub device, then cleanup the
- * correspondent llog ctxt.
- *
- * \param[in] env      execution environment
- * \param[in] dt       dt device(lod) to do update recovery
- * \param[in] thread   recovery thread on this sub device
- */
-void lod_sub_fini_llog(const struct lu_env *env,
-                      struct dt_device *dt, struct ptlrpc_thread *thread)
-{
-       struct obd_device       *obd;
-       struct llog_ctxt        *ctxt;
-
-       CDEBUG(D_INFO, "%s: finish sub llog\n", dt->dd_lu_dev.ld_obd->obd_name);
-       /* Stop recovery thread first */
-       if (thread != NULL && thread->t_flags & SVC_RUNNING) {
-               thread->t_flags = SVC_STOPPING;
-               wake_up(&thread->t_ctl_waitq);
-               wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+       RETURN(0);
+out_llog:
+       lod_sub_fini_llog(env, dt, thread);
+free_thread:
+       if (lod->lod_child != dt) {
+               OBD_FREE_PTR(sub_ltd->ltd_recovery_thread);
+               sub_ltd->ltd_recovery_thread = NULL;
        }
-
-       obd = dt->dd_lu_dev.ld_obd;
-       ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
-       if (ctxt == NULL)
-               return;
-
-       if (ctxt->loc_handle != NULL)
-               llog_cat_close(env, ctxt->loc_handle);
-
-       llog_cleanup(env, ctxt);
+free_lrd:
+       OBD_FREE_PTR(lrd);
+       RETURN(rc);
 }
 
 /**
@@ -512,15 +526,16 @@ void lod_sub_fini_llog(const struct lu_env *env,
  * \param[in] env      execution environment
  * \param[in] lod      lod device to do update recovery
  */
-void lod_sub_fini_all_llogs(const struct lu_env *env, struct lod_device *lod)
+static void lod_sub_fini_all_llogs(const struct lu_env *env,
+                                  struct lod_device *lod)
 {
        struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
        unsigned int i;
 
        /* Stop the update log commit cancel threads and finish master
         * llog ctxt */
-       lod_sub_fini_llog(env, lod->lod_child, &lod->lod_child_recovery_thread);
-
+       lod_sub_fini_llog(env, lod->lod_child,
+                         &lod->lod_child_recovery_thread);
        lod_getref(ltd);
        cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
                struct lod_tgt_desc     *tgt;
@@ -538,6 +553,71 @@ void lod_sub_fini_all_llogs(const struct lu_env *env, struct lod_device *lod)
 }
 
 /**
+ * Prepare distribute txn
+ *
+ * Prepare distribute txn structure for LOD
+ *
+ * \param[in] env      execution environment
+ * \param[in] lod_device  LOD device
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+static int lod_prepare_distribute_txn(const struct lu_env *env,
+                                     struct lod_device *lod)
+{
+       struct target_distribute_txn_data *tdtd;
+       struct lu_target                  *lut;
+       int                               rc;
+       ENTRY;
+
+       /* Init update recovery data */
+       OBD_ALLOC_PTR(tdtd);
+       if (tdtd == NULL)
+               RETURN(-ENOMEM);
+
+       lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+
+       rc = distribute_txn_init(env, lut, tdtd,
+               lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id);
+
+       if (rc < 0) {
+               CERROR("%s: cannot init distribute txn: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               OBD_FREE_PTR(tdtd);
+               RETURN(rc);
+       }
+
+       lut->lut_tdtd = tdtd;
+
+       RETURN(0);
+}
+
+/**
+ * Finish distribute txn
+ *
+ * Release the resource holding by distribute txn, i.e. stop distribute
+ * txn thread.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lod      lod device
+ */
+static void lod_fini_distribute_txn(const struct lu_env *env,
+                                   struct lod_device *lod)
+{
+       struct lu_target                  *lut;
+
+       lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+       if (lut->lut_tdtd == NULL)
+               return;
+
+       distribute_txn_fini(env, lut->lut_tdtd);
+
+       OBD_FREE_PTR(lut->lut_tdtd);
+       lut->lut_tdtd = NULL;
+}
+
+/**
  * Implementation of lu_device_operations::ldo_process_config() for LOD
  *
  * The method is called by the configuration subsystem during setup,
@@ -645,6 +725,7 @@ static int lod_process_config(const struct lu_env *env,
                        CDEBUG(D_HA, "%s: can't process %u: %d\n",
                               lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
 
+               lod_fini_distribute_txn(env, lod);
                lod_sub_fini_all_llogs(env, lod);
                break;
        }
@@ -791,10 +872,10 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
        if (IS_ERR(root))
                RETURN(PTR_ERR(root));
 
+       /* Create update log object */
        index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
        lu_update_log_fid(fid, index);
 
-       /* Create update log object */
        dto = local_file_find_or_create_with_fid(env, lod->lod_child,
                                                 fid, root,
                                                 lod_update_log_name,
@@ -815,8 +896,12 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
 
        lu_object_put(env, &dto->do_lu);
 
+       rc = lod_prepare_distribute_txn(env, lod);
+       if (rc != 0)
+               GOTO(out_put, rc);
+
        rc = lod_sub_init_llogs(env, lod);
-       if (rc < 0)
+       if (rc != 0)
                GOTO(out_put, rc);
 
 out_put:
@@ -1203,7 +1288,7 @@ static struct lu_device *lod_device_free(const struct lu_env *env,
        struct lu_device  *next = &lod->lod_child->dd_lu_dev;
        ENTRY;
 
-       LASSERT(atomic_read(&lu->ld_ref) == 0);
+       LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
        dt_device_fini(&lod->lod_dt_dev);
        OBD_FREE_PTR(lod);
        RETURN(next);
index c99b074..85c5007 100644 (file)
@@ -402,6 +402,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
                      struct dt_device *dt);
 void lod_sub_fini_llog(const struct lu_env *env,
                       struct dt_device *dt, struct ptlrpc_thread *thread);
+int lodname2mdt_index(char *lodname, __u32 *mdt_index);
+
 /* lod_lov.c */
 void lod_getref(struct lod_tgt_descs *ltd);
 void lod_putref(struct lod_device *lod, struct lod_tgt_descs *ltd);
index d6f1f32..05eb8a4 100644 (file)
@@ -74,7 +74,7 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
                RETURN(th);
 
        tth = container_of(th, struct top_thandle, tt_super);
-       LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC);
+
        /* local object must be mdt object, Note: during ost object
         * creation, FID is not assigned until osp_object_create(),
         * so if the FID of sub_obj is zero, it means OST object. */
@@ -82,8 +82,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
            fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
                /* local MDT object */
                if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
-                   tth->tt_update_records != NULL &&
-                   record_update != NULL)
+                   tth->tt_multiple_thandle != NULL &&
+                   record_update != NULL &&
+                   th->th_result == 0)
                        *record_update = true;
 
                RETURN(tth->tt_master_sub_thandle);
@@ -97,7 +98,8 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        if (type == LU_SEQ_RANGE_OST)
                RETURN(tth->tt_master_sub_thandle);
 
-       if (tth->tt_update_records != NULL && record_update != NULL)
+       if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
+           th->th_result == 0)
                *record_update = true;
 
        sub_th = thandle_get_sub(env, th, sub_obj);
@@ -881,33 +883,69 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
        struct lu_fid           *fid = &lti->lti_fid;
        struct obd_device       *obd;
        int                     rc;
+       bool                    need_put = false;
        ENTRY;
 
        lu_update_log_fid(fid, index);
-       fid_to_logid(fid, &cid->lci_logid);
+
+       rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
+       if (rc < 0)
+               RETURN(rc);
+
+       rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
+       if (rc != 0) {
+               CERROR("%s: can't get id from catalogs: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
 
        obd = dt->dd_lu_dev.ld_obd;
        ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
        LASSERT(ctxt != NULL);
        ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
 
-       rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
-                      LLOG_OPEN_EXISTS);
-       if (rc < 0) {
-               llog_ctxt_put(ctxt);
-               RETURN(rc);
+       if (likely(logid_id(&cid->lci_logid) != 0)) {
+               rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
+                              LLOG_OPEN_EXISTS);
+
+               /* re-create llog if it is missing */
+               if (rc == -ENOENT)
+                       logid_set_id(&cid->lci_logid, 0);
+               else if (rc < 0)
+                       GOTO(out_put, rc);
+       }
+
+       if (unlikely(logid_id(&cid->lci_logid) == 0)) {
+               rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
+               if (rc < 0)
+                       GOTO(out_put, rc);
+               cid->lci_logid = lgh->lgh_id;
+               need_put = true;
        }
 
        LASSERT(lgh != NULL);
        ctxt->loc_handle = lgh;
 
        rc = llog_cat_init_and_process(env, lgh);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
+       if (need_put) {
+               rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
+               if (rc != 0)
+                       GOTO(out_close, rc);
+       }
+
+       CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
+              obd->obd_name, index, POSTID(&cid->lci_logid.lgl_oi),
+              cid->lci_logid.lgl_ogen);
+out_close:
        if (rc != 0) {
                llog_cat_close(env, ctxt->loc_handle);
                ctxt->loc_handle = NULL;
        }
-
+out_put:
        llog_ctxt_put(ctxt);
-
        RETURN(rc);
 }
+
index caa9145..b90f15e 100644 (file)
@@ -3505,20 +3505,26 @@ static int mdt_register_lwp_callback(void *data)
 
        LASSERT(mdt_seq_site(mdt)->ss_node_id != 0);
 
-       if (!likely(fld->lsf_new))
-               RETURN(0);
-
        rc = lu_env_init(&env, LCT_MD_THREAD);
-       if (rc) {
+       if (rc < 0) {
                CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc);
                RETURN(rc);
        }
 
-       rc = fld_update_from_controller(&env, fld);
-       if (rc != 0) {
-               CERROR("%s: cannot update controller: rc = %d\n",
-                      mdt_obd_name(mdt), rc);
+       /* Allocate new sequence now to avoid creating local transaction
+        * in the normal transaction process */
+       rc = seq_server_check_and_alloc_super(&env,
+                                             mdt_seq_site(mdt)->ss_server_seq);
+       if (rc < 0)
                GOTO(out, rc);
+
+       if (fld->lsf_new) {
+               rc = fld_update_from_controller(&env, fld);
+               if (rc != 0) {
+                       CERROR("%s: cannot update controller: rc = %d\n",
+                              mdt_obd_name(mdt), rc);
+                       GOTO(out, rc);
+               }
        }
 out:
        lu_env_fini(&env);
index 9346f18..4e3ae10 100644 (file)
@@ -791,6 +791,7 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
+       th->th_wait_submit = 1;
        rc = llog_declare_create(env, *res, th);
        if (rc == 0) {
                rc = dt_trans_start_local(env, d, th);
@@ -863,6 +864,7 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
        rc = dt_trans_start_local(env, dt, th);
        if (rc)
                GOTO(out_trans, rc);
index c8c2da3..bc5c126 100644 (file)
@@ -72,6 +72,14 @@ static int llog_cat_new_log(const struct lu_env *env,
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                RETURN(-ENOSPC);
 
+       if (loghandle->lgh_hdr != NULL) {
+               /* If llog object is remote and creation is failed, lgh_hdr
+                * might be left over here, free it first */
+               LASSERT(!llog_exist(loghandle));
+               OBD_FREE_PTR(loghandle->lgh_hdr);
+               loghandle->lgh_hdr = NULL;
+       }
+
        rc = llog_create(env, loghandle, th);
        /* if llog is already created, no need to initialize it */
        if (rc == -EEXIST) {
index 9aa79de..f0e904b 100644 (file)
@@ -1377,12 +1377,15 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 
        LASSERT(handle->lgh_obj);
 
-       lu_object_put(env, &handle->lgh_obj->do_lu);
-
-       if (handle->lgh_ctxt->loc_flags &
-           LLOG_CTXT_FLAG_NORMAL_FID)
+       if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               /* Remove the object from the cache, otherwise it may
+                * hold LOD being released during cleanup process */
+               lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
+               LASSERT(handle->private_data == NULL);
                RETURN(rc);
-
+       } else {
+               lu_object_put(env, &handle->lgh_obj->do_lu);
+       }
        los = handle->private_data;
        LASSERT(los);
        dt_los_put(los);
@@ -1511,6 +1514,8 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
+
        dt_write_lock(env, o, 0);
        if (dt_object_exists(o)) {
                if (name) {
@@ -1722,6 +1727,12 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
                lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
                lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
 
+               th->th_wait_submit = 1;
+               /* Make the llog object creation synchronization, so
+                * it will be reliable to the reference, especially
+                * for remote reference */
+               th->th_sync = 1;
+
                rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
                                       &lgi->lgi_dof, th);
                if (rc)
@@ -1854,6 +1865,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
+
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
        if (rc)
                CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",
index bcdf461..9c8ea63 100644 (file)
@@ -1026,7 +1026,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        qtrans = oh->ot_quota_trans;
        oh->ot_quota_trans = NULL;
 
-        if (oh->ot_handle != NULL) {
+       if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
                 /*
@@ -1039,6 +1039,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
 
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
+
                 rc = dt_txn_hook_stop(env, th);
                 if (rc != 0)
                        CERROR("%s: failed in transaction hook: rc = %d\n",
@@ -5549,10 +5550,18 @@ static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
 
        rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
                             osd->od_svname, ss->ss_server_seq);
-
        if (rc != 0) {
                OBD_FREE_PTR(osd->od_cl_seq);
                osd->od_cl_seq = NULL;
+               RETURN(rc);
+       }
+
+       if (ss->ss_node_id == 0) {
+               /* If the OSD on the sequence controller(MDT0), then allocate
+                * sequence here, otherwise allocate sequence after connected
+                * to MDT0 (see mdt_register_lwp_callback()). */
+               rc = seq_server_alloc_meta(osd->od_cl_seq->lcs_srv,
+                                  &osd->od_cl_seq->lcs_space, env);
        }
 
        RETURN(rc);
index 9926dd1..a6fed15 100644 (file)
@@ -514,6 +514,9 @@ int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
        if (!(flags & OI_CHECK_FLD))
                RETURN(0);
 
+       if (osd_seq_site(osd)->ss_server_fld == NULL)
+               RETURN(0);
+
        rc = osd_fld_lookup(info->oti_env, osd, fid_seq(fid), range);
        if (rc != 0) {
                if (rc != -ENOENT)
index 047e656..76895d3 100644 (file)
@@ -1654,6 +1654,10 @@ static const struct osd_lf_map osd_lf_maps[] = {
        { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 },
                OLF_SHOW_NAME, sizeof("LAST_GROUP") - 1, NULL, NULL },
 
+       /* committed batchid for cross-MDT operation */
+       { "BATCHID", { FID_SEQ_LOCAL_FILE, BATCHID_COMMITTED_OID, 0 },
+               OLF_SHOW_NAME, sizeof("BATCHID") - 1, NULL, NULL },
+
        /* lost+found */
        { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 },
                OLF_SCAN_SUBITEMS, sizeof("lost+found") - 1,
index 588032d..6c3ad4f 100644 (file)
@@ -791,6 +791,7 @@ const struct dt_device_operations osp_dt_ops = {
        .dt_trans_create = osp_trans_create,
        .dt_trans_start  = osp_trans_start,
        .dt_trans_stop   = osp_trans_stop,
+       .dt_trans_cb_add   = osp_trans_cb_add,
 };
 
 /**
index 27a8643..4198f45 100644 (file)
@@ -302,6 +302,8 @@ struct osp_thandle {
 
        /* OSP will use this thandle to update last oid*/
        struct thandle          *ot_storage_th;
+       struct list_head         ot_dcb_list;
+       atomic_t                 ot_refcount;
 };
 
 static inline struct osp_thandle *
@@ -639,6 +641,7 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th);
+int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb);
 
 struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
                          __u32 attr);
index 11272e1..caa7ee9 100644 (file)
@@ -1203,7 +1203,10 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
 
        memcpy(rbuf->lb_buf, orr->orr_data, orr->orr_size);
 
-       GOTO(out, rc = orr->orr_size);
+       CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u\n",
+              osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+              *pos, orr->orr_size);
+       GOTO(out, rc = (int)orr->orr_size);
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
index 026fdb3..bd51a8e 100644 (file)
@@ -1552,6 +1552,7 @@ static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
        int                      rc = 0;
 
        ENTRY;
+
        o->opo_non_exist = 1;
 
        LASSERT(!osp->opd_connect_mdt);
index d0a8c39..a3aa8d3 100644 (file)
@@ -522,6 +522,17 @@ int osp_trans_update_request_create(struct thandle *th)
        return 0;
 }
 
+static void osp_thandle_get(struct osp_thandle *oth)
+{
+       atomic_inc(&oth->ot_refcount);
+}
+
+static void osp_thandle_put(struct osp_thandle *oth)
+{
+       if (atomic_dec_and_test(&oth->ot_refcount))
+               OBD_FREE_PTR(oth);
+}
+
 /**
  * The OSP layer dt_device_operations::dt_trans_create() interface
  * to create a transaction.
@@ -563,6 +574,9 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
 
+       atomic_set(&oth->ot_refcount, 1);
+       INIT_LIST_HEAD(&oth->ot_dcb_list);
+
        RETURN(th);
 }
 
@@ -572,22 +586,22 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
  * Prepare OUT update ptlrpc request, and the request usually includes
  * all of updates (stored in \param ureq) from one operation.
  *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] ureq     hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
  *
- * \retval             0 if preparation succeeds.
- * \retval             negative errno if preparation fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp)
 {
-       struct ptlrpc_request           *req;
-       struct object_update_request    *tmp;
-       int                             ureq_len;
-       int                             rc;
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       int                             ureq_len;
+       int                             rc;
        ENTRY;
 
        object_update_request_dump(ureq, D_INFO);
@@ -606,7 +620,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
        }
 
        req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+                           RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
 
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
        memcpy(tmp, ureq, ureq_len);
@@ -620,18 +634,18 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
 }
 
 /**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] dt_update        hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
- *
- * \retval             0 if RPC succeeds.
- * \retval             negative errno if RPC fails.
- */
+* Send update RPC.
+*
+* Send update request to the remote MDT synchronously.
+*
+* \param[in] env       execution environment
+* \param[in] imp       import on which ptlrpc request will be sent
+* \param[in] dt_update hold all of updates which will be packed into the req
+* \param[in] reqp      request to be created
+*
+* \retval              0 if RPC succeeds.
+* \retval              negative errno if RPC fails.
+*/
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct dt_update_request *dt_update,
                    struct ptlrpc_request **reqp)
@@ -672,6 +686,76 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
 }
 
 /**
+ * Add commit callback to transaction.
+ *
+ * Add commit callback to the osp thandle, which will be called
+ * when the thandle is committed remotely.
+ *
+ * \param[in] th       the thandle
+ * \param[in] dcb      commit callback structure
+ *
+ * \retval             only return 0 for now.
+ */
+int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+       struct osp_thandle *oth = thandle_to_osp_thandle(th);
+
+       LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
+       LASSERT(&dcb->dcb_func != NULL);
+       list_add(&dcb->dcb_linkage, &oth->ot_dcb_list);
+
+       return 0;
+}
+
+static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
+{
+       struct dt_txn_commit_cb *dcb;
+       struct dt_txn_commit_cb *tmp;
+
+       LASSERT(atomic_read(&oth->ot_refcount) > 0);
+       /* call per-transaction callbacks if any */
+       list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list,
+                                dcb_linkage) {
+               LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+                        "commit callback entry: magic=%x name='%s'\n",
+                        dcb->dcb_magic, dcb->dcb_name);
+               list_del_init(&dcb->dcb_linkage);
+               dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+       }
+}
+
+static void osp_request_commit_cb(struct ptlrpc_request *req)
+{
+       struct thandle *th = req->rq_cb_data;
+       struct osp_thandle *oth = thandle_to_osp_thandle(th);
+       __u64                   last_committed_transno = 0;
+       int                     result = req->rq_status;
+       ENTRY;
+
+       if (lustre_msg_get_last_committed(req->rq_repmsg))
+               last_committed_transno =
+                       lustre_msg_get_last_committed(req->rq_repmsg);
+
+       if (last_committed_transno <
+               req->rq_import->imp_peer_committed_transno)
+               last_committed_transno =
+                       req->rq_import->imp_peer_committed_transno;
+
+       CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
+              req->rq_transno, last_committed_transno);
+
+       /* If the transaction is not really committed, mark result = 1 */
+       if (req->rq_transno != 0 &&
+           (req->rq_transno > last_committed_transno) && result == 0)
+               result = 1;
+
+       osp_trans_commit_cb(oth, result);
+       req->rq_committed = 1;
+       osp_thandle_put(oth);
+       EXIT;
+}
+
+/**
  * Trigger the request for remote updates.
  *
  * If th_sync is set, then the request will be sent synchronously,
@@ -706,7 +790,8 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
        req->rq_interpret_reply = osp_update_interpret;
        args = ptlrpc_req_async_args(req);
        args->oaua_update = dt_update;
-       if (is_only_remote_trans(th) && !th->th_sync) {
+
+       if (!th->th_wait_submit && is_only_remote_trans(th) && !th->th_sync) {
                args->oaua_flow_control = true;
 
                if (!osp->opd_connect_mdt) {
@@ -719,10 +804,30 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 
                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        } else {
-               osp_get_rpc_lock(osp);
+               struct osp_thandle *oth = thandle_to_osp_thandle(th);
+               struct lu_device *top_device;
+
+               /* If the transaction is created during MDT recoverying
+                * process, it means this is an recovery update, we need
+                * to let OSP send it anyway without checking recoverying
+                * status, in case the other target is being recoveried
+                * at the same time, and if we wait here for the import
+                * to be recoveryed, it might cause deadlock */
+               top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
+               if (top_device->ld_obd->obd_recovering)
+                       req->rq_allow_replay = 1;
+
                args->oaua_flow_control = false;
+               req->rq_commit_cb = osp_request_commit_cb;
+               req->rq_cb_data = th;
+               osp_thandle_get(oth); /* for commit callback */
+               osp_get_rpc_lock(osp);
                rc = ptlrpc_queue_wait(req);
                osp_put_rpc_lock(osp);
+               if (req->rq_transno == 0 && !req->rq_committed)
+                       osp_thandle_put(oth);
+               else
+                       oth->ot_dur = NULL;
                ptlrpc_req_finished(req);
        }
 
@@ -842,17 +947,17 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
        }
 
        dt_update = oth->ot_dur;
-       if (dt_update == NULL)
+       if (dt_update == NULL || th->th_result != 0) {
+               rc = th->th_result;
                GOTO(out, rc);
+       }
 
        LASSERT(dt_update != LP_POISON);
 
        /* If there are no updates, destroy dt_update and thandle */
        if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0) {
-               dt_update_request_destroy(dt_update);
+           dt_update->dur_buf.ub_req->ourq_count == 0)
                GOTO(out, rc);
-       }
 
        if (is_only_remote_trans(th) && !th->th_sync) {
                struct osp_device *osp = dt2osp_dev(th->th_dev);
@@ -879,24 +984,29 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
 out:
        /* If RPC is triggered successfully, dt_update will be freed in
         * osp_update_interpreter() */
-       if (rc != 0 && dt_update != NULL && sent == 0) {
+       if (sent == 0) {
                struct osp_update_callback *ouc;
                struct osp_update_callback *next;
 
-               list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
-                                ouc_list) {
-                       list_del_init(&ouc->ouc_list);
-                       if (ouc->ouc_interpreter != NULL)
-                               ouc->ouc_interpreter(env, NULL, NULL,
-                                                    ouc->ouc_obj,
-                                                    ouc->ouc_data, 0, rc);
-                       osp_update_callback_fini(env, ouc);
+               if (dt_update != NULL) {
+                       list_for_each_entry_safe(ouc, next,
+                                                &dt_update->dur_cb_items,
+                                                ouc_list) {
+                               list_del_init(&ouc->ouc_list);
+                               if (ouc->ouc_interpreter != NULL)
+                                       ouc->ouc_interpreter(env, NULL, NULL,
+                                                          ouc->ouc_obj,
+                                                          ouc->ouc_data, 0,
+                                                          rc);
+                               osp_update_callback_fini(env, ouc);
+                       }
                }
-
+               osp_trans_commit_cb(oth, rc);
                dt_update_request_destroy(dt_update);
+               oth->ot_dur = NULL;
        }
 
-       OBD_FREE_PTR(oth);
+       osp_thandle_put(oth);
 
        RETURN(rc);
 }
index 7e85579..6ee0dc4 100644 (file)
@@ -532,6 +532,8 @@ void lustre_assert_wire_constants(void)
                        (long long)FID_SEQ_LAYOUT_RBTREE);
        LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_UPDATE_LOG);
+       LASSERTF(FID_SEQ_UPDATE_LOG_DIR == 0x000000020000000aULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_UPDATE_LOG_DIR);
        LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_NORMAL);
        LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n",
index 919fa1e..3b52e33 100644 (file)
@@ -225,6 +225,8 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
 
 void update_records_dump(const struct update_records *records,
                         unsigned int mask, bool dump_updates);
+int check_and_prepare_update_record(const struct lu_env *env,
+                                   struct thandle_update_records *tur);
 
 struct update_thread_info {
        struct lu_attr                  uti_attr;
index fd1fce7..632e071 100644 (file)
@@ -870,31 +870,11 @@ EXPORT_SYMBOL(tur_update_params_extend);
  * \retval             negative errno if updates recording fails.
  */
 int check_and_prepare_update_record(const struct lu_env *env,
-                                   struct thandle *th)
+                                   struct thandle_update_records *tur)
 {
-       struct thandle_update_records   *tur;
        struct llog_update_record       *lur;
-       struct top_thandle              *top_th;
-       struct sub_thandle              *lst;
-       int                             rc;
-       bool                            record_update = false;
-       ENTRY;
-
-       top_th = container_of(th, struct top_thandle, tt_super);
-       /* Check if it needs to record updates for this transaction */
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               if (lst->st_record_update) {
-                       record_update = true;
-                       break;
-               }
-       }
-       if (!record_update)
-               RETURN(0);
-
-       if (top_th->tt_update_records != NULL)
-               RETURN(0);
+       int rc;
 
-       tur = &update_env_info(env)->uti_tur;
        if (tur->tur_update_records == NULL) {
                rc = tur_update_records_create(tur);
                if (rc < 0)
@@ -917,61 +897,8 @@ int check_and_prepare_update_record(const struct lu_env *env,
 
        tur->tur_update_param_count = 0;
 
-       top_th->tt_update_records = tur;
-
        RETURN(0);
 }
-EXPORT_SYMBOL(check_and_prepare_update_record);
-
-/**
- * Merge params into the update records
- *
- * Merge params and ops into the update records attached to top th.
- * During transaction execution phase, parameters and update ops
- * are collected in two different buffers (see lod_updates_pack()),
- * then in transaction stop, it needs to be merged them in one
- * buffer, then being written into the update log.
- *
- * \param[in] env      execution environment
- * \param[in] tur      thandle update records whose updates and
- *                      parameters are merged
- *
- * \retval             0 if merging succeeds.
- * \retval             negaitive errno if merging fails.
- */
-int merge_params_updates_buf(const struct lu_env *env,
-                            struct thandle_update_records *tur)
-{
-       struct llog_update_record *lur;
-       struct update_params *params;
-       size_t params_size;
-       size_t record_size;
-
-       if (tur->tur_update_records == NULL ||
-           tur->tur_update_params == NULL)
-               return 0;
-
-       lur = tur->tur_update_records;
-       /* Extends the update records buffer if needed */
-       params_size = update_params_size(tur->tur_update_params,
-                                        tur->tur_update_param_count);
-       record_size = llog_update_record_size(lur);
-       if (cfs_size_round(record_size + params_size) >
-                               tur->tur_update_records_buf_size) {
-               int rc;
-
-               rc = tur_update_records_extend(tur, record_size + params_size);
-               if (rc < 0)
-                       return rc;
-               lur = tur->tur_update_records;
-       }
-
-       params = update_records_get_params(&lur->lur_update_rec);
-       memcpy(params, tur->tur_update_params, params_size);
-       lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
-       return 0;
-}
-EXPORT_SYMBOL(merge_params_updates_buf);
 
 static void update_key_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
index f56aa7c..7ff50cf 100644 (file)
@@ -49,6 +49,7 @@
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
+#include <linux/kthread.h>
 #include <lu_target.h>
 #include <lustre_log.h>
 #include <lustre_update.h>
 #include <obd_class.h>
 #include <tgt_internal.h>
 
+#include <tgt_internal.h>
+/**
+ * Dump top mulitple thandle
+ *
+ * Dump top multiple thandle and all of its sub thandle to the debug log.
+ *
+ * \param[in]mask      debug mask
+ * \param[in]top_th    top_thandle to be dumped
+ */
+static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
+                                     __u32 mask)
+{
+       struct sub_thandle      *st;
+
+       LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
+       CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d"
+              "batchid "LPU64"\n",
+              tmt->tmt_master_sub_dt ?
+              tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name :
+              "NULL",
+              tmt, atomic_read(&tmt->tmt_refcount), tmt->tmt_committed,
+              tmt->tmt_result, tmt->tmt_batchid);
+
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               CDEBUG(mask, "st %p obd %s committed %d sub_th %p "
+                      " cookie "DOSTID": %u\n",
+                      st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
+                      st->st_committed, st->st_sub_th,
+                      POSTID(&st->st_cookie.lgc_lgl.lgl_oi),
+                      st->st_cookie.lgc_index);
+       }
+}
+
 /**
  * Declare write update to sub device
  *
  *
  * \param[in] env      execution environment
  * \param[in] record   update records being written
- * \param[in] lst      sub transaction handle
+ * \param[in] sub_th   sub transaction handle
  *
  * \retval             0 if writing succeeds
  * \retval             negative errno if writing fails
  */
 static int sub_declare_updates_write(const struct lu_env *env,
                                     struct llog_update_record *record,
-                                    struct sub_thandle *lst)
+                                    struct thandle *sub_th)
 {
        struct llog_ctxt        *ctxt;
-       struct dt_device        *dt = lst->st_sub_th->th_dev;
+       struct dt_device        *dt = sub_th->th_dev;
        int rc;
 
        /* If ctxt is NULL, it means not need to write update,
@@ -90,7 +124,7 @@ static int sub_declare_updates_write(const struct lu_env *env,
        }
 
        rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr,
-                             lst->st_sub_th);
+                             sub_th);
 
        llog_ctxt_put(ctxt);
 
@@ -100,23 +134,27 @@ static int sub_declare_updates_write(const struct lu_env *env,
 /**
  * write update to sub device
  *
- * Write updates llog records to the sub device during distribute
- * transaction.
+ * Write llog update record to the sub device during distribute
+ * transaction. If it succeeds, llog cookie of the record will be
+ * returned by @cookie.
  *
  * \param[in] env      execution environment
  * \param[in] record   update records being written
- * \param[in] lst      sub transaction handle
+ * \param[in] sub_th   sub transaction handle
+ * \param[out] cookie  llog cookie of the update record.
  *
  * \retval             1 if writing succeeds
  * \retval             negative errno if writing fails
  */
 static int sub_updates_write(const struct lu_env *env,
                             struct llog_update_record *record,
-                            struct sub_thandle *lst)
+                            struct thandle *sub_th,
+                            struct llog_cookie *cookie)
 {
+       struct dt_device        *dt = sub_th->th_dev;
        struct llog_ctxt        *ctxt;
-       struct dt_device        *dt = lst->st_sub_th->th_dev;
        int                     rc;
+       ENTRY;
 
        ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
                                LLOG_UPDATELOG_ORIG_CTXT);
@@ -126,72 +164,229 @@ static int sub_updates_write(const struct lu_env *env,
         * in error handler path */
        if (ctxt->loc_handle == NULL) {
                llog_ctxt_put(ctxt);
-               return 0;
+               RETURN(0);
        }
 
+       /* Since the cross-MDT updates will includes both local
+        * and remote updates, the update ops count must > 1 */
+       LASSERT(record->lur_update_rec.ur_update_count > 1);
        LASSERTF(record->lur_hdr.lrh_len == llog_update_record_size(record),
                 "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len,
                 llog_update_record_size(record));
 
        rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
-                     NULL, lst->st_sub_th);
-
+                     cookie, sub_th);
        llog_ctxt_put(ctxt);
 
-       return rc;
+       CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u.\n",
+              dt->dd_lu_dev.ld_obd->obd_name,
+              POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index);
+
+       if (rc > 0)
+               rc = 0;
+
+       RETURN(rc);
 }
 
 /**
- * write update transaction
+ * Prepare the update records.
  *
- * Check if there are updates being recorded in this transaction,
- * it will write the record into the disk.
+ * Merge params and ops into the update records, then initializing
+ * the update buffer.
+ *
+ * During transaction execution phase, parameters and update ops
+ * are collected in two different buffers (see lod_updates_pack()),
+ * during transaction stop, it needs to be merged in one buffer,
+ * so it will be written in the update log.
  *
  * \param[in] env      execution environment
- * \param[in] top_th   top transaction handle
+ * \param[in] tmt      top_multiple_thandle for distribute txn
  *
- * \retval             0 if writing succeeds
- * \retval             negative errno if writing fails
+ * \retval             0 if merging succeeds.
+ * \retval             negaitive errno if merging fails.
  */
-static int top_updates_write(const struct lu_env *env,
-                            struct top_thandle *top_th)
+static int prepare_writing_updates(const struct lu_env *env,
+                                  struct top_multiple_thandle *tmt)
 {
-       struct thandle_update_records *tur;
-       struct llog_update_record *lur;
-       struct sub_thandle      *lst;
-       int                     rc;
-       ENTRY;
+       struct thandle_update_records   *tur = tmt->tmt_update_records;
+       struct llog_update_record       *lur;
+       struct update_params *params;
+       size_t params_size;
+       size_t update_size;
 
-       if (top_th->tt_update_records == NULL)
-               RETURN(0);
+       if (tur == NULL || tur->tur_update_records == NULL ||
+           tur->tur_update_params == NULL)
+               return 0;
 
-       tur = top_th->tt_update_records;
+       lur = tur->tur_update_records;
+       /* Extends the update records buffer if needed */
+       params_size = update_params_size(tur->tur_update_params,
+                                        tur->tur_update_param_count);
+       LASSERT(lur->lur_update_rec.ur_param_count == 0);
+       update_size = llog_update_record_size(lur);
+       if (cfs_size_round(update_size + params_size) >
+           tur->tur_update_records_buf_size) {
+               int rc;
 
-       /* merge the parameters and updates into one buffer */
-       rc = merge_params_updates_buf(env, tur);
-       if (rc < 0)
-               RETURN(rc);
+               rc = tur_update_records_extend(tur,
+                       cfs_size_round(update_size + params_size));
+               if (rc < 0)
+                       return rc;
 
-       lur = tur->tur_update_records;
-       /* Dump updates to debug log */
-       update_records_dump(&lur->lur_update_rec, D_INFO, true);
+               lur = tur->tur_update_records;
+       }
+
+       params = update_records_get_params(&lur->lur_update_rec);
+       memcpy(params, tur->tur_update_params, params_size);
 
+       lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
+       lur->lur_update_rec.ur_batchid = tmt->tmt_batchid;
        /* Init update record header */
        lur->lur_hdr.lrh_len = llog_update_record_size(lur);
        lur->lur_hdr.lrh_type = UPDATE_REC;
 
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               if (!lst->st_record_update)
-                       continue;
-               rc = sub_updates_write(env, lur, lst);
-               if (rc < 0)
-                       break;
+       /* Dump updates for debugging purpose */
+       update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+       return 0;
+}
+
+static inline int
+distribute_txn_commit_thread_running(struct lu_target *lut)
+{
+       return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING;
+}
+
+static inline int
+distribute_txn_commit_thread_stopped(struct lu_target *lut)
+{
+       return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED;
+}
+
+/**
+ * Top thandle commit callback
+ *
+ * This callback will be called when all of sub transactions are committed.
+ *
+ * \param[in] th       top thandle to be committed.
+ */
+static void top_trans_committed_cb(struct top_multiple_thandle *tmt)
+{
+       struct lu_target *lut;
+       ENTRY;
+
+       LASSERT(atomic_read(&tmt->tmt_refcount) > 0);
+
+       top_multiple_thandle_dump(tmt, D_HA);
+       tmt->tmt_committed = 1;
+       lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt;
+       if (distribute_txn_commit_thread_running(lut))
+               wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq);
+       RETURN_EXIT;
+}
+
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+                                      struct dt_device *dt_dev)
+{
+       struct sub_thandle *st;
+
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st->st_dt == dt_dev)
+                       return st;
        }
+       return NULL;
+}
+EXPORT_SYMBOL(lookup_sub_thandle);
 
-       if (rc > 0)
-               rc = 0;
+struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt,
+                                      struct dt_device *dt_dev)
+{
+       struct sub_thandle *st;
 
-       RETURN(rc);
+       OBD_ALLOC_PTR(st);
+       if (st == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       INIT_LIST_HEAD(&st->st_sub_list);
+       st->st_dt = dt_dev;
+
+       list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list);
+       return st;
+}
+
+/**
+ * Create sub thandle
+ *
+ * Create transaction handle for sub_thandle
+ *
+ * \param[in] env      execution environment
+ * \param[in] th       top thandle
+ * \param[in] st       sub_thandle
+ *
+ * \retval             0 if creation succeeds.
+ * \retval             negative errno if creation fails.
+ */
+int sub_thandle_trans_create(const struct lu_env *env,
+                            struct top_thandle *top_th,
+                            struct sub_thandle *st)
+{
+       struct thandle *sub_th;
+
+       sub_th = dt_trans_create(env, st->st_dt);
+       if (IS_ERR(sub_th))
+               return PTR_ERR(sub_th);
+
+       sub_th->th_top = &top_th->tt_super;
+       st->st_sub_th = sub_th;
+       return 0;
+}
+
+/**
+ * sub thandle commit callback
+ *
+ * Mark the sub thandle to be committed and if all sub thandle are committed
+ * notify the top thandle.
+ *
+ * \param[in] sub_th   sub thandle being committed.
+ */
+static void sub_trans_commit_cb(struct lu_env *env, struct thandle *sub_th,
+                               struct dt_txn_commit_cb *cb, int err)
+{
+       struct sub_thandle      *st;
+       struct top_multiple_thandle *tmt = cb->dcb_data;
+       bool                    all_committed = true;
+       ENTRY;
+
+       /* Check if all sub thandles are committed */
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st->st_sub_th == sub_th) {
+                       st->st_committed = 1;
+                       st->st_result = err;
+               }
+               if (!st->st_committed)
+                       all_committed = false;
+       }
+
+       if (tmt->tmt_result == 0)
+               tmt->tmt_result = err;
+
+       if (all_committed)
+               top_trans_committed_cb(tmt);
+
+       top_multiple_thandle_dump(tmt, D_INFO);
+       top_multiple_thandle_put(tmt);
+       RETURN_EXIT;
+}
+
+static void sub_thandle_register_commit_cb(struct sub_thandle *st,
+                                   struct top_multiple_thandle *tmt)
+{
+       LASSERT(st->st_sub_th != NULL);
+       top_multiple_thandle_get(tmt);
+       st->st_commit_dcb.dcb_func = sub_trans_commit_cb;
+       st->st_commit_dcb.dcb_data = tmt;
+       INIT_LIST_HEAD(&st->st_commit_dcb.dcb_linkage);
+       dt_trans_cb_add(st->st_sub_th, &st->st_commit_dcb);
 }
 
 /**
@@ -216,25 +411,162 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev)
        if (top_th == NULL)
                return ERR_PTR(-ENOMEM);
 
-       child_th = dt_trans_create(env, master_dev);
-       if (IS_ERR(child_th)) {
-               OBD_FREE_PTR(top_th);
-               return child_th;
-       }
+       top_th->tt_super.th_top = &top_th->tt_super;
 
-       top_th->tt_magic = TOP_THANDLE_MAGIC;
-       top_th->tt_master_sub_thandle = child_th;
-       child_th->th_top = &top_th->tt_super;
+       if (master_dev != NULL) {
+               child_th = dt_trans_create(env, master_dev);
+               if (IS_ERR(child_th)) {
+                       OBD_FREE_PTR(top_th);
+                       return child_th;
+               }
 
-       top_th->tt_update_records = NULL;
-       top_th->tt_super.th_top = &top_th->tt_super;
-       INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
+               child_th->th_top = &top_th->tt_super;
+               child_th->th_wait_submit = 1;
+               top_th->tt_master_sub_thandle = child_th;
 
+               top_th->tt_super.th_tags |= child_th->th_tags;
+       }
        return &top_th->tt_super;
 }
 EXPORT_SYMBOL(top_trans_create);
 
 /**
+ * Declare write update transaction
+ *
+ * Check if there are updates being recorded in this transaction,
+ * it will write the record into the disk.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tmt      top multiple transaction handle
+ *
+ * \retval             0 if writing succeeds
+ * \retval             negative errno if writing fails
+ */
+static int declare_updates_write(const struct lu_env *env,
+                                struct top_multiple_thandle *tmt)
+{
+       struct llog_update_record *record;
+       struct sub_thandle *st;
+       int rc = 0;
+
+       record = tmt->tmt_update_records->tur_update_records;
+       /* Declare update write for all other target */
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st->st_sub_th == NULL)
+                       continue;
+
+               rc = sub_declare_updates_write(env, record, st->st_sub_th);
+               if (rc < 0)
+                       break;
+       }
+
+       return rc;
+}
+
+/**
+ * Assign batchid to the distribute transaction.
+ *
+ * Assign batchid to the distribute transaction
+ *
+ * \param[in] tmt      distribute transaction
+ *
+ */
+static void distribute_txn_assign_batchid(struct top_multiple_thandle *new)
+{
+       struct target_distribute_txn_data *tdtd;
+       struct dt_device *dt = new->tmt_master_sub_dt;
+
+       LASSERT(dt != NULL);
+       tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd;
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       new->tmt_batchid = tdtd->tdtd_batchid++;
+       list_add_tail(&new->tmt_commit_list, &tdtd->tdtd_list);
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+       top_multiple_thandle_get(new);
+       top_multiple_thandle_dump(new, D_INFO);
+}
+
+/**
+ * Insert distribute transaction to the distribute txn list.
+ *
+ * Insert distribute transaction to the distribute txn list.
+ *
+ * \param[in] new      the distribute txn to be inserted.
+ */
+void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new)
+{
+       struct dt_device *dt = new->tmt_master_sub_dt;
+       struct top_multiple_thandle *tmt;
+       struct target_distribute_txn_data *tdtd;
+       bool    at_head = false;
+
+       LASSERT(dt != NULL);
+       tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd;
+
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       list_for_each_entry_reverse(tmt, &tdtd->tdtd_list, tmt_commit_list) {
+               if (new->tmt_batchid > tmt->tmt_batchid) {
+                       list_add(&new->tmt_commit_list, &tmt->tmt_commit_list);
+                       break;
+               }
+       }
+       if (list_empty(&new->tmt_commit_list)) {
+               at_head = true;
+               list_add(&new->tmt_commit_list, &tdtd->tdtd_list);
+       }
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+       top_multiple_thandle_get(new);
+       top_multiple_thandle_dump(new, D_INFO);
+       if (new->tmt_committed && at_head)
+               wake_up(&tdtd->tdtd_commit_thread_waitq);
+}
+
+/**
+ * Prepare cross-MDT operation.
+ *
+ * Create the update record buffer to record updates for cross-MDT operation,
+ * add master sub transaction to tt_sub_trans_list, and declare the update
+ * writes.
+ *
+ * During updates packing, all of parameters will be packed in
+ * tur_update_params, and updates will be packed in tur_update_records.
+ * Then in transaction stop, parameters and updates will be merged
+ * into one updates buffer.
+ *
+ * And also master thandle will be added to the sub_th list, so it will be
+ * easy to track the commit status.
+ *
+ * \param[in] env      execution environment
+ * \param[in] th       top transaction handle
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+static int prepare_multiple_node_trans(const struct lu_env *env,
+                                      struct top_multiple_thandle *tmt)
+{
+       struct thandle_update_records   *tur;
+       int                             rc;
+       ENTRY;
+
+       /* Prepare the update buffer for recording updates */
+       if (tmt->tmt_update_records != NULL)
+               RETURN(0);
+
+       tur = &update_env_info(env)->uti_tur;
+       rc = check_and_prepare_update_record(env, tur);
+       if (rc < 0)
+               RETURN(rc);
+
+       tmt->tmt_update_records = tur;
+
+       distribute_txn_assign_batchid(tmt);
+       rc = declare_updates_write(env, tmt);
+
+       RETURN(rc);
+}
+
+/**
  * start the top transaction.
  *
  * Start all of its sub transactions, then start master sub transaction.
@@ -251,43 +583,81 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
 {
        struct top_thandle      *top_th = container_of(th, struct top_thandle,
                                                       tt_super);
-       struct sub_thandle      *lst;
-       int                     rc;
+       struct sub_thandle              *st;
+       struct top_multiple_thandle     *tmt = top_th->tt_multiple_thandle;
+       int                             rc = 0;
+       ENTRY;
 
-       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
-       rc = check_and_prepare_update_record(env, th);
+       /* Walk through all of sub transaction to see if it needs to
+        * record updates for this transaction */
+       if (tmt == NULL) {
+               rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev,
+                                   top_th->tt_master_sub_thandle);
+               RETURN(rc);
+       }
+
+       rc = prepare_multiple_node_trans(env, tmt);
        if (rc < 0)
-               return rc;
-       /* Check if needs to write updates */
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               struct llog_update_record *record;
+               RETURN(rc);
 
-               if (!lst->st_record_update)
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st->st_sub_th == NULL)
                        continue;
-
-               record = top_th->tt_update_records->tur_update_records;
-               rc = sub_declare_updates_write(env, record, lst);
+               st->st_sub_th->th_sync = th->th_sync;
+               st->st_sub_th->th_local = th->th_local;
+               st->st_sub_th->th_tags = th->th_tags;
+               rc = dt_trans_start(env, st->st_sub_th->th_dev,
+                                   st->st_sub_th);
                if (rc != 0)
-                       return rc;
-       }
+                       RETURN(rc);
 
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               lst->st_sub_th->th_sync = th->th_sync;
-               lst->st_sub_th->th_local = th->th_local;
-               rc = dt_trans_start(env, lst->st_sub_th->th_dev,
-                                   lst->st_sub_th);
-               if (rc != 0)
-                       return rc;
+               sub_thandle_register_commit_cb(st, tmt);
        }
 
-       top_th->tt_master_sub_thandle->th_local = th->th_local;
-       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
-
-       return dt_trans_start(env, master_dev, top_th->tt_master_sub_thandle);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(top_trans_start);
 
 /**
+ * Check whether we need write updates record
+ *
+ * Check if the updates for the top_thandle needs to be writen
+ * to all targets. Only if the transaction succeeds and the updates
+ * number > 2, it will write the updates,
+ *
+ * \params [in] top_th top thandle.
+ *
+ * \retval             true if it needs to write updates
+ * \retval             false if it does not need to write updates
+ **/
+static bool top_check_write_updates(struct top_thandle *top_th)
+{
+       struct top_multiple_thandle     *tmt;
+       struct thandle_update_records   *tur;
+
+       /* Do not write updates to records if the transaction fails */
+       if (top_th->tt_super.th_result != 0)
+               return false;
+
+       tmt = top_th->tt_multiple_thandle;
+       if (tmt == NULL)
+               return false;
+
+       tur = tmt->tmt_update_records;
+       if (tur == NULL)
+               return false;
+
+       /* Hmm, false update records, since the cross-MDT operation
+        * should includes both local and remote updates, so the
+        * updates count should >= 2 */
+       if (tur->tur_update_records == NULL ||
+           tur->tur_update_records->lur_update_rec.ur_update_count <= 1)
+               return false;
+
+       return true;
+}
+
+/**
  * Stop the top transaction.
  *
  * Stop the transaction on the master device first, then stop transactions
@@ -305,55 +675,198 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
 {
        struct top_thandle      *top_th = container_of(th, struct top_thandle,
                                                       tt_super);
-       struct thandle_update_records *tur = top_th->tt_update_records;
-       struct sub_thandle      *lst;
-       int                     rc;
+       struct sub_thandle              *st;
+       struct sub_thandle              *master_st;
+       struct top_multiple_thandle     *tmt;
+       struct thandle_update_records   *tur;
+       bool                            write_updates = false;
+       int                     rc = 0;
        ENTRY;
 
-       /* Note: we always need walk through all of sub_transaction to do
-        * transaction stop to release the resource here */
-       if (tur != NULL && th->th_result == 0) {
-               rc = top_updates_write(env, top_th);
+       if (likely(top_th->tt_multiple_thandle == NULL)) {
+               rc = dt_trans_stop(env, master_dev,
+                                  top_th->tt_master_sub_thandle);
+               OBD_FREE_PTR(top_th);
+               RETURN(rc);
+       }
+
+       tmt = top_th->tt_multiple_thandle;
+       tur = tmt->tmt_update_records;
+
+       /* Note: we need stop the master thandle first, then the stop
+        * callback will fill the master transno in the update logs,
+        * then these update logs will be sent to other MDTs */
+       /* get the master sub thandle */
+       master_st = lookup_sub_thandle(tmt, tmt->tmt_master_sub_dt);
+       if (master_st == NULL) {
+               top_multiple_thandle_dump(tmt, D_ERROR);
+               if (th->th_result == 0)
+                       LBUG();
+       }
+
+       write_updates = top_check_write_updates(top_th);
+
+       /* Step 1: write the updates log on Master MDT */
+       if (master_st != NULL && master_st->st_sub_th != NULL &&
+           write_updates) {
+               struct llog_update_record *lur;
+
+               /* Merge the parameters and updates into one buffer */
+               rc = prepare_writing_updates(env, tmt);
                if (rc < 0) {
-                       CERROR("%s: cannot write updates: rc = %d\n",
+                       CERROR("%s: cannot prepare updates: rc = %d\n",
                               master_dev->dd_lu_dev.ld_obd->obd_name, rc);
-                       /* Still need call dt_trans_stop to release resources
-                        * holding by the transaction */
+                       th->th_result = rc;
+                       GOTO(stop_master_trans, rc);
+               }
+
+               lur = tur->tur_update_records;
+               /* Write updates to the master MDT */
+               rc = sub_updates_write(env, lur, top_th->tt_master_sub_thandle,
+                                      &master_st->st_cookie);
+
+               /* Cleanup the common parameters in the update records,
+                * master transno callback might add more parameters.
+                * and we need merge the update records again in the
+                * following */
+               if (tur->tur_update_params != NULL)
+                       lur->lur_update_rec.ur_param_count = 0;
+
+               if (rc < 0) {
+                       CERROR("%s: write updates failed: rc = %d\n",
+                              master_dev->dd_lu_dev.ld_obd->obd_name, rc);
+                       th->th_result = rc;
+                       GOTO(stop_master_trans, rc);
                }
-               top_th->tt_update_records = NULL;
        }
 
-       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
+stop_master_trans:
+       /* Step 2: Stop the transaction on the master MDT, and fill the
+        * master transno in the update logs to other MDT. */
+       if (master_st != NULL && master_st->st_sub_th != NULL) {
+               master_st->st_sub_th->th_local = th->th_local;
+               master_st->st_sub_th->th_sync = th->th_sync;
+               master_st->st_sub_th->th_tags = th->th_tags;
+               master_st->st_sub_th->th_result = th->th_result;
+               rc = dt_trans_stop(env, master_st->st_dt, master_st->st_sub_th);
+               if (rc < 0) {
+                       th->th_result = rc;
+                       GOTO(stop_other_trans, rc);
+               } else if (tur != NULL && tur->tur_update_records != NULL) {
+                       struct llog_update_record *lur;
+
+                       lur = tur->tur_update_records;
+                       if (lur->lur_update_rec.ur_master_transno == 0)
+                               /* Update master transno after master stop
+                                * callback */
+                               lur->lur_update_rec.ur_master_transno =
+                                               tgt_th_info(env)->tti_transno;
+               }
+       }
 
-       top_th->tt_master_sub_thandle->th_local = th->th_local;
-       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
-       top_th->tt_master_sub_thandle->th_result = th->th_result;
-       /* To avoid sending RPC while holding thandle, it always stop local
-        * transaction first, then other sub thandle */
-       rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle);
+       /* Step 3: write updates to other MDTs */
+       if (write_updates) {
+               struct llog_update_record *lur;
 
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               int     rc2;
+               /* Stop callback of master will add more updates and also update
+                * master transno, so merge the parameters and updates into one
+                * buffer again */
+               rc = prepare_writing_updates(env, tmt);
+               if (rc < 0) {
+                       CERROR("%s: prepare updates failed: rc = %d\n",
+                              master_dev->dd_lu_dev.ld_obd->obd_name, rc);
+                       th->th_result = rc;
+                       GOTO(stop_other_trans, rc);
+               }
+               lur = tur->tur_update_records;
+               list_for_each_entry(st, &tmt->tmt_sub_thandle_list,
+                                   st_sub_list) {
+                       if (st->st_sub_th == NULL || st == master_st ||
+                           st->st_sub_th->th_result < 0)
+                               continue;
 
-               if (rc != 0)
-                       lst->st_sub_th->th_result = rc;
-               else
-                       lst->st_sub_th->th_result = th->th_result;
-               lst->st_sub_th->th_sync = th->th_sync;
-               lst->st_sub_th->th_local = th->th_local;
-               rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev,
-                                   lst->st_sub_th);
-               if (unlikely(rc2 < 0 && rc == 0))
-                       rc = rc2;
+                       rc = sub_updates_write(env, lur, st->st_sub_th,
+                                              &st->st_cookie);
+                       if (rc < 0)
+                               break;
+               }
        }
 
-       top_thandle_destroy(top_th);
+stop_other_trans:
+       /* Step 4: Stop the transaction on other MDTs */
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               if (st == master_st || st->st_sub_th == NULL)
+                       continue;
+
+               st->st_sub_th->th_sync = th->th_sync;
+               st->st_sub_th->th_local = th->th_local;
+               st->st_sub_th->th_tags = th->th_tags;
+               st->st_sub_th->th_result = th->th_result;
+               rc = dt_trans_stop(env, st->st_sub_th->th_dev,
+                                  st->st_sub_th);
+               if (unlikely(rc < 0 && th->th_result == 0))
+                       th->th_result = rc;
+       }
 
+       tmt->tmt_result = rc;
+       /* Balance for the refcount in top_trans_create, Note: if it is NOT
+        * multiple node transaction, the top transaction will be destroyed. */
+       top_multiple_thandle_put(tmt);
+       OBD_FREE_PTR(top_th);
        RETURN(rc);
 }
 EXPORT_SYMBOL(top_trans_stop);
 
 /**
+ * Create top_multiple_thandle for top_thandle
+ *
+ * Create top_mutilple_thandle to manage the mutiple node transaction
+ * for top_thandle, and it also needs to add master sub thandle to the
+ * sub trans list now.
+ *
+ * \param[in] env      execution environment
+ * \param[in] top_th   the top thandle
+ *
+ * \retval     0 if creation succeeds
+ * \retval     negative errno if creation fails
+ */
+static int top_trans_create_tmt(const struct lu_env *env,
+                               struct top_thandle *top_th)
+{
+       struct top_multiple_thandle *tmt;
+
+       OBD_ALLOC_PTR(tmt);
+       if (tmt == NULL)
+               return -ENOMEM;
+
+       tmt->tmt_magic = TOP_THANDLE_MAGIC;
+       INIT_LIST_HEAD(&tmt->tmt_sub_thandle_list);
+       INIT_LIST_HEAD(&tmt->tmt_commit_list);
+       atomic_set(&tmt->tmt_refcount, 1);
+
+       top_th->tt_multiple_thandle = tmt;
+
+       return 0;
+}
+
+static struct sub_thandle *
+create_sub_thandle_with_thandle(struct top_thandle *top_th,
+                               struct thandle *sub_th)
+{
+       struct sub_thandle *st;
+
+       /* create and init sub th to the top trans list */
+       st = create_sub_thandle(top_th->tt_multiple_thandle,
+                               sub_th->th_dev);
+       if (IS_ERR(st))
+               return st;
+
+       st->st_sub_th = sub_th;
+       sub_th->th_top = &top_th->tt_super;
+       return st;
+}
+
+/**
  * Get sub thandle.
  *
  * Get sub thandle from the top thandle according to the sub dt_device.
@@ -369,66 +882,568 @@ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
                                      struct thandle *th,
                                      struct dt_device *sub_dt)
 {
-       struct sub_thandle      *lst;
+       struct sub_thandle      *st = NULL;
        struct top_thandle      *top_th;
-       struct thandle          *sub_th;
+       struct thandle          *sub_th = NULL;
+       int                     rc = 0;
        ENTRY;
 
        top_th = container_of(th, struct top_thandle, tt_super);
-       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
-       LASSERT(top_th->tt_master_sub_thandle != NULL);
+
        if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev))
                RETURN(top_th->tt_master_sub_thandle);
 
-       /* Find or create the transaction in tt_trans_list, since there is
-        * always only one thread access the list, so no need lock here */
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               if (lst->st_sub_th->th_dev == sub_dt)
-                       RETURN(lst->st_sub_th);
+       if (top_th->tt_multiple_thandle != NULL) {
+               st = lookup_sub_thandle(top_th->tt_multiple_thandle, sub_dt);
+               if (st != NULL)
+                       RETURN(st->st_sub_th);
        }
 
        sub_th = dt_trans_create(env, sub_dt);
        if (IS_ERR(sub_th))
                RETURN(sub_th);
 
-       /* XXX all of mixed transaction (see struct th_handle) will
-        * be synchronized until async update is done */
-       th->th_sync = 1;
+       /* Create top_multiple_thandle if necessary */
+       if (top_th->tt_multiple_thandle == NULL) {
+               struct top_multiple_thandle *tmt;
 
-       sub_th->th_top = th;
-       OBD_ALLOC_PTR(lst);
-       if (lst == NULL) {
-               dt_trans_stop(env, sub_dt, sub_th);
-               RETURN(ERR_PTR(-ENOMEM));
+               rc = top_trans_create_tmt(env, top_th);
+               if (rc < 0)
+                       GOTO(stop_trans, rc);
+
+               tmt = top_th->tt_multiple_thandle;
+
+               /* Add master sub th to the top trans list */
+               tmt->tmt_master_sub_dt =
+                       top_th->tt_master_sub_thandle->th_dev;
+               st = create_sub_thandle_with_thandle(top_th,
+                               top_th->tt_master_sub_thandle);
+               if (IS_ERR(st))
+                       GOTO(stop_trans, rc = PTR_ERR(st));
+               top_th->tt_master_sub_thandle->th_sync = 1;
+               top_th->tt_super.th_sync = 1;
        }
 
-       INIT_LIST_HEAD(&lst->st_sub_list);
-       lst->st_sub_th = sub_th;
-       list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list);
-       lst->st_record_update = 1;
+       /* create and init sub th to the top trans list */
+       st = create_sub_thandle_with_thandle(top_th, sub_th);
+       st->st_sub_th->th_wait_submit = 1;
+stop_trans:
+       if (rc < 0) {
+               if (st != NULL)
+                       OBD_FREE_PTR(st);
+               sub_th->th_result = rc;
+               dt_trans_stop(env, sub_dt, sub_th);
+               sub_th = ERR_PTR(rc);
+       }
 
        RETURN(sub_th);
 }
 EXPORT_SYMBOL(thandle_get_sub_by_dt);
 
 /**
- * Top thandle destroy
+ * Top multiple thandle destroy
  *
- * Destroy the top thandle and all of its sub thandle.
+ * Destroy multiple thandle and all its sub thandle.
  *
- * \param[in] top_th   top thandle to be destroyed.
+ * \param[in] tmt      top_multiple_thandle to be destroyed.
  */
-void top_thandle_destroy(struct top_thandle *top_th)
+void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt)
 {
        struct sub_thandle *st;
        struct sub_thandle *tmp;
 
-       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
-       list_for_each_entry_safe(st, tmp, &top_th->tt_sub_thandle_list,
+       LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
+       list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list,
                                 st_sub_list) {
                list_del(&st->st_sub_list);
                OBD_FREE_PTR(st);
        }
-       OBD_FREE_PTR(top_th);
+       OBD_FREE_PTR(tmt);
+}
+EXPORT_SYMBOL(top_multiple_thandle_destroy);
+
+/**
+ * Cancel the update log on MDTs
+ *
+ * Cancel the update log on MDTs then destroy the thandle.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tmt      the top multiple thandle whose updates records
+ *                      will be cancelled.
+ *
+ * \retval             0 if cancellation succeeds.
+ * \retval             negative errno if cancellation fails.
+ */
+static int distribute_txn_cancel_records(const struct lu_env *env,
+                                        struct top_multiple_thandle *tmt)
+{
+       struct sub_thandle *st;
+       ENTRY;
+
+       top_multiple_thandle_dump(tmt, D_INFO);
+       /* Cancel update logs on other MDTs */
+       list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+               struct llog_ctxt        *ctxt;
+               struct obd_device       *obd;
+               struct llog_cookie      *cookie;
+               int rc;
+
+               cookie = &st->st_cookie;
+               if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
+                       continue;
+
+               obd = st->st_dt->dd_lu_dev.ld_obd;
+               ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+               LASSERT(ctxt);
+
+               rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
+                                            cookie);
+
+               llog_ctxt_put(ctxt);
+               CDEBUG(D_HA, "%s: batchid %llu cancel update log "DOSTID
+                      ".%u : rc = %d\n", obd->obd_name, tmt->tmt_batchid,
+                      POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index, rc);
+       }
+
+       RETURN(0);
+}
+
+/**
+ * Check if there are committed transaction
+ *
+ * Check if there are committed transaction in the distribute transaction
+ * list, then cancel the update records for those committed transaction.
+ * Because the distribute transaction in the list are sorted by batchid,
+ * and cancellation will be done by batchid order, so we only check the first
+ * the transaction(with lowest batchid) in the list.
+ *
+ * \param[in] lod      lod device where cancel thread is
+ *
+ * \retval             true if it is ready
+ * \retval             false if it is not ready
+ */
+static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd)
+{
+       struct top_multiple_thandle     *tmt = NULL;
+       struct obd_device               *obd = tdtd->tdtd_lut->lut_obd;
+       bool    ready = false;
+
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       if (!list_empty(&tdtd->tdtd_list)) {
+               tmt = list_entry(tdtd->tdtd_list.next,
+                                struct top_multiple_thandle, tmt_commit_list);
+               if (tmt->tmt_committed &&
+                   (!obd->obd_recovering || (obd->obd_recovering &&
+                   tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)))
+                       ready = true;
+       }
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+
+       return ready;
+}
+
+struct distribute_txn_bid_data {
+       struct dt_txn_commit_cb  dtbd_cb;
+       struct target_distribute_txn_data      *dtbd_tdtd;
+       __u64                    dtbd_batchid;
+};
+
+/**
+ * callback of updating commit batchid
+ *
+ * Updating commit batchid then wake up the commit thread to cancel the
+ * records.
+ *
+ * \param[in]env       execution environment
+ * \param[in]th                thandle to updating commit batchid
+ * \param[in]cb                commit callback
+ * \param[in]err       result of thandle
+ */
+static void distribute_txn_batchid_cb(struct lu_env *env,
+                                     struct thandle *th,
+                                     struct dt_txn_commit_cb *cb,
+                                     int err)
+{
+       struct distribute_txn_bid_data          *dtbd = NULL;
+       struct target_distribute_txn_data       *tdtd;
+
+       dtbd = container_of0(cb, struct distribute_txn_bid_data, dtbd_cb);
+       tdtd = dtbd->dtbd_tdtd;
+
+       CDEBUG(D_HA, "%s: %llu batchid updated\n",
+             tdtd->tdtd_lut->lut_obd->obd_name, dtbd->dtbd_batchid);
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       if (dtbd->dtbd_batchid > tdtd->tdtd_committed_batchid &&
+           !tdtd->tdtd_lut->lut_obd->obd_no_transno)
+               tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid;
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+       atomic_dec(&tdtd->tdtd_refcount);
+       wake_up(&tdtd->tdtd_commit_thread_waitq);
+
+       OBD_FREE_PTR(dtbd);
+}
+
+/**
+ * Update the commit batchid in disk
+ *
+ * Update commit batchid in the disk, after this is committed, it can start
+ * to cancel the update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tdtd     distribute transaction structure
+ * \param[in] batchid  commit batchid to be updated
+ *
+ * \retval             0 if update succeeds.
+ * \retval             negative errno if update fails.
+ */
+static int
+distribute_txn_commit_batchid_update(const struct lu_env *env,
+                             struct target_distribute_txn_data *tdtd,
+                             __u64 batchid)
+{
+       struct distribute_txn_bid_data  *dtbd = NULL;
+       struct thandle          *th;
+       struct lu_buf            buf;
+       __u64                    tmp;
+       __u64                    off;
+       int                      rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(dtbd);
+       if (dtbd == NULL)
+               RETURN(-ENOMEM);
+       dtbd->dtbd_batchid = batchid;
+       dtbd->dtbd_tdtd = tdtd;
+       dtbd->dtbd_cb.dcb_func = distribute_txn_batchid_cb;
+       atomic_inc(&tdtd->tdtd_refcount);
+
+       th = dt_trans_create(env, tdtd->tdtd_lut->lut_bottom);
+       if (IS_ERR(th)) {
+               OBD_FREE_PTR(dtbd);
+               RETURN(PTR_ERR(th));
+       }
+
+       tmp = cpu_to_le64(batchid);
+       buf.lb_buf = &tmp;
+       buf.lb_len = sizeof(tmp);
+       off = 0;
+
+       rc = dt_declare_record_write(env, tdtd->tdtd_batchid_obj, &buf, off,
+                                    th);
+       if (rc < 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, tdtd->tdtd_lut->lut_bottom, th);
+       if (rc < 0)
+               GOTO(stop, rc);
+
+       dt_trans_cb_add(th, &dtbd->dtbd_cb);
+       if (rc < 0)
+               GOTO(stop, rc);
+
+       rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf,
+                            &off, th);
+
+       CDEBUG(D_INFO, "%s: update batchid "LPU64": rc = %d\n",
+              tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc);
+
+stop:
+       dt_trans_stop(env, tdtd->tdtd_lut->lut_bottom, th);
+       if (rc < 0)
+               OBD_FREE_PTR(dtbd);
+       RETURN(rc);
+}
+
+/**
+ * Init commit batchid for distribute transaction.
+ *
+ * Initialize the batchid object and get commit batchid from the object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tdtd     distribute transaction whose batchid is initialized.
+ *
+ * \retval             0 if initialization succeeds.
+ * \retval             negative errno if initialization fails.
+ **/
+static int
+distribute_txn_commit_batchid_init(const struct lu_env *env,
+                                  struct target_distribute_txn_data *tdtd)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lu_target        *lut = tdtd->tdtd_lut;
+       struct lu_attr          *attr = &tti->tti_attr;
+       struct lu_fid           *fid = &tti->tti_fid1;
+       struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
+       struct dt_object        *dt_obj = NULL;
+       struct lu_buf           buf;
+       __u64                   tmp;
+       __u64                   off;
+       int                     rc;
+       ENTRY;
+
+       memset(attr, 0, sizeof(*attr));
+       attr->la_valid = LA_MODE;
+       attr->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+       dof->dof_type = dt_mode_to_dft(S_IFREG);
+
+       lu_local_obj_fid(fid, BATCHID_COMMITTED_OID);
+
+       dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof,
+                                  attr);
+       if (IS_ERR(dt_obj))
+               GOTO(out_put, rc = PTR_ERR(dt_obj));
+
+       tdtd->tdtd_batchid_obj = dt_obj;
+
+       buf.lb_buf = &tmp;
+       buf.lb_len = sizeof(tmp);
+       off = 0;
+       rc = dt_read(env, dt_obj, &buf, &off);
+       if (rc < 0 || (rc < buf.lb_len && rc > 0)) {
+               CERROR("%s can't read last committed batchid: rc = %d\n",
+                      tdtd->tdtd_lut->lut_obd->obd_name, rc);
+               if (rc > 0)
+                       rc = -EINVAL;
+               GOTO(out_put, rc);
+       } else if (rc == buf.lb_len) {
+               tdtd->tdtd_committed_batchid = le64_to_cpu(tmp);
+               CDEBUG(D_HA, "%s: committed batchid %llu\n",
+                      tdtd->tdtd_lut->lut_obd->obd_name,
+                      tdtd->tdtd_committed_batchid);
+               rc = 0;
+       }
+
+out_put:
+       if (rc < 0 && dt_obj != NULL) {
+               lu_object_put(env, &dt_obj->do_lu);
+               tdtd->tdtd_batchid_obj = NULL;
+       }
+       return rc;
+}
+
+/**
+ * manage the distribute transaction thread
+ *
+ * Distribute transaction are linked to the list, and once the distribute
+ * transaction is committed, it will update the last committed batchid first,
+ * after it is committed, it will cancel the records.
+ *
+ * \param[in] _arg     argument for commit thread
+ *
+ * \retval             0 if thread is running successfully
+ * \retval             negative errno if the thread can not be run.
+ */
+static int distribute_txn_commit_thread(void *_arg)
+{
+       struct target_distribute_txn_data *tdtd = _arg;
+       struct lu_target        *lut = tdtd->tdtd_lut;
+       struct ptlrpc_thread    *thread = &lut->lut_tdtd_commit_thread;
+       struct l_wait_info       lwi = { 0 };
+       struct lu_env            env;
+       struct list_head         list;
+       int                      rc;
+       struct top_multiple_thandle *tmt;
+       struct top_multiple_thandle *tmp;
+       __u64                    batchid = 0, committed;
+
+       ENTRY;
+
+       rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
+       if (rc != 0)
+               RETURN(rc);
+
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       thread->t_flags = SVC_RUNNING;
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+       wake_up(&thread->t_ctl_waitq);
+       INIT_LIST_HEAD(&list);
+
+       CDEBUG(D_HA, "%s: start commit thread committed batchid "LPU64"\n",
+              tdtd->tdtd_lut->lut_obd->obd_name,
+              tdtd->tdtd_committed_batchid);
+
+       while (distribute_txn_commit_thread_running(lut)) {
+               spin_lock(&tdtd->tdtd_batchid_lock);
+               list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
+                                        tmt_commit_list) {
+                       if (tmt->tmt_committed == 0)
+                               break;
+
+                       /* Note: right now, replay is based on master MDT
+                        * transno, but cancellation is based on batchid.
+                        * so we do not try to cancel the update log until
+                        * the recoverying is done, unless the update records
+                        * batchid < committed_batchid. */
+                       if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) {
+                               list_move_tail(&tmt->tmt_commit_list, &list);
+                       } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
+                               LASSERTF(tmt->tmt_batchid >= batchid,
+                                        "tmt %p tmt_batchid: "LPU64", batchid "
+                                         LPU64"\n", tmt, tmt->tmt_batchid,
+                                        batchid);
+                               /* There are three types of distribution
+                                * transaction result
+                                *
+                                * 1. If tmt_result < 0, it means the
+                                * distribution transaction fails, which should
+                                * be rare, because once declare phase succeeds,
+                                * the operation should succeeds anyway. Note in
+                                * this case, we will still update batchid so
+                                * cancellation would be stopped.
+                                *
+                                * 2. If tmt_result == 0, it means the
+                                * distribution transaction succeeds, and we
+                                * will update batchid.
+                                *
+                                * 3. If tmt_result > 0, it means distribute
+                                * transaction is not yet committed on every
+                                * node, but we need release this tmt before
+                                * that, which usuually happens during umount.
+                                */
+                               if (tmt->tmt_result <= 0)
+                                       batchid = tmt->tmt_batchid;
+                               list_move_tail(&tmt->tmt_commit_list, &list);
+                       }
+               }
+               spin_unlock(&tdtd->tdtd_batchid_lock);
+
+               CDEBUG(D_HA, "%s: batchid: "LPU64" committed batchid "
+                      LPU64"\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid,
+                      tdtd->tdtd_committed_batchid);
+               /* update globally committed on a storage */
+               if (batchid > tdtd->tdtd_committed_batchid) {
+                       distribute_txn_commit_batchid_update(&env, tdtd,
+                                                            batchid);
+                       spin_lock(&tdtd->tdtd_batchid_lock);
+                       if (batchid > tdtd->tdtd_batchid) {
+                               /* This might happen during recovery,
+                                * batchid is initialized as last transno,
+                                * and the batchid in the update records
+                                * on other MDTs might be bigger than
+                                * the batchid, so we need update it to
+                                * avoid duplicate batchid. */
+                               CDEBUG(D_HA, "%s update batchid from "LPU64
+                                      " to "LPU64"\n",
+                                      tdtd->tdtd_lut->lut_obd->obd_name,
+                                      tdtd->tdtd_batchid, batchid);
+                               tdtd->tdtd_batchid = batchid;
+                       }
+                       spin_unlock(&tdtd->tdtd_batchid_lock);
+               }
+               /* cancel the records for committed batchid's */
+               /* XXX: should we postpone cancel's till the end of recovery? */
+               committed = tdtd->tdtd_committed_batchid;
+               list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
+                       if (tmt->tmt_batchid > committed)
+                               break;
+                       list_del_init(&tmt->tmt_commit_list);
+                       if (tmt->tmt_result <= 0)
+                               distribute_txn_cancel_records(&env, tmt);
+                       top_multiple_thandle_put(tmt);
+               }
+
+               l_wait_event(tdtd->tdtd_commit_thread_waitq,
+                            !distribute_txn_commit_thread_running(lut) ||
+                            committed < tdtd->tdtd_committed_batchid ||
+                            tdtd_ready_for_cancel_log(tdtd), &lwi);
+       };
+
+       l_wait_event(tdtd->tdtd_commit_thread_waitq,
+                    atomic_read(&tdtd->tdtd_refcount) == 0, &lwi);
+
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
+                                tmt_commit_list)
+               list_move_tail(&tmt->tmt_commit_list, &list);
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+
+       CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n",
+              tdtd->tdtd_lut->lut_obd->obd_name);
+       list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
+               list_del_init(&tmt->tmt_commit_list);
+               top_multiple_thandle_dump(tmt, D_HA);
+               top_multiple_thandle_put(tmt);
+       }
+
+       thread->t_flags = SVC_STOPPED;
+       lu_env_fini(&env);
+       wake_up(&thread->t_ctl_waitq);
+
+       RETURN(0);
+}
+
+/**
+ * Start llog cancel thread
+ *
+ * Start llog cancel(master/slave) thread on LOD
+ *
+ * \param[in]lclt      cancel log thread to be started.
+ *
+ * \retval             0 if the thread is started successfully.
+ * \retval             negative errno if the thread is not being
+ *                      started.
+ */
+int distribute_txn_init(const struct lu_env *env,
+                       struct lu_target *lut,
+                       struct target_distribute_txn_data *tdtd,
+                       __u32 index)
+{
+       struct task_struct      *task;
+       struct l_wait_info       lwi = { 0 };
+       int                     rc;
+       ENTRY;
+
+       spin_lock_init(&tdtd->tdtd_batchid_lock);
+       INIT_LIST_HEAD(&tdtd->tdtd_list);
+
+       tdtd->tdtd_batchid = lut->lut_last_transno + 1;
+
+       init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
+       init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
+       atomic_set(&tdtd->tdtd_refcount, 0);
+
+       tdtd->tdtd_lut = lut;
+       rc = distribute_txn_commit_batchid_init(env, tdtd);
+       if (rc != 0)
+               RETURN(rc);
+
+       task = kthread_run(distribute_txn_commit_thread, tdtd, "tdtd-%u",
+                          index);
+       if (IS_ERR(task))
+               RETURN(PTR_ERR(task));
+
+       l_wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
+                    distribute_txn_commit_thread_running(lut) ||
+                    distribute_txn_commit_thread_stopped(lut), &lwi);
+       RETURN(0);
+}
+EXPORT_SYMBOL(distribute_txn_init);
+
+/**
+ * Stop llog cancel thread
+ *
+ * Stop llog cancel(master/slave) thread on LOD and also destory
+ * all of transaction in the list.
+ *
+ * \param[in]lclt      cancel log thread to be stopped.
+ */
+void distribute_txn_fini(const struct lu_env *env,
+                        struct target_distribute_txn_data *tdtd)
+{
+       struct lu_target *lut = tdtd->tdtd_lut;
+
+       /* Stop cancel thread */
+       if (lut == NULL || !distribute_txn_commit_thread_running(lut))
+               return;
+
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING;
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+       wake_up(&tdtd->tdtd_commit_thread_waitq);
+       wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
+                  lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED);
+
+       if (tdtd->tdtd_batchid_obj != NULL)
+               lu_object_put(env, &tdtd->tdtd_batchid_obj->do_lu);
 }
-EXPORT_SYMBOL(top_thandle_destroy);
+EXPORT_SYMBOL(distribute_txn_fini);
index 0adf066..2e997c7 100644 (file)
@@ -252,6 +252,7 @@ check_ost_id(void)
        CHECK_VALUE_64X(FID_SEQ_ROOT);
        CHECK_VALUE_64X(FID_SEQ_LAYOUT_RBTREE);
        CHECK_VALUE_64X(FID_SEQ_UPDATE_LOG);
+       CHECK_VALUE_64X(FID_SEQ_UPDATE_LOG_DIR);
        CHECK_VALUE_64X(FID_SEQ_NORMAL);
        CHECK_VALUE_64X(FID_SEQ_LOV_DEFAULT);
 
index 92a44f1..2d347c7 100644 (file)
@@ -65,7 +65,6 @@ void lustre_assert_wire_constants(void)
          * running on Linux q 2.6.32-431.5.1.el6.lustre.x86_64 #1 SMP Wed Feb 12 11:01:08 CST 2014 x8
          * with gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC)  */
 
-
        /* Constants... */
        LASSERTF(PTL_RPC_MSG_REQUEST == 4711, "found %lld\n",
                 (long long)PTL_RPC_MSG_REQUEST);
@@ -541,6 +540,8 @@ void lustre_assert_wire_constants(void)
                        (long long)FID_SEQ_LAYOUT_RBTREE);
        LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_UPDATE_LOG);
+       LASSERTF(FID_SEQ_UPDATE_LOG_DIR == 0x000000020000000aULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_UPDATE_LOG_DIR);
        LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_NORMAL);
        LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n",