Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index b316781..8801607 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * Lustre Unified Target
  * These are common function to work with last_received file
@@ -148,6 +147,13 @@ static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
        int chunk;
        int b;
 
+       if (lut->lut_obd->obd_stopping)
+               /*
+                * in case of failover keep the bit set in order to
+                * avoid overwriting slots in reply_data which might
+                * be required by resent rpcs
+                */
+               return 0;
        chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
        b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
 
@@ -255,9 +261,10 @@ static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
                                struct lsd_reply_data *lrd, loff_t off,
                                struct thandle *th)
 {
-       struct tgt_thread_info  *tti = tgt_th_info(env);
-       struct dt_object        *dto;
-       struct lsd_reply_data   *buf = &tti->tti_lrd;
+       struct tgt_thread_info *tti = tgt_th_info(env);
+       struct lsd_reply_data *buf = &tti->tti_lrd;
+       struct lsd_reply_header *lrh = &tgt->lut_reply_header;
+       struct dt_object *dto;
 
        lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
 
@@ -269,9 +276,12 @@ static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
 
        lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
 
+       if (lrh->lrh_magic > LRH_MAGIC_V1)
+               buf->lrd_batch_idx = cpu_to_le32(lrd->lrd_batch_idx);
+
        tti->tti_off = off;
        tti->tti_buf.lb_buf = buf;
-       tti->tti_buf.lb_len = sizeof(*buf);
+       tti->tti_buf.lb_len = lrh->lrh_reply_size;
 
        dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
        return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
@@ -281,31 +291,36 @@ static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
  * into structure @lrd
  */
 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
-                              struct lsd_reply_data *lrd, loff_t off)
+                              struct lsd_reply_data *lrd, loff_t off,
+                              struct lsd_reply_header *lrh)
 {
-       int                      rc;
-       struct tgt_thread_info  *tti = tgt_th_info(env);
-       struct lsd_reply_data   *buf = &tti->tti_lrd;
+       struct tgt_thread_info *tti = tgt_th_info(env);
+       struct lsd_reply_data *buf = &tti->tti_lrd;
+       int rc;
 
        tti->tti_off = off;
        tti->tti_buf.lb_buf = buf;
-       tti->tti_buf.lb_len = sizeof(*buf);
+       tti->tti_buf.lb_len = lrh->lrh_reply_size;
 
        rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
                            &tti->tti_off);
        if (rc != 0)
                return rc;
 
-       lrd->lrd_transno         = le64_to_cpu(buf->lrd_transno);
-       lrd->lrd_xid             = le64_to_cpu(buf->lrd_xid);
-       lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
-       lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
-       lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
+       lrd->lrd_transno = le64_to_cpu(buf->lrd_transno);
+       lrd->lrd_xid = le64_to_cpu(buf->lrd_xid);
+       lrd->lrd_data = le64_to_cpu(buf->lrd_data);
+       lrd->lrd_result = le32_to_cpu(buf->lrd_result);
+       lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen);
+
+       if (lrh->lrh_magic > LRH_MAGIC_V1)
+               lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx);
+       else
+               lrd->lrd_batch_idx = 0;
 
        return 0;
 }
 
-
 /* Free the in-memory reply data structure @trd and release
  * the corresponding slot in the reply_data file of target @lut
  * Called with ted_lcd_lock held
@@ -324,7 +339,7 @@ static void tgt_free_reply_data(struct lu_target *lut,
 
        list_del(&trd->trd_list);
        ted->ted_reply_cnt--;
-       if (lut != NULL)
+       if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
                tgt_clear_reply_slot(lut, trd->trd_index);
        OBD_FREE_PTR(trd);
 }
@@ -378,6 +393,13 @@ static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
        return &tti->tti_buf;
 }
 
+static inline bool tgt_is_multimodrpcs_record(struct lu_target *tgt,
+                                             struct lsd_client_data *lcd)
+{      /*
+        * Return true when the client record @lcd qualifies as a
+        * multimodrpcs record on target @tgt: the target's
+        * lsd_feature_incompat must have OBD_INCOMPAT_MULTI_RPCS set and
+        * the record must carry a non-zero lcd_generation.
+        */
+       return tgt->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
+               lcd->lcd_generation != 0;
+}
+
 /**
  * Allocate in-memory data for client slot related to export.
  */
@@ -388,6 +410,8 @@ int tgt_client_alloc(struct obd_export *exp)
 
        spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
        INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
+       spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
 
        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
        if (exp->exp_target_data.ted_lcd == NULL)
@@ -411,6 +435,8 @@ void tgt_client_free(struct obd_export *exp)
 
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       tgt_fmd_cleanup(exp);
+
        /* free reply data */
        mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
@@ -432,7 +458,8 @@ void tgt_client_free(struct obd_export *exp)
 
        /* Target may have been freed (see LU-7430)
         * Slot may be not yet assigned */
-       if (exp->exp_obd->u.obt.obt_magic != OBT_MAGIC ||
+       if (((struct obd_device_target *)(&exp->exp_obd->u))->obt_magic !=
+           OBT_MAGIC ||
            ted->ted_lr_idx < 0)
                return;
 
@@ -443,9 +470,6 @@ void tgt_client_free(struct obd_export *exp)
                       exp->exp_obd->obd_name, ted->ted_lr_idx);
                LBUG();
        }
-
-       if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping)
-               atomic_dec(&lut->lut_num_clients);
 }
 EXPORT_SYMBOL(tgt_client_free);
 
