Whamcloud - gitweb
LU-13408 target: update in-memory per client data
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index 0c9cc87..061f088 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #include "tgt_internal.h"
 
+/** version recovery epoch */
+#define LR_EPOCH_BITS  32
 
 /* Allocate a bitmap for a chunk of reply data slots */
 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
 {
        unsigned long *bm;
 
-       OBD_ALLOC(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * sizeof(long));
+       OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+                       sizeof(long));
        if (bm == NULL)
                return -ENOMEM;
 
@@ -59,7 +58,7 @@ static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
        if (lut->lut_reply_bitmap[chunk] != NULL) {
                /* someone else already allocated the bitmap for this chunk */
                spin_unlock(&lut->lut_client_bitmap_lock);
-               OBD_FREE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+               OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
                         sizeof(long));
                return 0;
        }
@@ -149,12 +148,25 @@ static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
        int chunk;
        int b;
 
+       if (lut->lut_obd->obd_stopping)
+               /*
+                * in case of failover keep the bit set in order to
+                * avoid overwriting slots in reply_data which might
+                * be required by resent rpcs
+                */
+               return 0;
        chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
        b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
 
        LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
        LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
 
+       if (lut->lut_reply_bitmap[chunk] == NULL) {
+               CERROR("%s: slot %d not allocated\n",
+                      tgt_name(lut), idx);
+               return -ENOENT;
+       }
+
        if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
                CERROR("%s: slot %d already clear in bitmap\n",
                       tgt_name(lut), idx);
@@ -211,6 +223,9 @@ static int tgt_reply_header_write(const struct lu_env *env,
                tgt->lut_obd->obd_name, REPLY_DATA,
                lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
        buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
        buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
@@ -308,7 +323,7 @@ static void tgt_free_reply_data(struct lu_target *lut,
 {
        CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
               "client gen %u, slot idx %d\n",
-              tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+              lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
               trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
               trd->trd_index);
 
@@ -316,7 +331,8 @@ static void tgt_free_reply_data(struct lu_target *lut,
 
        list_del(&trd->trd_list);
        ted->ted_reply_cnt--;
-       tgt_clear_reply_slot(lut, trd->trd_index);
+       if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
+               tgt_clear_reply_slot(lut, trd->trd_index);
        OBD_FREE_PTR(trd);
 }
 
@@ -331,7 +347,7 @@ static void tgt_release_reply_data(struct lu_target *lut,
 {
        CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
               "client gen %u, slot idx %d\n",
-              tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+              lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
               trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
               trd->trd_index);
 
@@ -377,6 +393,11 @@ int tgt_client_alloc(struct obd_export *exp)
        ENTRY;
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
+       spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
+
        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
        if (exp->exp_target_data.ted_lcd == NULL)
                RETURN(-ENOMEM);
@@ -399,6 +420,8 @@ void tgt_client_free(struct obd_export *exp)
 
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       tgt_fmd_cleanup(exp);
+
        /* free reply data */
        mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
@@ -418,9 +441,12 @@ void tgt_client_free(struct obd_export *exp)
        OBD_FREE_PTR(ted->ted_lcd);
        ted->ted_lcd = NULL;
 
-       /* Slot may be not yet assigned */
-       if (ted->ted_lr_idx < 0)
+       /* Target may have been freed (see LU-7430)
+        * Slot may be not yet assigned */
+       if (exp->exp_obd->u.obt.obt_magic != OBT_MAGIC ||
+           ted->ted_lr_idx < 0)
                return;
+
        /* Clear bit when lcd is freed */
        LASSERT(lut && lut->lut_client_bitmap);
        if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
@@ -434,8 +460,22 @@ void tgt_client_free(struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_free);
 
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
-                        struct lsd_client_data *lcd, loff_t *off, int index)
+static inline void tgt_check_lcd(const char *obd_name, int index,
+                                struct lsd_client_data *lcd)
+{
+       size_t uuid_size = sizeof(lcd->lcd_uuid);
+
+       if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
+               lcd->lcd_uuid[uuid_size - 1] = '\0';
+
+               LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
+                              lcd->lcd_uuid, obd_name, index);
+       }
+}
+
+static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
+                               struct lsd_client_data *lcd,
+                               loff_t *off, int index)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -443,16 +483,16 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        tti_buf_lcd(tti);
        rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
        if (rc == 0) {
-               check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
+               tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
                lcd_le_to_cpu(&tti->tti_lcd, lcd);
                lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
                lcd->lcd_last_close_result =
                        ptlrpc_status_ntoh(lcd->lcd_last_close_result);
        }
 
