4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/*
 * Context saved across an asynchronous getattr enqueue: the export the
 * RPC was sent on and the caller's md_enqueue_info, both needed again
 * by the interpret/completion path.
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * Report the error (if any) recorded in @it for the given open phase.
 * Dispositions are checked from the latest phase (lease) back to the
 * earliest (intent executed); once a disposition at or past @phase is
 * found, the intent's status is the answer.  Falling through all
 * checks means the server set no disposition at all, which is a
 * protocol anomaly and is logged via CERROR below.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No disposition bits set: dump the raw intent state for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach VFS inode @data to the DLM lock's resource (lr_lvb_inode) and,
 * if @bits is non-NULL, return the inodebits the lock covers.  If the
 * resource already points at a different inode, that old inode must be
 * on its way out (I_FREEING) — asserted below — before being replaced.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zero) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would corrupt lock<->inode mapping. */
120 LASSERTF(old_inode->i_state & I_FREEING,
121 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
122 old_inode, old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Look for an already-granted local lock matching @fid/@type/@policy/@mode.
 * Returns the matched lock mode (and fills @lockh) or 0 if none.
 * Note: @policy is narrowed in place to the ibits the server supports.
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused (no active readers/writers) locks on the resource
 * named by @fid that match @policy/@mode, honoring the cancel @flags.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Detach any inode pointer cached on the DLM resource for @fid
 * (clears lr_lvb_inode), e.g. when the inode is being destroyed.
 * Safe to call when no resource exists for the fid.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0): absent resource means nothing cached. */
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request whose operation failed (@rc != 0),
 * so the failure is not replayed after recovery.  A failed request that
 * nonetheless carries a transno is suspicious and is logged.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (b=5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways...
219 int mdc_save_lovea(struct ptlrpc_request *req,
220 const struct req_msg_field *field, void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
223 struct lov_user_md *lmm;
/* Grow (or shrink) the client-side buffer for @field to exactly @size. */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
242 /* overwrite layout generation returned from the MDS */
243 lmm->lmm_stripe_offset =
244 (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
/*
 * Build and pack an LDLM_INTENT_OPEN request for @it on @op_data.
 * Cancels conflicting child OPEN locks (and the parent UPDATE lock for
 * creates), sizes all client/server capsule fields (EA data, security
 * context, ACL, inline DoM buffer), and records the predicted reply
 * buffer size in lm_repsize so the server knows how much fits.
 * Returns the prepared request or an ERR_PTR on failure.
 */
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252 struct md_op_data *op_data, __u32 acl_bufsize)
254 struct ptlrpc_request *req;
255 struct obd_device *obd = class_exp2obd(exp);
256 struct ldlm_intent *lit;
257 const void *lmm = op_data->op_data;
258 __u32 lmmsize = op_data->op_data_size;
259 __u32 mdt_md_capsule_size;
263 int repsize, repsize_estimate;
268 mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
/* An open intent always targets a regular file. */
270 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272 /* XXX: openlock is not cancelled for cross-refs. */
273 /* If inode is known, cancel conflicting OPEN locks. */
274 if (fid_is_sane(&op_data->op_fid2)) {
275 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276 if (it->it_flags & MDS_FMODE_WRITE)
281 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
284 else if (it->it_flags & FMODE_EXEC)
290 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295 /* If CREATE, cancel parent's UPDATE lock. */
296 if (it->it_op & IT_CREAT)
300 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302 MDS_INODELOCK_UPDATE);
304 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: put back the cancel list we collected above. */
307 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308 RETURN(ERR_PTR(-ENOMEM));
311 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312 op_data->op_namelen + 1);
313 if (cl_is_lov_delay_create(it->it_flags)) {
314 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
315 LASSERT(lmmsize == 0);
316 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
318 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
319 max(lmmsize, obd->u.cli.cl_default_mds_easize));
322 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
323 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
324 op_data->op_file_secctx_name_size : 0);
326 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
327 op_data->op_file_secctx_size);
329 /* get SELinux policy info if any */
330 rc = sptlrpc_get_sepol(req);
332 ptlrpc_request_free(req);
335 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336 strlen(req->rq_sepol) ?
337 strlen(req->rq_sepol) + 1 : 0);
339 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341 ptlrpc_request_free(req);
/* Open requests are replayable when the import supports recovery. */
345 spin_lock(&req->rq_lock);
346 req->rq_replay = req->rq_import->imp_replayable;
347 spin_unlock(&req->rq_lock);
349 /* pack the intent */
350 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351 lit->opc = (__u64)it->it_op;
353 /* pack the intended request */
354 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
357 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358 mdt_md_capsule_size);
359 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Pure open (not create) may ask the server for the file's security
 * xattr by name, saving a separate getxattr RPC later. */
