4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/*
 * Context carried through an asynchronous getattr enqueue so the
 * completion path can find the export and enqueue info again.
 * NOTE(review): struct body is elided in this chunk — closing brace
 * and any further members are not visible here.
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * Report the intent status for the requested disposition @phase.
 * Dispositions are checked from the latest phase (OPEN_LEASE) back to
 * the earliest (IT_EXECD): the first phase the server is known to have
 * executed determines which status is returned.  The branch bodies
 * (returning it->it_status or 0) are elided in this chunk.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No recognizable disposition bit set: log it for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the inode @data to the lock's resource LVB slot so later
 * lookups can map the lock back to its inode; optionally return the
 * lock's inodebits via @bits.  If the resource already points at a
 * different inode, it must be one being freed (I_FREEING) — anything
 * else indicates two live inodes sharing a resource, which is a bug.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zero) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Look for an already-granted local lock on @fid matching @type/@policy
 * and one of the @mode bits; on success the matched mode is returned
 * and @lockh references the lock.  Inodebits the server does not
 * support are stripped from the policy first (LU-4405).
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused (no active readers/writers) locks on @fid's
 * resource that match @policy and @mode.  @opaque is matched against
 * the locks' ast data; @flags control cancellation behavior
 * (e.g. local-only).  Thin wrapper over the generic LDLM helper.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * Detach the cached inode pointer from @fid's LDLM resource (the
 * counterpart of mdc_set_lock_data()), typically when the inode is
 * being destroyed.  A missing resource is not an error — the elided
 * early-return for that case is not visible in this chunk.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
/* Drop the reference taken by ldlm_resource_get(). */
189 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with error @rc so
 * it is not replayed during recovery.  A non-zero transno on an error
 * reply is unexpected and logged.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
208 /* Save a large LOV EA into the request buffer so that it is available
209 * for replay. We don't do this in the initial request because the
210 * original request doesn't need this buffer (at most it sends just the
211 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212 * buffer and may also be difficult to allocate and save a very large
213 * request buffer for each open. (bug 5707)
215 * OOM here may cause recovery failure if lmm is needed (only for the
216 * original open if the MDS crashed just when this client also OOM'd)
217 * but this is incredibly unlikely, and questionable whether the client
218 * could do MDS recovery under OOM anyways... */
/*
 * Copy @size bytes of LOV EA @data into @field of @req's request
 * capsule, enlarging or shrinking the buffer first as needed, so a
 * replayed open carries the layout the MDS originally returned.
 */
219 int mdc_save_lovea(struct ptlrpc_request *req,
220 const struct req_msg_field *field,
221 void *data, u32 size)
223 struct req_capsule *pill = &req->rq_pill;
224 struct lov_user_md *lmm;
/* Buffer too small: try to grow it (may fail under sptlrpc). */
227 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
228 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
230 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
231 req->rq_export->exp_obd->obd_name,
236 req_capsule_shrink(pill, field, size, RCL_CLIENT);
239 req_capsule_set_size(pill, field, RCL_CLIENT, size);
240 lmm = req_capsule_client_get(pill, field);
242 memcpy(lmm, data, size);
243 /* overwrite layout generation returned from the MDS */
244 lmm->lmm_stripe_offset =
245 (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
/*
 * Build an LDLM_INTENT_OPEN request: cancel locks that conflict with
 * the open, size the client-side buffers (name, EA, security context,
 * SELinux policy), pack the ldlm intent plus the open body, then size
 * the reply buffers — reserving extra reply space so small
 * Data-on-MDT file content can be returned inline (LU-11414 sizing
 * rules at the end).  Returns the prepared request or ERR_PTR().
 * Several error-path lines are elided from this chunk.
 */
251 static struct ptlrpc_request *
252 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
253 struct md_op_data *op_data, __u32 acl_bufsize)
255 struct ptlrpc_request *req;
256 struct obd_device *obddev = class_exp2obd(exp);
257 struct ldlm_intent *lit;
258 const void *lmm = op_data->op_data;
259 __u32 lmmsize = op_data->op_data_size;
260 __u32 mdt_md_capsule_size;
265 int repsize, repsize_estimate;
269 mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize;
/* Intent opens are only for regular files; force S_IFREG. */
271 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
273 /* XXX: openlock is not cancelled for cross-refs. */
274 /* If inode is known, cancel conflicting OPEN locks. */
275 if (fid_is_sane(&op_data->op_fid2)) {
276 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
277 if (it->it_flags & MDS_FMODE_WRITE)
282 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
285 else if (it->it_flags & FMODE_EXEC)
291 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296 /* If CREATE, cancel parent's UPDATE lock. */
297 if (it->it_op & IT_CREAT)
301 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
303 MDS_INODELOCK_UPDATE);
305 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
306 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: give back the collected cancel list. */
308 ldlm_lock_list_put(&cancels, l_bl_ast, count);
309 RETURN(ERR_PTR(-ENOMEM));
312 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
313 op_data->op_namelen + 1);
314 if (cl_is_lov_delay_create(it->it_flags)) {
315 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
316 LASSERT(lmmsize == 0);
317 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
320 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
323 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
324 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
325 op_data->op_file_secctx_name_size : 0);
327 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
328 op_data->op_file_secctx_size);
330 /* get SELinux policy info if any */
331 rc = sptlrpc_get_sepol(req);
333 ptlrpc_request_free(req);
336 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
337 strlen(req->rq_sepol) ?
338 strlen(req->rq_sepol) + 1 : 0);
340 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
342 ptlrpc_request_free(req);
/* Mark the open for replay if the import supports recovery. */
346 spin_lock(&req->rq_lock);
347 req->rq_replay = req->rq_import->imp_replayable;
348 spin_unlock(&req->rq_lock);
350 /* pack the intent */
351 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
352 lit->opc = (__u64)it->it_op;
354 /* pack the intended request */
355 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
358 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
359 mdt_md_capsule_size);
360 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Open-by-FID of an existing file: ask the server to return the
 * named security xattr in the reply. */
