Whamcloud - gitweb
LU-14139 ptlrpc: grow PtlRPC properly when prepare sub request
[fs/lustre-release.git] / lustre / ptlrpc / layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ptlrpc/layout.c
32  *
33  * Request/reply message layouts for the PtlRPC capsule ("pill") interface
34  *
35  * Author: Nikita Danilov <nikita@clusterfs.com>
36  */
37 /*
38  * This file contains the "capsule/pill" abstraction layered above PTLRPC.
39  *
40  * Every struct ptlrpc_request contains a "pill", which points to a description
41  * of the format that the request conforms to.
42  */
43
44 #define DEBUG_SUBSYSTEM S_RPC
45
46 #include <linux/module.h>
47
48 #include <llog_swab.h>
49 #include <lustre_swab.h>
50 #include <obd.h>
51 #include <obd_support.h>
52
53 /* struct ptlrpc_request, lustre_msg* */
54 #include <lustre_req_layout.h>
55 #include <lustre_acl.h>
56 #include <lustre_nodemap.h>
57
58 /*
59  * RQFs (see below) refer to two struct req_msg_field arrays describing the
60  * client request and server reply, respectively.
61  */
62 /* empty set of fields... for suitable definition of emptiness. */
/* Layout whose only field is the PTLRPC body. */
63 static const struct req_msg_field *empty[] = {
64         &RMF_PTLRPC_BODY
65 };
66
/* PTLRPC body followed by RMF_MGS_TARGET_INFO. */
67 static const struct req_msg_field *mgs_target_info_only[] = {
68         &RMF_PTLRPC_BODY,
69         &RMF_MGS_TARGET_INFO
70 };
71
/*
 * PTLRPC body followed by RMF_MGS_SEND_PARAM.  Only compiled while
 * LUSTRE_VERSION_CODE is below OBD_OCD_VERSION(2, 18, 53, 0).
 */
72 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 18, 53, 0)
73 static const struct req_msg_field *mgs_set_info[] = {
74         &RMF_PTLRPC_BODY,
75         &RMF_MGS_SEND_PARAM
76 };
77 #endif
78
/* PTLRPC body followed by RMF_MGS_CONFIG_BODY. */
79 static const struct req_msg_field *mgs_config_read_client[] = {
80         &RMF_PTLRPC_BODY,
81         &RMF_MGS_CONFIG_BODY
82 };
83
/* PTLRPC body followed by RMF_MGS_CONFIG_RES. */
84 static const struct req_msg_field *mgs_config_read_server[] = {
85         &RMF_PTLRPC_BODY,
86         &RMF_MGS_CONFIG_RES
87 };
88
/* PTLRPC body followed by RMF_MDT_BODY. */
89 static const struct req_msg_field *mdt_body_only[] = {
90         &RMF_PTLRPC_BODY,
91         &RMF_MDT_BODY
92 };
93
/* Same as mdt_body_only[] with RMF_CAPA1 appended. */
94 static const struct req_msg_field *mdt_body_capa[] = {
95         &RMF_PTLRPC_BODY,
96         &RMF_MDT_BODY,
97         &RMF_CAPA1
98 };
99
/* PTLRPC body followed by RMF_OBD_QUOTACTL. */
100 static const struct req_msg_field *quotactl_only[] = {
101         &RMF_PTLRPC_BODY,
102         &RMF_OBD_QUOTACTL
103 };
104
/* PTLRPC body followed by RMF_QUOTA_BODY. */
105 static const struct req_msg_field *quota_body_only[] = {
106         &RMF_PTLRPC_BODY,
107         &RMF_QUOTA_BODY
108 };
109
/* DLM request + intent, followed by RMF_QUOTA_BODY. */
110 static const struct req_msg_field *ldlm_intent_quota_client[] = {
111         &RMF_PTLRPC_BODY,
112         &RMF_DLM_REQ,
113         &RMF_LDLM_INTENT,
114         &RMF_QUOTA_BODY
115 };
116
/* DLM reply + LVB, followed by RMF_QUOTA_BODY. */
117 static const struct req_msg_field *ldlm_intent_quota_server[] = {
118         &RMF_PTLRPC_BODY,
119         &RMF_DLM_REP,
120         &RMF_DLM_LVB,
121         &RMF_QUOTA_BODY
122 };
123
/* PTLRPC body, RMF_MDT_EPOCH, RMF_REC_REINT and RMF_CAPA1. */
124 static const struct req_msg_field *mdt_close_client[] = {
125         &RMF_PTLRPC_BODY,
126         &RMF_MDT_EPOCH,
127         &RMF_REC_REINT,
128         &RMF_CAPA1
129 };
130
/* mdt_close_client[] extended with RMF_CLOSE_DATA and RMF_U32. */
131 static const struct req_msg_field *mdt_close_intent_client[] = {
132         &RMF_PTLRPC_BODY,
133         &RMF_MDT_EPOCH,
134         &RMF_REC_REINT,
135         &RMF_CAPA1,
136         &RMF_CLOSE_DATA,
137         &RMF_U32
138 };
139
/* PTLRPC body followed by RMF_OBD_STATFS. */
140 static const struct req_msg_field *obd_statfs_server[] = {
141         &RMF_PTLRPC_BODY,
142         &RMF_OBD_STATFS
143 };
144
/* PTLRPC body, RMF_SEQ_OPC and RMF_SEQ_RANGE. */
145 static const struct req_msg_field *seq_query_client[] = {
146         &RMF_PTLRPC_BODY,
147         &RMF_SEQ_OPC,
148         &RMF_SEQ_RANGE
149 };
150
/* PTLRPC body followed by RMF_SEQ_RANGE (no opcode field). */
151 static const struct req_msg_field *seq_query_server[] = {
152         &RMF_PTLRPC_BODY,
153         &RMF_SEQ_RANGE
154 };
155
/* PTLRPC body, RMF_FLD_OPC and RMF_FLD_MDFLD. */
156 static const struct req_msg_field *fld_query_client[] = {
157         &RMF_PTLRPC_BODY,
158         &RMF_FLD_OPC,
159         &RMF_FLD_MDFLD
160 };
161
/* PTLRPC body followed by RMF_FLD_MDFLD (no opcode field). */
162 static const struct req_msg_field *fld_query_server[] = {
163         &RMF_PTLRPC_BODY,
164         &RMF_FLD_MDFLD
165 };
166
/* Same field list as fld_query_server[]. */
167 static const struct req_msg_field *fld_read_client[] = {
168         &RMF_PTLRPC_BODY,
169         &RMF_FLD_MDFLD
170 };
171
/* PTLRPC body followed by the variable-length RMF_GENERIC_DATA. */
172 static const struct req_msg_field *fld_read_server[] = {
173         &RMF_PTLRPC_BODY,
174         &RMF_GENERIC_DATA
175 };
176
/* PTLRPC body, RMF_MDT_BODY, RMF_CAPA1 and RMF_NAME. */
177 static const struct req_msg_field *mds_getattr_name_client[] = {
178         &RMF_PTLRPC_BODY,
179         &RMF_MDT_BODY,
180         &RMF_CAPA1,
181         &RMF_NAME
182 };
183
/* PTLRPC body followed by RMF_REC_REINT only. */
184 static const struct req_msg_field *mds_reint_client[] = {
185         &RMF_PTLRPC_BODY,
186         &RMF_REC_REINT
187 };
188
/* mds_reint_client[] extended with RMF_CAPA1 and RMF_NAME. */
189 static const struct req_msg_field *mds_reint_create_client[] = {
190         &RMF_PTLRPC_BODY,
191         &RMF_REC_REINT,
192         &RMF_CAPA1,
193         &RMF_NAME
194 };
195
/* mds_reint_create_client[] extended with RMF_EADATA and RMF_DLM_REQ. */
196 static const struct req_msg_field *mds_reint_create_slave_client[] = {
197         &RMF_PTLRPC_BODY,
198         &RMF_REC_REINT,
199         &RMF_CAPA1,
200         &RMF_NAME,
201         &RMF_EADATA,
202         &RMF_DLM_REQ
203 };
204
/*
 * mds_reint_create_slave_client[] extended with the security-context name,
 * security context, SELinux policy and encryption context fields.
 */
205 static const struct req_msg_field *mds_reint_create_acl_client[] = {
206         &RMF_PTLRPC_BODY,
207         &RMF_REC_REINT,
208         &RMF_CAPA1,
209         &RMF_NAME,
210         &RMF_EADATA,
211         &RMF_DLM_REQ,
212         &RMF_FILE_SECCTX_NAME,
213         &RMF_FILE_SECCTX,
214         &RMF_SELINUX_POL,
215         &RMF_FILE_ENCCTX,
216 };
217
/*
 * Identical to mds_reint_create_acl_client[] except that RMF_SYMTGT takes
 * the place of RMF_EADATA.
 */
218 static const struct req_msg_field *mds_reint_create_sym_client[] = {
219         &RMF_PTLRPC_BODY,
220         &RMF_REC_REINT,
221         &RMF_CAPA1,
222         &RMF_NAME,
223         &RMF_SYMTGT,
224         &RMF_DLM_REQ,
225         &RMF_FILE_SECCTX_NAME,
226         &RMF_FILE_SECCTX,
227         &RMF_SELINUX_POL,
228         &RMF_FILE_ENCCTX,
229 };
230
/*
 * Reint record, two capability fields, name and EA data, followed by the
 * security-context name, security context, SELinux policy and encryption
 * context fields.
 */
231 static const struct req_msg_field *mds_reint_open_client[] = {
232         &RMF_PTLRPC_BODY,
233         &RMF_REC_REINT,
234         &RMF_CAPA1,
235         &RMF_CAPA2,
236         &RMF_NAME,
237         &RMF_EADATA,
238         &RMF_FILE_SECCTX_NAME,
239         &RMF_FILE_SECCTX,
240         &RMF_SELINUX_POL,
241         &RMF_FILE_ENCCTX,
242 };
243
/* PTLRPC body, RMF_MDT_BODY, RMF_MDT_MD, RMF_ACL and both capability fields. */
244 static const struct req_msg_field *mds_reint_open_server[] = {
245         &RMF_PTLRPC_BODY,
246         &RMF_MDT_BODY,
247         &RMF_MDT_MD,
248         &RMF_ACL,
249         &RMF_CAPA1,
250         &RMF_CAPA2
251 };
252
/* Reint record, one capability, name, DLM request and SELinux policy. */
253 static const struct req_msg_field *mds_reint_unlink_client[] = {
254         &RMF_PTLRPC_BODY,
255         &RMF_REC_REINT,
256         &RMF_CAPA1,
257         &RMF_NAME,
258         &RMF_DLM_REQ,
259         &RMF_SELINUX_POL
260 };
261
/* mds_reint_unlink_client[] with RMF_CAPA2 inserted after RMF_CAPA1. */
262 static const struct req_msg_field *mds_reint_link_client[] = {
263         &RMF_PTLRPC_BODY,
264         &RMF_REC_REINT,
265         &RMF_CAPA1,
266         &RMF_CAPA2,
267         &RMF_NAME,
268         &RMF_DLM_REQ,
269         &RMF_SELINUX_POL
270 };
271
/* mds_reint_link_client[] with RMF_SYMTGT inserted after RMF_NAME. */
272 static const struct req_msg_field *mds_reint_rename_client[] = {
273         &RMF_PTLRPC_BODY,
274         &RMF_REC_REINT,
275         &RMF_CAPA1,
276         &RMF_CAPA2,
277         &RMF_NAME,
278         &RMF_SYMTGT,
279         &RMF_DLM_REQ,
280         &RMF_SELINUX_POL
281 };
282
/*
 * mds_reint_rename_client[] extended with RMF_MDT_EPOCH, RMF_CLOSE_DATA and
 * RMF_EADATA.
 */
283 static const struct req_msg_field *mds_reint_migrate_client[] = {
284         &RMF_PTLRPC_BODY,
285         &RMF_REC_REINT,
286         &RMF_CAPA1,
287         &RMF_CAPA2,
288         &RMF_NAME,
289         &RMF_SYMTGT,
290         &RMF_DLM_REQ,
291         &RMF_SELINUX_POL,
292         &RMF_MDT_EPOCH,
293         &RMF_CLOSE_DATA,
294         &RMF_EADATA
295 };
296
/*
 * PTLRPC body, RMF_MDT_BODY, RMF_MDT_MD, RMF_LOGCOOKIES and both capability
 * fields.
 */
297 static const struct req_msg_field *mds_last_unlink_server[] = {
298         &RMF_PTLRPC_BODY,
299         &RMF_MDT_BODY,
300         &RMF_MDT_MD,
301         &RMF_LOGCOOKIES,
302         &RMF_CAPA1,
303         &RMF_CAPA2
304 };
305
/* Reint record, capability, epoch, EA data, log cookies and DLM request. */
306 static const struct req_msg_field *mds_reint_setattr_client[] = {
307         &RMF_PTLRPC_BODY,
308         &RMF_REC_REINT,
309         &RMF_CAPA1,
310         &RMF_MDT_EPOCH,
311         &RMF_EADATA,
312         &RMF_LOGCOOKIES,
313         &RMF_DLM_REQ
314 };
315
/* Reint record, capability, name, EA data, DLM request and SELinux policy. */
316 static const struct req_msg_field *mds_reint_setxattr_client[] = {
317         &RMF_PTLRPC_BODY,
318         &RMF_REC_REINT,
319         &RMF_CAPA1,
320         &RMF_NAME,
321         &RMF_EADATA,
322         &RMF_DLM_REQ,
323         &RMF_SELINUX_POL
324 };
325
/* PTLRPC body, RMF_REC_REINT and RMF_DLM_REQ. */
326 static const struct req_msg_field *mds_reint_resync[] = {
327         &RMF_PTLRPC_BODY,
328         &RMF_REC_REINT,
329         &RMF_DLM_REQ
330 };
331
/* MDT body, RMF_SWAP_LAYOUTS, both capability fields and a DLM request. */
332 static const struct req_msg_field *mdt_swap_layouts[] = {
333         &RMF_PTLRPC_BODY,
334         &RMF_MDT_BODY,
335         &RMF_SWAP_LAYOUTS,
336         &RMF_CAPA1,
337         &RMF_CAPA2,
338         &RMF_DLM_REQ
339 };
340
/* MDT body, RMF_FID_ARRAY and both capability fields. */
341 static const struct req_msg_field *mds_rmfid_client[] = {
342         &RMF_PTLRPC_BODY,
343         &RMF_MDT_BODY,
344         &RMF_FID_ARRAY,
345         &RMF_CAPA1,
346         &RMF_CAPA2,
347 };
348
/* MDT body and RMF_FID_ARRAY, followed by RMF_RCS. */
349 static const struct req_msg_field *mds_rmfid_server[] = {
350         &RMF_PTLRPC_BODY,
351         &RMF_MDT_BODY,
352         &RMF_FID_ARRAY,
353         &RMF_RCS,
354 };
355
/*
 * Target UUID, client UUID, connection handle, connect data and SELinux
 * policy, after the PTLRPC body.
 */
356 static const struct req_msg_field *obd_connect_client[] = {
357         &RMF_PTLRPC_BODY,
358         &RMF_TGTUUID,
359         &RMF_CLUUID,
360         &RMF_CONN,
361         &RMF_CONNECT_DATA,
362         &RMF_SELINUX_POL
363 };
364
/* PTLRPC body followed by RMF_CONNECT_DATA. */
365 static const struct req_msg_field *obd_connect_server[] = {
366         &RMF_PTLRPC_BODY,
367         &RMF_CONNECT_DATA
368 };
369
/* PTLRPC body followed by a set_info key/value pair. */
370 static const struct req_msg_field *obd_set_info_client[] = {
371         &RMF_PTLRPC_BODY,
372         &RMF_SETINFO_KEY,
373         &RMF_SETINFO_VAL
374 };
375
/* obd_set_info_client[] with RMF_MDT_BODY appended. */
376 static const struct req_msg_field *mdt_set_info_client[] = {
377         &RMF_PTLRPC_BODY,
378         &RMF_SETINFO_KEY,
379         &RMF_SETINFO_VAL,
380         &RMF_MDT_BODY
381 };
382
/* PTLRPC body, RMF_SETINFO_KEY and RMF_OST_BODY (no RMF_SETINFO_VAL). */
383 static const struct req_msg_field *ost_grant_shrink_client[] = {
384         &RMF_PTLRPC_BODY,
385         &RMF_SETINFO_KEY,
386         &RMF_OST_BODY
387 };
388
/* PTLRPC body, RMF_GETINFO_KEY and RMF_GETINFO_VALLEN. */
389 static const struct req_msg_field *mds_getinfo_client[] = {
390         &RMF_PTLRPC_BODY,
391         &RMF_GETINFO_KEY,
392         &RMF_GETINFO_VALLEN
393 };
394
/* PTLRPC body followed by RMF_GETINFO_VAL. */
395 static const struct req_msg_field *mds_getinfo_server[] = {
396         &RMF_PTLRPC_BODY,
397         &RMF_GETINFO_VAL,
398 };
399
/* PTLRPC body followed by RMF_DLM_REQ. */
400 static const struct req_msg_field *ldlm_enqueue_client[] = {
401         &RMF_PTLRPC_BODY,
402         &RMF_DLM_REQ
403 };
404
/* PTLRPC body followed by RMF_DLM_REP. */
405 static const struct req_msg_field *ldlm_enqueue_server[] = {
406         &RMF_PTLRPC_BODY,
407         &RMF_DLM_REP
408 };
409
/* ldlm_enqueue_server[] with RMF_DLM_LVB appended. */
410 static const struct req_msg_field *ldlm_enqueue_lvb_server[] = {
411         &RMF_PTLRPC_BODY,
412         &RMF_DLM_REP,
413         &RMF_DLM_LVB
414 };
415
/* ldlm_enqueue_client[] with RMF_DLM_LVB appended. */
416 static const struct req_msg_field *ldlm_cp_callback_client[] = {
417         &RMF_PTLRPC_BODY,
418         &RMF_DLM_REQ,
419         &RMF_DLM_LVB
420 };
421
/* ldlm_enqueue_client[] with RMF_DLM_GL_DESC appended. */
422 static const struct req_msg_field *ldlm_gl_callback_desc_client[] = {
423         &RMF_PTLRPC_BODY,
424         &RMF_DLM_REQ,
425         &RMF_DLM_GL_DESC
426 };
427
/* PTLRPC body followed by RMF_DLM_LVB (no RMF_DLM_REP). */
428 static const struct req_msg_field *ldlm_gl_callback_server[] = {
429         &RMF_PTLRPC_BODY,
430         &RMF_DLM_LVB
431 };
432
/* PTLRPC body, RMF_DLM_REQ and RMF_LDLM_INTENT. */
433 static const struct req_msg_field *ldlm_intent_basic_client[] = {
434         &RMF_PTLRPC_BODY,
435         &RMF_DLM_REQ,
436         &RMF_LDLM_INTENT,
437 };
438
/* ldlm_intent_basic_client[] with RMF_REC_REINT appended. */
439 static const struct req_msg_field *ldlm_intent_client[] = {
440         &RMF_PTLRPC_BODY,
441         &RMF_DLM_REQ,
442         &RMF_LDLM_INTENT,
443         &RMF_REC_REINT
444 };
445
/* DLM reply followed by RMF_MDT_BODY, RMF_MDT_MD and RMF_ACL. */
446 static const struct req_msg_field *ldlm_intent_server[] = {
447         &RMF_PTLRPC_BODY,
448         &RMF_DLM_REP,
449         &RMF_MDT_BODY,
450         &RMF_MDT_MD,
451         &RMF_ACL
452 };
453
/* ldlm_intent_basic_client[] extended with RMF_LAYOUT_INTENT and RMF_EADATA. */
454 static const struct req_msg_field *ldlm_intent_layout_client[] = {
455         &RMF_PTLRPC_BODY,
456         &RMF_DLM_REQ,
457         &RMF_LDLM_INTENT,
458         &RMF_LAYOUT_INTENT,
459         &RMF_EADATA /* for new layout to be set up */
460 };
461
/*
 * Starts with the same five fields as ldlm_intent_server[], then adds both
 * capability fields, RMF_NIOBUF_INLINE, the security and encryption
 * contexts and RMF_DEFAULT_MDT_MD.
 */
