4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/*
 * Context passed from an async getattr enqueue to its interpret/completion
 * callback: the export the RPC went out on plus the caller's enqueue info.
 * NOTE(review): this extract is missing lines (closing brace not visible).
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * it_open_error() - report the intent's error for a given open phase.
 * Dispositions are tested from most specific (OPEN_LEASE) down to least
 * specific (IT_EXECD); presumably each branch returns it->it_status once
 * the requested phase is reached and 0 otherwise -- the return statements
 * are not visible in this extract, TODO confirm against the full source.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No recognized disposition bit set: log the raw values for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach the VFS inode (@data) to the DLM lock's
 * resource as its LVB inode, and optionally report the lock's inodebits
 * through @bits.  If the resource already points at a different inode,
 * that inode must be on its way out (I_FREEING) before it is replaced.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zeroed) lock handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A live (non-freeing) inode must never be displaced by a new one. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted local lock matching
 * @fid/@type/@policy/@mode.  The FID is converted to a resource name and
 * policy bits unsupported by the server are masked off before matching.
 * Returns the matched lock mode (and fills @lockh); the final return is
 * not visible in this extract but presumably propagates rc.
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused locks on @fid's resource that
 * match @policy/@mode, honoring the given cancel @flags and @opaque tag.
 * Thin wrapper that builds the resource name from the FID and delegates
 * to ldlm_cli_cancel_unused_resource().
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any inode cached on @fid's DLM resource by
 * clearing lr_lvb_inode, then drop the resource reference.  Called when
 * the inode is going away so stale pointers are not left on the resource.
 * NOTE(review): locking around the lr_lvb_inode store (lock_res()) is not
 * visible in this extract -- confirm against the full source.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0); missing resource is handled elsewhere. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag() - drop the replay flag from @req so a failed
 * request is not held for recovery replay.  A nonzero transno together
 * with an error @rc is unexpected and is logged loudly.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/*
 * mdc_save_lovea() - copy @size bytes of EA @data into @field of @req's
 * client-side capsule, growing or shrinking the buffer to fit first.
 */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
/* Buffer too small: enlarge (may reallocate the whole reqbuf). */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Buffer larger than needed: shrink it to exactly @size. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/*
 * mdc_intent_open_pack() - allocate and pack an LDLM_INTENT_OPEN request:
 * cancel locally conflicting locks, size every client/server capsule
 * field (name, EA, security context, SELinux policy, ACL), pack the
 * intent and the open body, and pre-size the reply for possible
 * Data-on-MDT inline data.  Returns the prepared request (presumably; the
 * final RETURN is not visible in this extract) or an ERR_PTR on failure.
 */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
259 int repsize, repsize_estimate;
/* Force a regular-file type into the create mode. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list before bailing. */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 op_data->op_file_secctx_name_size : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 /* get SELinux policy info if any */
323 rc = sptlrpc_get_sepol(req);
325 ptlrpc_request_free(req);
328 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329 strlen(req->rq_sepol) ?
330 strlen(req->rq_sepol) + 1 : 0);
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334 ptlrpc_request_free(req);
/* Mark for replay iff the import is replayable (under rq_lock). */
338 spin_lock(&req->rq_lock);
339 req->rq_replay = req->rq_import->imp_replayable;
340 spin_unlock(&req->rq_lock);
342 /* pack the intent */
343 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344 lit->opc = (__u64)it->it_op;
346 /* pack the intended request */
347 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
351 obddev->u.cli.cl_max_mds_easize);
352 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Plain open (no create): ask the server to return the named security
 * xattr in the reply, sized generously at cl_max_mds_easize. */
354 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
355 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
357 op_data->op_file_secctx_name_size > 0 &&
358 op_data->op_file_secctx_name != NULL) {
361 secctx_name = req_capsule_client_get(&req->rq_pill,
362 &RMF_FILE_SECCTX_NAME);
363 memcpy(secctx_name, op_data->op_file_secctx_name,
364 op_data->op_file_secctx_name_size);
365 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
367 obddev->u.cli.cl_max_mds_easize);
369 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
370 op_data->op_file_secctx_name_size,
371 op_data->op_file_secctx_name);
374 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
379 * Inline buffer for possible data from Data-on-MDT files.
381 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
382 sizeof(struct niobuf_remote));
383 ptlrpc_request_set_replen(req);
385 /* Get real repbuf allocated size as rounded up power of 2 */
386 repsize = size_roundup_power2(req->rq_replen +
387 lustre_msg_early_size());
388 /* Estimate free space for DoM files in repbuf */
389 repsize_estimate = repsize - (req->rq_replen -
390 obddev->u.cli.cl_max_mds_easize +
391 sizeof(struct lov_comp_md_v1) +
392 sizeof(struct lov_comp_md_entry_v1) +
393 lov_mds_md_size(0, LOV_MAGIC_V3));
/* Too little slack for inline DoM data: grow the inline niobuf so at
 * least cl_dom_min_inline_repsize bytes of file data can come back. */
