From 178988d67aa2f83aa65badc81e305fac0328838d Mon Sep 17 00:00:00 2001 From: Qian Yingjin Date: Tue, 16 Aug 2022 03:57:47 -0400 Subject: [PATCH] LU-14393 recovery: reply reconstruction for batched RPCs Batched RPC can boost the metadata performance for Lustre dramatically. However, it also increases the complexity of the recovery, such as how to reconstruct the reply in case of the RPC resend if the reply was lost. In this patch, it adds a new field @lrd_batch_idx in the data structure @lsd_reply_data to store each slot of the "reply_data" file: struct lsd_reply_data { __u64 lrd_transno; /* transaction number */ __u64 lrd_xid; /* transmission id */ __u64 lrd_data; /* per-operation data */ __u32 lrd_result; /* request result */ __u32 lrd_client_gen; /* client generation */ __u32 lrd_batch_idx; /* index in a batched RPC */ __u32 lrd_padding[7]; /* unused fields */ }; When found that a batched RPC was a resend RPC request, and if the index of the sub request in the batched RPC is smaller or equal than @lrd_batch_idx in the reply data, it means that the sub request has already executed, the server will reconstruct the reply for this sub request; if the index is larger than @lrd_batch_idx, the server will re-execute the sub reqeust in the batched RPC. Disable conf-sanity/32{a,b,c,d,e,f,g}, 108{a,b} temporarily until the compatibility issue during upgrade for new reply data format is fixed. Signed-off-by: Qian Yingjin Change-Id: Id48ecc263002cb783f5032642d05e1f3f6673837 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48228 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Mikhail Pershin Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/include/lu_target.h | 7 ++ lustre/include/obd_support.h | 1 + lustre/include/uapi/linux/lustre/lustre_disk.h | 2 + lustre/mdt/mdt_batch.c | 53 +++++++++++- lustre/mdt/mdt_recovery.c | 4 +- lustre/ptlrpc/batch.c | 8 ++ lustre/target/out_handler.c | 30 +------ lustre/target/tgt_handler.c | 29 +++++++ lustre/target/tgt_lastrcvd.c | 112 +++++++++++++++++-------- lustre/target/tgt_main.c | 3 + lustre/tests/conf-sanity.sh | 4 +- 11 files changed, 182 insertions(+), 71 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 43f015d..d1d6255 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -299,6 +299,12 @@ struct tgt_session_info { /* RPC transaction handling */ bool tsi_mult_trans; int tsi_has_trans; + + /* Batched RPC replay */ + bool tsi_batch_env; + /* Sub request index in the batched RPC. */ + __u32 tsi_batch_idx; + struct tg_reply_data *tsi_batch_trd; }; static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env) @@ -483,6 +489,7 @@ void tgt_register_lfsck_query(int (*query)(const struct lu_env *, struct lfsck_reply *, struct lfsck_query *)); int req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd); +bool tgt_check_resent(struct ptlrpc_request *req, struct tg_reply_data *trd); extern struct tgt_handler tgt_sec_ctx_handlers[]; extern struct tgt_handler tgt_lfsck_handlers[]; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 5126b46..d765048 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -687,6 +687,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_OUT_UPDATE_DROP 0x1707 #define OBD_FAIL_OUT_OBJECT_MISS 0x1708 #define OBD_FAIL_OUT_EIO 0x1709 +#define OBD_FAIL_BUT_UPDATE_NET_REP 0x170a /* MIGRATE */ #define OBD_FAIL_MIGRATE_ENTRIES 0x1801 diff --git a/lustre/include/uapi/linux/lustre/lustre_disk.h b/lustre/include/uapi/linux/lustre/lustre_disk.h index 54f73fd..8bbd2bd 100644 --- a/lustre/include/uapi/linux/lustre/lustre_disk.h +++ b/lustre/include/uapi/linux/lustre/lustre_disk.h @@ -215,6 +215,8 @@ struct lsd_reply_data { __u64 lrd_data; /* per-operation data */ __u32 lrd_result; /* request result */ __u32 lrd_client_gen; /* client generation */ + __u32 lrd_batch_idx; /* sub request index in the batched RPC */ + __u32 lrd_padding[7]; /* unused fields, total size is 8X __u64 */ }; /* Header of the reply_data file */ diff --git a/lustre/mdt/mdt_batch.c b/lustre/mdt/mdt_batch.c index 936b981..5e6049b 100644 --- a/lustre/mdt/mdt_batch.c +++ b/lustre/mdt/mdt_batch.c @@ -60,6 +60,26 @@ static int mdt_batch_pack_repmsg(struct mdt_thread_info *info) return 0; } +typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi); + +static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC]; + +static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc) +{ + mdt_batch_reconstructor reconst; + int rc; + + ENTRY; + + if (opc >= BUT_LAST_OPC) + RETURN(-EOPNOTSUPP); + + reconst = reconstructors[opc]; + LASSERT(reconst != NULL); + rc = reconst(tsi); + RETURN(rc); +} + /* Batch UpdaTe Request with a format known in advance */ #define TGT_BUT_HDL(flags, opc, fn) \ [opc - BUT_FIRST_OPC] = { \ @@ -99,7 +119,9 @@ int mdt_batch(struct tgt_session_info *tsi) struct but_update_buffer *bub = NULL; struct batch_update_reply *reply = NULL; struct ptlrpc_bulk_desc *desc = NULL; + struct tg_reply_data *trd = NULL; struct lustre_msg *repmsg = NULL; + bool need_reconstruct; __u32 handled_update_count = 0; __u32 update_buf_count; __u32 packed_replen; @@ -199,7 +221,13 @@ int mdt_batch(struct tgt_session_info *tsi) info->mti_max_repsize = buh->buh_reply_size; info->mti_batch_env = 1; info->mti_pill = pill; + tsi->tsi_batch_env = true; + + OBD_ALLOC_PTR(trd); + if (trd == NULL) + GOTO(out, rc = -ENOMEM); + need_reconstruct = tgt_check_resent(req, trd); /* Walk through sub requests in the batch request to execute them. */ for (i = 0; i < update_buf_count; i++) { struct batch_update_request *bur; @@ -231,8 +259,6 @@ int mdt_batch(struct tgt_session_info *tsi) GOTO(out, rc = -ENOTSUPP); } - /* TODO: Check resend case only for modifying RPC */ - LASSERT(h->th_fmt != NULL); req_capsule_subreq_init(pill, h->th_fmt, req, reqmsg, repmsg, RCL_SERVER); @@ -248,10 +274,27 @@ int mdt_batch(struct tgt_session_info *tsi) if (rc) GOTO(out, rc); + /* Need to reconstruct the reply for committed sub + * requests in a batched RPC. + * It only calls reconstruct for modification sub + * requests. + * For uncommitted or read-only sub requests, the server + * should re-execute them via the ->th_act() below. + */ + if ((h->th_flags & IS_MUTABLE) && need_reconstruct && + handled_update_count <= + trd->trd_reply.lrd_batch_idx) { + rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc); + if (rc) + GOTO(out, rc); + GOTO(next, rc); + } + + tsi->tsi_batch_idx = handled_update_count; rc = h->th_act(tsi); if (rc) GOTO(out, rc); - +next: repmsg->lm_result = rc; mdt_thread_info_reset(info); /* @@ -289,10 +332,14 @@ out: OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count); } + if (trd) + OBD_FREE_PTR(trd); + if (desc != NULL) ptlrpc_free_bulk(desc); mdt_thread_info_fini(info); + tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP; RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index d560f02..760e029 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -289,8 +289,8 @@ void mdt_reconstruct(struct mdt_thread_info *mti, struct mdt_lock_handle *lhc) mdt_reconstructor reconst; ENTRY; - LASSERT(mti->mti_rr.rr_opcode < REINT_MAX && - (reconst = reconstructors[mti->mti_rr.rr_opcode]) != NULL); + reconst = reconstructors[mti->mti_rr.rr_opcode]; + LASSERT(reconst != NULL); reconst(mti, lhc); EXIT; } diff --git a/lustre/ptlrpc/batch.c b/lustre/ptlrpc/batch.c index 32a8987..18b8796 100644 --- a/lustre/ptlrpc/batch.c +++ b/lustre/ptlrpc/batch.c @@ -375,6 +375,7 @@ static int batch_update_interpret(const struct lu_env *env, if (aa->ba_head == NULL) RETURN(0); + ptlrpc_put_mod_rpc_slot(req); /* Unpack the results from the reply message. */ if (req->rq_repmsg != NULL && req->rq_replied) { reply = req_capsule_server_sized_get(&req->rq_pill, @@ -414,6 +415,13 @@ static int batch_send_update_req(const struct lu_env *env, aa->ba_head = head; req->rq_interpret_reply = batch_update_interpret; + /* + * Only acquire modification RPC slot for the batched RPC + * which contains metadata updates. + */ + if (!(bh->lbt_flags & BATCH_FL_RDONLY)) + ptlrpc_get_mod_rpc_slot(req); + if (bh->lbt_flags & BATCH_FL_SYNC) { rc = ptlrpc_queue_wait(req); } else { diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 707cff2..c293050 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -64,34 +64,6 @@ typedef void (*out_reconstruct_t)(const struct lu_env *env, struct object_update_reply *reply, int index); -static inline bool out_check_resent(struct ptlrpc_request *req, - struct tg_reply_data *trd) -{ - struct lsd_reply_data *lrd; - bool reconstruct = false; - - if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) - return false; - - if (req_can_reconstruct(req, trd)) { - lrd = &trd->trd_reply; - req->rq_transno = lrd->lrd_transno; - req->rq_status = lrd->lrd_result; - - if (req->rq_status != 0) - req->rq_transno = 0; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); - - DEBUG_REQ(D_HA, req, "reconstruct resent RPC"); - reconstruct = true; - } else { - DEBUG_REQ(D_HA, req, "no reply for RESENT req"); - } - - return reconstruct; -} - static int out_create(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -1115,7 +1087,7 @@ int out_handle(struct tgt_session_info *tsi) if (!trd) GOTO(out_free, rc = -ENOMEM); - need_reconstruct = out_check_resent(pill->rc_req, trd); + need_reconstruct = tgt_check_resent(pill->rc_req, trd); /* Walk through updates in the request to execute them */ for (i = 0; i < update_buf_count; i++) { diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index 145f993..d665969 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -518,6 +518,7 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req, case OBD_PING: case MDS_REINT: case OUT_UPDATE: + case MDS_BATCH: case SEQ_QUERY: case FLD_QUERY: case FLD_READ: @@ -3059,3 +3060,31 @@ int req_can_reconstruct(struct ptlrpc_request *req, } EXPORT_SYMBOL(req_can_reconstruct); +bool tgt_check_resent(struct ptlrpc_request *req, struct tg_reply_data *trd) +{ + struct lsd_reply_data *lrd; + bool need_reconstruct = false; + + ENTRY; + + if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) + return false; + + if (req_can_reconstruct(req, trd)) { + lrd = &trd->trd_reply; + req->rq_transno = lrd->lrd_transno; + req->rq_status = lrd->lrd_result; + if (req->rq_status != 0) + req->rq_transno = 0; + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + lustre_msg_set_status(req->rq_repmsg, req->rq_status); + + DEBUG_REQ(D_HA, req, "reconstruct resent RPC"); + need_reconstruct = true; + } else { + DEBUG_REQ(D_HA, req, "no reply for RESENT req"); + } + + return need_reconstruct; +} +EXPORT_SYMBOL(tgt_check_resent); diff --git a/lustre/target/tgt_lastrcvd.c b/lustre/target/tgt_lastrcvd.c index 8a8a587..5e0ff3c 100644 --- a/lustre/target/tgt_lastrcvd.c +++ b/lustre/target/tgt_lastrcvd.c @@ -307,7 +307,7 @@ static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt, lrd->lrd_data = le64_to_cpu(buf->lrd_data); lrd->lrd_result = le32_to_cpu(buf->lrd_result); lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen); - + lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx); return 0; } @@ -1226,9 +1226,10 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, struct ptlrpc_request *req, struct thandle *th, bool update_lrd_file) { - struct lsd_reply_data *lrd; - int i; - int rc; + struct tgt_session_info *tsi = NULL; + struct lsd_reply_data *lrd; + int i = -1; + int rc; lrd = &trd->trd_reply; /* update export last transno */ @@ -1237,32 +1238,45 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, ted->ted_lcd->lcd_last_transno = lrd->lrd_transno; mutex_unlock(&ted->ted_lcd_lock); - if (tgt != NULL) { + if (!tgt) { + trd->trd_index = TRD_INDEX_MEMORY; + GOTO(add_reply_data, rc = 0); + } + + if (env) { + tsi = tgt_ses_info(env); + if (tsi->tsi_batch_trd) { + LASSERT(tsi->tsi_batch_env); + trd = tsi->tsi_batch_trd; + i = trd->trd_index; + } + } + + if (i == -1) { /* find a empty slot */ i = tgt_find_free_reply_slot(tgt); if (unlikely(i < 0)) { - CERROR("%s: couldn't find a slot for reply data: " - "rc = %d\n", tgt_name(tgt), i); + CERROR("%s: couldn't find a slot for reply data: rc = %d\n", + tgt_name(tgt), i); RETURN(i); } trd->trd_index = i; + } - if (update_lrd_file) { - loff_t off; + if (update_lrd_file) { + loff_t off; - /* write reply data to disk */ - off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i; - rc = tgt_reply_data_write(env, tgt, lrd, off, th); - if (unlikely(rc != 0)) { - CERROR("%s: can't update %s file: rc = %d\n", - tgt_name(tgt), REPLY_DATA, rc); - GOTO(free_slot, rc); - } + /* write reply data to disk */ + off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i; + rc = tgt_reply_data_write(env, tgt, lrd, off, th); + if (unlikely(rc != 0)) { + CERROR("%s: can't update %s file: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + GOTO(free_slot, rc); } - } else { - trd->trd_index = TRD_INDEX_MEMORY; } +add_reply_data: /* add reply data to target export's reply list */ mutex_lock(&ted->ted_lcd_lock); if (req != NULL) { @@ -1278,14 +1292,23 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, GOTO(free_slot, rc = -EBADR); } - if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude)) + if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) && + !(tsi && tsi->tsi_batch_env && + trd->trd_reply.lrd_batch_idx > 0)) tgt_clean_by_tag(req->rq_export, req->rq_xid, trd->trd_tag); } - list_add(&trd->trd_list, &ted->ted_reply_list); - ted->ted_reply_cnt++; - if (ted->ted_reply_cnt > ted->ted_reply_max) - ted->ted_reply_max = ted->ted_reply_cnt; + + /* + * For the batched RPC, all sub requests use one common @trd for the + * reply data. + */ + if (list_empty(&trd->trd_list)) { + list_add(&trd->trd_list, &ted->ted_reply_list); + ted->ted_reply_cnt++; + if (ted->ted_reply_cnt > ted->ted_reply_max) + ted->ted_reply_max = ted->ted_reply_cnt; + } mutex_unlock(&ted->ted_lcd_lock); CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, " @@ -1310,23 +1333,42 @@ int tgt_mk_reply_data(const struct lu_env *env, bool write_update, __u64 transno) { - struct tg_reply_data *trd; - struct lsd_reply_data *lrd; - __u64 *pre_versions = NULL; - int rc; + struct tg_reply_data *trd = NULL; + struct lsd_reply_data *lrd; + __u64 *pre_versions = NULL; struct tgt_session_info *tsi = NULL; + int rc; - OBD_ALLOC_PTR(trd); - if (unlikely(trd == NULL)) - RETURN(-ENOMEM); - - if (env != NULL) + if (env != NULL) { tsi = tgt_ses_info(env); + if (tsi->tsi_batch_trd) { + LASSERT(tsi->tsi_batch_env); + trd = tsi->tsi_batch_trd; + } + } + + if (trd == NULL) { + OBD_ALLOC_PTR(trd); + if (unlikely(trd == NULL)) + RETURN(-ENOMEM); + + INIT_LIST_HEAD(&trd->trd_list); + } /* fill reply data information */ lrd = &trd->trd_reply; lrd->lrd_transno = transno; - if (req != NULL) { + if (tsi && tsi->tsi_batch_env) { + if (tsi->tsi_batch_idx == 0) { + LASSERT(req != NULL); + tsi->tsi_batch_trd = trd; + trd->trd_index = -1; + lrd->lrd_xid = req->rq_xid; + trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg); + lrd->lrd_client_gen = ted->ted_lcd->lcd_generation; + } + lrd->lrd_batch_idx = tsi->tsi_batch_idx; + } else if (req != NULL) { lrd->lrd_xid = req->rq_xid; trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg); lrd->lrd_client_gen = ted->ted_lcd->lcd_generation; @@ -1999,7 +2041,7 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th, echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0); - if (tsi->tsi_has_trans && !echo_client) { + if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) { if (!tsi->tsi_mult_trans) { CDEBUG(D_HA, "More than one transaction %llu\n", tti->tti_transno); diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index cef0a5d..861b0d6 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -802,6 +802,9 @@ static void tgt_ses_key_exit(const struct lu_context *ctx, tsi->tsi_has_trans); tsi->tsi_has_trans = 0; tsi->tsi_mult_trans = false; + tsi->tsi_batch_trd = NULL; + tsi->tsi_batch_env = false; + tsi->tsi_batch_idx = 0; } /* context key: tgt_session_key */ diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index fe341e1..7f1fc09 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -14,8 +14,8 @@ init_logging # tool to create lustre filesystem images ALWAYS_EXCEPT="$CONF_SANITY_EXCEPT 32newtarball" -# bug number for skipped test: LU-11915 -ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110" +# bug number for skipped test: LU-11915 LU-14393 +ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110 32 108" # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! if $SHARED_KEY; then -- 1.8.3.1