Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / mdt / mdt_batch.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2020, DDN Storage Corporation.
5  */
6
7 /*
8  * This file is part of Lustre, http://www.lustre.org/
9  *
10  * Batch Metadata Updating on the server (MDT)
11  *
12  * Author: Qian Yingjin <qian@ddn.com>
13  */
14
15 #define DEBUG_SUBSYSTEM S_MDS
16
17 #include <linux/module.h>
18
19 #include <lustre_mds.h>
20 #include "mdt_internal.h"
21
22 static struct ldlm_callback_suite mdt_dlm_cbs = {
23         .lcs_completion = ldlm_server_completion_ast,
24         .lcs_blocking   = tgt_blocking_ast,
25         .lcs_glimpse    = ldlm_server_glimpse_ast
26 };
27
28 static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
29 {
30         int rc = 0;
31
32         switch (opc) {
33         case BUT_GETATTR:
34                 info->mti_dlm_req = req_capsule_client_get(info->mti_pill,
35                                                            &RMF_DLM_REQ);
36                 if (info->mti_dlm_req == NULL)
37                         RETURN(-EFAULT);
38                 break;
39         default:
40                 rc = -EOPNOTSUPP;
41                 CERROR("%s: Unexpected opcode %d: rc = %d\n",
42                        mdt_obd_name(info->mti_mdt), opc, rc);
43                 break;
44         }
45
46         RETURN(rc);
47 }
48
/*
 * Hook for preparing per-sub-request reply data ahead of the sub-request
 * handler.  Nothing needs packing yet, so it always reports success and
 * does not touch @info.
 */