-       CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
-              ", last_xid = "LPU64", last_result = %u, last_data = %u, "
-              "last_close_transno = "LPU64", last_close_xid = "LPU64", "
+       CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
+              ", last_xid = %llu, last_result = %u, last_data = %u, "
+              "last_close_transno = %llu, last_close_xid = %llu, "
               "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
               *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
               lcd->lcd_last_result, lcd->lcd_last_data,
@@ -461,9 +501,10 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        return rc;
 }
 
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct lsd_client_data *lcd, loff_t *off,
-                         struct thandle *th)
+static int tgt_client_data_write(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                struct lsd_client_data *lcd,
+                                loff_t *off, struct thandle *th)
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -478,6 +519,59 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
        return dt_record_write(env, dto, &tti->tti_buf, off, th);
 }
 
+struct tgt_new_client_callback {
+       struct dt_txn_commit_cb  lncc_cb;
+       struct obd_export       *lncc_exp;
+};
+
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+                             struct dt_txn_commit_cb *cb, int err)
+{
+       struct tgt_new_client_callback *ccb;
+
+       ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
+
+       LASSERT(ccb->lncc_exp->exp_obd);
+
+       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+              ccb->lncc_exp->exp_obd->obd_name,
+              ccb->lncc_exp->exp_client_uuid.uuid);
+
+       spin_lock(&ccb->lncc_exp->exp_lock);
+
+       ccb->lncc_exp->exp_need_sync = 0;
+
+       spin_unlock(&ccb->lncc_exp->exp_lock);
+       class_export_cb_put(ccb->lncc_exp);
+
+       OBD_FREE_PTR(ccb);
+}
+
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+       struct tgt_new_client_callback  *ccb;
+       struct dt_txn_commit_cb         *dcb;
+       int                              rc;
+
+       OBD_ALLOC_PTR(ccb);
+       if (ccb == NULL)
+               return -ENOMEM;
+
+       ccb->lncc_exp = class_export_cb_get(exp);
+
+       dcb = &ccb->lncc_cb;
+       dcb->dcb_func = tgt_cb_new_client;
+       INIT_LIST_HEAD(&dcb->dcb_linkage);
+       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+
+       rc = dt_trans_cb_add(th, dcb);
+       if (rc) {
+               class_export_cb_put(exp);
+               OBD_FREE_PTR(ccb);
+       }
+       return rc;
+}
+
 /**
  * Update client data in last_rcvd
  */
