Whamcloud - gitweb
LU-12885 mds: add enums for MDS_OPEN flags
[fs/lustre-release.git] / lustre / mdc / mdc_batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2020, 2022, DDN Storage Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * lustre/mdc/mdc_batch.c
30  *
31  * Batch Metadata Updating on the client (MDC)
32  *
33  * Author: Qian Yingjin <qian@ddn.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDC
37
38 #include <linux/module.h>
39 #include <lustre_acl.h>
40
41 #include "mdc_internal.h"
42
43 static int mdc_ldlm_lock_pack(struct obd_export *exp,
44                               struct req_capsule *pill,
45                               union ldlm_policy_data *policy,
46                               struct lu_fid *fid, struct md_op_item *item)
47 {
48         struct ldlm_request *dlmreq;
49         struct ldlm_res_id res_id;
50         struct ldlm_enqueue_info *einfo = &item->mop_einfo;
51         int rc;
52
53         ENTRY;
54
55         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
56         if (IS_ERR(dlmreq))
57                 RETURN(PTR_ERR(dlmreq));
58
59         /* With Data-on-MDT the glimpse callback is needed too.
60          * It is set here in advance but not in mdc_finish_enqueue()
61          * to avoid possible races. It is safe to have glimpse handler
62          * for non-DOM locks and costs nothing.
63          */
64         if (einfo->ei_cb_gl == NULL)
65                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
66
67         fid_build_reg_res_name(fid, &res_id);
68         rc = ldlm_cli_lock_create_pack(exp, dlmreq, einfo, &res_id,
69                                        policy, &item->mop_lock_flags,
70                                        NULL, 0, LVB_T_NONE, &item->mop_lockh);
71
72         RETURN(rc);
73 }
74
75 static int mdc_batch_getattr_pack(struct batch_update_head *head,
76                                   struct lustre_msg *reqmsg,
77                                   size_t *max_pack_size,
78                                   struct md_op_item *item)
79 {
80         struct obd_export *exp = head->buh_exp;
81         struct lookup_intent *it = &item->mop_it;
82         struct md_op_data *op_data = &item->mop_data;
83         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
84                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
85                     OBD_MD_DEFAULT_MEA;
86         union ldlm_policy_data policy = {
87                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
88         };
89         struct ldlm_intent *lit;
90         bool have_secctx = false;
91         struct req_capsule pill;
92         __u32 easize;
93         __u32 size;
94         int rc;
95
96         ENTRY;
97
98         req_capsule_subreq_init(&pill, &RQF_BUT_GETATTR, NULL,
99                                 reqmsg, NULL, RCL_CLIENT);
100
101         /* send name of security xattr to get upon intent */
102         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
103             req_capsule_has_field(&pill, &RMF_FILE_SECCTX_NAME,
104                                   RCL_CLIENT) &&
105             op_data->op_file_secctx_name_size > 0 &&
106             op_data->op_file_secctx_name != NULL) {
107                 have_secctx = true;
108                 req_capsule_set_size(&pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT,
109                                      op_data->op_file_secctx_name_size);
110         }
111
112         req_capsule_set_size(&pill, &RMF_NAME, RCL_CLIENT,
113                              op_data->op_namelen + 1);
114
115         size = req_capsule_msg_size(&pill, RCL_CLIENT);
116         if (unlikely(size >= *max_pack_size)) {
117                 *max_pack_size = size;
118                 return -E2BIG;
119         }
120
121         req_capsule_client_pack(&pill);
122         /* pack the intent */
123         lit = req_capsule_client_get(&pill, &RMF_LDLM_INTENT);
124         lit->opc = (__u64)it->it_op;
125
126         easize = MAX_MD_SIZE_OLD; /* obd->u.cli.cl_default_mds_easize; */
127
128         /* pack the intended request */
129         mdc_getattr_pack(&pill, valid, it->it_open_flags, op_data, easize);
130
131         item->mop_lock_flags |= LDLM_FL_HAS_INTENT;
132         rc = mdc_ldlm_lock_pack(head->buh_exp, &pill, &policy,
133                                 &item->mop_data.op_fid1, item);
134         if (rc)
135                 RETURN(rc);
136
137         req_capsule_set_size(&pill, &RMF_MDT_MD, RCL_SERVER, easize);
138         req_capsule_set_size(&pill, &RMF_ACL, RCL_SERVER,
139                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
140         req_capsule_set_size(&pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
141                              /*sizeof(struct lmv_user_md)*/MIN_MD_SIZE);
142
143         if (have_secctx) {
144                 char *secctx_name;
145
146                 secctx_name = req_capsule_client_get(&pill,
147                                                      &RMF_FILE_SECCTX_NAME);
148                 memcpy(secctx_name, op_data->op_file_secctx_name,
149                        op_data->op_file_secctx_name_size);
150
151                 req_capsule_set_size(&pill, &RMF_FILE_SECCTX,
152                                      RCL_SERVER, easize);
153
154                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
155                        op_data->op_file_secctx_name_size,
156                        op_data->op_file_secctx_name);
157         } else {
158                 req_capsule_set_size(&pill, &RMF_FILE_SECCTX, RCL_SERVER, 0);
159         }
160
161         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
162                 req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
163                                      RCL_SERVER, easize);
164         else
165                 req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
166                                      RCL_SERVER, 0);
167
168         req_capsule_set_replen(&pill);
169         reqmsg->lm_opc = BUT_GETATTR;
170         *max_pack_size = size;
171         RETURN(rc);
172 }
173
174 static md_update_pack_t mdc_update_packers[MD_OP_MAX] = {
175         [MD_OP_GETATTR] = mdc_batch_getattr_pack,
176 };
177
178 static int mdc_batch_getattr_interpret(struct ptlrpc_request *req,
179                                        struct lustre_msg *repmsg,
180                                        struct object_update_callback *ouc,
181                                        int rc)
182 {
183         struct md_op_item *item = (struct md_op_item *)ouc->ouc_data;
184         struct ldlm_enqueue_info *einfo = &item->mop_einfo;
185         struct batch_update_head *head = ouc->ouc_head;
186         struct obd_export *exp = head->buh_exp;
187         struct req_capsule *pill = item->mop_pill;
188         struct ldlm_reply *lockrep;
189
190         req_capsule_subreq_init(pill, &RQF_BUT_GETATTR, req,
191                                 NULL, repmsg, RCL_CLIENT);
192
193         rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &item->mop_lock_flags,
194                                    NULL, 0, &item->mop_lockh, rc, false);
195         if (rc)
196                 GOTO(out, rc);
197
198         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
199         LASSERT(lockrep != NULL);
200
201         lockrep->lock_policy_res2 =
202                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
203
204         rc = mdc_finish_enqueue(exp, pill, einfo, &item->mop_it,
205                                 &item->mop_lockh, rc);
206 out:
207         return item->mop_cb(item, rc);
208 }
209
210 object_update_interpret_t mdc_update_interpreters[MD_OP_MAX] = {
211         [MD_OP_GETATTR] = mdc_batch_getattr_interpret,
212 };
213
214 int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
215                   struct md_op_item *item)
216 {
217         enum md_item_opcode opc = item->mop_opc;
218
219         ENTRY;
220
221         if (opc >= MD_OP_MAX || mdc_update_packers[opc] == NULL ||
222             mdc_update_interpreters[opc] == NULL) {
223                 CERROR("%s: unexpected opcode %d\n",
224                        exp->exp_obd->obd_name, opc);
225                 RETURN(-EFAULT);
226         }
227
228         OBD_ALLOC_PTR(item->mop_pill);
229         if (item->mop_pill == NULL)
230                 RETURN(-ENOMEM);
231
232         item->mop_subpill_allocated = 1;
233         RETURN(cli_batch_add(exp, bh, item, mdc_update_packers[opc],
234                              mdc_update_interpreters[opc]));
235 }