4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2020, DDN Storage Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
29 * lustre/mdt/mdt_batch.c
31 * Batch Metadata Updating on the server (MDT)
33 * Author: Qian Yingjin <qian@ddn.com>
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 #include <lustre_mds.h>
41 #include "mdt_internal.h"
/*
 * DLM callback suite passed to ldlm_handle_enqueue() by mdt_batch_getattr()
 * for lock enqueues carried inside a batched RPC: completion and glimpse
 * callbacks go to ldlm_server_completion_ast()/ldlm_server_glimpse_ast(),
 * blocking callbacks to tgt_blocking_ast().
 */
43 static struct ldlm_callback_suite mdt_dlm_cbs = {
44 .lcs_completion = ldlm_server_completion_ast,
45 .lcs_blocking = tgt_blocking_ast,
46 .lcs_glimpse = ldlm_server_glimpse_ast
/*
 * Unpack the per-opcode request fields of one batched sub-request.
 *
 * For the opcode that needs it, the DLM request is fetched from the
 * sub-request capsule (info->mti_pill) into info->mti_dlm_req, and a
 * missing field is treated as an error. An opcode with no unpack rule is
 * reported with CERROR as unexpected.
 *
 * @info: per-thread MDT state; mdt_batch() points info->mti_pill at the
 *        sub-request capsule before calling this.
 * @opc:  sub-request opcode (reqmsg->lm_opc in mdt_batch()).
 *
 * Return: rc; mdt_batch() logs a non-zero value as "Can't unpack subreq".
 *
 * NOTE(review): the opcode dispatch and the exact error values sit on lines
 * not visible in this view; presumably the DLM path is for BUT_GETATTR,
 * which calls ldlm_handle_enqueue() with mti_dlm_req — confirm.
 * NOTE(review): @opc is __u32 but is printed with %d in the CERROR below.
 */
49 static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
55 info->mti_dlm_req = req_capsule_client_get(info->mti_pill,
57 if (info->mti_dlm_req == NULL)
62 CERROR("%s: Unexpected opcode %d: rc = %d\n",
63 mdt_obd_name(info->mti_mdt), opc, rc);
/*
 * Reply-side hook that mdt_batch() calls for every sub-request right after
 * mdt_batch_unpack() succeeds, with info->mti_pill pointing at the
 * sub-request capsule.
 *
 * NOTE(review): the body is not visible in this view; presumably it
 * prepares the sub-reply message in info->mti_pill before the handler
 * runs — confirm.
 */
70 static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
/*
 * Per-opcode reply reconstructors. mdt_batch_reconstruct() uses them when a
 * resent batch contains modifying (IS_MUTABLE) sub-requests whose index is
 * not beyond the saved trd->trd_reply.lrd_batch_idx, so their reply is
 * rebuilt instead of the operation being re-executed.
 *
 * The table is indexed by the raw opcode, not by (opc - BUT_FIRST_OPC) as
 * mdt_batch_handlers[] is. It has no initializer here, so as a static
 * object every slot is NULL unless it is filled in elsewhere.
 */
75 typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
77 static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
/*
 * Rebuild the reply of an already-executed modifying sub-request by
 * dispatching to reconstructors[opc].
 *
 * @tsi: target session of the enclosing batched RPC.
 * @opc: sub-request opcode, used directly as the reconstructors[] index.
 *
 * Return: opcodes >= BUT_LAST_OPC are rejected before the table lookup.
 * The return statements are not visible here; presumably the result of
 * the reconstructor is returned otherwise.
 *
 * NOTE(review): only the upper bound is checked, so a negative @opc would
 * index before the array. mdt_batch() passes reqmsg->lm_opc, which is
 * presumably unsigned — confirm.
 * NOTE(review): an opcode without a registered reconstructor trips the
 * LASSERT below (node crash) rather than returning an error.
 */
79 static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
81 mdt_batch_reconstructor reconst;
86 if (opc >= BUT_LAST_OPC)
89 reconst = reconstructors[opc];
90 LASSERT(reconst != NULL);
/*
 * Handler for a BUT_GETATTR sub-request (registered in mdt_batch_handlers[]).
 *
 * Runs the sub-request's lock enqueue through ldlm_handle_enqueue(). It uses:
 * - the namespace of the export's obd device;
 * - the sub-request capsule (info->mti_sub_pill);
 * - the DLM request that mdt_batch_unpack() stored in info->mti_dlm_req;
 * - the mdt_dlm_cbs callback suite.
 *
 * @tsi: target session of the enclosing batched RPC.
 *
 * NOTE(review): the return statement is not visible in this view;
 * presumably the enqueue result (rc) is returned.
 */
95 static int mdt_batch_getattr(struct tgt_session_info *tsi)
97 struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
98 struct req_capsule *pill = &info->mti_sub_pill;
103 rc = ldlm_handle_enqueue(info->mti_exp->exp_obd->obd_namespace,
104 pill, info->mti_dlm_req, &mdt_dlm_cbs);
/*
 * TGT_BUT_HDL(flags, opc, fn) - build one mdt_batch_handlers[] entry for a
 * Batch UpdaTe (BUT) sub-request opcode.
 *
 * The entry is stored at index (opc - BUT_FIRST_OPC) through a designated
 * initializer, which lets mdt_batch_handler_find() index the table
 * directly. Its request format is RQF_<opc> via token pasting, e.g.
 * RQF_BUT_GETATTR for BUT_GETATTR.
 *
 * NOTE(review): the remaining initializers are on lines not visible here;
 * presumably @flags, @fn and @opc fill th_flags, th_act and th_opc, which
 * mdt_batch() and mdt_batch_handler_find() read.
 */
109 /* Batch UpdaTe Request with a format known in advance */
110 #define TGT_BUT_HDL(flags, opc, fn) \
111 [opc - BUT_FIRST_OPC] = { \
117 .th_fmt = &RQF_ ## opc, \
118 .th_version = LUSTRE_MDS_VERSION, \
/*
 * Dispatch table for batched sub-requests, indexed by
 * (opcode - BUT_FIRST_OPC).
 *
 * Only BUT_GETATTR is registered in this view. It is not flagged
 * IS_MUTABLE, so on a resend mdt_batch() never routes it through
 * mdt_batch_reconstruct() and simply executes it again.
 */
122 static struct tgt_handler mdt_batch_handlers[] = {
123 TGT_BUT_HDL(HAS_KEY | HAS_REPLY, BUT_GETATTR, mdt_batch_getattr),
/*
 * Look up the handler for a batched sub-request opcode.
 *
 * @opc: opcode taken from the sub-request message (reqmsg->lm_opc in
 *       mdt_batch()).
 *
 * Return: h, i.e. the mdt_batch_handlers[] entry for @opc when it lies in
 * [BUT_FIRST_OPC, BUT_LAST_OPC), otherwise NULL (unsupported opcode).
 *
 * NOTE(review): @opc is client-controlled. mdt_batch_handlers[] has no
 * explicit size, so its length is set by the highest registered opcode.
 * If BUT_LAST_OPC - 1 exceeds that, an in-range @opc indexes past the
 * table. Any unregistered slot fails the th_opc LASSERTF (node crash)
 * instead of yielding NULL. Confirm against the BUT_* opcode definitions.
 */
126 static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
128 struct tgt_handler *h;
131 if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
132 h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
133 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
136 h = NULL; /* unsupported opc */
/*
 * Handle a batched metadata RPC.
 *
 * Steps:
 * - validate the batch header;
 * - collect the update buffers, either inline in the header or pulled from
 *   the client with a bulk GET;
 * - run every sub-request through its mdt_batch_handlers[] entry, packing
 *   each sub-reply into the single RMF_BUT_REPLY buffer.
 *
 * @tsi: target session of the batched request.
 *
 * While the loop runs, info->mti_pill points at info->mti_sub_pill and
 * info->mti_batch_env / tsi->tsi_batch_env are set, so per-op code works on
 * the current sub-request capsule. Each sub-request's result goes into its
 * own reply message (repmsg->lm_result). reply->burp_count records how many
 * sub-requests were handled.
 *
 * Processing aborts (GOTO out) on:
 * - an unsupported opcode (-ENOTSUPP);
 * - a running count that exceeds buh->buh_update_count (-EOVERFLOW).
 *
 * Return: rc; header and bulk-transfer failures are wrapped in
 * err_serious().
 */
141 int mdt_batch(struct tgt_session_info *tsi)
143 struct mdt_thread_info *info = tsi2mdt_info(tsi);
144 struct req_capsule *pill = &info->mti_sub_pill;
145 struct ptlrpc_request *req = tgt_ses_req(tsi);
146 struct but_update_header *buh;
147 struct but_update_buffer *bub = NULL;
148 struct batch_update_reply *reply = NULL;
149 struct ptlrpc_bulk_desc *desc = NULL;
150 struct tg_reply_data *trd = NULL;
151 struct lustre_msg *repmsg = NULL;
152 bool need_reconstruct;
153 __u32 handled_update_count = 0;
154 __u32 update_buf_count;
/* Validate the batch header sent by the client. */
164 buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
167 RETURN(err_serious(-EPROTO));
169 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
171 RETURN(err_serious(-EPROTO));
173 if (buh->buh_magic != BUT_HEADER_MAGIC) {
174 CERROR("%s: invalid update header magic %x expect %x: "
175 "rc = %d\n", tgt_name(tsi->tsi_tgt), buh->buh_magic,
176 BUT_HEADER_MAGIC, -EPROTO);
177 RETURN(err_serious(-EPROTO));
180 update_buf_count = buh->buh_count;
181 if (update_buf_count == 0)
182 RETURN(err_serious(-EPROTO));
/* One slot per update buffer announced in the header. */
184 OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
185 if (update_bufs == NULL)
186 RETURN(err_serious(-ENOMEM));
/*
 * Inline case: the update buffer lives inside the header itself and is
 * referenced, not copied.
 * NOTE(review): only update_bufs[0] is filled here. If the client also
 * sends buh_count > 1, the other slots are not filled, yet the sub-request
 * loop below dereferences update_bufs[i] for every i. Confirm that
 * combination is rejected elsewhere.
 */
188 if (buh->buh_inline_length > 0) {
189 update_bufs[0] = buh->buh_inline_data;
191 struct but_update_buffer *tmp;
/*
 * Bulk case: RMF_BUT_BUF describes each buffer, and the data is pulled
 * from the client with a bulk GET (PTLRPC_BULK_GET_SINK).
 */
194 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
196 GOTO(out, rc = err_serious(-EPROTO));
198 for (i = 0; i < update_buf_count; i++)
199 /* First *and* last might be partial pages, hence +1 */
200 page_count += DIV_ROUND_UP(bub[i].bub_size,
203 desc = ptlrpc_prep_bulk_exp(req, page_count,
204 PTLRPC_BULK_OPS_COUNT,
205 PTLRPC_BULK_GET_SINK,
207 &ptlrpc_bulk_kiov_nopin_ops);
209 GOTO(out, rc = err_serious(-ENOMEM));
/*
 * Allocate every buffer (each must be smaller than OUT_MAXREQSIZE) and add
 * it as a fragment of the bulk descriptor.
 */
212 for (i = 0; i < update_buf_count; i++, tmp++) {
213 if (tmp->bub_size >= OUT_MAXREQSIZE)
214 GOTO(out, rc = err_serious(-EPROTO));
216 OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
217 if (update_bufs[i] == NULL)
218 GOTO(out, rc = err_serious(-ENOMEM));
220 desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
/* Transfer the update buffers from the client. */
224 req->rq_bulk_write = 1;
225 rc = sptlrpc_svc_prep_bulk(req, desc);
227 GOTO(out, rc = err_serious(rc));
229 rc = target_bulk_io(req->rq_export, desc);
231 GOTO(out, rc = err_serious(rc));
/*
 * Size the reply to what the client reserved (buh_reply_size). After the
 * loop it is shrunk to the bytes actually packed.
 */
234 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
235 buh->buh_reply_size);
236 rc = req_capsule_server_pack(&req->rq_pill);
238 DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
239 tgt_name(tsi->tsi_tgt), rc);
243 /* Prepare the update reply buffer */
244 reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
246 GOTO(out, rc = -EPROTO);
248 reply->burp_magic = BUT_REPLY_MAGIC;
249 packed_replen = sizeof(*reply);
/*
 * Switch the thread into batch mode: from here on info->mti_pill is the
 * per-sub-request capsule.
 */
250 info->mti_batch_env = 1;
251 info->mti_pill = pill;
252 tsi->tsi_batch_env = true;
256 GOTO(out, rc = -ENOMEM);
/*
 * On a resend, modifying sub-requests whose index is not beyond
 * trd->trd_reply.lrd_batch_idx are reconstructed below instead of being
 * re-executed. Presumably tgt_check_resent() fills trd with the saved
 * reply data — confirm.
 */
258 need_reconstruct = tgt_check_resent(req, trd);
259 /* Walk through sub requests in the batch request to execute them. */
260 for (i = 0; i < update_buf_count; i++) {
261 struct batch_update_request *bur;
262 struct lustre_msg *reqmsg = NULL;
263 struct tgt_handler *h;
267 bur = update_bufs[i];
268 update_count = bur->burq_count;
269 for (j = 0; j < update_count; j++) {
272 reqmsg = batch_update_reqmsg_next(bur, reqmsg);
273 repmsg = batch_update_repmsg_next(reply, repmsg);
/*
 * NOTE(review): handled_update_count counts sub-requests already handled,
 * so this one is number handled_update_count + 1. With '>' one
 * sub-request more than buh->buh_update_count is accepted before
 * -EOVERFLOW; '>=' looks intended. Confirm against how the client sizes
 * the reply buffer.
 */
275 if (handled_update_count > buh->buh_update_count)
276 GOTO(out, rc = -EOVERFLOW);
/*
 * NOTE(review): reqmsg/repmsg are walked over client-supplied buffers.
 * A malformed message (e.g. bad lm_magic) trips these assertions and
 * crashes the node instead of failing the RPC with -EPROTO. Presumably
 * batch_update_reqmsg_next() bounds-checks the walk — confirm.
 */
278 LASSERT(reqmsg != NULL && repmsg != NULL);
279 LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
280 "Invalid reqmsg magic %x expected %x\n",
281 reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);
283 h = mdt_batch_handler_find(reqmsg->lm_opc);
284 if (unlikely(h == NULL)) {
285 CERROR("%s: unsupported opc: 0x%x\n",
286 tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
287 GOTO(out, rc = -ENOTSUPP);
290 LASSERT(h->th_fmt != NULL);
291 req_capsule_subreq_init(pill, h->th_fmt, req,
292 reqmsg, repmsg, RCL_SERVER);
294 rc = mdt_batch_unpack(info, reqmsg->lm_opc);
296 CERROR("%s: Can't unpack subreq, rc = %d\n",
297 mdt_obd_name(info->mti_mdt), rc);
301 rc = mdt_batch_pack_repmsg(info);
305 /* Need to reconstruct the reply for committed sub
306 * requests in a batched RPC.
307 * It only calls reconstruct for modification sub
309 * For uncommitted or read-only sub requests, the server
310 * should re-execute them via the ->th_act() below.
312 if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
313 handled_update_count <=
314 trd->trd_reply.lrd_batch_idx) {
315 rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
321 tsi->tsi_batch_idx = handled_update_count;
327 * As @repmsg may be changed if the reply buffer is
328 * too small to grow, thus it needs to reload it here.
330 if (repmsg != pill->rc_repmsg) {
331 repmsg = pill->rc_repmsg;
335 repmsg->lm_result = rc;
336 mdt_thread_info_reset(info);
/* Account for this sub-reply so the reply buffer can be trimmed later. */
338 replen = lustre_packed_msg_size(repmsg);
339 packed_replen += replen;
340 handled_update_count++;
/* Trim the unused tail of the client-sized reply buffer. */
344 CDEBUG(D_INFO, "reply size %u packed replen %u\n",
345 buh->buh_reply_size, packed_replen);
346 if (buh->buh_reply_size > packed_replen)
347 req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
348 packed_replen, RCL_SERVER);
/*
 * Fetch the reply field again after the possible shrink, then record how
 * many sub-requests were handled.
 */
352 reply = req_capsule_server_get(&req->rq_pill,
355 GOTO(out_free, rc = -EPROTO);
357 reply->burp_count = handled_update_count;
/*
 * Cleanup. In the inline case update_bufs[0] points into the request
 * (buh->buh_inline_data) and must not reach OBD_FREE_LARGE().
 * NOTE(review): presumably the free loop is guarded by a bub != NULL check
 * on a line not visible here — confirm.
 */
361 if (update_bufs != NULL) {
363 for (i = 0; i < update_buf_count; i++, bub++) {
364 if (update_bufs[i] != NULL)
365 OBD_FREE_LARGE(update_bufs[i],
370 OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
377 ptlrpc_free_bulk(desc);
379 mdt_thread_info_fini(info);
380 tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;