@@ -498,6 +592,9 @@ static int tgt_client_data_update(const struct lu_env *env,
                RETURN(-EINVAL);
        }
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -512,6 +609,9 @@ static int tgt_client_data_update(const struct lu_env *env,
        rc = dt_trans_start_local(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(out, rc);
+
+       mutex_lock(&ted->ted_lcd_lock);
+
        /*
         * Until this operations will be committed the sync is needed
         * for this export. This should be done _after_ starting the
@@ -530,17 +630,20 @@ static int tgt_client_data_update(const struct lu_env *env,
 
        tti->tti_off = ted->ted_lr_off;
        rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+
+       mutex_unlock(&ted->ted_lcd_lock);
+
        EXIT;
 out:
        dt_trans_stop(env, tgt->lut_bottom, th);
        CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
 
        return rc;
 }
 
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
+static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -553,13 +656,13 @@ int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
                lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
 
        CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
         return rc;
 }
 
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct thandle *th)
+static int tgt_server_data_write(const struct lu_env *env,
+                                struct lu_target *tgt, struct thandle *th)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -575,7 +678,7 @@ int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
        rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
 
        CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
 
        RETURN(rc);
@@ -594,7 +697,7 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
        ENTRY;
 
        CDEBUG(D_SUPER,
-              "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+              "%s: mount_count is %llu, last_transno is %llu\n",
               tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
               tgt->lut_last_transno);
 
@@ -603,6 +706,9 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
        tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
        spin_unlock(&tgt->lut_translock);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -624,14 +730,14 @@ out:
        dt_trans_stop(env, tgt->lut_bottom, th);
 
        CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_server_data_update);
 
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
-                          loff_t size)
+static int tgt_truncate_last_rcvd(const struct lu_env *env,
+                                 struct lu_target *tgt, loff_t size)
 {
        struct dt_object *dt = tgt->lut_last_rcvd;
        struct thandle   *th;
@@ -640,6 +746,9 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
 
        ENTRY;
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        attr.la_size = size;
        attr.la_valid = LA_SIZE;
 
@@ -688,7 +797,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        struct lu_env            env;
        struct ptlrpc_request   *req;
        __u32                    start_epoch;
-       struct list_head         client_list;
+       LIST_HEAD(client_list);
        int                      rc;
 
        if (tgt->lut_obd->obd_stopping)
@@ -702,12 +811,11 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        }
 
        spin_lock(&tgt->lut_translock);
-       start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
+       start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
        tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
        tgt->lut_lsd.lsd_start_epoch = start_epoch;
        spin_unlock(&tgt->lut_translock);
 
-       INIT_LIST_HEAD(&client_list);
        /**
         * The recovery is not yet finished and final queue can still be updated
         * with resend requests. Move final list to separate one for processing
@@ -735,7 +843,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
         * - there is no client to recover or the recovery was aborted
         */
        if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
-           (tgt->lut_obd->obd_max_recoverable_clients == 0 ||
+           (atomic_read(&tgt->lut_obd->obd_max_recoverable_clients) == 0 ||
            tgt->lut_obd->obd_abort_recovery))
                tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
 
@@ -745,7 +853,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
 }
 
 /**
- * commit callback, need to update last_commited value
+ * commit callback, need to update last_committed value
  */
 struct tgt_last_committed_callback {
        struct dt_txn_commit_cb  llcc_cb;
@@ -759,12 +867,17 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
 {
        struct tgt_last_committed_callback *ccb;
 
-       ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
+       ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
 
        LASSERT(ccb->llcc_exp);
        LASSERT(ccb->llcc_tgt != NULL);
        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
 
+       /* error hit, don't update last committed to provide chance to
+        * replay data after fail */
+       if (err != 0)
+               goto out;
+
        /* Fast path w/o spinlock, if exp_last_committed was updated
         * with higher transno, no need to take spinlock and check,
         * also no need to update obd_last_committed. */
@@ -777,20 +890,27 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
                spin_unlock(&ccb->llcc_tgt->lut_translock);
+
                ptlrpc_commit_replies(ccb->llcc_exp);
+               tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
        } else {
                spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
+
+       CDEBUG(D_HA, "%s: transno %lld is committed\n",
+              ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
+
 out:
        class_export_cb_put(ccb->llcc_exp);
-       if (ccb->llcc_transno)
-               CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
-                      ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
        OBD_FREE_PTR(ccb);
 }
 
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
-                          struct obd_export *exp, __u64 transno)
+/**
+ * Add commit callback function, it returns a non-zero value to inform
+ * caller to use sync transaction if necessary.
+ */
+static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+                                 struct obd_export *exp, __u64 transno)
 {
        struct tgt_last_committed_callback      *ccb;
        struct dt_txn_commit_cb                 *dcb;
@@ -819,60 +939,29 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
                /* report failure to force synchronous operation */
                return -EPERM;
 
-       return rc;
+       /* if exp_need_sync is set, return non-zero value to force
+        * a sync transaction. */
+       return rc ? rc : exp->exp_need_sync;
 }
 
-struct tgt_new_client_callback {
-       struct dt_txn_commit_cb  lncc_cb;
-       struct obd_export       *lncc_exp;
-};
-
-static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
-                             struct dt_txn_commit_cb *cb, int err)
+static int tgt_is_local_client(const struct lu_env *env,
+                                     struct obd_export *exp)
 {
-       struct tgt_new_client_callback *ccb;
-
-       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
-
-       LASSERT(ccb->lncc_exp->exp_obd);
-
-       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
-              ccb->lncc_exp->exp_obd->obd_name,
-              ccb->lncc_exp->exp_client_uuid.uuid);
-
-       spin_lock(&ccb->lncc_exp->exp_lock);
-
-       ccb->lncc_exp->exp_need_sync = 0;
-
-       spin_unlock(&ccb->lncc_exp->exp_lock);
-       class_export_cb_put(ccb->lncc_exp);
-
-       OBD_FREE_PTR(ccb);
-}
-
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
-{
-       struct tgt_new_client_callback  *ccb;
-       struct dt_txn_commit_cb         *dcb;
-       int                              rc;
-
-       OBD_ALLOC_PTR(ccb);
-       if (ccb == NULL)
-               return -ENOMEM;
-
-       ccb->lncc_exp = class_export_cb_get(exp);
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
 
-       dcb = &ccb->lncc_cb;
-       dcb->dcb_func = tgt_cb_new_client;
-       INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+       if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
+           exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
+               return 0;
+       if (tgt->lut_local_recovery)
+               return 0;
+       if (!req)
+               return 0;
+       if (!LNetIsPeerLocal(req->rq_peer.nid))
+               return 0;
 
-       rc = dt_trans_cb_add(th, dcb);
-       if (rc) {
-               class_export_cb_put(exp);
-               OBD_FREE_PTR(ccb);
-       }
-       return rc;
+       return 1;
 }
 
 /**
@@ -896,6 +985,13 @@ int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
        if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
+       if (tgt_is_local_client(env, exp)) {
+               LCONSOLE_WARN("%s: local client %s w/o recovery\n",
+                             exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
+               exp->exp_no_recovery = 1;
+               RETURN(0);
+       }
+
        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
         * there's no need for extra complication here
         */