@@ -518,7 +542,7 @@ static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
 {
        struct tgt_new_client_callback *ccb;
 
-       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
+       ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
 
        LASSERT(ccb->lncc_exp->exp_obd);
 
@@ -536,11 +560,11 @@ static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
        OBD_FREE_PTR(ccb);
 }
 
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+static int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
 {
-       struct tgt_new_client_callback  *ccb;
-       struct dt_txn_commit_cb         *dcb;
-       int                              rc;
+       struct tgt_new_client_callback *ccb;
+       struct dt_txn_commit_cb *dcb;
+       int rc;
 
        OBD_ALLOC_PTR(ccb);
        if (ccb == NULL)
@@ -551,7 +575,7 @@ int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
        dcb = &ccb->lncc_cb;
        dcb->dcb_func = tgt_cb_new_client;
        INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+       strscpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
 
        rc = dt_trans_cb_add(th, dcb);
        if (rc) {
@@ -687,7 +711,7 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
 
        CDEBUG(D_SUPER,
               "%s: mount_count is %llu, last_transno is %llu\n",
-              tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
+              tgt->lut_lsd.lsd_uuid, obd2obt(tgt->lut_obd)->obt_mount_count,
               tgt->lut_last_transno);
 
        /* Always save latest transno to keep it fresh */
@@ -725,10 +749,9 @@ out:
 }
 EXPORT_SYMBOL(tgt_server_data_update);
 
-static int tgt_truncate_last_rcvd(const struct lu_env *env,
-                                 struct lu_target *tgt, loff_t size)
+static int tgt_truncate_object(const struct lu_env *env, struct lu_target *tgt,
+                              struct dt_object *dt, loff_t size)
 {
-       struct dt_object *dt = tgt->lut_last_rcvd;
        struct thandle   *th;
        struct lu_attr    attr;
        int               rc;
@@ -778,6 +801,52 @@ static void tgt_client_epoch_update(const struct lu_env *env,
        tgt_client_data_update(env, exp);
 }
 
+static int tgt_reply_data_upgrade_check(const struct lu_env *env,
+                                       struct lu_target *tgt)
+{      /*
+        * Bring the reply_data file header of @tgt up to the current
+        * format.  The header is read into tgt->lut_reply_header; if its
+        * magic already equals LRH_MAGIC nothing is changed.  Otherwise
+        * the reply_data object is truncated to size 0 and a fresh header
+        * (magic, header size, per-record reply size) is written back.
+        *
+        * Returns 0 on success or when there is nothing to do, otherwise
+        * the error returned by the failing read/truncate/write helper.
+        *
+        * NOTE(review): lrh_magic is overwritten with LRH_MAGIC before it
+        * is compared with LRH_MAGIC_V1 below, so the reply size chosen
+        * depends only on how LRH_MAGIC is defined, not on the magic that
+        * was read from disk - presumably a deliberate build-time
+        * selection; confirm.
+        * NOTE(review): RETURN() is used here without a matching ENTRY,
+        * unlike tgt_last_rcvd_update() - confirm this is intended.
+        */
+       struct lsd_reply_header *lrh = &tgt->lut_reply_header;
+       int rc;
+
+       /*
+        * Reply data is supported by MDT targets only for now.
+        * When reply data object @lut_reply_data is NULL, it indicates the
+        * target type is OST and it should skip the upgrade check.
+        */
+       if (tgt->lut_reply_data == NULL)
+               RETURN(0);
+
+       rc = tgt_reply_header_read(env, tgt, lrh);
+       if (rc) {
+               CERROR("%s: failed to read %s: rc = %d\n",
+                      tgt_name(tgt), REPLY_DATA, rc);
+               RETURN(rc);
+       }
+
+       if (lrh->lrh_magic == LRH_MAGIC)
+               RETURN(0);
+
+       rc = tgt_truncate_object(env, tgt, tgt->lut_reply_data, 0);
+       if (rc) {
+               CERROR("%s: failed to truncate %s: rc = %d\n",
+                      tgt_name(tgt), REPLY_DATA, rc);
+               RETURN(rc);
+       }
+
+       lrh->lrh_magic = LRH_MAGIC;
+       lrh->lrh_header_size = sizeof(struct lsd_reply_header);
+       if (lrh->lrh_magic == LRH_MAGIC_V1)
+               lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
+       else
+               lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
+
+       rc = tgt_reply_header_write(env, tgt, lrh);
+       if (rc)
+               CERROR("%s: failed to write header for %s: rc = %d\n",
+                      tgt_name(tgt), REPLY_DATA, rc);
+
+       RETURN(rc);
+}
+
 /**
  * Update boot epoch when recovery ends
  */
@@ -786,7 +855,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        struct lu_env            env;
        struct ptlrpc_request   *req;
        __u32                    start_epoch;
-       struct list_head         client_list;
+       LIST_HEAD(client_list);
        int                      rc;
 
        if (tgt->lut_obd->obd_stopping)
@@ -805,7 +874,6 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        tgt->lut_lsd.lsd_start_epoch = start_epoch;
        spin_unlock(&tgt->lut_translock);
 
-       INIT_LIST_HEAD(&client_list);
        /**
         * The recovery is not yet finished and final queue can still be updated
         * with resend requests. Move final list to separate one for processing
@@ -828,17 +896,16 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
        spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
 
-       /** Clear MULTI RPCS incompatibility flag if
-        * - target is MDT and
-        * - there is no client to recover or the recovery was aborted
+       /**
+        * Clear MULTI RPCS incompatibility flag if there is no multi-rpcs
+        * client in last_rcvd file
         */
-       if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
-           (tgt->lut_obd->obd_max_recoverable_clients == 0 ||
-           tgt->lut_obd->obd_abort_recovery))
+       if (atomic_read(&tgt->lut_num_clients) == 0)
                tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
 
        /** update server epoch */
        tgt_server_data_update(&env, tgt, 1);
+       tgt_reply_data_upgrade_check(&env, tgt);
        lu_env_fini(&env);
 }
 
