4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Argument bundle carried through an asynchronous intent-getattr enqueue
 * so the interpret callback can find its context again. */
50 struct mdc_getattr_args {
	/* export the getattr intent RPC was issued on */
51 struct obd_export *ga_exp;
	/* caller-supplied enqueue descriptor (op_data, completion info) */
52 struct md_enqueue_info *ga_minfo;
/*
 * Report the server-side status of an intent for the given open phase.
 *
 * @phase: DISP_* phase the caller is interested in
 * @it:    lookup intent whose it_disposition / it_status were filled in
 *         from the server reply
 *
 * Disposition bits are tested from the latest phase (lease, open) back to
 * the earliest (intent executed); presumably the first matching branch
 * yields the saved it_status for phases at or beyond @phase — the elided
 * branch bodies should be confirmed against the full source.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No recognized disposition bit set: log what the server sent us. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach inode @data to the resource of the DLM lock behind @lockh and,
 * optionally, report the lock's inodebits through @bits.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zeroed) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A different inode may only be replaced while it is being freed;
 * anything else indicates two live inodes for one resource. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Search the local namespace for a granted lock on @fid compatible with
 * @type/@policy/@mode; on a match the handle is stored in @lockh and the
 * matched mode is returned (ldlm_lock_match() semantics).
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode; @opaque is passed through to the cancel machinery so
 * callers can restrict cancellation to their own locks.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * Detach any cached inode pointer from the DLM resource of @fid.
 * Used when the VFS inode goes away so the resource does not keep a
 * stale lr_lvb_inode reference.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Look up (don't create) the resource; elided lines presumably bail
 * out when it does not exist. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * Drop the replay flag from @req so a failed request (rc != 0) is not
 * kept for recovery replay.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
/* A transno on an error reply is a protocol anomaly worth logging. */
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/* @field: request capsule field to store into (e.g. RMF_EADATA)
 * @data/@size: the LOV EA bytes returned by the server */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
223 struct lov_user_md *lmm;
/* Grow the client-side buffer if the reply EA is larger than what was
 * originally allocated for this field. */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
242 /* overwrite layout generation returned from the MDS */
243 lmm->lmm_stripe_offset =
244 (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
/*
 * Build an LDLM intent-open request: cancel conflicting local locks,
 * size the request/reply capsule fields (EA, ACL, security context,
 * SELinux policy, Data-on-MDT inline buffer) and pack the open intent.
 *
 * Returns the prepared request, or an ERR_PTR on failure.
 */
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252 struct md_op_data *op_data, __u32 acl_bufsize)
254 struct ptlrpc_request *req;
255 struct obd_device *obddev = class_exp2obd(exp);
256 struct ldlm_intent *lit;
257 const void *lmm = op_data->op_data;
258 __u32 lmmsize = op_data->op_data_size;
259 __u32 mdt_md_capsule_size;
260 struct list_head cancels = LIST_HEAD_INIT(cancels);
264 int repsize, repsize_estimate;
268 mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize;
/* An open always targets a regular file; force S_IFREG. */
270 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272 /* XXX: openlock is not cancelled for cross-refs. */
273 /* If inode is known, cancel conflicting OPEN locks. */
274 if (fid_is_sane(&op_data->op_fid2)) {
275 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276 if (it->it_flags & MDS_FMODE_WRITE)
281 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
284 else if (it->it_flags & FMODE_EXEC)
290 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295 /* If CREATE, cancel parent's UPDATE lock. */
296 if (it->it_op & IT_CREAT)
300 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302 MDS_INODELOCK_UPDATE);
304 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list before bailing. */
307 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308 RETURN(ERR_PTR(-ENOMEM));
311 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312 op_data->op_namelen + 1);
313 if (cl_is_lov_delay_create(it->it_flags)) {
314 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
315 LASSERT(lmmsize == 0);
316 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
318 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
319 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
322 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
323 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
324 op_data->op_file_secctx_name_size : 0);
326 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
327 op_data->op_file_secctx_size);
329 /* get SELinux policy info if any */
330 rc = sptlrpc_get_sepol(req);
332 ptlrpc_request_free(req);
335 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336 strlen(req->rq_sepol) ?
337 strlen(req->rq_sepol) + 1 : 0);
339 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341 ptlrpc_request_free(req);
/* Open requests from a replayable import are kept for replay. */
345 spin_lock(&req->rq_lock);
346 req->rq_replay = req->rq_import->imp_replayable;
347 spin_unlock(&req->rq_lock);
349 /* pack the intent */
350 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351 lit->opc = (__u64)it->it_op;
353 /* pack the intended request */
354 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
357 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358 mdt_md_capsule_size);
359 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* For a plain open (not create) of an existing file we also request
 * the file's security xattr by name in the same RPC. */