@@ -1023,9 +1119,18 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
        if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
                    (char *)tgt->lut_obd->obd_uuid.uuid) ||
-           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
+           exp->exp_no_recovery)
                RETURN(0);
 
+       /* Slot may be not yet assigned, use case is race between Client
+        * reconnect and forced eviction */
+       if (ted->ted_lr_idx < 0) {
+               CWARN("%s: client with UUID '%s' not in bitmap\n",
+                     tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
+               RETURN(0);
+       }
+
        CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
               tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
               ted->ted_lcd->lcd_uuid);
@@ -1042,6 +1147,9 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        if (exp->exp_flags & OBD_OPT_FAILOVER)
                RETURN(0);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
+               RETURN(0);
+
        /* Make sure the server's last_transno is up to date.
         * This should be done before zeroing client slot so last_transno will
         * be in server data or in client data in case of failure */
@@ -1053,10 +1161,8 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
                RETURN(rc);
        }
 
-       mutex_lock(&ted->ted_lcd_lock);
        memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
        rc = tgt_client_data_update(env, exp);
-       mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
               "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
@@ -1066,12 +1172,35 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_del);
 
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
+static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
+
+       if (tag == 0)
+               return;
+
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               if (trd->trd_tag != tag)
+                       continue;
+
+               LASSERT(ergo(tgt_is_increasing_xid_client(exp),
+                            trd->trd_reply.lrd_xid <= xid));
+
+               ted->ted_release_tag++;
+               tgt_release_reply_data(lut, ted, trd);
+       }
+}
+
+static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
 {
        struct lsd_reply_data   *lrd;
        int     i;
+       int     rc;
 
        lrd = &trd->trd_reply;
        /* update export last transno */
@@ -1080,30 +1209,51 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
        mutex_unlock(&ted->ted_lcd_lock);
 
-       /* find a empty slot */
-       i = tgt_find_free_reply_slot(tgt);
-       if (unlikely(i < 0)) {
-               CERROR("%s: couldn't find a slot for reply data: "
-                      "rc = %d\n", tgt_name(tgt), i);
-               RETURN(i);
-       }
-       trd->trd_index = i;
+       if (tgt != NULL) {
+               /* find a empty slot */
+               i = tgt_find_free_reply_slot(tgt);
+               if (unlikely(i < 0)) {
+                       CERROR("%s: couldn't find a slot for reply data: "
+                              "rc = %d\n", tgt_name(tgt), i);
+                       RETURN(i);
+               }
+               trd->trd_index = i;
 
-       if (update_lrd_file) {
-               loff_t  off;
-               int     rc;
+               if (update_lrd_file) {
+                       loff_t  off;
 
-               /* write reply data to disk */
-               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
-               rc = tgt_reply_data_write(env, tgt, lrd, off, th);
-               if (unlikely(rc != 0)) {
-                       CERROR("%s: can't update %s file: rc = %d\n",
-                              tgt_name(tgt), REPLY_DATA, rc);
-                       RETURN(rc);
+                       /* write reply data to disk */
+                       off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+                       rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+                       if (unlikely(rc != 0)) {
+                               CERROR("%s: can't update %s file: rc = %d\n",
+                                      tgt_name(tgt), REPLY_DATA, rc);
+                               GOTO(free_slot, rc);
+                       }
                }
+       } else {
+               trd->trd_index = TRD_INDEX_MEMORY;
        }
+
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
+       if (req != NULL) {
+               int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
+                             MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
+
+               if (req->rq_obsolete) {
+                       CDEBUG(D_INFO,
+                              "drop reply data update for obsolete req xid=%llu,"
+                              "transno=%llu, tag=%hu\n", req->rq_xid,
+                              lrd->lrd_transno, trd->trd_tag);
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       GOTO(free_slot, rc = -EBADR);
+               }
+
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+                       tgt_clean_by_tag(req->rq_export, req->rq_xid,
+                                        trd->trd_tag);
+       }
        list_add(&trd->trd_list, &ted->ted_reply_list);
        ted->ted_reply_cnt++;
        if (ted->ted_reply_cnt > ted->ted_reply_max)
@@ -1113,10 +1263,76 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
        CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
               "tag %hu, client gen %u, slot idx %d\n",
               trd, lrd->lrd_xid, lrd->lrd_transno,
-              trd->trd_tag, lrd->lrd_client_gen, i);
+              trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
+
        RETURN(0);
+
+free_slot:
+       if (tgt != NULL)
+               tgt_clear_reply_slot(tgt, trd->trd_index);
+       return rc;
 }
-EXPORT_SYMBOL(tgt_add_reply_data);
+
+int tgt_mk_reply_data(const struct lu_env *env,
+                     struct lu_target *tgt,
+                     struct tg_export_data *ted,
+                     struct ptlrpc_request *req,
+                     __u64 opdata,
+                     struct thandle *th,
+                     bool write_update,
+                     __u64 transno)
+{
+       struct tg_reply_data    *trd;
+       struct lsd_reply_data   *lrd;
+       __u64                   *pre_versions = NULL;
+       int                     rc;
+
+       OBD_ALLOC_PTR(trd);
+       if (unlikely(trd == NULL))
+               RETURN(-ENOMEM);
+
+       /* fill reply data information */
+       lrd = &trd->trd_reply;
+       lrd->lrd_transno = transno;
+       if (req != NULL) {
+               lrd->lrd_xid = req->rq_xid;
+               trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+               lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               if (write_update) {
+                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                       lrd->lrd_result = th->th_result;
+               }
+       } else {
+               struct tgt_session_info *tsi;
+
+               LASSERT(env != NULL);
+               tsi = tgt_ses_info(env);
+               LASSERT(tsi->tsi_xid != 0);
+
+               lrd->lrd_xid = tsi->tsi_xid;
+               lrd->lrd_result = tsi->tsi_result;
+               lrd->lrd_client_gen = tsi->tsi_client_gen;
+       }
+
+       lrd->lrd_data = opdata;
+       if (pre_versions) {
+               trd->trd_pre_versions[0] = pre_versions[0];
+               trd->trd_pre_versions[1] = pre_versions[1];
+               trd->trd_pre_versions[2] = pre_versions[2];
+               trd->trd_pre_versions[3] = pre_versions[3];
+       }
+
+       rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+                               th, write_update);
+       if (rc < 0) {
+               OBD_FREE_PTR(trd);
+               if (rc == -EBADR)
+                       rc = 0;
+       }
+       return rc;
+
+}
+EXPORT_SYMBOL(tgt_mk_reply_data);
 
 /*
  * last_rcvd & last_committed update callbacks
@@ -1127,11 +1343,11 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct tgt_session_info *tsi = tgt_ses_info(env);
-       struct obd_export       *exp = tsi->tsi_exp;
-       struct tg_export_data   *ted;
-       __u64                   *transno_p;
-       int                      rc = 0;
-       bool                     lw_client;
+       struct obd_export *exp = tsi->tsi_exp;
+       struct tg_export_data *ted;
+       __u64 *transno_p;
+       bool nolcd = false;
+       int rc = 0;
 
        ENTRY;
 
@@ -1139,11 +1355,15 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        LASSERT(exp != NULL);
        ted = &exp->exp_target_data;
 
-       lw_client = exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT;
-       if (ted->ted_lr_idx < 0 && !lw_client)
-               /* ofd connect may cause transaction before export has
-                * last_rcvd slot */
-               RETURN(0);
+       /* Some clients don't support recovery, and they don't have last_rcvd
+        * client data:
+        * 1. lightweight clients.
+        * 2. local clients on MDS which doesn't enable "localrecov".
+        * 3. OFD connect may cause transaction before export has last_rcvd
+        *    slot.
+        */
+       if (ted->ted_lr_idx < 0)
+               nolcd = true;
 
        if (req != NULL)
                tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
