Whamcloud - gitweb
LU-7593 target: umount vs tgt_last_rcvd_update deadlock
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index 53826ca..55e5995 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #include "tgt_internal.h"
 
+
+/* Allocate a bitmap for a chunk of reply data slots
+ *
+ * The bitmap holds LUT_REPLY_SLOTS_PER_CHUNK bits. It is allocated before
+ * lut_client_bitmap_lock is taken and is installed in
+ * lut->lut_reply_bitmap[@chunk] under that spinlock, but only if the entry
+ * is still NULL at that point. Otherwise the new allocation is freed and
+ * the bitmap that is already installed is kept.
+ *
+ * Returns 0 when a bitmap is installed for @chunk on return (either by
+ * this call or by someone else), -ENOMEM if the allocation failed.
+ */
+static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
+{
+       unsigned long *bm;
+
+       OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+                       sizeof(long));
+       if (bm == NULL)
+               return -ENOMEM;
+
+       spin_lock(&lut->lut_client_bitmap_lock);
+
+       if (lut->lut_reply_bitmap[chunk] != NULL) {
+               /* someone else already allocated the bitmap for this chunk:
+                * keep the installed one and free ours outside the lock */
+               spin_unlock(&lut->lut_client_bitmap_lock);
+               OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+                        sizeof(long));
+               return 0;
+       }
+
+       lut->lut_reply_bitmap[chunk] = bm;
+
+       spin_unlock(&lut->lut_client_bitmap_lock);
+
+       return 0;
+}
+
+/* Look for an available reply data slot in the bitmap
+ * of the target @lut
+ * Allocate bitmap chunk when first used
+ * XXX algo could be improved if this routine limits performance
+ *
+ * Chunks are scanned in increasing order, from 0 up to
+ * LUT_REPLY_SLOTS_MAX_CHUNKS - 1. Within a chunk the first zero bit is
+ * claimed with test_and_set_bit(); if that bit turns out to be set already,
+ * the search of the same chunk is repeated.
+ *
+ * Returns the claimed slot index (chunk * LUT_REPLY_SLOTS_PER_CHUNK + bit),
+ * the error returned by tgt_bitmap_chunk_alloc() if a chunk bitmap could
+ * not be allocated, or -ENOSPC if no chunk has a free bit.
+ */
+static int tgt_find_free_reply_slot(struct lu_target *lut)
+{
+       unsigned long *bmp;
+       int chunk = 0;
+       int rc;
+       int b;
+
+       for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
+               /* allocate the bitmap chunk if necessary */
+               if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+                       rc = tgt_bitmap_chunk_alloc(lut, chunk);
+                       if (rc != 0)
+                               return rc;
+               }
+               bmp = lut->lut_reply_bitmap[chunk];
+
+               /* look for an available slot in this chunk */
+               do {
+                       b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
+                       if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
+                               break;
+
+                       /* found one; if the bit got set in the meantime,
+                        * loop and search this chunk again */
+                       if (test_and_set_bit(b, bmp) == 0)
+                               return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
+               } while (true);
+       }
+
+       return -ENOSPC;
+}
+
+/* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
+ * of the target @lut
+ * Allocate the bitmap chunk if necessary
+ *
+ * @idx is split into a chunk number (idx / LUT_REPLY_SLOTS_PER_CHUNK) and
+ * a bit number inside that chunk (idx % LUT_REPLY_SLOTS_PER_CHUNK); the
+ * chunk number is asserted to be below LUT_REPLY_SLOTS_MAX_CHUNKS.
+ *
+ * Returns 0 on success, the error returned by tgt_bitmap_chunk_alloc() if
+ * the chunk bitmap could not be allocated, or -EALREADY (after logging an
+ * error) if the bit was already set.
+ */
+static int tgt_set_reply_slot(struct lu_target *lut, int idx)
+{
+       int chunk;
+       int b;
+       int rc;
+
+       chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+       b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+
+       LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+       LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
+
+       /* allocate the bitmap chunk if necessary */
+       if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+               rc = tgt_bitmap_chunk_alloc(lut, chunk);
+               if (rc != 0)
+                       return rc;
+       }
+
+       /* mark the slot 'used' in this chunk */
+       if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
+               CERROR("%s: slot %d already set in bitmap\n",
+                      tgt_name(lut), idx);
+               return -EALREADY;
+       }
+
+       return 0;
+}
+
+
+/* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
+ * of the target @lut
+ *
+ * Unlike tgt_set_reply_slot(), a missing chunk bitmap is not allocated
+ * here; it is reported as an error instead.
+ *
+ * Returns 0 on success, -ENOENT if the bitmap of the chunk holding @idx
+ * is not allocated, or -EALREADY if the bit was already clear. Both error
+ * cases are logged with CERROR().
+ */
+static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
+{
+       int chunk;
+       int b;
+
+       chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+       b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+
+       LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+       LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
+
+       if (lut->lut_reply_bitmap[chunk] == NULL) {
+               CERROR("%s: slot %d not allocated\n",
+                      tgt_name(lut), idx);
+               return -ENOENT;
+       }
+
+       if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
+               CERROR("%s: slot %d already clear in bitmap\n",
+                      tgt_name(lut), idx);
+               return -EALREADY;
+       }
+
+       return 0;
+}
+
+
+/* Read header of reply_data file of target @tgt into structure @lrh
+ *
+ * sizeof(struct lsd_reply_header) bytes are read from offset 0 of
+ * tgt->lut_reply_data into a local buffer, using the per-thread
+ * tti_buf/tti_off of @env. The three header fields (magic, header size,
+ * reply size) are then converted from little-endian to CPU byte order
+ * and stored in @lrh.
+ *
+ * Returns 0 on success or the non-zero result of dt_record_read(), in
+ * which case @lrh is left untouched.
+ */
+static int tgt_reply_header_read(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                struct lsd_reply_header *lrh)
+{
+       int                      rc;
+       struct lsd_reply_header  buf;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+
+       tti->tti_off = 0;
+       tti->tti_buf.lb_buf = &buf;
+       tti->tti_buf.lb_len = sizeof(buf);
+
+       rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+                           &tti->tti_off);
+       if (rc != 0)
+               return rc;
+
+       lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
+       lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
+       lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
+
+       CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
+              "header_size=%d reply_size=%d\n",
+               tgt->lut_obd->obd_name, REPLY_DATA,
+               lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+       return 0;
+}
+
+/* Write header into reply_data file of target @tgt from structure @lrh
+ *
+ * The three header fields are converted to little-endian in a local
+ * buffer and written at offset 0 of tgt->lut_reply_data. The write runs
+ * in its own transaction on tgt->lut_bottom, created here with th_sync
+ * set: the record write is declared, the transaction is started, the
+ * record is written, and the transaction is stopped.
+ *
+ * Once the transaction has been created, dt_trans_stop() is called on
+ * every path (success or failure); its return value is not checked.
+ *
+ * Returns PTR_ERR() of the handle if dt_trans_create() failed, otherwise
+ * the result of the first failing step, or the result of
+ * dt_record_write().
+ */
+static int tgt_reply_header_write(const struct lu_env *env,
+                                 struct lu_target *tgt,
+                                 struct lsd_reply_header *lrh)
+{
+       int                      rc;
+       struct lsd_reply_header  buf;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct thandle          *th;
+       struct dt_object        *dto;
+
+       CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
+              "header_size=%d reply_size=%d\n",
+               tgt->lut_obd->obd_name, REPLY_DATA,
+               lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+       buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
+       buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
+       buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
+
+       th = dt_trans_create(env, tgt->lut_bottom);
+       if (IS_ERR(th))
+               return PTR_ERR(th);
+       th->th_sync = 1;
+
+       tti->tti_off = 0;
+       tti->tti_buf.lb_buf = &buf;
+       tti->tti_buf.lb_len = sizeof(buf);
+
+       rc = dt_declare_record_write(env, tgt->lut_reply_data,
+                                    &tti->tti_buf, tti->tti_off, th);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = dt_trans_start(env, tgt->lut_bottom, th);
+       if (rc)
+               GOTO(out, rc);
+
+       dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+       rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+out:
+       dt_trans_stop(env, tgt->lut_bottom, th);
+       return rc;
+}
+
+/* Write the reply data @lrd into reply_data file of target @tgt
+ * at offset @off
+ *
+ * The record is built in the per-thread buffer tti_lrd of @env with all
+ * fields converted to little-endian, and is written within the caller's
+ * transaction @th; this function does not create, start or stop a
+ * transaction.
+ *
+ * @lrd->lrd_result is modified temporarily: it is converted with
+ * ptlrpc_status_hton() before being copied into the buffer and converted
+ * back with ptlrpc_status_ntoh() afterwards.
+ *
+ * Returns the result of dt_record_write().
+ */
+static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
+                               struct lsd_reply_data *lrd, loff_t off,
+                               struct thandle *th)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct dt_object        *dto;
+       struct lsd_reply_data   *buf = &tti->tti_lrd;
+
+       lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
+
+       buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
+       buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
+       buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
+       buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
+       buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
+
+       lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
+
+       tti->tti_off = off;
+       tti->tti_buf.lb_buf = buf;
+       tti->tti_buf.lb_len = sizeof(*buf);
+
+       dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+       return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+}
+
+/* Read the reply data from reply_data file of target @tgt at offset @off
+ * into structure @lrd
+ *
+ * The record is read into the per-thread buffer tti_lrd of @env and its
+ * fields are converted from little-endian to CPU byte order.
+ *
+ * NOTE(review): unlike tgt_reply_data_write(), which applies
+ * ptlrpc_status_hton() to lrd_result, no ptlrpc_status_ntoh() is applied
+ * here; presumably the caller does that conversion - to be confirmed.
+ *
+ * Returns 0 on success or the non-zero result of dt_record_read(), in
+ * which case @lrd is left untouched.
+ */
+static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
+                              struct lsd_reply_data *lrd, loff_t off)
+{
+       int                      rc;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_data   *buf = &tti->tti_lrd;
+
+       tti->tti_off = off;
+       tti->tti_buf.lb_buf = buf;
+       tti->tti_buf.lb_len = sizeof(*buf);
+
+       rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+                           &tti->tti_off);
+       if (rc != 0)
+               return rc;
+
+       lrd->lrd_transno         = le64_to_cpu(buf->lrd_transno);
+       lrd->lrd_xid             = le64_to_cpu(buf->lrd_xid);
+       lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
+       lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
+       lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
+
+       return 0;
+}
+
+
+/* Free the in-memory reply data structure @trd and release
+ * the corresponding slot in the reply_data file of target @lut
+ * Called with ted_lcd_lock held
+ *
+ * @trd is unlinked from its list, ted->ted_reply_cnt is decremented and
+ * @trd is freed. @lut may be NULL: in that case the slot bitmap is not
+ * touched. The return value of tgt_clear_reply_slot() is not checked
+ * here (that function logs its own errors).
+ */
+static void tgt_free_reply_data(struct lu_target *lut,
+                               struct tg_export_data *ted,
+                               struct tg_reply_data *trd)
+{
+       CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
+              "client gen %u, slot idx %d\n",
+              lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+              trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
+              trd->trd_index);
+
+       LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+       list_del(&trd->trd_list);
+       ted->ted_reply_cnt--;
+       if (lut != NULL)
+               tgt_clear_reply_slot(lut, trd->trd_index);
+       OBD_FREE_PTR(trd);
+}
+
+/* Release the reply data @trd from target @lut
+ * The reply data with the highest transno for this export
+ * is retained to ensure correctness of target recovery
+ * Called with ted_lcd_lock held
+ *
+ * If @trd carries the transno recorded in ted->ted_lcd->lcd_last_transno,
+ * it is not freed: it is unlinked from its list and stored in
+ * ted->ted_reply_last, after freeing whatever reply was retained there
+ * before. Any other @trd is freed via tgt_free_reply_data().
+ * @lut may be NULL; it is only passed on to tgt_free_reply_data().
+ */
+static void tgt_release_reply_data(struct lu_target *lut,
+                                  struct tg_export_data *ted,
+                                  struct tg_reply_data *trd)
+{
+       CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
+              "client gen %u, slot idx %d\n",
+              lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+              trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
+              trd->trd_index);
+
+       LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+       /* Do not free the reply data corresponding to the
+        * highest transno of this export.
+        * This ensures on-disk reply data is kept and
+        * last committed transno can be restored from disk in case
+        * of target recovery
+        */
+       if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
+               /* free previous retained reply */
+               if (ted->ted_reply_last != NULL)
+                       tgt_free_reply_data(lut, ted, ted->ted_reply_last);
+               /* retain the reply; list_del_init() leaves trd_list
+                * self-linked for the list_del() done when it is freed */
+               list_del_init(&trd->trd_list);
+               ted->ted_reply_last = trd;
+       } else {
+               tgt_free_reply_data(lut, ted, trd);
+       }
+}
+
 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
 {
        tti->tti_buf.lb_buf = &tti->tti_lsd;
@@ -66,11 +385,16 @@ int tgt_client_alloc(struct obd_export *exp)
        ENTRY;
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
+
        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
        if (exp->exp_target_data.ted_lcd == NULL)
                RETURN(-ENOMEM);
        /* Mark that slot is not yet valid, 0 doesn't work here */
        exp->exp_target_data.ted_lr_idx = -1;
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
+       mutex_init(&exp->exp_target_data.ted_lcd_lock);
        RETURN(0);
 }
 EXPORT_SYMBOL(tgt_client_alloc);