462 static const struct req_msg_field *ldlm_intent_open_server[] = {
463         &RMF_PTLRPC_BODY,
464         &RMF_DLM_REP,
465         &RMF_MDT_BODY,
466         &RMF_MDT_MD,
467         &RMF_ACL,
468         &RMF_CAPA1,
469         &RMF_CAPA2,
470         &RMF_NIOBUF_INLINE,
471         &RMF_FILE_SECCTX,
472         &RMF_FILE_ENCCTX,
473         &RMF_DEFAULT_MDT_MD,
474 };
475
/*
 * DLM request + intent, then the mds_getattr_name_client[] fields, then
 * RMF_FILE_SECCTX_NAME.
 */
476 static const struct req_msg_field *ldlm_intent_getattr_client[] = {
477         &RMF_PTLRPC_BODY,
478         &RMF_DLM_REQ,
479         &RMF_LDLM_INTENT,
480         &RMF_MDT_BODY,     /* coincides with mds_getattr_name_client[] */
481         &RMF_CAPA1,
482         &RMF_NAME,
483         &RMF_FILE_SECCTX_NAME
484 };
485
/*
 * Starts with the same five fields as ldlm_intent_server[], then adds
 * RMF_CAPA1, RMF_FILE_SECCTX, RMF_DEFAULT_MDT_MD and RMF_FILE_ENCCTX.
 */
486 static const struct req_msg_field *ldlm_intent_getattr_server[] = {
487         &RMF_PTLRPC_BODY,
488         &RMF_DLM_REP,
489         &RMF_MDT_BODY,
490         &RMF_MDT_MD,
491         &RMF_ACL,
492         &RMF_CAPA1,
493         &RMF_FILE_SECCTX,
494         &RMF_DEFAULT_MDT_MD,
495         &RMF_FILE_ENCCTX,
496 };
497
/*
 * DLM request + intent, then the mds_reint_create_client[] fields, then EA
 * data and the security/SELinux/encryption context fields.
 */
498 static const struct req_msg_field *ldlm_intent_create_client[] = {
499         &RMF_PTLRPC_BODY,
500         &RMF_DLM_REQ,
501         &RMF_LDLM_INTENT,
502         &RMF_REC_REINT,    /* coincides with mds_reint_create_client[] */
503         &RMF_CAPA1,
504         &RMF_NAME,
505         &RMF_EADATA,
506         &RMF_FILE_SECCTX_NAME,
507         &RMF_FILE_SECCTX,
508         &RMF_SELINUX_POL,
509         &RMF_FILE_ENCCTX,
510 };
511
/*
 * mds_reint_open_client[] with RMF_DLM_REQ and RMF_LDLM_INTENT inserted
 * right after the PTLRPC body.
 */
512 static const struct req_msg_field *ldlm_intent_open_client[] = {
513         &RMF_PTLRPC_BODY,
514         &RMF_DLM_REQ,
515         &RMF_LDLM_INTENT,
516         &RMF_REC_REINT,    /* coincides with mds_reint_open_client[] */
517         &RMF_CAPA1,
518         &RMF_CAPA2,
519         &RMF_NAME,
520         &RMF_EADATA,
521         &RMF_FILE_SECCTX_NAME,
522         &RMF_FILE_SECCTX,
523         &RMF_SELINUX_POL,
524         &RMF_FILE_ENCCTX,
525 };
526
/* DLM request + intent, MDT body, capability and SELinux policy. */
527 static const struct req_msg_field *ldlm_intent_getxattr_client[] = {
528         &RMF_PTLRPC_BODY,
529         &RMF_DLM_REQ,
530         &RMF_LDLM_INTENT,
531         &RMF_MDT_BODY,
532         &RMF_CAPA1,
533         &RMF_SELINUX_POL
534 };
535
/*
 * Starts with the same five fields as ldlm_intent_server[], then adds
 * RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS.
 */
536 static const struct req_msg_field *ldlm_intent_getxattr_server[] = {
537         &RMF_PTLRPC_BODY,
538         &RMF_DLM_REP,
539         &RMF_MDT_BODY,
540         &RMF_MDT_MD,
541         &RMF_ACL, /* for req_capsule_extend/mdt_intent_policy */
542         &RMF_EADATA,
543         &RMF_EAVALS,
544         &RMF_EAVALS_LENS
545 };
546
/* PTLRPC body, RMF_MDT_BODY and RMF_NAME. */
547 static const struct req_msg_field *mds_get_root_client[] = {
548         &RMF_PTLRPC_BODY,
549         &RMF_MDT_BODY,
550         &RMF_NAME
551 };
552
/* MDT body, capability, name, EA data and SELinux policy. */
553 static const struct req_msg_field *mds_getxattr_client[] = {
554         &RMF_PTLRPC_BODY,
555         &RMF_MDT_BODY,
556         &RMF_CAPA1,
557         &RMF_NAME,
558         &RMF_EADATA,
559         &RMF_SELINUX_POL
560 };
561
/* PTLRPC body, RMF_MDT_BODY and RMF_EADATA. */
562 static const struct req_msg_field *mds_getxattr_server[] = {
563         &RMF_PTLRPC_BODY,
564         &RMF_MDT_BODY,
565         &RMF_EADATA
566 };
567
/* Same fields as mds_setattr_server[] with RMF_FILE_ENCCTX appended. */
568 static const struct req_msg_field *mds_getattr_server[] = {
569         &RMF_PTLRPC_BODY,
570         &RMF_MDT_BODY,
571         &RMF_MDT_MD,
572         &RMF_ACL,
573         &RMF_CAPA1,
574         &RMF_CAPA2,
575         &RMF_FILE_ENCCTX,
576 };
577
/* Same field list as mds_reint_open_server[]. */
578 static const struct req_msg_field *mds_setattr_server[] = {
579         &RMF_PTLRPC_BODY,
580         &RMF_MDT_BODY,
581         &RMF_MDT_MD,
582         &RMF_ACL,
583         &RMF_CAPA1,
584         &RMF_CAPA2
585 };
586
/* PTLRPC body, RMF_BUT_HEADER and RMF_BUT_BUF. */
587 static const struct req_msg_field *mds_batch_client[] = {
588         &RMF_PTLRPC_BODY,
589         &RMF_BUT_HEADER,
590         &RMF_BUT_BUF,
591 };
592
/* PTLRPC body followed by RMF_BUT_REPLY. */
593 static const struct req_msg_field *mds_batch_server[] = {
594         &RMF_PTLRPC_BODY,
595         &RMF_BUT_REPLY,
596 };
597
/* PTLRPC body, RMF_LLOGD_BODY, RMF_NAME and RMF_MDT_BODY. */
598 static const struct req_msg_field *llog_origin_handle_create_client[] = {
599         &RMF_PTLRPC_BODY,
600         &RMF_LLOGD_BODY,
601         &RMF_NAME,
602         &RMF_MDT_BODY
603 };
604
/* PTLRPC body followed by RMF_LLOGD_BODY. */
605 static const struct req_msg_field *llogd_body_only[] = {
606         &RMF_PTLRPC_BODY,
607         &RMF_LLOGD_BODY
608 };
609
/* PTLRPC body followed by RMF_LLOG_LOG_HDR. */
610 static const struct req_msg_field *llog_log_hdr_only[] = {
611         &RMF_PTLRPC_BODY,
612         &RMF_LLOG_LOG_HDR
613 };
614
/* llogd_body_only[] with RMF_EADATA appended. */
615 static const struct req_msg_field *llog_origin_handle_next_block_server[] = {
616         &RMF_PTLRPC_BODY,
617         &RMF_LLOGD_BODY,
618         &RMF_EADATA
619 };
620
/* PTLRPC body followed by RMF_IDX_INFO. */
621 static const struct req_msg_field *obd_idx_read_client[] = {
622         &RMF_PTLRPC_BODY,
623         &RMF_IDX_INFO
624 };
625
/* Same field list as obd_idx_read_client[]. */
626 static const struct req_msg_field *obd_idx_read_server[] = {
627         &RMF_PTLRPC_BODY,
628         &RMF_IDX_INFO
629 };
630
/* PTLRPC body followed by RMF_OST_BODY. */
631 static const struct req_msg_field *ost_body_only[] = {
632         &RMF_PTLRPC_BODY,
633         &RMF_OST_BODY
634 };
635
/* ost_body_only[] with RMF_CAPA1 appended. */
636 static const struct req_msg_field *ost_body_capa[] = {
637         &RMF_PTLRPC_BODY,
638         &RMF_OST_BODY,
639         &RMF_CAPA1
640 };
641
/* ost_body_capa[] with RMF_DLM_REQ inserted before RMF_CAPA1. */
642 static const struct req_msg_field *ost_destroy_client[] = {
643         &RMF_PTLRPC_BODY,
644         &RMF_OST_BODY,
645         &RMF_DLM_REQ,
646         &RMF_CAPA1
647 };
648
649
/*
 * OST body, RMF_OBD_IOOBJ, RMF_NIOBUF_REMOTE, capability and RMF_SHORT_IO.
 */
650 static const struct req_msg_field *ost_brw_client[] = {
651         &RMF_PTLRPC_BODY,
652         &RMF_OST_BODY,
653         &RMF_OBD_IOOBJ,
654         &RMF_NIOBUF_REMOTE,
655         &RMF_CAPA1,
656         &RMF_SHORT_IO
657 };
658
/* ost_body_only[] with RMF_SHORT_IO appended. */
659 static const struct req_msg_field *ost_brw_read_server[] = {
660         &RMF_PTLRPC_BODY,
661         &RMF_OST_BODY,
662         &RMF_SHORT_IO
663 };
664
/* ost_body_only[] with RMF_RCS appended. */
665 static const struct req_msg_field *ost_brw_write_server[] = {
666         &RMF_PTLRPC_BODY,
667         &RMF_OST_BODY,
668         &RMF_RCS
669 };
670
/* PTLRPC body followed by the variable-length RMF_GENERIC_DATA. */
671 static const struct req_msg_field *ost_get_info_generic_server[] = {
672         &RMF_PTLRPC_BODY,
673         &RMF_GENERIC_DATA,
674 };
675
/* PTLRPC body followed by RMF_GETINFO_KEY. */
676 static const struct req_msg_field *ost_get_info_generic_client[] = {
677         &RMF_PTLRPC_BODY,
678         &RMF_GETINFO_KEY
679 };
680
/* PTLRPC body followed by RMF_OBD_ID. */
681 static const struct req_msg_field *ost_get_last_id_server[] = {
682         &RMF_PTLRPC_BODY,
683         &RMF_OBD_ID
684 };
685
/* ost_get_info_generic_client[] with RMF_FID appended. */
686 static const struct req_msg_field *ost_get_last_fid_client[] = {
687         &RMF_PTLRPC_BODY,
688         &RMF_GETINFO_KEY,
689         &RMF_FID,
690 };
691
/* PTLRPC body followed by RMF_FID. */
692 static const struct req_msg_field *ost_get_last_fid_server[] = {
693         &RMF_PTLRPC_BODY,
694         &RMF_FID,
695 };
696
/* PTLRPC body, RMF_FIEMAP_KEY and RMF_FIEMAP_VAL. */
697 static const struct req_msg_field *ost_get_fiemap_client[] = {
698         &RMF_PTLRPC_BODY,
699         &RMF_FIEMAP_KEY,
700         &RMF_FIEMAP_VAL
701 };
702
/* OST body, RMF_OST_LADVISE_HDR and RMF_OST_LADVISE. */
703 static const struct req_msg_field *ost_ladvise[] = {
704         &RMF_PTLRPC_BODY,
705         &RMF_OST_BODY,
706         &RMF_OST_LADVISE_HDR,
707         &RMF_OST_LADVISE,
708 };
709
/* PTLRPC body followed by RMF_FIEMAP_VAL (no key field). */
710 static const struct req_msg_field *ost_get_fiemap_server[] = {
711         &RMF_PTLRPC_BODY,
712         &RMF_FIEMAP_VAL
713 };
714
/* PTLRPC body, RMF_MDT_BODY and RMF_MDS_HSM_PROGRESS. */
715 static const struct req_msg_field *mdt_hsm_progress[] = {
716         &RMF_PTLRPC_BODY,
717         &RMF_MDT_BODY,
718         &RMF_MDS_HSM_PROGRESS,
719 };
720
/* PTLRPC body, RMF_MDT_BODY and RMF_MDS_HSM_ARCHIVE. */
721 static const struct req_msg_field *mdt_hsm_ct_register[] = {
722         &RMF_PTLRPC_BODY,
723         &RMF_MDT_BODY,
724         &RMF_MDS_HSM_ARCHIVE,
725 };
726
/* Same field list as mdt_body_only[]. */
727 static const struct req_msg_field *mdt_hsm_ct_unregister[] = {
728         &RMF_PTLRPC_BODY,
729         &RMF_MDT_BODY,
730 };
731
/* PTLRPC body, RMF_MDT_BODY and RMF_MDS_HSM_CURRENT_ACTION. */
732 static const struct req_msg_field *mdt_hsm_action_server[] = {
733         &RMF_PTLRPC_BODY,
734         &RMF_MDT_BODY,
735         &RMF_MDS_HSM_CURRENT_ACTION,
736 };
737
/* PTLRPC body, RMF_MDT_BODY and RMF_HSM_USER_STATE. */
738 static const struct req_msg_field *mdt_hsm_state_get_server[] = {
739         &RMF_PTLRPC_BODY,
740         &RMF_MDT_BODY,
741         &RMF_HSM_USER_STATE,
742 };
743
/* MDT body, capability and RMF_HSM_STATE_SET. */
744 static const struct req_msg_field *mdt_hsm_state_set[] = {
745         &RMF_PTLRPC_BODY,
746         &RMF_MDT_BODY,
747         &RMF_CAPA1,
748         &RMF_HSM_STATE_SET,
749 };
750
/*
 * MDT body, RMF_MDS_HSM_REQUEST, RMF_MDS_HSM_USER_ITEM and a trailing
 * variable-length RMF_GENERIC_DATA.
 */
751 static const struct req_msg_field *mdt_hsm_request[] = {
752         &RMF_PTLRPC_BODY,
753         &RMF_MDT_BODY,
754         &RMF_MDS_HSM_REQUEST,
755         &RMF_MDS_HSM_USER_ITEM,
756         &RMF_GENERIC_DATA,
757 };
758
/* PTLRPC body followed by RMF_LFSCK_REQUEST. */
759 static const struct req_msg_field *obd_lfsck_request[] = {
760         &RMF_PTLRPC_BODY,
761         &RMF_LFSCK_REQUEST,
762 };
763
/* PTLRPC body followed by RMF_LFSCK_REPLY. */
764 static const struct req_msg_field *obd_lfsck_reply[] = {
765         &RMF_PTLRPC_BODY,
766         &RMF_LFSCK_REPLY,
767 };
768
/*
 * Same fields, in the same order, as ldlm_intent_getattr_client[] but
 * without the leading RMF_PTLRPC_BODY.
 * NOTE(review): presumably because this layout describes a sub-request
 * embedded in a batch message rather than a whole RPC -- confirm.
 */
769 static const struct req_msg_field *mds_batch_getattr_client[] = {
770         &RMF_DLM_REQ,
771         &RMF_LDLM_INTENT,
772         &RMF_MDT_BODY,     /* coincides with mds_getattr_name_client[] */
773         &RMF_CAPA1,
774         &RMF_NAME,
775         &RMF_FILE_SECCTX_NAME
776 };
777
/*
 * Same fields, in the same order, as ldlm_intent_getattr_server[] but
 * without the leading RMF_PTLRPC_BODY.
 */