361 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364 op_data->op_file_secctx_name_size > 0 &&
365 op_data->op_file_secctx_name != NULL) {
368 secctx_name = req_capsule_client_get(&req->rq_pill,
369 &RMF_FILE_SECCTX_NAME);
370 memcpy(secctx_name, op_data->op_file_secctx_name,
371 op_data->op_file_secctx_name_size);
372 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374 obd->u.cli.cl_max_mds_easize);
376 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377 op_data->op_file_secctx_name_size,
378 op_data->op_file_secctx_name);
381 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
386 * Inline buffer for possible data from Data-on-MDT files.
388 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389 sizeof(struct niobuf_remote));
390 ptlrpc_request_set_replen(req);
392 /* Get real repbuf allocated size as rounded up power of 2 */
393 repsize = size_roundup_power2(req->rq_replen +
394 lustre_msg_early_size());
395 /* Estimate free space for DoM files in repbuf */
396 repsize_estimate = repsize - (req->rq_replen -
397 mdt_md_capsule_size +
398 sizeof(struct lov_comp_md_v1) +
399 sizeof(struct lov_comp_md_entry_v1) +
400 lov_mds_md_size(0, LOV_MAGIC_V3));
/* If the estimated slack is below the configured DoM minimum, grow the
 * inline niobuf so small DoM reads can come back in this reply. */
402 if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
403 repsize = obd->u.cli.cl_dom_min_inline_repsize -
404 repsize_estimate + sizeof(struct niobuf_remote);
405 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
407 sizeof(struct niobuf_remote) + repsize);
408 ptlrpc_request_set_replen(req);
409 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
410 repsize, req->rq_replen);
411 repsize = size_roundup_power2(req->rq_replen +
412 lustre_msg_early_size());
414 /* The only way to report real allocated repbuf size to the server
415 * is the lm_repsize but it must be set prior buffer allocation itself
416 * due to security reasons - it is part of buffer used in signature
417 * calculation (see LU-11414). Therefore the saved size is predicted
418 * value as rq_replen rounded to the next higher power of 2.
419 * Such estimation is safe. Though the final allocated buffer might
420 * be even larger, it is not possible to know that at this point.
422 req->rq_reqmsg->lm_repsize = repsize;
/* Default sizing for the cached-xattr (getxattr intent) reply buffers:
 * expected name length, value length, and number of xattrs. */
426 #define GA_DEFAULT_EA_NAME_LEN 20
427 #define GA_DEFAULT_EA_VAL_LEN 250
428 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request that fetches all of a file's
 * xattrs under a single XATTR lock (used by the client xattr cache).
 * Reply buffers are sized from the GA_DEFAULT_* estimates above.
 */
430 static struct ptlrpc_request *
431 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
432 struct md_op_data *op_data)
434 struct ptlrpc_request *req;
435 struct ldlm_intent *lit;
438 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
441 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
442 &RQF_LDLM_INTENT_GETXATTR);
444 RETURN(ERR_PTR(-ENOMEM));
446 /* get SELinux policy info if any */
447 rc = sptlrpc_get_sepol(req);
449 ptlrpc_request_free(req);
452 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
453 strlen(req->rq_sepol) ?
454 strlen(req->rq_sepol) + 1 : 0);
456 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
458 ptlrpc_request_free(req);
462 /* pack the intent */
463 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
464 lit->opc = IT_GETXATTR;
465 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
466 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
468 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
469 /* If the supplied buffer is too small then the server will return
470 * -ERANGE and llite will fallback to using non cached xattr
471 * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
472 * for an orphan or dead file causes an oops. So let's try to avoid
473 * sending too small a buffer to too old a server. This is effectively
474 * undoing the memory conservation of LU-9417 when it would be *more*
475 * likely to crash the server. See LU-9856.
477 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
478 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
479 exp->exp_connect_data.ocd_max_easize);
482 /* pack the intended request */
483 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
484 ea_vals_buf_size, -1, 0);
486 /* get SELinux policy info if any */
487 mdc_file_sepol_pack(req);
/* Server-side reply buffers: xattr names, values, per-value lengths. */
489 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
490 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
492 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
495 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
496 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is needed in a getxattr intent reply. */
498 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
500 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request for lookup/getattr intents.
 * Requests full attributes (+EA, ACL, default LMV) and, when the caller
 * supplied a security xattr name, asks for that xattr in the same RPC.
 */