362 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
363 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
365 op_data->op_file_secctx_name_size > 0 &&
366 op_data->op_file_secctx_name != NULL) {
369 secctx_name = req_capsule_client_get(&req->rq_pill,
370 &RMF_FILE_SECCTX_NAME);
371 memcpy(secctx_name, op_data->op_file_secctx_name,
372 op_data->op_file_secctx_name_size);
373 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
375 obddev->u.cli.cl_max_mds_easize);
377 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
378 op_data->op_file_secctx_name_size,
379 op_data->op_file_secctx_name);
382 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
387 * Inline buffer for possible data from Data-on-MDT files.
389 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
390 sizeof(struct niobuf_remote));
391 ptlrpc_request_set_replen(req);
393 /* Get real repbuf allocated size as rounded up power of 2 */
394 repsize = size_roundup_power2(req->rq_replen +
395 lustre_msg_early_size());
396 /* Estimate free space for DoM files in repbuf */
397 repsize_estimate = repsize - (req->rq_replen -
398 mdt_md_capsule_size +
399 sizeof(struct lov_comp_md_v1) +
400 sizeof(struct lov_comp_md_entry_v1) +
401 lov_mds_md_size(0, LOV_MAGIC_V3));
/* Grow the inline-data buffer until at least the configured DoM
 * minimum would fit in the reply. */
403 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
404 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
405 repsize_estimate + sizeof(struct niobuf_remote);
406 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
408 sizeof(struct niobuf_remote) + repsize);
409 ptlrpc_request_set_replen(req);
410 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
411 repsize, req->rq_replen);
412 repsize = size_roundup_power2(req->rq_replen +
413 lustre_msg_early_size());
415 /* The only way to report real allocated repbuf size to the server
416 * is the lm_repsize but it must be set prior buffer allocation itself
417 * due to security reasons - it is part of buffer used in signature
418 * calculation (see LU-11414). Therefore the saved size is predicted
419 * value as rq_replen rounded to the next higher power of 2.
420 * Such estimation is safe. Though the final allocated buffer might
421 * be even larger, it is not possible to know that at this point.
423 req->rq_reqmsg->lm_repsize = repsize;
/* Default sizing assumptions for a cached-xattr (IT_GETXATTR) reply:
 * up to GA_DEFAULT_EA_NUM xattrs with names/values of the given
 * default lengths. */
427 #define GA_DEFAULT_EA_NAME_LEN 20
428 #define GA_DEFAULT_EA_VAL_LEN 250
429 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request that fetches all of a file's
 * xattrs under a single XATTR lock, sizing reply buffers from the
 * GA_DEFAULT_* estimates (enlarged for pre-2.10.1 servers, see
 * LU-9856 note below).
 */