@@ -857,12 +924,23 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
 {
        struct tgt_last_committed_callback *ccb;
 
-       ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
+       ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
 
        LASSERT(ccb->llcc_exp);
        LASSERT(ccb->llcc_tgt != NULL);
        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
 
+       if (th->th_reserved_quota.lqi_space > 0) {
+               CDEBUG(D_QUOTA, "free quota %llu %llu\n",
+                      th->th_reserved_quota.lqi_id.qid_gid,
+                      th->th_reserved_quota.lqi_space);
+
+               /* env can be NULL for freeing reserved quota */
+               th->th_reserved_quota.lqi_space *= -1;
+               dt_reserve_or_free_quota(NULL, th->th_dev,
+                                        &th->th_reserved_quota);
+       }
+
        /* error hit, don't update last committed to provide chance to
         * replay data after fail */
        if (err != 0)
@@ -917,7 +995,7 @@ static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
        dcb = &ccb->llcc_cb;
        dcb->dcb_func = tgt_cb_last_committed;
        INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
+       strscpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
 
        rc = dt_trans_cb_add(th, dcb);
        if (rc) {
@@ -934,6 +1012,26 @@ static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
        return rc ? rc : exp->exp_need_sync;
 }
 
+static int tgt_is_local_client(const struct lu_env *env,
+                                     struct obd_export *exp)
+{      /*
+        * Decide whether @exp is a "local client" in the sense used by
+        * tgt_client_new(), which marks such exports exp_no_recovery and
+        * does not give them a last_rcvd slot.
+        *
+        * Returns 1 only when all of the following hold, 0 otherwise:
+        *  - the export has neither OBD_CONNECT_MDS nor
+        *    OBD_CONNECT_MDS_MDS in its connect flags;
+        *  - the target's lut_local_recovery is not set;
+        *  - the session attached to @env has a request;
+        *  - LNetIsPeerLocal() reports that request's peer NID as local.
+        */
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+
+       if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
+           exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
+               return 0;
+       if (tgt->lut_local_recovery)
+               return 0;
+       if (!req)
+               return 0;
+       if (!LNetIsPeerLocal(&req->rq_peer.nid))
+               return 0;
+
+       return 1;
+}
+
 /**
  * Add new client to the last_rcvd upon new connection.
  *
@@ -955,13 +1053,20 @@ int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
        if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
+       if (tgt_is_local_client(env, exp)) {
+               LCONSOLE_WARN("%s: local client %s w/o recovery\n",
+                             exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
+               exp->exp_no_recovery = 1;
+               RETURN(0);
+       }
+
        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
         * there's no need for extra complication here
         */
        idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
 repeat:
        if (idx >= LR_MAX_CLIENTS ||
-           OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
+           CFS_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
                CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
                       tgt->lut_obd->obd_name,  idx);
                RETURN(-EOVERFLOW);
@@ -981,7 +1086,6 @@ repeat:
        if (tgt_is_multimodrpcs_client(exp)) {
                /* Set MULTI RPCS incompatibility flag to prevent previous
                 * Lustre versions to mount a target with reply_data file */
-               atomic_inc(&tgt->lut_num_clients);
                if (!(tgt->lut_lsd.lsd_feature_incompat &
                      OBD_INCOMPAT_MULTI_RPCS)) {
                        tgt->lut_lsd.lsd_feature_incompat |=
@@ -1007,15 +1111,20 @@ repeat:
               tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
               ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
                RETURN(-ENOSPC);
 
        rc = tgt_client_data_update(env, exp);
-       if (rc)
+       if (rc) {
                CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
                       tgt->lut_obd->obd_name, idx, rc);
+               RETURN(rc);
+       }
 
-       RETURN(rc);
+       if (tgt_is_multimodrpcs_client(exp))
+               atomic_inc(&tgt->lut_num_clients);
+
+       RETURN(0);
 }
 EXPORT_SYMBOL(tgt_client_new);
 
@@ -1045,7 +1154,6 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
                       tgt->lut_obd->obd_name,  idx);
                LBUG();
        }
-       atomic_inc(&tgt->lut_num_clients);
 
        CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
               "generation %d\n",
@@ -1082,7 +1190,8 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
        if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
                    (char *)tgt->lut_obd->obd_uuid.uuid) ||
-           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
+           exp->exp_no_recovery)
                RETURN(0);
 
        /* Slot may be not yet assigned, use case is race between Client
@@ -1109,7 +1218,7 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        if (exp->exp_flags & OBD_OPT_FAILOVER)
                RETURN(0);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
                RETURN(0);
 
        /* Make sure the server's last_transno is up to date.
@@ -1123,9 +1232,21 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
                RETURN(rc);
        }
 
+       /* Race between an eviction and a disconnection? */
+       mutex_lock(&ted->ted_lcd_lock);
+       if (ted->ted_lcd->lcd_uuid[0] == '\0') {
+               mutex_unlock(&ted->ted_lcd_lock);
+               RETURN(rc);
+       }
+
        memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
+       mutex_unlock(&ted->ted_lcd_lock);
+
        rc = tgt_client_data_update(env, exp);
 
