/* RPC transaction handling */
bool tsi_mult_trans;
int tsi_has_trans;
+
+ /* Batched RPC replay */
+ bool tsi_batch_env;
+ /* Sub request index in the batched RPC. */
+ __u32 tsi_batch_idx;
+ struct tg_reply_data *tsi_batch_trd;
};
static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env)
struct lfsck_reply *,
struct lfsck_query *));
int req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
+bool tgt_check_resent(struct ptlrpc_request *req, struct tg_reply_data *trd);
extern struct tgt_handler tgt_sec_ctx_handlers[];
extern struct tgt_handler tgt_lfsck_handlers[];
#define OBD_FAIL_OUT_UPDATE_DROP 0x1707
#define OBD_FAIL_OUT_OBJECT_MISS 0x1708
#define OBD_FAIL_OUT_EIO 0x1709
+#define OBD_FAIL_BUT_UPDATE_NET_REP 0x170a
/* MIGRATE */
#define OBD_FAIL_MIGRATE_ENTRIES 0x1801
__u64 lrd_data; /* per-operation data */
__u32 lrd_result; /* request result */
__u32 lrd_client_gen; /* client generation */
+ __u32 lrd_batch_idx; /* sub request index in the batched RPC */
+ __u32 lrd_padding[7]; /* unused fields, total size is 8X __u64 */
};
/* Header of the reply_data file */
return 0;
}
+typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
+
+static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
+
+/**
+ * Reconstruct the reply of one sub request of a resent batched RPC by
+ * dispatching to the reconstructor registered for its opcode.
+ *
+ * \param[in] tsi	target session info handed to the reconstructor
+ * \param[in] opc	sub request opcode, used as index into reconstructors[]
+ *
+ * \retval		value returned by the per-opcode reconstructor
+ * \retval -EOPNOTSUPP	\a opc is outside the table or has no reconstructor
+ */
+static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
+{
+	mdt_batch_reconstructor reconst;
+	int rc;
+
+	ENTRY;
+
+	/*
+	 * @opc is signed and is taken from the sub request message by the
+	 * caller, so both ends of the range must be checked before it is
+	 * used as an array index.
+	 */
+	if (opc < 0 || opc >= BUT_LAST_OPC)
+		RETURN(-EOPNOTSUPP);
+
+	reconst = reconstructors[opc];
+	/*
+	 * The opcode comes from the request message: fail the request rather
+	 * than assert when no reconstructor is registered for it.
+	 */
+	if (reconst == NULL)
+		RETURN(-EOPNOTSUPP);
+
+	rc = reconst(tsi);
+	RETURN(rc);
+}
+
/* Batch UpdaTe Request with a format known in advance */
#define TGT_BUT_HDL(flags, opc, fn) \
[opc - BUT_FIRST_OPC] = { \
struct but_update_buffer *bub = NULL;
struct batch_update_reply *reply = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
+ struct tg_reply_data *trd = NULL;
struct lustre_msg *repmsg = NULL;
+ bool need_reconstruct;
__u32 handled_update_count = 0;
__u32 update_buf_count;
__u32 packed_replen;
info->mti_max_repsize = buh->buh_reply_size;
info->mti_batch_env = 1;
info->mti_pill = pill;
+ tsi->tsi_batch_env = true;
+
+ OBD_ALLOC_PTR(trd);
+ if (trd == NULL)
+ GOTO(out, rc = -ENOMEM);
+ need_reconstruct = tgt_check_resent(req, trd);
/* Walk through sub requests in the batch request to execute them. */
for (i = 0; i < update_buf_count; i++) {
struct batch_update_request *bur;
GOTO(out, rc = -ENOTSUPP);
}
- /* TODO: Check resend case only for modifying RPC */
-
LASSERT(h->th_fmt != NULL);
req_capsule_subreq_init(pill, h->th_fmt, req,
reqmsg, repmsg, RCL_SERVER);
if (rc)
GOTO(out, rc);
+ /* Need to reconstruct the reply for committed sub
+ * requests in a batched RPC.
+ * It only calls reconstruct for modification sub
+ * requests.
+ * For uncommitted or read-only sub requests, the server
+ * should re-execute them via the ->th_act() below.
+ */
+ if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
+ handled_update_count <=
+ trd->trd_reply.lrd_batch_idx) {
+ rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
+ if (rc)
+ GOTO(out, rc);
+ GOTO(next, rc);
+ }
+
+ tsi->tsi_batch_idx = handled_update_count;
rc = h->th_act(tsi);
if (rc)
GOTO(out, rc);
-
+next:
repmsg->lm_result = rc;
mdt_thread_info_reset(info);
/*
OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
}
+ if (trd)
+ OBD_FREE_PTR(trd);
+
if (desc != NULL)
ptlrpc_free_bulk(desc);
mdt_thread_info_fini(info);
+ tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
RETURN(rc);
}
mdt_reconstructor reconst;
ENTRY;
- LASSERT(mti->mti_rr.rr_opcode < REINT_MAX &&
- (reconst = reconstructors[mti->mti_rr.rr_opcode]) != NULL);
+ reconst = reconstructors[mti->mti_rr.rr_opcode];
+ LASSERT(reconst != NULL);
reconst(mti, lhc);
EXIT;
}
if (aa->ba_head == NULL)
RETURN(0);
+ ptlrpc_put_mod_rpc_slot(req);
/* Unpack the results from the reply message. */
if (req->rq_repmsg != NULL && req->rq_replied) {
reply = req_capsule_server_sized_get(&req->rq_pill,
aa->ba_head = head;
req->rq_interpret_reply = batch_update_interpret;
+ /*
+ * Only acquire modification RPC slot for the batched RPC
+ * which contains metadata updates.
+ */
+ if (!(bh->lbt_flags & BATCH_FL_RDONLY))
+ ptlrpc_get_mod_rpc_slot(req);
+
if (bh->lbt_flags & BATCH_FL_SYNC) {
rc = ptlrpc_queue_wait(req);
} else {
struct object_update_reply *reply,
int index);
-static inline bool out_check_resent(struct ptlrpc_request *req,
- struct tg_reply_data *trd)
-{
- struct lsd_reply_data *lrd;
- bool reconstruct = false;
-
- if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
- return false;
-
- if (req_can_reconstruct(req, trd)) {
- lrd = &trd->trd_reply;
- req->rq_transno = lrd->lrd_transno;
- req->rq_status = lrd->lrd_result;
-
- if (req->rq_status != 0)
- req->rq_transno = 0;
- lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
- lustre_msg_set_status(req->rq_repmsg, req->rq_status);
-
- DEBUG_REQ(D_HA, req, "reconstruct resent RPC");
- reconstruct = true;
- } else {
- DEBUG_REQ(D_HA, req, "no reply for RESENT req");
- }
-
- return reconstruct;
-}
-
static int out_create(struct tgt_session_info *tsi)
{
struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
if (!trd)
GOTO(out_free, rc = -ENOMEM);
- need_reconstruct = out_check_resent(pill->rc_req, trd);
+ need_reconstruct = tgt_check_resent(pill->rc_req, trd);
/* Walk through updates in the request to execute them */
for (i = 0; i < update_buf_count; i++) {
case OBD_PING:
case MDS_REINT:
case OUT_UPDATE:
+ case MDS_BATCH:
case SEQ_QUERY:
case FLD_QUERY:
case FLD_READ:
}
EXPORT_SYMBOL(req_can_reconstruct);
+/**
+ * Check whether \a req is a resent RPC whose reply can be reconstructed.
+ *
+ * A request without MSG_RESENT set never needs reconstruction. For a
+ * resent request req_can_reconstruct() is consulted; when it succeeds the
+ * transno and result stored in \a trd->trd_reply are copied into the
+ * request and into its reply message. A non-zero result clears the
+ * transno before it is put into the reply.
+ *
+ * \param[in,out] req	request being handled; rq_transno, rq_status and the
+ *			reply message are updated when reconstruction applies
+ * \param[in,out] trd	reply data buffer passed to req_can_reconstruct()
+ *			(presumably filled by it on success - confirm there)
+ *
+ * \retval true		the reply has to be reconstructed from \a trd
+ * \retval false	not a resend, or no saved reply was found
+ */
+bool tgt_check_resent(struct ptlrpc_request *req, struct tg_reply_data *trd)
+{
+	struct lsd_reply_data *lrd;
+	bool need_reconstruct = false;
+
+	ENTRY;
+
+	/* ENTRY above must be balanced by RETURN() on every exit path. */
+	if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
+		RETURN(false);
+
+	if (req_can_reconstruct(req, trd)) {
+		lrd = &trd->trd_reply;
+		req->rq_transno = lrd->lrd_transno;
+		req->rq_status = lrd->lrd_result;
+		/* a failed operation carries no transaction number */
+		if (req->rq_status != 0)
+			req->rq_transno = 0;
+		lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+		lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+
+		DEBUG_REQ(D_HA, req, "reconstruct resent RPC");
+		need_reconstruct = true;
+	} else {
+		DEBUG_REQ(D_HA, req, "no reply for RESENT req");
+	}
+
+	RETURN(need_reconstruct);
+}
+EXPORT_SYMBOL(tgt_check_resent);
lrd->lrd_data = le64_to_cpu(buf->lrd_data);
lrd->lrd_result = le32_to_cpu(buf->lrd_result);
lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen);
-
+ lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx);
return 0;
}
struct ptlrpc_request *req,
struct thandle *th, bool update_lrd_file)
{
- struct lsd_reply_data *lrd;
- int i;
- int rc;
+ struct tgt_session_info *tsi = NULL;
+ struct lsd_reply_data *lrd;
+ int i = -1;
+ int rc;
lrd = &trd->trd_reply;
/* update export last transno */
ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
mutex_unlock(&ted->ted_lcd_lock);
- if (tgt != NULL) {
+ if (!tgt) {
+ trd->trd_index = TRD_INDEX_MEMORY;
+ GOTO(add_reply_data, rc = 0);
+ }
+
+ if (env) {
+ tsi = tgt_ses_info(env);
+ if (tsi->tsi_batch_trd) {
+ LASSERT(tsi->tsi_batch_env);
+ trd = tsi->tsi_batch_trd;
+ i = trd->trd_index;
+ }
+ }
+
+ if (i == -1) {
/* find a empty slot */
i = tgt_find_free_reply_slot(tgt);
if (unlikely(i < 0)) {
- CERROR("%s: couldn't find a slot for reply data: "
- "rc = %d\n", tgt_name(tgt), i);
+ CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
+ tgt_name(tgt), i);
RETURN(i);
}
trd->trd_index = i;
+ }
- if (update_lrd_file) {
- loff_t off;
+ if (update_lrd_file) {
+ loff_t off;
- /* write reply data to disk */
- off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
- rc = tgt_reply_data_write(env, tgt, lrd, off, th);
- if (unlikely(rc != 0)) {
- CERROR("%s: can't update %s file: rc = %d\n",
- tgt_name(tgt), REPLY_DATA, rc);
- GOTO(free_slot, rc);
- }
+ /* write reply data to disk */
+ off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+ rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+ if (unlikely(rc != 0)) {
+ CERROR("%s: can't update %s file: rc = %d\n",
+ tgt_name(tgt), REPLY_DATA, rc);
+ GOTO(free_slot, rc);
}
- } else {
- trd->trd_index = TRD_INDEX_MEMORY;
}
+add_reply_data:
/* add reply data to target export's reply list */
mutex_lock(&ted->ted_lcd_lock);
if (req != NULL) {
GOTO(free_slot, rc = -EBADR);
}
- if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
+ !(tsi && tsi->tsi_batch_env &&
+ trd->trd_reply.lrd_batch_idx > 0))
tgt_clean_by_tag(req->rq_export, req->rq_xid,
trd->trd_tag);
}
- list_add(&trd->trd_list, &ted->ted_reply_list);
- ted->ted_reply_cnt++;
- if (ted->ted_reply_cnt > ted->ted_reply_max)
- ted->ted_reply_max = ted->ted_reply_cnt;
+
+ /*
+ * For the batched RPC, all sub requests use one common @trd for the
+ * reply data.
+ */
+ if (list_empty(&trd->trd_list)) {
+ list_add(&trd->trd_list, &ted->ted_reply_list);
+ ted->ted_reply_cnt++;
+ if (ted->ted_reply_cnt > ted->ted_reply_max)
+ ted->ted_reply_max = ted->ted_reply_cnt;
+ }
mutex_unlock(&ted->ted_lcd_lock);
CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
bool write_update,
__u64 transno)
{
- struct tg_reply_data *trd;
- struct lsd_reply_data *lrd;
- __u64 *pre_versions = NULL;
- int rc;
+ struct tg_reply_data *trd = NULL;
+ struct lsd_reply_data *lrd;
+ __u64 *pre_versions = NULL;
struct tgt_session_info *tsi = NULL;
+ int rc;
- OBD_ALLOC_PTR(trd);
- if (unlikely(trd == NULL))
- RETURN(-ENOMEM);
-
- if (env != NULL)
+ if (env != NULL) {
tsi = tgt_ses_info(env);
+ if (tsi->tsi_batch_trd) {
+ LASSERT(tsi->tsi_batch_env);
+ trd = tsi->tsi_batch_trd;
+ }
+ }
+
+ if (trd == NULL) {
+ OBD_ALLOC_PTR(trd);
+ if (unlikely(trd == NULL))
+ RETURN(-ENOMEM);
+
+ INIT_LIST_HEAD(&trd->trd_list);
+ }
/* fill reply data information */
lrd = &trd->trd_reply;
lrd->lrd_transno = transno;
- if (req != NULL) {
+ if (tsi && tsi->tsi_batch_env) {
+ if (tsi->tsi_batch_idx == 0) {
+ LASSERT(req != NULL);
+ tsi->tsi_batch_trd = trd;
+ trd->trd_index = -1;
+ lrd->lrd_xid = req->rq_xid;
+ trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+ lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+ }
+ lrd->lrd_batch_idx = tsi->tsi_batch_idx;
+ } else if (req != NULL) {
lrd->lrd_xid = req->rq_xid;
trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
- if (tsi->tsi_has_trans && !echo_client) {
+ if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
if (!tsi->tsi_mult_trans) {
CDEBUG(D_HA, "More than one transaction %llu\n",
tti->tti_transno);
tsi->tsi_has_trans);
tsi->tsi_has_trans = 0;
tsi->tsi_mult_trans = false;
+ tsi->tsi_batch_trd = NULL;
+ tsi->tsi_batch_env = false;
+ tsi->tsi_batch_idx = 0;
}
/* context key: tgt_session_key */
# tool to create lustre filesystem images
ALWAYS_EXCEPT="$CONF_SANITY_EXCEPT 32newtarball"
-# bug number for skipped test: LU-11915
-ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110"
+# bug number for skipped test: LU-11915 LU-14393
+ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110 32 108"
# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
if $SHARED_KEY; then