static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
{
	int rc = 0;

	return rc;
}
53
54 typedef int (*mdt_batch_reconstructor)(struct tgt_session_info *tsi);
55
56 static mdt_batch_reconstructor reconstructors[BUT_LAST_OPC];
57
58 static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
59 {
60         mdt_batch_reconstructor reconst;
61         int rc;
62
63         ENTRY;
64
65         if (opc >= BUT_LAST_OPC)
66                 RETURN(-EOPNOTSUPP);
67
68         reconst = reconstructors[opc];
69         LASSERT(reconst != NULL);
70         rc = reconst(tsi);
71         RETURN(rc);
72 }
73
74 static int mdt_batch_getattr(struct tgt_session_info *tsi)
75 {
76         struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
77         struct req_capsule *pill = &info->mti_sub_pill;
78         int rc;
79
80         ENTRY;
81
82         rc = ldlm_handle_enqueue(info->mti_exp->exp_obd->obd_namespace,
83                                  pill, info->mti_dlm_req, &mdt_dlm_cbs);
84
85         RETURN(rc);
86 }
87
88 /* Batch UpdaTe Request with a format known in advance */
89 #define TGT_BUT_HDL(flags, opc, fn)                     \
90 [opc - BUT_FIRST_OPC] = {                               \
91         .th_name        = #opc,                         \
92         .th_fail_id     = 0,                            \
93         .th_opc         = opc,                          \
94         .th_flags       = flags,                        \
95         .th_act         = fn,                           \
96         .th_fmt         = &RQF_ ## opc,                 \
97         .th_version     = LUSTRE_MDS_VERSION,           \
98         .th_hp          = NULL,                         \
99 }
100
101 static struct tgt_handler mdt_batch_handlers[] = {
102 TGT_BUT_HDL(HAS_KEY | HAS_REPLY,        BUT_GETATTR,    mdt_batch_getattr),
103 };
104
105 static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
106 {
107         struct tgt_handler *h;
108
109         h = NULL;
110         if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
111                 h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
112                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
113                          h->th_opc, opc);
114         } else {
115                 h = NULL; /* unsupported opc */
116         }
117         return h;
118 }
119
120 int mdt_batch(struct tgt_session_info *tsi)
121 {
122         struct mdt_thread_info *info = tsi2mdt_info(tsi);
123         struct req_capsule *pill = &info->mti_sub_pill;
124         struct ptlrpc_request *req = tgt_ses_req(tsi);
125         struct but_update_header *buh;
126         struct but_update_buffer *bub = NULL;
127         struct batch_update_reply *reply = NULL;
128         struct ptlrpc_bulk_desc *desc = NULL;
129         struct tg_reply_data *trd = NULL;
130         struct lustre_msg *repmsg = NULL;
131         bool need_reconstruct;
132         __u32 handled_update_count = 0;
133         __u32 update_buf_count;
134         __u32 packed_replen;
135         void **update_bufs;
136         bool grown = false;
137         int buh_size;
138         int rc;
139         int i;
140
141         ENTRY;
142
143         buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
144                                         RCL_CLIENT);
145         if (buh_size <= 0)
146                 RETURN(err_serious(-EPROTO));
147
148         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
149         if (buh == NULL)
150                 RETURN(err_serious(-EPROTO));
151
152         if (buh->buh_magic != BUT_HEADER_MAGIC) {
153                 CERROR("%s: invalid update header magic %x expect %x: "
154                        "rc = %d\n", tgt_name(tsi->tsi_tgt), buh->buh_magic,
155                        BUT_HEADER_MAGIC, -EPROTO);
156                 RETURN(err_serious(-EPROTO));
157         }
158
159         update_buf_count = buh->buh_count;
160         if (update_buf_count == 0)
161                 RETURN(err_serious(-EPROTO));
162
163         OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
164         if (update_bufs == NULL)
165                 RETURN(err_serious(-ENOMEM));
166
167         if (buh->buh_inline_length > 0) {
168                 update_bufs[0] = buh->buh_inline_data;
169         } else {
170                 struct but_update_buffer *tmp;
171                 int page_count = 0;
172
173                 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
174                 if (bub == NULL)
175                         GOTO(out, rc = err_serious(-EPROTO));
176
177                 for (i = 0; i < update_buf_count; i++)
178                         /* First *and* last might be partial pages, hence +1 */
179                         page_count += DIV_ROUND_UP(bub[i].bub_size,
180                                                    PAGE_SIZE) + 1;
181
182                 desc = ptlrpc_prep_bulk_exp(req, page_count,
183                                             PTLRPC_BULK_OPS_COUNT,
184                                             PTLRPC_BULK_GET_SINK,
185                                             MDS_BULK_PORTAL,
186                                             &ptlrpc_bulk_kiov_nopin_ops);
187                 if (desc == NULL)
188                         GOTO(out, rc = err_serious(-ENOMEM));
189
190                 tmp = bub;
191                 for (i = 0; i < update_buf_count; i++, tmp++) {
192                         if (tmp->bub_size >= OUT_MAXREQSIZE)
193                                 GOTO(out, rc = err_serious(-EPROTO));
194
195                         OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
196                         if (update_bufs[i] == NULL)
197                                 GOTO(out, rc = err_serious(-ENOMEM));
198
199                         desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
200                                                         tmp->bub_size);
201                 }
202
203                 req->rq_bulk_write = 1;
204                 rc = sptlrpc_svc_prep_bulk(req, desc);
205                 if (rc != 0)
206                         GOTO(out, rc = err_serious(rc));
207
208                 rc = target_bulk_io(req->rq_export, desc);
209                 if (rc < 0)
210                         GOTO(out, rc = err_serious(rc));
211         }
212
213         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
214                              buh->buh_reply_size);
215         rc = req_capsule_server_pack(&req->rq_pill);
216         if (rc != 0) {
217                 DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
218                        tgt_name(tsi->tsi_tgt), rc);
219                 GOTO(out, rc);
220         }
221
222         /* Prepare the update reply buffer */
223         reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
224         if (reply == NULL)
225                 GOTO(out, rc = -EPROTO);
226
227         reply->burp_magic = BUT_REPLY_MAGIC;
228         packed_replen = sizeof(*reply);
229         info->mti_batch_env = 1;
230         info->mti_pill = pill;
231         tsi->tsi_batch_env = true;
232
233         OBD_ALLOC_PTR(trd);
234         if (trd == NULL)
235                 GOTO(out, rc = -ENOMEM);
236
237         need_reconstruct = tgt_check_resent(req, trd);
238         /* Walk through sub requests in the batch request to execute them. */
239         for (i = 0; i < update_buf_count; i++) {
240                 struct batch_update_request *bur;
241                 struct lustre_msg *reqmsg = NULL;
242                 struct tgt_handler *h;
243                 int update_count;
244                 int j;
245
246                 bur = update_bufs[i];
247                 update_count = bur->burq_count;
248                 for (j = 0; j < update_count; j++) {
249                         __u32 replen;
250
251                         reqmsg = batch_update_reqmsg_next(bur, reqmsg);
252                         repmsg = batch_update_repmsg_next(reply, repmsg);
253
254                         if (handled_update_count > buh->buh_update_count)
255                                 GOTO(out, rc = -EOVERFLOW);
256
257                         LASSERT(reqmsg != NULL && repmsg != NULL);
258                         LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
259                                  "Invalid reqmsg magic %x expected %x\n",
260                                  reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);
261
262                         h = mdt_batch_handler_find(reqmsg->lm_opc);
263                         if (unlikely(h == NULL)) {
264                                 CERROR("%s: unsupported opc: 0x%x\n",
265                                        tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
266                                 GOTO(out, rc = -ENOTSUPP);
267                         }
268
269                         LASSERT(h->th_fmt != NULL);
270                         req_capsule_subreq_init(pill, h->th_fmt, req,
271                                                 reqmsg, repmsg, RCL_SERVER);
272
273                         rc = mdt_batch_unpack(info, reqmsg->lm_opc);
274                         if (rc) {
275                                 CERROR("%s: Can't unpack subreq, rc = %d\n",
276                                        mdt_obd_name(info->mti_mdt), rc);
277                                 GOTO(out, rc);
278                         }
279
280                         rc = mdt_batch_pack_repmsg(info);
281                         if (rc)
282                                 GOTO(out, rc);
283
284                         /* Need to reconstruct the reply for committed sub
285                          * requests in a batched RPC.
286                          * It only calls reconstruct for modification sub
287                          * requests.
288                          * For uncommitted or read-only sub requests, the server
289                          * should re-execute them via the ->th_act() below.
290                          */
291                         if ((h->th_flags & IS_MUTABLE) && need_reconstruct &&
292                             handled_update_count <=
293                             trd->trd_reply.lrd_batch_idx) {
294                                 rc = mdt_batch_reconstruct(tsi, reqmsg->lm_opc);
295                                 if (rc)
296                                         GOTO(out, rc);
297                                 GOTO(next, rc);
298                         }
299
300                         tsi->tsi_batch_idx = handled_update_count;
301                         rc = h->th_act(tsi);
302 next:
303                         /*
304                          * As @repmsg may be changed if the reply buffer is
305                          * too small to grow, thus it needs to reload it here.
306                          */
307                         if (repmsg != pill->rc_repmsg) {
308                                 repmsg = pill->rc_repmsg;
309                                 grown = true;
310                         }
311
312                         if (rc)
313                                 GOTO(out, rc);
314
315                         repmsg->lm_result = rc;
316                         mdt_thread_info_reset(info);
317
318                         replen = lustre_packed_msg_size(repmsg);
319                         packed_replen += replen;
320                         handled_update_count++;
321                 }
322         }
323
324         CDEBUG(D_INFO, "reply size %u packed replen %u\n",
325                buh->buh_reply_size, packed_replen);
326         if (buh->buh_reply_size > packed_replen)
327                 req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
328                                    packed_replen, RCL_SERVER);
329 out:
330         if (reply != NULL) {
331                 if (grown) {
332                         reply = req_capsule_server_get(&req->rq_pill,
333                                                        &RMF_BUT_REPLY);
334                         if (reply == NULL)
335                                 GOTO(out_free, rc = -EPROTO);
336                 }
337                 reply->burp_count = handled_update_count;
338         }
339
340 out_free:
341         if (update_bufs != NULL) {
342                 if (bub != NULL) {
343                         for (i = 0; i < update_buf_count; i++, bub++) {
344                                 if (update_bufs[i] != NULL)
345                                         OBD_FREE_LARGE(update_bufs[i],
346                                                        bub->bub_size);
347                         }
348                 }
349
350                 OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
351         }
352
353         OBD_FREE_PTR(trd);
354
355         if (desc != NULL)
356                 ptlrpc_free_bulk(desc);
357
358         mdt_thread_info_fini(info);
359         tsi->tsi_reply_fail_id = OBD_FAIL_BUT_UPDATE_NET_REP;
360         RETURN(rc);
361 }
362