Whamcloud - gitweb
LU-5319 mdt: support multiple modify RPCs in parallel 60/14860/13
author Gregoire Pichon <gregoire.pichon@bull.net>
Tue, 31 Mar 2015 12:47:53 +0000 (14:47 +0200)
committer Oleg Drokin <oleg.drokin@intel.com>
Wed, 1 Jul 2015 02:01:30 +0000 (02:01 +0000)
This patch implements the server part of the feature that allows
support of multiple modify RPCs in parallel on MDT targets.

Each target export is able to store several in-memory reply
data so that it can reconstruct several requests issued by
the client in parallel.

Additionally, a new internal file REPLY_DATA is created on
the target to store on-disk reply data. The reply data slots
in that file are managed by a bitmap (lut_reply_bitmap) and
can be used to store reply data of any client.

When the target recovers, the on-disk reply data is used to
restore the in-memory reply data and ensure reconstruction
of committed operations.

Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Signed-off-by: Gregoire Pichon <gregoire.pichon@bull.net>
Change-Id: I8f91666f5b0b4f7b9445a01c520d73f56d059ff3
Reviewed-on: http://review.whamcloud.com/14860
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
20 files changed:
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/include/lustre_export.h
lustre/include/lustre_fid.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/obdclass/genops.c
lustre/obdclass/lprocfs_status_server.c
lustre/obdclass/obd_config.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osd-zfs/osd_oi.c
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/target/tgt_main.c

index 78800cf..4428e69 100644 (file)
@@ -138,6 +138,35 @@ struct lu_target {
        spinlock_t               lut_client_bitmap_lock;
        /** Bitmap of known clients */
        unsigned long           *lut_client_bitmap;
+       /* Number of clients supporting multiple modify RPCs
+        * recorded in the bitmap */
+       atomic_t                 lut_num_clients;
+       /* Client generation to identify client slot reuse */
+       atomic_t                 lut_client_generation;
+       /** reply_data file */
+       struct dt_object        *lut_reply_data;
+       /** Bitmap of used slots in the reply data file */
+       unsigned long           **lut_reply_bitmap;
+};
+
+/* number of slots in reply bitmap */
+#define LUT_REPLY_SLOTS_PER_CHUNK (1<<20)
+#define LUT_REPLY_SLOTS_MAX_CHUNKS 16
+
+/**
+ * Target reply data
+ */
+struct tg_reply_data {
+       /** chain of reply data anchored in tg_export_data */
+       struct list_head        trd_list;
+       /** copy of on-disk reply data */
+       struct lsd_reply_data   trd_reply;
+       /** versions for Version Based Recovery */
+       __u64                   trd_pre_versions[4];
+       /** slot index in reply_data file */
+       int                     trd_index;
+       /** tag the client used */
+       __u16                   trd_tag;
 };
 
 extern struct lu_context_key tgt_session_key;
@@ -300,6 +329,12 @@ static inline int req_is_replay(struct ptlrpc_request *req)
        return !!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
 }
 
+static inline bool tgt_is_multimodrpcs_client(struct obd_export *exp)
+{
+       return exp_connect_flags(exp) & OBD_CONNECT_MULTIMODRPCS;
+}
+
+
 /* target/tgt_handler.c */
 int tgt_request_handle(struct ptlrpc_request *req);
 char *tgt_name(struct lu_target *tgt);
@@ -350,6 +385,7 @@ void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
 void tgt_register_lfsck_query(int (*query)(const struct lu_env *,
                                           struct dt_device *,
                                           struct lfsck_request *));
+bool req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
 
 extern struct tgt_handler tgt_lfsck_handlers[];
 extern struct tgt_handler tgt_obd_handlers[];
@@ -394,6 +430,8 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
                           int sync);
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
+int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
+bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
 
 /* target/update_trans.c */
 int distribute_txn_init(const struct lu_env *env,
index ebd526e..91ae410 100644 (file)
@@ -1417,7 +1417,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \
                                OBD_CONNECT_OPEN_BY_FID | \
                                OBD_CONNECT_DIR_STRIPE | \
-                               OBD_CONNECT_BULK_MBITS)
+                               OBD_CONNECT_BULK_MBITS | \
+                               OBD_CONNECT_MULTIMODRPCS)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
index b0dec84..ba40f2b 100644 (file)
@@ -59,6 +59,7 @@
 /** Persistent mount data are stored on the disk in this file. */
 #define MOUNT_DATA_FILE                MOUNT_CONFIGS_DIR"/"CONFIGS_FILE
 #define LAST_RCVD              "last_rcvd"
+#define REPLY_DATA             "reply_data"
 #define LOV_OBJID              "lov_objid"
 #define LOV_OBJSEQ             "lov_objseq"
 #define HEALTH_CHECK           "health_check"
@@ -317,6 +318,8 @@ struct lustre_mount_data {
 #define OBD_INCOMPAT_LMM_VER    0x00000100
 /** multiple OI files for MDT */
 #define OBD_INCOMPAT_MULTI_OI   0x00000200
+/** multiple RPCs in flight */
+#define OBD_INCOMPAT_MULTI_RPCS        0x00000400
 
 /* Data stored per server at the head of the last_rcvd file.  In le32 order.
    This should be common to filter_internal.h, lustre_mds.h */
@@ -361,11 +364,35 @@ struct lsd_client_data {
         /* VBR: last versions */
         __u64 lcd_pre_versions[4];
         __u32 lcd_last_epoch;
-        /** orphans handling for delayed export rely on that */
-        __u32 lcd_first_epoch;
-        __u8  lcd_padding[LR_CLIENT_SIZE - 128];
+       /* generation counter of client slot in last_rcvd */
+       __u32 lcd_generation;
+       __u8  lcd_padding[LR_CLIENT_SIZE - 128];
 };
 
+
+/* Data stored in each slot of the reply_data file.
+ *
+ * The lrd_client_gen field is assigned the lcd_generation value
+ * to identify which client the reply data belongs to.
+ */
+struct lsd_reply_data {
+       __u64   lrd_transno;    /* transaction number */
+       __u64   lrd_xid;        /* transmission id */
+       __u64   lrd_data;       /* per-operation data */
+       __u32   lrd_result;     /* request result */
+       __u32   lrd_client_gen; /* client generation */
+};
+
+/* Header of the reply_data file */
+#define LRH_MAGIC 0xbdabda01
+struct lsd_reply_header {
+       __u32   lrh_magic;
+       __u32   lrh_header_size;
+       __u32   lrh_reply_size;
+       __u8    lrh_pad[sizeof(struct lsd_reply_data) - 12];
+};
+
+
 /* bug20354: the lcd_uuid for export of clients may be wrong */
 static inline void check_lcd(char *obd_name, int index,
                              struct lsd_client_data *lcd)
@@ -452,7 +479,7 @@ static inline void lcd_le_to_cpu(struct lsd_client_data *buf,
         lcd->lcd_pre_versions[2]    = le64_to_cpu(buf->lcd_pre_versions[2]);
         lcd->lcd_pre_versions[3]    = le64_to_cpu(buf->lcd_pre_versions[3]);
         lcd->lcd_last_epoch         = le32_to_cpu(buf->lcd_last_epoch);
-        lcd->lcd_first_epoch        = le32_to_cpu(buf->lcd_first_epoch);
+       lcd->lcd_generation         = le32_to_cpu(buf->lcd_generation);
 }
 
 static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
@@ -472,7 +499,7 @@ static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
         buf->lcd_pre_versions[2]    = cpu_to_le64(lcd->lcd_pre_versions[2]);
         buf->lcd_pre_versions[3]    = cpu_to_le64(lcd->lcd_pre_versions[3]);
         buf->lcd_last_epoch         = cpu_to_le32(lcd->lcd_last_epoch);
-        buf->lcd_first_epoch        = cpu_to_le32(lcd->lcd_first_epoch);
+       buf->lcd_generation         = cpu_to_le32(lcd->lcd_generation);
 }
 
 static inline __u64 lcd_last_transno(struct lsd_client_data *lcd)
