4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2020, 2022, DDN Storage Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
29 * lustre/mdc/mdc_batch.c
31 * Batch Metadata Updating on the client (MDC)
33 * Author: Qian Yingjin <qian@ddn.com>
36 #define DEBUG_SUBSYSTEM S_MDC
38 #include <linux/module.h>
39 #include <lustre_acl.h>
41 #include "mdc_internal.h"
43 static int mdc_ldlm_lock_pack(struct obd_export *exp,
44 struct req_capsule *pill,
45 union ldlm_policy_data *policy,
46 struct lu_fid *fid, struct md_op_item *item)
48 struct ldlm_request *dlmreq;
49 struct ldlm_res_id res_id;
50 struct ldlm_enqueue_info *einfo = &item->mop_einfo;
55 dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
57 RETURN(PTR_ERR(dlmreq));
59 /* With Data-on-MDT the glimpse callback is needed too.
60 * It is set here in advance but not in mdc_finish_enqueue()
61 * to avoid possible races. It is safe to have glimpse handler
62 * for non-DOM locks and costs nothing.
64 if (einfo->ei_cb_gl == NULL)
65 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
67 fid_build_reg_res_name(fid, &res_id);
68 rc = ldlm_cli_lock_create_pack(exp, dlmreq, einfo, &res_id,
69 policy, &item->mop_lock_flags,
70 NULL, 0, LVB_T_NONE, &item->mop_lockh);
75 static int mdc_batch_getattr_pack(struct batch_update_head *head,
76 struct lustre_msg *reqmsg,
77 size_t *max_pack_size,
78 struct md_op_item *item)
80 struct obd_export *exp = head->buh_exp;
81 struct lookup_intent *it = &item->mop_it;
82 struct md_op_data *op_data = &item->mop_data;
83 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
84 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
86 union ldlm_policy_data policy = {
87 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
89 struct ldlm_intent *lit;
90 bool have_secctx = false;
91 struct req_capsule pill;
98 req_capsule_subreq_init(&pill, &RQF_BUT_GETATTR, NULL,
99 reqmsg, NULL, RCL_CLIENT);
101 /* send name of security xattr to get upon intent */
102 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
103 req_capsule_has_field(&pill, &RMF_FILE_SECCTX_NAME,
105 op_data->op_file_secctx_name_size > 0 &&
106 op_data->op_file_secctx_name != NULL) {
108 req_capsule_set_size(&pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT,
109 op_data->op_file_secctx_name_size);
112 req_capsule_set_size(&pill, &RMF_NAME, RCL_CLIENT,
113 op_data->op_namelen + 1);
115 size = req_capsule_msg_size(&pill, RCL_CLIENT);
116 if (unlikely(size >= *max_pack_size)) {
117 *max_pack_size = size;
121 req_capsule_client_pack(&pill);
122 /* pack the intent */
123 lit = req_capsule_client_get(&pill, &RMF_LDLM_INTENT);
124 lit->opc = (__u64)it->it_op;
126 easize = MAX_MD_SIZE_OLD; /* obd->u.cli.cl_default_mds_easize; */
128 /* pack the intended request */
129 mdc_getattr_pack(&pill, valid, it->it_flags, op_data, easize);
131 item->mop_lock_flags |= LDLM_FL_HAS_INTENT;
132 rc = mdc_ldlm_lock_pack(head->buh_exp, &pill, &policy,
133 &item->mop_data.op_fid1, item);
137 req_capsule_set_size(&pill, &RMF_MDT_MD, RCL_SERVER, easize);
138 req_capsule_set_size(&pill, &RMF_ACL, RCL_SERVER,
139 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
140 req_capsule_set_size(&pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
141 /*sizeof(struct lmv_user_md)*/MIN_MD_SIZE);
146 secctx_name = req_capsule_client_get(&pill,
147 &RMF_FILE_SECCTX_NAME);
148 memcpy(secctx_name, op_data->op_file_secctx_name,
149 op_data->op_file_secctx_name_size);
151 req_capsule_set_size(&pill, &RMF_FILE_SECCTX,
154 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
155 op_data->op_file_secctx_name_size,
156 op_data->op_file_secctx_name);
158 req_capsule_set_size(&pill, &RMF_FILE_SECCTX, RCL_SERVER, 0);
161 if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
162 req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
165 req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
168 req_capsule_set_replen(&pill);
169 reqmsg->lm_opc = BUT_GETATTR;
170 *max_pack_size = size;
174 static md_update_pack_t mdc_update_packers[MD_OP_MAX] = {
175 [MD_OP_GETATTR] = mdc_batch_getattr_pack,
178 static int mdc_batch_getattr_interpret(struct ptlrpc_request *req,
179 struct lustre_msg *repmsg,
180 struct object_update_callback *ouc,
183 struct md_op_item *item = (struct md_op_item *)ouc->ouc_data;
184 struct ldlm_enqueue_info *einfo = &item->mop_einfo;
185 struct batch_update_head *head = ouc->ouc_head;
186 struct obd_export *exp = head->buh_exp;
187 struct req_capsule *pill = item->mop_pill;
189 req_capsule_subreq_init(pill, &RQF_BUT_GETATTR, req,
190 NULL, repmsg, RCL_CLIENT);
192 rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &item->mop_lock_flags,
193 NULL, 0, &item->mop_lockh, rc, false);
197 rc = mdc_finish_enqueue(exp, pill, einfo, &item->mop_it,
198 &item->mop_lockh, rc);
200 return item->mop_cb(item, rc);
203 object_update_interpret_t mdc_update_interpreters[MD_OP_MAX] = {
204 [MD_OP_GETATTR] = mdc_batch_getattr_interpret,
207 int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
208 struct md_op_item *item)
210 enum md_item_opcode opc = item->mop_opc;
214 if (opc >= MD_OP_MAX || mdc_update_packers[opc] == NULL ||
215 mdc_update_interpreters[opc] == NULL) {
216 CERROR("%s: unexpected opcode %d\n",
217 exp->exp_obd->obd_name, opc);
221 OBD_ALLOC_PTR(item->mop_pill);
222 if (item->mop_pill == NULL)
225 item->mop_subpill_allocated = 1;
226 RETURN(cli_batch_add(exp, bh, item, mdc_update_packers[opc],
227 mdc_update_interpreters[opc]));