778 static const struct req_msg_field *mds_batch_getattr_server[] = {
779         &RMF_DLM_REP,
780         &RMF_MDT_BODY,
781         &RMF_MDT_MD,
782         &RMF_ACL,
783         &RMF_CAPA1,
784         &RMF_FILE_SECCTX,
785         &RMF_DEFAULT_MDT_MD,
786         &RMF_FILE_ENCCTX,
787 };
788
/*
 * Table of every request format (RQF_*) known to this file.
 *
 * The number of entries sizes the first dimension of
 * req_msg_field::rmf_offset (via ARRAY_SIZE(req_formats)), so the table
 * length -- and therefore that array's size -- depends on the
 * LUSTRE_VERSION_CODE and HAVE_SERVER_SUPPORT conditionals below.
 * NOTE(review): an entry's position presumably serves as the format's index
 * into rmf_offset[] -- confirm before reordering.
 */
789 static struct req_format *req_formats[] = {
790         &RQF_OBD_PING,
791         &RQF_OBD_SET_INFO,
792         &RQF_MDT_SET_INFO,
793         &RQF_OBD_IDX_READ,
794         &RQF_SEC_CTX,
795         &RQF_MGS_TARGET_REG,
796 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 18, 53, 0)
797         &RQF_MGS_SET_INFO,
798 #endif
799         &RQF_MGS_CONFIG_READ,
800         &RQF_SEQ_QUERY,
801         &RQF_FLD_QUERY,
802         &RQF_FLD_READ,
803         &RQF_MDS_CONNECT,
804         &RQF_MDS_DISCONNECT,
805         &RQF_MDS_GET_INFO,
806         &RQF_MDS_GET_ROOT,
807         &RQF_MDS_STATFS,
808         &RQF_MDS_STATFS_NEW,
809         &RQF_MDS_GETATTR,
810         &RQF_MDS_GETATTR_NAME,
811         &RQF_MDS_GETXATTR,
812         &RQF_MDS_SYNC,
813         &RQF_MDS_CLOSE,
814         &RQF_MDS_CLOSE_INTENT,
815         &RQF_MDS_READPAGE,
816         &RQF_MDS_REINT,
817         &RQF_MDS_REINT_CREATE,
818         &RQF_MDS_REINT_CREATE_ACL,
819         &RQF_MDS_REINT_CREATE_SLAVE,
820         &RQF_MDS_REINT_CREATE_SYM,
821         &RQF_MDS_REINT_OPEN,
822         &RQF_MDS_REINT_UNLINK,
823         &RQF_MDS_REINT_LINK,
824         &RQF_MDS_REINT_RENAME,
825         &RQF_MDS_REINT_MIGRATE,
826         &RQF_MDS_REINT_SETATTR,
827         &RQF_MDS_REINT_SETXATTR,
828         &RQF_MDS_REINT_RESYNC,
829         &RQF_MDS_QUOTACTL,
830         &RQF_MDS_HSM_PROGRESS,
831         &RQF_MDS_HSM_CT_REGISTER,
832         &RQF_MDS_HSM_CT_UNREGISTER,
833         &RQF_MDS_HSM_STATE_GET,
834         &RQF_MDS_HSM_STATE_SET,
835         &RQF_MDS_HSM_ACTION,
836         &RQF_MDS_HSM_REQUEST,
837         &RQF_MDS_SWAP_LAYOUTS,
838         &RQF_MDS_RMFID,
839 #ifdef HAVE_SERVER_SUPPORT
840         &RQF_OUT_UPDATE,
841 #endif
842         &RQF_OST_CONNECT,
843         &RQF_OST_DISCONNECT,
844         &RQF_OST_QUOTACTL,
845         &RQF_OST_GETATTR,
846         &RQF_OST_SETATTR,
847         &RQF_OST_CREATE,
848         &RQF_OST_PUNCH,
849         &RQF_OST_FALLOCATE,
850         &RQF_OST_SYNC,
851         &RQF_OST_DESTROY,
852         &RQF_OST_BRW_READ,
853         &RQF_OST_BRW_WRITE,
854         &RQF_OST_STATFS,
855         &RQF_OST_SET_GRANT_INFO,
856         &RQF_OST_GET_INFO,
857         &RQF_OST_GET_INFO_LAST_ID,
858         &RQF_OST_GET_INFO_LAST_FID,
859         &RQF_OST_SET_INFO_LAST_FID,
860         &RQF_OST_GET_INFO_FIEMAP,
861         &RQF_OST_LADVISE,
862         &RQF_OST_SEEK,
863         &RQF_LDLM_ENQUEUE,
864         &RQF_LDLM_ENQUEUE_LVB,
865         &RQF_LDLM_CONVERT,
866         &RQF_LDLM_CANCEL,
867         &RQF_LDLM_CALLBACK,
868         &RQF_LDLM_CP_CALLBACK,
869         &RQF_LDLM_BL_CALLBACK,
870         &RQF_LDLM_GL_CALLBACK,
871         &RQF_LDLM_GL_CALLBACK_DESC,
872         &RQF_LDLM_INTENT,
873         &RQF_LDLM_INTENT_BASIC,
874         &RQF_LDLM_INTENT_LAYOUT,
875         &RQF_LDLM_INTENT_GETATTR,
876         &RQF_LDLM_INTENT_OPEN,
877         &RQF_LDLM_INTENT_CREATE,
878         &RQF_LDLM_INTENT_GETXATTR,
879         &RQF_LDLM_INTENT_QUOTA,
880         &RQF_QUOTA_DQACQ,
881         &RQF_LLOG_ORIGIN_HANDLE_CREATE,
882         &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
883         &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
884         &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
885         &RQF_CONNECT,
886         &RQF_LFSCK_NOTIFY,
887         &RQF_LFSCK_QUERY,
888         &RQF_BUT_GETATTR,
889         &RQF_MDS_BATCH,
890 };
891
/*
 * Descriptor of a single message field (an RMF_* object).  Instances are
 * built with the DEFINE_MSGF()/DEFINE_MSGFL() initializers and referenced by
 * address from the per-format field arrays earlier in this file.
 */
892 struct req_msg_field {
        /* Flag bits; the definitions in this file pass 0 or RMF_F_* values. */
893         const __u32 rmf_flags;
        /* Field name string, e.g. "mdt_body". */
894         const char  *rmf_name;
895         /**
896          * Field length. (-1) means "variable length".  If the
897          * \a RMF_F_STRUCT_ARRAY flag is set the field is also variable-length,
898          * but the actual size must be a whole multiple of \a rmf_size.
899          */
900         const int   rmf_size;
        /*
         * Swab callback taking only the buffer; set by DEFINE_MSGF() and may
         * be NULL.  Left unset by DEFINE_MSGFL().
         */
901         void        (*rmf_swabber)(void *);
902         /**
903          * Pass buffer size to swabbing function
904          * \retval      > 0             the number of bytes swabbed
905          *              -EOVERFLOW      on error
906          */
907         int         (*rmf_swab_len)(void *, __u32);
        /* Optional dump callback; may be NULL. */
908         void        (*rmf_dumper)(void *);
        /*
         * One slot per req_formats[] entry and per RCL_NR location.
         * NOTE(review): presumably records where this field sits within each
         * format's request/reply -- confirm against the code that fills it.
         */
909         int         rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR];
910 };
911
/*
 * Flag values passed as the "flags" argument of DEFINE_MSGF()/DEFINE_MSGFL()
 * and stored in req_msg_field::rmf_flags.  Each flag occupies its own bit.
 */
912 enum rmf_flags {
913         /**
914          * The field is a string, must be NUL-terminated.
915          */
916         RMF_F_STRING            = BIT(0),
917         /**
918          * The field's buffer size need not match the declared \a rmf_size.
919          */
920         RMF_F_NO_SIZE_CHECK     = BIT(1),
921         /**
922          * The field's buffer size must be a whole multiple of the declared \a
923          * rmf_size and the \a rmf_swabber function must work on the declared \a
924          * rmf_size worth of bytes.
925          */
926         RMF_F_STRUCT_ARRAY      = BIT(2),
927 };
928
/* Forward declaration only; the structure is not defined in this part of the file. */
929 struct req_capsule;
930
931 /*
932  * Request fields.
933  */
/*
 * DEFINE_MSGF() - initializer for a struct req_msg_field.
 *
 * Sets rmf_name, rmf_flags, rmf_size, rmf_swabber and rmf_dumper; the
 * remaining members (rmf_swab_len, rmf_offset) are not named and are
 * therefore zero-initialized.  @swabber and @dumper are cast to
 * void (*)(void *), so callers may pass functions taking a typed pointer,
 * or NULL.
 */
934 #define DEFINE_MSGF(name, flags, size, swabber, dumper) {       \
935         .rmf_name    = (name),                                  \
936         .rmf_flags   = (flags),                                 \
937         .rmf_size    = (size),                                  \
938         .rmf_swabber = (void (*)(void*))(swabber),              \
939         .rmf_dumper  = (void (*)(void*))(dumper)                \
940 }
941
/*
 * DEFINE_MSGFL() - like DEFINE_MSGF(), but installs a length-aware swab
 * callback in rmf_swab_len (cast to int (*)(void *, __u32)) and leaves
 * rmf_swabber zero-initialized.
 */
942 #define DEFINE_MSGFL(name, flags, size, swab_len, dumper) {     \
943         .rmf_name     = (name),                                 \
944         .rmf_flags    = (flags),                                \
945         .rmf_size     = (size),                                 \
946         .rmf_swab_len = (int (*)(void *, __u32))(swab_len),     \
947         .rmf_dumper   = (void (*)(void *))(dumper)              \
948 }
949
/* Variable-length (-1) field with no flags and no swab callback. */
950 struct req_msg_field RMF_GENERIC_DATA =
951         DEFINE_MSGF("generic_data", 0,
952                     -1, NULL, NULL);
953 EXPORT_SYMBOL(RMF_GENERIC_DATA);
954
/* Fixed-size struct mgs_target_info, swabbed by lustre_swab_mgs_target_info(). */
955 struct req_msg_field RMF_MGS_TARGET_INFO =
956         DEFINE_MSGF("mgs_target_info", 0,
957                     sizeof(struct mgs_target_info),
958                     lustre_swab_mgs_target_info, NULL);
959 EXPORT_SYMBOL(RMF_MGS_TARGET_INFO);
960
/*
 * Fixed-size struct mgs_send_param with no swab callback.  Only compiled
 * while LUSTRE_VERSION_CODE is below OBD_OCD_VERSION(2, 18, 53, 0).
 */
961 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 18, 53, 0)
962 struct req_msg_field RMF_MGS_SEND_PARAM =
963         DEFINE_MSGF("mgs_send_param", 0,
964                     sizeof(struct mgs_send_param),
965                     NULL, NULL);
966 EXPORT_SYMBOL(RMF_MGS_SEND_PARAM);
967 #endif
968
/* Fixed-size struct mgs_config_body, swabbed by lustre_swab_mgs_config_body(). */
969 struct req_msg_field RMF_MGS_CONFIG_BODY =
970         DEFINE_MSGF("mgs_config_read request", 0,
971                     sizeof(struct mgs_config_body),
972                     lustre_swab_mgs_config_body, NULL);
973 EXPORT_SYMBOL(RMF_MGS_CONFIG_BODY);
974
/*
 * Fixed-size struct mgs_config_res, swabbed by lustre_swab_mgs_config_res().
 * Note the name string carries a trailing space.
 */
975 struct req_msg_field RMF_MGS_CONFIG_RES =
976         DEFINE_MSGF("mgs_config_read reply ", 0,
977                     sizeof(struct mgs_config_res),
978                     lustre_swab_mgs_config_res, NULL);
979 EXPORT_SYMBOL(RMF_MGS_CONFIG_RES);
980
/*
 * Array of __u32: RMF_F_STRUCT_ARRAY with an element size of sizeof(__u32),
 * so the buffer must be a whole multiple of 4 bytes.
 */
981 struct req_msg_field RMF_U32 =
982         DEFINE_MSGF("generic u32", RMF_F_STRUCT_ARRAY,
983                     sizeof(__u32), lustre_swab_generic_32s, NULL);
984 EXPORT_SYMBOL(RMF_U32);
985
/* Variable-length (-1) field with no flags and no swab callback. */
986 struct req_msg_field RMF_SETINFO_VAL =
987         DEFINE_MSGF("setinfo_val", 0, -1, NULL, NULL);
988 EXPORT_SYMBOL(RMF_SETINFO_VAL);
989
/* Variable-length (-1) field with no flags and no swab callback. */
990 struct req_msg_field RMF_GETINFO_KEY =
991         DEFINE_MSGF("getinfo_key", 0, -1, NULL, NULL);
992 EXPORT_SYMBOL(RMF_GETINFO_KEY);
993
/* Single __u32, swabbed by lustre_swab_generic_32s(). */
994 struct req_msg_field RMF_GETINFO_VALLEN =
995         DEFINE_MSGF("getinfo_vallen", 0,
996                     sizeof(__u32), lustre_swab_generic_32s, NULL);
997 EXPORT_SYMBOL(RMF_GETINFO_VALLEN);
998
/* Variable-length (-1) field with no flags and no swab callback. */
999 struct req_msg_field RMF_GETINFO_VAL =
1000         DEFINE_MSGF("getinfo_val", 0, -1, NULL, NULL);
1001 EXPORT_SYMBOL(RMF_GETINFO_VAL);
1002
/* Single __u32, swabbed by lustre_swab_generic_32s(). */
1003 struct req_msg_field RMF_SEQ_OPC =
1004         DEFINE_MSGF("seq_query_opc", 0,
1005                     sizeof(__u32), lustre_swab_generic_32s, NULL);
1006 EXPORT_SYMBOL(RMF_SEQ_OPC);
1007
/* Fixed-size struct lu_seq_range, swabbed by lustre_swab_lu_seq_range(). */
1008 struct req_msg_field RMF_SEQ_RANGE =
1009         DEFINE_MSGF("seq_query_range", 0,
1010                     sizeof(struct lu_seq_range),
1011                     lustre_swab_lu_seq_range, NULL);
1012 EXPORT_SYMBOL(RMF_SEQ_RANGE);
1013
/* Single __u32, swabbed by lustre_swab_generic_32s(). */
1014 struct req_msg_field RMF_FLD_OPC =
1015         DEFINE_MSGF("fld_query_opc", 0,
1016                     sizeof(__u32), lustre_swab_generic_32s, NULL);
1017 EXPORT_SYMBOL(RMF_FLD_OPC);
1018
/* Same size and swabber as RMF_SEQ_RANGE (struct lu_seq_range). */
1019 struct req_msg_field RMF_FLD_MDFLD =
1020         DEFINE_MSGF("fld_query_mdfld", 0,
1021                     sizeof(struct lu_seq_range),
1022                     lustre_swab_lu_seq_range, NULL);
1023 EXPORT_SYMBOL(RMF_FLD_MDFLD);
1024
/* Fixed-size struct mdt_body, swabbed by lustre_swab_mdt_body(). */
1025 struct req_msg_field RMF_MDT_BODY =
1026         DEFINE_MSGF("mdt_body", 0,
1027                     sizeof(struct mdt_body), lustre_swab_mdt_body, NULL);
1028 EXPORT_SYMBOL(RMF_MDT_BODY);
1029
/*
 * Declared size is sizeof(struct obd_quotactl).  Built with DEFINE_MSGFL(),
 * so lustre_swab_obd_quotactl() is installed as the length-aware
 * rmf_swab_len callback rather than as rmf_swabber.
 */
