Whamcloud - gitweb
LU-14393 recovery: reply reconstruction for batched RPCs
[fs/lustre-release.git] / lustre / mdt / mdt_batch.c
index 936b981..5e6049b 100644 (file)
@@ -60,6 +60,26 @@ static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
        return 0;
 }
 
+typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
+/*
+ * Reply reconstruction callbacks, one slot per sub request opcode. Each entry
+ * has the mdt_batch_reconstructor signature declared above: it takes the
+ * target session info and returns an int result code.
+ * mdt_batch_reconstruct() indexes this table directly by opcode.
+ * No initializer is given here, so every slot starts out NULL.
+ * NOTE(review): nothing visible in this change registers a callback; confirm
+ * where the table is meant to be populated.
+ */
+static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
+
+/*
+ * Reconstruct the reply of one sub request of a batched RPC by dispatching
+ * to the reconstructor registered for its opcode.
+ *
+ * tsi: target session info, handed through unchanged to the reconstructor
+ * opc: opcode of the sub request, used as the index into reconstructors[]
+ *
+ * Returns the reconstructor's return value, or -EOPNOTSUPP when opc lies
+ * outside [0, BUT_LAST_OPC) or no reconstructor is registered for it.
+ */
+static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
+{
+       mdt_batch_reconstructor reconst;
+       int rc;
+
+       ENTRY;
+
+       /* opc is signed: reject negative values as well as too large ones
+        * before it is used as an array index.
+        */
+       if (opc < 0 || opc >= BUT_LAST_OPC)
+               RETURN(-EOPNOTSUPP);
+
+       /* The opcode is taken from the request message, so an empty slot is
+        * reported to the caller as an error instead of being asserted on.
+        */
+       reconst = reconstructors[opc];
+       if (reconst == NULL)
+               RETURN(-EOPNOTSUPP);
+
+       rc = reconst(tsi);
+       RETURN(rc);
+}
+
 /* Batch UpdaTe Request with a format known in advance */
 #define TGT_BUT_HDL(flags, opc, fn)                    \
 [opc - BUT_FIRST_OPC] = {                              \
@@ -99,7 +119,9 @@ int mdt_batch(struct tgt_session_info *tsi)
        struct but_update_buffer *bub = NULL;
        struct batch_update_reply *reply = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
+       struct tg_reply_data *trd = NULL;
        struct lustre_msg *repmsg = NULL;
+       bool need_reconstruct;
        __u32 handled_update_count = 0;
        __u32 update_buf_count;
        __u32 packed_replen;
@@ -199,7 +221,13 @@ int mdt_batch(struct tgt_session_info *tsi)
        info->mti_max_repsize = buh->buh_reply_size;
        info->mti_batch_env = 1;
        info->mti_pill = pill;
+       tsi->tsi_batch_env = true;
+
+       OBD_ALLOC_PTR(trd);
+       if (trd == NULL)
+               GOTO(out, rc = -ENOMEM);
 
+       need_reconstruct = tgt_check_resent(req, trd);
        /* Walk through sub requests in the batch request to execute them. */
        for (i = 0; i < update_buf_count; i++) {
                struct batch_update_request *bur;
@@ -231,8 +259,6 @@ int mdt_batch(struct tgt_session_info *tsi)
                                GOTO(out, rc = -ENOTSUPP);
                        }
 
-                       /* TODO: Check resend case only for modifying RPC */
-
                        LASSERT(h->th_fmt != NULL);
                        req_capsule_subreq_init(pill, h->th_fmt, req,
                                                reqmsg, repmsg, RCL_SERVER);
@@ -248,10 +274,27 @@ int mdt_batch(struct tgt_session_info *tsi)
                        if (rc)
                                GOTO(out, rc);
 
+                       /* Need to reconstruct the reply for committed sub
+                        * requests in a batched RPC.
+                        * It only calls reconstruct for modification sub
+                        * requests.
+                        * For uncommitted or read-only sub requests, the server
+                        * should re-execute them via the ->th_act() below.
+                        */
+                       if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
+                           handled_update_count <=
+                           trd->trd_reply.lrd_batch_idx) {
+                               rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
+                               if (rc)
+                                       GOTO(out, rc);
+                               GOTO(next, rc);
+                       }
+
+                       tsi->tsi_batch_idx = handled_update_count;
                        rc = h->th_act(tsi);
                        if (rc)
                                GOTO(out, rc);
-
+next:
                        repmsg->lm_result = rc;
                        mdt_thread_info_reset(info);
                        /*
@@ -289,10 +332,14 @@ out:
                OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
        }
 
+       if (trd)
+               OBD_FREE_PTR(trd);
+
        if (desc != NULL)
                ptlrpc_free_bulk(desc);
 
        mdt_thread_info_fini(info);
+       tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
        RETURN(rc);
 }