Whamcloud - gitweb
LU-6401 headers: Create a header for obdo related functions
[fs/lustre-release.git] / lustre / target / update_recovery.c
index 1b5ebca..3ffecba 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
  */
 
 /*
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
 #include <lustre_update.h>
+#include <lustre_swab.h>
+#include <md_object.h>
 #include <obd.h>
 #include <obd_class.h>
+
 #include "tgt_internal.h"
 
 /**
@@ -70,7 +73,7 @@ dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
        struct distribute_txn_replay_req        *dtrq = NULL;
 
        list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
-               if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
+               if (tmp->dtrq_batchid == batchid) {
                        dtrq = tmp;
                        break;
                }
@@ -97,25 +100,19 @@ static int dtrq_insert(struct target_distribute_txn_data *tdtd,
 {
        struct distribute_txn_replay_req *iter;
 
+       /* Check if the dtrq has been added to the list */
+       iter = dtrq_lookup(tdtd, new->dtrq_batchid);
+       if (iter != NULL)
+               return -EEXIST;
+
        list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
-               if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
-                   new->dtrq_lur->lur_update_rec.ur_master_transno)
+               if (iter->dtrq_master_transno > new->dtrq_master_transno)
                        continue;
 
                /* If there are mulitple replay req with same transno, then
                 * sort them with batchid */
-               if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
-                   new->dtrq_lur->lur_update_rec.ur_master_transno &&
-                   iter->dtrq_lur->lur_update_rec.ur_batchid ==
-                   new->dtrq_lur->lur_update_rec.ur_batchid)
-                       return -EEXIST;
-
-               /* If there are mulitple replay req with same transno, then
-                * sort them with batchid */
-               if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
-                   new->dtrq_lur->lur_update_rec.ur_master_transno &&
-                   iter->dtrq_lur->lur_update_rec.ur_batchid >
-                   new->dtrq_lur->lur_update_rec.ur_batchid)
+               if (iter->dtrq_master_transno == new->dtrq_master_transno &&
+                   iter->dtrq_batchid > new->dtrq_batchid)
                        continue;
 
                list_add(&new->dtrq_list, &iter->dtrq_list);