1030 struct req_msg_field RMF_OBD_QUOTACTL =
1031         DEFINE_MSGFL("obd_quotactl",
1032                      0,
1033                      sizeof(struct obd_quotactl),
1034                      lustre_swab_obd_quotactl, NULL);
1035 EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
1036
/* Fixed-size struct quota_body, swabbed by lustre_swab_quota_body(). */
1037 struct req_msg_field RMF_QUOTA_BODY =
1038         DEFINE_MSGF("quota_body", 0,
1039                     sizeof(struct quota_body), lustre_swab_quota_body, NULL);
1040 EXPORT_SYMBOL(RMF_QUOTA_BODY);
1041
/* Fixed-size struct mdt_ioepoch, swabbed by lustre_swab_mdt_ioepoch(). */
1042 struct req_msg_field RMF_MDT_EPOCH =
1043         DEFINE_MSGF("mdt_ioepoch", 0,
1044                     sizeof(struct mdt_ioepoch), lustre_swab_mdt_ioepoch, NULL);
1045 EXPORT_SYMBOL(RMF_MDT_EPOCH);
1046
/* Fixed-size struct ptlrpc_body, swabbed by lustre_swab_ptlrpc_body(). */
1047 struct req_msg_field RMF_PTLRPC_BODY =
1048         DEFINE_MSGF("ptlrpc_body", 0,
1049                     sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
1050 EXPORT_SYMBOL(RMF_PTLRPC_BODY);
1051
/*
 * Fixed-size struct close_data, swabbed by lustre_swab_close_data().  Note
 * that the name string is "data_version", not "close_data".
 */
1052 struct req_msg_field RMF_CLOSE_DATA =
1053         DEFINE_MSGF("data_version", 0,
1054                     sizeof(struct close_data), lustre_swab_close_data, NULL);
1055 EXPORT_SYMBOL(RMF_CLOSE_DATA);
1056
/* Fixed-size struct obd_statfs, swabbed by lustre_swab_obd_statfs(). */
1057 struct req_msg_field RMF_OBD_STATFS =
1058         DEFINE_MSGF("obd_statfs", 0,
1059                     sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
1060 EXPORT_SYMBOL(RMF_OBD_STATFS);
1061
1062 struct req_msg_field RMF_SETINFO_KEY =
1063         DEFINE_MSGF("setinfo_key", 0, -1, NULL, NULL);
1064 EXPORT_SYMBOL(RMF_SETINFO_KEY);
1065
1066 struct req_msg_field RMF_NAME =
1067         DEFINE_MSGF("name", RMF_F_STRING, -1, NULL, NULL);
1068 EXPORT_SYMBOL(RMF_NAME);
1069
1070 struct req_msg_field RMF_FID_ARRAY =
1071         DEFINE_MSGF("fid_array", 0, -1, NULL, NULL);
1072 EXPORT_SYMBOL(RMF_FID_ARRAY);
1073
1074 struct req_msg_field RMF_SYMTGT =
1075         DEFINE_MSGF("symtgt", 0, -1, NULL, NULL);
1076 EXPORT_SYMBOL(RMF_SYMTGT);
1077
1078 struct req_msg_field RMF_TGTUUID =
1079         DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
1080         NULL);
1081 EXPORT_SYMBOL(RMF_TGTUUID);
1082
1083 struct req_msg_field RMF_CLUUID =
1084         DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
1085         NULL);
1086 EXPORT_SYMBOL(RMF_CLUUID);
1087
1088 struct req_msg_field RMF_STRING =
1089         DEFINE_MSGF("string", RMF_F_STRING, -1, NULL, NULL);
1090 EXPORT_SYMBOL(RMF_STRING);
1091
1092 struct req_msg_field RMF_FILE_SECCTX_NAME =
1093         DEFINE_MSGF("file_secctx_name", RMF_F_STRING, -1, NULL, NULL);
1094 EXPORT_SYMBOL(RMF_FILE_SECCTX_NAME);
1095
1096 struct req_msg_field RMF_FILE_SECCTX =
1097         DEFINE_MSGF("file_secctx", RMF_F_NO_SIZE_CHECK, -1, NULL, NULL);
1098 EXPORT_SYMBOL(RMF_FILE_SECCTX);
1099
1100 struct req_msg_field RMF_FILE_ENCCTX =
1101         DEFINE_MSGF("file_encctx", RMF_F_NO_SIZE_CHECK, -1, NULL, NULL);
1102 EXPORT_SYMBOL(RMF_FILE_ENCCTX);
1103
1104 struct req_msg_field RMF_LLOGD_BODY =
1105         DEFINE_MSGF("llogd_body", 0,
1106                     sizeof(struct llogd_body), lustre_swab_llogd_body, NULL);
1107 EXPORT_SYMBOL(RMF_LLOGD_BODY);
1108
1109 struct req_msg_field RMF_LLOG_LOG_HDR =
1110         DEFINE_MSGF("llog_log_hdr", 0,
1111                     sizeof(struct llog_log_hdr), lustre_swab_llog_hdr, NULL);
1112 EXPORT_SYMBOL(RMF_LLOG_LOG_HDR);
1113
1114 struct req_msg_field RMF_LLOGD_CONN_BODY =
1115         DEFINE_MSGF("llogd_conn_body", 0,
1116                     sizeof(struct llogd_conn_body),
1117                     lustre_swab_llogd_conn_body, NULL);
1118 EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY);
1119
1120 /*
1121  * connection handle received in MDS_CONNECT request.
1122  *
1123  * No swabbing needed because struct lustre_handle contains only a 64-bit cookie
1124  * that the client does not interpret at all.
1125  */
1126 struct req_msg_field RMF_CONN =
1127         DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL, NULL);
1128 EXPORT_SYMBOL(RMF_CONN);
1129
1130 struct req_msg_field RMF_CONNECT_DATA =
1131         DEFINE_MSGF("cdata",
1132                     RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */,
1133                     sizeof(struct obd_connect_data),
1134                     lustre_swab_connect, NULL);
1135 EXPORT_SYMBOL(RMF_CONNECT_DATA);
1136
1137 struct req_msg_field RMF_DLM_REQ =
1138         DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */,
1139                     sizeof(struct ldlm_request),
1140                     lustre_swab_ldlm_request, NULL);
1141 EXPORT_SYMBOL(RMF_DLM_REQ);
1142
1143 struct req_msg_field RMF_DLM_REP =
1144         DEFINE_MSGF("dlm_rep", 0,
1145                     sizeof(struct ldlm_reply), lustre_swab_ldlm_reply, NULL);
1146 EXPORT_SYMBOL(RMF_DLM_REP);
1147
1148 struct req_msg_field RMF_LDLM_INTENT =
1149         DEFINE_MSGF("ldlm_intent", 0,
1150                     sizeof(struct ldlm_intent), lustre_swab_ldlm_intent, NULL);
1151 EXPORT_SYMBOL(RMF_LDLM_INTENT);
1152
1153 struct req_msg_field RMF_DLM_LVB =
1154         DEFINE_MSGF("dlm_lvb", 0, -1, NULL, NULL);
1155 EXPORT_SYMBOL(RMF_DLM_LVB);
1156
1157 struct req_msg_field RMF_DLM_GL_DESC =
1158         DEFINE_MSGF("dlm_gl_desc", 0, sizeof(union ldlm_gl_desc), NULL, NULL);
1159 EXPORT_SYMBOL(RMF_DLM_GL_DESC);
1160
1161 struct req_msg_field RMF_MDT_MD =
1162         DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL, NULL);
1163 EXPORT_SYMBOL(RMF_MDT_MD);
1164
1165 struct req_msg_field RMF_DEFAULT_MDT_MD =
1166         DEFINE_MSGF("default_mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL,
1167                     NULL);
1168 EXPORT_SYMBOL(RMF_DEFAULT_MDT_MD);
1169
1170 struct req_msg_field RMF_REC_REINT =
1171         DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint),
1172                     lustre_swab_mdt_rec_reint, NULL);
1173 EXPORT_SYMBOL(RMF_REC_REINT);
1174
1175 /* FIXME: this length should be defined as a macro */
1176 struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1,
1177                                                     NULL, NULL);
1178 EXPORT_SYMBOL(RMF_EADATA);
1179
1180 struct req_msg_field RMF_EAVALS = DEFINE_MSGF("eavals", 0, -1, NULL, NULL);
1181 EXPORT_SYMBOL(RMF_EAVALS);
1182
1183 struct req_msg_field RMF_ACL = DEFINE_MSGF("acl", 0, -1, NULL, NULL);
1184 EXPORT_SYMBOL(RMF_ACL);
1185
1186 /* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
1187 struct req_msg_field RMF_LOGCOOKIES =
1188         DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */,
1189                     sizeof(struct llog_cookie), NULL, NULL);
1190 EXPORT_SYMBOL(RMF_LOGCOOKIES);
1191
1192 struct req_msg_field RMF_CAPA1 =
1193         DEFINE_MSGF("capa", 0, 0, NULL, NULL);
1194 EXPORT_SYMBOL(RMF_CAPA1);
1195
1196 struct req_msg_field RMF_CAPA2 =
1197         DEFINE_MSGF("capa", 0, 0, NULL, NULL);
1198 EXPORT_SYMBOL(RMF_CAPA2);
1199
1200 struct req_msg_field RMF_LAYOUT_INTENT =
1201         DEFINE_MSGF("layout_intent", 0,
1202                     sizeof(struct layout_intent), lustre_swab_layout_intent,
1203                     NULL);
1204 EXPORT_SYMBOL(RMF_LAYOUT_INTENT);
1205
1206 struct req_msg_field RMF_SELINUX_POL =
1207         DEFINE_MSGF("selinux_pol", RMF_F_STRING, -1, NULL, NULL);
1208 EXPORT_SYMBOL(RMF_SELINUX_POL);
1209
1210 /*
1211  * OST request field.
1212  */
1213 struct req_msg_field RMF_OST_BODY =
1214         DEFINE_MSGF("ost_body", 0,
1215                     sizeof(struct ost_body), lustre_swab_ost_body,
1216                     dump_ost_body);
1217 EXPORT_SYMBOL(RMF_OST_BODY);
1218
1219 struct req_msg_field RMF_OBD_IOOBJ =
1220         DEFINE_MSGF("obd_ioobj", RMF_F_STRUCT_ARRAY,
1221                     sizeof(struct obd_ioobj), lustre_swab_obd_ioobj, dump_ioo);
1222 EXPORT_SYMBOL(RMF_OBD_IOOBJ);
1223
1224 struct req_msg_field RMF_NIOBUF_REMOTE =
1225         DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY,
1226                     sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
1227                     dump_rniobuf);
1228 EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
1229
1230 struct req_msg_field RMF_NIOBUF_INLINE =
1231         DEFINE_MSGF("niobuf_inline", RMF_F_NO_SIZE_CHECK,
1232                     sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
1233                     dump_rniobuf);
1234 EXPORT_SYMBOL(RMF_NIOBUF_INLINE);
1235
1236 struct req_msg_field RMF_RCS =
1237         DEFINE_MSGF("niobuf_rcs", RMF_F_STRUCT_ARRAY, sizeof(__u32),
1238                     lustre_swab_generic_32s, dump_rcs);
1239 EXPORT_SYMBOL(RMF_RCS);
1240
1241 struct req_msg_field RMF_EAVALS_LENS =
1242         DEFINE_MSGF("eavals_lens", RMF_F_STRUCT_ARRAY, sizeof(__u32),
1243                 lustre_swab_generic_32s, NULL);
1244 EXPORT_SYMBOL(RMF_EAVALS_LENS);
1245
1246 struct req_msg_field RMF_OBD_ID =
1247         DEFINE_MSGF("obd_id", 0,
1248                     sizeof(__u64), lustre_swab_ost_last_id, NULL);
1249 EXPORT_SYMBOL(RMF_OBD_ID);
1250
1251 struct req_msg_field RMF_FID =
1252         DEFINE_MSGF("fid", 0,
1253                     sizeof(struct lu_fid), lustre_swab_lu_fid, NULL);
1254 EXPORT_SYMBOL(RMF_FID);
1255
1256 struct req_msg_field RMF_OST_ID =
1257         DEFINE_MSGF("ost_id", 0,
1258                     sizeof(struct ost_id), lustre_swab_ost_id, NULL);
1259 EXPORT_SYMBOL(RMF_OST_ID);
1260
1261 struct req_msg_field RMF_FIEMAP_KEY =
1262         DEFINE_MSGF("fiemap_key", 0, sizeof(struct ll_fiemap_info_key),
1263                     lustre_swab_fiemap_info_key, NULL);
1264 EXPORT_SYMBOL(RMF_FIEMAP_KEY);
1265
1266 struct req_msg_field RMF_FIEMAP_VAL =
1267         DEFINE_MSGFL("fiemap", 0, -1, lustre_swab_fiemap, NULL);
1268 EXPORT_SYMBOL(RMF_FIEMAP_VAL);
1269
1270 struct req_msg_field RMF_IDX_INFO =
1271         DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info),
1272                     lustre_swab_idx_info, NULL);
1273 EXPORT_SYMBOL(RMF_IDX_INFO);
1274 struct req_msg_field RMF_SHORT_IO =
1275         DEFINE_MSGF("short_io", 0, -1, NULL, NULL);
1276 EXPORT_SYMBOL(RMF_SHORT_IO);
1277 struct req_msg_field RMF_HSM_USER_STATE =
1278         DEFINE_MSGF("hsm_user_state", 0, sizeof(struct hsm_user_state),
1279                     lustre_swab_hsm_user_state, NULL);
1280 EXPORT_SYMBOL(RMF_HSM_USER_STATE);
1281
1282 struct req_msg_field RMF_HSM_STATE_SET =
1283         DEFINE_MSGF("hsm_state_set", 0, sizeof(struct hsm_state_set),
1284                     lustre_swab_hsm_state_set, NULL);
1285 EXPORT_SYMBOL(RMF_HSM_STATE_SET);
1286
1287 struct req_msg_field RMF_MDS_HSM_PROGRESS =
1288         DEFINE_MSGF("hsm_progress", 0, sizeof(struct hsm_progress_kernel),
1289                     lustre_swab_hsm_progress_kernel, NULL);
1290 EXPORT_SYMBOL(RMF_MDS_HSM_PROGRESS);
1291
1292 struct req_msg_field RMF_MDS_HSM_CURRENT_ACTION =
1293         DEFINE_MSGF("hsm_current_action", 0, sizeof(struct hsm_current_action),
1294                     lustre_swab_hsm_current_action, NULL);
1295 EXPORT_SYMBOL(RMF_MDS_HSM_CURRENT_ACTION);
1296
1297 struct req_msg_field RMF_MDS_HSM_USER_ITEM =
1298         DEFINE_MSGF("hsm_user_item", RMF_F_STRUCT_ARRAY,
1299                     sizeof(struct hsm_user_item), lustre_swab_hsm_user_item,
1300                     NULL);
1301 EXPORT_SYMBOL(RMF_MDS_HSM_USER_ITEM);
1302
1303 struct req_msg_field RMF_MDS_HSM_ARCHIVE =
1304         DEFINE_MSGF("hsm_archive", RMF_F_STRUCT_ARRAY,
1305                     sizeof(__u32), lustre_swab_generic_32s, NULL);
1306 EXPORT_SYMBOL(RMF_MDS_HSM_ARCHIVE);
1307
1308 struct req_msg_field RMF_MDS_HSM_REQUEST =
1309         DEFINE_MSGF("hsm_request", 0, sizeof(struct hsm_request),
1310                     lustre_swab_hsm_request, NULL);
1311 EXPORT_SYMBOL(RMF_MDS_HSM_REQUEST);
1312
1313 struct req_msg_field RMF_SWAP_LAYOUTS =
1314         DEFINE_MSGF("swap_layouts", 0, sizeof(struct  mdc_swap_layouts),
1315                     lustre_swab_swap_layouts, NULL);
1316 EXPORT_SYMBOL(RMF_SWAP_LAYOUTS);
1317
1318 struct req_msg_field RMF_LFSCK_REQUEST =
1319         DEFINE_MSGF("lfsck_request", 0, sizeof(struct lfsck_request),
1320                     lustre_swab_lfsck_request, NULL);
1321 EXPORT_SYMBOL(RMF_LFSCK_REQUEST);
1322
1323 struct req_msg_field RMF_LFSCK_REPLY =
1324         DEFINE_MSGF("lfsck_reply", 0, sizeof(struct lfsck_reply),
1325                     lustre_swab_lfsck_reply, NULL);
1326 EXPORT_SYMBOL(RMF_LFSCK_REPLY);
1327
1328 struct req_msg_field RMF_OST_LADVISE_HDR =
1329         DEFINE_MSGF("ladvise_request", 0,
1330                     sizeof(struct ladvise_hdr),
1331                     lustre_swab_ladvise_hdr, NULL);
1332 EXPORT_SYMBOL(RMF_OST_LADVISE_HDR);
1333
1334 struct req_msg_field RMF_OST_LADVISE =
1335         DEFINE_MSGF("ladvise_request", RMF_F_STRUCT_ARRAY,
1336                     sizeof(struct lu_ladvise),
1337                     lustre_swab_ladvise, NULL);
1338 EXPORT_SYMBOL(RMF_OST_LADVISE);
1339
1340 struct req_msg_field RMF_BUT_REPLY =
1341                         DEFINE_MSGF("batch_update_reply", 0, -1,
1342                                     lustre_swab_batch_update_reply, NULL);
1343 EXPORT_SYMBOL(RMF_BUT_REPLY);
1344
1345 struct req_msg_field RMF_BUT_HEADER = DEFINE_MSGF("but_update_header", 0,
1346                                 -1, lustre_swab_but_update_header, NULL);
1347 EXPORT_SYMBOL(RMF_BUT_HEADER);
1348
1349 struct req_msg_field RMF_BUT_BUF = DEFINE_MSGF("but_update_buf",
1350                         RMF_F_STRUCT_ARRAY, sizeof(struct but_update_buffer),
1351                         lustre_swab_but_update_buffer, NULL);
1352 EXPORT_SYMBOL(RMF_BUT_BUF);
1353
1354 /*
1355  * Request formats.
1356  */
1357
1358 struct req_format {
1359         const char *rf_name;
1360         size_t      rf_idx;
1361         struct {
1362                 size_t                       nr;
1363                 const struct req_msg_field **d;
1364         } rf_fields[RCL_NR];
1365 };
1366
/*
 * Build the initializer of a struct req_format.
 *
 * \a name is stored in rf_name; \a client / \a client_nr describe the
 * RCL_CLIENT field table and \a server / \a server_nr the RCL_SERVER one.
 * rf_idx is not mentioned here, so it starts out zero-initialized.
 */
#define DEFINE_REQ_FMT(name, client, client_nr, server, server_nr)      \
{                                                                       \
        .rf_name   = (name),                                            \
        .rf_fields = {                                                  \
                [RCL_CLIENT] = { .nr = (client_nr), .d = (client) },    \
                [RCL_SERVER] = { .nr = (server_nr), .d = (server) },    \
        },                                                              \
}

/*
 * Same as DEFINE_REQ_FMT(), with the table lengths taken from ARRAY_SIZE();
 * \a client and \a server must therefore be real arrays, not pointers.
 */
#define DEFINE_REQ_FMT0(name, client, server)                           \
        DEFINE_REQ_FMT(name, client, ARRAY_SIZE(client),                \
                       server, ARRAY_SIZE(server))