index fe41b2b..d6e5c11 100644 (file)
@@ -59,7 +59,8 @@ struct mdt_idmap_table;
  * Target-specific export data
  */
 struct tg_export_data {
-       /** Protects led_lcd below */
+       /** Protects ted_lcd, ted_reply_* and
+        * ted_release_* fields below */
        struct mutex            ted_lcd_lock;
        /** Per-client data for each export */
        struct lsd_client_data  *ted_lcd;
@@ -71,6 +72,18 @@ struct tg_export_data {
        /** nodemap this export is a member of */
        struct lu_nodemap       *ted_nodemap;
        struct hlist_node       ted_nodemap_member;
+
+       /* All reply data fields below are
+        * protected by ted_lcd_lock */
+       /** List of reply data */
+       struct list_head        ted_reply_list;
+       int                     ted_reply_cnt;
+       /** Reply data with highest transno is retained */
+       struct tg_reply_data    *ted_reply_last;
+       /* Statistics */
+       int                     ted_reply_max; /* high water mark */
+       int                     ted_release_xid;
+       int                     ted_release_tag;
 };
 
 /**
@@ -182,6 +195,7 @@ struct obd_export {
        struct list_head        exp_obd_chain;
        struct hlist_node       exp_uuid_hash;  /** uuid-export hash*/
        struct hlist_node       exp_nid_hash;   /** nid-export hash */
+       struct hlist_node       exp_gen_hash;   /** last_rcvd clt gen hash */
         /**
          * All exports eligible for ping evictor are linked into a list
          * through this field in "most time since last request on this export"
index e5ef46a..ead5bb9 100644 (file)
@@ -225,10 +225,7 @@ enum local_oid {
        LFSCK_BOOKMARK_OID      = 17UL,
        OTABLE_IT_OID           = 18UL,
        OSD_LPF_OID             = 19UL,
-       /* These two definitions are obsolete
-        * OFD_GROUP0_LAST_OID     = 20UL,
-        * OFD_GROUP4K_LAST_OID    = 20UL+4096,
-        */
+       REPLY_DATA_OID          = 21UL,
        OFD_LAST_GROUP_OID      = 4117UL,
        LLOG_CATALOGS_OID       = 4118UL,
        MGS_CONFIGS_OID         = 4119UL,
index fad6b03..ec275fa 100644 (file)
@@ -548,6 +548,8 @@ struct obd_device {
        struct cfs_hash             *obd_nid_hash;
        /* nid stats body */
        struct cfs_hash             *obd_nid_stats_hash;
+       /* client_generation-export hash body */
+       struct cfs_hash             *obd_gen_hash;
        struct list_head        obd_nid_stats;
        atomic_t                obd_refcount;
        struct list_head        obd_exports;
index e07e87d..a4d400a 100644 (file)
@@ -96,6 +96,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define HASH_NID_STATS_BKT_BITS 5
 #define HASH_NID_STATS_CUR_BITS 7
 #define HASH_NID_STATS_MAX_BITS 12
+#define HASH_GEN_BKT_BITS 5
+#define HASH_GEN_CUR_BITS 7
+#define HASH_GEN_MAX_BITS 12
 #define HASH_LQE_BKT_BITS 5
 #define HASH_LQE_CUR_BITS 7
 #define HASH_LQE_MAX_BITS 12
index 1bb8a0e..a8b9c48 100644 (file)
 #include <lustre_lfsck.h>
 #include <lustre_nodemap.h>
 
+static unsigned int max_mod_rpcs_per_client = 8;
+CFS_MODULE_PARM(max_mod_rpcs_per_client, "i", uint, 0644,
+               "maximum number of modify RPCs in flight allowed per client");
+
 mdl_mode_t mdt_mdl_lock_modes[] = {
         [LCK_MINMODE] = MDL_MINMODE,
         [LCK_EX]      = MDL_EX,
@@ -1754,7 +1758,10 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
         if (rc != 0)
                 GOTO(out_ucred, rc = err_serious(rc));
 
-        if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
+       rc = mdt_check_resent(info, mdt_reconstruct, lhc);
+       if (rc < 0) {
+               GOTO(out_ucred, rc);
+       } else if (rc == 1) {
                DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
                rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
                 GOTO(out_ucred, rc);
@@ -3019,12 +3026,12 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                return;
        }
 
-        /*
-         * If the xid matches, then we know this is a resent request, and allow
-         * it. (It's probably an OPEN, for which we don't send a lock.
-         */
-        if (req_xid_is_last(req))
-                return;
+       /*
+        * If the xid matches, then we know this is a resent request, and allow
+        * it. (It's probably an OPEN, for which we don't send a lock.)
+        */
+       if (req_can_reconstruct(req, NULL))
+               return;
 
         /*
          * This remote handle isn't enqueued, so we never received or processed
@@ -4846,6 +4853,18 @@ static int mdt_connect_internal(struct obd_export *exp,
 
        data->ocd_max_easize = mdt->mdt_max_ea_size;
 
+       /* NB: Disregard the rule against updating
+        * exp_connect_data.ocd_connect_flags in this case, since
+        * tgt_client_new() needs to know if this client supports
+        * multiple modify RPCs, and it is safe to expose this flag before
+        * connection processing completes. */
+       if (data->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
+               data->ocd_maxmodrpcs = max_mod_rpcs_per_client;
+               spin_lock(&exp->exp_lock);
+               *exp_connect_flags_ptr(exp) |= OBD_CONNECT_MULTIMODRPCS;
+               spin_unlock(&exp->exp_lock);
+       }
+
        return 0;
 }
 
index 639a9ef..eb1ad98 100644 (file)
 #include <lustre_quota.h>
 #include <lustre_linkea.h>
 
-/* check if request's xid is equal to last one or not*/
-static inline int req_xid_is_last(struct ptlrpc_request *req)
-{
-        struct lsd_client_data *lcd = req->rq_export->exp_target_data.ted_lcd;
-        return (req->rq_xid == lcd->lcd_last_xid ||
-                req->rq_xid == lcd->lcd_last_close_xid);
-}
-
 struct mdt_object;
 
 /* file data for open files on MDS */
@@ -431,6 +423,7 @@ struct mdt_thread_info {
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
        struct ldlm_enqueue_info   mti_einfo;
+       struct tg_reply_data      *mti_reply_data;
 };
 
 extern struct lu_context_key mdt_thread_key;
@@ -776,7 +769,7 @@ __u32 mdt_identity_get_perm(struct md_identity *, __u32, lnet_nid_t);
 int mdt_pack_remote_perm(struct mdt_thread_info *, struct mdt_object *, void *);
 
 /* mdt/mdt_recovery.c */
-void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd);
+__u64 mdt_req_from_lrd(struct ptlrpc_request *req, struct tg_reply_data *trd);
 
 /* mdt/mdt_hsm.c */
 int mdt_hsm_state_get(struct tgt_session_info *tsi);
@@ -911,18 +904,27 @@ static inline int mdt_check_resent(struct mdt_thread_info *info,
                                    mdt_reconstruct_t reconstruct,
                                    struct mdt_lock_handle *lhc)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
-        ENTRY;
-
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                if (req_xid_is_last(req)) {
-                        reconstruct(info, lhc);
-                        RETURN(1);
-                }
-                DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
-                          req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
-        }
-        RETURN(0);
+       struct ptlrpc_request *req = mdt_info_req(info);
+       int rc = 0;
+       ENTRY;
+
+       if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+               OBD_ALLOC_PTR(info->mti_reply_data);
+               if (info->mti_reply_data == NULL)
+                       RETURN(-ENOMEM);
+
+               if (req_can_reconstruct(req, info->mti_reply_data)) {
+                       reconstruct(info, lhc);
+                       rc = 1;
+               } else {
+                       DEBUG_REQ(D_HA, req,
+                                 "no reply data found for RESENT req");
+                       rc = 0;
+               }
+               OBD_FREE_PTR(info->mti_reply_data);
+               info->mti_reply_data = NULL;
+       }
+       RETURN(rc);
 }
 
 struct lu_ucred *mdt_ucred(const struct mdt_thread_info *info);
index 36d96d6..4af70ff 100644 (file)
@@ -206,6 +206,19 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
         if (lustre_msg_get_transno(req->rq_repmsg) != 0)
                 RETURN_EXIT;
 