395 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
396 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
397 repsize_estimate + sizeof(struct niobuf_remote);
398 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
400 sizeof(struct niobuf_remote) + repsize);
401 ptlrpc_request_set_replen(req);
402 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
403 repsize, req->rq_replen);
404 repsize = size_roundup_power2(req->rq_replen +
405 lustre_msg_early_size());
407 /* The only way to report real allocated repbuf size to the server
408 * is the lm_repsize but it must be set prior buffer allocation itself
409 * due to security reasons - it is part of buffer used in signature
410 * calculation (see LU-11414). Therefore the saved size is predicted
411 * value as rq_replen rounded to the next higher power of 2.
412 * Such estimation is safe. Though the final allocated buffer might
413 * be even larger, it is not possible to know that at this point.
415 req->rq_reqmsg->lm_repsize = repsize;
419 #define GA_DEFAULT_EA_NAME_LEN 20
420 #define GA_DEFAULT_EA_VAL_LEN 250
421 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR request that
 * asks for all of a file's xattrs in one round trip, sizing the reply
 * buffers using the GA_DEFAULT_EA_* heuristics.  Returns the prepared
 * request (final RETURN not visible in this extract) or ERR_PTR on error.
 */
423 static struct ptlrpc_request *
424 mdc_intent_getxattr_pack(struct obd_export *exp,
425 struct lookup_intent *it,
426 struct md_op_data *op_data)
428 struct ptlrpc_request *req;
429 struct ldlm_intent *lit;
431 struct list_head cancels = LIST_HEAD_INIT(cancels);
432 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
436 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437 &RQF_LDLM_INTENT_GETXATTR);
439 RETURN(ERR_PTR(-ENOMEM));
441 /* get SELinux policy info if any */
442 rc = sptlrpc_get_sepol(req);
444 ptlrpc_request_free(req);
447 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
448 strlen(req->rq_sepol) ?
449 strlen(req->rq_sepol) + 1 : 0);
451 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
453 ptlrpc_request_free(req);
457 /* pack the intent */
458 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459 lit->opc = IT_GETXATTR;
460 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
461 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
463 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
464 /* If the supplied buffer is too small then the server will
465 * return -ERANGE and llite will fallback to using non cached
466 * xattr operations. On servers before 2.10.1 a (non-cached)
467 * listxattr RPC for an orphan or dead file causes an oops. So
468 * let's try to avoid sending too small a buffer to too old a
469 * server. This is effectively undoing the memory conservation
470 * of LU-9417 when it would be *more* likely to crash the
471 * server. See LU-9856. */
472 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
473 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
474 exp->exp_connect_data.ocd_max_easize)
477 /* pack the intended request */
478 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
479 ea_vals_buf_size, -1, 0);
481 /* get SELinux policy info if any */
482 mdc_file_sepol_pack(req);
/* Reply buffers: names, values, per-value lengths; no ACL wanted. */
484 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
485 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
487 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
490 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
491 sizeof(u32) * GA_DEFAULT_EA_NUM);
493 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
495 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR request for a
 * lookup/getattr intent, asking for attributes, EA, and ACL, and (when
 * applicable) the named security xattr in the same round trip.  Returns
 * the prepared request (final RETURN not visible in this extract) or
 * ERR_PTR(-ENOMEM) on allocation failure.
 */