431 static struct ptlrpc_request *
432 mdc_intent_getxattr_pack(struct obd_export *exp,
433 struct lookup_intent *it,
434 struct md_op_data *op_data)
436 struct ptlrpc_request *req;
437 struct ldlm_intent *lit;
440 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
444 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
445 &RQF_LDLM_INTENT_GETXATTR);
447 RETURN(ERR_PTR(-ENOMEM));
449 /* get SELinux policy info if any */
450 rc = sptlrpc_get_sepol(req);
452 ptlrpc_request_free(req);
455 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
456 strlen(req->rq_sepol) ?
457 strlen(req->rq_sepol) + 1 : 0);
459 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
461 ptlrpc_request_free(req);
465 /* pack the intent */
466 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
467 lit->opc = IT_GETXATTR;
468 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
469 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
471 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
472 /* If the supplied buffer is too small then the server will
473 * return -ERANGE and llite will fallback to using non cached
474 * xattr operations. On servers before 2.10.1 a (non-cached)
475 * listxattr RPC for an orphan or dead file causes an oops. So
476 * let's try to avoid sending too small a buffer to too old a
477 * server. This is effectively undoing the memory conservation
478 * of LU-9417 when it would be *more* likely to crash the
479 * server. See LU-9856. */
480 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
481 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
482 exp->exp_connect_data.ocd_max_easize)
485 /* pack the intended request */
486 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
487 ea_vals_buf_size, -1, 0);
489 /* get SELinux policy info if any */
490 mdc_file_sepol_pack(req);
/* Reply buffers: name list, value blob, per-value length array. */
492 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
493 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
495 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
498 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
499 sizeof(u32) * GA_DEFAULT_EA_NUM);
501 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
503 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request for lookup/getattr intents:
 * packs the name and getattr body, and sizes reply buffers for the
 * MD/ACL/default-LMV data.  For lookup/getattr the file's security
 * xattr name may also be sent so the server returns the context
 * inline (avoids a separate getxattr RPC).
 */
508 static struct ptlrpc_request *
509 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
510 struct md_op_data *op_data, __u32 acl_bufsize)
512 struct ptlrpc_request *req;
513 struct obd_device *obddev = class_exp2obd(exp);
514 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
515 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
517 struct ldlm_intent *lit;
519 bool have_secctx = false;
524 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
525 &RQF_LDLM_INTENT_GETATTR);
527 RETURN(ERR_PTR(-ENOMEM));
529 /* send name of security xattr to get upon intent */
530 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
531 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
533 op_data->op_file_secctx_name_size > 0 &&
534 op_data->op_file_secctx_name != NULL) {
536 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
538 op_data->op_file_secctx_name_size);
541 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
542 op_data->op_namelen + 1);
544 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
546 ptlrpc_request_free(req);
550 /* pack the intent */
551 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
552 lit->opc = (__u64)it->it_op;
554 easize = obddev->u.cli.cl_default_mds_easize;
556 /* pack the intended request */
557 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
559 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
560 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
561 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
562 sizeof(struct lmv_user_md));
/* Copy the requested security xattr name into the request (the
 * have_secctx branch header is elided in this chunk). */
567 secctx_name = req_capsule_client_get(&req->rq_pill,
568 &RMF_FILE_SECCTX_NAME);
569 memcpy(secctx_name, op_data->op_file_secctx_name,
570 op_data->op_file_secctx_name_size);
572 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
575 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
576 op_data->op_file_secctx_name_size,
577 op_data->op_file_secctx_name);
579 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
583 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  op_data->op_data must carry a
 * struct layout_intent describing the wanted layout operation; for a
 * write layout intent, conflicting LAYOUT locks on the file are
 * cancelled first.  The reply LVB buffer is sized for a default MD EA.
 */
587 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
588 struct lookup_intent *it,
589 struct md_op_data *op_data)
591 struct obd_device *obd = class_exp2obd(exp);
593 struct ptlrpc_request *req;
594 struct ldlm_intent *lit;
595 struct layout_intent *layout;
599 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
600 &RQF_LDLM_INTENT_LAYOUT);
602 RETURN(ERR_PTR(-ENOMEM));
604 if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
605 (it->it_flags & FMODE_WRITE)) {
606 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
608 MDS_INODELOCK_LAYOUT);
611 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
612 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
614 ptlrpc_request_free(req);
618 /* pack the intent */
619 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
620 lit->opc = (__u64)it->it_op;
622 /* pack the layout intent request */
623 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
624 LASSERT(op_data->op_data != NULL);
625 LASSERT(op_data->op_data_size == sizeof(*layout));
626 memcpy(layout, op_data->op_data, sizeof(*layout));
628 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
629 obd->u.cli.cl_default_mds_easize);
630 ptlrpc_request_set_replen(req);
/*
 * Build a plain (no-intent) LDLM_ENQUEUE request with a server-side
 * LVB reply buffer of @lvb_len bytes.  Used e.g. for IT_READDIR where
 * no intent body is needed.
 */
634 static struct ptlrpc_request *
635 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
637 struct ptlrpc_request *req;
641 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
643 RETURN(ERR_PTR(-ENOMEM));
645 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
647 ptlrpc_request_free(req);
651 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
652 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: copy the server's disposition
 * and status into @it, fix up the lock mode if the server granted a
 * different one, clear the replay flag for failed opens, save layout
 * (LOV EA / LVB) data for replay, install layout LVB data on the lock,
 * and validate Data-on-MDT size info.  Statement order matters here
 * (replay data must be set before swabbing; replay flags cleared before
 * any early exit) — code left untouched, comments only.
 */