+       if (tgt_is_multimodrpcs_client(req->rq_export)) {
+               struct thandle         *th;
+
+               /* generate an empty transaction to get a transno
+                * and reply data */
+               th = dt_trans_create(info->mti_env, mdt->mdt_bottom);
+               if (!IS_ERR(th)) {
+                       rc = dt_trans_start(info->mti_env, mdt->mdt_bottom, th);
+                       dt_trans_stop(info->mti_env, mdt->mdt_bottom, th);
+               }
+               RETURN_EXIT;
+       }
+
        spin_lock(&mdt->mdt_lut.lut_translock);
        if (rc != 0) {
                if (info->mti_transno != 0) {
@@ -614,8 +627,6 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_device       *mdt  = info->mti_mdt;
         struct req_capsule      *pill = info->mti_pill;
         struct ptlrpc_request   *req  = mdt_info_req(info);
-        struct tg_export_data   *ted  = &req->rq_export->exp_target_data;
-        struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
        __u64                   flags = info->mti_spec.sp_cr_flags;
@@ -624,17 +635,18 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_object       *child;
         struct mdt_body         *repbody;
         int                      rc;
-        ENTRY;
+       __u64                    opdata;
+       ENTRY;
 
         LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
        ma->ma_need = MA_INODE | MA_HSM;
-        ma->ma_valid = 0;
+       ma->ma_valid = 0;
 
-        mdt_req_from_lcd(req, lcd);
-        mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
+       opdata = mdt_req_from_lrd(req, info->mti_reply_data);
+       mdt_set_disposition(info, ldlm_rep, opdata);
 
         CDEBUG(D_INODE, "This is reconstruct open: disp="LPX64", result=%d\n",
                ldlm_rep->lock_policy_res1, req->rq_status);
index e162279..859c40d 100644 (file)
@@ -177,50 +177,43 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
        spin_unlock(&exp->exp_lock);
 }
 
-/**
- * VBR: restore versions
- */
-static void mdt_vbr_reconstruct(struct ptlrpc_request *req,
-                               struct lsd_client_data *lcd)
+__u64 mdt_req_from_lrd(struct ptlrpc_request *req,
+                      struct tg_reply_data *trd)
 {
-        __u64 pre_versions[4] = {0};
-        pre_versions[0] = lcd->lcd_pre_versions[0];
-        pre_versions[1] = lcd->lcd_pre_versions[1];
-        pre_versions[2] = lcd->lcd_pre_versions[2];
-        pre_versions[3] = lcd->lcd_pre_versions[3];
-        lustre_msg_set_versions(req->rq_repmsg, pre_versions);
-}
+       struct lsd_reply_data *lrd;
 
-void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd)
-{
-        DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
-                  lcd->lcd_last_transno, lcd->lcd_last_result);
-
-       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
-                req->rq_transno = lcd->lcd_last_close_transno;
-                req->rq_status = lcd->lcd_last_close_result;
-        } else {
-                req->rq_transno = lcd->lcd_last_transno;
-                req->rq_status = lcd->lcd_last_result;
-                mdt_vbr_reconstruct(req, lcd);
-        }
-        if (req->rq_status != 0)
-                req->rq_transno = 0;
-        lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
-        lustre_msg_set_status(req->rq_repmsg, req->rq_status);
-        DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d",
-                  req->rq_transno, req->rq_status);
-
-        mdt_steal_ack_locks(req);
+       LASSERT(trd != NULL);
+       lrd = &trd->trd_reply;
+
+       DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
+                 lrd->lrd_transno, lrd->lrd_result);
+
+       req->rq_transno = lrd->lrd_transno;
+       req->rq_status = lrd->lrd_result;
+
+       lustre_msg_set_versions(req->rq_repmsg, trd->trd_pre_versions);
+
+       if (req->rq_status != 0)
+               req->rq_transno = 0;
+       lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+       lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+
+       DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d",
+                 req->rq_transno, req->rq_status);
+
+       mdt_steal_ack_locks(req);
+
+       return lrd->lrd_data;
 }
 
+
 void mdt_reconstruct_generic(struct mdt_thread_info *mti,
-                             struct mdt_lock_handle *lhc)
+                            struct mdt_lock_handle *lhc)
 {
-        struct ptlrpc_request *req = mdt_info_req(mti);
-        struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct ptlrpc_request *req = mdt_info_req(mti);
 
-        return mdt_req_from_lcd(req, ted->ted_lcd);
+       mdt_req_from_lrd(req, mti->mti_reply_data);
+       return;
 }
 
 /**
@@ -246,17 +239,16 @@ static void mdt_fake_ma(struct md_attr *ma)
 static void mdt_reconstruct_create(struct mdt_thread_info *mti,
                                    struct mdt_lock_handle *lhc)
 {
-        struct ptlrpc_request  *req = mdt_info_req(mti);
-        struct obd_export *exp = req->rq_export;
-        struct tg_export_data *ted = &exp->exp_target_data;
-        struct mdt_device *mdt = mti->mti_mdt;
-        struct mdt_object *child;
-        struct mdt_body *body;
-        int rc;
+       struct ptlrpc_request  *req = mdt_info_req(mti);
+       struct obd_export *exp = req->rq_export;
+       struct mdt_device *mdt = mti->mti_mdt;
+       struct mdt_object *child;
+       struct mdt_body *body;
+       int rc;
 
-        mdt_req_from_lcd(req, ted->ted_lcd);
-        if (req->rq_status)
-                return;
+       mdt_req_from_lrd(req, mti->mti_reply_data);
+       if (req->rq_status)
+               return;
 
        /* if no error, so child was created with requested fid */
        child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2);
@@ -296,15 +288,14 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
 {
         struct ptlrpc_request  *req = mdt_info_req(mti);
         struct obd_export *exp = req->rq_export;
-        struct mdt_export_data *med = &exp->exp_mdt_data;
         struct mdt_device *mdt = mti->mti_mdt;
         struct mdt_object *obj;
         struct mdt_body *body;
        int rc;
 
-        mdt_req_from_lcd(req, med->med_ted.ted_lcd);
-        if (req->rq_status)
-                return;
+       mdt_req_from_lrd(req, mti->mti_reply_data);
+       if (req->rq_status)
+               return;
 
         body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
         obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1);
index 344bdd9..257cf03 100644 (file)
@@ -862,6 +862,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
        spin_lock_init(&export->exp_rpc_lock);
        INIT_HLIST_NODE(&export->exp_uuid_hash);
        INIT_HLIST_NODE(&export->exp_nid_hash);
+       INIT_HLIST_NODE(&export->exp_gen_hash);
        spin_lock_init(&export->exp_bl_list_lock);
        INIT_LIST_HEAD(&export->exp_bl_list);
 
index c06964d..86e03ff 100644 (file)
@@ -228,6 +228,36 @@ static int lprocfs_exp_hash_seq_show(struct seq_file *m, void *data)
 }
 LPROC_SEQ_FOPS_RO(lprocfs_exp_hash);
 
+int lprocfs_exp_print_replydata_seq(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                                   struct hlist_node *hnode, void *cb_data)
+
+{
+       struct obd_export *exp = cfs_hash_object(hs, hnode);
+       struct seq_file *m = cb_data;
+       struct tg_export_data *ted = &exp->exp_target_data;
+
+       seq_printf(m, "reply_cnt: %d\n"
+                     "reply_max: %d\n"
+                     "reply_released_by_xid: %d\n"
+                     "reply_released_by_tag: %d\n\n",
+                  ted->ted_reply_cnt,
+                  ted->ted_reply_max,
+                  ted->ted_release_xid,
+                  ted->ted_release_tag);
+       return 0;
+}
+
+int lprocfs_exp_replydata_seq_show(struct seq_file *m, void *data)
+{
+       struct nid_stat *stats = m->private;
+       struct obd_device *obd = stats->nid_obd;
+
+       cfs_hash_for_each_key(obd->obd_nid_hash, &stats->nid,
+                               lprocfs_exp_print_replydata_seq, m);
+       return 0;
+}
+LPROC_SEQ_FOPS_RO(lprocfs_exp_replydata);
+
 int lprocfs_nid_stats_clear_seq_show(struct seq_file *m, void *data)
 {
        return seq_printf(m, "%s\n", "Write into this file to clear all nid "
@@ -373,6 +403,15 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid)
                GOTO(destroy_new_ns, rc);
        }
 
+       entry = lprocfs_add_simple(new_stat->nid_proc, "reply_data", new_stat,
+                                  &lprocfs_exp_replydata_fops);
+       if (IS_ERR(entry)) {
+               rc = PTR_ERR(entry);
+               CWARN("%s: Error adding the reply_data file: rc = %d\n",
+                     obd->obd_name, rc);
+               GOTO(destroy_new_ns, rc);
+       }
+
        spin_lock(&exp->exp_lock);
        exp->exp_nid_stats = new_stat;
        spin_unlock(&exp->exp_lock);
index 28af8f8..bbedbdc 100644 (file)
@@ -52,6 +52,7 @@
 static struct cfs_hash_ops uuid_hash_ops;
 static struct cfs_hash_ops nid_hash_ops;
 static struct cfs_hash_ops nid_stat_hash_ops;
+static struct cfs_hash_ops gen_hash_ops;
 
 /*********** string parsing utils *********/
 
@@ -478,6 +479,7 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         obd->obd_uuid_hash = NULL;
         obd->obd_nid_hash = NULL;
         obd->obd_nid_stats_hash = NULL;
+       obd->obd_gen_hash = NULL;
        spin_unlock(&obd->obd_dev_lock);
 
         /* create an uuid-export lustre hash */
@@ -513,6 +515,17 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (!obd->obd_nid_stats_hash)
                 GOTO(err_hash, err = -ENOMEM);
 
+       /* create a client_generation-export lustre hash */
+       obd->obd_gen_hash = cfs_hash_create("UUID_HASH",
+                                           HASH_GEN_CUR_BITS,
+                                           HASH_GEN_MAX_BITS,
+                                           HASH_GEN_BKT_BITS, 0,
+                                           CFS_HASH_MIN_THETA,
+                                           CFS_HASH_MAX_THETA,
+                                           &gen_hash_ops, CFS_HASH_DEFAULT);
+       if (!obd->obd_gen_hash)
+               GOTO(err_hash, err = -ENOMEM);
+
         exp = class_new_export(obd, &obd->obd_uuid);
         if (IS_ERR(exp))
                 GOTO(err_hash, err = PTR_ERR(exp));