@@ -141,7 +138,8 @@ static int dtrq_insert(struct target_distribute_txn_data *tdtd,
  * \retval             NULL if the creation fails.
  */
 static struct distribute_txn_replay_req *
-dtrq_create(struct llog_update_record *lur)
+dtrq_create(struct target_distribute_txn_data *tdtd,
+           struct llog_update_record *lur)
 {
        struct distribute_txn_replay_req *new;
 
@@ -158,6 +156,24 @@ dtrq_create(struct llog_update_record *lur)
 
        memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
 
+       /* If the transno in the update record is 0, it means the
+        * update are from master MDT, and it will use the master
+        * last committed transno as its master transno. Later, if
+        * the update records are gotten from slave MDTs, then these
+        * transno will be replaced.
+        * See insert_update_records_to_replay_list(). */
+       if (lur->lur_update_rec.ur_master_transno == 0) {
+               new->dtrq_lur->lur_update_rec.ur_master_transno =
+                               tdtd->tdtd_lut->lut_obd->obd_last_committed;
+               new->dtrq_master_transno =
+                               tdtd->tdtd_lut->lut_obd->obd_last_committed;
+       } else {
+               new->dtrq_master_transno =
+                               lur->lur_update_rec.ur_master_transno;
+       }
+
+       new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
+
        spin_lock_init(&new->dtrq_sub_list_lock);
        INIT_LIST_HEAD(&new->dtrq_sub_list);
        INIT_LIST_HEAD(&new->dtrq_list);
@@ -193,6 +209,36 @@ dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
 }
 
 /**
+ * Try to add cookie to sub distribute txn request
+ *
+ * Check if the update log cookie has been added to the request, if not,
+ * add it to the dtrqs_cookie_list.
+ *
+ * \param[in] dtrqs    sub replay req where cookies to be added.
+ * \param[in] cookie   cookie to be added.
+ *
+ * \retval             0 if the cookie is adding succeeds.
+ * \retval             negative errno if adding fails.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+                              struct llog_cookie *cookie)
+{
+       struct sub_thandle_cookie *new;
+
+       OBD_ALLOC_PTR(new);
+       if (new == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&new->stc_list);
+       new->stc_cookie = *cookie;
+       /* Note: only single thread will access one sub_request each time,
+        * so no need lock here */
+       list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
+
+       return 0;
+}
+
+/**
  * Insert distribute txn sub req replay
  *
  * Allocate sub replay req and insert distribute txn replay list.
@@ -209,31 +255,102 @@ dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
                           struct llog_cookie *cookie,
                           __u32 mdt_index)
 {
-       struct distribute_txn_replay_req_sub *dtrqs = NULL;
-       struct distribute_txn_replay_req_sub *new;
+       struct distribute_txn_replay_req_sub    *dtrqs = NULL;
+       struct distribute_txn_replay_req_sub    *new;
+       int                                     rc;
        ENTRY;
 
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        spin_unlock(&dtrq->dtrq_sub_list_lock);
-       if (dtrqs != NULL)
+       if (dtrqs != NULL) {
+               rc = dtrq_sub_add_cookie(dtrqs, cookie);
                RETURN(0);
+       }
 
        OBD_ALLOC_PTR(new);
        if (new == NULL)
                RETURN(-ENOMEM);
 
        INIT_LIST_HEAD(&new->dtrqs_list);
+       INIT_LIST_HEAD(&new->dtrqs_cookie_list);
        new->dtrqs_mdt_index = mdt_index;
-       new->dtrqs_llog_cookie = *cookie;
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
-       if (dtrqs == NULL)
+       if (dtrqs == NULL) {
                list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
-       else
+               dtrqs = new;
+       } else {
                OBD_FREE_PTR(new);
+       }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
 
+       rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+       RETURN(rc);
+}
+
+/**
+ * append updates to the current replay updates
+ *
+ * Append more updates to the existent replay update. And this is only
+ * used when combining mulitple updates into one large updates during
+ * replay.
+ *
+ * \param[in] dtrq     the update replay request where the new update
+ *                      records will be added.
+ * \param[in] lur      the new update record.
+ *
+ * \retval             0 if appending succeeds.
+ * \retval             negative errno if appending fails.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+                              struct update_records *record)
+{
+       struct llog_update_record *new_lur;
+       size_t lur_size = dtrq->dtrq_lur_size;
+       void *ptr;
+       ENTRY;
+
+       /* Because several threads might retrieve the same records from
+        * different targets, and we only need one copy of records. So
+        * we will check if the records is in the next one, if not, just
+        * skip it */
+       spin_lock(&dtrq->dtrq_sub_list_lock);
+       if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(0);
+       }
+       dtrq->dtrq_lur->lur_update_rec.ur_index++;
+       spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+       lur_size += update_records_size(record);
+       OBD_ALLOC_LARGE(new_lur, lur_size);
+       if (new_lur == NULL) {
+               spin_lock(&dtrq->dtrq_sub_list_lock);
+               dtrq->dtrq_lur->lur_update_rec.ur_index--;
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(-ENOMEM);
+       }
+
+       /* Copy the old and new records to the new allocated buffer */
+       memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       ptr = (char *)&new_lur->lur_update_rec +
+               update_records_size(&new_lur->lur_update_rec);
+       memcpy(ptr, &record->ur_ops,
+              update_records_size(record) -
+              offsetof(struct update_records, ur_ops));
+
+       new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+       new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+       new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+       /* Replace the records */
+       OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       dtrq->dtrq_lur = new_lur;
+       dtrq->dtrq_lur_size = lur_size;
+       dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+       update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
        RETURN(0);
 }
 
@@ -259,54 +376,91 @@ insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
                                     __u32 mdt_index)
 {
        struct distribute_txn_replay_req *dtrq;
-       struct update_records            *record = &lur->lur_update_rec;
+       struct update_records *record = &lur->lur_update_rec;
+       bool replace_record = false;
        int rc = 0;
        ENTRY;
 
        CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
               " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
               record->ur_batchid, record->ur_master_transno, mdt_index);
+
+       /* Update batchid if necessary */
+       spin_lock(&tdtd->tdtd_batchid_lock);
+       if (record->ur_batchid >= tdtd->tdtd_batchid) {
+               CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
+                      tdtd->tdtd_lut->lut_obd->obd_name,
+                      tdtd->tdtd_batchid, record->ur_batchid);
+               tdtd->tdtd_batchid = record->ur_batchid + 1;
+       }
+       spin_unlock(&tdtd->tdtd_batchid_lock);
+
 again:
        spin_lock(&tdtd->tdtd_replay_list_lock);
