Whamcloud - gitweb
LU-5170 utils: Continue on error when multiple files requested
[fs/lustre-release.git] / lustre / mdt / mdt_batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2020, DDN Storage Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * lustre/mdt/mdt_batch.c
30  *
31  * Batch Metadata Updating on the server (MDT)
32  *
33  * Author: Qian Yingjin <qian@ddn.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 #include <lustre_mds.h>
41 #include "mdt_internal.h"
42
43 static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
44 {
45         int rc = 0;
46
47         switch (opc) {
48         default:
49                 rc = -EOPNOTSUPP;
50                 CERROR("%s: Unexpected opcode %d: rc = %d\n",
51                        mdt_obd_name(info->mti_mdt), opc, rc);
52                 break;
53         }
54
55         RETURN(rc);
56 }
57
/*
 * Prepare the reply message of the sub-request currently being executed.
 *
 * Currently a no-op: \a info is not touched and success is always reported.
 */
static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
{
	(void)info;	/* unused until some opcode needs reply packing */

	return 0;
}
62
63 typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
64
65 static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
66
67 static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
68 {
69         mdt_batch_reconstructor reconst;
70         int rc;
71
72         ENTRY;
73
74         if (opc >= BUT_LAST_OPC)
75                 RETURN(-EOPNOTSUPP);
76
77         reconst = reconstructors[opc];
78         LASSERT(reconst != NULL);
79         rc = reconst(tsi);
80         RETURN(rc);
81 }
82
/*
 * TGT_BUT_HDL() - designated initializer for one tgt_handler array slot
 * serving batch update (BUT) opcode @hopc, whose request format is known in
 * advance (RQF_<hopc>).
 *
 * The slot index is (hopc - BUT_FIRST_OPC), and the handler name is the
 * stringified opcode identifier.  No fail-loc id and no high-priority
 * operations are set.
 */