505 static struct ptlrpc_request *
506 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
507 struct md_op_data *op_data, __u32 acl_bufsize)
509 struct ptlrpc_request *req;
510 struct obd_device *obd = class_exp2obd(exp);
511 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
512 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
514 struct ldlm_intent *lit;
516 bool have_secctx = false;
520 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
521 &RQF_LDLM_INTENT_GETATTR);
523 RETURN(ERR_PTR(-ENOMEM));
525 /* send name of security xattr to get upon intent */
526 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
527 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
529 op_data->op_file_secctx_name_size > 0 &&
530 op_data->op_file_secctx_name != NULL) {
532 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
534 op_data->op_file_secctx_name_size);
537 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
538 op_data->op_namelen + 1);
/* Getattr cancels no locks, hence the NULL/0 cancel list. */
540 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
542 ptlrpc_request_free(req);
546 /* pack the intent */
547 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
548 lit->opc = (__u64)it->it_op;
550 easize = obd->u.cli.cl_default_mds_easize;
552 /* pack the intended request */
553 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
555 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
556 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
557 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
558 sizeof(struct lmv_user_md));
/* Copy the requested security xattr name into the request. */
563 secctx_name = req_capsule_client_get(&req->rq_pill,
564 &RMF_FILE_SECCTX_NAME);
565 memcpy(secctx_name, op_data->op_file_secctx_name,
566 op_data->op_file_secctx_name_size);
568 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
571 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
572 op_data->op_file_secctx_name_size,
573 op_data->op_file_secctx_name);
575 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
579 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request: copies the caller-provided
 * struct layout_intent from op_data->op_data and reserves an LVB reply
 * buffer for the returned layout.  A write-layout intent on a known
 * fid first cancels unused LAYOUT locks on that object.
 */
583 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
584 struct lookup_intent *it,
585 struct md_op_data *op_data)
587 struct obd_device *obd = class_exp2obd(exp);
588 struct ptlrpc_request *req;
589 struct ldlm_intent *lit;
590 struct layout_intent *layout;
595 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
596 &RQF_LDLM_INTENT_LAYOUT);
598 RETURN(ERR_PTR(-ENOMEM));
600 if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
601 (it->it_flags & FMODE_WRITE)) {
602 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
604 MDS_INODELOCK_LAYOUT);
607 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
608 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
610 ptlrpc_request_free(req);
614 /* pack the intent */
615 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
616 lit->opc = (__u64)it->it_op;
618 /* pack the layout intent request */
619 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
/* Caller must hand in exactly one struct layout_intent. */
620 LASSERT(op_data->op_data != NULL);
621 LASSERT(op_data->op_data_size == sizeof(*layout));
622 memcpy(layout, op_data->op_data, sizeof(*layout));
624 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
625 obd->u.cli.cl_default_mds_easize);
626 ptlrpc_request_set_replen(req);
/*
 * Build a plain (non-intent) LDLM_ENQUEUE request with a server-side
 * LVB buffer of lvb_len bytes.  Used e.g. for readdir/flock enqueues.
 */
630 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
633 struct ptlrpc_request *req;
637 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
639 RETURN(ERR_PTR(-ENOMEM));
641 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
643 ptlrpc_request_free(req);
647 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
648 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent-enqueue reply: fix up lock bookkeeping,
 * transfer disposition/status into @it, clear the replay flag on
 * failed opens, validate/swab the reply body, stash LOV/layout EA
 * data for replay, and install layout/DoM LVB data on the lock.
 * @rc is the enqueue result and may be ELDLM_LOCK_ABORTED (intent
 * executed but no lock granted).
 */