1383
1384 struct req_format RQF_OBD_PING =
1385         DEFINE_REQ_FMT0("OBD_PING", empty, empty);
1386 EXPORT_SYMBOL(RQF_OBD_PING);
1387
1388 struct req_format RQF_OBD_SET_INFO =
1389         DEFINE_REQ_FMT0("OBD_SET_INFO", obd_set_info_client, empty);
1390 EXPORT_SYMBOL(RQF_OBD_SET_INFO);
1391
1392 struct req_format RQF_MDT_SET_INFO =
1393         DEFINE_REQ_FMT0("MDT_SET_INFO", mdt_set_info_client, empty);
1394 EXPORT_SYMBOL(RQF_MDT_SET_INFO);
1395
1396 /* Read index file through the network */
1397 struct req_format RQF_OBD_IDX_READ =
1398         DEFINE_REQ_FMT0("OBD_IDX_READ",
1399                         obd_idx_read_client, obd_idx_read_server);
1400 EXPORT_SYMBOL(RQF_OBD_IDX_READ);
1401
1402 struct req_format RQF_SEC_CTX =
1403         DEFINE_REQ_FMT0("SEC_CTX", empty, empty);
1404 EXPORT_SYMBOL(RQF_SEC_CTX);
1405
1406 struct req_format RQF_MGS_TARGET_REG =
1407         DEFINE_REQ_FMT0("MGS_TARGET_REG", mgs_target_info_only,
1408                          mgs_target_info_only);
1409 EXPORT_SYMBOL(RQF_MGS_TARGET_REG);
1410
1411 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 18, 53, 0)
1412 struct req_format RQF_MGS_SET_INFO =
1413         DEFINE_REQ_FMT0("MGS_SET_INFO", mgs_set_info,
1414                         mgs_set_info);
1415 EXPORT_SYMBOL(RQF_MGS_SET_INFO);
1416 #endif
1417
1418 struct req_format RQF_MGS_CONFIG_READ =
1419         DEFINE_REQ_FMT0("MGS_CONFIG_READ", mgs_config_read_client,
1420                          mgs_config_read_server);
1421 EXPORT_SYMBOL(RQF_MGS_CONFIG_READ);
1422
1423 struct req_format RQF_SEQ_QUERY =
1424         DEFINE_REQ_FMT0("SEQ_QUERY", seq_query_client, seq_query_server);
1425 EXPORT_SYMBOL(RQF_SEQ_QUERY);
1426
1427 struct req_format RQF_FLD_QUERY =
1428         DEFINE_REQ_FMT0("FLD_QUERY", fld_query_client, fld_query_server);
1429 EXPORT_SYMBOL(RQF_FLD_QUERY);
1430
1431 /* The 'fld_read_server' uses 'RMF_GENERIC_DATA' to hold the 'FLD_QUERY'
1432  * RPC reply that is composed of 'struct lu_seq_range_array'. But there
1433  * is not registered swabber function for 'RMF_GENERIC_DATA'. So the RPC
1434  * peers need to handle the RPC reply with fixed little-endian format.
1435  *
1436  * In theory, we can define new structure with some swabber registered to
1437  * handle the 'FLD_QUERY' RPC reply result automatically. But from the
1438  * implementation view, it is not easy to be done within current "struct
1439  * req_msg_field" framework. Because the sequence range array in the RPC
1440  * reply is not fixed length, instead, its length depends on 'lu_seq_range'
1441  * count, that is unknown when prepare the RPC buffer. Generally, for such
1442  * flexible length RPC usage, there will be a field in the RPC layout to
1443  * indicate the data length. But for the 'FLD_READ' RPC, we have no way to
1444  * do that unless we add new length filed that will broken the on-wire RPC
1445  * protocol and cause interoperability trouble with old peer. */
1446 struct req_format RQF_FLD_READ =
1447         DEFINE_REQ_FMT0("FLD_READ", fld_read_client, fld_read_server);
1448 EXPORT_SYMBOL(RQF_FLD_READ);
1449
1450 struct req_format RQF_MDS_QUOTACTL =
1451         DEFINE_REQ_FMT0("MDS_QUOTACTL", quotactl_only, quotactl_only);
1452 EXPORT_SYMBOL(RQF_MDS_QUOTACTL);
1453
1454 struct req_format RQF_OST_QUOTACTL =
1455         DEFINE_REQ_FMT0("OST_QUOTACTL", quotactl_only, quotactl_only);
1456 EXPORT_SYMBOL(RQF_OST_QUOTACTL);
1457
1458 struct req_format RQF_QUOTA_DQACQ =
1459         DEFINE_REQ_FMT0("QUOTA_DQACQ", quota_body_only, quota_body_only);
1460 EXPORT_SYMBOL(RQF_QUOTA_DQACQ);
1461
1462 struct req_format RQF_LDLM_INTENT_QUOTA =
1463         DEFINE_REQ_FMT0("LDLM_INTENT_QUOTA",
1464                         ldlm_intent_quota_client,
1465                         ldlm_intent_quota_server);
1466 EXPORT_SYMBOL(RQF_LDLM_INTENT_QUOTA);
1467
1468 struct req_format RQF_MDS_GET_ROOT =
1469         DEFINE_REQ_FMT0("MDS_GET_ROOT", mds_get_root_client, mdt_body_capa);
1470 EXPORT_SYMBOL(RQF_MDS_GET_ROOT);
1471
1472 struct req_format RQF_MDS_STATFS =
1473         DEFINE_REQ_FMT0("MDS_STATFS", empty, obd_statfs_server);
1474 EXPORT_SYMBOL(RQF_MDS_STATFS);
1475
1476 struct req_format RQF_MDS_STATFS_NEW =
1477         DEFINE_REQ_FMT0("MDS_STATFS_NEW", mdt_body_only, obd_statfs_server);
1478 EXPORT_SYMBOL(RQF_MDS_STATFS_NEW);
1479
1480 struct req_format RQF_MDS_SYNC =
1481         DEFINE_REQ_FMT0("MDS_SYNC", mdt_body_capa, mdt_body_only);
1482 EXPORT_SYMBOL(RQF_MDS_SYNC);
1483
1484 struct req_format RQF_MDS_GETATTR =
1485         DEFINE_REQ_FMT0("MDS_GETATTR", mdt_body_capa, mds_getattr_server);
1486 EXPORT_SYMBOL(RQF_MDS_GETATTR);
1487
1488 struct req_format RQF_MDS_GETXATTR =
1489         DEFINE_REQ_FMT0("MDS_GETXATTR",
1490                         mds_getxattr_client, mds_getxattr_server);
1491 EXPORT_SYMBOL(RQF_MDS_GETXATTR);
1492
1493 struct req_format RQF_MDS_GETATTR_NAME =
1494         DEFINE_REQ_FMT0("MDS_GETATTR_NAME",
1495                         mds_getattr_name_client, mds_getattr_server);
1496 EXPORT_SYMBOL(RQF_MDS_GETATTR_NAME);
1497
1498 struct req_format RQF_MDS_REINT =
1499         DEFINE_REQ_FMT0("MDS_REINT", mds_reint_client, mdt_body_only);
1500 EXPORT_SYMBOL(RQF_MDS_REINT);
1501
1502 struct req_format RQF_MDS_REINT_CREATE =
1503         DEFINE_REQ_FMT0("MDS_REINT_CREATE",
1504                         mds_reint_create_client, mdt_body_capa);
1505 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
1506
1507 struct req_format RQF_MDS_REINT_CREATE_ACL =
1508         DEFINE_REQ_FMT0("MDS_REINT_CREATE_ACL",
1509                         mds_reint_create_acl_client, mdt_body_capa);
1510 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_ACL);
1511
1512 struct req_format RQF_MDS_REINT_CREATE_SLAVE =
1513         DEFINE_REQ_FMT0("MDS_REINT_CREATE_EA",
1514                         mds_reint_create_slave_client, mdt_body_capa);
1515 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SLAVE);
1516
1517 struct req_format RQF_MDS_REINT_CREATE_SYM =
1518         DEFINE_REQ_FMT0("MDS_REINT_CREATE_SYM",
1519                         mds_reint_create_sym_client, mdt_body_capa);
1520 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SYM);
1521
1522 struct req_format RQF_MDS_REINT_OPEN =
1523         DEFINE_REQ_FMT0("MDS_REINT_OPEN",
1524                         mds_reint_open_client, mds_reint_open_server);
1525 EXPORT_SYMBOL(RQF_MDS_REINT_OPEN);
1526
1527 struct req_format RQF_MDS_REINT_UNLINK =
1528         DEFINE_REQ_FMT0("MDS_REINT_UNLINK", mds_reint_unlink_client,
1529                         mds_last_unlink_server);
1530 EXPORT_SYMBOL(RQF_MDS_REINT_UNLINK);
1531
1532 struct req_format RQF_MDS_REINT_LINK =
1533         DEFINE_REQ_FMT0("MDS_REINT_LINK",
1534                         mds_reint_link_client, mdt_body_only);
1535 EXPORT_SYMBOL(RQF_MDS_REINT_LINK);
1536
1537 struct req_format RQF_MDS_REINT_RENAME =
1538         DEFINE_REQ_FMT0("MDS_REINT_RENAME", mds_reint_rename_client,
1539                         mds_last_unlink_server);
1540 EXPORT_SYMBOL(RQF_MDS_REINT_RENAME);
1541
1542 struct req_format RQF_MDS_REINT_MIGRATE =
1543         DEFINE_REQ_FMT0("MDS_REINT_MIGRATE", mds_reint_migrate_client,
1544                         mds_last_unlink_server);
1545 EXPORT_SYMBOL(RQF_MDS_REINT_MIGRATE);
1546
1547 struct req_format RQF_MDS_REINT_SETATTR =
1548         DEFINE_REQ_FMT0("MDS_REINT_SETATTR",
1549                         mds_reint_setattr_client, mds_setattr_server);
1550 EXPORT_SYMBOL(RQF_MDS_REINT_SETATTR);
1551
1552 struct req_format RQF_MDS_REINT_SETXATTR =
1553         DEFINE_REQ_FMT0("MDS_REINT_SETXATTR",
1554                         mds_reint_setxattr_client, mdt_body_only);
1555 EXPORT_SYMBOL(RQF_MDS_REINT_SETXATTR);
1556
1557 struct req_format RQF_MDS_REINT_RESYNC =
1558         DEFINE_REQ_FMT0("MDS_REINT_RESYNC", mds_reint_resync, mdt_body_only);
1559 EXPORT_SYMBOL(RQF_MDS_REINT_RESYNC);
1560
1561 struct req_format RQF_MDS_CONNECT =
1562         DEFINE_REQ_FMT0("MDS_CONNECT",
1563                         obd_connect_client, obd_connect_server);
1564 EXPORT_SYMBOL(RQF_MDS_CONNECT);
1565
1566 struct req_format RQF_MDS_DISCONNECT =
1567         DEFINE_REQ_FMT0("MDS_DISCONNECT", empty, empty);
1568 EXPORT_SYMBOL(RQF_MDS_DISCONNECT);
1569
1570 struct req_format RQF_MDS_GET_INFO =
1571         DEFINE_REQ_FMT0("MDS_GET_INFO", mds_getinfo_client,
1572                         mds_getinfo_server);
1573 EXPORT_SYMBOL(RQF_MDS_GET_INFO);
1574
1575 struct req_format RQF_MDS_BATCH =
1576         DEFINE_REQ_FMT0("MDS_BATCH", mds_batch_client,
1577                         mds_batch_server);
1578 EXPORT_SYMBOL(RQF_MDS_BATCH);
1579
1580 struct req_format RQF_LDLM_ENQUEUE =
1581         DEFINE_REQ_FMT0("LDLM_ENQUEUE",
1582                         ldlm_enqueue_client, ldlm_enqueue_lvb_server);
1583 EXPORT_SYMBOL(RQF_LDLM_ENQUEUE);
1584
1585 struct req_format RQF_LDLM_ENQUEUE_LVB =
1586         DEFINE_REQ_FMT0("LDLM_ENQUEUE_LVB",
1587                         ldlm_enqueue_client, ldlm_enqueue_lvb_server);
1588 EXPORT_SYMBOL(RQF_LDLM_ENQUEUE_LVB);
1589
1590 struct req_format RQF_LDLM_CONVERT =
1591         DEFINE_REQ_FMT0("LDLM_CONVERT",
1592                         ldlm_enqueue_client, ldlm_enqueue_server);
1593 EXPORT_SYMBOL(RQF_LDLM_CONVERT);
1594
1595 struct req_format RQF_LDLM_CANCEL =
1596         DEFINE_REQ_FMT0("LDLM_CANCEL", ldlm_enqueue_client, empty);
1597 EXPORT_SYMBOL(RQF_LDLM_CANCEL);
1598
1599 struct req_format RQF_LDLM_CALLBACK =
1600         DEFINE_REQ_FMT0("LDLM_CALLBACK", ldlm_enqueue_client, empty);
1601 EXPORT_SYMBOL(RQF_LDLM_CALLBACK);
1602
1603 struct req_format RQF_LDLM_CP_CALLBACK =
1604         DEFINE_REQ_FMT0("LDLM_CP_CALLBACK", ldlm_cp_callback_client, empty);
1605 EXPORT_SYMBOL(RQF_LDLM_CP_CALLBACK);
1606
1607 struct req_format RQF_LDLM_BL_CALLBACK =
1608         DEFINE_REQ_FMT0("LDLM_BL_CALLBACK", ldlm_enqueue_client, empty);
1609 EXPORT_SYMBOL(RQF_LDLM_BL_CALLBACK);
1610
1611 struct req_format RQF_LDLM_GL_CALLBACK =
1612         DEFINE_REQ_FMT0("LDLM_GL_CALLBACK", ldlm_enqueue_client,
1613                         ldlm_gl_callback_server);
1614 EXPORT_SYMBOL(RQF_LDLM_GL_CALLBACK);
1615
1616 struct req_format RQF_LDLM_GL_CALLBACK_DESC =
1617         DEFINE_REQ_FMT0("LDLM_GL_CALLBACK", ldlm_gl_callback_desc_client,
1618                         ldlm_gl_callback_server);
1619 EXPORT_SYMBOL(RQF_LDLM_GL_CALLBACK_DESC);
1620
1621 struct req_format RQF_LDLM_INTENT_BASIC =
1622         DEFINE_REQ_FMT0("LDLM_INTENT_BASIC",
1623                         ldlm_intent_basic_client, ldlm_enqueue_lvb_server);
1624 EXPORT_SYMBOL(RQF_LDLM_INTENT_BASIC);
1625
1626 struct req_format RQF_LDLM_INTENT =
1627         DEFINE_REQ_FMT0("LDLM_INTENT",
1628                         ldlm_intent_client, ldlm_intent_server);
1629 EXPORT_SYMBOL(RQF_LDLM_INTENT);
1630
1631 struct req_format RQF_LDLM_INTENT_LAYOUT =
1632         DEFINE_REQ_FMT0("LDLM_INTENT_LAYOUT",
1633                         ldlm_intent_layout_client, ldlm_enqueue_lvb_server);
1634 EXPORT_SYMBOL(RQF_LDLM_INTENT_LAYOUT);
1635
1636 struct req_format RQF_LDLM_INTENT_GETATTR =
1637         DEFINE_REQ_FMT0("LDLM_INTENT_GETATTR",
1638                         ldlm_intent_getattr_client, ldlm_intent_getattr_server);
1639 EXPORT_SYMBOL(RQF_LDLM_INTENT_GETATTR);
1640
1641 struct req_format RQF_LDLM_INTENT_OPEN =
1642         DEFINE_REQ_FMT0("LDLM_INTENT_OPEN",
1643                         ldlm_intent_open_client, ldlm_intent_open_server);
1644 EXPORT_SYMBOL(RQF_LDLM_INTENT_OPEN);
1645
1646 struct req_format RQF_LDLM_INTENT_CREATE =
1647         DEFINE_REQ_FMT0("LDLM_INTENT_CREATE",
1648                         ldlm_intent_create_client, ldlm_intent_getattr_server);
1649 EXPORT_SYMBOL(RQF_LDLM_INTENT_CREATE);
1650
1651 struct req_format RQF_LDLM_INTENT_GETXATTR =
1652         DEFINE_REQ_FMT0("LDLM_INTENT_GETXATTR",
1653                         ldlm_intent_getxattr_client,
1654                         ldlm_intent_getxattr_server);
1655 EXPORT_SYMBOL(RQF_LDLM_INTENT_GETXATTR);
1656
1657 struct req_format RQF_MDS_CLOSE =
1658         DEFINE_REQ_FMT0("MDS_CLOSE",
1659                         mdt_close_client, mds_last_unlink_server);
1660 EXPORT_SYMBOL(RQF_MDS_CLOSE);
1661
1662 struct req_format RQF_MDS_CLOSE_INTENT =
1663         DEFINE_REQ_FMT0("MDS_CLOSE_INTENT",
1664                         mdt_close_intent_client, mds_last_unlink_server);
1665 EXPORT_SYMBOL(RQF_MDS_CLOSE_INTENT);
1666
1667 struct req_format RQF_MDS_READPAGE =
1668         DEFINE_REQ_FMT0("MDS_READPAGE",
1669                         mdt_body_capa, mdt_body_only);
1670 EXPORT_SYMBOL(RQF_MDS_READPAGE);
1671
1672 struct req_format RQF_MDS_HSM_ACTION =
1673         DEFINE_REQ_FMT0("MDS_HSM_ACTION", mdt_body_capa, mdt_hsm_action_server);
1674 EXPORT_SYMBOL(RQF_MDS_HSM_ACTION);
1675
1676 struct req_format RQF_MDS_HSM_PROGRESS =
1677         DEFINE_REQ_FMT0("MDS_HSM_PROGRESS", mdt_hsm_progress, empty);
1678 EXPORT_SYMBOL(RQF_MDS_HSM_PROGRESS);
1679
1680 struct req_format RQF_MDS_HSM_CT_REGISTER =
1681         DEFINE_REQ_FMT0("MDS_HSM_CT_REGISTER", mdt_hsm_ct_register, empty);
1682 EXPORT_SYMBOL(RQF_MDS_HSM_CT_REGISTER);
1683
1684 struct req_format RQF_MDS_HSM_CT_UNREGISTER =
1685         DEFINE_REQ_FMT0("MDS_HSM_CT_UNREGISTER", mdt_hsm_ct_unregister, empty);
1686 EXPORT_SYMBOL(RQF_MDS_HSM_CT_UNREGISTER);
1687
1688 struct req_format RQF_MDS_HSM_STATE_GET =
1689         DEFINE_REQ_FMT0("MDS_HSM_STATE_GET",
1690                         mdt_body_capa, mdt_hsm_state_get_server);
1691 EXPORT_SYMBOL(RQF_MDS_HSM_STATE_GET);
1692
1693 struct req_format RQF_MDS_HSM_STATE_SET =
1694         DEFINE_REQ_FMT0("MDS_HSM_STATE_SET", mdt_hsm_state_set, empty);
1695 EXPORT_SYMBOL(RQF_MDS_HSM_STATE_SET);
1696
1697 struct req_format RQF_MDS_HSM_REQUEST =
1698         DEFINE_REQ_FMT0("MDS_HSM_REQUEST", mdt_hsm_request, empty);
1699 EXPORT_SYMBOL(RQF_MDS_HSM_REQUEST);
1700
1701 struct req_format RQF_MDS_SWAP_LAYOUTS =
1702         DEFINE_REQ_FMT0("MDS_SWAP_LAYOUTS",
1703                         mdt_swap_layouts, empty);
1704 EXPORT_SYMBOL(RQF_MDS_SWAP_LAYOUTS);
1705
1706 struct req_format RQF_MDS_RMFID =
1707         DEFINE_REQ_FMT0("MDS_RMFID", mds_rmfid_client,
1708                         mds_rmfid_server);
1709 EXPORT_SYMBOL(RQF_MDS_RMFID);
1710
1711 struct req_format RQF_LLOG_ORIGIN_HANDLE_CREATE =
1712         DEFINE_REQ_FMT0("LLOG_ORIGIN_HANDLE_CREATE",
1713                         llog_origin_handle_create_client, llogd_body_only);
1714 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_HANDLE_CREATE);
1715
1716 struct req_format RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK =
1717         DEFINE_REQ_FMT0("LLOG_ORIGIN_HANDLE_NEXT_BLOCK",
1718                         llogd_body_only, llog_origin_handle_next_block_server);
1719 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
1720
1721 struct req_format RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK =
1722         DEFINE_REQ_FMT0("LLOG_ORIGIN_HANDLE_PREV_BLOCK",
1723                         llogd_body_only, llog_origin_handle_next_block_server);
1724 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK);
1725
1726 struct req_format RQF_LLOG_ORIGIN_HANDLE_READ_HEADER =
1727         DEFINE_REQ_FMT0("LLOG_ORIGIN_HANDLE_READ_HEADER",
1728                         llogd_body_only, llog_log_hdr_only);
1729 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
1730
1731 struct req_format RQF_CONNECT =
1732         DEFINE_REQ_FMT0("CONNECT", obd_connect_client, obd_connect_server);
1733 EXPORT_SYMBOL(RQF_CONNECT);
1734
1735 struct req_format RQF_OST_CONNECT =
1736         DEFINE_REQ_FMT0("OST_CONNECT",
1737                         obd_connect_client, obd_connect_server);
1738 EXPORT_SYMBOL(RQF_OST_CONNECT);
1739
1740 struct req_format RQF_OST_DISCONNECT =
1741         DEFINE_REQ_FMT0("OST_DISCONNECT", empty, empty);
1742 EXPORT_SYMBOL(RQF_OST_DISCONNECT);
1743
1744 struct req_format RQF_OST_GETATTR =
1745         DEFINE_REQ_FMT0("OST_GETATTR", ost_body_capa, ost_body_only);
1746 EXPORT_SYMBOL(RQF_OST_GETATTR);
1747
1748 struct req_format RQF_OST_SETATTR =
1749         DEFINE_REQ_FMT0("OST_SETATTR", ost_body_capa, ost_body_only);
1750 EXPORT_SYMBOL(RQF_OST_SETATTR);
1751
1752 struct req_format RQF_OST_CREATE =
1753         DEFINE_REQ_FMT0("OST_CREATE", ost_body_only, ost_body_only);
1754 EXPORT_SYMBOL(RQF_OST_CREATE);
1755
1756 struct req_format RQF_OST_PUNCH =
1757         DEFINE_REQ_FMT0("OST_PUNCH", ost_body_capa, ost_body_only);
1758 EXPORT_SYMBOL(RQF_OST_PUNCH);
1759
1760 struct req_format RQF_OST_FALLOCATE =
1761         DEFINE_REQ_FMT0("OST_FALLOCATE", ost_body_capa, ost_body_only);
1762 EXPORT_SYMBOL(RQF_OST_FALLOCATE);
1763
1764 struct req_format RQF_OST_SEEK =
1765         DEFINE_REQ_FMT0("OST_SEEK", ost_body_only, ost_body_only);
1766 EXPORT_SYMBOL(RQF_OST_SEEK);
1767
1768 struct req_format RQF_OST_SYNC =
1769         DEFINE_REQ_FMT0("OST_SYNC", ost_body_capa, ost_body_only);
1770 EXPORT_SYMBOL(RQF_OST_SYNC);
1771
1772 struct req_format RQF_OST_DESTROY =
1773         DEFINE_REQ_FMT0("OST_DESTROY", ost_destroy_client, ost_body_only);
1774 EXPORT_SYMBOL(RQF_OST_DESTROY);
1775
1776 struct req_format RQF_OST_BRW_READ =
1777         DEFINE_REQ_FMT0("OST_BRW_READ", ost_brw_client, ost_brw_read_server);
1778 EXPORT_SYMBOL(RQF_OST_BRW_READ);
1779
1780 struct req_format RQF_OST_BRW_WRITE =
1781         DEFINE_REQ_FMT0("OST_BRW_WRITE", ost_brw_client, ost_brw_write_server);
1782 EXPORT_SYMBOL(RQF_OST_BRW_WRITE);
1783
1784 struct req_format RQF_OST_STATFS =
1785         DEFINE_REQ_FMT0("OST_STATFS", empty, obd_statfs_server);
1786 EXPORT_SYMBOL(RQF_OST_STATFS);
1787
1788 struct req_format RQF_OST_SET_GRANT_INFO =
1789         DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", ost_grant_shrink_client,
1790                          ost_body_only);
1791 EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
1792
1793 struct req_format RQF_OST_GET_INFO =
1794         DEFINE_REQ_FMT0("OST_GET_INFO", ost_get_info_generic_client,
1795                                         ost_get_info_generic_server);
1796 EXPORT_SYMBOL(RQF_OST_GET_INFO);
1797
1798 struct req_format RQF_OST_GET_INFO_LAST_ID =
1799         DEFINE_REQ_FMT0("OST_GET_INFO_LAST_ID", ost_get_info_generic_client,
1800                                                 ost_get_last_id_server);
1801 EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_ID);
1802
1803 struct req_format RQF_OST_GET_INFO_LAST_FID =
1804         DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", ost_get_last_fid_client,
1805                                                  ost_get_last_fid_server);
1806 EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_FID);
1807
1808 struct req_format RQF_OST_SET_INFO_LAST_FID =
1809         DEFINE_REQ_FMT0("OST_SET_INFO_LAST_FID", obd_set_info_client,
1810                                                  empty);
1811 EXPORT_SYMBOL(RQF_OST_SET_INFO_LAST_FID);
1812
1813 struct req_format RQF_OST_GET_INFO_FIEMAP =
1814         DEFINE_REQ_FMT0("OST_GET_INFO_FIEMAP", ost_get_fiemap_client,
1815                                                ost_get_fiemap_server);
1816 EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP);
1817
1818 struct req_format RQF_LFSCK_NOTIFY =
1819         DEFINE_REQ_FMT0("LFSCK_NOTIFY", obd_lfsck_request, empty);
1820 EXPORT_SYMBOL(RQF_LFSCK_NOTIFY);
1821
1822 struct req_format RQF_LFSCK_QUERY =
1823         DEFINE_REQ_FMT0("LFSCK_QUERY", obd_lfsck_request, obd_lfsck_reply);
1824 EXPORT_SYMBOL(RQF_LFSCK_QUERY);
1825
1826 struct req_format RQF_OST_LADVISE =
1827         DEFINE_REQ_FMT0("OST_LADVISE", ost_ladvise, ost_body_only);
1828 EXPORT_SYMBOL(RQF_OST_LADVISE);
1829
1830 struct req_format RQF_BUT_GETATTR =
1831         DEFINE_REQ_FMT0("MDS_BATCH_GETATTR", mds_batch_getattr_client,
1832                         mds_batch_getattr_server);
1833 EXPORT_SYMBOL(RQF_BUT_GETATTR);
1834
/*
 * Convenience macro: yields entry \a j of the field array stored at index \a i
 * of \a fmt's rf_fields[].
 *
 * Every argument and the expansion as a whole are parenthesized so the macro
 * always behaves as a single operand, whatever expression it is used in.
 */