656 static int mdc_finish_enqueue(struct obd_export *exp,
657 struct ptlrpc_request *req,
658 struct ldlm_enqueue_info *einfo,
659 struct lookup_intent *it,
660 struct lustre_handle *lockh,
663 struct req_capsule *pill = &req->rq_pill;
664 struct ldlm_request *lockreq;
665 struct ldlm_reply *lockrep;
666 struct ldlm_lock *lock;
667 struct mdt_body *body = NULL;
668 void *lvb_data = NULL;
674 /* Similarly, if we're going to replay this request, we don't want to
675 * actually get a lock, just perform the intent. */
676 if (req->rq_transno || req->rq_replay) {
677 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
678 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Aborted lock: the intent still executed, but no lock was granted. */
681 if (rc == ELDLM_LOCK_ABORTED) {
683 memset(lockh, 0, sizeof(*lockh));
685 } else { /* rc = 0 */
686 lock = ldlm_handle2lock(lockh);
687 LASSERT(lock != NULL);
689 /* If the server gave us back a different lock mode, we should
690 * fix up our variables. */
691 if (lock->l_req_mode != einfo->ei_mode) {
692 ldlm_lock_addref(lockh, lock->l_req_mode);
693 ldlm_lock_decref(lockh, einfo->ei_mode);
694 einfo->ei_mode = lock->l_req_mode;
699 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
700 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Mirror the server's verdict into the intent for the caller. */
702 it->it_disposition = (int)lockrep->lock_policy_res1;
703 it->it_status = (int)lockrep->lock_policy_res2;
704 it->it_lock_mode = einfo->ei_mode;
705 it->it_lock_handle = lockh->cookie;
706 it->it_request = req;
708 /* Technically speaking rq_transno must already be zero if
709 * it_status is in error, so the check is a bit redundant */
710 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
711 mdc_clear_replay_flag(req, it->it_status);
713 /* If we're doing an IT_OPEN which did not result in an actual
714 * successful open, then we need to remove the bit which saves
715 * this request for unconditional replay.
717 * It's important that we do this first! Otherwise we might exit the
718 * function without doing so, and try to replay a failed create
720 if (it->it_op & IT_OPEN && req->rq_replay &&
721 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
722 mdc_clear_replay_flag(req, it->it_status)
724 DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
725 it->it_op, it->it_disposition, it->it_status);
727 /* We know what to expect, so we do any byte flipping required here */
728 if (it_has_reply_body(it)) {
729 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
731 CERROR ("Can't swab mdt_body\n");
735 if (it_disposition(it, DISP_OPEN_OPEN) &&
736 !it_open_error(DISP_OPEN_OPEN, it)) {
738 * If this is a successful OPEN request, we need to set
739 * replay handler and data early, so that if replay
740 * happens immediately after swabbing below, new reply
741 * is swabbed by that handler correctly.
743 mdc_set_open_replay_data(NULL, NULL, it);
746 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
749 mdc_update_max_ea_from_body(exp, body);
752 * The eadata is opaque; just check that it is there.
753 * Eventually, obd_unpackmd() will check the contents.
755 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
756 body->mbo_eadatasize);
760 /* save lvb data and length in case this is for layout
763 lvb_len = body->mbo_eadatasize;
766 * We save the reply LOV EA in case we have to replay a
767 * create for recovery. If we didn't allocate a large
768 * enough request buffer above we need to reallocate it
769 * here to hold the actual LOV EA.
771 * To not save LOV EA if request is not going to replay
772 * (for example error one).
774 if ((it->it_op & IT_OPEN) && req->rq_replay) {
775 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
776 body->mbo_eadatasize);
/* Saving failed (OOM): drop the EA so replay does not use it. */
778 body->mbo_valid &= ~OBD_MD_FLEASIZE;
779 body->mbo_eadatasize = 0;
784 } else if (it->it_op & IT_LAYOUT) {
785 /* maybe the lock was granted right away and layout
786 * is packed into RMF_DLM_LVB of req */
787 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
788 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
789 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
791 lvb_data = req_capsule_server_sized_get(pill,
792 &RMF_DLM_LVB, lvb_len);
793 if (lvb_data == NULL)
797 * save replied layout data to the request buffer for
798 * recovery consideration (lest MDS reinitialize
799 * another set of OST objects).
802 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
807 /* fill in stripe data for layout lock.
808 * LU-6581: trust layout data only if layout lock is granted. The MDT
809 * has stopped sending layout unless the layout lock is granted. The
810 * client still does this checking in case it's talking with an old
811 * server. - Jinshan */
812 lock = ldlm_handle2lock(lockh);
816 if (ldlm_has_layout(lock) && lvb_data != NULL &&
817 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
820 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
821 ldlm_it2str(it->it_op), lvb_len);
823 OBD_ALLOC_LARGE(lmm, lvb_len);
825 GOTO(out_lock, rc = -ENOMEM);
827 memcpy(lmm, lvb_data, lvb_len);
829 /* install lvb_data */
830 lock_res_and_lock(lock);
831 if (lock->l_lvb_data == NULL) {
832 lock->l_lvb_type = LVB_T_LAYOUT;
833 lock->l_lvb_data = lmm;
834 lock->l_lvb_len = lvb_len;
837 unlock_res_and_lock(lock);
/* Lost the race: another thread installed the LVB first. */
839 OBD_FREE_LARGE(lmm, lvb_len);
842 if (ldlm_has_dom(lock)) {
843 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
845 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
846 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
847 LDLM_ERROR(lock, "%s: DoM lock without size.",
848 exp->exp_obd->obd_name);
849 GOTO(out_lock, rc = -EPROTO);
852 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
853 ldlm_it2str(it->it_op), body->mbo_dom_size);
855 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
863 /* We always reserve enough space in the reply packet for a stripe MD, because
864 * we don't know in advance the file type. */
/*
 * Core intent-enqueue path: select the lock policy from the intent op,
 * build the matching intent request (open/getattr/readdir/layout/
 * getxattr or plain flock), take the RPC/modify slots, enqueue via
 * ldlm_cli_enqueue(), then finish with mdc_finish_enqueue().  Handles
 * server -EINPROGRESS by resending (the "resend:" loop target is
 * elided here) and -ERANGE by retrying with a larger ACL buffer.
 */
865 static int mdc_enqueue_base(struct obd_export *exp,
866 struct ldlm_enqueue_info *einfo,
867 const union ldlm_policy_data *policy,
868 struct lookup_intent *it,
869 struct md_op_data *op_data,
870 struct lustre_handle *lockh,
871 __u64 extra_lock_flags)
873 struct obd_device *obddev = class_exp2obd(exp);
874 struct ptlrpc_request *req = NULL;
875 __u64 flags, saved_flags = extra_lock_flags;
876 struct ldlm_res_id res_id;
877 static const union ldlm_policy_data lookup_policy = {
878 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
879 static const union ldlm_policy_data update_policy = {
880 .l_inodebits = { MDS_INODELOCK_UPDATE } };
881 static const union ldlm_policy_data layout_policy = {
882 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
883 static const union ldlm_policy_data getxattr_policy = {
884 .l_inodebits = { MDS_INODELOCK_XATTR } };
885 int generation, resends = 0;
886 struct ldlm_reply *lockrep;
887 struct obd_import *imp = class_exp2cliimp(exp);
889 enum lvb_type lvb_type = 0;
893 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
895 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from the intent op; the
 * caller must not supply one. */
898 LASSERT(policy == NULL);
900 saved_flags |= LDLM_FL_HAS_INTENT;
901 if (it->it_op & (IT_GETATTR | IT_READDIR))
902 policy = &update_policy;
903 else if (it->it_op & IT_LAYOUT)
904 policy = &layout_policy;
905 else if (it->it_op & IT_GETXATTR)
906 policy = &getxattr_policy;
908 policy = &lookup_policy;
/* Remember the import generation so a resend after eviction can be
 * detected below. */
911 generation = obddev->u.cli.cl_import->imp_generation;
912 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
913 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
916 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
/* Build the request matching the intent type. */
921 /* The only way right now is FLOCK. */
922 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
924 res_id.name[3] = LDLM_FLOCK;
925 } else if (it->it_op & IT_OPEN) {
926 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
927 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
928 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
929 } else if (it->it_op & IT_READDIR) {
930 req = mdc_enqueue_pack(exp, 0);
931 } else if (it->it_op & IT_LAYOUT) {
932 if (!imp_connect_lvb_type(imp))
934 req = mdc_intent_layout_pack(exp, it, op_data);
935 lvb_type = LVB_T_LAYOUT;
936 } else if (it->it_op & IT_GETXATTR) {
937 req = mdc_intent_getxattr_pack(exp, it, op_data);
944 RETURN(PTR_ERR(req));
/* Pin the request to this import generation for resend handling. */
947 req->rq_generation_set = 1;
948 req->rq_import_generation = generation;
949 req->rq_sent = ktime_get_real_seconds() + resends;
952 /* It is important to obtain modify RPC slot first (if applicable), so
953 * that threads that are waiting for a modify RPC slot are not polluting
954 * our rpcs in flight counter.
955 * We do not do flock request limiting, though */
957 mdc_get_mod_rpc_slot(req, it);
958 rc = obd_get_request_slot(&obddev->u.cli);
960 mdc_put_mod_rpc_slot(req, it);
961 mdc_clear_replay_flag(req, 0);
962 ptlrpc_req_finished(req);
967 /* With Data-on-MDT the glimpse callback is needed too.
968 * It is set here in advance but not in mdc_finish_enqueue()
969 * to avoid possible races. It is safe to have glimpse handler
970 * for non-DOM locks and costs nothing.*/
971 if (einfo->ei_cb_gl == NULL)
972 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
974 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
975 0, lvb_type, lockh, 0);
977 /* For flock requests we immediatelly return without further
978 delay and let caller deal with the rest, since rest of
979 this function metadata processing makes no sense for flock
980 requests anyway. But in case of problem during comms with
981 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
982 can not rely on caller and this mainly for F_UNLCKs
983 (explicits or automatically generated by Kernel to clean
984 current FLocks upon exit) that can't be trashed */
985 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
986 (einfo->ei_type == LDLM_FLOCK) &&
987 (einfo->ei_mode == LCK_NL))
/* Release the slots taken above, whatever the outcome. */
992 obd_put_request_slot(&obddev->u.cli);
993 mdc_put_mod_rpc_slot(req, it);
997 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
998 obddev->obd_name, PFID(&op_data->op_fid1),
999 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1001 mdc_clear_replay_flag(req, rc);
1002 ptlrpc_req_finished(req);
1006 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1007 LASSERT(lockrep != NULL);
/* lock_policy_res2 carries a network-order errno; convert it. */
1009 lockrep->lock_policy_res2 =
1010 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1012 /* Retry infinitely when the server returns -EINPROGRESS for the
1013 * intent operation, when server returns -EINPROGRESS for acquiring
1014 * intent lock, we'll retry in after_reply(). */
1015 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1016 mdc_clear_replay_flag(req, rc);
1017 ptlrpc_req_finished(req);
1018 if (generation == obddev->u.cli.cl_import->imp_generation) {
1019 if (signal_pending(current))
1023 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1024 obddev->obd_name, resends, it->it_op,
1025 PFID(&op_data->op_fid1),
1026 PFID(&op_data->op_fid2));
1029 CDEBUG(D_HA, "resend cross eviction\n");
/* Server ACL was larger than the old-format buffer: retry once
 * with the maximum ACL buffer size. */
1034 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1035 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1036 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1037 mdc_clear_replay_flag(req, -ERANGE);
1038 ptlrpc_req_finished(req);
1039 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1044 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On failure, drop any lock reference we hold and reset @it. */
1046 if (lustre_handle_is_used(lockh)) {
1047 ldlm_lock_decref(lockh, einfo->ei_mode);
1048 memset(lockh, 0, sizeof(*lockh));
1050 ptlrpc_req_finished(req);
1052 it->it_lock_handle = 0;
1053 it->it_lock_mode = 0;
1054 it->it_request = NULL;
/*
 * Public no-intent enqueue entry point: forwards to mdc_enqueue_base()
 * with a NULL lookup_intent (used e.g. for flock locks).
 */
1060 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1061 const union ldlm_policy_data *policy,
1062 struct md_op_data *op_data,
1063 struct lustre_handle *lockh, __u64 extra_lock_flags)
1065 return mdc_enqueue_base(exp, einfo, policy, NULL,
1066 op_data, lockh, extra_lock_flags);
/*
 * Translate the completed intent reply into the caller's view: check
 * the per-phase open errors, pin the request for the create/open
 * follow-up phases (references balanced in llite), and if a matching
 * lock already exists locally, cancel the newly granted one and reuse
 * the old handle.  Order-sensitive; code untouched, comments only.
 */
1069 static int mdc_finish_intent_lock(struct obd_export *exp,
1070 struct ptlrpc_request *request,
1071 struct md_op_data *op_data,
1072 struct lookup_intent *it,
1073 struct lustre_handle *lockh)
1075 struct lustre_handle old_lock;
1076 struct ldlm_lock *lock;
1080 LASSERT(request != NULL);
1081 LASSERT(request != LP_POISON);
1082 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir has no intent bookkeeping to finish. */
1084 if (it->it_op & IT_READDIR)
1087 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1088 if (it->it_status != 0)
1089 GOTO(out, rc = it->it_status);
1091 if (!it_disposition(it, DISP_IT_EXECD)) {
1092 /* The server failed before it even started executing
1093 * the intent, i.e. because it couldn't unpack the
1096 LASSERT(it->it_status != 0);
1097 GOTO(out, rc = it->it_status);
1099 rc = it_open_error(DISP_IT_EXECD, it);
1103 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1107 /* keep requests around for the multiple phases of the call
1108 * this shows the DISP_XX must guarantee we make it into the
1111 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1112 it_disposition(it, DISP_OPEN_CREATE) &&
1113 !it_open_error(DISP_OPEN_CREATE, it)) {
1114 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1115 /* balanced in ll_create_node */
1116 ptlrpc_request_addref(request);
1118 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1119 it_disposition(it, DISP_OPEN_OPEN) &&
1120 !it_open_error(DISP_OPEN_OPEN, it)) {
1121 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1122 /* balanced in ll_file_open */
1123 ptlrpc_request_addref(request);
1124 /* BUG 11546 - eviction in the middle of open rpc
1127 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1131 if (it->it_op & IT_CREAT) {
1132 /* XXX this belongs in ll_create_it */
1133 } else if (it->it_op == IT_OPEN) {
1134 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1136 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1140 /* If we already have a matching lock, then cancel the new
1141 * one. We have to set the data here instead of in
1142 * mdc_enqueue, because we need to use the child's inode as
1143 * the l_ast_data to match, and that's not available until
1144 * intent_finish has performed the iget().) */
1145 lock = ldlm_handle2lock(lockh);
1147 union ldlm_policy_data policy = lock->l_policy_data;
1148 LDLM_DEBUG(lock, "matching against this");
/* Sanity check: the reply body's FID must match the lock's resource. */
1150 if (it_has_reply_body(it)) {
1151 struct mdt_body *body;
1153 body = req_capsule_server_get(&request->rq_pill,
1155 /* mdc_enqueue checked */
1156 LASSERT(body != NULL);
1157 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1158 &lock->l_resource->lr_name),
1159 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1160 PLDLMRES(lock->l_resource),
1161 PFID(&body->mbo_fid1));
1163 LDLM_LOCK_PUT(lock);
1165 memcpy(&old_lock, lockh, sizeof(*lockh));
1166 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1167 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1168 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1169 memcpy(lockh, &old_lock, sizeof(old_lock));
1170 it->it_lock_handle = lockh->cookie;
1176 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1177 (int)op_data->op_namelen, op_data->op_name,
1178 ldlm_it2str(it->it_op), it->it_status,
1179 it->it_disposition, rc);
/**
 * Check whether this client already holds a DLM lock covering intent \a it
 * on object \a fid, so the caller can avoid a fresh enqueue RPC.
 *
 * Two paths are tried:
 *  - if the intent already carries a lock handle from a previous enqueue,
 *    revalidate that handle directly via ldlm_revalidate_lock_handle();
 *  - otherwise build the resource name from \a fid and match against
 *    granted IBITS locks, with inodebits selected by the intent operation.
 *
 * On a successful match the handle and mode are stored back into
 * it->it_lock_handle / it->it_lock_mode; on failure both are cleared so
 * the caller knows a new enqueue is required.
 *
 * \param exp	client export to the MDT
 * \param it	lookup intent describing the operation
 * \param fid	FID of the object being revalidated
 * \param bits	optional lock inodebits (may be NULL)
 */
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 * actually verify that the lock is still held and usable. */
	struct ldlm_res_id res_id;
	struct lustre_handle lockh;
	union ldlm_policy_data policy;
	enum ldlm_mode mode;
	/* Fast path: the intent already references a lock handle from an
	 * earlier enqueue -- revalidate that handle directly. */
	if (it->it_lock_handle) {
		lockh.cookie = it->it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	/* Slow path: derive the DLM resource name from the FID and pick
	 * the inodebits this intent operation needs to cover. */
	fid_build_reg_res_name(fid, &res_id);
	switch (it->it_op) {
	/* File attributes are held under multiple bits:
	 * nlink is under lookup lock, size and times are
	 * under UPDATE lock and recently we've also got
	 * a separate permissions lock for owner/group/acl that
	 * were protected by lookup lock before.
	 * Getattr must provide all of that information,
	 * so we need to ensure we have all of those locks.
	 * Unfortunately, if the bits are split across multiple
	 * locks, there's no easy way to match all of them here,
	 * so an extra RPC would be performed to fetch all
	 * of those bits at once for now. */
	/* For new MDTs(> 2.4), UPDATE|PERM should be enough,
	 * but for old MDTs (< 2.4), permission is covered
	 * by LOOKUP lock, so it needs to match all bits here.*/
	/* NOTE(review): the switch case labels are not visible in this
	 * chunk; the comment above suggests this first assignment serves
	 * the getattr case -- confirm against the full file. */
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
				  MDS_INODELOCK_LOOKUP |
	/* update-only bits -- case label elided in this chunk */
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
	/* layout bit -- presumably the IT_LAYOUT case; verify */
	policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
	/* lookup bit -- presumably the default case; verify */
	policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
	/* Match any granted lock on this resource in a compatible mode
	 * (any of CR/CW/PR/PW) carrying the requested bits. */
	mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
			      LDLM_IBITS, &policy,
			      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
	/* Publish the matched lock to the intent so upper layers can
	 * reference it ... */
	it->it_lock_handle = lockh.cookie;
	it->it_lock_mode = mode;
	/* ... or clear the fields when no usable lock was found. */
	it->it_lock_handle = 0;
	it->it_lock_mode = 0;
1249 * This long block is all about fixing up the lock and request state
1250 * so that it is correct as of the moment _before_ the operation was
1251 * applied; that way, the VFS will think that everything is normal and
1252 * call Lustre's regular VFS methods.
1254 * If we're performing a creation, that means that unless the creation
1255 * failed with EEXIST, we should fake up a negative dentry.
1257 * For everything else, we want to lookup to succeed.
1259 * One additional note: if CREATE or OPEN succeeded, we add an extra
1260 * reference to the request because we need to keep it around until
1261 * ll_create/ll_open gets called.
1263 * The server will return to us, in it_disposition, an indication of
1264 * exactly what it_status refers to.
1266 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1267 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1268 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1269 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1272 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/**
 * Obtain a DLM lock for intent \a it on the object(s) described by
 * \a op_data: first try to revalidate a lock already held locally, and
 * only fall back to an intent-enqueue RPC to the MDT when that fails.
 * See the block comment above for the it_disposition/it_status contract.
 *
 * \param exp			client export to the MDT
 * \param op_data		operation data (FIDs, name, flags)
 * \param it			lookup intent to execute
 * \param reqp			returns the reply request (caller consumes it)
 * \param cb_blocking		blocking AST callback for the new lock
 * \param extra_lock_flags	additional LDLM flags for the enqueue
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
	/* Enqueue parameters: an inodebits lock with mode derived from the
	 * intent; completion handled by the generic LDLM completion AST and
	 * glimpse by the MDC handler. */
	struct ldlm_enqueue_info einfo = {
		.ei_type = LDLM_IBITS,
		.ei_mode = it_to_lock_mode(it),
		.ei_cb_bl = cb_blocking,
		.ei_cb_cp = ldlm_completion_ast,
		.ei_cb_gl = mdc_ldlm_glimpse_ast,
	struct lustre_handle lockh;
	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	/* Revalidate path: when the child FID is already known and the
	 * operation is lookup-style, try to reuse a held lock first. */
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify it instead of trusting the caller blindly. */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		   (from inode_revalidate) */
		if (rc || op_data->op_namelen != 0)
	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
			CERROR("Can't alloc new fid, rc %d\n", rc);
	/* No usable local lock: send the intent-enqueue RPC to the MDT. */
	rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
	/* Hand the reply request to the caller and fix up lock/intent
	 * state so the VFS sees a normal lookup/open result. */
	*reqp = it->it_request;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply-interpret callback for the asynchronous intent-getattr enqueue
 * started by mdc_intent_getattr_async().  Runs when the RPC completes:
 * releases the request slot taken at send time, finishes the LDLM
 * enqueue, fixes up intent/lock state, and finally reports the result
 * through the sponsor's mi_cb callback.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
	struct mdc_getattr_args *ga = args;
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent *it;
	struct lustre_handle *lockh;
	struct obd_device *obddev;
	struct ldlm_reply *lockrep;
	__u64 flags = LDLM_FL_HAS_INTENT;
	lockh = &minfo->mi_lockh;
	obddev = class_exp2obd(exp);
	/* Balance the obd_get_request_slot() done when the RPC was sent. */
	obd_put_request_slot(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
	/* Complete the client side of the LDLM enqueue for this reply. */
	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);
	/* The intent status travels in lock_policy_res2; convert it from
	 * wire (network) errno representation to the host's. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);
	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
	/* Notify the sponsor of the async getattr with the final result. */
	minfo->mi_cb(req, minfo, rc);
1380 int mdc_intent_getattr_async(struct obd_export *exp,
1381 struct md_enqueue_info *minfo)
1383 struct md_op_data *op_data = &minfo->mi_data;
1384 struct lookup_intent *it = &minfo->mi_it;
1385 struct ptlrpc_request *req;
1386 struct mdc_getattr_args *ga;
1387 struct obd_device *obddev = class_exp2obd(exp);
1388 struct ldlm_res_id res_id;
1389 union ldlm_policy_data policy = {
1390 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1391 MDS_INODELOCK_UPDATE } };
1393 __u64 flags = LDLM_FL_HAS_INTENT;
1396 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1397 (int)op_data->op_namelen, op_data->op_name,
1398 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1400 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1401 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1402 * of the async getattr RPC will handle that by itself. */
1403 req = mdc_intent_getattr_pack(exp, it, op_data,
1404 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1406 RETURN(PTR_ERR(req));
1408 rc = obd_get_request_slot(&obddev->u.cli);
1410 ptlrpc_req_finished(req);
1414 /* With Data-on-MDT the glimpse callback is needed too.
1415 * It is set here in advance but not in mdc_finish_enqueue()
1416 * to avoid possible races. It is safe to have glimpse handler
1417 * for non-DOM locks and costs nothing.*/
1418 if (minfo->mi_einfo.ei_cb_gl == NULL)
1419 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1421 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1422 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1424 obd_put_request_slot(&obddev->u.cli);
1425 ptlrpc_req_finished(req);
1429 ga = ptlrpc_req_async_args(ga, req);
1431 ga->ga_minfo = minfo;
1433 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1434 ptlrpcd_add_req(req);