/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2020, DDN Storage Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * lustre/mdt/mdt_batch.c
 *
 * Batch Metadata Updating on the server (MDT)
 *
 * Author: Qian Yingjin <qian@ddn.com>
 */

#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>

#include <lustre_mds.h>
#include "mdt_internal.h"

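/* DLM callback suite passed to ldlm_handle_enqueue() for locks enqueued on
 * behalf of batched getattr sub-requests.
 */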
static struct ldlm_callback_suite mdt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = tgt_blocking_ast,
        .lcs_glimpse    = ldlm_server_glimpse_ast
};

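/* Unpack the per-opcode request fields of a batch sub-request from the
 * sub-request capsule. Only BUT_GETATTR, which carries a DLM lock request,
 * needs any unpacking so far.
 */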
static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
{
        int rc = 0;

        switch (opc) {
        case BUT_GETATTR:
                info->mti_dlm_req = req_capsule_client_get(info->mti_pill,
                                                           &RMF_DLM_REQ);
                if (info->mti_dlm_req == NULL)
                        RETURN(-EFAULT);
                break;
        default:
                rc = -EOPNOTSUPP;
                CERROR("%s: Unexpected opcode %d: rc = %d\n",
                       mdt_obd_name(info->mti_mdt), opc, rc);
                break;
        }

        RETURN(rc);
}

static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
{
        return 0;
}

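/* Per-opcode reply reconstructors, used to rebuild the reply of a resent
 * modifying sub-request whose update has already been committed.
 */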
typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);

static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];

static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
{
        mdt_batch_reconstructor reconst;
        int rc;

        ENTRY;

        if (opc >= BUT_LAST_OPC)
                RETURN(-EOPNOTSUPP);

        reconst = reconstructors[opc];
        LASSERT(reconst != NULL);
        rc = reconst(tsi);
        RETURN(rc);
}

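/* BUT_GETATTR sub-request handler: process the DLM lock request carried by
 * the sub-request through the regular server-side enqueue path.
 */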
static int mdt_batch_getattr(struct tgt_session_info *tsi)
{
        struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
        struct req_capsule *pill = &info->mti_sub_pill;
        int rc;

        ENTRY;

        rc = ldlm_handle_enqueue(info->mti_exp->exp_obd->obd_namespace,
                                 pill, info->mti_dlm_req, &mdt_dlm_cbs);

        RETURN(rc);
}

/* Batch UpdaTe Request with a format known in advance */
#define TGT_BUT_HDL(flags, opc, fn)                     \
[opc - BUT_FIRST_OPC] = {                               \
        .th_name        = #opc,                         \
        .th_fail_id     = 0,                            \
        .th_opc         = opc,                          \
        .th_flags       = flags,                        \
        .th_act         = fn,                           \
        .th_fmt         = &RQF_ ## opc,                 \
        .th_version     = LUSTRE_MDS_VERSION,           \
        .th_hp          = NULL,                         \
}

static struct tgt_handler mdt_batch_handlers[] = {
TGT_BUT_HDL(HAS_KEY | HAS_REPLY,        BUT_GETATTR,    mdt_batch_getattr),
};

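/* Look up the tgt_handler for a batch sub-request opcode; return NULL if
 * @opc falls outside [BUT_FIRST_OPC, BUT_LAST_OPC).
 */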
static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
{
        struct tgt_handler *h = NULL;

        if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
                h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
                LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
                         h->th_opc, opc);
        }

        return h;
}

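/* Top-level handler for a batched update (BUT) RPC.
 *
 * The request carries a struct but_update_header describing one or more
 * update buffers, either inlined in the header or pulled from the client
 * by a separate bulk transfer. Each buffer holds a batch_update_request
 * with a sequence of packed sub-request messages. This function:
 *
 *   - validates the header and fetches the update buffers;
 *   - packs the batch reply buffer sized by buh_reply_size;
 *   - walks every sub-request, dispatching it to the matching entry in
 *     mdt_batch_handlers[], or reconstructing the reply for an already
 *     committed sub-request of a resent RPC;
 *   - shrinks the reply to the length actually packed.
 *
 * Execution stops at the first failing sub-request; burp_count in the
 * reply records how many sub-requests completed.
 */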
int mdt_batch(struct tgt_session_info *tsi)
{
        struct mdt_thread_info *info = tsi2mdt_info(tsi);
        struct req_capsule *pill = &info->mti_sub_pill;
        struct ptlrpc_request *req = tgt_ses_req(tsi);
        struct but_update_header *buh;
        struct but_update_buffer *bub = NULL;
        struct batch_update_reply *reply = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct tg_reply_data *trd = NULL;
        struct lustre_msg *repmsg = NULL;
        bool need_reconstruct;
        __u32 handled_update_count = 0;
        __u32 update_buf_count;
        __u32 packed_replen;
        void **update_bufs;
        bool grown = false;
        int buh_size;
        int rc;
        int i;

        ENTRY;

        buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
                                        RCL_CLIENT);
        if (buh_size <= 0)
                RETURN(err_serious(-EPROTO));

        buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
        if (buh == NULL)
                RETURN(err_serious(-EPROTO));

        if (buh->buh_magic != BUT_HEADER_MAGIC) {
                CERROR("%s: invalid update header magic %x expect %x: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), buh->buh_magic,
                       BUT_HEADER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }

        update_buf_count = buh->buh_count;
        if (update_buf_count == 0)
                RETURN(err_serious(-EPROTO));

        OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
        if (update_bufs == NULL)
                RETURN(err_serious(-ENOMEM));

        if (buh->buh_inline_length > 0) {
                update_bufs[0] = buh->buh_inline_data;
        } else {
                struct but_update_buffer *tmp;
                int page_count = 0;

                bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
                if (bub == NULL)
                        GOTO(out, rc = err_serious(-EPROTO));

                for (i = 0; i < update_buf_count; i++)
                        /* First *and* last might be partial pages, hence +1 */
                        page_count += DIV_ROUND_UP(bub[i].bub_size,
                                                   PAGE_SIZE) + 1;

                desc = ptlrpc_prep_bulk_exp(req, page_count,
                                            PTLRPC_BULK_OPS_COUNT,
                                            PTLRPC_BULK_GET_SINK,
                                            MDS_BULK_PORTAL,
                                            &ptlrpc_bulk_kiov_nopin_ops);
                if (desc == NULL)
                        GOTO(out, rc = err_serious(-ENOMEM));

                tmp = bub;
                for (i = 0; i < update_buf_count; i++, tmp++) {
                        if (tmp->bub_size >= OUT_MAXREQSIZE)
                                GOTO(out, rc = err_serious(-EPROTO));

                        OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
                        if (update_bufs[i] == NULL)
                                GOTO(out, rc = err_serious(-ENOMEM));

                        desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
                                                        tmp->bub_size);
                }

                req->rq_bulk_write = 1;
                rc = sptlrpc_svc_prep_bulk(req, desc);
                if (rc != 0)
                        GOTO(out, rc = err_serious(rc));

                rc = target_bulk_io(req->rq_export, desc);
                if (rc < 0)
                        GOTO(out, rc = err_serious(rc));
        }

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
                             buh->buh_reply_size);
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc != 0) {
                DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
                          tgt_name(tsi->tsi_tgt), rc);
                GOTO(out, rc);
        }

        /* Prepare the update reply buffer */
        reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
        if (reply == NULL)
                GOTO(out, rc = -EPROTO);

        reply->burp_magic = BUT_REPLY_MAGIC;
        packed_replen = sizeof(*reply);
        info->mti_batch_env = 1;
        info->mti_pill = pill;
        tsi->tsi_batch_env = true;

        OBD_ALLOC_PTR(trd);
        if (trd == NULL)
                GOTO(out, rc = -ENOMEM);

        need_reconstruct = tgt_check_resent(req, trd);
        /* Walk through sub requests in the batch request to execute them. */
        for (i = 0; i < update_buf_count; i++) {
                struct batch_update_request *bur;
                struct lustre_msg *reqmsg = NULL;
                struct tgt_handler *h;
                int update_count;
                int j;

                bur = update_bufs[i];
                update_count = bur->burq_count;
                for (j = 0; j < update_count; j++) {
                        __u32 replen;

                        reqmsg = batch_update_reqmsg_next(bur, reqmsg);
                        repmsg = batch_update_repmsg_next(reply, repmsg);

                        if (handled_update_count > buh->buh_update_count)
                                GOTO(out, rc = -EOVERFLOW);

                        LASSERT(reqmsg != NULL && repmsg != NULL);
                        LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
                                 "Invalid reqmsg magic %x expected %x\n",
                                 reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);

                        h = mdt_batch_handler_find(reqmsg->lm_opc);
                        if (unlikely(h == NULL)) {
                                CERROR("%s: unsupported opc: 0x%x\n",
                                       tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
                                GOTO(out, rc = -ENOTSUPP);
                        }

                        LASSERT(h->th_fmt != NULL);
                        req_capsule_subreq_init(pill, h->th_fmt, req,
                                                reqmsg, repmsg, RCL_SERVER);

                        rc = mdt_batch_unpack(info, reqmsg->lm_opc);
                        if (rc) {
                                CERROR("%s: Can't unpack subreq, rc = %d\n",
                                       mdt_obd_name(info->mti_mdt), rc);
                                GOTO(out, rc);
                        }

                        rc = mdt_batch_pack_repmsg(info);
                        if (rc)
                                GOTO(out, rc);

                        /* Reconstruct replies for sub-requests in a resent
                         * batched RPC whose updates have already been
                         * committed. Only modifying sub-requests are
                         * reconstructed; uncommitted or read-only
                         * sub-requests are re-executed via ->th_act() below.
                         */
                        if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
                            handled_update_count <=
                            trd->trd_reply.lrd_batch_idx) {
                                rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
                                if (rc)
                                        GOTO(out, rc);
                                GOTO(next, rc);
                        }

                        tsi->tsi_batch_idx = handled_update_count;
                        rc = h->th_act(tsi);
next:
                        /*
                         * @repmsg may have been relocated if the reply buffer
                         * was too small and had to grow, so reload it here.
                         */
                        if (repmsg != pill->rc_repmsg) {
                                repmsg = pill->rc_repmsg;
                                grown = true;
                        }

                        if (rc)
                                GOTO(out, rc);

                        repmsg->lm_result = rc;
                        mdt_thread_info_reset(info);

                        replen = lustre_packed_msg_size(repmsg);
                        packed_replen += replen;
                        handled_update_count++;
                }
        }

        CDEBUG(D_INFO, "reply size %u packed replen %u\n",
               buh->buh_reply_size, packed_replen);
        if (buh->buh_reply_size > packed_replen)
                req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
                                   packed_replen, RCL_SERVER);
out:
        if (reply != NULL) {
                if (grown) {
                        reply = req_capsule_server_get(&req->rq_pill,
                                                       &RMF_BUT_REPLY);
                        if (reply == NULL)
                                GOTO(out_free, rc = -EPROTO);
                }
                reply->burp_count = handled_update_count;
        }

out_free:
        if (update_bufs != NULL) {
                if (bub != NULL) {
                        for (i = 0; i < update_buf_count; i++, bub++) {
                                if (update_bufs[i] != NULL)
                                        OBD_FREE_LARGE(update_bufs[i],
                                                       bub->bub_size);
                        }
                }

                OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
        }

        if (trd)
                OBD_FREE_PTR(trd);

        if (desc != NULL)
                ptlrpc_free_bulk(desc);

        mdt_thread_info_fini(info);
        tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
        RETURN(rc);
}