@@ -554,6 +567,10 @@ err_hash:
                 cfs_hash_putref(obd->obd_nid_stats_hash);
                 obd->obd_nid_stats_hash = NULL;
         }
+       if (obd->obd_gen_hash) {
+               cfs_hash_putref(obd->obd_gen_hash);
+               obd->obd_gen_hash = NULL;
+       }
         obd->obd_starting = 0;
         CERROR("setup %s failed (%d)\n", obd->obd_name, err);
         return err;
@@ -682,6 +699,12 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 obd->obd_nid_stats_hash = NULL;
         }
 
+       /* destroy a client_generation-export hash body */
+       if (obd->obd_gen_hash) {
+               cfs_hash_putref(obd->obd_gen_hash);
+               obd->obd_gen_hash = NULL;
+       }
+
         class_decref(obd, "setup", obd);
         obd->obd_set_up = 0;
 
@@ -2105,3 +2128,73 @@ static struct cfs_hash_ops nid_stat_hash_ops = {
         .hs_get         = nidstats_get,
         .hs_put_locked  = nidstats_put_locked,
 };
+
+
+/*
+ * client_generation<->export hash operations
+ */
+
+static unsigned
+gen_hash(struct cfs_hash *hs, const void *key, unsigned mask)
+{
+       return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
+}
+
+static void *
+gen_key(struct hlist_node *hnode)
+{
+       struct obd_export *exp;
+
+       exp = hlist_entry(hnode, struct obd_export, exp_gen_hash);
+
+       RETURN(&exp->exp_target_data.ted_lcd->lcd_generation);
+}
+
+/*
+ * NOTE: It is impossible to find an export that is in failed
+ *       state with this function
+ */
+static int
+gen_kepcmp(const void *key, struct hlist_node *hnode)
+{
+       struct obd_export *exp;
+
+       LASSERT(key);
+       exp = hlist_entry(hnode, struct obd_export, exp_gen_hash);
+
+       RETURN(exp->exp_target_data.ted_lcd->lcd_generation == *(__u32 *)key &&
+              !exp->exp_failed);
+}
+
+static void *
+gen_export_object(struct hlist_node *hnode)
+{
+       return hlist_entry(hnode, struct obd_export, exp_gen_hash);
+}
+
+static void
+gen_export_get(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+       struct obd_export *exp;
+
+       exp = hlist_entry(hnode, struct obd_export, exp_gen_hash);
+       class_export_get(exp);
+}
+
+static void
+gen_export_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+       struct obd_export *exp;
+
+       exp = hlist_entry(hnode, struct obd_export, exp_gen_hash);
+       class_export_put(exp);
+}
+
+static struct cfs_hash_ops gen_hash_ops = {
+       .hs_hash        = gen_hash,
+       .hs_key         = gen_key,
+       .hs_keycmp      = gen_kepcmp,
+       .hs_object      = gen_export_object,
+       .hs_get         = gen_export_get,
+       .hs_put_locked  = gen_export_put_locked,
+};
index 458977f..909967a 100644 (file)
@@ -1594,6 +1594,10 @@ static const struct osd_lf_map osd_lf_maps[] = {
        { LAST_RCVD, { FID_SEQ_LOCAL_FILE, LAST_RECV_OID, 0 }, OLF_SHOW_NAME,
                sizeof(LAST_RCVD) - 1, NULL, NULL },
 
+       /* reply_data */
+       { REPLY_DATA, { FID_SEQ_LOCAL_FILE, REPLY_DATA_OID, 0 }, OLF_SHOW_NAME,
+               sizeof(REPLY_DATA) - 1, NULL, NULL },
+
        /* lov_objid */
        { LOV_OBJID, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OID, 0 }, OLF_SHOW_NAME,
                sizeof(LOV_OBJID) - 1, NULL, NULL },
index 435e654..22f6ff0 100644 (file)
@@ -94,6 +94,7 @@ static const struct named_oid oids[] = {
        { OFD_HEALTH_CHECK_OID,         HEALTH_CHECK },
        { ACCT_USER_OID,                "acct_usr_inode" },
        { ACCT_GROUP_OID,               "acct_grp_inode" },
+       { REPLY_DATA_OID,               REPLY_DATA },
        { 0,                            NULL }
 };
 
index a8091d5..2e369a6 100644 (file)
@@ -526,11 +526,11 @@ static int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
 
        /* sanity check: if the xid matches, the request must be marked as a
         * resent or replayed */
-       if (req_xid_is_last(req)) {
+       if (req_can_reconstruct(req, NULL)) {
                if (!(lustre_msg_get_flags(req->rq_reqmsg) &
                      (MSG_RESENT | MSG_REPLAY))) {
                        DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
-                                 "last_xid, expected REPLAY or RESENT flag "
+                                 "saved xid, expected REPLAY or RESENT flag "
                                  "(%x)", req->rq_xid,
                                  lustre_msg_get_flags(req->rq_reqmsg));
                        req->rq_status = -ENOTCONN;
@@ -689,6 +689,16 @@ int tgt_request_handle(struct ptlrpc_request *req)
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
 
+       /* try to release in-memory reply data */
+       if (tgt_is_multimodrpcs_client(req->rq_export)) {
+               tgt_handle_received_xid(req->rq_export,
+                               lustre_msg_get_last_xid(req->rq_reqmsg));
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) &
+                     (MSG_RESENT | MSG_REPLAY)))
+                       tgt_handle_tag(req->rq_export,
+                                      lustre_msg_get_tag(req->rq_reqmsg));
+       }
+
        h = tgt_handler_find_check(req);
        if (IS_ERR(h)) {
                req->rq_status = PTR_ERR(h);
@@ -2172,3 +2182,44 @@ out:
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_brw_write);
+
+/* Tell whether the request @req matches reply data saved for its export,
+ * i.e. whether it can be reconstructed from that saved reply data.
+ * If @trd is not NULL and a match is found, the saved reply data is
+ * copied into @trd.
+ */
+bool req_can_reconstruct(struct ptlrpc_request *req,
+                        struct tg_reply_data *trd)
+{
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct lsd_client_data *lcd = ted->ted_lcd;
+       struct lsd_reply_data *lrd;
+       bool found;
+       int i;
+
+       /* multiple modify RPCs client: the answer comes from
+        * tgt_lookup_reply() */
+       if (tgt_is_multimodrpcs_client(req->rq_export))
+               return tgt_lookup_reply(req, trd);
+
+       /* other clients: compare against the last xid and the last
+        * close xid recorded in the client data */
+       mutex_lock(&ted->ted_lcd_lock);
+       found = req->rq_xid == lcd->lcd_last_xid ||
+               req->rq_xid == lcd->lcd_last_close_xid;
+
+       /* nothing to copy if there is no match or no output buffer */
+       if (!found || trd == NULL)
+               goto unlock;
+
+       lrd = &trd->trd_reply;
+       if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CLOSE) {
+               lrd->lrd_xid = lcd->lcd_last_xid;
+               lrd->lrd_transno = lcd->lcd_last_transno;
+               lrd->lrd_result = lcd->lcd_last_result;
+               lrd->lrd_data = lcd->lcd_last_data;
+               for (i = 0; i < 4; i++)
+                       trd->trd_pre_versions[i] = lcd->lcd_pre_versions[i];
+       } else {
+               /* close requests have their own xid/transno/result */
+               lrd->lrd_xid = lcd->lcd_last_close_xid;
+               lrd->lrd_transno = lcd->lcd_last_close_transno;
+               lrd->lrd_result = lcd->lcd_last_close_result;
+       }
+
+unlock:
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return found;
+}
+EXPORT_SYMBOL(req_can_reconstruct);
+
index 603a4c2..ace6af9 100644 (file)
@@ -55,6 +55,7 @@ struct tgt_thread_info {
        /* server and client data buffers */
        struct lr_server_data    tti_lsd;
        struct lsd_client_data   tti_lcd;
+       struct lsd_reply_data    tti_lrd;
        struct lu_buf            tti_buf;
        loff_t                   tti_off;
 
@@ -245,6 +246,8 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
                     void *cookie);
 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
                    void *cookie);
+int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid);
+int tgt_handle_tag(struct obd_export *exp, __u16 tag);
 
 void update_records_dump(const struct update_records *records,
                         unsigned int mask, bool dump_updates);
index 1aa891f..558ee6b 100644 (file)
 
 #include "tgt_internal.h"
 