@@ -82,22 +406,45 @@ void tgt_client_free(struct obd_export *exp)
 {
        struct tg_export_data   *ted = &exp->exp_target_data;
        struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
 
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       /* free reply data */
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               tgt_release_reply_data(lut, ted, trd);
+       }
+       if (ted->ted_reply_last != NULL) {
+               tgt_free_reply_data(lut, ted, ted->ted_reply_last);
+               ted->ted_reply_last = NULL;
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       if (!hlist_unhashed(&exp->exp_gen_hash))
+               cfs_hash_del(exp->exp_obd->obd_gen_hash,
+                            &ted->ted_lcd->lcd_generation,
+                            &exp->exp_gen_hash);
+
        OBD_FREE_PTR(ted->ted_lcd);
        ted->ted_lcd = NULL;
 
-       /* Slot may be not yet assigned */
-       if (ted->ted_lr_idx < 0)
+       /* Target may have been freed (see LU-7430)
+        * Slot may be not yet assigned */
+       if (exp->exp_obd->u.obt.obt_magic != OBT_MAGIC ||
+           ted->ted_lr_idx < 0)
                return;
+
        /* Clear bit when lcd is freed */
-       LASSERT(lut->lut_client_bitmap);
+       LASSERT(lut && lut->lut_client_bitmap);
        if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
                CERROR("%s: client %u bit already clear in bitmap\n",
                       exp->exp_obd->obd_name, ted->ted_lr_idx);
                LBUG();
        }
