return 0;
}
+typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
+
+static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
+
+static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
+{
+ mdt_batch_reconstructor reconst;
+ int rc;
+
+ ENTRY;
+
+ if (opc >= BUT_LAST_OPC)
+ RETURN(-EOPNOTSUPP);
+
+ reconst = reconstructors[opc];
+ LASSERT(reconst != NULL);
+ rc = reconst(tsi);
+ RETURN(rc);
+}
+
/* Batch UpdaTe Request with a format known in advance */
#define TGT_BUT_HDL(flags, opc, fn) \
[opc - BUT_FIRST_OPC] = { \
struct but_update_buffer *bub = NULL;
struct batch_update_reply *reply = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
+ struct tg_reply_data *trd = NULL;
struct lustre_msg *repmsg = NULL;
+ bool need_reconstruct;
__u32 handled_update_count = 0;
__u32 update_buf_count;
__u32 packed_replen;
info->mti_max_repsize = buh->buh_reply_size;
info->mti_batch_env = 1;
info->mti_pill = pill;
+ tsi->tsi_batch_env = true;
+
+ OBD_ALLOC_PTR(trd);
+ if (trd == NULL)
+ GOTO(out, rc = -ENOMEM);
+ need_reconstruct = tgt_check_resent(req, trd);
/* Walk through sub requests in the batch request to execute them. */
for (i = 0; i < update_buf_count; i++) {
struct batch_update_request *bur;
GOTO(out, rc = -ENOTSUPP);
}
- /* TODO: Check resend case only for modifying RPC */
-
LASSERT(h->th_fmt != NULL);
req_capsule_subreq_init(pill, h->th_fmt, req,
reqmsg, repmsg, RCL_SERVER);
if (rc)
GOTO(out, rc);
+ /* Need to reconstruct the reply for committed sub
+ * requests in a batched RPC.
+ * It only calls reconstruct for modification sub
+ * requests.
+ * For uncommitted or read-only sub requests, the server
+ * should re-execute them via the ->th_act() below.
+ */
+ if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
+ handled_update_count <=
+ trd->trd_reply.lrd_batch_idx) {
+ rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
+ if (rc)
+ GOTO(out, rc);
+ GOTO(next, rc);
+ }
+
+ tsi->tsi_batch_idx = handled_update_count;
rc = h->th_act(tsi);
if (rc)
GOTO(out, rc);
-
+next:
repmsg->lm_result = rc;
mdt_thread_info_reset(info);
/*
OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
}
+ if (trd)
+ OBD_FREE_PTR(trd);
+
if (desc != NULL)
ptlrpc_free_bulk(desc);
mdt_thread_info_fini(info);
+ tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
RETURN(rc);
}