361 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364 op_data->op_file_secctx_name_size > 0 &&
365 op_data->op_file_secctx_name != NULL) {
368 secctx_name = req_capsule_client_get(&req->rq_pill,
369 &RMF_FILE_SECCTX_NAME);
370 memcpy(secctx_name, op_data->op_file_secctx_name,
371 op_data->op_file_secctx_name_size);
372 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374 obddev->u.cli.cl_max_mds_easize);
376 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377 op_data->op_file_secctx_name_size,
378 op_data->op_file_secctx_name);
381 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
386 * Inline buffer for possible data from Data-on-MDT files.
388 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389 sizeof(struct niobuf_remote));
390 ptlrpc_request_set_replen(req);
392 /* Get real repbuf allocated size as rounded up power of 2 */
393 repsize = size_roundup_power2(req->rq_replen +
394 lustre_msg_early_size());
395 /* Estimate free space for DoM files in repbuf */
396 repsize_estimate = repsize - (req->rq_replen -
397 mdt_md_capsule_size +
398 sizeof(struct lov_comp_md_v1) +
399 sizeof(struct lov_comp_md_entry_v1) +
400 lov_mds_md_size(0, LOV_MAGIC_V3));
/* If the slack is below the configured DoM inline threshold, enlarge
 * the inline niobuf so small DoM reads can ride along in the reply. */
402 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
403 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
404 repsize_estimate + sizeof(struct niobuf_remote);
405 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
407 sizeof(struct niobuf_remote) + repsize);
408 ptlrpc_request_set_replen(req);
409 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
410 repsize, req->rq_replen);
411 repsize = size_roundup_power2(req->rq_replen +
412 lustre_msg_early_size());
414 /* The only way to report real allocated repbuf size to the server
415 * is the lm_repsize but it must be set prior buffer allocation itself
416 * due to security reasons - it is part of buffer used in signature
417 * calculation (see LU-11414). Therefore the saved size is predicted
418 * value as rq_replen rounded to the next higher power of 2.
419 * Such estimation is safe. Though the final allocated buffer might
420 * be even larger, it is not possible to know that at this point.
422 req->rq_reqmsg->lm_repsize = repsize;
/* Heuristic defaults used to size the getxattr reply buffers: expected
 * xattr name length, value length, and number of xattrs per file. */
426 #define GA_DEFAULT_EA_NAME_LEN 20
427 #define GA_DEFAULT_EA_VAL_LEN 250
428 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM intent-getxattr request that asks the MDT to return all
 * xattrs of op_data->op_fid1 in one RPC, with reply buffers sized from
 * the GA_DEFAULT_* heuristics.
 *
 * Returns the prepared request, or an ERR_PTR on failure.
 */
430 static struct ptlrpc_request *
431 mdc_intent_getxattr_pack(struct obd_export *exp,
432 struct lookup_intent *it,
433 struct md_op_data *op_data)
435 struct ptlrpc_request *req;
436 struct ldlm_intent *lit;
438 struct list_head cancels = LIST_HEAD_INIT(cancels);
439 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
443 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
444 &RQF_LDLM_INTENT_GETXATTR);
446 RETURN(ERR_PTR(-ENOMEM));
448 /* get SELinux policy info if any */
449 rc = sptlrpc_get_sepol(req);
451 ptlrpc_request_free(req);
454 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
455 strlen(req->rq_sepol) ?
456 strlen(req->rq_sepol) + 1 : 0);
458 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
460 ptlrpc_request_free(req);
464 /* pack the intent */
465 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
466 lit->opc = IT_GETXATTR;
467 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
468 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
470 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
471 /* If the supplied buffer is too small then the server will
472 * return -ERANGE and llite will fallback to using non cached
473 * xattr operations. On servers before 2.10.1 a (non-cached)
474 * listxattr RPC for an orphan or dead file causes an oops. So
475 * let's try to avoid sending too small a buffer to too old a
476 * server. This is effectively undoing the memory conservation
477 * of LU-9417 when it would be *more* likely to crash the
478 * server. See LU-9856. */
479 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
480 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
481 exp->exp_connect_data.ocd_max_easize);
484 /* pack the intended request */
485 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
486 ea_vals_buf_size, -1, 0);
488 /* get SELinux policy info if any */
489 mdc_file_sepol_pack(req);
/* Server-side buffers: xattr names, values, and per-value lengths. */
491 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
492 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
494 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
497 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
498 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is requested on this path. */
500 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
502 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-getattr (or lookup) request for op_data->op_fid1,
 * optionally asking the server to return the file's security xattr by
 * name in the same RPC.
 *
 * Returns the prepared request, or an ERR_PTR on failure.
 */