+
+       if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping)
+               atomic_dec(&lut->lut_num_clients);
 }
 EXPORT_SYMBOL(tgt_client_free);
 
@@ -127,13 +474,13 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
               lcd->lcd_last_close_result, rc);
        return rc;
 }
-EXPORT_SYMBOL(tgt_client_data_read);
 
 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
                          struct lsd_client_data *lcd, loff_t *off,
                          struct thandle *th)
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
+       struct dt_object        *dto;
 
        lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
        lcd->lcd_last_close_result =
@@ -141,14 +488,15 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
        lcd_cpu_to_le(lcd, &tti->tti_lcd);
        tti_buf_lcd(tti);
 
-       return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
+       dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+       return dt_record_write(env, dto, &tti->tti_buf, off, th);
 }
-EXPORT_SYMBOL(tgt_client_data_write);
 
 /**
  * Update client data in last_rcvd
  */
-int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
+static int tgt_client_data_update(const struct lu_env *env,
+                                 struct obd_export *exp)
 {
        struct tg_export_data   *ted = &exp->exp_target_data;
        struct lu_target        *tgt = class_exp2tgt(exp);
@@ -158,11 +506,18 @@ int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
 
        ENTRY;
 
+       if (unlikely(tgt == NULL)) {
+               CDEBUG(D_ERROR, "%s: No target for connected export\n",
+                         class_exp2obd(exp)->obd_name);
+               RETURN(-EINVAL);
+       }
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
        tti_buf_lcd(tti);
+       mutex_lock(&ted->ted_lcd_lock);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     &tti->tti_buf,
                                     ted->ted_lr_off, th);
@@ -192,6 +547,7 @@ int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
        rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
        EXIT;
 out:
+       mutex_unlock(&ted->ted_lcd_lock);
        dt_trans_stop(env, tgt->lut_bottom, th);
        CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
               "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
@@ -217,12 +573,12 @@ int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
         return rc;
 }
-EXPORT_SYMBOL(tgt_server_data_read);
 
 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
                          struct thandle *th)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct dt_object        *dto;
        int                      rc;
 
        ENTRY;
@@ -231,8 +587,8 @@ int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
        tti_buf_lsd(tti);
        lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
 
-       rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
-                            &tti->tti_off, th);
+       dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+       rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
 
        CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
               "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
@@ -240,7 +596,6 @@ int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(tgt_server_data_write);
 
 /**
  * Update server data in last_rcvd
@@ -317,23 +672,23 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
        if (rc)
                GOTO(cleanup, rc);
 
-       rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
+       rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
        if (rc == 0)
-               rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
+               rc = dt_attr_set(env, dt, &attr, th);
 
 cleanup:
        dt_trans_stop(env, tgt->lut_bottom, th);
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(tgt_truncate_last_rcvd);
 
-void tgt_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
+static void tgt_client_epoch_update(const struct lu_env *env,
+                                   struct obd_export *exp)
 {
        struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
        struct lu_target        *tgt = class_exp2tgt(exp);
 
-       LASSERT(tgt->lut_bottom);
+       LASSERT(tgt && tgt->lut_bottom);
        /** VBR: set client last_epoch to current epoch */
        if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
                return;
@@ -349,7 +704,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        struct lu_env            env;
        struct ptlrpc_request   *req;
        __u32                    start_epoch;
-       cfs_list_t               client_list;
+       struct list_head         client_list;
        int                      rc;
 
        if (tgt->lut_obd->obd_stopping)
@@ -368,36 +723,45 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        tgt->lut_lsd.lsd_start_epoch = start_epoch;
        spin_unlock(&tgt->lut_translock);
 
-       CFS_INIT_LIST_HEAD(&client_list);
+       INIT_LIST_HEAD(&client_list);
        /**
         * The recovery is not yet finished and final queue can still be updated
         * with resend requests. Move final list to separate one for processing
         */
        spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
-       cfs_list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
+       list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
        spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
 
        /**
         * go through list of exports participated in recovery and
         * set new epoch for them
         */