+       if (!rc && tgt_is_multimodrpcs_record(tgt, ted->ted_lcd))
+               atomic_dec(&tgt->lut_num_clients);
+
        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
               "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
               tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
@@ -1134,12 +1255,36 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_del);
 
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
+static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
+{      /*
+        * Release every reply data entry on @exp's ted_reply_list whose
+        * trd_tag equals @tag, bumping ted_release_tag once per entry.
+        * A @tag of 0 is a no-op.  For clients for which
+        * tgt_is_increasing_xid_client() is true, each released entry is
+        * asserted to have lrd_xid <= @xid.
+        *
+        * The caller in tgt_add_reply_data() invokes this with
+        * ted_lcd_lock held.
+        */
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
+
+       if (tag == 0)
+               return;
+
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               if (trd->trd_tag != tag)
+                       continue;
+
+               LASSERT(ergo(tgt_is_increasing_xid_client(exp),
+                            trd->trd_reply.lrd_xid <= xid));
+
+               ted->ted_release_tag++;
+               tgt_release_reply_data(lut, ted, trd);
+       }
+}
+
+static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
 {
-       struct lsd_reply_data   *lrd;
-       int     i;
+       struct tgt_session_info *tsi = NULL;
+       struct lsd_reply_data *lrd;
+       int i = -1;
+       int rc;
 
        lrd = &trd->trd_reply;
        /* update export last transno */
@@ -1148,43 +1293,176 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
        mutex_unlock(&ted->ted_lcd_lock);
 
-       /* find a empty slot */
-       i = tgt_find_free_reply_slot(tgt);
-       if (unlikely(i < 0)) {
-               CERROR("%s: couldn't find a slot for reply data: "
-                      "rc = %d\n", tgt_name(tgt), i);
-               RETURN(i);
+       if (!tgt) {
+               trd->trd_index = TRD_INDEX_MEMORY;
+               GOTO(add_reply_data, rc = 0);
+       }
+
+       if (env) {
+               tsi = tgt_ses_info(env);
+               if (tsi->tsi_batch_trd) {
+                       LASSERT(tsi->tsi_batch_env);
+                       trd = tsi->tsi_batch_trd;
+                       i = trd->trd_index;
+               }
+       }
+
+       if (i == -1) {
+               /* find an empty slot */
+               i = tgt_find_free_reply_slot(tgt);
+               if (unlikely(i < 0)) {
+                       CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
+                              tgt_name(tgt), i);
+                       RETURN(i);
+               }
+               trd->trd_index = i;
        }
-       trd->trd_index = i;
 
        if (update_lrd_file) {
+               struct lsd_reply_header *lrh = &tgt->lut_reply_header;
                loff_t  off;
-               int     rc;
 
                /* write reply data to disk */
-               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+               off = lrh->lrh_header_size + lrh->lrh_reply_size * i;
                rc = tgt_reply_data_write(env, tgt, lrd, off, th);
                if (unlikely(rc != 0)) {
                        CERROR("%s: can't update %s file: rc = %d\n",
                               tgt_name(tgt), REPLY_DATA, rc);
-                       RETURN(rc);
+                       GOTO(free_slot, rc);
                }
        }
+
+add_reply_data:
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
-       list_add(&trd->trd_list, &ted->ted_reply_list);
-       ted->ted_reply_cnt++;
-       if (ted->ted_reply_cnt > ted->ted_reply_max)
-               ted->ted_reply_max = ted->ted_reply_cnt;
+       if (req != NULL) {
+               int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
+                             MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
+
+               if (req->rq_obsolete) {
+                       CDEBUG(D_INFO,
+                              "drop reply data update for obsolete req xid=%llu,"
+                              "transno=%llu, tag=%hu\n", req->rq_xid,
+                              lrd->lrd_transno, trd->trd_tag);
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       GOTO(free_slot, rc = -EBADR);
+               }
+
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
+                   !(tsi && tsi->tsi_batch_env &&
+                     trd->trd_reply.lrd_batch_idx > 0))
+                       tgt_clean_by_tag(req->rq_export, req->rq_xid,
+                                        trd->trd_tag);
+       }
+
+       /*
+        * For the batched RPC, all sub requests use one common @trd for the
+        * reply data.
+        */
+       if (list_empty(&trd->trd_list)) {
+               list_add(&trd->trd_list, &ted->ted_reply_list);
+               ted->ted_reply_cnt++;
+               if (ted->ted_reply_cnt > ted->ted_reply_max)
+                       ted->ted_reply_max = ted->ted_reply_cnt;
+       }
        mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
               "tag %hu, client gen %u, slot idx %d\n",
               trd, lrd->lrd_xid, lrd->lrd_transno,
-              trd->trd_tag, lrd->lrd_client_gen, i);
+              trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
+
        RETURN(0);