+
+/* Allocate the bitmap covering chunk @chunk of reply data slots of the
+ * target @lut and install it in lut_reply_bitmap[].
+ * If the chunk already has a bitmap once the lock is taken, the new
+ * allocation is discarded and the existing bitmap is kept.
+ * Returns 0 on success (bitmap present), -ENOMEM on allocation failure.
+ */
+static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
+{
+       const size_t bm_size = BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+                              sizeof(long);
+       unsigned long *new_bm;
+       bool installed = false;
+
+       /* allocate before taking the spinlock */
+       OBD_ALLOC(new_bm, bm_size);
+       if (new_bm == NULL)
+               return -ENOMEM;
+
+       spin_lock(&lut->lut_client_bitmap_lock);
+       if (lut->lut_reply_bitmap[chunk] == NULL) {
+               lut->lut_reply_bitmap[chunk] = new_bm;
+               installed = true;
+       }
+       spin_unlock(&lut->lut_client_bitmap_lock);
+
+       /* someone else already allocated the bitmap for this chunk */
+       if (!installed)
+               OBD_FREE(new_bm, bm_size);
+
+       return 0;
+}
+
+/* Reserve an available reply data slot in the bitmap of the target @lut.
+ * Chunks are scanned in increasing order; the bitmap of a chunk is
+ * allocated the first time the chunk is visited.
+ * Returns the index of the reserved slot, -ENOSPC if no chunk has a free
+ * slot, or the error returned by the bitmap chunk allocation.
+ * XXX algo could be improved if this routine limits performance
+ */
+static int tgt_find_free_reply_slot(struct lu_target *lut)
+{
+       unsigned long *bitmap;
+       int chunk;
+       int bit;
+       int rc;
+
+       for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
+               /* allocate the bitmap chunk if necessary */
+               if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+                       rc = tgt_bitmap_chunk_alloc(lut, chunk);
+                       if (rc != 0)
+                               return rc;
+               }
+               bitmap = lut->lut_reply_bitmap[chunk];
+
+               /* keep searching this chunk until it has no zero bit left
+                * or until we are the one who sets the bit found */
+               for (;;) {
+                       bit = find_first_zero_bit(bitmap,
+                                                 LUT_REPLY_SLOTS_PER_CHUNK);
+                       if (bit >= LUT_REPLY_SLOTS_PER_CHUNK)
+                               break;
+
+                       /* the bit was still clear: the slot is ours */
+                       if (test_and_set_bit(bit, bitmap) == 0)
+                               return chunk * LUT_REPLY_SLOTS_PER_CHUNK + bit;
+               }
+       }
+
+       return -ENOSPC;
+}
+
+/* Flag the reply data slot @idx as 'used' in the bitmap of the target @lut.
+ * The bitmap chunk holding the slot is allocated if it does not exist yet.
+ * Returns 0 on success, -EALREADY if the slot was already flagged, or the
+ * error returned by the bitmap chunk allocation.
+ */
+static int tgt_set_reply_slot(struct lu_target *lut, int idx)
+{
+       int chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+       int bit = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+       int rc;
+
+       LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+       LASSERT(bit < LUT_REPLY_SLOTS_PER_CHUNK);
+
+       /* allocate the bitmap chunk if necessary */
+       if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+               rc = tgt_bitmap_chunk_alloc(lut, chunk);
+               if (rc != 0)
+                       return rc;
+       }
+
+       /* the bit was clear before: the slot is now marked 'used' */
+       if (test_and_set_bit(bit, lut->lut_reply_bitmap[chunk]) == 0)
+               return 0;
+
+       CERROR("%s: slot %d already set in bitmap\n",
+              tgt_name(lut), idx);
+       return -EALREADY;
+}
+
+
+/* Flag the reply data slot @idx as 'unused' in the bitmap of the target @lut.
+ * Returns 0 on success, -EALREADY if the slot was not flagged as 'used'.
+ */
+static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
+{
+       int chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+       int bit = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+
+       LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+       LASSERT(bit < LUT_REPLY_SLOTS_PER_CHUNK);
+
+       /* the bit was set before: the slot is now marked 'unused' */
+       if (test_and_clear_bit(bit, lut->lut_reply_bitmap[chunk]) != 0)
+               return 0;
+
+       CERROR("%s: slot %d already clear in bitmap\n",
+              tgt_name(lut), idx);
+       return -EALREADY;
+}
+
+
+/* Read the header stored at offset 0 of the reply_data file of target @tgt
+ * and store it into @lrh, converted from little-endian to CPU byte order.
+ * Returns 0 on success or the error returned by dt_record_read(), in which
+ * case @lrh is left untouched.
+ */
+static int tgt_reply_header_read(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                struct lsd_reply_header *lrh)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_header  disk_lrh;
+       int                      rc;
+
+       /* describe the on-disk record to read: whole header, offset 0 */
+       tti->tti_buf.lb_buf = &disk_lrh;
+       tti->tti_buf.lb_len = sizeof(disk_lrh);
+       tti->tti_off = 0;
+
+       rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+                           &tti->tti_off);
+       if (rc != 0)
+               return rc;
+
+       /* on-disk fields are little-endian */
+       lrh->lrh_magic = le32_to_cpu(disk_lrh.lrh_magic);
+       lrh->lrh_header_size = le32_to_cpu(disk_lrh.lrh_header_size);
+       lrh->lrh_reply_size = le32_to_cpu(disk_lrh.lrh_reply_size);
+
+       CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
+              "header_size=%d reply_size=%d\n",
+               tgt->lut_obd->obd_name, REPLY_DATA,
+               lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+       return 0;
+}
+
+/* Write the header @lrh at offset 0 of the reply_data file of target @tgt.
+ * The fields are converted from CPU to little-endian byte order and the
+ * write is done in its own transaction, created with th_sync set.
+ * Returns 0 on success, or the error reported while creating, declaring,
+ * starting or performing the write.
+ */
+static int tgt_reply_header_write(const struct lu_env *env,
+                                 struct lu_target *tgt,
+                                 struct lsd_reply_header *lrh)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_header  disk_lrh;
+       struct dt_object        *dto;
+       struct thandle          *th;
+       int                      rc;
+
+       CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
+              "header_size=%d reply_size=%d\n",
+               tgt->lut_obd->obd_name, REPLY_DATA,
+               lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+       /* on-disk fields are little-endian */
+       disk_lrh.lrh_magic = cpu_to_le32(lrh->lrh_magic);
+       disk_lrh.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
+       disk_lrh.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
+
+       th = dt_trans_create(env, tgt->lut_bottom);
+       if (IS_ERR(th))
+               return PTR_ERR(th);
+       th->th_sync = 1;
+
+       /* describe the on-disk record to write: whole header, offset 0 */
+       tti->tti_buf.lb_buf = &disk_lrh;
+       tti->tti_buf.lb_len = sizeof(disk_lrh);
+       tti->tti_off = 0;
+
+       rc = dt_declare_record_write(env, tgt->lut_reply_data,
+                                    &tti->tti_buf, tti->tti_off, th);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = dt_trans_start(env, tgt->lut_bottom, th);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+       rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+out:
+       /* the transaction is stopped on success and on error alike */
+       dt_trans_stop(env, tgt->lut_bottom, th);
+       return rc;
+}
+
+/* Write the reply data @lrd into the reply_data file of target @tgt
+ * at offset @off, as part of the transaction @th.
+ * The on-disk copy is built in the per-thread buffer tti_lrd, in
+ * little-endian byte order. @lrd->lrd_result is converted with
+ * ptlrpc_status_hton() for the on-disk copy and converted back with
+ * ptlrpc_status_ntoh() before the write is issued.
+ * Returns the value returned by dt_record_write().
+ */
+static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
+                               struct lsd_reply_data *lrd, loff_t off,
+                               struct thandle *th)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_data   *disk_lrd = &tti->tti_lrd;
+       struct dt_object        *dto;
+
+       /* result is stored on disk in its network representation */
+       lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
+
+       disk_lrd->lrd_transno    = cpu_to_le64(lrd->lrd_transno);
+       disk_lrd->lrd_xid        = cpu_to_le64(lrd->lrd_xid);
+       disk_lrd->lrd_data       = cpu_to_le64(lrd->lrd_data);
+       disk_lrd->lrd_result     = cpu_to_le32(lrd->lrd_result);
+       disk_lrd->lrd_client_gen = cpu_to_le32(lrd->lrd_client_gen);
+
+       /* give the caller's structure its host representation back */
+       lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
+
+       tti->tti_buf.lb_buf = disk_lrd;
+       tti->tti_buf.lb_len = sizeof(*disk_lrd);
+       tti->tti_off = off;
+
+       dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+       return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+}
+
+/* Read one reply data record from the reply_data file of target @tgt at
+ * offset @off and store it into @lrd, converted from little-endian to CPU
+ * byte order. The record is read through the per-thread buffer tti_lrd.
+ * No ptlrpc status conversion is applied to lrd_result here.
+ * Returns 0 on success or the error returned by dt_record_read(), in which
+ * case @lrd is left untouched.
+ */
+static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
+                              struct lsd_reply_data *lrd, loff_t off)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_data   *disk_lrd = &tti->tti_lrd;
+       int                      rc;
+
+       tti->tti_buf.lb_buf = disk_lrd;
+       tti->tti_buf.lb_len = sizeof(*disk_lrd);
+       tti->tti_off = off;
+
+       rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+                           &tti->tti_off);
+       if (rc != 0)
+               return rc;
+
+       /* on-disk fields are little-endian */
+       lrd->lrd_transno    = le64_to_cpu(disk_lrd->lrd_transno);
+       lrd->lrd_xid        = le64_to_cpu(disk_lrd->lrd_xid);
+       lrd->lrd_data       = le64_to_cpu(disk_lrd->lrd_data);
+       lrd->lrd_result     = le32_to_cpu(disk_lrd->lrd_result);
+       lrd->lrd_client_gen = le32_to_cpu(disk_lrd->lrd_client_gen);
+
+       return 0;
+}
+
+
+/* Destroy the in-memory reply data @trd of the export data @ted:
+ * unlink it from its list, decrement the export's reply count, mark its
+ * slot 'unused' in the reply bitmap of target @lut and free the structure.
+ * Called with ted_lcd_lock held
+ */
+static void tgt_free_reply_data(struct lu_target *lut,
+                               struct tg_export_data *ted,
+                               struct tg_reply_data *trd)
+{
+       struct lsd_reply_data *lrd = &trd->trd_reply;
+
+       CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
+              "client gen %u, slot idx %d\n",
+              tgt_name(lut), trd, lrd->lrd_xid,
+              lrd->lrd_transno, lrd->lrd_client_gen,
+              trd->trd_index);
+
+       LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+       list_del(&trd->trd_list);
+       ted->ted_reply_cnt--;
+       /* the result is not checked: tgt_clear_reply_slot() reports an
+        * already clear slot itself */
+       tgt_clear_reply_slot(lut, trd->trd_index);
+       OBD_FREE_PTR(trd);
+}
+
+/* Release the reply data @trd of the export data @ted on target @lut.
+ * The reply data is freed, unless its transno equals the last transno
+ * recorded for the export: in that case it is retained as ted_reply_last
+ * (replacing and freeing any previously retained one) to ensure
+ * correctness of target recovery.
+ * Called with ted_lcd_lock held
+ */
+static void tgt_release_reply_data(struct lu_target *lut,
+                                  struct tg_export_data *ted,
+                                  struct tg_reply_data *trd)
+{
+       struct lsd_reply_data *lrd = &trd->trd_reply;
+
+       CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
+              "client gen %u, slot idx %d\n",
+              tgt_name(lut), trd, lrd->lrd_xid,
+              lrd->lrd_transno, lrd->lrd_client_gen,
+              trd->trd_index);
+
+       LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+       /* common case: not the highest transno of this export, free it */
+       if (lrd->lrd_transno != ted->ted_lcd->lcd_last_transno) {
+               tgt_free_reply_data(lut, ted, trd);
+               return;
+       }
+
+       /* The reply data corresponding to the highest transno of this
+        * export is not freed, so that its on-disk reply data is kept and
+        * the last committed transno can be restored from disk in case
+        * of target recovery.
+        * It takes the place of the previously retained reply, if any.
+        */
+       if (ted->ted_reply_last != NULL)
+               tgt_free_reply_data(lut, ted, ted->ted_reply_last);
+
+       /* unlink from the reply list but keep the structure alive */
+       list_del_init(&trd->trd_list);
+       ted->ted_reply_last = trd;
+}
+
 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
 {
        tti->tti_buf.lb_buf = &tti->tti_lsd;
@@ -71,6 +382,7 @@ int tgt_client_alloc(struct obd_export *exp)
                RETURN(-ENOMEM);
        /* Mark that slot is not yet valid, 0 doesn't work here */
        exp->exp_target_data.ted_lr_idx = -1;
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
        RETURN(0);
 }
 EXPORT_SYMBOL(tgt_client_alloc);