-       cfs_list_for_each_entry(req, &client_list, rq_list) {
+       list_for_each_entry(req, &client_list, rq_list) {
                LASSERT(!req->rq_export->exp_delayed);
                if (!req->rq_export->exp_vbr_failed)
                        tgt_client_epoch_update(&env, req->rq_export);
        }
        /** return list back at once */
        spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
-       cfs_list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
+       list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
        spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+
+       /** Clear MULTI RPCS incompatibility flag if
+        * - target is MDT and
+        * - there is no client to recover or the recovery was aborted
+        */
+       if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
+           (tgt->lut_obd->obd_max_recoverable_clients == 0 ||
+           tgt->lut_obd->obd_abort_recovery))
+               tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
+
        /** update server epoch */
        tgt_server_data_update(&env, tgt, 1);
        lu_env_fini(&env);
 }
-EXPORT_SYMBOL(tgt_boot_epoch_update);
 
 /**
- * commit callback, need to update last_commited value
+ * commit callback, need to update last_committed value
  */
 struct tgt_last_committed_callback {
        struct dt_txn_commit_cb  llcc_cb;
@@ -406,28 +770,36 @@ struct tgt_last_committed_callback {
        __u64                    llcc_transno;
 };
 
-void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
-                          struct dt_txn_commit_cb *cb, int err)
+static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
+                                 struct dt_txn_commit_cb *cb, int err)
 {
        struct tgt_last_committed_callback *ccb;
 
        ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
 
+       LASSERT(ccb->llcc_exp);
        LASSERT(ccb->llcc_tgt != NULL);
        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
 
+       /* Fast path w/o spinlock, if exp_last_committed was updated
+        * with higher transno, no need to take spinlock and check,
+        * also no need to update obd_last_committed. */
+       if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
+               goto out;
        spin_lock(&ccb->llcc_tgt->lut_translock);
        if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
                ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
 
-       LASSERT(ccb->llcc_exp);
        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
                spin_unlock(&ccb->llcc_tgt->lut_translock);
+
                ptlrpc_commit_replies(ccb->llcc_exp);
+               tgt_cancel_slc_locks(ccb->llcc_transno);
        } else {
                spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
+out:
        class_export_cb_put(ccb->llcc_exp);
        if (ccb->llcc_transno)
                CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
@@ -452,9 +824,8 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
 
        dcb = &ccb->llcc_cb;
        dcb->dcb_func = tgt_cb_last_committed;
-       CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strncpy(dcb->dcb_name, "tgt_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
-       dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
+       INIT_LIST_HEAD(&dcb->dcb_linkage);
+       strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
 
        rc = dt_trans_cb_add(th, dcb);
        if (rc) {
@@ -468,15 +839,14 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
 
        return rc;
 }
-EXPORT_SYMBOL(tgt_last_commit_cb_add);
 
 struct tgt_new_client_callback {
        struct dt_txn_commit_cb  lncc_cb;
        struct obd_export       *lncc_exp;
 };
 
-void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
-                      struct dt_txn_commit_cb *cb, int err)
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+                             struct dt_txn_commit_cb *cb, int err)
 {
        struct tgt_new_client_callback *ccb;
 
@@ -489,11 +859,8 @@ void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
               ccb->lncc_exp->exp_client_uuid.uuid);
 
        spin_lock(&ccb->lncc_exp->exp_lock);
-       /* XXX: Currently, we use per-export based sync/async policy for
-        *      the update via OUT RPC, it is coarse-grained policy, and
-        *      will be changed as per-request based by DNE II patches. */
-       if (!ccb->lncc_exp->exp_keep_sync)
-               ccb->lncc_exp->exp_need_sync = 0;
+
+       ccb->lncc_exp->exp_need_sync = 0;
 
        spin_unlock(&ccb->lncc_exp->exp_lock);
        class_export_cb_put(ccb->lncc_exp);
@@ -515,9 +882,8 @@ int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
 
        dcb = &ccb->lncc_cb;
        dcb->dcb_func = tgt_cb_new_client;
-       CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strncpy(dcb->dcb_name, "tgt_cb_new_client", MAX_COMMIT_CB_STR_LEN);
-       dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
+       INIT_LIST_HEAD(&dcb->dcb_linkage);
+       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
 
        rc = dt_trans_cb_add(th, dcb);
        if (rc) {
@@ -541,12 +907,10 @@ int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
 
        ENTRY;
 
-       LASSERT(tgt->lut_client_bitmap != NULL);
+       LASSERT(tgt && tgt->lut_client_bitmap != NULL);
        if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
                RETURN(0);
 
-       mutex_init(&ted->ted_lcd_lock);
-
        if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
@@ -567,18 +931,40 @@ repeat:
                goto repeat;
        }
 
-       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
-
        ted->ted_lr_idx = idx;
        ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
                          idx * tgt->lut_lsd.lsd_client_size;
 
        LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
 
-       CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
+       if (tgt_is_multimodrpcs_client(exp)) {
+               /* Set MULTI RPCS incompatibility flag to prevent previous
+                * Lustre versions from mounting a target with a reply_data
+                * file */
+               atomic_inc(&tgt->lut_num_clients);
+               if (!(tgt->lut_lsd.lsd_feature_incompat &
+                     OBD_INCOMPAT_MULTI_RPCS)) {
+                       tgt->lut_lsd.lsd_feature_incompat |=
+                                                       OBD_INCOMPAT_MULTI_RPCS;
+                       rc = tgt_server_data_update(env, tgt, 1);
+                       if (rc < 0) {
+                               CERROR("%s: unable to set MULTI RPCS "
+                                      "incompatibility flag\n",
+                                      exp->exp_obd->obd_name);
+                               RETURN(rc);
+                       }
+               }
+
+               /* assign client slot generation */
+               ted->ted_lcd->lcd_generation =
+                               atomic_inc_return(&tgt->lut_client_generation);
+       } else {
+               ted->ted_lcd->lcd_generation = 0;
+       }
+
+       CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
+              "generation %d\n",
               tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
-              ted->ted_lcd->lcd_uuid);
+              ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
                RETURN(-ENOSPC);
@@ -592,10 +978,9 @@ repeat:
 }
 EXPORT_SYMBOL(tgt_client_new);
 