507 static struct ptlrpc_request *
508 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
509 struct md_op_data *op_data, __u32 acl_bufsize)
511 struct ptlrpc_request *req;
512 struct obd_device *obddev = class_exp2obd(exp);
513 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
514 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
516 struct ldlm_intent *lit;
518 bool have_secctx = false;
523 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
524 &RQF_LDLM_INTENT_GETATTR);
526 RETURN(ERR_PTR(-ENOMEM));
528 /* send name of security xattr to get upon intent */
529 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
530 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
532 op_data->op_file_secctx_name_size > 0 &&
533 op_data->op_file_secctx_name != NULL) {
535 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
537 op_data->op_file_secctx_name_size);
540 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
541 op_data->op_namelen + 1);
543 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
545 ptlrpc_request_free(req);
549 /* pack the intent */
550 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
551 lit->opc = (__u64)it->it_op;
553 easize = obddev->u.cli.cl_default_mds_easize;
555 /* pack the intended request */
556 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
/* Reply buffers: striping MD, ACL, and default LMV for directories. */
558 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
559 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
560 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
561 sizeof(struct lmv_user_md));
566 secctx_name = req_capsule_client_get(&req->rq_pill,
567 &RMF_FILE_SECCTX_NAME);
568 memcpy(secctx_name, op_data->op_file_secctx_name,
569 op_data->op_file_secctx_name_size);
571 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
574 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
575 op_data->op_file_secctx_name_size,
576 op_data->op_file_secctx_name);
578 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
582 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-layout request carrying the caller's
 * layout_intent (passed in op_data->op_data) so the MDT can
 * instantiate/return the file layout under a layout lock.
 *
 * Returns the prepared request, or an ERR_PTR on failure.
 */
586 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
587 struct lookup_intent *it,
588 struct md_op_data *op_data)
590 struct obd_device *obd = class_exp2obd(exp);
591 struct list_head cancels = LIST_HEAD_INIT(cancels);
592 struct ptlrpc_request *req;
593 struct ldlm_intent *lit;
594 struct layout_intent *layout;
598 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
599 &RQF_LDLM_INTENT_LAYOUT);
601 RETURN(ERR_PTR(-ENOMEM));
/* For a write layout intent on a known file, cancel our own unused
 * layout locks first. */
603 if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
604 (it->it_flags & FMODE_WRITE)) {
605 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
607 MDS_INODELOCK_LAYOUT);
610 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
611 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
613 ptlrpc_request_free(req);
617 /* pack the intent */
618 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
619 lit->opc = (__u64)it->it_op;
621 /* pack the layout intent request */
622 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
/* Caller must supply exactly one struct layout_intent via op_data. */
623 LASSERT(op_data->op_data != NULL);
624 LASSERT(op_data->op_data_size == sizeof(*layout));
625 memcpy(layout, op_data->op_data, sizeof(*layout));
627 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
628 obd->u.cli.cl_default_mds_easize);
629 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM enqueue request with a server-side
 * LVB buffer of @lvb_len bytes. Used e.g. for readdir locks.
 *
 * Returns the prepared request, or an ERR_PTR on failure.
 */
633 static struct ptlrpc_request *
634 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
636 struct ptlrpc_request *req;
640 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
642 RETURN(ERR_PTR(-ENOMEM));
644 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
646 ptlrpc_request_free(req);
650 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
651 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up the lock mode/handle,
 * copy the server's disposition/status into @it, manage the replay flag,
 * swab and validate the reply body, preserve the returned LOV EA for
 * replay, and install layout/DoM LVB data on the lock when applicable.
 */