#define FMT_FIELD(fmt, i, j) ((fmt)->rf_fields[(i)].d[(j)])
1837
1838 /**
1839  * Initializes the capsule abstraction by computing and setting the \a rf_idx
1840  * field of RQFs and the \a rmf_offset field of RMFs.
1841  */
1842 int req_layout_init(void)
1843 {
1844         size_t i;
1845         size_t j;
1846         size_t k;
1847         struct req_format *rf = NULL;
1848
1849         for (i = 0; i < ARRAY_SIZE(req_formats); ++i) {
1850                 rf = req_formats[i];
1851                 rf->rf_idx = i;
1852                 for (j = 0; j < RCL_NR; ++j) {
1853                         LASSERT(rf->rf_fields[j].nr <= REQ_MAX_FIELD_NR);
1854                         for (k = 0; k < rf->rf_fields[j].nr; ++k) {
1855                                 struct req_msg_field *field;
1856
1857                                 field = (typeof(field))rf->rf_fields[j].d[k];
1858                                 LASSERT(!(field->rmf_flags & RMF_F_STRUCT_ARRAY)
1859                                         || field->rmf_size > 0);
1860                                 LASSERT(field->rmf_offset[i][j] == 0);
1861                                 /*
1862                                  * k + 1 to detect unused format/field
1863                                  * combinations.
1864                                  */
1865                                 field->rmf_offset[i][j] = k + 1;
1866                         }
1867                 }
1868         }
1869         return 0;
1870 }
1871 EXPORT_SYMBOL(req_layout_init);
1872
/**
 * Counterpart of req_layout_init(); currently has nothing to undo.
 */
void req_layout_fini(void)
{
	/* intentionally empty */
}
EXPORT_SYMBOL(req_layout_fini);
1877
1878 /**
1879  * Initializes the expected sizes of each RMF in a \a pill (\a rc_area) to -1.
1880  *
1881  * Actual/expected field sizes are set elsewhere in functions in this file:
1882  * req_capsule_init(), req_capsule_server_pack(), req_capsule_set_size() and
1883  * req_capsule_msg_size().  The \a rc_area information is used by.
1884  * ptlrpc_request_set_replen().
1885  */
1886 void req_capsule_init_area(struct req_capsule *pill)
1887 {
1888         size_t i;
1889
1890         for (i = 0; i < ARRAY_SIZE(pill->rc_area[RCL_CLIENT]); i++) {
1891                 pill->rc_area[RCL_CLIENT][i] = -1;
1892                 pill->rc_area[RCL_SERVER][i] = -1;
1893         }
1894 }
1895 EXPORT_SYMBOL(req_capsule_init_area);
1896
1897 /**
1898  * Initialize a pill.
1899  *
1900  * The \a location indicates whether the caller is executing on the client side
1901  * (RCL_CLIENT) or server side (RCL_SERVER)..
1902  */
1903 void req_capsule_init(struct req_capsule *pill,
1904                       struct ptlrpc_request *req,
1905                       enum req_location location)
1906 {
1907         LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
1908
1909         /*
1910          * Today all capsules are embedded in ptlrpc_request structs,
1911          * but just in case that ever isn't the case, we don't reach
1912          * into req unless req != NULL and pill is the one embedded in
1913          * the req.
1914          *
1915          * The req->rq_pill_init flag makes it safe to initialize a pill
1916          * twice, which might happen in the OST paths as a result of the
1917          * high-priority RPC queue getting peeked at before ost_handle()
1918          * handles an OST RPC.
1919          */
1920         if (req != NULL && pill == &req->rq_pill && req->rq_pill_init)
1921                 return;
1922
1923         pill->rc_fmt = NULL;
1924         pill->rc_req = req;
1925         pill->rc_loc = location;
1926         req_capsule_init_area(pill);
1927
1928         if (req != NULL && pill == &req->rq_pill)
1929                 req->rq_pill_init = 1;
1930 }
1931 EXPORT_SYMBOL(req_capsule_init);
1932
/**
 * Counterpart of req_capsule_init(); currently releases nothing.
 */