-/* Add client data to the MDS.  We use a bitmap to locate a free space
- * in the last_rcvd file if cl_off is -1 (i.e. a new client).
- * Otherwise, we just have to read the data from the last_rcvd file and
- * we know its offset.
+/* Add an existing client to the MDS in-memory state based on
+ * a client that was previously found in the last_rcvd file and
+ * already has an assigned slot (idx >= 0).
  *
  * It should not be possible to fail adding an existing client - otherwise
  * mdt_init_server_data() callsite needs to be fixed.
@@ -607,7 +992,7 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
 
        ENTRY;
 
-       LASSERT(tgt->lut_client_bitmap != NULL);
+       LASSERT(tgt && tgt->lut_client_bitmap != NULL);
        LASSERTF(idx >= 0, "%d\n", idx);
 
        if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
@@ -619,9 +1004,12 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
                       tgt->lut_obd->obd_name,  idx);
                LBUG();
        }
+       atomic_inc(&tgt->lut_num_clients);
 
-       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
+       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
+              "generation %d\n",
+              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
+              ted->ted_lcd->lcd_generation);
 
        ted->ted_lr_idx = idx;
        ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
@@ -633,7 +1021,6 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
 
        RETURN(0);
 }
-EXPORT_SYMBOL(tgt_client_add);
 
 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 {
@@ -645,6 +1032,12 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 
        LASSERT(ted->ted_lcd);
 
+       if (unlikely(tgt == NULL)) {
+               CDEBUG(D_ERROR, "%s: No target for connected export\n",
+                      class_exp2obd(exp)->obd_name);
+               RETURN(-EINVAL);
+       }
+
        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
        if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
                    (char *)tgt->lut_obd->obd_uuid.uuid) ||
@@ -678,10 +1071,8 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
                RETURN(rc);
        }
 
-       mutex_lock(&ted->ted_lcd_lock);
        memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
        rc = tgt_client_data_update(env, exp);
-       mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
               "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
@@ -691,30 +1082,90 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_del);
 
+/*
+ * Record reply data @trd for the export described by @ted.
+ *
+ * The export's last transno is raised to the reply's transno if needed,
+ * a free reply slot is reserved in the target @tgt, the reply is optionally
+ * written to the reply data file at the offset matching that slot (when
+ * @update_lrd_file is true, within transaction @th), and finally @trd is
+ * linked into the export's in-memory reply list.
+ *
+ * Returns 0 on success, or the negative value returned by the slot lookup
+ * or by the reply data write.  On failure @trd is not linked to the list.
+ */
+int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
+                      struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct thandle *th, bool update_lrd_file)
+{
+       struct lsd_reply_data   *lrd;
+       int     i;
+
+       lrd = &trd->trd_reply;
+       /* update export last transno (only ever moved forward) */
+       mutex_lock(&ted->ted_lcd_lock);
+       if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
+               ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       /* find an empty slot; a negative value is an error code */
+       i = tgt_find_free_reply_slot(tgt);
+       if (unlikely(i < 0)) {
+               CERROR("%s: couldn't find a slot for reply data: "
+                      "rc = %d\n", tgt_name(tgt), i);
+               RETURN(i);
+       }
+       trd->trd_index = i;
+
+       if (update_lrd_file) {
+               loff_t  off;
+               int     rc;
+
+               /* write reply data to disk: records follow the file header,
+                * one fixed-size record per slot index */
+               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+               rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+               if (unlikely(rc != 0)) {
+                       CERROR("%s: can't update %s file: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       /* NOTE(review): slot i reserved above is not released
+                        * on this path in this function - confirm whether it
+                        * is cleared elsewhere or stays marked as used */
+                       RETURN(rc);
+               }
+       }
+       /* add reply data to target export's reply list and keep track of
+        * the current and the highest number of replies held */
+       mutex_lock(&ted->ted_lcd_lock);
+       list_add(&trd->trd_list, &ted->ted_reply_list);
+       ted->ted_reply_cnt++;
+       if (ted->ted_reply_cnt > ted->ted_reply_max)
+               ted->ted_reply_max = ted->ted_reply_cnt;
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
+              "tag %hu, client gen %u, slot idx %d\n",
+              trd, lrd->lrd_xid, lrd->lrd_transno,
+              trd->trd_tag, lrd->lrd_client_gen, i);
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_add_reply_data);
+
 /*
  * last_rcvd & last_committed update callbacks
  */
-int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
-                        struct dt_object *obj, __u64 opdata,
-                        struct thandle *th, struct ptlrpc_request *req)
+static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
+                               struct dt_object *obj, __u64 opdata,
+                               struct thandle *th, struct ptlrpc_request *req)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct obd_export       *exp = tsi->tsi_exp;
        struct tg_export_data   *ted;
        __u64                   *transno_p;
        int                      rc = 0;
-       bool                     lw_client, update = false;
+       bool                     lw_client;
 
        ENTRY;
 
-       ted = &req->rq_export->exp_target_data;
 
-       lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+       LASSERT(exp != NULL);
+       ted = &exp->exp_target_data;
+
+       lw_client = exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT;
        if (ted->ted_lr_idx < 0 && !lw_client)
                /* ofd connect may cause transaction before export has
                 * last_rcvd slot */
                RETURN(0);
 
-       tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+       if (req != NULL)
+               tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+       else
+               /* From update replay, tti_transno should be set already */
+               LASSERT(tti->tti_transno != 0);
 
        spin_lock(&tgt->lut_translock);
        if (th->th_result != 0) {
@@ -732,19 +1183,22 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        spin_unlock(&tgt->lut_translock);
 
        /** VBR: set new versions */
-       if (th->th_result == 0 && obj != NULL)
-               dt_version_set(env, obj, tti->tti_transno, th);
+       if (th->th_result == 0 && obj != NULL) {
+               struct dt_object *dto = dt_object_locate(obj, th->th_dev);
+               dt_version_set(env, dto, tti->tti_transno, th);
+       }
 
        /* filling reply data */
        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
               tti->tti_transno, tgt->lut_obd->obd_last_committed);
 
-       req->rq_transno = tti->tti_transno;
-       lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
+       if (req != NULL) {
+               req->rq_transno = tti->tti_transno;
+               lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
+       }
 
        /* if can't add callback, do sync write */
-       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
-                                               tti->tti_transno);
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
 
        if (lw_client) {
                /* All operations performed by LW clients are synchronous and
@@ -752,31 +1206,73 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                spin_lock(&tgt->lut_translock);
                if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
                        tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
-                       update = true;
+                       spin_unlock(&tgt->lut_translock);
+                       /* Although lightweight (LW) connections have no slot
+                        * in the last_rcvd, we still want to maintain
+                        * the in-memory lsd_client_data structure in order to
+                        * properly handle reply reconstruction. */
+                       rc = tgt_server_data_write(env, tgt, th);
+               } else {
+                       spin_unlock(&tgt->lut_translock);
                }