500 static struct ptlrpc_request *
501 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
502 struct md_op_data *op_data, __u32 acl_bufsize)
504 struct ptlrpc_request *req;
505 struct obd_device *obddev = class_exp2obd(exp);
506 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
507 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
508 OBD_MD_MEA | OBD_MD_FLACL;
509 struct ldlm_intent *lit;
512 bool have_secctx = false;
515 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
516 &RQF_LDLM_INTENT_GETATTR);
518 RETURN(ERR_PTR(-ENOMEM));
520 /* send name of security xattr to get upon intent */
521 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
522 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
524 op_data->op_file_secctx_name_size > 0 &&
525 op_data->op_file_secctx_name != NULL) {
527 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
529 op_data->op_file_secctx_name_size);
532 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
533 op_data->op_namelen + 1);
535 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537 ptlrpc_request_free(req);
541 /* pack the intent */
542 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
543 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
545 if (obddev->u.cli.cl_default_mds_easize > 0)
546 easize = obddev->u.cli.cl_default_mds_easize;
548 easize = obddev->u.cli.cl_max_mds_easize;
550 /* pack the intended request */
551 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
553 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
554 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Security-context branch (guard lines not visible in this extract;
 * presumably conditional on have_secctx -- TODO confirm). */
559 secctx_name = req_capsule_client_get(&req->rq_pill,
560 &RMF_FILE_SECCTX_NAME);
561 memcpy(secctx_name, op_data->op_file_secctx_name,
562 op_data->op_file_secctx_name_size);
564 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
567 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
568 op_data->op_file_secctx_name_size,
569 op_data->op_file_secctx_name);
571 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
575 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT request.  The
 * caller supplies a fully-formed struct layout_intent in op_data->op_data
 * which is copied verbatim into the capsule; the reply LVB is sized for
 * the default MDS EA size.  Returns the prepared request (final RETURN
 * not visible in this extract) or ERR_PTR(-ENOMEM).
 */
579 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
580 struct lookup_intent *it,
581 struct md_op_data *op_data)
583 struct obd_device *obd = class_exp2obd(exp);
584 struct ptlrpc_request *req;
585 struct ldlm_intent *lit;
586 struct layout_intent *layout;
590 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
591 &RQF_LDLM_INTENT_LAYOUT);
593 RETURN(ERR_PTR(-ENOMEM));
595 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
596 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
598 ptlrpc_request_free(req);
602 /* pack the intent */
603 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
604 lit->opc = (__u64)it->it_op;
606 /* pack the layout intent request */
607 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
608 LASSERT(op_data->op_data != NULL);
609 LASSERT(op_data->op_data_size == sizeof(*layout));
610 memcpy(layout, op_data->op_data, sizeof(*layout));
612 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
613 obd->u.cli.cl_default_mds_easize);
614 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM_ENQUEUE request
 * with a server-side LVB buffer of @lvb_len bytes.  Used e.g. for
 * IT_READDIR where no intent body is needed.
 */
618 static struct ptlrpc_request *
619 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
621 struct ptlrpc_request *req;
625 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
627 RETURN(ERR_PTR(-ENOMEM));
629 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
631 ptlrpc_request_free(req);
635 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
636 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process an intent enqueue reply: fix up the
 * granted lock mode, copy disposition/status into the intent, clear the
 * replay flag on failed opens, save any returned LOV EA or layout LVB for
 * replay, install layout lvb_data on the lock, and validate Data-on-MDT
 * size info.  Several error paths and the final return are not visible in
 * this lossy extract.
 */