@@ -1154,7 +1374,7 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        spin_lock(&tgt->lut_translock);
        if (th->th_result != 0) {
                if (tti->tti_transno != 0) {
-                       CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
+                       CERROR("%s: replay transno %llu failed: rc = %d\n",
                               tgt_name(tgt), tti->tti_transno, th->th_result);
                }
        } else if (tti->tti_transno == 0) {
@@ -1173,7 +1393,7 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        }
 
        /* filling reply data */
-       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+       CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
               tti->tti_transno, tgt->lut_obd->obd_last_committed);
 
        if (req != NULL) {
@@ -1184,14 +1404,13 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        /* if can't add callback, do sync write */
        th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
 
-       if (lw_client) {
-               /* All operations performed by LW clients are synchronous and
-                * we store the committed transno in the last_rcvd header */
+       if (nolcd) {
+               /* store transno in the last_rcvd header */
                spin_lock(&tgt->lut_translock);
                if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
                        tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
                        spin_unlock(&tgt->lut_translock);
-                       /* Although lightweight (LW) connections have no slot
+                       /* Although current connection doesn't have slot
                         * in the last_rcvd, we still want to maintain
                         * the in-memory lsd_client_data structure in order to
                         * properly handle reply reconstruction. */
@@ -1207,47 +1426,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        /* Target that supports multiple reply data */
        if (tgt_is_multimodrpcs_client(exp)) {
-               struct tg_reply_data    *trd;
-               struct lsd_reply_data   *lrd;
-               __u64                   *pre_versions;
-               bool                    write_update;
-
-               OBD_ALLOC_PTR(trd);
-               if (unlikely(trd == NULL))
-                       RETURN(-ENOMEM);
-
-               /* fill reply data information */
-               lrd = &trd->trd_reply;
-               lrd->lrd_transno = tti->tti_transno;
-               if (req != NULL) {
-                       lrd->lrd_xid = req->rq_xid;
-                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
-                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
-                       lrd->lrd_result = th->th_result;
-                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
-                       write_update = true;
-               } else {
-                       LASSERT(tsi->tsi_xid != 0);
-                       lrd->lrd_xid = tsi->tsi_xid;
-                       lrd->lrd_result = tsi->tsi_result;
-                       lrd->lrd_client_gen = tsi->tsi_client_gen;
-                       trd->trd_tag = 0;
-                       pre_versions = NULL;
-                       write_update = false;
-               }
-
-               lrd->lrd_data = opdata;
-               if (pre_versions) {
-                       trd->trd_pre_versions[0] = pre_versions[0];
-                       trd->trd_pre_versions[1] = pre_versions[1];
-                       trd->trd_pre_versions[2] = pre_versions[2];
-                       trd->trd_pre_versions[3] = pre_versions[3];
-               }
-
-               rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
-               if (rc < 0)
-                       OBD_FREE_PTR(trd);
-               return rc;
+               return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
+                                        !!(req != NULL), tti->tti_transno);
        }
 
        /* Enough for update replay, let's return */
@@ -1286,7 +1466,7 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                if (*transno_p > tti->tti_transno) {
                        if (!tgt->lut_no_reconstruct) {
                                CERROR("%s: trying to overwrite bigger transno:"
-                                      "on-disk: "LPU64", new: "LPU64" replay: "
+                                      "on-disk: %llu, new: %llu replay: "
                                       "%d. See LU-617.\n", tgt_name(tgt),
                                       *transno_p, tti->tti_transno,
                                       req_is_replay(req));
@@ -1303,9 +1483,13 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                }
        }
 
-       if (!lw_client) {
+       if (!nolcd) {
                tti->tti_off = ted->ted_lr_off;
-               rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+               if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
+                       rc = -EIO;
+               else
+                       rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
+                                                  &tti->tti_off, th);
                if (rc < 0) {
                        mutex_unlock(&ted->ted_lcd_lock);
                        RETURN(rc);
@@ -1376,8 +1560,11 @@ static int tgt_clients_data_init(const struct lu_env *env,
 
        ENTRY;
 
-       CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
-                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
+       BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
+                    sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
 
        OBD_ALLOC_PTR(lcd);
        if (lcd == NULL)
@@ -1415,8 +1602,8 @@ static int tgt_clients_data_init(const struct lu_env *env,
                /* These exports are cleaned up by disconnect, so they
                 * need to be set up like real exports as connect does.
                 */
-               CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                      " srv lr: "LPU64" lx: "LPU64" gen %u\n", lcd->lcd_uuid,
+               CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
+                      " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
                       cl_idx, last_transno, lsd->lsd_last_transno,
                       lcd_last_xid(lcd), lcd->lcd_generation);
 
@@ -1442,7 +1629,7 @@ static int tgt_clients_data_init(const struct lu_env *env,
                exp->exp_connecting = 0;
                exp->exp_in_recovery = 0;
                spin_unlock(&exp->exp_lock);
-               obd->obd_max_recoverable_clients++;
+               atomic_inc(&obd->obd_max_recoverable_clients);
 
                if (tgt->lut_lsd.lsd_feature_incompat &
                    OBD_INCOMPAT_MULTI_RPCS &&
@@ -1470,7 +1657,7 @@ static int tgt_clients_data_init(const struct lu_env *env,
                }
 
                /* Need to check last_rcvd even for duplicated exports. */
-               CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+               CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
                       cl_idx, last_transno);
 
                spin_lock(&tgt->lut_translock);
@@ -1531,8 +1718,8 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
        last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
 
        /* ensure padding in the struct is the correct size */
-       CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
-                sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
+       BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
+                    sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
 
        rc = server_name2index(tgt_name(tgt), &index, NULL);
        if (rc < 0) {
@@ -1575,19 +1762,29 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
                        RETURN(rc);
                }
                if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
-                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
-                                          "using the wrong disk %s. Were the"
-                                          " /dev/ assignments rearranged?\n",
-                                          tgt->lut_obd->obd_uuid.uuid,
-                                          lsd->lsd_uuid);
-                       RETURN(-EINVAL);
+                       if (tgt->lut_bottom->dd_rdonly) {
+                               /* Such difference may be caused by mounting
+                                * up snapshot with new fsname under rd_only
+                                * mode. But even if it was NOT, it will not
+                                * damage the system because of "rd_only". */
+                               memcpy(lsd->lsd_uuid,
+                                      tgt->lut_obd->obd_uuid.uuid,
+                                      sizeof(lsd->lsd_uuid));
+                       } else {
+                               LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+                                                  "OBD %s using the wrong "
+                                                  "disk %s. Were the /dev/ "
+                                                  "assignments rearranged?\n",
+                                                  tgt->lut_obd->obd_uuid.uuid,
+                                                  lsd->lsd_uuid);
+                               RETURN(-EINVAL);
+                       }
                }
 
                if (lsd->lsd_osd_index != index) {
-                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
-                                          "is different with the index %d in"
-                                          "config log, It might be disk"
-                                          "corruption!\n", tgt_name(tgt),
+                       LCONSOLE_ERROR_MSG(0x157,
+                                          "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
+                                          tgt_name(tgt),
                                           lsd->lsd_osd_index, index);
                        RETURN(-EINVAL);
                }
@@ -1633,9 +1830,9 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
        lsd->lsd_mount_count++;
 
        CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
-       CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+       CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
               tgt_name(tgt), tgt->lut_last_transno);
-       CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+       CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
               tgt_name(tgt), lsd->lsd_mount_count);
        CDEBUG(D_INODE, "%s: server data size: %u\n",
               tgt_name(tgt), lsd->lsd_server_size);
@@ -1697,6 +1894,14 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        struct dt_object        *dto;
        int                      rc;
 
+       /* For readonly case, the caller should have got failure
+        * when start the transaction. If the logic comes here,
+        * there must be something wrong. */
+       if (unlikely(tgt->lut_bottom->dd_rdonly)) {
+               dump_stack();
+               LBUG();
+       }
+
        /* if there is no session, then this transaction is not result of
         * request processing but some local operation */
        if (env->le_ses == NULL)
@@ -1708,26 +1913,24 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        if (tsi->tsi_exp == NULL)
                return 0;
 
-       dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
-       tti_buf_lcd(tti);
-
-       rc = dt_declare_record_write(env, dto, &tti->tti_buf,
-                                    tsi->tsi_exp->exp_target_data.ted_lr_off,
-                                    th);
-       if (rc)
-               return rc;
-
-       tti_buf_lsd(tti);
-       rc = dt_declare_record_write(env, dto, &tti->tti_buf, 0, th);
-       if (rc)
-               return rc;
-
        if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
-               tti->tti_off = atomic_read(&tgt->lut_num_clients) * 8
-                               * sizeof(struct lsd_reply_data);
+               /*
+                * Use maximum possible file offset for declaration to ensure
+                * ZFS will reserve enough credits for a write anywhere in this
+                * file, since we don't know where in the file the write will be
+                * because a replay slot has not been assigned.  This should be
+                * replaced by dmu_tx_hold_append() when available.
+                */
                tti->tti_buf.lb_buf = NULL;
                tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
                dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+               rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
+               if (rc)
+                       return rc;
+       } else {
+               dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+               tti_buf_lcd(tti);
+               tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
                rc = dt_declare_record_write(env, dto, &tti->tti_buf,
                                             tti->tti_off, th);
                if (rc)
@@ -1766,7 +1969,7 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
 
        if (tti->tti_has_trans && !echo_client) {
                if (tti->tti_mult_trans == 0) {
-                       CDEBUG(D_HA, "More than one transaction "LPU64"\n",
+                       CDEBUG(D_HA, "More than one transaction %llu\n",
                               tti->tti_transno);
                        RETURN(0);
                }
@@ -1797,7 +2000,6 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
        unsigned long            reply_data_size;
        int                      rc;
        struct lsd_reply_header *lrh = NULL;
-       struct lsd_client_data  *lcd = NULL;
        struct tg_reply_data    *trd = NULL;
        int                      idx;
        loff_t                   off;
@@ -1846,10 +2048,6 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                if (hash == NULL)
                        GOTO(out, rc = -ENODEV);
 
-               OBD_ALLOC_PTR(lcd);
-               if (lcd == NULL)
-                       GOTO(out, rc = -ENOMEM);
-
                OBD_ALLOC_PTR(trd);
                if (trd == NULL)
                        GOTO(out, rc = -ENOMEM);
@@ -1875,7 +2073,11 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
 
                        /* create in-memory reply_data and link it to
                         * target export's reply list */
-                       tgt_set_reply_slot(tgt, idx);
+                       rc = tgt_set_reply_slot(tgt, idx);
+                       if (rc != 0) {
+                               mutex_unlock(&ted->ted_lcd_lock);
+                               GOTO(out, rc);
+                       }
                        trd->trd_reply = *lrd;
                        trd->trd_pre_versions[0] = 0;
                        trd->trd_pre_versions[1] = 0;
@@ -1897,6 +2099,13 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
                        /* update export last committed transation */
                        exp->exp_last_committed = max(exp->exp_last_committed,
                                                      lrd->lrd_transno);
+                       /* Update lcd_last_transno as well for check in
+                        * tgt_release_reply_data() or the latest client
+                        * transno can be lost.
+                        */
+                       ted->ted_lcd->lcd_last_transno =
+                               max(ted->ted_lcd->lcd_last_transno,
+                                   exp->exp_last_committed);
 
                        mutex_unlock(&ted->ted_lcd_lock);
                        class_export_put(exp);
@@ -1928,8 +2137,6 @@ int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
 out:
        if (hash != NULL)
                cfs_hash_putref(hash);
-       if (lcd != NULL)
-               OBD_FREE_PTR(lcd);
        if (trd != NULL)
                OBD_FREE_PTR(trd);
        if (lrh != NULL)
@@ -1937,43 +2144,70 @@ out:
        return rc;
 }
 
-struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
-                                             __u64 xid)
+static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
+                               struct tg_reply_data *trd)
 {
-       struct tg_reply_data    *found = NULL;
-       struct tg_reply_data    *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct lu_target *lut = class_exp2tgt(req->rq_export);
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+       int rc = 0;
+       struct tg_reply_data *reply;
+       bool check_increasing;
+
+       if (tag == 0)
+               return 0;
+
+       check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
+                          !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+       if (!lookup && !check_increasing)
+               return 0;
 
-       mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
-               if (reply->trd_reply.lrd_xid == xid) {
-                       found = reply;
+               if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
+                       rc = 1;
+                       if (trd != NULL)
+                               *trd = *reply;
+                       break;
+               } else if (check_increasing && reply->trd_tag == tag &&
+                          reply->trd_reply.lrd_xid > req->rq_xid) {
+                       rc = -EPROTO;
+                       CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
+                              tgt_name(lut), tag, req->rq_xid, trd,
+                              reply->trd_reply.lrd_xid,
+                              reply->trd_reply.lrd_transno,
+                              reply->trd_reply.lrd_client_gen,
+                              reply->trd_index, rc);
                        break;
                }
        }