-               spin_unlock(&tgt->lut_translock);
-               /* Although lightweight (LW) connections have no slot in
-                * last_rcvd, we still want to maintain the in-memory
-                * lsd_client_data structure in order to properly handle reply
-                * reconstruction. */
        } else if (ted->ted_lr_off == 0) {
                CERROR("%s: client idx %d has offset %lld\n",
                       tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
                RETURN(-EINVAL);
        }
 
-       /* if the export has already been disconnected, we have no last_rcvd
-        * slot, update server data with latest transno then */
-       if (ted->ted_lcd == NULL) {
-               CWARN("commit transaction for disconnected client %s: rc %d\n",
-                     req->rq_export->exp_client_uuid.uuid, rc);
-               GOTO(srv_update, rc = 0);
+       /* Target that supports multiple reply data */
+       if (tgt_is_multimodrpcs_client(exp)) {
+               struct tg_reply_data    *trd;
+               struct lsd_reply_data   *lrd;
+               __u64                   *pre_versions;
+               bool                    write_update;
+
+               OBD_ALLOC_PTR(trd);
+               if (unlikely(trd == NULL))
+                       RETURN(-ENOMEM);
+
+               /* fill reply data information */
+               lrd = &trd->trd_reply;
+               lrd->lrd_transno = tti->tti_transno;
+               if (req != NULL) {
+                       lrd->lrd_xid = req->rq_xid;
+                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                       lrd->lrd_result = th->th_result;
+                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+                       write_update = true;
+               } else {
+                       LASSERT(tsi->tsi_xid != 0);
+                       lrd->lrd_xid = tsi->tsi_xid;
+                       lrd->lrd_result = tsi->tsi_result;
+                       lrd->lrd_client_gen = tsi->tsi_client_gen;
+                       trd->trd_tag = 0;
+                       pre_versions = NULL;
+                       write_update = false;
+               }
+
+               lrd->lrd_data = opdata;
+               if (pre_versions) {
+                       trd->trd_pre_versions[0] = pre_versions[0];
+                       trd->trd_pre_versions[1] = pre_versions[1];
+                       trd->trd_pre_versions[2] = pre_versions[2];
+                       trd->trd_pre_versions[3] = pre_versions[3];
+               }
+
+               rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
+               if (rc < 0)
+                       OBD_FREE_PTR(trd);
+               return rc;
        }
 
+       /* Enough for update replay, let's return */
+       if (req == NULL)
+               RETURN(rc);
+
        mutex_lock(&ted->ted_lcd_lock);
        LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
-       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
-           lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
                transno_p = &ted->ted_lcd->lcd_last_close_transno;
                ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
                ted->ted_lcd->lcd_last_close_result = th->th_result;
@@ -800,21 +1296,27 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        /* Update transno in slot only if non-zero number, i.e. no errors */
        if (likely(tti->tti_transno != 0)) {
-               if (*transno_p > tti->tti_transno &&
-                   !tgt->lut_no_reconstruct) {
-                       CERROR("%s: trying to overwrite bigger transno:"
-                              "on-disk: "LPU64", new: "LPU64" replay: %d. "
-                              "see LU-617.\n", tgt_name(tgt), *transno_p,
-                              tti->tti_transno, req_is_replay(req));
-                       if (req_is_replay(req)) {
-                               spin_lock(&req->rq_export->exp_lock);
-                               req->rq_export->exp_vbr_failed = 1;
-                               spin_unlock(&req->rq_export->exp_lock);
+               /* Don't overwrite bigger transaction number with lower one.
+                * That is not sign of problem in all cases, but in any case
+                * this value should be monotonically increased only. */
+               if (*transno_p > tti->tti_transno) {
+                       if (!tgt->lut_no_reconstruct) {
+                               CERROR("%s: trying to overwrite bigger transno:"
+                                      "on-disk: "LPU64", new: "LPU64" replay: "
+                                      "%d. See LU-617.\n", tgt_name(tgt),
+                                      *transno_p, tti->tti_transno,
+                                      req_is_replay(req));
+                               if (req_is_replay(req)) {
+                                       spin_lock(&req->rq_export->exp_lock);
+                                       req->rq_export->exp_vbr_failed = 1;
+                                       spin_unlock(&req->rq_export->exp_lock);
+                               }
+                               mutex_unlock(&ted->ted_lcd_lock);
+                               RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
                        }
-                       mutex_unlock(&ted->ted_lcd_lock);
-                       RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
+               } else {
+                       *transno_p = tti->tti_transno;
                }
-               *transno_p = tti->tti_transno;
        }
 
        if (!lw_client) {
@@ -826,11 +1328,7 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                }
        }
        mutex_unlock(&ted->ted_lcd_lock);
-       EXIT;
-srv_update:
-       if (update)
-               rc = tgt_server_data_write(env, tgt, th);
-       return rc;
+       RETURN(rc);
 }
 
 /*
@@ -838,9 +1336,11 @@ srv_update:
  * It updates last_rcvd client slot and version of object in
  * simple way but with all locks to simulate all drawbacks
  */
-int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
-                             struct dt_object *obj, struct thandle *th,
-                             struct obd_export *exp)
+static int tgt_last_rcvd_update_echo(const struct lu_env *env,
+                                    struct lu_target *tgt,
+                                    struct dt_object *obj,
+                                    struct thandle *th,
+                                    struct obd_export *exp)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct tg_export_data   *ted = &exp->exp_target_data;
@@ -876,8 +1376,9 @@ int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
        RETURN(rc);
 }
 
-int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
-                         unsigned long last_size)
+static int tgt_clients_data_init(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                unsigned long last_size)
 {
        struct obd_device       *obd = tgt->lut_obd;
        struct lr_server_data   *lsd = &tgt->lut_lsd;
@@ -886,6 +1387,8 @@ int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
        int                      cl_idx;
        int                      rc = 0;
        loff_t                   off = lsd->lsd_client_start;
+       __u32                    generation = 0;
+       struct cfs_hash         *hash = NULL;
 
        ENTRY;
 
@@ -896,6 +1399,10 @@ int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
        if (lcd == NULL)
                RETURN(-ENOMEM);
 
+       hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
+       if (hash == NULL)
+               GOTO(err_out, rc = -ENODEV);
+
        for (cl_idx = 0; off < last_size; cl_idx++) {
                struct obd_export       *exp;
                __u64                    last_transno;
@@ -925,8 +1432,9 @@ int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
                 * need to be set up like real exports as connect does.
                 */
                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                      " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
-                      last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
+                      " srv lr: "LPU64" lx: "LPU64" gen %u\n", lcd->lcd_uuid,
+                      cl_idx, last_transno, lsd->lsd_last_transno,
+                      lcd_last_xid(lcd), lcd->lcd_generation);
 
                exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
                if (IS_ERR(exp)) {
@@ -951,8 +1459,32 @@ int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
                exp->exp_in_recovery = 0;
                spin_unlock(&exp->exp_lock);
                obd->obd_max_recoverable_clients++;
+
+               if (tgt->lut_lsd.lsd_feature_incompat &
+                   OBD_INCOMPAT_MULTI_RPCS &&
+                   lcd->lcd_generation != 0) {
+                       /* compute the highest valid client generation */
+                       generation = max(generation, lcd->lcd_generation);
+                       /* fill client_generation <-> export hash table */
+                       rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
+                                                &exp->exp_gen_hash);
+                       if (rc != 0) {
+                               CERROR("%s: duplicate export for client "
+                                      "generation %u\n",
+                                      tgt_name(tgt), lcd->lcd_generation);
+                               class_export_put(exp);
+                               GOTO(err_out, rc);
+                       }
+               }
+
                class_export_put(exp);
 
+               rc = rev_import_init(exp);
+               if (rc != 0) {
+                       class_unlink_export(exp);
+                       GOTO(err_out, rc);
+               }
+
                /* Need to check last_rcvd even for duplicated exports. */
                CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
                       cl_idx, last_transno);