@@ -82,9 +394,26 @@ void tgt_client_free(struct obd_export *exp)
 {
        struct tg_export_data   *ted = &exp->exp_target_data;
        struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
 
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       /* free reply data */
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               tgt_release_reply_data(lut, ted, trd);
+       }
+       if (ted->ted_reply_last != NULL) {
+               tgt_free_reply_data(lut, ted, ted->ted_reply_last);
+               ted->ted_reply_last = NULL;
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       if (!hlist_unhashed(&exp->exp_gen_hash))
+               cfs_hash_del(exp->exp_obd->obd_gen_hash,
+                            &ted->ted_lcd->lcd_generation,
+                            &exp->exp_gen_hash);
+
        OBD_FREE_PTR(ted->ted_lcd);
        ted->ted_lcd = NULL;
 
@@ -98,6 +427,9 @@ void tgt_client_free(struct obd_export *exp)
                       exp->exp_obd->obd_name, ted->ted_lr_idx);
                LBUG();
        }
+
+       if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping)
+               atomic_dec(&lut->lut_num_clients);
 }
 EXPORT_SYMBOL(tgt_client_free);
 
@@ -396,6 +728,16 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
        list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
        spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+
+       /** Clear MULTI RPCS incompatibility flag if
+        * - target is MDT and
+        * - there is no client to recover or the recovery was aborted
+        */
+       if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
+           (tgt->lut_obd->obd_max_recoverable_clients == 0 ||
+           tgt->lut_obd->obd_abort_recovery))
+               tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
+
        /** update server epoch */
        tgt_server_data_update(&env, tgt, 1);
        lu_env_fini(&env);
@@ -566,18 +908,40 @@ repeat:
                goto repeat;
        }
 
-       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
-
        ted->ted_lr_idx = idx;
        ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
                          idx * tgt->lut_lsd.lsd_client_size;
 
        LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
 
-       CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
+       if (tgt_is_multimodrpcs_client(exp)) {
+               /* Set MULTI RPCS incompatibility flag to prevent previous
+                * Lustre versions to mount a target with reply_data file */
+               atomic_inc(&tgt->lut_num_clients);
+               if (!(tgt->lut_lsd.lsd_feature_incompat &
+                     OBD_INCOMPAT_MULTI_RPCS)) {
+                       tgt->lut_lsd.lsd_feature_incompat |=
+                                                       OBD_INCOMPAT_MULTI_RPCS;
+                       rc = tgt_server_data_update(env, tgt, 1);
+                       if (rc < 0) {
+                               CERROR("%s: unable to set MULTI RPCS "
+                                      "incompatibility flag\n",
+                                      exp->exp_obd->obd_name);
+                               RETURN(rc);
+                       }
+               }
+
+               /* assign client slot generation */
+               ted->ted_lcd->lcd_generation =
+                               atomic_inc_return(&tgt->lut_client_generation);
+       } else {
+               ted->ted_lcd->lcd_generation = 0;
+       }
+
+       CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
+              "generation %d\n",
               tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
-              ted->ted_lcd->lcd_uuid);
+              ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
                RETURN(-ENOSPC);
@@ -591,10 +955,9 @@ repeat:
 }
 EXPORT_SYMBOL(tgt_client_new);
 