652 static int mdc_finish_enqueue(struct obd_export *exp,
653 struct ptlrpc_request *req,
654 struct ldlm_enqueue_info *einfo,
655 struct lookup_intent *it,
656 struct lustre_handle *lockh, int rc)
658 struct req_capsule *pill = &req->rq_pill;
659 struct ldlm_request *lockreq;
660 struct ldlm_reply *lockrep;
661 struct ldlm_lock *lock;
662 struct mdt_body *body = NULL;
663 void *lvb_data = NULL;
668 /* Similarly, if we're going to replay this request, we don't want to
669 * actually get a lock, just perform the intent.
671 if (req->rq_transno || req->rq_replay) {
672 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
673 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
676 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent ran but no lock was granted: hand back an empty handle. */
678 memset(lockh, 0, sizeof(*lockh));
680 } else { /* rc = 0 */
681 lock = ldlm_handle2lock(lockh);
682 LASSERT(lock != NULL);
684 /* If server returned a different lock mode, fix up variables */
685 if (lock->l_req_mode != einfo->ei_mode) {
686 ldlm_lock_addref(lockh, lock->l_req_mode);
687 ldlm_lock_decref(lockh, einfo->ei_mode);
688 einfo->ei_mode = lock->l_req_mode;
693 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
694 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish server's verdict into the lookup intent. */
696 it->it_disposition = (int)lockrep->lock_policy_res1;
697 it->it_status = (int)lockrep->lock_policy_res2;
698 it->it_lock_mode = einfo->ei_mode;
699 it->it_lock_handle = lockh->cookie;
700 it->it_request = req;
702 /* Technically speaking rq_transno must already be zero if
703 * it_status is in error, so the check is a bit redundant.
705 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
706 mdc_clear_replay_flag(req, it->it_status);
708 /* If we're doing an IT_OPEN which did not result in an actual
709 * successful open, then we need to remove the bit which saves
710 * this request for unconditional replay.
712 * It's important that we do this first! Otherwise we might exit the
713 * function without doing so, and try to replay a failed create.
716 if (it->it_op & IT_OPEN && req->rq_replay &&
717 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
718 mdc_clear_replay_flag(req, it->it_status);
720 DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
721 it->it_op, it->it_disposition, it->it_status);
723 /* We know what to expect, so we do any byte flipping required here */
724 if (it_has_reply_body(it)) {
725 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
728 CERROR("%s: cannot swab mdt_body: rc = %d\n",
729 exp->exp_obd->obd_name, rc);
733 if (it_disposition(it, DISP_OPEN_OPEN) &&
734 !it_open_error(DISP_OPEN_OPEN, it)) {
736 * If this is a successful OPEN request, we need to set
737 * replay handler and data early, so that if replay
738 * happens immediately after swabbing below, new reply
739 * is swabbed by that handler correctly.
741 mdc_set_open_replay_data(NULL, NULL, it);
744 if (it_disposition(it, DISP_OPEN_CREATE) &&
745 !it_open_error(DISP_OPEN_CREATE, it)) {
746 lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
750 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
753 mdc_update_max_ea_from_body(exp, body);
756 * The eadata is opaque; just check that it is there.
757 * Eventually, obd_unpackmd() will check the contents.
759 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
760 body->mbo_eadatasize);
764 /* save LVB data and length if for layout lock */
766 lvb_len = body->mbo_eadatasize;
769 * We save the reply LOV EA in case we have to replay a
770 * create for recovery. If we didn't allocate a large
771 * enough request buffer above we need to reallocate it
772 * here to hold the actual LOV EA.
774 * To not save LOV EA if request is not going to replay
775 * (for example error one).
777 if ((it->it_op & IT_OPEN) && req->rq_replay) {
778 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
779 body->mbo_eadatasize);
/* Could not save the EA: drop it so replay never uses stale data. */
781 body->mbo_valid &= ~OBD_MD_FLEASIZE;
782 body->mbo_eadatasize = 0;
787 } else if (it->it_op & IT_LAYOUT) {
788 /* maybe the lock was granted right away and layout
789 * is packed into RMF_DLM_LVB of req
791 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
792 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
793 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
795 lvb_data = req_capsule_server_sized_get(pill,
796 &RMF_DLM_LVB, lvb_len);
797 if (lvb_data == NULL)
801 * save replied layout data to the request buffer for
802 * recovery consideration (lest MDS reinitialize
803 * another set of OST objects).
806 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
811 /* fill in stripe data for layout lock.
812 * LU-6581: trust layout data only if layout lock is granted. The MDT
813 * has stopped sending layout unless the layout lock is granted. The
814 * client still does this checking in case it's talking with an old
817 lock = ldlm_handle2lock(lockh);
821 if (ldlm_has_layout(lock) && lvb_data != NULL &&
822 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
825 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
826 ldlm_it2str(it->it_op), lvb_len);
828 OBD_ALLOC_LARGE(lmm, lvb_len);
830 GOTO(out_lock, rc = -ENOMEM);
832 memcpy(lmm, lvb_data, lvb_len);
834 /* install lvb_data */
835 lock_res_and_lock(lock);
836 if (lock->l_lvb_data == NULL) {
837 lock->l_lvb_type = LVB_T_LAYOUT;
838 lock->l_lvb_data = lmm;
839 lock->l_lvb_len = lvb_len;
842 unlock_res_and_lock(lock);
/* Someone else installed LVB data first; free our copy. */
844 OBD_FREE_LARGE(lmm, lvb_len);
847 if (ldlm_has_dom(lock)) {
848 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
850 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
851 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
852 LDLM_ERROR(lock, "%s: DoM lock without size.",
853 exp->exp_obd->obd_name);
854 GOTO(out_lock, rc = -EPROTO);
857 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
858 ldlm_it2str(it->it_op), body->mbo_dom_size);
860 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
/*
 * Decide whether this intent is a read-only operation that should NOT
 * consume a modify-RPC slot (getattr/lookup/readdir and read-mode
 * layout intents are non-modifying).
 */
868 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
871 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
872 it->it_op == IT_READDIR ||
873 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
878 /* We always reserve enough space in the reply packet for a stripe MD, because
879 * we don't know in advance the file type.
/*
 * Core enqueue path: choose the inodebits policy for the intent, pack
 * the matching intent request (open/getattr/layout/getxattr/readdir or
 * plain flock), send it via ldlm_cli_enqueue(), and finish via
 * mdc_finish_enqueue().  Handles resend on -EINPROGRESS and retries
 * with a larger ACL buffer on -ERANGE.  On failure any granted lock is
 * released and the intent's lock fields are zeroed.
 */
881 static int mdc_enqueue_base(struct obd_export *exp,
882 struct ldlm_enqueue_info *einfo,
883 const union ldlm_policy_data *policy,
884 struct lookup_intent *it,
885 struct md_op_data *op_data,
886 struct lustre_handle *lockh,
887 __u64 extra_lock_flags)
889 struct obd_device *obd = class_exp2obd(exp);
890 struct ptlrpc_request *req;
891 __u64 flags, saved_flags = extra_lock_flags;
892 struct ldlm_res_id res_id;
893 static const union ldlm_policy_data lookup_policy = {
894 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
895 static const union ldlm_policy_data update_policy = {
896 .l_inodebits = { MDS_INODELOCK_UPDATE } };
897 static const union ldlm_policy_data layout_policy = {
898 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
899 static const union ldlm_policy_data getxattr_policy = {
900 .l_inodebits = { MDS_INODELOCK_XATTR } };
901 int generation, resends = 0;
902 struct ldlm_reply *lockrep;
903 struct obd_import *imp = class_exp2cliimp(exp);
905 enum lvb_type lvb_type = 0;
/* With an intent the lock must be an inodebits lock. */
909 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
911 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
914 LASSERT(policy == NULL);
916 saved_flags |= LDLM_FL_HAS_INTENT;
/* Pick the inodebits policy implied by the intent operation. */
917 if (it->it_op & (IT_GETATTR | IT_READDIR))
918 policy = &update_policy;
919 else if (it->it_op & IT_LAYOUT)
920 policy = &layout_policy;
921 else if (it->it_op & IT_GETXATTR)
922 policy = &getxattr_policy;
924 policy = &lookup_policy;
/* Remember import generation so resends after eviction are detected. */
927 generation = obd->u.cli.cl_import->imp_generation;
928 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
929 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
932 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
937 /* The only way right now is FLOCK. */
938 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
940 res_id.name[3] = LDLM_FLOCK;
941 req = ldlm_enqueue_pack(exp, 0);
942 } else if (it->it_op & IT_OPEN) {
943 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
944 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
945 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
946 } else if (it->it_op & IT_READDIR) {
947 req = mdc_enqueue_pack(exp, 0);
948 } else if (it->it_op & IT_LAYOUT) {
949 if (!imp_connect_lvb_type(imp))
951 req = mdc_intent_layout_pack(exp, it, op_data);
952 lvb_type = LVB_T_LAYOUT;
953 } else if (it->it_op & IT_GETXATTR) {
954 req = mdc_intent_getxattr_pack(exp, it, op_data);
961 RETURN(PTR_ERR(req));
/* Pin the request to the current import generation for resends. */
964 req->rq_generation_set = 1;
965 req->rq_import_generation = generation;
966 req->rq_sent = ktime_get_real_seconds() + resends;
969 einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
971 /* With Data-on-MDT the glimpse callback is needed too.
972 * It is set here in advance but not in mdc_finish_enqueue()
973 * to avoid possible races. It is safe to have glimpse handler
974 * for non-DOM locks and costs nothing.
976 if (einfo->ei_cb_gl == NULL)
977 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
979 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
980 0, lvb_type, lockh, 0);
983 /* For flock requests we immediatelly return without further
984 * delay and let caller deal with the rest, since rest of
985 * this function metadata processing makes no sense for flock
986 * requests anyway. But in case of problem during comms with
987 * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
988 * we cannot rely on caller and this mainly for F_UNLCKs
989 * (explicits or automatically generated by kernel to clean
990 * current flocks upon exit) that can't be trashed.
992 ptlrpc_req_finished(req);
993 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
994 (einfo->ei_type == LDLM_FLOCK) &&
995 (einfo->ei_mode == LCK_NL))
1002 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1003 obd->obd_name, PFID(&op_data->op_fid1),
1004 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1006 mdc_clear_replay_flag(req, rc);
1007 ptlrpc_req_finished(req);
1011 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1012 LASSERT(lockrep != NULL);
/* Convert server status to host errno convention. */
1014 lockrep->lock_policy_res2 =
1015 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1017 /* Retry infinitely when the server returns -EINPROGRESS for the
1018 * intent operation, when server returns -EINPROGRESS for acquiring
1019 * intent lock, we'll retry in after_reply().
1021 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1022 mdc_clear_replay_flag(req, rc);
1023 ptlrpc_req_finished(req);
1024 if (generation == obd->u.cli.cl_import->imp_generation) {
1025 if (signal_pending(current))
1029 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1030 obd->obd_name, resends, it->it_op,
1031 PFID(&op_data->op_fid1),
1032 PFID(&op_data->op_fid2));
1035 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL too large for the small default buffer: retry once with the
 * server-advertised maximum. */
1040 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1041 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1042 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1043 mdc_clear_replay_flag(req, -ERANGE);
1044 ptlrpc_req_finished(req);
1045 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1050 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: release the lock (if granted) and scrub @it. */
1052 if (lustre_handle_is_used(lockh)) {
1053 ldlm_lock_decref(lockh, einfo->ei_mode);
1054 memset(lockh, 0, sizeof(*lockh));
1056 ptlrpc_req_finished(req);
1058 it->it_lock_handle = 0;
1059 it->it_lock_mode = 0;
1060 it->it_request = NULL;
/*
 * Public non-intent enqueue entry point: thin wrapper that calls
 * mdc_enqueue_base() with a NULL lookup intent.
 */
1066 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1067 const union ldlm_policy_data *policy,
1068 struct md_op_data *op_data,
1069 struct lustre_handle *lockh, __u64 extra_lock_flags)
1071 return mdc_enqueue_base(exp, einfo, policy, NULL,
1072 op_data, lockh, extra_lock_flags);
/*
 * Translate the completed intent into a caller-visible result: check
 * per-phase open/lookup errors, pin the request for the create/open
 * follow-up phases (released in ll_create_node / ll_file_open), and
 * deduplicate against an already-held matching lock, keeping the old
 * one and cancelling the new.
 */
1075 static int mdc_finish_intent_lock(struct obd_export *exp,
1076 struct ptlrpc_request *request,
1077 struct md_op_data *op_data,
1078 struct lookup_intent *it,
1079 struct lustre_handle *lockh)
1081 struct lustre_handle old_lock;
1082 struct ldlm_lock *lock;
/* Guard against a freed/poisoned request slipping through. */
1086 LASSERT(request != NULL);
1087 LASSERT(request != LP_POISON);
1088 LASSERT(request->rq_repmsg != LP_POISON);
1090 if (it->it_op & IT_READDIR)
1093 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1094 if (it->it_status != 0)
1095 GOTO(out, rc = it->it_status);
1097 if (!it_disposition(it, DISP_IT_EXECD)) {
1098 /* The server failed before it even started executing
1099 * the intent, i.e. because it couldn't unpack the
1102 LASSERT(it->it_status != 0);
1103 GOTO(out, rc = it->it_status);
1105 rc = it_open_error(DISP_IT_EXECD, it);
1109 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1113 /* keep requests around for the multiple phases of the call
1114 * this shows the DISP_XX must guarantee we make it into the
1117 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1118 it_disposition(it, DISP_OPEN_CREATE) &&
1119 !it_open_error(DISP_OPEN_CREATE, it)) {
1120 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1121 /* balanced in ll_create_node */
1122 ptlrpc_request_addref(request);
1124 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1125 it_disposition(it, DISP_OPEN_OPEN) &&
1126 !it_open_error(DISP_OPEN_OPEN, it)) {
1127 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1128 /* balanced in ll_file_open */
1129 ptlrpc_request_addref(request);
1130 /* eviction in middle of open RPC processing b=11546 */
1131 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1135 if (it->it_op & IT_CREAT) {
1136 /* XXX this belongs in ll_create_it */
1137 } else if (it->it_op == IT_OPEN) {
1138 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1140 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1144 /* If we already have a matching lock, then cancel the new
1145 * one. We have to set the data here instead of in
1146 * mdc_enqueue, because we need to use the child's inode as
1147 * the l_ast_data to match, and that's not available until
1148 * intent_finish has performed the iget().
1150 lock = ldlm_handle2lock(lockh);
1152 union ldlm_policy_data policy = lock->l_policy_data;
1154 LDLM_DEBUG(lock, "matching against this");
1156 if (it_has_reply_body(it)) {
1157 struct mdt_body *body;
1159 body = req_capsule_server_get(&request->rq_pill,
1161 /* mdc_enqueue checked */
1162 LASSERT(body != NULL);
/* Sanity: the lock's resource must name the fid the reply describes. */
1163 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1164 &lock->l_resource->lr_name),
1165 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1166 PLDLMRES(lock->l_resource),
1167 PFID(&body->mbo_fid1));
1169 LDLM_LOCK_PUT(lock);
1171 memcpy(&old_lock, lockh, sizeof(*lockh));
1172 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1173 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop the new lock, keep the existing one. */
1174 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1175 memcpy(lockh, &old_lock, sizeof(old_lock));
1176 it->it_lock_handle = lockh->cookie;
1183 "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1184 (int)op_data->op_namelen, op_data->op_name,
1185 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
/*
 * mdc_revalidate_lock(): check whether the client already holds a granted
 * DLM lock covering the inode bits this intent (@it) needs on @fid, so the
 * caller can skip a fresh enqueue RPC.  On a match the lock handle/mode are
 * cached in the intent; on a miss they are cleared.
 *
 * NOTE(review): this extraction has lines elided (case labels, closing
 * braces, ENTRY/RETURN) -- verify any change against the complete source.
 */
1190 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1191 struct lu_fid *fid, __u64 *bits)
1193 /* We could just return 1 immediately, but as we should only be called
1194 * in revalidate_it if we already have a lock, let's verify that.
1196 struct ldlm_res_id res_id;
1197 struct lustre_handle lockh;
1198 union ldlm_policy_data policy;
1199 enum ldlm_mode mode;
/* Fast path: the intent already carries a lock handle -- revalidate it
 * (ldlm_revalidate_lock_handle() presumably also reports the held inode
 * bits through @bits -- TODO confirm against its definition). */
1202 if (it->it_lock_handle) {
1203 lockh.cookie = it->it_lock_handle;
1204 mode = ldlm_revalidate_lock_handle(&lockh, bits);
/* No cached handle: build the FID-based resource name, then pick the
 * inode bits required by this intent type before trying to match an
 * already-granted lock (case labels elided in this extraction). */
1206 fid_build_reg_res_name(fid, &res_id);
1207 switch (it->it_op) {
1209 /* File attributes are held under multiple bits:
1210 * nlink is under lookup lock, size and times are
1211 * under UPDATE lock and recently we've also got
1212 * a separate permissions lock for owner/group/acl that
1213 * were protected by lookup lock before.
1214 * Getattr must provide all of that information,
1215 * so we need to ensure we have all of those locks.
1216 * Unfortunately, if the bits are split across multiple
1217 * locks, there's no easy way to match all of them here,
1218 * so an extra RPC would be performed to fetch all
1219 * of those bits at once for now.
1221 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1222 * but for old MDTs (< 2.4), permission is covered
1223 * by LOOKUP lock, so it needs to match all bits here.
1225 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1226 MDS_INODELOCK_LOOKUP |
1230 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1233 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1236 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any of CR/CW/PR/PW satisfies the match -- we only need a granted
 * lock on these bits, not a specific mode. */
1240 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1241 LDLM_IBITS, &policy,
1242 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Hit: publish the matched lock in the intent so later stages reuse
 * it (surrounding branch structure partially elided here). */
1247 it->it_lock_handle = lockh.cookie;
1248 it->it_lock_mode = mode;
/* Miss: clear any stale lock state in the intent. */
1250 it->it_lock_handle = 0;
1251 it->it_lock_mode = 0;
1258 * This long block is all about fixing up the lock and request state
1259 * so that it is correct as of the moment _before_ the operation was
1260 * applied; that way, the VFS will think that everything is normal and
1261 * call Lustre's regular VFS methods.
1263 * If we're performing a creation, that means that unless the creation
1264 * failed with EEXIST, we should fake up a negative dentry.
1266 * For everything else, we want to lookup to succeed.
1268 * One additional note: if CREATE or OPEN succeeded, we add an extra
1269 * reference to the request because we need to keep it around until
1270 * ll_create/ll_open gets called.
1272 * The server will return to us, in it_disposition, an indication of
1273 * exactly what it_status refers to.
1275 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1276 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1277 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1278 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1281 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * mdc_intent_lock(): resolve a VFS intent (@it) against the MDT.  For
 * lookup/getattr/readdir intents on an already-known child FID it first
 * tries to revalidate an existing DLM lock; for creates it allocates a
 * FID when the upper layer did not; otherwise it enqueues an intent lock
 * via mdc_enqueue_base() and fixes up the result with
 * mdc_finish_intent_lock(), handing the reply request back via @reqp.
 *
 * NOTE(review): this extraction has lines elided (error branches, braces,
 * RETURN) -- verify any change against the complete source.
 */
1284 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1285 struct lookup_intent *it, struct ptlrpc_request **reqp,
1286 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
/* Enqueue descriptor: inodebits lock, mode derived from the intent,
 * with the caller-supplied blocking callback and the standard
 * completion/glimpse handlers. */
1288 struct ldlm_enqueue_info einfo = {
1289 .ei_type = LDLM_IBITS,
1290 .ei_mode = it_to_lock_mode(it),
1291 .ei_cb_bl = cb_blocking,
1292 .ei_cb_cp = ldlm_completion_ast,
1293 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1295 struct lustre_handle lockh;
1300 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1301 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1302 op_data->op_name, PFID(&op_data->op_fid2),
1303 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: for lookup-type intents on a sane child FID we may
 * already hold a covering lock; mdc_revalidate_lock() fills in the
 * intent's lock state on a hit. */
1307 if (fid_is_sane(&op_data->op_fid2) &&
1308 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1309 /* We could just return 1 immediately, but since we should only
1310 * be called in revalidate_it if we already have a lock, let's
1313 it->it_lock_handle = 0;
1314 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1315 /* Only return failure if it was not GETATTR by cfid
1316 * (from inode_revalidate()).
1318 if (rc || op_data->op_namelen != 0)
1322 /* For case if upper layer did not alloc fid, do it now. */
1323 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1324 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1326 CERROR("%s: cannot allocate new FID: rc=%d\n",
1327 exp->exp_obd->obd_name, rc);
/* Slow path: send the intent enqueue RPC to the MDT, then expose the
 * reply request to the caller and finish intent/lock bookkeeping. */
1332 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1337 *reqp = it->it_request;
1338 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for an asynchronous intent-getattr enqueue: finish
 * the LDLM enqueue that ldlm_cli_enqueue() started, convert the on-wire
 * reply status, complete intent/lock state, and finally invoke the
 * sponsor's completion callback (minfo->mi_cb) with the result.
 *
 * NOTE(review): this extraction has lines elided (error goto/labels,
 * braces, RETURN) -- verify any change against the complete source.
 */
1342 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1343 struct ptlrpc_request *req,
1346 struct mdc_getattr_args *ga = args;
1347 struct obd_export *exp = ga->ga_exp;
1348 struct md_enqueue_info *minfo = ga->ga_minfo;
1349 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1350 struct lookup_intent *it = &minfo->mi_it;
1351 struct lustre_handle *lockh = &minfo->mi_lockh;
1352 struct ldlm_reply *lockrep;
1353 __u64 flags = LDLM_FL_HAS_INTENT;
/* Fault-injection hook used by tests to force a getattr enqueue
 * failure (consequent branch elided in this extraction). */
1356 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the client side of the enqueue; on failure clear the
 * replay flag so the request is not resent after recovery. */
1359 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1360 &flags, NULL, 0, lockh, rc);
1362 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1363 exp->exp_obd->obd_name, rc);
1364 mdc_clear_replay_flag(req, rc);
/* The DLM reply buffer is expected to be present (LASSERT below). */
1368 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1369 LASSERT(lockrep != NULL);
/* Convert the wire-format intent status into a host error code. */
1371 lockrep->lock_policy_res2 =
1372 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1374 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1378 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the sponsor of the async getattr. */
1382 minfo->mi_cb(req, minfo, rc);
1386 int mdc_intent_getattr_async(struct obd_export *exp,
1387 struct md_enqueue_info *minfo)
1389 struct md_op_data *op_data = &minfo->mi_data;
1390 struct lookup_intent *it = &minfo->mi_it;
1391 struct ptlrpc_request *req;
1392 struct mdc_getattr_args *ga;
1393 struct ldlm_res_id res_id;
1394 union ldlm_policy_data policy = {
1395 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1397 __u64 flags = LDLM_FL_HAS_INTENT;
1402 "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1403 (int)op_data->op_namelen, op_data->op_name,
1404 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1406 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1407 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1408 * of the async getattr RPC will handle that by itself.
1410 req = mdc_intent_getattr_pack(exp, it, op_data,
1411 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1413 RETURN(PTR_ERR(req));
1415 /* With Data-on-MDT the glimpse callback is needed too.
1416 * It is set here in advance but not in mdc_finish_enqueue()
1417 * to avoid possible races. It is safe to have glimpse handler
1418 * for non-DOM locks and costs nothing.
1420 if (minfo->mi_einfo.ei_cb_gl == NULL)
1421 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1423 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1424 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1426 ptlrpc_req_finished(req);
1430 ga = ptlrpc_req_async_args(ga, req);
1432 ga->ga_minfo = minfo;
1434 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1435 ptlrpcd_add_req(req);