void req_capsule_fini(struct req_capsule *pill)
{
	/* intentionally empty */
}
EXPORT_SYMBOL(req_capsule_fini);
1937
1938 static int __req_format_is_sane(const struct req_format *fmt)
1939 {
1940         return fmt->rf_idx < ARRAY_SIZE(req_formats) &&
1941                 req_formats[fmt->rf_idx] == fmt;
1942 }
1943
1944 static struct lustre_msg *__req_msg(const struct req_capsule *pill,
1945                                     enum req_location loc)
1946 {
1947         return loc == RCL_CLIENT ? pill->rc_reqmsg : pill->rc_repmsg;
1948 }
1949
1950 /**
1951  * Set the format (\a fmt) of a \a pill; format changes are not allowed here
1952  * (see req_capsule_extend()).
1953  */
1954 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
1955 {
1956         LASSERT(pill->rc_fmt == NULL || pill->rc_fmt == fmt);
1957         LASSERT(__req_format_is_sane(fmt));
1958
1959         pill->rc_fmt = fmt;
1960 }
1961 EXPORT_SYMBOL(req_capsule_set);
1962
1963 /**
1964  * Fills in any parts of the \a rc_area of a \a pill that haven't been filled in
1965  * yet.
1966
1967  * \a rc_area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
1968  * variable-sized fields.  The field sizes come from the declared \a rmf_size
1969  * field of a \a pill's \a rc_fmt's RMF's.
1970  */
1971 size_t req_capsule_filled_sizes(struct req_capsule *pill,
1972                                 enum req_location loc)
1973 {
1974         const struct req_format *fmt = pill->rc_fmt;
1975         size_t                   i;
1976
1977         LASSERT(fmt != NULL);
1978
1979         for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
1980                 if (pill->rc_area[loc][i] == -1) {
1981                         pill->rc_area[loc][i] =
1982                                             fmt->rf_fields[loc].d[i]->rmf_size;
1983                         if (pill->rc_area[loc][i] == -1) {
1984                                 /*
1985                                  * Skip the following fields.
1986                                  *
1987                                  * If this LASSERT() trips then you're missing a
1988                                  * call to req_capsule_set_size().
1989                                  */
1990                                 LASSERT(loc != RCL_SERVER);
1991                                 break;
1992                         }
1993                 }
1994         }
1995         return i;
1996 }
1997 EXPORT_SYMBOL(req_capsule_filled_sizes);
1998
1999 /**
2000  * Capsule equivalent of lustre_pack_request() and lustre_pack_reply().
2001  *
2002  * This function uses the \a pill's \a rc_area as filled in by
2003  * req_capsule_set_size() or req_capsule_filled_sizes() (the latter is called by
2004  * this function).
2005  */
2006 int req_capsule_server_pack(struct req_capsule *pill)
2007 {
2008         const struct req_format *fmt;
2009         int count;
2010         int rc;
2011
2012         LASSERT(pill->rc_loc == RCL_SERVER);
2013         fmt = pill->rc_fmt;
2014         LASSERT(fmt != NULL);
2015
2016         count = req_capsule_filled_sizes(pill, RCL_SERVER);
2017         if (req_capsule_ptlreq(pill)) {
2018                 rc = lustre_pack_reply(pill->rc_req, count,
2019                                        pill->rc_area[RCL_SERVER], NULL);
2020                 if (rc != 0) {
2021                         DEBUG_REQ(D_ERROR, pill->rc_req,
2022                                   "Cannot pack %d fields in format '%s'",
2023                                    count, fmt->rf_name);
2024                 }
2025         } else { /* SUB request */
2026                 struct ptlrpc_request *req = pill->rc_req;
2027                 __u32 used_len;
2028                 __u32 msg_len;
2029
2030                 msg_len = lustre_msg_size_v2(count, pill->rc_area[RCL_SERVER]);
2031                 used_len = (char *)pill->rc_repmsg - (char *)req->rq_repmsg;
2032                 /* Overflow the reply buffer */
2033                 if (used_len + msg_len > req->rq_replen) {
2034                         __u32 len;
2035                         __u32 max;
2036
2037                         if (!req_capsule_has_field(&req->rq_pill,
2038                                                    &RMF_BUT_REPLY, RCL_SERVER))
2039                                 return -EINVAL;
2040
2041                         if (!req_capsule_field_present(&req->rq_pill,
2042                                                        &RMF_BUT_REPLY,
2043                                                        RCL_SERVER))
2044                                 return -EINVAL;
2045
2046                         if (used_len + msg_len > BUT_MAXREPSIZE)
2047                                 return -EOVERFLOW;
2048
2049                         len = req_capsule_get_size(&req->rq_pill,
2050                                                    &RMF_BUT_REPLY, RCL_SERVER);
2051                         /*
2052                          * Currently just increase the batch reply buffer
2053                          * by 2.
2054                          */
2055                         max = BUT_MAXREPSIZE - req->rq_replen;
2056                         if (used_len + msg_len > len)
2057                                 len = used_len + msg_len;
2058
2059                         if (len > max)
2060                                 len += max;
2061                         else
2062                                 len += len;
2063                         rc = req_capsule_server_grow(&req->rq_pill,
2064                                                      &RMF_BUT_REPLY, len);
2065                         if (rc)
2066                                 return rc;
2067
2068                         pill->rc_repmsg =
2069                                 (struct lustre_msg *)((char *)req->rq_repmsg +
2070                                                       used_len);
2071                 }
2072                 if (msg_len > pill->rc_reqmsg->lm_repsize)
2073                         /* TODO: Check whether there is enough buffer size */
2074                         CDEBUG(D_INFO,
2075                                "Overflow pack %d fields in format '%s' for "
2076                                "the SUB request with message len %u:%u\n",
2077                                count, fmt->rf_name, msg_len,
2078                                pill->rc_reqmsg->lm_repsize);
2079
2080                 rc = 0;
2081                 lustre_init_msg_v2(pill->rc_repmsg, count,
2082                                    pill->rc_area[RCL_SERVER], NULL);
2083         }
2084
2085         return rc;
2086 }
2087 EXPORT_SYMBOL(req_capsule_server_pack);
2088
2089 int req_capsule_client_pack(struct req_capsule *pill)
2090 {
2091         const struct req_format *fmt;
2092         int count;
2093         int rc = 0;
2094
2095         LASSERT(pill->rc_loc == RCL_CLIENT);
2096         fmt = pill->rc_fmt;
2097         LASSERT(fmt != NULL);
2098
2099         count = req_capsule_filled_sizes(pill, RCL_CLIENT);
2100         if (req_capsule_ptlreq(pill)) {
2101                 struct ptlrpc_request *req = pill->rc_req;
2102
2103                 rc = lustre_pack_request(req, req->rq_import->imp_msg_magic,
2104                                          count, pill->rc_area[RCL_CLIENT],
2105                                          NULL);
2106         } else {
2107                 /* Sub request in a batch PTLRPC request */
2108                 lustre_init_msg_v2(pill->rc_reqmsg, count,
2109                                    pill->rc_area[RCL_CLIENT], NULL);
2110         }
2111         return rc;
2112 }
2113 EXPORT_SYMBOL(req_capsule_client_pack);
2114
2115 /**
2116  * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
2117  * corresponding to the given RMF (\a field).
2118  */
2119 __u32 __req_capsule_offset(const struct req_capsule *pill,
2120                            const struct req_msg_field *field,
2121                            enum req_location loc)
2122 {
2123         unsigned int offset;
2124
2125         offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
2126         LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n",
2127                              pill->rc_fmt->rf_name,
2128                              field->rmf_name, offset, loc);
2129         offset--;
2130
2131         LASSERT(offset < REQ_MAX_FIELD_NR);
2132         return offset;
2133 }
2134
2135 void req_capsule_set_swabbed(struct req_capsule *pill, enum req_location loc,
2136                             __u32 index)
2137 {
2138         if (loc == RCL_CLIENT)
2139                 req_capsule_set_req_swabbed(pill, index);
2140         else
2141                 req_capsule_set_rep_swabbed(pill, index);
2142 }
2143
2144 bool req_capsule_need_swab(struct req_capsule *pill, enum req_location loc,
2145                            __u32 index)
2146 {
2147         if (loc == RCL_CLIENT)
2148                 return (req_capsule_req_need_swab(pill) &&
2149                         !req_capsule_req_swabbed(pill, index));
2150
2151         return (req_capsule_rep_need_swab(pill) &&
2152                !req_capsule_rep_swabbed(pill, index));
2153 }
2154
2155 /**
2156  * Helper for __req_capsule_get(); swabs value / array of values and/or dumps
2157  * them if desired.
2158  */
2159 static int
2160 swabber_dumper_helper(struct req_capsule *pill,
2161                       const struct req_msg_field *field,
2162                       enum req_location loc,
2163                       int offset,
2164                       void *value, int len, bool dump, void (*swabber)(void *))
2165 {
2166         void *p;
2167         int i;
2168         int n;
2169         int size;
2170         int rc = 0;
2171         bool do_swab;
2172         bool array = field->rmf_flags & RMF_F_STRUCT_ARRAY;
2173
2174         swabber = swabber ?: field->rmf_swabber;
2175
2176         if (req_capsule_need_swab(pill, loc, offset) &&
2177             (swabber != NULL || field->rmf_swab_len != NULL) && value != NULL)
2178                 do_swab = true;
2179         else
2180                 do_swab = false;
2181
2182         if (!field->rmf_dumper)
2183                 dump = false;
2184
2185         /*
2186          * We're swabbing an array; swabber() swabs a single array element, so
2187          * swab every element.
2188          */
2189         if (array && (len % field->rmf_size)) {
2190                 static const struct req_msg_field *last_field;
2191
2192                 if (field != last_field) {
2193                         CERROR("%s: array buffer size %u is not a multiple of element size %u\n",
2194                                field->rmf_name, len, field->rmf_size);
2195                         last_field = field;
2196                 }
2197         }
2198         /* For the non-array cases, the process of swab/dump/swab only
2199          * needs to be done once. (n = 1)
2200          */
2201         if (!array)
2202                 len = field->rmf_size;
2203         for (p = value, i = 0, n = len / field->rmf_size;
2204              i < n;
2205              i++, p += field->rmf_size) {
2206                 if (dump) {
2207                         CDEBUG(D_RPCTRACE, "Dump of %s%sfield %s element %d follows\n",
2208                                do_swab ? "unswabbed " : "",
2209                                array ? "array " : "",
2210                                field->rmf_name, i);
2211                         field->rmf_dumper(p);
2212                 }
2213                 if (!do_swab) {
2214                         if (array)
2215                                 continue;
2216                         else
2217                                 break;
2218                 }
2219                 if (!field->rmf_swab_len) {
2220                         swabber(p);
2221                 } else {
2222                         size = field->rmf_swab_len(p, len);
2223                         if (size > 0) {
2224                                 len -= size;
2225                         } else {
2226                                 rc = size;
2227                                 break;
2228                         }
2229                 }
2230                 if (dump) {
2231                         CDEBUG(D_RPCTRACE, "Dump of swabbed %sfield %s, element %d follows\n",
2232                                array ? "array " : "", field->rmf_name, i);
2233                         field->rmf_dumper(value);
2234                 }
2235         }
2236         if (do_swab)
2237                 req_capsule_set_swabbed(pill, loc, offset);
2238
2239         return rc;
2240 }
2241
2242 /**
2243  * Returns the pointer to a PTLRPC request or reply (\a loc) buffer of a \a pill
2244  * corresponding to the given RMF (\a field).
2245  *
2246  * The buffer will be swabbed using the given \a swabber.  If \a swabber == NULL
2247  * then the \a rmf_swabber from the RMF will be used.  Soon there will be no
2248  * calls to __req_capsule_get() with a non-NULL \a swabber; \a swabber will then
2249  * be removed.  Fields with the \a RMF_F_STRUCT_ARRAY flag set will have each
2250  * element of the array swabbed.
2251  */
2252 static void *__req_capsule_get(struct req_capsule *pill,
2253                                const struct req_msg_field *field,
2254                                enum req_location loc,
2255                                void (*swabber)(void *),
2256                                bool dump)
2257 {
2258         const struct req_format *fmt;
2259         struct lustre_msg       *msg;
2260         void                    *value;
2261         __u32                    len;
2262         __u32                    offset;
2263
2264         void *(*getter)(struct lustre_msg *m, __u32 n, __u32 minlen);
2265
2266         static const char *rcl_names[RCL_NR] = {
2267                 [RCL_CLIENT] = "client",
2268                 [RCL_SERVER] = "server"
2269         };
2270
2271         LASSERT(pill != NULL);
2272         LASSERT(pill != LP_POISON);
2273         fmt = pill->rc_fmt;
2274         LASSERT(fmt != NULL);
2275         LASSERT(fmt != LP_POISON);
2276         LASSERT(__req_format_is_sane(fmt));
2277
2278         offset = __req_capsule_offset(pill, field, loc);
2279
2280         msg = __req_msg(pill, loc);
2281         LASSERT(msg != NULL);
2282
2283         getter = (field->rmf_flags & RMF_F_STRING) ?
2284                 (typeof(getter))lustre_msg_string : lustre_msg_buf;
2285
2286         if (field->rmf_flags & (RMF_F_STRUCT_ARRAY|RMF_F_NO_SIZE_CHECK)) {
2287                 /*
2288                  * We've already asserted that field->rmf_size > 0 in
2289                  * req_layout_init().
2290                  */
2291                 len = lustre_msg_buflen(msg, offset);
2292                 if (!(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
2293                     (len % field->rmf_size) != 0) {
2294                         CERROR("%s: array field size mismatch "
2295                                 "%d modulo %u != 0 (%d)\n",
2296                                 field->rmf_name, len, field->rmf_size, loc);
2297                         return NULL;
2298                 }
2299         } else if (pill->rc_area[loc][offset] != -1) {
2300                 len = pill->rc_area[loc][offset];
2301         } else {
2302                 len = max_t(typeof(field->rmf_size), field->rmf_size, 0);
2303         }
2304         value = getter(msg, offset, len);
2305
2306         if (value == NULL) {
2307                 LASSERT(pill->rc_req != NULL);
2308                 DEBUG_REQ(D_ERROR, pill->rc_req,
2309                            "Wrong buffer for field '%s' (%u of %u) in format '%s', %u vs. %u (%s)",
2310                            field->rmf_name, offset, lustre_msg_bufcount(msg),
2311                            fmt->rf_name, lustre_msg_buflen(msg, offset), len,
2312                            rcl_names[loc]);
2313         } else {
2314                 swabber_dumper_helper(pill, field, loc, offset, value, len,
2315                                       dump, swabber);
2316         }
2317
2318         return value;
2319 }
2320
2321 /**
2322  * Dump a request and/or reply
2323  */
2324 void __req_capsule_dump(struct req_capsule *pill, enum req_location loc)
2325 {
2326         const struct req_format *fmt;
2327         const struct req_msg_field *field;
2328         __u32 len;
2329         size_t i;
2330
2331         fmt = pill->rc_fmt;
2332
2333         DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP");
2334         for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
2335                 field = FMT_FIELD(fmt, loc, i);
2336                 if (field->rmf_dumper == NULL) {
2337                         /*
2338                          * FIXME Add a default hex dumper for fields that don't
2339                          * have a specific dumper
2340                          */
2341                         len = req_capsule_get_size(pill, field, loc);
2342                         CDEBUG(D_RPCTRACE,
2343                                "Field %s has no dumper function; field size is %u\n",
2344                                field->rmf_name, len);
2345                 } else {
2346                         /* It's dumping side-effect that we're interested in */
2347                         (void) __req_capsule_get(pill, field, loc, NULL, true);
2348                 }
2349         }
2350         CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n");
2351 }
2352
2353 /**
2354  * Dump a request.
2355  */
2356 void req_capsule_client_dump(struct req_capsule *pill)
2357 {
2358         __req_capsule_dump(pill, RCL_CLIENT);
2359 }
2360 EXPORT_SYMBOL(req_capsule_client_dump);
2361
2362 /**
2363  * Dump a reply
2364  */
2365 void req_capsule_server_dump(struct req_capsule *pill)
2366 {
2367         __req_capsule_dump(pill, RCL_SERVER);
2368 }
2369 EXPORT_SYMBOL(req_capsule_server_dump);
2370
2371 /**
2372  * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request
2373  * buffer corresponding to the given RMF (\a field) of a \a pill.
2374  */
2375 void *req_capsule_client_get(struct req_capsule *pill,
2376                              const struct req_msg_field *field)
2377 {
2378         return __req_capsule_get(pill, field, RCL_CLIENT, NULL, false);
2379 }
2380 EXPORT_SYMBOL(req_capsule_client_get);
2381
2382 /**
2383  * Same as req_capsule_client_get(), but with a \a swabber argument.
2384  *
2385  * Currently unused; will be removed when req_capsule_server_swab_get() is
2386  * unused too.
2387  */
2388 void *req_capsule_client_swab_get(struct req_capsule *pill,
2389                                   const struct req_msg_field *field,
2390                                   void *swabber)
2391 {
2392         return __req_capsule_get(pill, field, RCL_CLIENT, swabber, false);
2393 }
2394 EXPORT_SYMBOL(req_capsule_client_swab_get);
2395
2396 /**
2397  * Utility that combines req_capsule_set_size() and req_capsule_client_get().
2398  *
2399  * First the \a pill's request \a field's size is set (\a rc_area) using
2400  * req_capsule_set_size() with the given \a len.  Then the actual buffer is
2401  * returned.
2402  */
2403 void *req_capsule_client_sized_get(struct req_capsule *pill,
2404                                    const struct req_msg_field *field,
2405                                    __u32 len)
2406 {
2407         req_capsule_set_size(pill, field, RCL_CLIENT, len);
2408         return __req_capsule_get(pill, field, RCL_CLIENT, NULL, false);
2409 }
2410 EXPORT_SYMBOL(req_capsule_client_sized_get);
2411
2412 /**
2413  * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC reply
2414  * buffer corresponding to the given RMF (\a field) of a \a pill.
2415  */
2416 void *req_capsule_server_get(struct req_capsule *pill,
2417                              const struct req_msg_field *field)
2418 {
2419         return __req_capsule_get(pill, field, RCL_SERVER, NULL, false);
2420 }
2421 EXPORT_SYMBOL(req_capsule_server_get);
2422
2423 /**
2424  * Same as req_capsule_server_get(), but with a \a swabber argument.
2425  *
2426  * Ideally all swabbing should be done pursuant to RMF definitions, with no
2427  * swabbing done outside this capsule abstraction.
2428  */
2429 void *req_capsule_server_swab_get(struct req_capsule *pill,
2430                                   const struct req_msg_field *field,
2431                                   void *swabber)
2432 {
2433         return __req_capsule_get(pill, field, RCL_SERVER, swabber, false);
2434 }
2435 EXPORT_SYMBOL(req_capsule_server_swab_get);
2436
2437 /**
2438  * Utility that combines req_capsule_set_size() and req_capsule_server_get().
2439  *
2440  * First the \a pill's request \a field's size is set (\a rc_area) using
2441  * req_capsule_set_size() with the given \a len.  Then the actual buffer is
2442  * returned.
2443  */
2444 void *req_capsule_server_sized_get(struct req_capsule *pill,
2445                                    const struct req_msg_field *field,
2446                                    __u32 len)
2447 {
2448         req_capsule_set_size(pill, field, RCL_SERVER, len);
2449         return __req_capsule_get(pill, field, RCL_SERVER, NULL, false);
2450 }
2451 EXPORT_SYMBOL(req_capsule_server_sized_get);
2452
2453 void *req_capsule_server_sized_swab_get(struct req_capsule *pill,
2454                                         const struct req_msg_field *field,
2455                                         __u32 len, void *swabber)
2456 {
2457         req_capsule_set_size(pill, field, RCL_SERVER, len);
2458         return __req_capsule_get(pill, field, RCL_SERVER, swabber, false);
2459 }
2460 EXPORT_SYMBOL(req_capsule_server_sized_swab_get);
2461
2462 /**
2463  * Returns the buffer of a \a pill corresponding to the given \a field from the
2464  * request (if the caller is executing on the server-side) or reply (if the
2465  * caller is executing on the client-side).
2466  *
2467  * This function convienient for use is code that could be executed on the
2468  * client and server alike.
2469  */
2470 const void *req_capsule_other_get(struct req_capsule *pill,
2471                                   const struct req_msg_field *field)
2472 {
2473         return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, false);
2474 }
2475 EXPORT_SYMBOL(req_capsule_other_get);
2476
2477 /**
2478  * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a
2479  * field of the given \a pill.
2480  *
2481  * This function must be used when constructing variable sized fields of a
2482  * request or reply.
2483  */
2484 void req_capsule_set_size(struct req_capsule *pill,
2485                           const struct req_msg_field *field,
2486                           enum req_location loc, __u32 size)
2487 {
2488         LASSERT(loc == RCL_SERVER || loc == RCL_CLIENT);
2489
2490         if ((size != (__u32)field->rmf_size) &&
2491             (field->rmf_size != -1) &&
2492             !(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
2493             (size > 0)) {
2494                 __u32 rmf_size = (__u32)field->rmf_size;
2495                 if ((field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
2496                     (size % rmf_size != 0)) {
2497                         CERROR("%s: array field size mismatch "
2498                                 "%u %% %u != 0 (%d)\n",
2499                                 field->rmf_name, size, rmf_size, loc);
2500                         LBUG();
2501                 } else if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
2502                            size < rmf_size) {
2503                         CERROR("%s: field size mismatch %u != %u (%d)\n",
2504                                 field->rmf_name, size, rmf_size, loc);
2505                         LBUG();
2506                 }
2507         }
2508
2509         pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size;
2510 }
2511 EXPORT_SYMBOL(req_capsule_set_size);
2512
2513 /**
2514  * Return the actual PTLRPC buffer length of a request or reply (\a loc)
2515  * for the given \a pill's given \a field.
2516  *
2517  * NB: this function doesn't correspond with req_capsule_set_size(), which
2518  * actually sets the size in pill.rc_area[loc][offset], but this function
2519  * returns the message buflen[offset], maybe we should use another name.
2520  */
2521 __u32 req_capsule_get_size(const struct req_capsule *pill,
2522                          const struct req_msg_field *field,
2523                          enum req_location loc)
2524 {
2525         LASSERT(loc == RCL_SERVER || loc == RCL_CLIENT);
2526
2527         return lustre_msg_buflen(__req_msg(pill, loc),
2528                                  __req_capsule_offset(pill, field, loc));
2529 }
2530 EXPORT_SYMBOL(req_capsule_get_size);
2531
2532 /**
2533  * Wrapper around lustre_msg_size() that returns the PTLRPC size needed for the
2534  * given \a pill's request or reply (\a loc) given the field size recorded in
2535  * the \a pill's rc_area.
2536  *
2537  * See also req_capsule_set_size().
2538  */
2539 __u32 req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
2540 {
2541         if (req_capsule_ptlreq(pill)) {
2542                 return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
2543                                        pill->rc_fmt->rf_fields[loc].nr,
2544                                        pill->rc_area[loc]);
2545         } else { /* SUB request in a batch request */
2546                 int count;
2547
2548                 count = req_capsule_filled_sizes(pill, loc);
2549                 return lustre_msg_size_v2(count, pill->rc_area[loc]);
2550         }
2551 }
2552 EXPORT_SYMBOL(req_capsule_msg_size);
2553
2554 /**
2555  * While req_capsule_msg_size() computes the size of a PTLRPC request or reply
2556  * (\a loc) given a \a pill's \a rc_area, this function computes the size of a
2557  * PTLRPC request or reply given only an RQF (\a fmt).
2558  *
2559  * This function should not be used for formats which contain variable size
2560  * fields.
2561  */
2562 __u32 req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
2563                          enum req_location loc)
2564 {
2565         __u32 size;
2566         size_t i = 0;
2567
2568         /*
2569          * This function should probably LASSERT() that fmt has no fields with
2570          * RMF_F_STRUCT_ARRAY in rmf_flags, since we can't know here how many
2571          * elements in the array there will ultimately be, but then, we could
2572          * assume that there will be at least one element, and that's just what
2573          * we do.
2574          */
2575         size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
2576         if (size == 0)
2577                 return size;
2578
2579         for (; i < fmt->rf_fields[loc].nr; ++i)
2580                 if (fmt->rf_fields[loc].d[i]->rmf_size != -1)
2581                         size += cfs_size_round(fmt->rf_fields[loc].d[i]->
2582                                                rmf_size);
2583         return size;
2584 }
2585 EXPORT_SYMBOL(req_capsule_fmt_size);
2586
2587 /**
2588  * Changes the format of an RPC.
2589  *
2590  * The pill must already have been initialized, which means that it already has
2591  * a request format.  The new format \a fmt must be an extension of the pill's
2592  * old format.  Specifically: the new format must have as many request and reply
2593  * fields as the old one, and all fields shared by the old and new format must
2594  * be at least as large in the new format.
2595  *
2596  * The new format's fields may be of different "type" than the old format, but
2597  * only for fields that are "opaque" blobs: fields which have a) have no
2598  * \a rmf_swabber, b) \a rmf_flags == 0 or RMF_F_NO_SIZE_CHECK, and c) \a
2599  * rmf_size == -1 or \a rmf_flags == RMF_F_NO_SIZE_CHECK.  For example,
2600  * OBD_SET_INFO has a key field and an opaque value field that gets interpreted
2601  * according to the key field.  When the value, according to the key, contains a
2602  * structure (or array thereof) to be swabbed, the format should be changed to
2603  * one where the value field has \a rmf_size/rmf_flags/rmf_swabber set
2604  * accordingly.
2605  */
2606 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
2607 {
2608         int i;
2609         size_t j;
2610
2611         const struct req_format *old;
2612
2613         LASSERT(pill->rc_fmt != NULL);
2614         LASSERT(__req_format_is_sane(fmt));
2615
2616         old = pill->rc_fmt;
2617         /*
2618          * Sanity checking...
2619          */
2620         for (i = 0; i < RCL_NR; ++i) {
2621                 LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
2622                 for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
2623                         const struct req_msg_field *ofield = FMT_FIELD(old, i, j);
2624
2625                         /* "opaque" fields can be transmogrified */
2626                         if (ofield->rmf_swabber == NULL &&
2627                             (ofield->rmf_flags & ~RMF_F_NO_SIZE_CHECK) == 0 &&
2628                             (ofield->rmf_size == -1 ||
2629                             ofield->rmf_flags == RMF_F_NO_SIZE_CHECK))
2630                                 continue;
2631                         LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
2632                 }
2633                 /*
2634                  * Last field in old format can be shorter than in new.
2635                  */
2636                 LASSERT(FMT_FIELD(fmt, i, j)->rmf_size >=
2637                         FMT_FIELD(old, i, j)->rmf_size);
2638         }
2639
2640         pill->rc_fmt = fmt;
2641 }
2642 EXPORT_SYMBOL(req_capsule_extend);
2643
2644 /**
2645  * This function returns a non-zero value if the given \a field is present in
2646  * the format (\a rc_fmt) of \a pill's PTLRPC request or reply (\a loc), else it
2647  * returns 0.
2648  */
2649 int req_capsule_has_field(const struct req_capsule *pill,
2650                           const struct req_msg_field *field,
2651                           enum req_location loc)
2652 {
2653         LASSERT(loc == RCL_SERVER || loc == RCL_CLIENT);
2654
2655         return field->rmf_offset[pill->rc_fmt->rf_idx][loc];
2656 }
2657 EXPORT_SYMBOL(req_capsule_has_field);
2658
2659 /**
2660  * Returns a non-zero value if the given \a field is present in the given \a
2661  * pill's PTLRPC request or reply (\a loc), else it returns 0.
2662  */
2663 int req_capsule_field_present(const struct req_capsule *pill,
2664                               const struct req_msg_field *field,
2665                               enum req_location loc)
2666 {
2667         __u32 offset;
2668
2669         LASSERT(loc == RCL_SERVER || loc == RCL_CLIENT);
2670         LASSERT(req_capsule_has_field(pill, field, loc));
2671
2672         offset = __req_capsule_offset(pill, field, loc);
2673         return lustre_msg_bufcount(__req_msg(pill, loc)) > offset;
2674 }
2675 EXPORT_SYMBOL(req_capsule_field_present);
2676
2677 /**
2678  * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC
2679  * request or reply (\a loc).
2680  *
2681  * This is not the opposite of req_capsule_extend().
2682  */
2683 void req_capsule_shrink(struct req_capsule *pill,
2684                         const struct req_msg_field *field,
2685                         __u32 newlen,
2686                         enum req_location loc)
2687 {
2688         const struct req_format *fmt;
2689         struct lustre_msg       *msg;
2690         __u32                    len;
2691         int                      offset;
2692
2693         fmt = pill->rc_fmt;
2694         LASSERT(fmt != NULL);
2695         LASSERT(__req_format_is_sane(fmt));
2696         LASSERT(req_capsule_has_field(pill, field, loc));
2697         LASSERT(req_capsule_field_present(pill, field, loc));
2698
2699         offset = __req_capsule_offset(pill, field, loc);
2700
2701         msg = __req_msg(pill, loc);
2702         len = lustre_msg_buflen(msg, offset);
2703         LASSERTF(newlen <= len, "%s:%s, oldlen=%u, newlen=%u\n",
2704                  fmt->rf_name, field->rmf_name, len, newlen);
2705
2706         len = lustre_shrink_msg(msg, offset, newlen, 1);
2707         if (loc == RCL_CLIENT) {
2708                 if (req_capsule_ptlreq(pill))
2709                         pill->rc_req->rq_reqlen = len;
2710         } else {
2711                 /* update also field size in reply lenghts arrays for possible
2712                  * reply re-pack due to req_capsule_server_grow() call.
2713                  */
2714                 req_capsule_set_size(pill, field, loc, newlen);
2715                 if (req_capsule_ptlreq(pill))
2716                         pill->rc_req->rq_replen = len;
2717         }
2718 }
2719 EXPORT_SYMBOL(req_capsule_shrink);
2720
2721 int req_capsule_server_grow(struct req_capsule *pill,
2722                             const struct req_msg_field *field,
2723                             __u32 newlen)
2724 {
2725         struct ptlrpc_request *req = pill->rc_req;
2726         struct ptlrpc_reply_state *rs = req->rq_reply_state, *nrs;
2727         char *from, *to, *sptr = NULL;
2728         __u32 slen = 0, snewlen = 0;
2729         __u32 offset, len, max, diff;
2730         int rc;
2731
2732         LASSERT(pill->rc_fmt != NULL);
2733         LASSERT(__req_format_is_sane(pill->rc_fmt));
2734         LASSERT(req_capsule_has_field(pill, field, RCL_SERVER));
2735         LASSERT(req_capsule_field_present(pill, field, RCL_SERVER));
2736
2737         if (req_capsule_subreq(pill)) {
2738                 if (!req_capsule_has_field(&req->rq_pill, &RMF_BUT_REPLY,
2739                                            RCL_SERVER))
2740                         return -EINVAL;
2741
2742                 if (!req_capsule_field_present(&req->rq_pill, &RMF_BUT_REPLY,
2743                                                RCL_SERVER))
2744                         return -EINVAL;
2745
2746                 len = req_capsule_get_size(&req->rq_pill, &RMF_BUT_REPLY,
2747                                            RCL_SERVER);
2748                 sptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
2749                 slen = req_capsule_get_size(pill, field, RCL_SERVER);
2750
2751                 LASSERT(len >= (char *)pill->rc_repmsg - sptr +
2752                                lustre_packed_msg_size(pill->rc_repmsg));
2753                 if (len >= (char *)pill->rc_repmsg - sptr +
2754                            lustre_packed_msg_size(pill->rc_repmsg) - slen +
2755                            newlen) {
2756                         req_capsule_set_size(pill, field, RCL_SERVER, newlen);
2757                         offset = __req_capsule_offset(pill, field, RCL_SERVER);
2758                         lustre_grow_msg(pill->rc_repmsg, offset, newlen);
2759                         return 0;
2760                 }
2761
2762                 /*
2763                  * Currently first try to increase the reply buffer by
2764                  * 2 * newlen with reply buffer limit of BUT_MAXREPSIZE.
2765                  * TODO: Enlarge the reply buffer properly according to the
2766                  * left SUB requests in the batch PTLRPC request.
2767                  */
2768                 snewlen = newlen;
2769                 diff = snewlen - slen;
2770                 max = BUT_MAXREPSIZE - req->rq_replen;
2771                 if (diff > max)
2772                         return -EOVERFLOW;
2773
2774                 if (diff * 2 + len < max)
2775                         newlen = (len + diff) * 2;
2776                 else
2777                         newlen = len + max;
2778
2779                 req_capsule_set_size(pill, field, RCL_SERVER, snewlen);
2780                 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
2781                                      newlen);
2782                 offset = __req_capsule_offset(&req->rq_pill, &RMF_BUT_REPLY,
2783                                               RCL_SERVER);
2784         } else {
2785                 len = req_capsule_get_size(pill, field, RCL_SERVER);
2786                 offset = __req_capsule_offset(pill, field, RCL_SERVER);
2787                 req_capsule_set_size(pill, field, RCL_SERVER, newlen);
2788         }
2789
2790         CDEBUG(D_INFO, "Reply packed: %d, allocated: %d, field len %d -> %d\n",
2791                lustre_packed_msg_size(rs->rs_msg), rs->rs_repbuf_len,
2792                                       len, newlen);
2793
2794         /**
2795          * There can be enough space in current reply buffer, make sure
2796          * that rs_repbuf is not a wrapper but real reply msg, otherwise
2797          * re-packing is still needed.
2798          */
2799         if (rs->rs_msg == rs->rs_repbuf &&
2800             rs->rs_repbuf_len >=
2801             lustre_packed_msg_size(rs->rs_msg) - len + newlen) {
2802                 req->rq_replen = lustre_grow_msg(rs->rs_msg, offset, newlen);
2803                 return 0;
2804         }
2805
2806         /* Re-allocate replay state */
2807         req->rq_reply_state = NULL;
2808         rc = req_capsule_server_pack(&req->rq_pill);
2809         if (rc) {
2810                 /* put old values back, the caller should decide what to do */
2811                 if (req_capsule_subreq(pill)) {
2812                         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
2813                                              RCL_SERVER, len);
2814                         req_capsule_set_size(pill, field, RCL_SERVER, slen);
2815                 } else {
2816                         req_capsule_set_size(pill, field, RCL_SERVER, len);
2817                 }
2818                 pill->rc_req->rq_reply_state = rs;
2819                 return rc;
2820         }
2821         nrs = req->rq_reply_state;
2822         LASSERT(lustre_packed_msg_size(nrs->rs_msg) >
2823                 lustre_packed_msg_size(rs->rs_msg));
2824
2825         /* Now we need only buffers, copy them and grow the needed one */
2826         to = lustre_msg_buf(nrs->rs_msg, 0, 0);
2827         from = lustre_msg_buf(rs->rs_msg, 0, 0);
2828         memcpy(to, from,
2829                (char *)rs->rs_msg + lustre_packed_msg_size(rs->rs_msg) - from);
2830         lustre_msg_set_buflen(nrs->rs_msg, offset, len);
2831         req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen);
2832
2833         if (req_capsule_subreq(pill)) {
2834                 char *ptr;
2835
2836                 ptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
2837                 pill->rc_repmsg = (struct lustre_msg *)(ptr +
2838                                   ((char *)pill->rc_repmsg - sptr));
2839                 offset = __req_capsule_offset(pill, field, RCL_SERVER);
2840                 lustre_grow_msg(pill->rc_repmsg, offset, snewlen);
2841         }
2842
2843         if (rs->rs_difficult) {
2844                 /* copy rs data */
2845                 int i;
2846
2847                 nrs->rs_difficult = 1;
2848                 nrs->rs_no_ack = rs->rs_no_ack;
2849                 nrs->rs_convert_lock = rs->rs_convert_lock;
2850                 for (i = 0; i < rs->rs_nlocks; i++) {
2851                         nrs->rs_locks[i] = rs->rs_locks[i];
2852                         nrs->rs_modes[i] = rs->rs_modes[i];
2853                         nrs->rs_nlocks++;
2854                 }
2855                 rs->rs_nlocks = 0;
2856                 rs->rs_difficult = 0;
2857                 rs->rs_no_ack = 0;
2858         }
2859         ptlrpc_rs_decref(rs);
2860         return 0;
2861 }
2862 EXPORT_SYMBOL(req_capsule_server_grow);
2863
2864 #ifdef HAVE_SERVER_SUPPORT
2865 static const struct req_msg_field *mds_update_client[] = {
2866         &RMF_PTLRPC_BODY,
2867         &RMF_OUT_UPDATE_HEADER,
2868         &RMF_OUT_UPDATE_BUF,
2869 };
2870
2871 static const struct req_msg_field *mds_update_server[] = {
2872         &RMF_PTLRPC_BODY,
2873         &RMF_OUT_UPDATE_REPLY,
2874 };
2875
2876 struct req_msg_field RMF_OUT_UPDATE = DEFINE_MSGFL("object_update", 0, -1,
2877                                 lustre_swab_object_update_request, NULL);
2878 EXPORT_SYMBOL(RMF_OUT_UPDATE);
2879
2880 struct req_msg_field RMF_OUT_UPDATE_REPLY =
2881                         DEFINE_MSGFL("object_update_reply", 0, -1,
2882                                     lustre_swab_object_update_reply, NULL);
2883 EXPORT_SYMBOL(RMF_OUT_UPDATE_REPLY);
2884
2885 struct req_msg_field RMF_OUT_UPDATE_HEADER = DEFINE_MSGF("out_update_header", 0,
2886                                 -1, lustre_swab_out_update_header, NULL);
2887 EXPORT_SYMBOL(RMF_OUT_UPDATE_HEADER);
2888
2889 struct req_msg_field RMF_OUT_UPDATE_BUF = DEFINE_MSGF("update_buf",
2890                         RMF_F_STRUCT_ARRAY, sizeof(struct out_update_buffer),
2891                         lustre_swab_out_update_buffer, NULL);
2892 EXPORT_SYMBOL(RMF_OUT_UPDATE_BUF);
2893
2894 struct req_format RQF_OUT_UPDATE =
2895         DEFINE_REQ_FMT0("OUT_UPDATE", mds_update_client,
2896                         mds_update_server);
2897 EXPORT_SYMBOL(RQF_OUT_UPDATE);
2898
2899 int req_check_sepol(struct req_capsule *pill)
2900 {
2901         int rc = 0;
2902         struct obd_export *export;
2903         struct lu_nodemap *nm = NULL;
2904         const char *sepol = NULL;
2905         const char *nm_sepol = NULL;
2906
2907         if (req_capsule_subreq(pill))
2908                 return 0;
2909
2910         if (!pill->rc_req)
2911                 return -EPROTO;
2912
2913         export = pill->rc_req->rq_export;
2914         if (!export || !exp_connect_sepol(export) ||
2915             !req_capsule_has_field(pill, &RMF_SELINUX_POL, RCL_CLIENT))
2916                 goto nm;
2917
2918         if (req_capsule_get_size(pill, &RMF_SELINUX_POL, RCL_CLIENT) == 0)
2919                 goto nm;
2920
2921         sepol = req_capsule_client_get(pill, &RMF_SELINUX_POL);
2922         CDEBUG(D_SEC, "retrieved sepol %s\n", sepol);
2923
2924 nm:
2925         if (export) {
2926                 nm = nodemap_get_from_exp(export);
2927                 if (!IS_ERR_OR_NULL(nm)) {
2928                         nm_sepol = nodemap_get_sepol(nm);
2929                         if (nm_sepol && nm_sepol[0])
2930                                 if (sepol == NULL ||
2931                                     strcmp(sepol, nm_sepol) != 0)
2932                                         rc = -EACCES;
2933                 }
2934         }
2935
2936         if (!IS_ERR_OR_NULL(nm))
2937                 nodemap_putref(nm);
2938
2939         return rc;
2940 }
2941 EXPORT_SYMBOL(req_check_sepol);
2942 #endif
2943
2944 void req_capsule_subreq_init(struct req_capsule *pill,
2945                              const struct req_format *fmt,
2946                              struct ptlrpc_request *req,
2947                              struct lustre_msg *reqmsg,
2948                              struct lustre_msg *repmsg,
2949                              enum req_location loc)
2950 {
2951         req_capsule_init(pill, req, loc);
2952         req_capsule_set(pill, fmt);
2953         pill->rc_reqmsg = reqmsg;
2954         pill->rc_repmsg = repmsg;
2955 }
2956 EXPORT_SYMBOL(req_capsule_subreq_init);
2957
2958 void req_capsule_set_replen(struct req_capsule *pill)
2959 {
2960         if (req_capsule_ptlreq(pill)) {
2961                 ptlrpc_request_set_replen(pill->rc_req);
2962         } else { /* SUB request in a batch request */
2963                 int count;
2964
2965                 count = req_capsule_filled_sizes(pill, RCL_SERVER);
2966                 pill->rc_reqmsg->lm_repsize =
2967                                 lustre_msg_size_v2(count,
2968                                                    pill->rc_area[RCL_SERVER]);
2969         }
2970 }
2971 EXPORT_SYMBOL(req_capsule_set_replen);