-/* Add client data to the MDS.  We use a bitmap to locate a free space
- * in the last_rcvd file if cl_off is -1 (i.e. a new client).
- * Otherwise, we just have to read the data from the last_rcvd file and
- * we know its offset.
+/* Add an existing client to the MDS in-memory state based on
+ * a client that was previously found in the last_rcvd file and
+ * already has an assigned slot (idx >= 0).
  *
  * It should not be possible to fail adding an existing client - otherwise
  * mdt_init_server_data() callsite needs to be fixed.
@@ -618,9 +981,12 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
                       tgt->lut_obd->obd_name,  idx);
                LBUG();
        }
+       atomic_inc(&tgt->lut_num_clients);
 
-       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
+       CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
+              "generation %d\n",
+              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
+              ted->ted_lcd->lcd_generation);
 
        ted->ted_lr_idx = idx;
        ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
@@ -779,6 +1145,74 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                GOTO(srv_update, rc = 0);
        }
 
+       /* Target that supports multiple reply data:
+        * record the reply of this request in its own slot of the
+        * reply_data file and in the export's in-memory reply list,
+        * instead of overwriting the single per-client record.
+        */
+       if (tgt_is_multimodrpcs_client(req->rq_export)) {
+               struct tg_reply_data    *trd;
+               struct lsd_reply_data   *lrd;
+               __u64                   *pre_versions;
+               int                      i;
+               loff_t                   off;
+
+               OBD_ALLOC_PTR(trd);
+               if (unlikely(trd == NULL))
+                       GOTO(srv_update, rc = -ENOMEM);
+
+               /* update export last transno */
+               mutex_lock(&ted->ted_lcd_lock);
+               if (tti->tti_transno > ted->ted_lcd->lcd_last_transno)
+                       ted->ted_lcd->lcd_last_transno = tti->tti_transno;
+               mutex_unlock(&ted->ted_lcd_lock);
+
+               /* fill reply data information */
+               lrd = &trd->trd_reply;
+               lrd->lrd_transno = tti->tti_transno;
+               lrd->lrd_xid = req->rq_xid;
+               lrd->lrd_result = th->th_result;
+               lrd->lrd_data = opdata;
+               lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+               pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+               if (pre_versions) {
+                       trd->trd_pre_versions[0] = pre_versions[0];
+                       trd->trd_pre_versions[1] = pre_versions[1];
+                       trd->trd_pre_versions[2] = pre_versions[2];
+                       trd->trd_pre_versions[3] = pre_versions[3];
+               }
+
+               /* find an empty slot */
+               i = tgt_find_free_reply_slot(tgt);
+               if (unlikely(i < 0)) {
+                       CERROR("%s: couldn't find a slot for reply data: "
+                              "rc = %d\n", tgt_name(tgt), i);
+                       /* trd is local to this block and not linked to
+                        * the export yet: free it here or it is leaked */
+                       OBD_FREE_PTR(trd);
+                       GOTO(srv_update, rc = i);
+               }
+               trd->trd_index = i;
+
+               /* write reply data to disk */
+               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+               rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+               if (unlikely(rc != 0)) {
+                       CERROR("%s: can't update %s file: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       /* give back the slot reserved above and the
+                        * in-memory reply data, which nobody references */
+                       tgt_clear_reply_slot(tgt, i);
+                       OBD_FREE_PTR(trd);
+                       RETURN(rc);
+               }
+
+               /* add reply data to target export's reply list */
+               mutex_lock(&ted->ted_lcd_lock);
+               list_add(&trd->trd_list, &ted->ted_reply_list);
+               ted->ted_reply_cnt++;
+               /* keep track of the highest number of replies ever held */
+               if (ted->ted_reply_cnt > ted->ted_reply_max)
+                       ted->ted_reply_max = ted->ted_reply_cnt;
+               mutex_unlock(&ted->ted_lcd_lock);
+
+               CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
+                      "tag %hu, client gen %u, slot idx %d\n",
+                      trd, lrd->lrd_xid, lrd->lrd_transno,
+                      trd->trd_tag, lrd->lrd_client_gen, i);
+
+               GOTO(srv_update, rc = 0);
+       }
+
        mutex_lock(&ted->ted_lcd_lock);
        LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
@@ -894,6 +1328,8 @@ static int tgt_clients_data_init(const struct lu_env *env,
        int                      cl_idx;
        int                      rc = 0;
        loff_t                   off = lsd->lsd_client_start;
+       __u32                    generation = 0;
+       struct cfs_hash         *hash = NULL;
 
        ENTRY;
 
@@ -904,6 +1340,10 @@ static int tgt_clients_data_init(const struct lu_env *env,
        if (lcd == NULL)
                RETURN(-ENOMEM);
 
+       hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
+       if (hash == NULL)
+               GOTO(err_out, rc = -ENODEV);
+
        for (cl_idx = 0; off < last_size; cl_idx++) {
                struct obd_export       *exp;
                __u64                    last_transno;
@@ -933,8 +1373,9 @@ static int tgt_clients_data_init(const struct lu_env *env,
                 * need to be set up like real exports as connect does.
                 */
                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                      " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
-                      last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
+                      " srv lr: "LPU64" lx: "LPU64" gen %u\n", lcd->lcd_uuid,
+                      cl_idx, last_transno, lsd->lsd_last_transno,
+                      lcd_last_xid(lcd), lcd->lcd_generation);
 
                exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
                if (IS_ERR(exp)) {
@@ -959,6 +1400,25 @@ static int tgt_clients_data_init(const struct lu_env *env,
                exp->exp_in_recovery = 0;
                spin_unlock(&exp->exp_lock);
                obd->obd_max_recoverable_clients++;
+
+               if (tgt->lut_lsd.lsd_feature_incompat &
+                   OBD_INCOMPAT_MULTI_RPCS &&
+                   lcd->lcd_generation != 0) {
+                       /* compute the highest valid client generation */
+                       generation = max(generation, lcd->lcd_generation);
+
+                       /* fill client_generation <-> export hash table */
+                       rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
+                                                &exp->exp_gen_hash);
+                       if (rc != 0) {
+                               CERROR("%s: duplicate export for client "
+                                      "generation %u\n",
+                                      tgt_name(tgt), lcd->lcd_generation);
+                               class_export_put(exp);
+                               GOTO(err_out, rc);
+                       }
+               }
+
                class_export_put(exp);
 
                /* Need to check last_rcvd even for duplicated exports. */
@@ -971,7 +1431,12 @@ static int tgt_clients_data_init(const struct lu_env *env,
                spin_unlock(&tgt->lut_translock);
        }
 
+       /* record highest valid client generation */
+       atomic_set(&tgt->lut_client_generation, generation);
+
 err_out:
+       if (hash != NULL)
+               cfs_hash_putref(hash);
        OBD_FREE_PTR(lcd);
        RETURN(rc);
 }
@@ -988,7 +1453,8 @@ static struct server_compat_data tgt_scd[] = {
                .rocompat = OBD_ROCOMPAT_LOVOBJID,
                .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
                            OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
-                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
+                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
+                           OBD_INCOMPAT_MULTI_RPCS,
                .rocinit = OBD_ROCOMPAT_LOVOBJID,
                .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
                           OBD_INCOMPAT_MULTI_OI,
@@ -1263,3 +1729,228 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
                                          tgt_ses_req(tsi));
        return rc;
 }
+
+/*
+ * Set up the reply data file (tgt->lut_reply_data) of target @tgt.
+ *
+ * If the file is empty, a fresh header is written to it.  Otherwise the
+ * header is validated and every reply data record is read back: a record
+ * whose client generation is found in the obd_gen_hash table is turned
+ * into an in-memory tg_reply_data linked on the matching export's
+ * ted_reply_list, and the export/target transaction numbers are raised
+ * accordingly.  Records with no matching export are ignored.  Once all
+ * records are processed, the exports are removed from the
+ * client_generation <-> export hash.
+ *
+ * Returns 0 on success.  On failure returns -ENOMEM, -EINVAL (bad header),
+ * -ENODEV (no generation hash) or the error reported by the attribute,
+ * read or write helper that failed.
+ */
+int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct lsd_reply_data   *lrd = &tti->tti_lrd;
+       unsigned long            reply_data_size;
+       int                      rc;
+       struct lsd_reply_header *lrh = NULL;
+       struct tg_reply_data    *trd = NULL;
+       int                      idx;
+       loff_t                   off;
+       struct cfs_hash         *hash = NULL;
+       struct obd_export       *exp;
+       struct obd_export       *tmp;
+       struct tg_export_data   *ted;
+       int                      reply_data_recovered = 0;
+
+       rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
+       if (rc)
+               GOTO(out, rc);
+       reply_data_size = (unsigned long)tti->tti_attr.la_size;
+
+       OBD_ALLOC_PTR(lrh);
+       if (lrh == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       if (reply_data_size == 0) {
+               CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
+                      tgt_name(tgt));
+               lrh->lrh_magic = LRH_MAGIC;
+               lrh->lrh_header_size = sizeof(struct lsd_reply_header);
+               lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
+               rc = tgt_reply_header_write(env, tgt, lrh);
+               if (rc) {
+                       CERROR("%s: error writing %s: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       GOTO(out, rc);
+               }
+       } else {
+               rc = tgt_reply_header_read(env, tgt, lrh);
+               if (rc) {
+                       CERROR("%s: error reading %s: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       GOTO(out, rc);
+               }
+               /* refuse a file whose layout does not match this code */
+               if (lrh->lrh_magic != LRH_MAGIC ||
+                   lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
+                   lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
+                       CERROR("%s: invalid header in %s\n",
+                              tgt_name(tgt), REPLY_DATA);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
+               if (hash == NULL)
+                       GOTO(out, rc = -ENODEV);
+
+               /* trd always points to a spare entry not yet linked on any
+                * list; the spare left over after the loop is freed at out */
+               OBD_ALLOC_PTR(trd);
+               if (trd == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               /* Load reply_data from disk */
+               for (idx = 0, off = sizeof(struct lsd_reply_header);
+                    off < reply_data_size;
+                    idx++, off += sizeof(struct lsd_reply_data)) {
+                       rc = tgt_reply_data_read(env, tgt, lrd, off);
+                       if (rc) {
+                               CERROR("%s: error reading %s: rc = %d\n",
+                                      tgt_name(tgt), REPLY_DATA, rc);
+                               GOTO(out, rc);
+                       }
+
+                       exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+                       if (exp == NULL) {
+                               /* old reply data from a disconnected client */
+                               continue;
+                       }
+                       ted = &exp->exp_target_data;
+                       mutex_lock(&ted->ted_lcd_lock);
+
+                       /* create in-memory reply_data and link it to
+                        * target export's reply list */
+                       tgt_set_reply_slot(tgt, idx);
+                       trd->trd_reply = *lrd;
+                       trd->trd_pre_versions[0] = 0;
+                       trd->trd_pre_versions[1] = 0;
+                       trd->trd_pre_versions[2] = 0;
+                       trd->trd_pre_versions[3] = 0;
+                       trd->trd_index = idx;
+                       trd->trd_tag = 0;
+                       list_add(&trd->trd_list, &ted->ted_reply_list);
+                       ted->ted_reply_cnt++;
+                       if (ted->ted_reply_cnt > ted->ted_reply_max)
+                               ted->ted_reply_max = ted->ted_reply_cnt;
+
+                       CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
+                              "transno %llu, client gen %u, slot idx %d\n",
+                              tgt_name(tgt), trd, lrd->lrd_xid,
+                              lrd->lrd_transno, lrd->lrd_client_gen,
+                              trd->trd_index);
+
+                       /* update export last committed transaction */
+                       exp->exp_last_committed = max(exp->exp_last_committed,
+                                                     lrd->lrd_transno);
+
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       class_export_put(exp);
+
+                       /* update target last committed transaction */
+                       spin_lock(&tgt->lut_translock);
+                       tgt->lut_last_transno = max(tgt->lut_last_transno,
+                                                   lrd->lrd_transno);
+                       spin_unlock(&tgt->lut_translock);
+
+                       reply_data_recovered++;
+
+                       /* the previous entry now belongs to the export's
+                        * reply list: get a new spare for the next record */
+                       OBD_ALLOC_PTR(trd);
+                       if (trd == NULL)
+                               GOTO(out, rc = -ENOMEM);
+               }
+               CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
+                      tgt_name(tgt), reply_data_recovered);
+
+               /* delete entries from client_generation<->export hash */
+               spin_lock(&tgt->lut_obd->obd_dev_lock);
+               list_for_each_entry_safe(exp, tmp,
+                                        &tgt->lut_obd->obd_exports,
+                                        exp_obd_chain) {
+                       ted = &exp->exp_target_data;
+
+                       if (!hlist_unhashed(&exp->exp_gen_hash))
+                               cfs_hash_del(hash,
+                                            &ted->ted_lcd->lcd_generation,
+                                            &exp->exp_gen_hash);
+               }
+               spin_unlock(&tgt->lut_obd->obd_dev_lock);
+       }
+
+       rc = 0;
+
+out:
+       if (hash != NULL)
+               cfs_hash_putref(hash);
+       if (trd != NULL)
+               OBD_FREE_PTR(trd);
+       if (lrh != NULL)
+               OBD_FREE_PTR(lrh);
+       return rc;
+}
+
+/*
+ * Search the reply list of the export that issued @req for a reply data
+ * whose xid equals the xid of the request.
+ *
+ * When a match exists and @trd is not NULL, the matching entry is copied
+ * into @trd while ted_lcd_lock is still held.
+ *
+ * Returns true if a matching reply data was found, false otherwise.
+ */
+bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+{
+       struct obd_export       *exp = req->rq_export;
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct tg_reply_data    *cur, *next;
+       bool                     found = false;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(cur, next, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_reply.lrd_xid != req->rq_xid)
+                       continue;
+
+               /* first match wins: hand out a copy and stop scanning */
+               found = true;
+               if (trd != NULL)
+                       *trd = *cur;
+               break;
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
+              tgt_name(class_exp2tgt(exp)), req->rq_xid,
+              found ? 1 : 0);
+
+       return found;
+}
+EXPORT_SYMBOL(tgt_lookup_reply);
+
+/*
+ * Release every reply data of export @exp whose xid is lower than or
+ * equal to @rcvd_xid.  Each release is counted in ted_release_xid.
+ * The reply list is walked with ted_lcd_lock held.
+ *
+ * Always returns 0.
+ */
+int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *cur, *next;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(cur, next, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_reply.lrd_xid <= rcvd_xid) {
+                       ted->ted_release_xid++;
+                       tgt_release_reply_data(lut, ted, cur);
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return 0;
+}
+
+/*
+ * Release the first reply data of export @exp whose tag equals @tag and
+ * count the release in ted_release_tag.  A @tag of 0 is ignored and the
+ * reply list is left untouched.
+ *
+ * Always returns 0.
+ */
+int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *cur, *next;
+
+       if (tag == 0)
+               return 0;
+
+       mutex_lock(&ted->ted_lcd_lock);
+       list_for_each_entry_safe(cur, next, &ted->ted_reply_list, trd_list) {
+               if (cur->trd_tag == tag) {
+                       /* at most one entry is released per call */
+                       ted->ted_release_tag++;
+                       tgt_release_reply_data(lut, ted, cur);
+                       break;
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+
+       return 0;
+}
+
index 269d896..ea628fd 100644 (file)
@@ -56,6 +56,10 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        lut->lut_bottom = dt;
        lut->lut_last_rcvd = NULL;
        lut->lut_client_bitmap = NULL;
+       atomic_set(&lut->lut_num_clients, 0);
+       atomic_set(&lut->lut_client_generation, 0);
+       lut->lut_reply_data = NULL;
+       lut->lut_reply_bitmap = NULL;
        obd->u.obt.obt_lut = lut;
        obd->u.obt.obt_magic = OBT_MAGIC;
 
@@ -93,13 +97,13 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
                rc = PTR_ERR(o);
                CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut),
                       rc);
-               GOTO(out_bitmap, rc);
+               GOTO(out, rc);
        }
 
        lut->lut_last_rcvd = o;
        rc = tgt_server_data_init(env, lut);
        if (rc < 0)