+       /* First try to build the replay update request with the records */
        dtrq = dtrq_lookup(tdtd, record->ur_batchid);
-       spin_unlock(&tdtd->tdtd_replay_list_lock);
        if (dtrq == NULL) {
-               /* If the transno in the update record is 0, it means the
-                * update are from master MDT, and we will use the master
-                * last committed transno as its batchid. Note: if it got
-                * the records from the slave later, it needs to update
-                * the batchid by the transno in slave update log (see below) */
-               dtrq = dtrq_create(lur);
+               spin_unlock(&tdtd->tdtd_replay_list_lock);
+               dtrq = dtrq_create(tdtd, lur);
                if (IS_ERR(dtrq))
                        RETURN(PTR_ERR(dtrq));
 
-               if (record->ur_master_transno == 0)
-                       dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
-                               tdtd->tdtd_lut->lut_last_transno;
                spin_lock(&tdtd->tdtd_replay_list_lock);
                rc = dtrq_insert(tdtd, dtrq);
-               spin_unlock(&tdtd->tdtd_replay_list_lock);
-       } else if (record->ur_master_transno != 0 &&
-                  dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
-                  record->ur_master_transno) {
-               /* If the master transno in update header is not matched with
-                * the one in the record, then it means the dtrq is originally
-                * created by master record, and we need update master transno
-                * and reposition the dtrq(by master transno). */
-               dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
+               if (rc < 0) {
+                       spin_unlock(&tdtd->tdtd_replay_list_lock);
+                       dtrq_destroy(dtrq);
+                       if (rc == -EEXIST)
+                               goto again;
+                       return rc;
+               }
+       } else {
+               /* If the master transno in update header is not
+               * matched with the one in the record, then it means
+               * the dtrq is originally created by master record,
+               * so we need update master transno and reposition
+               * the dtrq(by master transno) in the list and also
+               * replace update record */
+               if (record->ur_master_transno != 0 &&
+                   dtrq->dtrq_master_transno != record->ur_master_transno &&
+                   dtrq->dtrq_lur != NULL) {
+                       list_del_init(&dtrq->dtrq_list);
+                       dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
                                                record->ur_master_transno;
-               list_del_init(&dtrq->dtrq_list);
-               spin_lock(&tdtd->tdtd_replay_list_lock);
-               rc = dtrq_insert(tdtd, dtrq);
-               spin_unlock(&tdtd->tdtd_replay_list_lock);
+
+                       dtrq->dtrq_master_transno = record->ur_master_transno;
+                       replace_record = true;
+                       /* try to insert again */
+                       rc = dtrq_insert(tdtd, dtrq);
+                       if (rc < 0) {
+                               spin_unlock(&tdtd->tdtd_replay_list_lock);
+                               dtrq_destroy(dtrq);
+                               return rc;
+                       }
+               }
        }
+       spin_unlock(&tdtd->tdtd_replay_list_lock);
+
+       /* Because there should be only thread access the update record, so
+        * we do not need lock here */
+       if (replace_record) {
+               /* Replace the update record and master transno */
+               OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+               dtrq->dtrq_lur = NULL;
+               dtrq->dtrq_lur_size = llog_update_record_size(lur);
+               OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+               if (dtrq->dtrq_lur == NULL)
+                       return -ENOMEM;
 
-       if (rc == -EEXIST) {
-               dtrq_destory(dtrq);
-               rc = 0;
-               goto again;
+               memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
        }
 
+       /* This is a partial update records, let's try to append
+        * the record to the current replay request */
+       if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+               rc = dtrq_append_updates(dtrq, record);
+
+       /* Then create and add sub update request */
        rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
 
        RETURN(rc);
@@ -342,7 +496,7 @@ EXPORT_SYMBOL(dtrq_list_dump);
  *
  * \param[in] dtrq     distribute txn replqy req to be destroyed.
  */
-void dtrq_destory(struct distribute_txn_replay_req *dtrq)
+void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
 {
        struct distribute_txn_replay_req_sub    *dtrqs;
        struct distribute_txn_replay_req_sub    *tmp;
@@ -350,7 +504,15 @@ void dtrq_destory(struct distribute_txn_replay_req *dtrq)
        LASSERT(list_empty(&dtrq->dtrq_list));
        spin_lock(&dtrq->dtrq_sub_list_lock);
        list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+               struct sub_thandle_cookie *stc;
+               struct sub_thandle_cookie *tmp;
+
                list_del(&dtrqs->dtrqs_list);
+               list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
+                                        stc_list) {
+                       list_del(&stc->stc_list);
+                       OBD_FREE_PTR(stc);
+               }
                OBD_FREE_PTR(dtrqs);
        }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