@@ -963,7 +1495,12 @@ int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
                spin_unlock(&tgt->lut_translock);
        }
 
+       /* record highest valid client generation */
+       atomic_set(&tgt->lut_client_generation, generation);
+
 err_out:
+       if (hash != NULL)
+               cfs_hash_putref(hash);
        OBD_FREE_PTR(lcd);
        RETURN(rc);
 }
@@ -980,16 +1517,17 @@ static struct server_compat_data tgt_scd[] = {
                .rocompat = OBD_ROCOMPAT_LOVOBJID,
                .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
                            OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
-                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
+                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
+                           OBD_INCOMPAT_MULTI_RPCS,
                .rocinit = OBD_ROCOMPAT_LOVOBJID,
                .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
                           OBD_INCOMPAT_MULTI_OI,
        },
        [LDD_F_SV_TYPE_OST] = {
-               .rocompat = 0,
+               .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
                .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
                            OBD_INCOMPAT_FID,
-               .rocinit = 0,
+               .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
                .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
        }
 };
@@ -1002,7 +1540,7 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
        __u32                            index;
        int                              rc, type;
 
-       rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
        if (rc)
                RETURN(rc);
 
@@ -1172,6 +1710,7 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        struct lu_target        *tgt = cookie;
        struct tgt_session_info *tsi;
        struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct dt_object        *dto;
        int                      rc;
 
        /* if there is no session, then this transaction is not result of
@@ -1185,23 +1724,38 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        if (tsi->tsi_exp == NULL)
                return 0;
 
-       tti_buf_lcd(tti);
-       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    &tti->tti_buf,
-                                    tsi->tsi_exp->exp_target_data.ted_lr_off,
-                                    th);
-       if (rc)
-               return rc;
-
-       tti_buf_lsd(tti);
-       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    &tti->tti_buf, 0, th);
-       if (rc)
-               return rc;
+       if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
+               /*
+                * Use maximum possible file offset for declaration to ensure
+                * ZFS will reserve enough credits for a write anywhere in this
+                * file, since we don't know where in the file the write will be
+                * because a replay slot has not been assigned.  This should be
+                * replaced by dmu_tx_hold_append() when available.
+                */
+               tti->tti_off = atomic_read(&tgt->lut_num_clients) * 8 *
+                               sizeof(struct lsd_reply_data);
+               tti->tti_buf.lb_buf = NULL;
+               tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
+               dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+               rc = dt_declare_record_write(env, dto, &tti->tti_buf,
+                                            tti->tti_off, th);
+               if (rc)
+                       return rc;
+       } else {
+               dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+               tti_buf_lcd(tti);
+               tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
+               rc = dt_declare_record_write(env, dto, &tti->tti_buf,
+                                            tti->tti_off, th);
+               if (rc)
+                       return rc;
+       }
 
        if (tsi->tsi_vbr_obj != NULL &&
-           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
-               rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
+           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
+               dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
+               rc = dt_declare_version_set(env, dto, th);
+       }
 
        return rc;
 }
@@ -1225,7 +1779,7 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
        if (tsi->tsi_exp == NULL)
                return 0;
 
-       echo_client = (tgt_ses_req(tsi) == NULL);
+       echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
 
        if (tti->tti_has_trans && !echo_client) {
                if (tti->tti_mult_trans == 0) {
@@ -1252,3 +1806,237 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
                                          tgt_ses_req(tsi));
        return rc;
 }