+
+free_slot:
+       if (tgt != NULL)
+               tgt_clear_reply_slot(tgt, trd->trd_index);
+       return rc;
+}
+
+int tgt_mk_reply_data(const struct lu_env *env,
+                     struct lu_target *tgt,
+                     struct tg_export_data *ted,
+                     struct ptlrpc_request *req,
+                     __u64 opdata,
+                     struct thandle *th,
+                     bool write_update,
+                     __u64 transno)
+{      /*
+        * Build the in-memory reply data for one operation and hand it to
+        * tgt_add_reply_data().
+        *
+        * @env           may be NULL, but only when @req is given
+        * @tgt, @ted     passed through to tgt_add_reply_data()
+        * @req           request being answered, or NULL
+        * @opdata        stored in lrd_data
+        * @th            transaction handle; th_result is recorded when
+        *                @req is set and @write_update is true
+        * @write_update  forwarded as the "update reply_data file" flag
+        * @transno       stored in lrd_transno
+        *
+        * The tg_reply_data is tsi->tsi_batch_trd when @env has one,
+        * otherwise it is freshly allocated.  It is then filled in one of
+        * three ways:
+        *  - batch environment (tsi_batch_env set): only sub-request 0
+        *    records xid, tag and client generation, publishes the entry
+        *    in tsi_batch_trd and sets trd_index to -1 (which makes
+        *    tgt_add_reply_data() look for a slot); every sub-request
+        *    stores tsi_batch_idx in lrd_batch_idx.  lrd_result is not
+        *    assigned in this branch.
+        *  - @req given: xid, tag and generation come from @req/@ted; with
+        *    @write_update the reply's versions and th_result are saved.
+        *  - no @req: xid, result and client generation come from the
+        *    session info.
+        *
+        * Returns 0 on success, -ENOMEM if allocation fails, or the
+        * negative error from tgt_add_reply_data().  On such an error the
+        * entry is freed; -EBADR (returned there for an obsolete request)
+        * is reported to the caller as 0.
+        *
+        * NOTE(review): when the entry came from tsi->tsi_batch_trd, the
+        * free on error leaves that pointer referring to freed memory
+        * unless it is cleared elsewhere - confirm.
+        */
+       struct tg_reply_data *trd = NULL;
+       struct lsd_reply_data *lrd;
+       __u64 *pre_versions = NULL;
+       struct tgt_session_info *tsi = NULL;
+       int rc;
+
+       if (env != NULL) {
+               tsi = tgt_ses_info(env);
+               if (tsi->tsi_batch_trd) {
+                       LASSERT(tsi->tsi_batch_env);
+                       trd = tsi->tsi_batch_trd;
+               }
+       }
+
+       if (trd == NULL) {
+               OBD_ALLOC_PTR(trd);
+               if (unlikely(trd == NULL))
+                       RETURN(-ENOMEM);
+
+               INIT_LIST_HEAD(&trd->trd_list);
+       }
+
+       /* fill reply data information */
+       lrd = &trd->trd_reply;
+       lrd->lrd_transno = transno;
+       if (tsi && tsi->tsi_batch_env) {
+               if (tsi->tsi_batch_idx == 0) {
+                       LASSERT(req != NULL);
+                       tsi->tsi_batch_trd = trd;
+                       trd->trd_index = -1;
+                       lrd->lrd_xid = req->rq_xid;
+                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               }
+               lrd->lrd_batch_idx = tsi->tsi_batch_idx;
+       } else if (req != NULL) {
+               lrd->lrd_xid = req->rq_xid;
+               trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+               lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               if (write_update) {
+                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                       lrd->lrd_result = th->th_result;
+               }
+       } else {
+               LASSERT(env != NULL);
+               LASSERT(tsi->tsi_xid != 0);
+
+               lrd->lrd_xid = tsi->tsi_xid;
+               lrd->lrd_result = tsi->tsi_result;
+               lrd->lrd_client_gen = tsi->tsi_client_gen;
+       }
+
+       lrd->lrd_data = opdata;
+       if (pre_versions) {
+               trd->trd_pre_versions[0] = pre_versions[0];
+               trd->trd_pre_versions[1] = pre_versions[1];
+               trd->trd_pre_versions[2] = pre_versions[2];
+               trd->trd_pre_versions[3] = pre_versions[3];
+       }
+
+       if (tsi && tsi->tsi_open_obj)
+               trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
+
+       rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+                               th, write_update);
+       if (rc < 0) {
+               OBD_FREE_PTR(trd);
+               if (rc == -EBADR)
+                       rc = 0;
+       }
+       return rc;
+
+}
-EXPORT_SYMBOL(tgt_add_reply_data);
+EXPORT_SYMBOL(tgt_mk_reply_data);
 
 /*
  * last_rcvd & last_committed update callbacks
@@ -1195,11 +1473,11 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct tgt_session_info *tsi = tgt_ses_info(env);
-       struct obd_export       *exp = tsi->tsi_exp;
-       struct tg_export_data   *ted;
-       __u64                   *transno_p;
-       int                      rc = 0;
-       bool                     lw_client;
+       struct obd_export *exp = tsi->tsi_exp;
+       struct tg_export_data *ted;
+       __u64 *transno_p;
+       bool nolcd = false;
+       int rc = 0;
 
        ENTRY;
 
@@ -1207,11 +1485,15 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        LASSERT(exp != NULL);
        ted = &exp->exp_target_data;
 
-       lw_client = exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT;
-       if (ted->ted_lr_idx < 0 && !lw_client)
-               /* ofd connect may cause transaction before export has
-                * last_rcvd slot */
-               RETURN(0);
+       /* Some clients don't support recovery, and they don't have last_rcvd
+        * client data:
+        * 1. lightweight clients.
+        * 2. local clients on MDS which doesn't enable "localrecov".
+        * 3. OFD connect may cause transaction before export has last_rcvd
+        *    slot.
+        */
+       if (ted->ted_lr_idx < 0)
+               nolcd = true;
 
        if (req != NULL)
                tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
@@ -1237,7 +1519,10 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        /** VBR: set new versions */
        if (th->th_result == 0 && obj != NULL) {
                struct dt_object *dto = dt_object_locate(obj, th->th_dev);
+
                dt_version_set(env, dto, tti->tti_transno, th);
+               if (unlikely(tsi->tsi_dv_update))
+                       dt_data_version_set(env, dto, tti->tti_transno, th);
        }
 
        /* filling reply data */