@@ -360,7 +522,7 @@ void dtrq_destory(struct distribute_txn_replay_req *dtrq)
 
        OBD_FREE_PTR(dtrq);
 }
-EXPORT_SYMBOL(dtrq_destory);
+EXPORT_SYMBOL(dtrq_destroy);
 
 /**
  * Destroy all of replay req.
@@ -378,7 +540,12 @@ void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
        list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
                                 dtrq_list) {
                list_del_init(&dtrq->dtrq_list);
-               dtrq_destory(dtrq);
+               dtrq_destroy(dtrq);
+       }
+       list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+                                dtrq_list) {
+               list_del_init(&dtrq->dtrq_list);
+               dtrq_destroy(dtrq);
        }
        spin_unlock(&tdtd->tdtd_replay_list_lock);
 }
@@ -428,7 +595,7 @@ __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
        if (!list_empty(&tdtd->tdtd_replay_list)) {
                dtrq = list_entry(tdtd->tdtd_replay_list.next,
                                 struct distribute_txn_replay_req, dtrq_list);
-               transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
+               transno = dtrq->dtrq_master_transno;
        }
        spin_unlock(&tdtd->tdtd_replay_list_lock);
 
@@ -438,6 +605,40 @@ __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
 }
 EXPORT_SYMBOL(distribute_txn_get_next_transno);
 
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+                                 __u64 xid)
+{
+       struct distribute_txn_replay_req *dtrq = NULL;
+       struct distribute_txn_replay_req *iter;
+
+       spin_lock(&tdtd->tdtd_replay_list_lock);
+       list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+               if (iter->dtrq_xid == xid) {
+                       dtrq = iter;
+                       break;
+               }
+       }
+       spin_unlock(&tdtd->tdtd_replay_list_lock);
+       return dtrq;
+}
+
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+       struct lu_target *tgt = class_exp2tgt(req->rq_export);
+       struct distribute_txn_replay_req *dtrq;
+
+       if (tgt->lut_tdtd == NULL)
+               return false;
+
+       dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
+       if (dtrq == NULL)
+               return false;
+
+       return true;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
 /**
  * Check if the update of one object is committed
  *
@@ -496,8 +697,15 @@ static int update_is_committed(const struct lu_env *env,
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
                st->st_committed = 1;
-               if (dtrqs != NULL)
-                       st->st_cookie = dtrqs->dtrqs_llog_cookie;
+               if (dtrqs != NULL) {
+                       struct sub_thandle_cookie *stc;
+                       struct sub_thandle_cookie *tmp;
+
+                       list_for_each_entry_safe(stc, tmp,
+                                                &dtrqs->dtrqs_cookie_list,
+                                                stc_list)
+                               list_move(&stc->stc_list, &st->st_cookie_list);
+               }
                RETURN(0);
        }
 
@@ -843,6 +1051,73 @@ static int update_recovery_xattr_del(const struct lu_env *env,
 }
 
 /**
+ * Update session information
+ *
+ * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
+ * can be called correctly during update replay.
+ *
+ * \param[in] env      execution environment.
+ * \param[in] tdtd     distribute data structure of the recovering tgt.
+ * \param[in] th       thandle of this update replay.
+ * \param[in] master_th        master sub thandle.
+ * \param[in] ta_arg   the tx arg structure to hold the update for updating
+ *                      reply data.
+ */
+static void update_recovery_update_ses(struct lu_env *env,
+                                     struct target_distribute_txn_data *tdtd,
+                                     struct thandle *th,
+                                     struct thandle *master_th,
+                                     struct distribute_txn_replay_req *dtrq,
+                                     struct tx_arg *ta_arg)
+{
+       struct tgt_session_info *tsi;
+       struct lu_target        *lut = tdtd->tdtd_lut;
+       struct obd_export       *export;
+       struct cfs_hash         *hash;
+       struct top_thandle      *top_th;
+       struct lsd_reply_data   *lrd;
+       size_t                  size;
+
+       tsi = tgt_ses_info(env);
+       if (tsi->tsi_exp != NULL)
+               return;
+
+       size = ta_arg->u.write.buf.lb_len;
+       lrd = ta_arg->u.write.buf.lb_buf;
+       if (size != sizeof(*lrd) || lrd == NULL)
+               return;
+
+       lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
+       lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
+       lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
+       lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
+       lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
+
+       if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
+               return;
+
+       hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
+       if (hash == NULL)
+               return;
+
+       export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+       if (export == NULL) {
+               cfs_hash_putref(hash);
+               return;
+       }
+
+       tsi->tsi_exp = export;
+       tsi->tsi_xid = lrd->lrd_xid;
+       tsi->tsi_opdata = lrd->lrd_data;
+       tsi->tsi_result = lrd->lrd_result;
+       tsi->tsi_client_gen = lrd->lrd_client_gen;
+       dtrq->dtrq_xid = lrd->lrd_xid;
+       top_th = container_of(th, struct top_thandle, tt_super);
+       top_th->tt_master_sub_thandle = master_th;
+       cfs_hash_putref(hash);
+}
+
+/**
  * Execute updates in the update replay records
  *
  * Declare distribute txn replay by update records and add the updates
@@ -886,6 +1161,9 @@ static int update_recovery_exec(const struct lu_env *env,
                struct dt_device        *sub_dt;
                struct sub_thandle      *st;
 
+               if (op->uop_type == OUT_NOOP)
+                       continue;
+
                dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
                if (IS_ERR(dt_obj)) {
                        rc = PTR_ERR(dt_obj);
@@ -1015,7 +1293,7 @@ int distribute_txn_replay_handle(struct lu_env *env,
        struct top_thandle      *top_th;
        struct top_multiple_thandle *tmt;
        struct thandle_update_records *tur = NULL;
-       unsigned int            i;
+       int                     i;
        int                     rc = 0;
        ENTRY;
 
@@ -1044,13 +1322,14 @@ int distribute_txn_replay_handle(struct lu_env *env,
        if (rc < 0)
                GOTO(stop_trans, rc);
 
+       th->th_dev = tdtd->tdtd_dt;
        ta->ta_handle = th;
 
        /* check if the distribute transaction has been committed */
        tmt = top_th->tt_multiple_thandle;
        tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