+
+/*
+ * Initialize the reply data of target @tgt from its reply data file.
+ *
+ * An empty file gets a fresh header written.  Otherwise the header is
+ * validated and every record after it is read: a record whose client
+ * generation matches an export in the obd_gen_hash table has its slot
+ * marked as used and is linked into that export's reply list; records
+ * with no matching export are skipped.  The export and target transaction
+ * numbers are raised to the highest transno seen, and obd_last_committed
+ * is finally set from lut_last_transno.
+ *
+ * Returns 0 on success or a negative errno on failure.
+ */
+int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_data   *lrd = &tti->tti_lrd;
+       unsigned long            reply_data_size;
+       int                      rc;
+       struct lsd_reply_header *lrh = NULL;
+       struct lsd_client_data  *lcd = NULL;
+       struct tg_reply_data    *trd = NULL;
+       int                      idx;
+       loff_t                   off;
+       struct cfs_hash         *hash = NULL;
+       struct obd_export       *exp;
+       struct tg_export_data   *ted;
+       int                      reply_data_recovered = 0;
+
+       rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
+       if (rc)
+               GOTO(out, rc);
+       reply_data_size = (unsigned long)tti->tti_attr.la_size;
+
+       OBD_ALLOC_PTR(lrh);
+       if (lrh == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       if (reply_data_size == 0) {
+               CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
+                      tgt_name(tgt));
+               lrh->lrh_magic = LRH_MAGIC;
+               lrh->lrh_header_size = sizeof(struct lsd_reply_header);
+               lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
+               rc = tgt_reply_header_write(env, tgt, lrh);
+               if (rc) {
+                       CERROR("%s: error writing %s: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       GOTO(out, rc);
+               }
+       } else {
+               rc = tgt_reply_header_read(env, tgt, lrh);
+               if (rc) {
+                       CERROR("%s: error reading %s: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       GOTO(out, rc);
+               }
+               /* refuse a file whose layout differs from the one in use */
+               if (lrh->lrh_magic != LRH_MAGIC ||
+                   lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
+                   lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
+                       CERROR("%s: invalid header in %s\n",
+                              tgt_name(tgt), REPLY_DATA);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
+               if (hash == NULL)
+                       GOTO(out, rc = -ENODEV);
+
+               OBD_ALLOC_PTR(lcd);
+               if (lcd == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               /* trd is always allocated ahead of use: once it is linked
+                * into a reply list a new one is allocated, so the pointer
+                * freed at "out" is never one that is on a list */
+               OBD_ALLOC_PTR(trd);
+               if (trd == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               /* Load reply_data from disk */
+               for (idx = 0, off = sizeof(struct lsd_reply_header);
+                    off < reply_data_size;
+                    idx++, off += sizeof(struct lsd_reply_data)) {
+                       rc = tgt_reply_data_read(env, tgt, lrd, off);
+                       if (rc) {
+                               CERROR("%s: error reading %s: rc = %d\n",
+                                      tgt_name(tgt), REPLY_DATA, rc);
+                               GOTO(out, rc);
+                       }
+
+                       exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+                       if (exp == NULL) {
+                               /* old reply data from a disconnected client */
+                               continue;
+                       }
+                       ted = &exp->exp_target_data;
+                       mutex_lock(&ted->ted_lcd_lock);
+
+                       /* create in-memory reply_data and link it to
+                        * target export's reply list */
+                       rc = tgt_set_reply_slot(tgt, idx);
+                       if (rc != 0) {
+                               mutex_unlock(&ted->ted_lcd_lock);
+                               /* drop the export reference obtained from
+                                * the hash lookup, as the success path
+                                * below does, so it is not leaked */
+                               class_export_put(exp);
+                               GOTO(out, rc);
+                       }
+                       trd->trd_reply = *lrd;
+                       trd->trd_pre_versions[0] = 0;
+                       trd->trd_pre_versions[1] = 0;
+                       trd->trd_pre_versions[2] = 0;
+                       trd->trd_pre_versions[3] = 0;
+                       trd->trd_index = idx;
+                       trd->trd_tag = 0;
+                       list_add(&trd->trd_list, &ted->ted_reply_list);
+                       ted->ted_reply_cnt++;
+                       if (ted->ted_reply_cnt > ted->ted_reply_max)
+                               ted->ted_reply_max = ted->ted_reply_cnt;
+
+                       CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
+                              "transno %llu, client gen %u, slot idx %d\n",
+                              tgt_name(tgt), trd, lrd->lrd_xid,
+                              lrd->lrd_transno, lrd->lrd_client_gen,
+                              trd->trd_index);
+
+                       /* update export last committed transaction */
+                       exp->exp_last_committed = max(exp->exp_last_committed,
+                                                     lrd->lrd_transno);
+
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       class_export_put(exp);
+
+                       /* update target last committed transaction */
+                       spin_lock(&tgt->lut_translock);
+                       tgt->lut_last_transno = max(tgt->lut_last_transno,
+                                                   lrd->lrd_transno);
+                       spin_unlock(&tgt->lut_translock);
+
+                       reply_data_recovered++;
+
+                       /* the previous trd now belongs to the reply list,
+                        * get a fresh one for the next record */
+                       OBD_ALLOC_PTR(trd);
+                       if (trd == NULL)
+                               GOTO(out, rc = -ENOMEM);
+               }
+               CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
+                      tgt_name(tgt), reply_data_recovered);
+       }
+
+       spin_lock(&tgt->lut_translock);
+       /* obd_last_committed is used for compatibility
+        * with other lustre recovery code */
+       tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       rc = 0;
+
+out:
+       if (hash != NULL)
+               cfs_hash_putref(hash);
+       if (lcd != NULL)
+               OBD_FREE_PTR(lcd);
+       if (trd != NULL)
+               OBD_FREE_PTR(trd);
+       if (lrh != NULL)
+               OBD_FREE_PTR(lrh);
+       return rc;
+}
+
+/* Search the reply list of @ted for the reply data recorded with @xid.
+ * The list is walked under ted_lcd_lock; the lock is dropped before
+ * returning.  Returns the first matching entry, or NULL if none matches.
+ */
+struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
+                                             __u64 xid)
+{
+       struct tg_reply_data    *cur;
+       struct tg_reply_data    *match = NULL;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry(cur, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_reply.lrd_xid != xid)
+                       continue;
+               match = cur;
+               break;
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return match;
+}
+EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
+
+/* Look for a reply data matching specified request @req
+ * A copy is returned in @trd if the pointer is not NULL
+ *
+ * Returns true when the export of @req holds a reply data whose xid
+ * equals the xid of @req, false otherwise.
+ */
+bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+{
+       struct tg_export_data   *ted = &req->rq_export->exp_target_data;
+       struct tg_reply_data    *reply;
+       bool                     found = false;
+
+       /* Search and copy within a single ted_lcd_lock section: entries
+        * are released from this list under the same lock, so the copy
+        * must not be taken from a pointer used after the lock is dropped.
+        */
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
+               if (reply->trd_reply.lrd_xid == req->rq_xid) {
+                       found = true;
+                       if (trd != NULL)
+                               *trd = *reply;
+                       break;
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
+              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
+              found ? 1 : 0);
+
+       return found;
+}
+EXPORT_SYMBOL(tgt_lookup_reply);
+
+/* Release every reply data of export @exp whose xid is lower than or
+ * equal to @rcvd_xid, counting each release in ted_release_xid.
+ * The whole walk is done under ted_lcd_lock.  Always returns 0.
+ */
+int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct tg_reply_data    *cur, *next;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       /* _safe iteration: the current entry is handed to the release
+        * helper inside the loop */
+       list_for_each_entry_safe(cur, next, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_reply.lrd_xid <= rcvd_xid) {
+                       ted->ted_release_xid++;
+                       tgt_release_reply_data(tgt, ted, cur);
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return 0;
+}
+
+/* Release the first reply data of export @exp recorded with tag @tag,
+ * counting the release in ted_release_tag.  Nothing is done when @tag
+ * is 0.  Always returns 0.
+ */
+int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct tg_reply_data    *cur, *next;
+
+       if (tag == 0)
+               return 0;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(cur, next, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_tag == tag) {
+                       /* only the first match is released */
+                       ted->ted_release_tag++;
+                       tgt_release_reply_data(tgt, ted, cur);
+                       break;
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return 0;
+}
+