-       mutex_unlock(&ted->ted_lcd_lock);
-       return found;
+
+       return rc;
 }
-EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
 
 /* Look for a reply data matching specified request @req
  * A copy is returned in @trd if the pointer is not NULL
  */
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
 {
-       struct tg_export_data   *ted = &req->rq_export->exp_target_data;
-       struct tg_reply_data    *reply;
-       bool                     found = false;
-
-       reply = tgt_lookup_reply_by_xid(ted, req->rq_xid);
-       if (reply != NULL) {
-               found = true;
-               if (trd != NULL)
-                       *trd = *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       int found = 0;
+       bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+
+       mutex_lock(&ted->ted_lcd_lock);
+       if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
+               /* A check for the last_xid is needed here in case there is
+                * no reply data is left in the list. It may happen if another
+                * RPC on another slot increased the last_xid between our
+                * process_req_last_xid & tgt_lookup_reply calls */
+               found = -EPROTO;
+       } else {
+               found = tgt_check_lookup_req(req, 1, trd);
        }
+       mutex_unlock(&ted->ted_lcd_lock);
 
-       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
-              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
-              found ? 1 : 0);
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
+              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
+              req->rq_export->exp_last_xid);
 
        return found;
 }
@@ -1985,37 +2219,19 @@ int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
        struct lu_target        *lut = class_exp2tgt(exp);
        struct tg_reply_data    *trd, *tmp;
 
-       mutex_lock(&ted->ted_lcd_lock);
+
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
                if (trd->trd_reply.lrd_xid > rcvd_xid)
                        continue;
                ted->ted_release_xid++;
                tgt_release_reply_data(lut, ted, trd);
        }
-       mutex_unlock(&ted->ted_lcd_lock);
 
        return 0;
 }
 
-int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+int tgt_handle_tag(struct ptlrpc_request *req)
 {
-       struct tg_export_data   *ted = &exp->exp_target_data;
-       struct lu_target        *lut = class_exp2tgt(exp);
-       struct tg_reply_data    *trd, *tmp;
-
-       if (tag == 0)
-               return 0;
-
-       mutex_lock(&ted->ted_lcd_lock);
-       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
-               if (trd->trd_tag != tag)
-                       continue;
-               ted->ted_release_tag++;
-               tgt_release_reply_data(lut, ted, trd);
-               break;
-       }
-       mutex_unlock(&ted->ted_lcd_lock);
-
-       return 0;
+       return tgt_check_lookup_req(req, 0, NULL);
 }