-       tmt->tmt_batchid = records->ur_batchid;
-       tgt_th_info(env)->tti_transno = records->ur_master_transno;
+       tmt->tmt_batchid = dtrq->dtrq_batchid;
+       tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
 
        if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
                tmt->tmt_committed = 1;
@@ -1059,10 +1338,9 @@ int distribute_txn_replay_handle(struct lu_env *env,
        if (rc < 0)
                GOTO(stop_trans, rc);
 
-       /* If no updates are needed to be replayed, then
-        * mark this records as committed, so commit thread
-        * distribute_txn_commit_thread() will delete the
-        * record */
+       /* If no updates are needed to be replayed, then mark this records as
+        * committed, so commit thread distribute_txn_commit_thread() will
+        * delete the record */
        if (ta->ta_argno == 0)
                tmt->tmt_committed = 1;
 
@@ -1090,15 +1368,24 @@ int distribute_txn_replay_handle(struct lu_env *env,
                LASSERT(tmt->tmt_committed == 0);
                sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
                st = lookup_sub_thandle(tmt, sub_dt);
+
                LASSERT(st != NULL);
                LASSERT(st->st_sub_th != NULL);
                rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
                                             ta->ta_args[i]);
+
+               /* If the update is to update the reply data, then
+                * we need set the session information, so
+                * tgt_last_rcvd_update() can be called correctly */
+               if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
+                       update_recovery_update_ses(env, tdtd, th,
+                                                  st->st_sub_th, dtrq, ta_arg);
+
                if (unlikely(rc < 0)) {
                        CDEBUG(D_HA, "error during execution of #%u from"
                               " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
                               ta->ta_args[i]->line, rc);
-                       while (--i >= 0) {
+                       while (--i > 0) {
                                if (ta->ta_args[i]->undo_fn != NULL) {
                                        dt_obj = ta->ta_args[i]->object;
                                        sub_dt =
@@ -1136,6 +1423,11 @@ stop_trans:
 
        if (tur != NULL)
                tur->tur_update_records = NULL;
+
+       if (tgt_ses_info(env)->tsi_exp != NULL) {
+               class_export_put(tgt_ses_info(env)->tsi_exp);
+               tgt_ses_info(env)->tsi_exp = NULL;
+       }
 exit_session:
        lu_context_exit(&session_env);
        lu_context_fini(&session_env);