4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
259 int repsize, repsize_estimate;
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 op_data->op_file_secctx_name_size : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 /* get SELinux policy info if any */
323 rc = sptlrpc_get_sepol(req);
325 ptlrpc_request_free(req);
328 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329 strlen(req->rq_sepol) ?
330 strlen(req->rq_sepol) + 1 : 0);
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334 ptlrpc_request_free(req);
338 spin_lock(&req->rq_lock);
339 req->rq_replay = req->rq_import->imp_replayable;
340 spin_unlock(&req->rq_lock);
342 /* pack the intent */
343 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344 lit->opc = (__u64)it->it_op;
346 /* pack the intended request */
347 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
351 obddev->u.cli.cl_max_mds_easize);
352 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
354 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
355 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
357 op_data->op_file_secctx_name_size > 0 &&
358 op_data->op_file_secctx_name != NULL) {
361 secctx_name = req_capsule_client_get(&req->rq_pill,
362 &RMF_FILE_SECCTX_NAME);
363 memcpy(secctx_name, op_data->op_file_secctx_name,
364 op_data->op_file_secctx_name_size);
365 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
367 obddev->u.cli.cl_max_mds_easize);
369 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
370 op_data->op_file_secctx_name_size,
371 op_data->op_file_secctx_name);
374 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
379 * Inline buffer for possible data from Data-on-MDT files.
381 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
382 sizeof(struct niobuf_remote));
383 ptlrpc_request_set_replen(req);
385 /* Get real repbuf allocated size as rounded up power of 2 */
386 repsize = size_roundup_power2(req->rq_replen +
387 lustre_msg_early_size());
388 /* Estimate free space for DoM files in repbuf */
389 repsize_estimate = repsize - (req->rq_replen -
390 obddev->u.cli.cl_max_mds_easize +
391 sizeof(struct lov_comp_md_v1) +
392 sizeof(struct lov_comp_md_entry_v1) +
393 lov_mds_md_size(0, LOV_MAGIC_V3));
395 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
396 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
397 repsize_estimate + sizeof(struct niobuf_remote);
398 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
400 sizeof(struct niobuf_remote) + repsize);
401 ptlrpc_request_set_replen(req);
402 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
403 repsize, req->rq_replen);
404 repsize = size_roundup_power2(req->rq_replen +
405 lustre_msg_early_size());
407 /* The only way to report real allocated repbuf size to the server
408 * is the lm_repsize but it must be set prior buffer allocation itself
409 * due to security reasons - it is part of buffer used in signature
410 * calculation (see LU-11414). Therefore the saved size is predicted
411 * value as rq_replen rounded to the next higher power of 2.
412 * Such estimation is safe. Though the final allocated buffer might
413 * be even larger, it is not possible to know that at this point.
415 req->rq_reqmsg->lm_repsize = repsize;
419 #define GA_DEFAULT_EA_NAME_LEN 20
420 #define GA_DEFAULT_EA_VAL_LEN 250
421 #define GA_DEFAULT_EA_NUM 10
423 static struct ptlrpc_request *
424 mdc_intent_getxattr_pack(struct obd_export *exp,
425 struct lookup_intent *it,
426 struct md_op_data *op_data)
428 struct ptlrpc_request *req;
429 struct ldlm_intent *lit;
431 struct list_head cancels = LIST_HEAD_INIT(cancels);
432 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
436 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437 &RQF_LDLM_INTENT_GETXATTR);
439 RETURN(ERR_PTR(-ENOMEM));
441 /* get SELinux policy info if any */
442 rc = sptlrpc_get_sepol(req);
444 ptlrpc_request_free(req);
447 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
448 strlen(req->rq_sepol) ?
449 strlen(req->rq_sepol) + 1 : 0);
451 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
453 ptlrpc_request_free(req);
457 /* pack the intent */
458 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459 lit->opc = IT_GETXATTR;
460 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
461 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
463 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
464 /* If the supplied buffer is too small then the server will
465 * return -ERANGE and llite will fallback to using non cached
466 * xattr operations. On servers before 2.10.1 a (non-cached)
467 * listxattr RPC for an orphan or dead file causes an oops. So
468 * let's try to avoid sending too small a buffer to too old a
469 * server. This is effectively undoing the memory conservation
470 * of LU-9417 when it would be *more* likely to crash the
471 * server. See LU-9856. */
472 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
473 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
474 exp->exp_connect_data.ocd_max_easize);
477 /* pack the intended request */
478 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
479 ea_vals_buf_size, -1, 0);
481 /* get SELinux policy info if any */
482 mdc_file_sepol_pack(req);
484 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
485 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
487 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
490 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
491 sizeof(u32) * GA_DEFAULT_EA_NUM);
493 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
495 ptlrpc_request_set_replen(req);
500 static struct ptlrpc_request *
501 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
502 struct md_op_data *op_data, __u32 acl_bufsize)
504 struct ptlrpc_request *req;
505 struct obd_device *obddev = class_exp2obd(exp);
506 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
507 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
508 OBD_MD_MEA | OBD_MD_FLACL;
509 struct ldlm_intent *lit;
512 bool have_secctx = false;
515 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
516 &RQF_LDLM_INTENT_GETATTR);
518 RETURN(ERR_PTR(-ENOMEM));
520 /* send name of security xattr to get upon intent */
521 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
522 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
524 op_data->op_file_secctx_name_size > 0 &&
525 op_data->op_file_secctx_name != NULL) {
527 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
529 op_data->op_file_secctx_name_size);
532 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
533 op_data->op_namelen + 1);
535 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537 ptlrpc_request_free(req);
541 /* pack the intent */
542 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
543 lit->opc = (__u64)it->it_op;
545 if (obddev->u.cli.cl_default_mds_easize > 0)
546 easize = obddev->u.cli.cl_default_mds_easize;
548 easize = obddev->u.cli.cl_max_mds_easize;
550 /* pack the intended request */
551 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
553 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
554 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
559 secctx_name = req_capsule_client_get(&req->rq_pill,
560 &RMF_FILE_SECCTX_NAME);
561 memcpy(secctx_name, op_data->op_file_secctx_name,
562 op_data->op_file_secctx_name_size);
564 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
567 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
568 op_data->op_file_secctx_name_size,
569 op_data->op_file_secctx_name);
571 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
575 ptlrpc_request_set_replen(req);
579 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
580 struct lookup_intent *it,
581 struct md_op_data *op_data)
583 struct obd_device *obd = class_exp2obd(exp);
584 struct ptlrpc_request *req;
585 struct ldlm_intent *lit;
586 struct layout_intent *layout;
590 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
591 &RQF_LDLM_INTENT_LAYOUT);
593 RETURN(ERR_PTR(-ENOMEM));
595 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
596 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
598 ptlrpc_request_free(req);
602 /* pack the intent */
603 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
604 lit->opc = (__u64)it->it_op;
606 /* pack the layout intent request */
607 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
608 LASSERT(op_data->op_data != NULL);
609 LASSERT(op_data->op_data_size == sizeof(*layout));
610 memcpy(layout, op_data->op_data, sizeof(*layout));
612 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
613 obd->u.cli.cl_default_mds_easize);
614 ptlrpc_request_set_replen(req);
618 static struct ptlrpc_request *
619 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
621 struct ptlrpc_request *req;
625 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
627 RETURN(ERR_PTR(-ENOMEM));
629 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
631 ptlrpc_request_free(req);
635 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
636 ptlrpc_request_set_replen(req);
640 static int mdc_finish_enqueue(struct obd_export *exp,
641 struct ptlrpc_request *req,
642 struct ldlm_enqueue_info *einfo,
643 struct lookup_intent *it,
644 struct lustre_handle *lockh,
647 struct req_capsule *pill = &req->rq_pill;
648 struct ldlm_request *lockreq;
649 struct ldlm_reply *lockrep;
650 struct ldlm_lock *lock;
651 struct mdt_body *body = NULL;
652 void *lvb_data = NULL;
658 /* Similarly, if we're going to replay this request, we don't want to
659 * actually get a lock, just perform the intent. */
660 if (req->rq_transno || req->rq_replay) {
661 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
662 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
665 if (rc == ELDLM_LOCK_ABORTED) {
667 memset(lockh, 0, sizeof(*lockh));
669 } else { /* rc = 0 */
670 lock = ldlm_handle2lock(lockh);
671 LASSERT(lock != NULL);
673 /* If the server gave us back a different lock mode, we should
674 * fix up our variables. */
675 if (lock->l_req_mode != einfo->ei_mode) {
676 ldlm_lock_addref(lockh, lock->l_req_mode);
677 ldlm_lock_decref(lockh, einfo->ei_mode);
678 einfo->ei_mode = lock->l_req_mode;
683 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
684 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
686 it->it_disposition = (int)lockrep->lock_policy_res1;
687 it->it_status = (int)lockrep->lock_policy_res2;
688 it->it_lock_mode = einfo->ei_mode;
689 it->it_lock_handle = lockh->cookie;
690 it->it_request = req;
692 /* Technically speaking rq_transno must already be zero if
693 * it_status is in error, so the check is a bit redundant */
694 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
695 mdc_clear_replay_flag(req, it->it_status);
697 /* If we're doing an IT_OPEN which did not result in an actual
698 * successful open, then we need to remove the bit which saves
699 * this request for unconditional replay.
701 * It's important that we do this first! Otherwise we might exit the
702 * function without doing so, and try to replay a failed create
704 if (it->it_op & IT_OPEN && req->rq_replay &&
705 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
706 mdc_clear_replay_flag(req, it->it_status);
708 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
709 it->it_op, it->it_disposition, it->it_status);
711 /* We know what to expect, so we do any byte flipping required here */
712 if (it_has_reply_body(it)) {
713 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
715 CERROR ("Can't swab mdt_body\n");
719 if (it_disposition(it, DISP_OPEN_OPEN) &&
720 !it_open_error(DISP_OPEN_OPEN, it)) {
722 * If this is a successful OPEN request, we need to set
723 * replay handler and data early, so that if replay
724 * happens immediately after swabbing below, new reply
725 * is swabbed by that handler correctly.
727 mdc_set_open_replay_data(NULL, NULL, it);
730 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
733 mdc_update_max_ea_from_body(exp, body);
736 * The eadata is opaque; just check that it is there.
737 * Eventually, obd_unpackmd() will check the contents.
739 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
740 body->mbo_eadatasize);
744 /* save lvb data and length in case this is for layout
747 lvb_len = body->mbo_eadatasize;
750 * We save the reply LOV EA in case we have to replay a
751 * create for recovery. If we didn't allocate a large
752 * enough request buffer above we need to reallocate it
753 * here to hold the actual LOV EA.
755 * To not save LOV EA if request is not going to replay
756 * (for example error one).
758 if ((it->it_op & IT_OPEN) && req->rq_replay) {
759 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
760 body->mbo_eadatasize);
762 body->mbo_valid &= ~OBD_MD_FLEASIZE;
763 body->mbo_eadatasize = 0;
768 } else if (it->it_op & IT_LAYOUT) {
769 /* maybe the lock was granted right away and layout
770 * is packed into RMF_DLM_LVB of req */
771 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
772 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
773 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
775 lvb_data = req_capsule_server_sized_get(pill,
776 &RMF_DLM_LVB, lvb_len);
777 if (lvb_data == NULL)
781 * save replied layout data to the request buffer for
782 * recovery consideration (lest MDS reinitialize
783 * another set of OST objects).
786 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
791 /* fill in stripe data for layout lock.
792 * LU-6581: trust layout data only if layout lock is granted. The MDT
793 * has stopped sending layout unless the layout lock is granted. The
794 * client still does this checking in case it's talking with an old
795 * server. - Jinshan */
796 lock = ldlm_handle2lock(lockh);
800 if (ldlm_has_layout(lock) && lvb_data != NULL &&
801 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
804 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
805 ldlm_it2str(it->it_op), lvb_len);
807 OBD_ALLOC_LARGE(lmm, lvb_len);
809 GOTO(out_lock, rc = -ENOMEM);
811 memcpy(lmm, lvb_data, lvb_len);
813 /* install lvb_data */
814 lock_res_and_lock(lock);
815 if (lock->l_lvb_data == NULL) {
816 lock->l_lvb_type = LVB_T_LAYOUT;
817 lock->l_lvb_data = lmm;
818 lock->l_lvb_len = lvb_len;
821 unlock_res_and_lock(lock);
823 OBD_FREE_LARGE(lmm, lvb_len);
826 if (ldlm_has_dom(lock)) {
827 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
829 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
830 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
831 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
832 exp->exp_obd->obd_name);
833 GOTO(out_lock, rc = -EPROTO);
836 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
837 ldlm_it2str(it->it_op), body->mbo_dom_size);
839 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
847 /* We always reserve enough space in the reply packet for a stripe MD, because
848 * we don't know in advance the file type. */
849 static int mdc_enqueue_base(struct obd_export *exp,
850 struct ldlm_enqueue_info *einfo,
851 const union ldlm_policy_data *policy,
852 struct lookup_intent *it,
853 struct md_op_data *op_data,
854 struct lustre_handle *lockh,
855 __u64 extra_lock_flags)
857 struct obd_device *obddev = class_exp2obd(exp);
858 struct ptlrpc_request *req = NULL;
859 __u64 flags, saved_flags = extra_lock_flags;
860 struct ldlm_res_id res_id;
861 static const union ldlm_policy_data lookup_policy = {
862 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
863 static const union ldlm_policy_data update_policy = {
864 .l_inodebits = { MDS_INODELOCK_UPDATE } };
865 static const union ldlm_policy_data layout_policy = {
866 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
867 static const union ldlm_policy_data getxattr_policy = {
868 .l_inodebits = { MDS_INODELOCK_XATTR } };
869 int generation, resends = 0;
870 struct ldlm_reply *lockrep;
871 struct obd_import *imp = class_exp2cliimp(exp);
873 enum lvb_type lvb_type = 0;
877 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
879 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
882 LASSERT(policy == NULL);
884 saved_flags |= LDLM_FL_HAS_INTENT;
885 if (it->it_op & (IT_GETATTR | IT_READDIR))
886 policy = &update_policy;
887 else if (it->it_op & IT_LAYOUT)
888 policy = &layout_policy;
889 else if (it->it_op & IT_GETXATTR)
890 policy = &getxattr_policy;
892 policy = &lookup_policy;
895 generation = obddev->u.cli.cl_import->imp_generation;
896 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
897 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
899 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
904 /* The only way right now is FLOCK. */
905 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
907 res_id.name[3] = LDLM_FLOCK;
908 } else if (it->it_op & IT_OPEN) {
909 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
910 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
911 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
912 } else if (it->it_op & IT_READDIR) {
913 req = mdc_enqueue_pack(exp, 0);
914 } else if (it->it_op & IT_LAYOUT) {
915 if (!imp_connect_lvb_type(imp))
917 req = mdc_intent_layout_pack(exp, it, op_data);
918 lvb_type = LVB_T_LAYOUT;
919 } else if (it->it_op & IT_GETXATTR) {
920 req = mdc_intent_getxattr_pack(exp, it, op_data);
927 RETURN(PTR_ERR(req));
930 req->rq_generation_set = 1;
931 req->rq_import_generation = generation;
932 req->rq_sent = ktime_get_real_seconds() + resends;
935 /* It is important to obtain modify RPC slot first (if applicable), so
936 * that threads that are waiting for a modify RPC slot are not polluting
937 * our rpcs in flight counter.
938 * We do not do flock request limiting, though */
940 mdc_get_mod_rpc_slot(req, it);
941 rc = obd_get_request_slot(&obddev->u.cli);
943 mdc_put_mod_rpc_slot(req, it);
944 mdc_clear_replay_flag(req, 0);
945 ptlrpc_req_finished(req);
950 /* With Data-on-MDT the glimpse callback is needed too.
951 * It is set here in advance but not in mdc_finish_enqueue()
952 * to avoid possible races. It is safe to have glimpse handler
953 * for non-DOM locks and costs nothing.*/
954 if (einfo->ei_cb_gl == NULL)
955 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
957 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
958 0, lvb_type, lockh, 0);
960 /* For flock requests we immediatelly return without further
961 delay and let caller deal with the rest, since rest of
962 this function metadata processing makes no sense for flock
963 requests anyway. But in case of problem during comms with
964 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
965 can not rely on caller and this mainly for F_UNLCKs
966 (explicits or automatically generated by Kernel to clean
967 current FLocks upon exit) that can't be trashed */
968 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
969 (einfo->ei_type == LDLM_FLOCK) &&
970 (einfo->ei_mode == LCK_NL))
975 obd_put_request_slot(&obddev->u.cli);
976 mdc_put_mod_rpc_slot(req, it);
980 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
981 obddev->obd_name, PFID(&op_data->op_fid1),
982 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
984 mdc_clear_replay_flag(req, rc);
985 ptlrpc_req_finished(req);
989 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
990 LASSERT(lockrep != NULL);
992 lockrep->lock_policy_res2 =
993 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
995 /* Retry infinitely when the server returns -EINPROGRESS for the
996 * intent operation, when server returns -EINPROGRESS for acquiring
997 * intent lock, we'll retry in after_reply(). */
998 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
999 mdc_clear_replay_flag(req, rc);
1000 ptlrpc_req_finished(req);
1001 if (generation == obddev->u.cli.cl_import->imp_generation) {
1002 if (signal_pending(current))
1006 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1007 obddev->obd_name, resends, it->it_op,
1008 PFID(&op_data->op_fid1),
1009 PFID(&op_data->op_fid2));
1012 CDEBUG(D_HA, "resend cross eviction\n");
1017 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1018 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1019 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
1020 mdc_clear_replay_flag(req, -ERANGE);
1021 ptlrpc_req_finished(req);
1022 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
1026 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1028 if (lustre_handle_is_used(lockh)) {
1029 ldlm_lock_decref(lockh, einfo->ei_mode);
1030 memset(lockh, 0, sizeof(*lockh));
1032 ptlrpc_req_finished(req);
1034 it->it_lock_handle = 0;
1035 it->it_lock_mode = 0;
1036 it->it_request = NULL;
1042 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1043 const union ldlm_policy_data *policy,
1044 struct md_op_data *op_data,
1045 struct lustre_handle *lockh, __u64 extra_lock_flags)
1047 return mdc_enqueue_base(exp, einfo, policy, NULL,
1048 op_data, lockh, extra_lock_flags);
1051 static int mdc_finish_intent_lock(struct obd_export *exp,
1052 struct ptlrpc_request *request,
1053 struct md_op_data *op_data,
1054 struct lookup_intent *it,
1055 struct lustre_handle *lockh)
1057 struct lustre_handle old_lock;
1058 struct ldlm_lock *lock;
1062 LASSERT(request != NULL);
1063 LASSERT(request != LP_POISON);
1064 LASSERT(request->rq_repmsg != LP_POISON);
1066 if (it->it_op & IT_READDIR)
1069 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1070 if (it->it_status != 0)
1071 GOTO(out, rc = it->it_status);
1073 if (!it_disposition(it, DISP_IT_EXECD)) {
1074 /* The server failed before it even started executing
1075 * the intent, i.e. because it couldn't unpack the
1078 LASSERT(it->it_status != 0);
1079 GOTO(out, rc = it->it_status);
1081 rc = it_open_error(DISP_IT_EXECD, it);
1085 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1089 /* keep requests around for the multiple phases of the call
1090 * this shows the DISP_XX must guarantee we make it into the
1093 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1094 it_disposition(it, DISP_OPEN_CREATE) &&
1095 !it_open_error(DISP_OPEN_CREATE, it)) {
1096 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1097 /* balanced in ll_create_node */
1098 ptlrpc_request_addref(request);
1100 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1101 it_disposition(it, DISP_OPEN_OPEN) &&
1102 !it_open_error(DISP_OPEN_OPEN, it)) {
1103 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1104 /* balanced in ll_file_open */
1105 ptlrpc_request_addref(request);
1106 /* BUG 11546 - eviction in the middle of open rpc
1109 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1113 if (it->it_op & IT_CREAT) {
1114 /* XXX this belongs in ll_create_it */
1115 } else if (it->it_op == IT_OPEN) {
1116 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1118 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1122 /* If we already have a matching lock, then cancel the new
1123 * one. We have to set the data here instead of in
1124 * mdc_enqueue, because we need to use the child's inode as
1125 * the l_ast_data to match, and that's not available until
1126 * intent_finish has performed the iget().) */
1127 lock = ldlm_handle2lock(lockh);
1129 union ldlm_policy_data policy = lock->l_policy_data;
1130 LDLM_DEBUG(lock, "matching against this");
1132 if (it_has_reply_body(it)) {
1133 struct mdt_body *body;
1135 body = req_capsule_server_get(&request->rq_pill,
1137 /* mdc_enqueue checked */
1138 LASSERT(body != NULL);
1139 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1140 &lock->l_resource->lr_name),
1141 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1142 PLDLMRES(lock->l_resource),
1143 PFID(&body->mbo_fid1));
1145 LDLM_LOCK_PUT(lock);
1147 memcpy(&old_lock, lockh, sizeof(*lockh));
1148 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1149 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1150 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1151 memcpy(lockh, &old_lock, sizeof(old_lock));
1152 it->it_lock_handle = lockh->cookie;
1158 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1159 (int)op_data->op_namelen, op_data->op_name,
1160 ldlm_it2str(it->it_op), it->it_status,
1161 it->it_disposition, rc);
1165 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1166 struct lu_fid *fid, __u64 *bits)
1168 /* We could just return 1 immediately, but since we should only
1169 * be called in revalidate_it if we already have a lock, let's
1171 struct ldlm_res_id res_id;
1172 struct lustre_handle lockh;
1173 union ldlm_policy_data policy;
1174 enum ldlm_mode mode;
1177 if (it->it_lock_handle) {
1178 lockh.cookie = it->it_lock_handle;
1179 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1181 fid_build_reg_res_name(fid, &res_id);
1182 switch (it->it_op) {
1184 /* File attributes are held under multiple bits:
1185 * nlink is under lookup lock, size and times are
1186 * under UPDATE lock and recently we've also got
1187 * a separate permissions lock for owner/group/acl that
1188 * were protected by lookup lock before.
1189 * Getattr must provide all of that information,
1190 * so we need to ensure we have all of those locks.
1191 * Unfortunately, if the bits are split across multiple
1192 * locks, there's no easy way to match all of them here,
1193 * so an extra RPC would be performed to fetch all
1194 * of those bits at once for now. */
1195 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1196 * but for old MDTs (< 2.4), permission is covered
1197 * by LOOKUP lock, so it needs to match all bits here.*/
1198 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1199 MDS_INODELOCK_LOOKUP |
1203 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1206 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1209 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1213 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1214 LDLM_IBITS, &policy,
1215 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1220 it->it_lock_handle = lockh.cookie;
1221 it->it_lock_mode = mode;
1223 it->it_lock_handle = 0;
1224 it->it_lock_mode = 0;
1231 * This long block is all about fixing up the lock and request state
1232 * so that it is correct as of the moment _before_ the operation was
1233 * applied; that way, the VFS will think that everything is normal and
1234 * call Lustre's regular VFS methods.
1236 * If we're performing a creation, that means that unless the creation
1237 * failed with EEXIST, we should fake up a negative dentry.
1239 * For everything else, we want to lookup to succeed.
1241 * One additional note: if CREATE or OPEN succeeded, we add an extra
1242 * reference to the request because we need to keep it around until
1243 * ll_create/ll_open gets called.
1245 * The server will return to us, in it_disposition, an indication of
1246 * exactly what it_status refers to.
1248 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1249 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1250 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1251 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1254 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1257 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1258 struct lookup_intent *it, struct ptlrpc_request **reqp,
1259 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1261 struct ldlm_enqueue_info einfo = {
1262 .ei_type = LDLM_IBITS,
1263 .ei_mode = it_to_lock_mode(it),
1264 .ei_cb_bl = cb_blocking,
1265 .ei_cb_cp = ldlm_completion_ast,
1266 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1268 struct lustre_handle lockh;
1273 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1274 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1275 op_data->op_name, PFID(&op_data->op_fid2),
1276 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1280 if (fid_is_sane(&op_data->op_fid2) &&
1281 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1282 /* We could just return 1 immediately, but since we should only
1283 * be called in revalidate_it if we already have a lock, let's
1285 it->it_lock_handle = 0;
1286 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1287 /* Only return failure if it was not GETATTR by cfid
1288 (from inode_revalidate) */
1289 if (rc || op_data->op_namelen != 0)
1293 /* For case if upper layer did not alloc fid, do it now. */
1294 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1295 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1297 CERROR("Can't alloc new fid, rc %d\n", rc);
1302 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1307 *reqp = it->it_request;
1308 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1312 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1313 struct ptlrpc_request *req,
1316 struct mdc_getattr_args *ga = args;
1317 struct obd_export *exp = ga->ga_exp;
1318 struct md_enqueue_info *minfo = ga->ga_minfo;
1319 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1320 struct lookup_intent *it;
1321 struct lustre_handle *lockh;
1322 struct obd_device *obddev;
1323 struct ldlm_reply *lockrep;
1324 __u64 flags = LDLM_FL_HAS_INTENT;
1328 lockh = &minfo->mi_lockh;
1330 obddev = class_exp2obd(exp);
1332 obd_put_request_slot(&obddev->u.cli);
1333 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1336 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1337 &flags, NULL, 0, lockh, rc);
1339 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1340 mdc_clear_replay_flag(req, rc);
1344 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1345 LASSERT(lockrep != NULL);
1347 lockrep->lock_policy_res2 =
1348 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1350 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1354 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1358 minfo->mi_cb(req, minfo, rc);
1362 int mdc_intent_getattr_async(struct obd_export *exp,
1363 struct md_enqueue_info *minfo)
1365 struct md_op_data *op_data = &minfo->mi_data;
1366 struct lookup_intent *it = &minfo->mi_it;
1367 struct ptlrpc_request *req;
1368 struct mdc_getattr_args *ga;
1369 struct obd_device *obddev = class_exp2obd(exp);
1370 struct ldlm_res_id res_id;
1371 union ldlm_policy_data policy = {
1372 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1373 MDS_INODELOCK_UPDATE } };
1375 __u64 flags = LDLM_FL_HAS_INTENT;
1378 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1379 (int)op_data->op_namelen, op_data->op_name,
1380 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1382 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1383 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1384 * of the async getattr RPC will handle that by itself. */
1385 req = mdc_intent_getattr_pack(exp, it, op_data,
1386 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1388 RETURN(PTR_ERR(req));
1390 rc = obd_get_request_slot(&obddev->u.cli);
1392 ptlrpc_req_finished(req);
1396 /* With Data-on-MDT the glimpse callback is needed too.
1397 * It is set here in advance but not in mdc_finish_enqueue()
1398 * to avoid possible races. It is safe to have glimpse handler
1399 * for non-DOM locks and costs nothing.*/
1400 if (minfo->mi_einfo.ei_cb_gl == NULL)
1401 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1403 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1404 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1406 obd_put_request_slot(&obddev->u.cli);
1407 ptlrpc_req_finished(req);
1411 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1412 ga = ptlrpc_req_async_args(req);
1414 ga->ga_minfo = minfo;
1416 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1417 ptlrpcd_add_req(req);