@@ -1252,14 +1537,13 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        /* if can't add callback, do sync write */
        th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
 
-       if (lw_client) {
-               /* All operations performed by LW clients are synchronous and
-                * we store the committed transno in the last_rcvd header */
+       if (nolcd) {
+               /* store transno in the last_rcvd header */
                spin_lock(&tgt->lut_translock);
                if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
                        tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
                        spin_unlock(&tgt->lut_translock);
-                       /* Although lightweight (LW) connections have no slot
+                       /* Although current connection doesn't have slot
                         * in the last_rcvd, we still want to maintain
                         * the in-memory lsd_client_data structure in order to
                         * properly handle reply reconstruction. */
@@ -1275,47 +1559,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        /* Target that supports multiple reply data */
        if (tgt_is_multimodrpcs_client(exp)) {
-               struct tg_reply_data    *trd;
-               struct lsd_reply_data   *lrd;
-               __u64                   *pre_versions;
-               bool                    write_update;
-
-               OBD_ALLOC_PTR(trd);
-               if (unlikely(trd == NULL))
-                       RETURN(-ENOMEM);
-
-               /* fill reply data information */
-               lrd = &trd->trd_reply;
-               lrd->lrd_transno = tti->tti_transno;
-               if (req != NULL) {
-                       lrd->lrd_xid = req->rq_xid;
-                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
-                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
-                       lrd->lrd_result = th->th_result;
-                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
-                       write_update = true;
-               } else {
-                       LASSERT(tsi->tsi_xid != 0);
-                       lrd->lrd_xid = tsi->tsi_xid;
-                       lrd->lrd_result = tsi->tsi_result;
-                       lrd->lrd_client_gen = tsi->tsi_client_gen;
-                       trd->trd_tag = 0;
-                       pre_versions = NULL;
-                       write_update = false;
-               }
-
-               lrd->lrd_data = opdata;
-               if (pre_versions) {
-                       trd->trd_pre_versions[0] = pre_versions[0];
-                       trd->trd_pre_versions[1] = pre_versions[1];
-                       trd->trd_pre_versions[2] = pre_versions[2];
-                       trd->trd_pre_versions[3] = pre_versions[3];
-               }
-
-               rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
-               if (rc < 0)
-                       OBD_FREE_PTR(trd);
-               return rc;
+               return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
+                                        !!(req != NULL), tti->tti_transno);
        }
 
        /* Enough for update replay, let's return */
@@ -1371,7 +1616,7 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                }
        }
 
