// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2020, DDN Storage Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Batch Metadata Updating on the server (MDT)
 *
 * Author: Qian Yingjin <qian@ddn.com>
 */

#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>
#include <lustre_mds.h>
#include "mdt_internal.h"

static struct ldlm_callback_suite mdt_dlm_cbs = {
	.lcs_completion	= ldlm_server_completion_ast,
	.lcs_blocking	= tgt_blocking_ast,
	.lcs_glimpse	= ldlm_server_glimpse_ast
};
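
/*
 * Unpack the opcode-specific request fields of a sub request from the
 * sub request capsule; BUT_GETATTR pulls the embedded LDLM request out
 * of the capsule for the later enqueue.
 */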
static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
{
	int rc = 0;

	switch (opc) {
	case BUT_GETATTR:
		info->mti_dlm_req = req_capsule_client_get(info->mti_pill,
							   &RMF_DLM_REQ);
		if (info->mti_dlm_req == NULL)
			RETURN(-EFAULT);
		break;
	default:
		rc = -EOPNOTSUPP;
		CERROR("%s: Unexpected opcode %d: rc = %d\n",
		       mdt_obd_name(info->mti_mdt), opc, rc);
		break;
	}

	RETURN(rc);
}

/* Pack the reply message of the current sub request into the batched
 * reply buffer through the sub request capsule.
 */
static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
{
	return req_capsule_server_pack(info->mti_pill);
}

typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);

static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
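
/*
 * Dispatch reply reconstruction for a resent sub request to the
 * per-opcode reconstructor registered in reconstructors[]. A
 * reconstructor rebuilds the reply of an already-committed update
 * instead of re-executing it.
 */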
static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
{
	mdt_batch_reconstructor reconst;
	int rc;

	ENTRY;

	if (opc >= BUT_LAST_OPC)
		RETURN(-EOPNOTSUPP);

	reconst = reconstructors[opc];
	LASSERT(reconst != NULL);
	rc = reconst(tsi);

	RETURN(rc);
}
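
/*
 * Batched getattr: the sub request carries an ordinary LDLM enqueue
 * request, so it is passed through the standard server-side enqueue
 * path with the MDT callback suite defined above.
 */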
static int mdt_batch_getattr(struct tgt_session_info *tsi)
{
	struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
	struct req_capsule *pill = &info->mti_sub_pill;
	int rc;

	ENTRY;

	rc = ldlm_handle_enqueue(info->mti_exp->exp_obd->obd_namespace,
				 pill, info->mti_dlm_req, &mdt_dlm_cbs);

	RETURN(rc);
}

/* Batch UpdaTe Request with a format known in advance */
#define TGT_BUT_HDL(flags, opc, fn)			\
[opc - BUT_FIRST_OPC] = {				\
	.th_name	= #opc,				\
	.th_fail_id	= 0,				\
	.th_opc		= opc,				\
	.th_flags	= flags,			\
	.th_act		= fn,				\
	.th_fmt		= &RQF_ ## opc,			\
	.th_version	= LUSTRE_MDS_VERSION,		\
}

static struct tgt_handler mdt_batch_handlers[] = {
	TGT_BUT_HDL(HAS_KEY | HAS_REPLY, BUT_GETATTR, mdt_batch_getattr),
};
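
/*
 * TGT_BUT_HDL() places each handler at [opc - BUT_FIRST_OPC], so the
 * lookup below is a direct array index. A new batched opcode would be
 * added the same way; for instance (hypothetical, not part of this
 * file):
 *
 *	TGT_BUT_HDL(HAS_KEY | HAS_REPLY | IS_MUTABLE, BUT_SETATTR,
 *		    mdt_batch_setattr),
 */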
static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
{
	struct tgt_handler *h;

	if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
		h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
		LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
			 h->th_opc, opc);
	} else {
		h = NULL; /* unsupported opc */
	}

	return h;
}
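
/*
 * Service handler for a batched update RPC:
 *
 * 1. Validate the batch update header and fetch the update buffers,
 *    either inline from the request message or via a bulk GET transfer.
 * 2. Pack the batched reply buffer using the client-supplied size hint.
 * 3. Walk every sub request of every update buffer: look up its handler,
 *    unpack it, then either reconstruct the reply (committed resent
 *    updates) or execute it through ->th_act().
 * 4. Shrink the reply to the bytes actually packed and record the number
 *    of handled updates.
 */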
int mdt_batch(struct tgt_session_info *tsi)
{
	struct mdt_thread_info *info = tsi2mdt_info(tsi);
	struct req_capsule *pill = &info->mti_sub_pill;
	struct ptlrpc_request *req = tgt_ses_req(tsi);
	struct but_update_header *buh;
	struct but_update_buffer *bub = NULL;
	struct batch_update_reply *reply = NULL;
	struct ptlrpc_bulk_desc *desc = NULL;
	struct tg_reply_data *trd = NULL;
	struct lustre_msg *repmsg = NULL;
	bool need_reconstruct;
	__u32 handled_update_count = 0;
	__u32 update_buf_count;
	__u32 packed_replen;
	void **update_bufs;
	int buh_size;
	int rc;
	int i;

	ENTRY;

	buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
					RCL_CLIENT);
	if (buh_size <= 0)
		RETURN(err_serious(-EPROTO));

	buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
	if (buh == NULL)
		RETURN(err_serious(-EPROTO));

	if (buh->buh_magic != BUT_HEADER_MAGIC) {
		CERROR("%s: invalid update header magic %x expect %x: rc = %d\n",
		       tgt_name(tsi->tsi_tgt), buh->buh_magic,
		       BUT_HEADER_MAGIC, -EPROTO);
		RETURN(err_serious(-EPROTO));
	}

	update_buf_count = buh->buh_count;
	if (update_buf_count == 0)
		RETURN(err_serious(-EPROTO));

	OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
	if (update_bufs == NULL)
		RETURN(err_serious(-ENOMEM));

	if (buh->buh_inline_length > 0) {
		update_bufs[0] = buh->buh_inline_data;
	} else {
		struct but_update_buffer *tmp;
		int page_count = 0;

		bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
		if (bub == NULL)
			GOTO(out, rc = err_serious(-EPROTO));

		for (i = 0; i < update_buf_count; i++)
			/* First *and* last might be partial pages, hence +1 */
			page_count += DIV_ROUND_UP(bub[i].bub_size,
						   PAGE_SIZE) + 1;

		desc = ptlrpc_prep_bulk_exp(req, page_count,
					    PTLRPC_BULK_OPS_COUNT,
					    PTLRPC_BULK_GET_SINK,
					    MDS_BULK_PORTAL,
					    &ptlrpc_bulk_kiov_nopin_ops);
		if (desc == NULL)
			GOTO(out, rc = err_serious(-ENOMEM));

		tmp = bub;
		for (i = 0; i < update_buf_count; i++, tmp++) {
			if (tmp->bub_size >= OUT_MAXREQSIZE)
				GOTO(out, rc = err_serious(-EPROTO));

			OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
			if (update_bufs[i] == NULL)
				GOTO(out, rc = err_serious(-ENOMEM));

			desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
							tmp->bub_size);
		}

		req->rq_bulk_write = 1;
		rc = sptlrpc_svc_prep_bulk(req, desc);
		if (rc != 0)
			GOTO(out, rc = err_serious(rc));

		rc = target_bulk_io(req->rq_export, desc);
		if (rc < 0)
			GOTO(out, rc = err_serious(rc));
	}
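
	/*
	 * Size the batched reply buffer from the client-supplied estimate
	 * in the header; it is shrunk to the actually packed length once
	 * all sub requests have been handled.
	 */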
	req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
			     buh->buh_reply_size);
	rc = req_capsule_server_pack(&req->rq_pill);
	if (rc != 0) {
		DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
			  tgt_name(tsi->tsi_tgt), rc);
		GOTO(out, rc);
	}

	/* Prepare the update reply buffer */
	reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
	if (reply == NULL)
		GOTO(out, rc = -EPROTO);

	reply->burp_magic = BUT_REPLY_MAGIC;
	packed_replen = sizeof(*reply);
	info->mti_batch_env = 1;
	info->mti_pill = pill;
	tsi->tsi_batch_env = true;

	OBD_ALLOC_PTR(trd);
	if (trd == NULL)
		GOTO(out, rc = -ENOMEM);
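
	/*
	 * Detect a resent batch RPC. On resend, replies of modifying sub
	 * requests that already committed (up to the saved lrd_batch_idx)
	 * are reconstructed below instead of being re-executed.
	 */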
	need_reconstruct = tgt_check_resent(req, trd);
	/* Walk through sub requests in the batch request to execute them. */
	for (i = 0; i < update_buf_count; i++) {
		struct batch_update_request *bur;
		struct lustre_msg *reqmsg = NULL;
		struct tgt_handler *h;
		int update_count;
		int j;

		bur = update_bufs[i];
		update_count = bur->burq_count;
		for (j = 0; j < update_count; j++) {
			__u32 replen;

			reqmsg = batch_update_reqmsg_next(bur, reqmsg);
			repmsg = batch_update_repmsg_next(reply, repmsg);

			if (handled_update_count > buh->buh_update_count)
				GOTO(out, rc = -EOVERFLOW);

			LASSERT(reqmsg != NULL && repmsg != NULL);
			LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
				 "Invalid reqmsg magic %x expected %x\n",
				 reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);

			h = mdt_batch_handler_find(reqmsg->lm_opc);
			if (unlikely(h == NULL)) {
				CERROR("%s: unsupported opc: 0x%x\n",
				       tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
				GOTO(out, rc = -ENOTSUPP);
			}
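
			/*
			 * Initialize the sub request capsule in place: it
			 * points at the request/reply messages of the
			 * current sub request inside the batched buffers,
			 * so the usual capsule accessors work unchanged.
			 */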
			LASSERT(h->th_fmt != NULL);
			req_capsule_subreq_init(pill, h->th_fmt, req,
						reqmsg, repmsg, RCL_SERVER);

			rc = mdt_batch_unpack(info, reqmsg->lm_opc);
			if (rc) {
				CERROR("%s: Can't unpack subreq, rc = %d\n",
				       mdt_obd_name(info->mti_mdt), rc);
				GOTO(out, rc);
			}

			rc = mdt_batch_pack_repmsg(info);
			if (rc)
				GOTO(out, rc);

			/* Need to reconstruct the reply for committed sub
			 * requests in a batched RPC.
			 * It only calls reconstruct for modification sub
			 * requests.
			 * For uncommitted or read-only sub requests, the
			 * server should re-execute them via the ->th_act()
			 * below.
			 */
			if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
			    handled_update_count <=
			    trd->trd_reply.lrd_batch_idx) {
				rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
				if (rc)
					GOTO(out, rc);

				GOTO(next, rc);
			}

			tsi->tsi_batch_idx = handled_update_count;
			rc = h->th_act(tsi);
next:
			/*
			 * @repmsg may have been replaced if the reply buffer
			 * was too small to grow in place, so reload it here.
			 */
			if (repmsg != pill->rc_repmsg) {
				repmsg = pill->rc_repmsg;
				LASSERT(repmsg != NULL);
			}

			repmsg->lm_result = rc;
			mdt_thread_info_reset(info);

			replen = lustre_packed_msg_size(repmsg);
			packed_replen += replen;
			handled_update_count++;
		}
	}
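
	/*
	 * The client sized the reply buffer for the worst case; return
	 * the unused tail so only the packed bytes go over the wire.
	 */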
	CDEBUG(D_INFO, "reply size %u packed replen %u\n",
	       buh->buh_reply_size, packed_replen);
	if (buh->buh_reply_size > packed_replen)
		req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
				   packed_replen, RCL_SERVER);

	reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
	if (reply == NULL)
		GOTO(out_free, rc = -EPROTO);

	reply->burp_count = handled_update_count;

out_free:
out:
	if (update_bufs != NULL) {
		if (bub != NULL) {
			for (i = 0; i < update_buf_count; i++, bub++) {
				if (update_bufs[i] != NULL)
					OBD_FREE_LARGE(update_bufs[i],
						       bub->bub_size);
			}
		}

		OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
	}

	if (trd != NULL)
		OBD_FREE_PTR(trd);

	if (desc != NULL)
		ptlrpc_free_bulk(desc);

	mdt_thread_info_fini(info);
	tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;

	RETURN(rc);
}