640 static int mdc_finish_enqueue(struct obd_export *exp,
641 struct ptlrpc_request *req,
642 struct ldlm_enqueue_info *einfo,
643 struct lookup_intent *it,
644 struct lustre_handle *lockh,
647 struct req_capsule *pill = &req->rq_pill;
648 struct ldlm_request *lockreq;
649 struct ldlm_reply *lockrep;
650 struct ldlm_lock *lock;
651 struct mdt_body *body = NULL;
652 void *lvb_data = NULL;
658 /* Similarly, if we're going to replay this request, we don't want to
659 * actually get a lock, just perform the intent. */
660 if (req->rq_transno || req->rq_replay) {
661 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
662 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Intent executed but no lock granted: hand back a zeroed handle. */
665 if (rc == ELDLM_LOCK_ABORTED) {
667 memset(lockh, 0, sizeof(*lockh));
669 } else { /* rc = 0 */
670 lock = ldlm_handle2lock(lockh);
671 LASSERT(lock != NULL);
673 /* If the server gave us back a different lock mode, we should
674 * fix up our variables. */
675 if (lock->l_req_mode != einfo->ei_mode) {
676 ldlm_lock_addref(lockh, lock->l_req_mode);
677 ldlm_lock_decref(lockh, einfo->ei_mode);
678 einfo->ei_mode = lock->l_req_mode;
683 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
684 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Mirror the server's verdict into the caller's intent. */
686 it->it_disposition = (int)lockrep->lock_policy_res1;
687 it->it_status = (int)lockrep->lock_policy_res2;
688 it->it_lock_mode = einfo->ei_mode;
689 it->it_lock_handle = lockh->cookie;
690 it->it_request = req;
692 /* Technically speaking rq_transno must already be zero if
693 * it_status is in error, so the check is a bit redundant */
694 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
695 mdc_clear_replay_flag(req, it->it_status);
697 /* If we're doing an IT_OPEN which did not result in an actual
698 * successful open, then we need to remove the bit which saves
699 * this request for unconditional replay.
701 * It's important that we do this first! Otherwise we might exit the
702 * function without doing so, and try to replay a failed create
704 if (it->it_op & IT_OPEN && req->rq_replay &&
705 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
706 mdc_clear_replay_flag(req, it->it_status);
708 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
709 it->it_op, it->it_disposition, it->it_status);
711 /* We know what to expect, so we do any byte flipping required here */
712 if (it_has_reply_body(it)) {
713 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
715 CERROR ("Can't swab mdt_body\n");
719 if (it_disposition(it, DISP_OPEN_OPEN) &&
720 !it_open_error(DISP_OPEN_OPEN, it)) {
722 * If this is a successful OPEN request, we need to set
723 * replay handler and data early, so that if replay
724 * happens immediately after swabbing below, new reply
725 * is swabbed by that handler correctly.
727 mdc_set_open_replay_data(NULL, NULL, it);
730 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
733 mdc_update_max_ea_from_body(exp, body);
736 * The eadata is opaque; just check that it is there.
737 * Eventually, obd_unpackmd() will check the contents.
739 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
740 body->mbo_eadatasize);
744 /* save lvb data and length in case this is for layout
747 lvb_len = body->mbo_eadatasize;
750 * We save the reply LOV EA in case we have to replay a
751 * create for recovery. If we didn't allocate a large
752 * enough request buffer above we need to reallocate it
753 * here to hold the actual LOV EA.
755 * To not save LOV EA if request is not going to replay
756 * (for example error one).
758 if ((it->it_op & IT_OPEN) && req->rq_replay) {
759 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
760 body->mbo_eadatasize);
/* On save failure, drop the EA from the body so replay
 * does not reference data we could not preserve. */
762 body->mbo_valid &= ~OBD_MD_FLEASIZE;
763 body->mbo_eadatasize = 0;
768 } else if (it->it_op & IT_LAYOUT) {
769 /* maybe the lock was granted right away and layout
770 * is packed into RMF_DLM_LVB of req */
771 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
772 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
773 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
775 lvb_data = req_capsule_server_sized_get(pill,
776 &RMF_DLM_LVB, lvb_len);
777 if (lvb_data == NULL)
781 * save replied layout data to the request buffer for
782 * recovery consideration (lest MDS reinitialize
783 * another set of OST objects).
786 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
791 /* fill in stripe data for layout lock.
792 * LU-6581: trust layout data only if layout lock is granted. The MDT
793 * has stopped sending layout unless the layout lock is granted. The
794 * client still does this checking in case it's talking with an old
795 * server. - Jinshan */
796 lock = ldlm_handle2lock(lockh);
800 if (ldlm_has_layout(lock) && lvb_data != NULL &&
801 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
804 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
805 ldlm_it2str(it->it_op), lvb_len);
807 OBD_ALLOC_LARGE(lmm, lvb_len);
809 GOTO(out_lock, rc = -ENOMEM);
811 memcpy(lmm, lvb_data, lvb_len);
813 /* install lvb_data */
814 lock_res_and_lock(lock);
815 if (lock->l_lvb_data == NULL) {
816 lock->l_lvb_type = LVB_T_LAYOUT;
817 lock->l_lvb_data = lmm;
818 lock->l_lvb_len = lvb_len;
821 unlock_res_and_lock(lock);
/* Lock already had LVB data installed; free our copy. */
823 OBD_FREE_LARGE(lmm, lvb_len);
826 if (ldlm_has_dom(lock)) {
827 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
829 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
830 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
831 LDLM_ERROR(lock, "%s: DoM lock without size.",
832 exp->exp_obd->obd_name);
833 GOTO(out_lock, rc = -EPROTO);
836 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
837 ldlm_it2str(it->it_op), body->mbo_dom_size);
839 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
847 /* We always reserve enough space in the reply packet for a stripe MD, because
848 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base() - core intent-lock enqueue: choose the inodebits
 * policy from the intent op, pack the matching request type (open,
 * getattr/lookup, readdir, layout, getxattr, or plain flock), take
 * modify-RPC and request slots, send via ldlm_cli_enqueue(), then handle
 * -EINPROGRESS resends, ACL-buffer -ERANGE retries, and final cleanup via
 * mdc_finish_enqueue().  Retry/exit control flow is partially invisible
 * in this lossy extract.
 */
849 static int mdc_enqueue_base(struct obd_export *exp,
850 struct ldlm_enqueue_info *einfo,
851 const union ldlm_policy_data *policy,
852 struct lookup_intent *it,
853 struct md_op_data *op_data,
854 struct lustre_handle *lockh,
855 __u64 extra_lock_flags)
857 struct obd_device *obddev = class_exp2obd(exp);
858 struct ptlrpc_request *req = NULL;
859 __u64 flags, saved_flags = extra_lock_flags;
860 struct ldlm_res_id res_id;
861 static const union ldlm_policy_data lookup_policy = {
862 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
863 static const union ldlm_policy_data update_policy = {
864 .l_inodebits = { MDS_INODELOCK_UPDATE } };
865 static const union ldlm_policy_data layout_policy = {
866 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
867 static const union ldlm_policy_data getxattr_policy = {
868 .l_inodebits = { MDS_INODELOCK_XATTR } };
869 int generation, resends = 0;
870 struct ldlm_reply *lockrep;
871 struct obd_import *imp = class_exp2cliimp(exp);
873 enum lvb_type lvb_type = 0;
877 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
879 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from it->it_op, never passed. */
882 LASSERT(policy == NULL);
884 saved_flags |= LDLM_FL_HAS_INTENT;
885 if (it->it_op & (IT_GETATTR | IT_READDIR))
886 policy = &update_policy;
887 else if (it->it_op & IT_LAYOUT)
888 policy = &layout_policy;
889 else if (it->it_op & IT_GETXATTR)
890 policy = &getxattr_policy;
892 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
895 generation = obddev->u.cli.cl_import->imp_generation;
896 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
897 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
900 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
905 /* The only way right now is FLOCK. */
906 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
908 res_id.name[3] = LDLM_FLOCK;
909 } else if (it->it_op & IT_OPEN) {
910 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
911 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
912 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
913 } else if (it->it_op & IT_READDIR) {
914 req = mdc_enqueue_pack(exp, 0);
915 } else if (it->it_op & IT_LAYOUT) {
916 if (!imp_connect_lvb_type(imp))
918 req = mdc_intent_layout_pack(exp, it, op_data);
919 lvb_type = LVB_T_LAYOUT;
920 } else if (it->it_op & IT_GETXATTR) {
921 req = mdc_intent_getxattr_pack(exp, it, op_data);
928 RETURN(PTR_ERR(req));
/* Pin the request to the current import generation for resends. */
931 req->rq_generation_set = 1;
932 req->rq_import_generation = generation;
933 req->rq_sent = ktime_get_real_seconds() + resends;
936 /* It is important to obtain modify RPC slot first (if applicable), so
937 * that threads that are waiting for a modify RPC slot are not polluting
938 * our rpcs in flight counter.
939 * We do not do flock request limiting, though */
941 mdc_get_mod_rpc_slot(req, it);
942 rc = obd_get_request_slot(&obddev->u.cli);
944 mdc_put_mod_rpc_slot(req, it);
945 mdc_clear_replay_flag(req, 0);
946 ptlrpc_req_finished(req);
951 /* With Data-on-MDT the glimpse callback is needed too.
952 * It is set here in advance but not in mdc_finish_enqueue()
953 * to avoid possible races. It is safe to have glimpse handler
954 * for non-DOM locks and costs nothing.*/
955 if (einfo->ei_cb_gl == NULL)
956 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
958 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
959 0, lvb_type, lockh, 0);
961 /* For flock requests we immediatelly return without further
962 delay and let caller deal with the rest, since rest of
963 this function metadata processing makes no sense for flock
964 requests anyway. But in case of problem during comms with
965 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
966 can not rely on caller and this mainly for F_UNLCKs
967 (explicits or automatically generated by Kernel to clean
968 current FLocks upon exit) that can't be trashed */
969 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
970 (einfo->ei_type == LDLM_FLOCK) &&
971 (einfo->ei_mode == LCK_NL))
/* Release the slots taken above, in reverse acquisition order. */
976 obd_put_request_slot(&obddev->u.cli);
977 mdc_put_mod_rpc_slot(req, it);
981 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
982 obddev->obd_name, PFID(&op_data->op_fid1),
983 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
985 mdc_clear_replay_flag(req, rc);
986 ptlrpc_req_finished(req);
990 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
991 LASSERT(lockrep != NULL);
/* Convert the wire status to the host's errno space. */
993 lockrep->lock_policy_res2 =
994 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
996 /* Retry infinitely when the server returns -EINPROGRESS for the
997 * intent operation, when server returns -EINPROGRESS for acquiring
998 * intent lock, we'll retry in after_reply(). */
999 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1000 mdc_clear_replay_flag(req, rc);
1001 ptlrpc_req_finished(req);
1002 if (generation == obddev->u.cli.cl_import->imp_generation) {
1003 if (signal_pending(current))
1007 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1008 obddev->obd_name, resends, it->it_op,
1009 PFID(&op_data->op_fid1),
1010 PFID(&op_data->op_fid2));
1013 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL reply buffer was too small: retry once with the large size. */
1018 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1019 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1020 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1021 mdc_clear_replay_flag(req, -ERANGE);
1022 ptlrpc_req_finished(req);
1023 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1028 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On failure, release any lock reference and reset intent state. */
1030 if (lustre_handle_is_used(lockh)) {
1031 ldlm_lock_decref(lockh, einfo->ei_mode);
1032 memset(lockh, 0, sizeof(*lockh));
1034 ptlrpc_req_finished(req);
1036 it->it_lock_handle = 0;
1037 it->it_lock_mode = 0;
1038 it->it_request = NULL;
/*
 * mdc_enqueue() - public intent-less enqueue entry point; delegates to
 * mdc_enqueue_base() with a NULL lookup intent.
 */
1044 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1045 const union ldlm_policy_data *policy,
1046 struct md_op_data *op_data,
1047 struct lustre_handle *lockh, __u64 extra_lock_flags)
1049 return mdc_enqueue_base(exp, einfo, policy, NULL,
1050 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - final intent bookkeeping after the enqueue
 * reply is processed: validate dispositions, pin the request for the
 * later create/open phases (released in ll_create_node/ll_file_open),
 * and if an equivalent lock already exists locally, cancel the new one
 * and keep the old handle.  Some branches are invisible in this extract.
 */
1053 static int mdc_finish_intent_lock(struct obd_export *exp,
1054 struct ptlrpc_request *request,
1055 struct md_op_data *op_data,
1056 struct lookup_intent *it,
1057 struct lustre_handle *lockh)
1059 struct lustre_handle old_lock;
1060 struct ldlm_lock *lock;
1064 LASSERT(request != NULL);
1065 LASSERT(request != LP_POISON);
1066 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir carries no intent result to post-process. */
1068 if (it->it_op & IT_READDIR)
1071 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1072 if (it->it_status != 0)
1073 GOTO(out, rc = it->it_status);
1075 if (!it_disposition(it, DISP_IT_EXECD)) {
1076 /* The server failed before it even started executing
1077 * the intent, i.e. because it couldn't unpack the
1080 LASSERT(it->it_status != 0);
1081 GOTO(out, rc = it->it_status);
1083 rc = it_open_error(DISP_IT_EXECD, it);
1087 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1091 /* keep requests around for the multiple phases of the call
1092 * this shows the DISP_XX must guarantee we make it into the
1095 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1096 it_disposition(it, DISP_OPEN_CREATE) &&
1097 !it_open_error(DISP_OPEN_CREATE, it)) {
1098 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1099 /* balanced in ll_create_node */
1100 ptlrpc_request_addref(request);
1102 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1103 it_disposition(it, DISP_OPEN_OPEN) &&
1104 !it_open_error(DISP_OPEN_OPEN, it)) {
1105 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1106 /* balanced in ll_file_open */
1107 ptlrpc_request_addref(request);
1108 /* BUG 11546 - eviction in the middle of open rpc
1111 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1115 if (it->it_op & IT_CREAT) {
1116 /* XXX this belongs in ll_create_it */
1117 } else if (it->it_op == IT_OPEN) {
1118 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1120 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1124 /* If we already have a matching lock, then cancel the new
1125 * one. We have to set the data here instead of in
1126 * mdc_enqueue, because we need to use the child's inode as
1127 * the l_ast_data to match, and that's not available until
1128 * intent_finish has performed the iget().) */
1129 lock = ldlm_handle2lock(lockh);
1131 union ldlm_policy_data policy = lock->l_policy_data;
1132 LDLM_DEBUG(lock, "matching against this");
/* Sanity check: the reply body's FID must match the lock resource. */
1134 if (it_has_reply_body(it)) {
1135 struct mdt_body *body;
1137 body = req_capsule_server_get(&request->rq_pill,
1139 /* mdc_enqueue checked */
1140 LASSERT(body != NULL);
1141 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1142 &lock->l_resource->lr_name),
1143 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1144 PLDLMRES(lock->l_resource),
1145 PFID(&body->mbo_fid1));
1147 LDLM_LOCK_PUT(lock);
1149 memcpy(&old_lock, lockh, sizeof(*lockh));
/* Found an equivalent granted lock: drop the new one, keep the old. */
1150 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1151 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1152 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1153 memcpy(lockh, &old_lock, sizeof(old_lock));
1154 it->it_lock_handle = lockh->cookie;
1160 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1161 (int)op_data->op_namelen, op_data->op_name,
1162 ldlm_it2str(it->it_op), it->it_status,
1163 it->it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether a still-valid local lock covers
 * the intent on @fid.  If the intent already holds a lock handle, just
 * revalidate it; otherwise match by inodebits chosen per intent op.  On
 * success the intent's handle/mode are refreshed; otherwise cleared.
 * The switch case labels are not visible in this lossy extract.
 */
1167 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1168 struct lu_fid *fid, __u64 *bits)
1170 /* We could just return 1 immediately, but since we should only
1171 * be called in revalidate_it if we already have a lock, let's
1173 struct ldlm_res_id res_id;
1174 struct lustre_handle lockh;
1175 union ldlm_policy_data policy;
1176 enum ldlm_mode mode;
1179 if (it->it_lock_handle) {
1180 lockh.cookie = it->it_lock_handle;
1181 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1183 fid_build_reg_res_name(fid, &res_id);
1184 switch (it->it_op) {
1186 /* File attributes are held under multiple bits:
1187 * nlink is under lookup lock, size and times are
1188 * under UPDATE lock and recently we've also got
1189 * a separate permissions lock for owner/group/acl that
1190 * were protected by lookup lock before.
1191 * Getattr must provide all of that information,
1192 * so we need to ensure we have all of those locks.
1193 * Unfortunately, if the bits are split across multiple
1194 * locks, there's no easy way to match all of them here,
1195 * so an extra RPC would be performed to fetch all
1196 * of those bits at once for now. */
1197 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1198 * but for old MDTs (< 2.4), permission is covered
1199 * by LOOKUP lock, so it needs to match all bits here.*/
1200 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1201 MDS_INODELOCK_LOOKUP |
1205 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1208 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1211 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Match any mode; ibits chosen above per intent operation. */
1215 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1216 LDLM_IBITS, &policy,
1217 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1222 it->it_lock_handle = lockh.cookie;
1223 it->it_lock_mode = mode;
/* No usable lock: clear the intent's cached lock state. */
1225 it->it_lock_handle = 0;
1226 it->it_lock_mode = 0;
1233 * This long block is all about fixing up the lock and request state
1234 * so that it is correct as of the moment _before_ the operation was
1235 * applied; that way, the VFS will think that everything is normal and
1236 * call Lustre's regular VFS methods.
1238 * If we're performing a creation, that means that unless the creation
1239 * failed with EEXIST, we should fake up a negative dentry.
1241 * For everything else, we want to lookup to succeed.
1243 * One additional note: if CREATE or OPEN succeeded, we add an extra
1244 * reference to the request because we need to keep it around until
1245 * ll_create/ll_open gets called.
1247 * The server will return to us, in it_disposition, an indication of
1248 * exactly what it_status refers to.
1250 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1252 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1253 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1256 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/**
 * Obtain an intent lock from the MDT for the operation described by
 * \a op_data and \a it, then fix up the lock/request state for the VFS
 * (see the long block comment preceding this function in the file).
 *
 * \param exp			export to the MDT
 * \param op_data		operation data (fids, name, etc.)
 * \param it			lookup intent describing the operation
 * \param reqp			returns the intent RPC request on success
 * \param cb_blocking		DLM blocking callback for the new lock
 * \param extra_lock_flags	additional LDLM flags for the enqueue
 *
 * NOTE(review): error-handling paths, RETURN statements and the closing
 * brace are elided from this chunk; comments cover only visible code.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
struct lookup_intent *it, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
struct ldlm_enqueue_info einfo = {
.ei_type = LDLM_IBITS,
.ei_mode = it_to_lock_mode(it),
.ei_cb_bl = cb_blocking,
.ei_cb_cp = ldlm_completion_ast,
.ei_cb_gl = mdc_ldlm_glimpse_ast,
struct lustre_handle lockh;
CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
", intent: %s flags %#llo\n", (int)op_data->op_namelen,
op_data->op_name, PFID(&op_data->op_fid2),
PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* If the target fid is already known, first try to satisfy the intent
 * from a lock the client already holds instead of a new enqueue. */
if (fid_is_sane(&op_data->op_fid2) &&
(it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
/* We could just return 1 immediately, but since we should only
 * be called in revalidate_it if we already have a lock, let's */
it->it_lock_handle = 0;
rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
/* Only return failure if it was not GETATTR by cfid
   (from inode_revalidate) */
if (rc || op_data->op_namelen != 0)
/* For case if upper layer did not alloc fid, do it now. */
if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
CERROR("Can't alloc new fid, rc %d\n", rc);
/* No usable cached lock: send the intent enqueue RPC to the MDT. */
rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply back to the caller and fix up lock/intent state. */
*reqp = it->it_request;
rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/**
 * Interpret callback for an asynchronous intent-getattr enqueue RPC.
 * Runs when the reply (or an error) arrives: releases the client
 * request slot, completes the LDLM enqueue, fixes up the intent/lock
 * state, and finally delivers the result via minfo->mi_cb().
 *
 * NOTE(review): the remaining parameters of this callback, the
 * initialization of 'it', the fail/out labels and the return are
 * elided from this chunk; comments cover only visible code.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
struct ptlrpc_request *req,
struct mdc_getattr_args *ga = args;
struct obd_export *exp = ga->ga_exp;
struct md_enqueue_info *minfo = ga->ga_minfo;
struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
struct lookup_intent *it;
struct lustre_handle *lockh;
struct obd_device *obddev;
struct ldlm_reply *lockrep;
__u64 flags = LDLM_FL_HAS_INTENT;
lockh = &minfo->mi_lockh;
obddev = class_exp2obd(exp);
/* Release the rpc slot taken in mdc_intent_getattr_async(). */
obd_put_request_slot(&obddev->u.cli);
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Finish the client-side enqueue for the lock granted (or not)
 * by this reply. */
rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
&flags, NULL, 0, lockh, rc);
CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
mdc_clear_replay_flag(req, rc);
lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
/* mdc_enqueue checked the reply format already */
LASSERT(lockrep != NULL);
/* Convert the wire-format intent status to a host error code. */
lockrep->lock_policy_res2 =
ptlrpc_status_ntoh(lockrep->lock_policy_res2);
rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Hand the final result to the sponsor of the async getattr. */
minfo->mi_cb(req, minfo, rc);
1364 int mdc_intent_getattr_async(struct obd_export *exp,
1365 struct md_enqueue_info *minfo)
1367 struct md_op_data *op_data = &minfo->mi_data;
1368 struct lookup_intent *it = &minfo->mi_it;
1369 struct ptlrpc_request *req;
1370 struct mdc_getattr_args *ga;
1371 struct obd_device *obddev = class_exp2obd(exp);
1372 struct ldlm_res_id res_id;
1373 union ldlm_policy_data policy = {
1374 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1375 MDS_INODELOCK_UPDATE } };
1377 __u64 flags = LDLM_FL_HAS_INTENT;
1380 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1381 (int)op_data->op_namelen, op_data->op_name,
1382 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1384 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1385 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1386 * of the async getattr RPC will handle that by itself. */
1387 req = mdc_intent_getattr_pack(exp, it, op_data,
1388 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1390 RETURN(PTR_ERR(req));
1392 rc = obd_get_request_slot(&obddev->u.cli);
1394 ptlrpc_req_finished(req);
1398 /* With Data-on-MDT the glimpse callback is needed too.
1399 * It is set here in advance but not in mdc_finish_enqueue()
1400 * to avoid possible races. It is safe to have glimpse handler
1401 * for non-DOM locks and costs nothing.*/
1402 if (minfo->mi_einfo.ei_cb_gl == NULL)
1403 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1405 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1406 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1408 obd_put_request_slot(&obddev->u.cli);
1409 ptlrpc_req_finished(req);
1413 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1414 ga = ptlrpc_req_async_args(req);
1416 ga->ga_minfo = minfo;
1418 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1419 ptlrpcd_add_req(req);