-       if (!lw_client) {
+       if (!nolcd) {
                tti->tti_off = ted->ted_lr_off;
                if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
                        rc = -EIO;
@@ -1451,8 +1696,8 @@ static int tgt_clients_data_init(const struct lu_env *env,
        if (tgt->lut_bottom->dd_rdonly)
                RETURN(0);
 
-       CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
-                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+       BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
+                    sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
 
        OBD_ALLOC_PTR(lcd);
        if (lcd == NULL)
@@ -1517,11 +1762,11 @@ static int tgt_clients_data_init(const struct lu_env *env,
                exp->exp_connecting = 0;
                exp->exp_in_recovery = 0;
                spin_unlock(&exp->exp_lock);
-               obd->obd_max_recoverable_clients++;
+               atomic_inc(&obd->obd_max_recoverable_clients);
+
+               if (tgt_is_multimodrpcs_record(tgt, lcd)) {
+                       atomic_inc(&tgt->lut_num_clients);
 
-               if (tgt->lut_lsd.lsd_feature_incompat &
-                   OBD_INCOMPAT_MULTI_RPCS &&
-                   lcd->lcd_generation != 0) {
                        /* compute the highest valid client generation */
                        generation = max(generation, lcd->lcd_generation);
                        /* fill client_generation <-> export hash table */
@@ -1606,8 +1851,8 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
        last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
 
        /* ensure padding in the struct is the correct size */
-       CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
-                sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
+       BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
+                    sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
 
        rc = server_name2index(tgt_name(tgt), &index, NULL);
        if (rc < 0) {
@@ -1670,10 +1915,9 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
                }
 
                if (lsd->lsd_osd_index != index) {
-                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
-                                          "is different with the index %d in"
-                                          "config log, It might be disk"
-                                          "corruption!\n", tgt_name(tgt),
+                       LCONSOLE_ERROR_MSG(0x157,
+                                          "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
+                                          tgt_name(tgt),
                                           lsd->lsd_osd_index, index);
                        RETURN(-EINVAL);
                }
@@ -1702,8 +1946,8 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
                        LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
                                      "remove all clients for interop needs\n",
                                      tgt_name(tgt));
-                       rc = tgt_truncate_last_rcvd(env, tgt,
-                                                   lsd->lsd_client_start);
+                       rc = tgt_truncate_object(env, tgt, tgt->lut_last_rcvd,
+                                                lsd->lsd_client_start);
                        if (rc)
                                RETURN(rc);
                        last_rcvd_size = lsd->lsd_client_start;
@@ -1758,8 +2002,8 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
        tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
        spin_unlock(&tgt->lut_translock);
 
-       tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
-       tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
+       obd2obt(tgt->lut_obd)->obt_mount_count = lsd->lsd_mount_count;
+       obd2obt(tgt->lut_obd)->obt_instance = (__u32)lsd->lsd_mount_count;
 
        /* save it, so mount count and last_transno is current */
        rc = tgt_server_data_update(env, tgt, 0);
@@ -1830,6 +2074,8 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
            !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
                dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
                rc = dt_declare_version_set(env, dto, th);
+               if (!rc && tsi->tsi_dv_update)
+                       rc = dt_declare_data_version_set(env, dto, th);
        }
 
        return rc;
@@ -1856,18 +2102,29 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
 
        echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
 
-       if (tti->tti_has_trans && !echo_client) {
-               if (tti->tti_mult_trans == 0) {
+       if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
+               if (!tsi->tsi_mult_trans) {
                        CDEBUG(D_HA, "More than one transaction %llu\n",
                               tti->tti_transno);
-                       RETURN(0);
+                       /**
+                        * if RPC handler sees unexpected multiple last_rcvd
+                        * updates with transno, then it is better to return
+                        * the latest transaction number to the client.
+                        * In that case replay may fail if part of operation
+                        * was committed and can't be re-applied easily. But
+                        * that is better than reporting the first transno, in
+                        * which case a partially committed operation would be
+                        * considered finished and never replayed, causing
+                        * data loss.
+                        */
                }
-               /* we need another transno to be assigned */
+               /* we need new transno to be assigned */
                tti->tti_transno = 0;
-       } else if (th->th_result == 0) {
-               tti->tti_has_trans = 1;
        }
 
+       if (!th->th_result)
+               tsi->tsi_has_trans++;
+
        if (tsi->tsi_vbr_obj != NULL &&
            !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
                obj = tsi->tsi_vbr_obj;
@@ -1888,8 +2145,7 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
        struct lsd_reply_data   *lrd = &tti->tti_lrd;
        unsigned long            reply_data_size;
        int                      rc;
-       struct lsd_reply_header *lrh = NULL;
-       struct lsd_client_data  *lcd = NULL;
+       struct lsd_reply_header *lrh = &tgt->lut_reply_header;
        struct tg_reply_data    *trd = NULL;
        int                      idx;
        loff_t                   off;
@@ -1903,16 +2159,15 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                GOTO(out, rc);
        reply_data_size = (unsigned long)tti->tti_attr.la_size;
 
-       OBD_ALLOC_PTR(lrh);
-       if (lrh == NULL)
-               GOTO(out, rc = -ENOMEM);
-
        if (reply_data_size == 0) {
                CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
                       tgt_name(tgt));
                lrh->lrh_magic = LRH_MAGIC;
                lrh->lrh_header_size = sizeof(struct lsd_reply_header);
-               lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
+               if (lrh->lrh_magic == LRH_MAGIC_V1)
+                       lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
+               else
+                       lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
                rc = tgt_reply_header_write(env, tgt, lrh);
                if (rc) {
                        CERROR("%s: error writing %s: rc = %d\n",
@@ -1920,17 +2175,50 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                        GOTO(out, rc);
                }
        } else {
+               __u32 recsz = sizeof(*lrd);
+               const char *lrd_ver = "v2";
+
                rc = tgt_reply_header_read(env, tgt, lrh);
                if (rc) {
                        CERROR("%s: error reading %s: rc = %d\n",
                               tgt_name(tgt), REPLY_DATA, rc);
                        GOTO(out, rc);
                }
-               if (lrh->lrh_magic != LRH_MAGIC ||
-                   lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
-                   lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
-                       CERROR("%s: invalid header in %s\n",
-                              tgt_name(tgt), REPLY_DATA);
+
+               switch (lrh->lrh_magic) {
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 5, 53, 0)
+               /* The old reply_data is replaced on the first mount after
+                * an upgrade, so no need to keep this interop code forever.
+                */
+               case LRH_MAGIC_V1:
+                       recsz = sizeof(struct lsd_reply_data_v1);
+                       lrd_ver = "v1";
+
+                       if (lrh->lrh_magic != LRH_MAGIC)
+                               CWARN("%s: %s record size will be %s\n",
+                                     tgt_name(tgt), REPLY_DATA,
+                                     lrh->lrh_magic < LRH_MAGIC ?
+                                     "upgraded" : "downgraded");
+                       fallthrough;
+#endif
+               case LRH_MAGIC_V2:
+                       if (lrh->lrh_header_size != sizeof(*lrh)) {
+                               CERROR("%s: bad %s %s header size: %u != %lu\n",
+                                      tgt_name(tgt), REPLY_DATA, lrd_ver,
+                                      lrh->lrh_header_size, sizeof(*lrh));
+                               GOTO(out, rc = -EINVAL);
+                       }
+                       if (lrh->lrh_reply_size != recsz) {
+                               CERROR("%s: bad %s %s reply size: %u != %u\n",
+                               tgt_name(tgt), REPLY_DATA, lrd_ver,
+                               lrh->lrh_reply_size, recsz);
+                               GOTO(out, rc = -EINVAL);
+                       }
+                       break;
+               default:
+                       CERROR("%s: invalid %s magic: %x != %x/%x\n",
+                              tgt_name(tgt), REPLY_DATA,
+                              lrh->lrh_magic, LRH_MAGIC_V1, LRH_MAGIC_V2);
                        GOTO(out, rc = -EINVAL);
                }
 
@@ -1938,19 +2226,14 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                if (hash == NULL)
                        GOTO(out, rc = -ENODEV);
 
-               OBD_ALLOC_PTR(lcd);
-               if (lcd == NULL)
-                       GOTO(out, rc = -ENOMEM);
-
                OBD_ALLOC_PTR(trd);
                if (trd == NULL)
                        GOTO(out, rc = -ENOMEM);
 
                /* Load reply_data from disk */
-               for (idx = 0, off = sizeof(struct lsd_reply_header);
-                    off < reply_data_size;
-                    idx++, off += sizeof(struct lsd_reply_data)) {
-                       rc = tgt_reply_data_read(env, tgt, lrd, off);
+               for (idx = 0, off = lrh->lrh_header_size;
+                    off < reply_data_size; idx++, off += recsz) {
+                       rc = tgt_reply_data_read(env, tgt, lrd, off, lrh);
                        if (rc) {
                                CERROR("%s: error reading %s: rc = %d\n",
                                       tgt_name(tgt), REPLY_DATA, rc);
@@ -1979,6 +2262,7 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                        trd->trd_pre_versions[3] = 0;
                        trd->trd_index = idx;
                        trd->trd_tag = 0;
+                       fid_zero(&trd->trd_object);
                        list_add(&trd->trd_list, &ted->ted_reply_list);
                        ted->ted_reply_cnt++;
                        if (ted->ted_reply_cnt > ted->ted_reply_max)
@@ -1993,6 +2277,13 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                        /* update export last committed transation */
                        exp->exp_last_committed = max(exp->exp_last_committed,
                                                      lrd->lrd_transno);
+                       /* Update lcd_last_transno as well for check in
+                        * tgt_release_reply_data() or the latest client
+                        * transno can be lost.
+                        */
+                       ted->ted_lcd->lcd_last_transno =
+                               max(ted->ted_lcd->lcd_last_transno,
+                                   exp->exp_last_committed);
 
                        mutex_unlock(&ted->ted_lcd_lock);
                        class_export_put(exp);
@@ -2024,52 +2315,75 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
 out:
        if (hash != NULL)
                cfs_hash_putref(hash);
-       if (lcd != NULL)
-               OBD_FREE_PTR(lcd);
        if (trd != NULL)
                OBD_FREE_PTR(trd);
-       if (lrh != NULL)
-               OBD_FREE_PTR(lrh);
        return rc;
 }
 
-struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
-                                             __u64 xid)
+static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
+                               struct tg_reply_data *trd)
 {
-       struct tg_reply_data    *found = NULL;
-       struct tg_reply_data    *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct lu_target *lut = class_exp2tgt(req->rq_export);
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+       int rc = 0;
+       struct tg_reply_data *reply;
+       bool check_increasing;
+
+       if (tag == 0)
+               return 0;
+
+       check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
+                          !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+       if (!lookup && !check_increasing)
+               return 0;
 
-       mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
-               if (reply->trd_reply.lrd_xid == xid) {
-                       found = reply;
+               if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
+                       rc = 1;
+                       if (trd != NULL)
+                               *trd = *reply;
+                       break;
+               } else if (check_increasing && reply->trd_tag == tag &&
+                          reply->trd_reply.lrd_xid > req->rq_xid) {
+                       rc = -EPROTO;
+                       CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
+                              tgt_name(lut), tag, req->rq_xid, trd,
+                              reply->trd_reply.lrd_xid,
+                              reply->trd_reply.lrd_transno,
+                              reply->trd_reply.lrd_client_gen,
+                              reply->trd_index, rc);
                        break;
                }
        }
-       mutex_unlock(&ted->ted_lcd_lock);
-       return found;
+
+       return rc;
 }
-EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
 
 /* Look for a reply data matching specified request @req
  * A copy is returned in @trd if the pointer is not NULL
  */
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
 {
-       struct tg_export_data   *ted = &req->rq_export->exp_target_data;
-       struct tg_reply_data    *reply;
-       bool                     found = false;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       int found = 0;
+       bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
 
-       reply = tgt_lookup_reply_by_xid(ted, req->rq_xid);
-       if (reply != NULL) {
-               found = true;
-               if (trd != NULL)
-                       *trd = *reply;
+       mutex_lock(&ted->ted_lcd_lock);
+       if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
+               /* A check for the last_xid is needed here in case there is
+                * no reply data left in the list. It may happen if another
+                * RPC on another slot increased the last_xid between our
+                * process_req_last_xid & tgt_lookup_reply calls */
+               found = -EPROTO;
+       } else {
+               found = tgt_check_lookup_req(req, 1, trd);
        }
+       mutex_unlock(&ted->ted_lcd_lock);
 
-       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
-              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
-              found ? 1 : 0);
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
+              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
+              req->rq_export->exp_last_xid);
 
        return found;
 }
@@ -2081,37 +2395,19 @@ int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
        struct lu_target        *lut = class_exp2tgt(exp);
        struct tg_reply_data    *trd, *tmp;
 
-       mutex_lock(&ted->ted_lcd_lock);
+
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
                if (trd->trd_reply.lrd_xid > rcvd_xid)
                        continue;
                ted->ted_release_xid++;
                tgt_release_reply_data(lut, ted, trd);
        }
-       mutex_unlock(&ted->ted_lcd_lock);
 
        return 0;
 }
 
-int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+int tgt_handle_tag(struct ptlrpc_request *req)
 {
-       struct tg_export_data   *ted = &exp->exp_target_data;
-       struct lu_target        *lut = class_exp2tgt(exp);
-       struct tg_reply_data    *trd, *tmp;
-
-       if (tag == 0)
-               return 0;
-
-       mutex_lock(&ted->ted_lcd_lock);
-       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
-               if (trd->trd_tag != tag)
-                       continue;
-               ted->ted_release_tag++;
-               tgt_release_reply_data(lut, ted, trd);
-               break;
-       }
-       mutex_unlock(&ted->ted_lcd_lock);
-
-       return 0;
+       return tgt_check_lookup_req(req, 0, NULL);
 }