#define TGT_BUT_HDL(hflags, hopc, hfn)				\
[(hopc) - BUT_FIRST_OPC] = {					\
	.th_name	= #hopc,				\
	.th_opc		= hopc,					\
	.th_fmt		= &RQF_ ## hopc,			\
	.th_act		= hfn,					\
	.th_flags	= hflags,				\
	.th_version	= LUSTRE_MDS_VERSION,			\
	.th_fail_id	= 0,					\
	.th_hp		= NULL,					\
}
95
96 static struct tgt_handler mdt_batch_handlers[BUT_LAST_OPC];
97
98 static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
99 {
100         struct tgt_handler *h;
101
102         h = NULL;
103         if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
104                 h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
105                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
106                          h->th_opc, opc);
107         } else {
108                 h = NULL; /* unsupported opc */
109         }
110         return h;
111 }
112
113 int mdt_batch(struct tgt_session_info *tsi)
114 {
115         struct mdt_thread_info *info = tsi2mdt_info(tsi);
116         struct req_capsule *pill = &info->mti_sub_pill;
117         struct ptlrpc_request *req = tgt_ses_req(tsi);
118         struct but_update_header *buh;
119         struct but_update_buffer *bub = NULL;
120         struct batch_update_reply *reply = NULL;
121         struct ptlrpc_bulk_desc *desc = NULL;
122         struct tg_reply_data *trd = NULL;
123         struct lustre_msg *repmsg = NULL;
124         bool need_reconstruct;
125         __u32 handled_update_count = 0;
126         __u32 update_buf_count;
127         __u32 packed_replen;
128         void **update_bufs;
129         int buh_size;
130         int rc;
131         int i;
132
133         ENTRY;
134
135         buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
136                                         RCL_CLIENT);
137         if (buh_size <= 0)
138                 RETURN(err_serious(-EPROTO));
139
140         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
141         if (buh == NULL)
142                 RETURN(err_serious(-EPROTO));
143
144         if (buh->buh_magic != BUT_HEADER_MAGIC) {
145                 CERROR("%s: invalid update header magic %x expect %x: "
146                        "rc = %d\n", tgt_name(tsi->tsi_tgt), buh->buh_magic,
147                        BUT_HEADER_MAGIC, -EPROTO);
148                 RETURN(err_serious(-EPROTO));
149         }
150
151         update_buf_count = buh->buh_count;
152         if (update_buf_count == 0)
153                 RETURN(err_serious(-EPROTO));
154
155         OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
156         if (update_bufs == NULL)
157                 RETURN(err_serious(-ENOMEM));
158
159         if (buh->buh_inline_length > 0) {
160                 update_bufs[0] = buh->buh_inline_data;
161         } else {
162                 struct but_update_buffer *tmp;
163                 int page_count = 0;
164
165                 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
166                 if (bub == NULL)
167                         GOTO(out, rc = err_serious(-EPROTO));
168
169                 for (i = 0; i < update_buf_count; i++)
170                         /* First *and* last might be partial pages, hence +1 */
171                         page_count += DIV_ROUND_UP(bub[i].bub_size,
172                                                    PAGE_SIZE) + 1;
173
174                 desc = ptlrpc_prep_bulk_exp(req, page_count,
175                                             PTLRPC_BULK_OPS_COUNT,
176                                             PTLRPC_BULK_GET_SINK,
177                                             MDS_BULK_PORTAL,
178                                             &ptlrpc_bulk_kiov_nopin_ops);
179                 if (desc == NULL)
180                         GOTO(out, rc = err_serious(-ENOMEM));
181
182                 tmp = bub;
183                 for (i = 0; i < update_buf_count; i++, tmp++) {
184                         if (tmp->bub_size >= OUT_MAXREQSIZE)
185                                 GOTO(out, rc = err_serious(-EPROTO));
186
187                         OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
188                         if (update_bufs[i] == NULL)
189                                 GOTO(out, rc = err_serious(-ENOMEM));
190
191                         desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
192                                                         tmp->bub_size);
193                 }
194
195                 req->rq_bulk_write = 1;
196                 rc = sptlrpc_svc_prep_bulk(req, desc);
197                 if (rc != 0)
198                         GOTO(out, rc = err_serious(rc));
199
200                 rc = target_bulk_io(req->rq_export, desc);
201                 if (rc < 0)
202                         GOTO(out, rc = err_serious(rc));
203         }
204
205         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
206                              buh->buh_reply_size);
207         rc = req_capsule_server_pack(&req->rq_pill);
208         if (rc != 0) {
209                 DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
210                        tgt_name(tsi->tsi_tgt), rc);
211                 GOTO(out, rc);
212         }
213
214         /* Prepare the update reply buffer */
215         reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
216         if (reply == NULL)
217                 GOTO(out, rc = -EPROTO);
218
219         reply->burp_magic = BUT_REPLY_MAGIC;
220         packed_replen = sizeof(*reply);
221         info->mti_max_repsize = buh->buh_reply_size;
222         info->mti_batch_env = 1;
223         info->mti_pill = pill;
224         tsi->tsi_batch_env = true;
225
226         OBD_ALLOC_PTR(trd);
227         if (trd == NULL)
228                 GOTO(out, rc = -ENOMEM);
229
230         need_reconstruct = tgt_check_resent(req, trd);
231         /* Walk through sub requests in the batch request to execute them. */
232         for (i = 0; i < update_buf_count; i++) {
233                 struct batch_update_request *bur;
234                 struct lustre_msg *reqmsg = NULL;
235                 struct tgt_handler *h;
236                 int update_count;
237                 int j;
238
239                 bur = update_bufs[i];
240                 update_count = bur->burq_count;
241                 for (j = 0; j < update_count; j++) {
242                         __u32 replen;
243
244                         reqmsg = batch_update_reqmsg_next(bur, reqmsg);
245                         repmsg = batch_update_repmsg_next(reply, repmsg);
246
247                         if (handled_update_count > buh->buh_update_count)
248                                 GOTO(out, rc = -EOVERFLOW);
249
250                         LASSERT(reqmsg != NULL && repmsg != NULL);
251                         LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
252                                  "Invalid reqmsg magic %x expected %x\n",
253                                  reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);
254
255                         h = mdt_batch_handler_find(reqmsg->lm_opc);
256                         if (unlikely(h == NULL)) {
257                                 CERROR("%s: unsupported opc: 0x%x\n",
258                                        tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
259                                 GOTO(out, rc = -ENOTSUPP);
260                         }
261
262                         LASSERT(h->th_fmt != NULL);
263                         req_capsule_subreq_init(pill, h->th_fmt, req,
264                                                 reqmsg, repmsg, RCL_SERVER);
265
266                         rc = mdt_batch_unpack(info, reqmsg->lm_opc);
267                         if (rc) {
268                                 CERROR("%s: Can't unpack subreq, rc = %d\n",
269                                        mdt_obd_name(info->mti_mdt), rc);
270                                 GOTO(out, rc);
271                         }
272
273                         rc = mdt_batch_pack_repmsg(info);
274                         if (rc)
275                                 GOTO(out, rc);
276
277                         /* Need to reconstruct the reply for committed sub
278                          * requests in a batched RPC.
279                          * It only calls reconstruct for modification sub
280                          * requests.
281                          * For uncommitted or read-only sub requests, the server
282                          * should re-execute them via the ->th_act() below.
283                          */
284                         if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
285                             handled_update_count <=
286                             trd->trd_reply.lrd_batch_idx) {
287                                 rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
288                                 if (rc)
289                                         GOTO(out, rc);
290                                 GOTO(next, rc);
291                         }
292
293                         tsi->tsi_batch_idx = handled_update_count;
294                         rc = h->th_act(tsi);
295                         if (rc)
296                                 GOTO(out, rc);
297 next:
298                         repmsg->lm_result = rc;
299                         mdt_thread_info_reset(info);
300                         /*
301                          * TODO: Check whether overflow reply buffer.
302                          * Fix reply, shrink and/or grow reply buffers.
303                          */
304                         replen = lustre_packed_msg_size(repmsg);
305                         info->mti_max_repsize -= replen;
306                         packed_replen += replen;
307                         handled_update_count++;
308                 }
309         }
310
311         /*
312          * TODO: Grow/shrink the reply buffer.
313          */
314         CDEBUG(D_INFO, "reply size %u packed replen %u\n",
315                buh->buh_reply_size, packed_replen);
316         if (buh->buh_reply_size > packed_replen)
317                 req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
318                                    packed_replen, RCL_SERVER);
319 out:
320         if (reply != NULL)
321                 reply->burp_count = handled_update_count;
322
323         if (update_bufs != NULL) {
324                 if (bub != NULL) {
325                         for (i = 0; i < update_buf_count; i++, bub++) {
326                                 if (update_bufs[i] != NULL)
327                                         OBD_FREE_LARGE(update_bufs[i],
328                                                        bub->bub_size);
329                         }
330                 }
331
332                 OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
333         }
334
335         if (trd)
336                 OBD_FREE_PTR(trd);
337
338         if (desc != NULL)
339                 ptlrpc_free_bulk(desc);
340
341         mdt_thread_info_fini(info);
342         tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
343         RETURN(rc);
344 }
345