-               GOTO(out_obj, rc);
+               GOTO(out, rc);
 
        /* prepare transactions callbacks */
        lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb;
@@ -112,23 +116,89 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb);
        lut->lut_bottom->dd_lu_dev.ld_site->ls_tgt = lut;
 
+       /* reply_data is supported by MDT targets only for now */
+       if (strncmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) != 0)
+               RETURN(0);
+
+       OBD_ALLOC(lut->lut_reply_bitmap,
+                 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
+       if (lut->lut_reply_bitmap == NULL)
+               GOTO(out, rc);
+
+       memset(&attr, 0, sizeof(attr));
+       attr.la_valid = LA_MODE;
+       attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+       dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+       lu_local_obj_fid(&fid, REPLY_DATA_OID);
+
+       o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
+       if (IS_ERR(o)) {
+               rc = PTR_ERR(o);
+               CERROR("%s: cannot open REPLY_DATA: rc = %d\n", tgt_name(lut),
+                      rc);
+               GOTO(out, rc);
+       }
+       lut->lut_reply_data = o;
+
+       rc = tgt_reply_data_init(env, lut);
+       if (rc < 0)
+               GOTO(out, rc);
+
        RETURN(0);
-out_obj:
-       lu_object_put(env, &lut->lut_last_rcvd->do_lu);
+out:
+       if (lut->lut_last_rcvd != NULL)
+               lu_object_put(env, &lut->lut_last_rcvd->do_lu);
        lut->lut_last_rcvd = NULL;
-out_bitmap:
-       OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+       if (lut->lut_client_bitmap != NULL)
+               OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
        lut->lut_client_bitmap = NULL;
+       if (lut->lut_reply_data != NULL)
+               lu_object_put(env, &lut->lut_reply_data->do_lu);
+       lut->lut_reply_data = NULL;
+       if (lut->lut_reply_bitmap != NULL)
+               OBD_FREE(lut->lut_reply_bitmap,
+                        LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
+       lut->lut_reply_bitmap = NULL;
        return rc;
 }
 EXPORT_SYMBOL(tgt_init);
 
 void tgt_fini(const struct lu_env *env, struct lu_target *lut)
 {
+       int i;
+       int rc;
        ENTRY;
 
+       if (lut->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
+           atomic_read(&lut->lut_num_clients) == 0) {
+               /* Clear MULTI RPCS incompatibility flag that prevents previous
+                * Lustre versions to mount a target with reply_data file */
+               lut->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
+               rc = tgt_server_data_update(env, lut, 1);
+               if (rc < 0)
+                       CERROR("%s: unable to clear MULTI RPCS "
+                              "incompatibility flag\n",
+                              lut->lut_obd->obd_name);
+       }
+
        sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset);
 
+       if (lut->lut_reply_data != NULL)
+               lu_object_put(env, &lut->lut_reply_data->do_lu);
+       lut->lut_reply_data = NULL;
+       if (lut->lut_reply_bitmap != NULL) {
+               for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
+                       if (lut->lut_reply_bitmap[i] != NULL)
+                               OBD_FREE(lut->lut_reply_bitmap[i],
+                                   BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+                                   sizeof(long));
+                       lut->lut_reply_bitmap[i] = NULL;
+               }
+               OBD_FREE(lut->lut_reply_bitmap,
+                        LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
+       }
+       lut->lut_reply_bitmap = NULL;
        if (lut->lut_client_bitmap) {
                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
                lut->lut_client_bitmap = NULL;