655 static int mdc_finish_enqueue(struct obd_export *exp,
656 struct ptlrpc_request *req,
657 struct ldlm_enqueue_info *einfo,
658 struct lookup_intent *it,
659 struct lustre_handle *lockh,
662 struct req_capsule *pill = &req->rq_pill;
663 struct ldlm_request *lockreq;
664 struct ldlm_reply *lockrep;
665 struct ldlm_lock *lock;
666 struct mdt_body *body = NULL;
667 void *lvb_data = NULL;
673 /* Similarly, if we're going to replay this request, we don't want to
674 * actually get a lock, just perform the intent. */
675 if (req->rq_transno || req->rq_replay) {
676 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
677 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* ABORTED means the intent executed but no lock was granted. */
680 if (rc == ELDLM_LOCK_ABORTED) {
682 memset(lockh, 0, sizeof(*lockh));
684 } else { /* rc = 0 */
685 lock = ldlm_handle2lock(lockh);
686 LASSERT(lock != NULL);
688 /* If the server gave us back a different lock mode, we should
689 * fix up our variables. */
690 if (lock->l_req_mode != einfo->ei_mode) {
691 ldlm_lock_addref(lockh, lock->l_req_mode);
692 ldlm_lock_decref(lockh, einfo->ei_mode);
693 einfo->ei_mode = lock->l_req_mode;
698 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
699 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's intent result to the caller's intent. */
701 it->it_disposition = (int)lockrep->lock_policy_res1;
702 it->it_status = (int)lockrep->lock_policy_res2;
703 it->it_lock_mode = einfo->ei_mode;
704 it->it_lock_handle = lockh->cookie;
705 it->it_request = req;
707 /* Technically speaking rq_transno must already be zero if
708 * it_status is in error, so the check is a bit redundant */
709 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
710 mdc_clear_replay_flag(req, it->it_status);
712 /* If we're doing an IT_OPEN which did not result in an actual
713 * successful open, then we need to remove the bit which saves
714 * this request for unconditional replay.
716 * It's important that we do this first! Otherwise we might exit the
717 * function without doing so, and try to replay a failed create
719 if (it->it_op & IT_OPEN && req->rq_replay &&
720 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
721 mdc_clear_replay_flag(req, it->it_status);
723 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
724 it->it_op, it->it_disposition, it->it_status);
726 /* We know what to expect, so we do any byte flipping required here */
727 if (it_has_reply_body(it)) {
728 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
730 CERROR ("Can't swab mdt_body\n");
734 if (it_disposition(it, DISP_OPEN_OPEN) &&
735 !it_open_error(DISP_OPEN_OPEN, it)) {
737 * If this is a successful OPEN request, we need to set
738 * replay handler and data early, so that if replay
739 * happens immediately after swabbing below, new reply
740 * is swabbed by that handler correctly.
742 mdc_set_open_replay_data(NULL, NULL, it);
745 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
748 mdc_update_max_ea_from_body(exp, body);
751 * The eadata is opaque; just check that it is there.
752 * Eventually, obd_unpackmd() will check the contents.
754 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
755 body->mbo_eadatasize);
759 /* save lvb data and length in case this is for layout
762 lvb_len = body->mbo_eadatasize;
765 * We save the reply LOV EA in case we have to replay a
766 * create for recovery. If we didn't allocate a large
767 * enough request buffer above we need to reallocate it
768 * here to hold the actual LOV EA.
770 * To not save LOV EA if request is not going to replay
771 * (for example error one).
773 if ((it->it_op & IT_OPEN) && req->rq_replay) {
774 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
775 body->mbo_eadatasize);
/* On save failure, drop the EA from the body so replay will not
 * rely on data we could not preserve. */
777 body->mbo_valid &= ~OBD_MD_FLEASIZE;
778 body->mbo_eadatasize = 0;
783 } else if (it->it_op & IT_LAYOUT) {
784 /* maybe the lock was granted right away and layout
785 * is packed into RMF_DLM_LVB of req */
786 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
787 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
788 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
790 lvb_data = req_capsule_server_sized_get(pill,
791 &RMF_DLM_LVB, lvb_len);
792 if (lvb_data == NULL)
796 * save replied layout data to the request buffer for
797 * recovery consideration (lest MDS reinitialize
798 * another set of OST objects).
801 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
806 /* fill in stripe data for layout lock.
807 * LU-6581: trust layout data only if layout lock is granted. The MDT
808 * has stopped sending layout unless the layout lock is granted. The
809 * client still does this checking in case it's talking with an old
810 * server. - Jinshan */
811 lock = ldlm_handle2lock(lockh);
815 if (ldlm_has_layout(lock) && lvb_data != NULL &&
816 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
819 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
820 ldlm_it2str(it->it_op), lvb_len);
822 OBD_ALLOC_LARGE(lmm, lvb_len);
824 GOTO(out_lock, rc = -ENOMEM);
826 memcpy(lmm, lvb_data, lvb_len);
828 /* install lvb_data */
829 lock_res_and_lock(lock);
/* Only install if no LVB is attached yet; otherwise free our copy. */
830 if (lock->l_lvb_data == NULL) {
831 lock->l_lvb_type = LVB_T_LAYOUT;
832 lock->l_lvb_data = lmm;
833 lock->l_lvb_len = lvb_len;
836 unlock_res_and_lock(lock);
838 OBD_FREE_LARGE(lmm, lvb_len);
841 if (ldlm_has_dom(lock)) {
842 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
844 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* A Data-on-MDT lock without a size attribute is a protocol error. */
845 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
846 LDLM_ERROR(lock, "%s: DoM lock without size.",
847 exp->exp_obd->obd_name);
848 GOTO(out_lock, rc = -EPROTO);
851 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
852 ldlm_it2str(it->it_op), body->mbo_dom_size);
854 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
862 /* We always reserve enough space in the reply packet for a stripe MD, because
863 * we don't know in advance the file type. */
/*
 * Core enqueue path: choose the inodebits policy from the intent op,
 * pack the matching intent request (open/getattr/layout/getxattr/
 * readdir/flock), send it via ldlm_cli_enqueue() under the RPC slot
 * limits, handle -EINPROGRESS resends and ACL buffer -ERANGE retries,
 * and finish via mdc_finish_enqueue().
 */
864 static int mdc_enqueue_base(struct obd_export *exp,
865 struct ldlm_enqueue_info *einfo,
866 const union ldlm_policy_data *policy,
867 struct lookup_intent *it,
868 struct md_op_data *op_data,
869 struct lustre_handle *lockh,
870 __u64 extra_lock_flags)
872 struct obd_device *obddev = class_exp2obd(exp);
873 struct ptlrpc_request *req = NULL;
874 __u64 flags, saved_flags = extra_lock_flags;
875 struct ldlm_res_id res_id;
/* One static policy per inodebit class an intent can map to. */
876 static const union ldlm_policy_data lookup_policy = {
877 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
878 static const union ldlm_policy_data update_policy = {
879 .l_inodebits = { MDS_INODELOCK_UPDATE } };
880 static const union ldlm_policy_data layout_policy = {
881 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
882 static const union ldlm_policy_data getxattr_policy = {
883 .l_inodebits = { MDS_INODELOCK_XATTR } };
884 int generation, resends = 0;
885 struct ldlm_reply *lockrep;
886 struct obd_import *imp = class_exp2cliimp(exp);
888 enum lvb_type lvb_type = 0;
/* Intent requests always use inodebits locks. */
892 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
894 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
897 LASSERT(policy == NULL);
899 saved_flags |= LDLM_FL_HAS_INTENT;
900 if (it->it_op & (IT_GETATTR | IT_READDIR))
901 policy = &update_policy;
902 else if (it->it_op & IT_LAYOUT)
903 policy = &layout_policy;
904 else if (it->it_op & IT_GETXATTR)
905 policy = &getxattr_policy;
907 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
910 generation = obddev->u.cli.cl_import->imp_generation;
911 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
912 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
915 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
920 /* The only way right now is FLOCK. */
921 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
923 res_id.name[3] = LDLM_FLOCK;
924 } else if (it->it_op & IT_OPEN) {
925 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
926 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
927 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
928 } else if (it->it_op & IT_READDIR) {
929 req = mdc_enqueue_pack(exp, 0);
930 } else if (it->it_op & IT_LAYOUT) {
931 if (!imp_connect_lvb_type(imp))
933 req = mdc_intent_layout_pack(exp, it, op_data);
934 lvb_type = LVB_T_LAYOUT;
935 } else if (it->it_op & IT_GETXATTR) {
936 req = mdc_intent_getxattr_pack(exp, it, op_data);
943 RETURN(PTR_ERR(req));
/* Pin the request to the generation observed above so resends can
 * detect an eviction in between. */
946 req->rq_generation_set = 1;
947 req->rq_import_generation = generation;
948 req->rq_sent = ktime_get_real_seconds() + resends;
951 /* It is important to obtain modify RPC slot first (if applicable), so
952 * that threads that are waiting for a modify RPC slot are not polluting
953 * our rpcs in flight counter.
954 * We do not do flock request limiting, though */
956 mdc_get_mod_rpc_slot(req, it);
957 rc = obd_get_request_slot(&obddev->u.cli);
959 mdc_put_mod_rpc_slot(req, it);
960 mdc_clear_replay_flag(req, 0);
961 ptlrpc_req_finished(req);
966 /* With Data-on-MDT the glimpse callback is needed too.
967 * It is set here in advance but not in mdc_finish_enqueue()
968 * to avoid possible races. It is safe to have glimpse handler
969 * for non-DOM locks and costs nothing.*/
970 if (einfo->ei_cb_gl == NULL)
971 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
973 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
974 0, lvb_type, lockh, 0);
976 /* For flock requests we immediatelly return without further
977 delay and let caller deal with the rest, since rest of
978 this function metadata processing makes no sense for flock
979 requests anyway. But in case of problem during comms with
980 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
981 can not rely on caller and this mainly for F_UNLCKs
982 (explicits or automatically generated by Kernel to clean
983 current FLocks upon exit) that can't be trashed
984 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
985 (einfo->ei_type == LDLM_FLOCK) &&
986 (einfo->ei_mode == LCK_NL))
/* Release the slots taken before the enqueue. */
991 obd_put_request_slot(&obddev->u.cli);
992 mdc_put_mod_rpc_slot(req, it);
996 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
997 obddev->obd_name, PFID(&op_data->op_fid1),
998 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1000 mdc_clear_replay_flag(req, rc);
1001 ptlrpc_req_finished(req);
1005 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1006 LASSERT(lockrep != NULL);
1008 lockrep->lock_policy_res2 =
1009 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1011 /* Retry infinitely when the server returns -EINPROGRESS for the
1012 * intent operation, when server returns -EINPROGRESS for acquiring
1013 * intent lock, we'll retry in after_reply(). */
1014 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1015 mdc_clear_replay_flag(req, rc);
1016 ptlrpc_req_finished(req);
1017 if (generation == obddev->u.cli.cl_import->imp_generation) {
1018 if (signal_pending(current))
1022 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1023 obddev->obd_name, resends, it->it_op,
1024 PFID(&op_data->op_fid1),
1025 PFID(&op_data->op_fid2));
1028 CDEBUG(D_HA, "resend cross eviction\n");
/* Server's ACL reply did not fit the small default buffer: retry
 * once with the maximum ACL buffer size. */
1033 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1034 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1035 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1036 mdc_clear_replay_flag(req, -ERANGE);
1037 ptlrpc_req_finished(req);
1038 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1043 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error path: drop any lock reference we acquired and detach the
 * request from the intent. */
1045 if (lustre_handle_is_used(lockh)) {
1046 ldlm_lock_decref(lockh, einfo->ei_mode);
1047 memset(lockh, 0, sizeof(*lockh));
1049 ptlrpc_req_finished(req);
1051 it->it_lock_handle = 0;
1052 it->it_lock_mode = 0;
1053 it->it_request = NULL;
/*
 * Public enqueue entry point for intent-less lock requests (e.g. flock,
 * readdir): thin wrapper passing a NULL lookup intent to
 * mdc_enqueue_base().
 */
1059 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1060 const union ldlm_policy_data *policy,
1061 struct md_op_data *op_data,
1062 struct lustre_handle *lockh, __u64 extra_lock_flags)
1064 return mdc_enqueue_base(exp, einfo, policy, NULL,
1065 op_data, lockh, extra_lock_flags);
/*
 * Finish an intent lock request on the llite side of the mdc: translate
 * the server's disposition/status into an error code, pin the reply for
 * later open/create phases, and de-duplicate against an already-held
 * matching lock.
 */
1068 static int mdc_finish_intent_lock(struct obd_export *exp,
1069 struct ptlrpc_request *request,
1070 struct md_op_data *op_data,
1071 struct lookup_intent *it,
1072 struct lustre_handle *lockh)
1074 struct lustre_handle old_lock;
1075 struct ldlm_lock *lock;
/* Guard against freed/poisoned request memory. */
1079 LASSERT(request != NULL);
1080 LASSERT(request != LP_POISON);
1081 LASSERT(request->rq_repmsg != LP_POISON);
1083 if (it->it_op & IT_READDIR)
1086 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1087 if (it->it_status != 0)
1088 GOTO(out, rc = it->it_status);
1090 if (!it_disposition(it, DISP_IT_EXECD)) {
1091 /* The server failed before it even started executing
1092 * the intent, i.e. because it couldn't unpack the
1095 LASSERT(it->it_status != 0);
1096 GOTO(out, rc = it->it_status);
1098 rc = it_open_error(DISP_IT_EXECD, it);
1102 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1106 /* keep requests around for the multiple phases of the call
1107 * this shows the DISP_XX must guarantee we make it into the
1110 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1111 it_disposition(it, DISP_OPEN_CREATE) &&
1112 !it_open_error(DISP_OPEN_CREATE, it)) {
1113 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1114 /* balanced in ll_create_node */
1115 ptlrpc_request_addref(request);
1117 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1118 it_disposition(it, DISP_OPEN_OPEN) &&
1119 !it_open_error(DISP_OPEN_OPEN, it)) {
1120 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1121 /* balanced in ll_file_open */
1122 ptlrpc_request_addref(request);
1123 /* BUG 11546 - eviction in the middle of open rpc
1126 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1130 if (it->it_op & IT_CREAT) {
1131 /* XXX this belongs in ll_create_it */
1132 } else if (it->it_op == IT_OPEN) {
1133 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1135 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1139 /* If we already have a matching lock, then cancel the new
1140 * one. We have to set the data here instead of in
1141 * mdc_enqueue, because we need to use the child's inode as
1142 * the l_ast_data to match, and that's not available until
1143 * intent_finish has performed the iget().) */
1144 lock = ldlm_handle2lock(lockh);
1146 union ldlm_policy_data policy = lock->l_policy_data;
1147 LDLM_DEBUG(lock, "matching against this");
1149 if (it_has_reply_body(it)) {
1150 struct mdt_body *body;
1152 body = req_capsule_server_get(&request->rq_pill,
1154 /* mdc_enqueue checked */
1155 LASSERT(body != NULL);
/* The lock resource must correspond to the FID the server replied
 * with; a mismatch indicates a lock on the wrong object. */
1156 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1157 &lock->l_resource->lr_name),
1158 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1159 PLDLMRES(lock->l_resource),
1160 PFID(&body->mbo_fid1));
1162 LDLM_LOCK_PUT(lock);
1164 memcpy(&old_lock, lockh, sizeof(*lockh));
1165 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1166 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Prefer the pre-existing lock; cancel the freshly granted one. */
1167 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1168 memcpy(lockh, &old_lock, sizeof(old_lock));
1169 it->it_lock_handle = lockh->cookie;
1175 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1176 (int)op_data->op_namelen, op_data->op_name,
1177 ldlm_it2str(it->it_op), it->it_status,
1178 it->it_disposition, rc);
/*
 * Check whether the client already holds a granted DLM lock that covers
 * the inodebits required by intent @it on @fid.  On a match the lock
 * handle and mode are recorded in the intent; otherwise both are zeroed.
 * @bits, when non-NULL, receives the inode bits of the matched lock
 * (passed through to ldlm_revalidate_lock_handle()).
 * NOTE(review): this excerpt is elided; case labels, returns and braces
 * between the visible lines are not shown here.
 */
1182 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1183 struct lu_fid *fid, __u64 *bits)
1185 /* We could just return 1 immediately, but since we should only
1186 * be called in revalidate_it if we already have a lock, let's
1188 struct ldlm_res_id res_id;
1189 struct lustre_handle lockh;
1190 union ldlm_policy_data policy;
1191 enum ldlm_mode mode;
/* Fast path: the intent already carries a lock handle from a previous
 * enqueue; just revalidate that handle instead of searching by resource. */
1194 if (it->it_lock_handle) {
1195 lockh.cookie = it->it_lock_handle;
1196 mode = ldlm_revalidate_lock_handle(&lockh, bits);
/* Slow path: build the resource name from the fid and select the
 * inodebits the intent type needs before matching by resource. */
1198 fid_build_reg_res_name(fid, &res_id);
1199 switch (it->it_op) {
1201 /* File attributes are held under multiple bits:
1202 * nlink is under lookup lock, size and times are
1203 * under UPDATE lock and recently we've also got
1204 * a separate permissions lock for owner/group/acl that
1205 * were protected by lookup lock before.
1206 * Getattr must provide all of that information,
1207 * so we need to ensure we have all of those locks.
1208 * Unfortunately, if the bits are split across multiple
1209 * locks, there's no easy way to match all of them here,
1210 * so an extra RPC would be performed to fetch all
1211 * of those bits at once for now. */
1212 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1213 * but for old MDTs (< 2.4), permission is covered
1214 * by LOOKUP lock, so it needs to match all bits here.*/
1215 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1216 MDS_INODELOCK_LOOKUP |
1220 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1223 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1226 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Match only already-granted locks (LDLM_FL_BLOCK_GRANTED) in any
 * useful mode; a blocked or converting lock is of no help here. */
1230 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1231 LDLM_IBITS, &policy,
1232 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Found a usable lock: stash handle and mode in the intent so the
 * caller (and later mdc_finish_intent_lock) can reuse it. */
1237 it->it_lock_handle = lockh.cookie;
1238 it->it_lock_mode = mode;
/* No matching lock: clear the intent's lock state so callers see miss. */
1240 it->it_lock_handle = 0;
1241 it->it_lock_mode = 0;
1248 * This long block is all about fixing up the lock and request state
1249 * so that it is correct as of the moment _before_ the operation was
1250 * applied; that way, the VFS will think that everything is normal and
1251 * call Lustre's regular VFS methods.
1253 * If we're performing a creation, that means that unless the creation
1254 * failed with EEXIST, we should fake up a negative dentry.
1256 * For everything else, we want to lookup to succeed.
1258 * One additional note: if CREATE or OPEN succeeded, we add an extra
1259 * reference to the request because we need to keep it around until
1260 * ll_create/ll_open gets called.
1262 * The server will return to us, in it_disposition, an indication of
1263 * exactly what it_status refers to.
1265 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1266 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1267 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1268 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1271 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Entry point for intent-based lock acquisition on the MDC: try to
 * revalidate an existing lock first, otherwise enqueue a new intent
 * lock and fix up the intent/request state (see block comment above).
 * NOTE(review): this excerpt is elided; returns, braces and some
 * statements between the visible lines are not shown here.
 */
1274 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1275 struct lookup_intent *it, struct ptlrpc_request **reqp,
1276 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
/* Enqueue descriptor: inodebits lock, mode derived from the intent,
 * caller-supplied blocking callback, standard completion AST, and the
 * MDC glimpse AST (needed for Data-on-MDT glimpses). */
1278 struct ldlm_enqueue_info einfo = {
1279 .ei_type = LDLM_IBITS,
1280 .ei_mode = it_to_lock_mode(it),
1281 .ei_cb_bl = cb_blocking,
1282 .ei_cb_cp = ldlm_completion_ast,
1283 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1285 struct lustre_handle lockh;
1290 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1291 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1292 op_data->op_name, PFID(&op_data->op_fid2),
1293 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: if the target fid is already known (op_fid2 sane)
 * and the intent is read-only (lookup/getattr/readdir), try to satisfy
 * it from a lock we may already hold, avoiding an RPC. */
1297 if (fid_is_sane(&op_data->op_fid2) &&
1298 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1299 /* We could just return 1 immediately, but since we should only
1300 * be called in revalidate_it if we already have a lock, let's
1302 it->it_lock_handle = 0;
1303 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1304 /* Only return failure if it was not GETATTR by cfid
1305 (from inode_revalidate) */
1306 if (rc || op_data->op_namelen != 0)
1310 /* For case if upper layer did not alloc fid, do it now. */
1311 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1312 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1314 CERROR("Can't alloc new fid, rc %d\n", rc);
/* Miss or create path: enqueue a new intent lock with the server. */
1319 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request back to the caller and finalize lock/intent
 * state (ref-counting, lock matching) in mdc_finish_intent_lock(). */
1324 *reqp = it->it_request;
1325 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the async intent-getattr enqueue sent
 * by mdc_intent_getattr_async(): releases the client request slot,
 * finishes the LDLM enqueue, fixes up intent/lock state and finally
 * invokes the caller's mi_cb completion callback.
 * NOTE(review): this excerpt is elided; the assignment of @it
 * (presumably &minfo->mi_it) and several returns/braces between the
 * visible lines are not shown here.
 */
1329 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1330 struct ptlrpc_request *req,
1333 struct mdc_getattr_args *ga = args;
1334 struct obd_export *exp = ga->ga_exp;
1335 struct md_enqueue_info *minfo = ga->ga_minfo;
1336 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1337 struct lookup_intent *it;
1338 struct lustre_handle *lockh;
1339 struct obd_device *obddev;
1340 struct ldlm_reply *lockrep;
1341 __u64 flags = LDLM_FL_HAS_INTENT;
1345 lockh = &minfo->mi_lockh;
1347 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async() before enqueue. */
1349 obd_put_request_slot(&obddev->u.cli);
1350 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1353 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1354 &flags, NULL, 0, lockh, rc);
1356 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* On enqueue failure the request must not be replayed. */
1357 mdc_clear_replay_flag(req, rc);
1361 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1362 LASSERT(lockrep != NULL);
/* lock_policy_res2 carries the intent status in wire (network) byte
 * order; convert to host order before it is consumed below. */
1364 lockrep->lock_policy_res2 =
1365 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1367 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1371 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Notify the original submitter with the final result. */
1375 minfo->mi_cb(req, minfo, rc);
1379 int mdc_intent_getattr_async(struct obd_export *exp,
1380 struct md_enqueue_info *minfo)
1382 struct md_op_data *op_data = &minfo->mi_data;
1383 struct lookup_intent *it = &minfo->mi_it;
1384 struct ptlrpc_request *req;
1385 struct mdc_getattr_args *ga;
1386 struct obd_device *obddev = class_exp2obd(exp);
1387 struct ldlm_res_id res_id;
1388 union ldlm_policy_data policy = {
1389 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1390 MDS_INODELOCK_UPDATE } };
1392 __u64 flags = LDLM_FL_HAS_INTENT;
1395 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1396 (int)op_data->op_namelen, op_data->op_name,
1397 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1399 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1400 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1401 * of the async getattr RPC will handle that by itself. */
1402 req = mdc_intent_getattr_pack(exp, it, op_data,
1403 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1405 RETURN(PTR_ERR(req));
1407 rc = obd_get_request_slot(&obddev->u.cli);
1409 ptlrpc_req_finished(req);
1413 /* With Data-on-MDT the glimpse callback is needed too.
1414 * It is set here in advance but not in mdc_finish_enqueue()
1415 * to avoid possible races. It is safe to have glimpse handler
1416 * for non-DOM locks and costs nothing.*/
1417 if (minfo->mi_einfo.ei_cb_gl == NULL)
1418 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1420 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1421 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1423 obd_put_request_slot(&obddev->u.cli);
1424 ptlrpc_req_finished(req);
1428 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1429 ga = ptlrpc_req_async_args(req);
1431 ga->ga_minfo = minfo;
1433 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1434 ptlrpcd_add_req(req);