4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context passed through an asynchronous getattr enqueue so the
 * interpret callback can find its export and enqueue info.
 * NOTE(review): remainder of the struct (and closing brace) is not
 * visible in this chunk. */
50 struct mdc_getattr_args {
/* export the getattr RPC is sent through */
51 struct obd_export *ga_exp;
/* caller-supplied enqueue bookkeeping */
52 struct md_enqueue_info *ga_minfo;
/*
 * it_open_error() - report the error status of an intent, scoped to a
 * given processing phase.
 *
 * @phase: DISP_* phase the caller is interested in.
 * @it:    lookup intent carrying it_disposition / it_status from the reply.
 *
 * Walks the dispositions from latest phase to earliest; for the first
 * disposition bit that is set, the queried @phase is compared against it.
 * NOTE(review): the return statements are elided in this chunk —
 * presumably each branch returns it->it_status (or 0 when @phase is past
 * the failing stage); confirm against the full source.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* none of the expected disposition bits matched - log for diagnosis */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach an inode to a DLM lock's LVB slot and
 * optionally report the lock's inodebits.
 *
 * @exp:   client export (unused in the visible portion).
 * @lockh: handle of a lock known to hold a reference.
 * @data:  inode to install as lr_lvb_inode.
 * @bits:  if non-NULL (presumably - the guard is elided here), receives
 *         the lock's inodebits.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* nothing to do for an unused handle */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
/* replacing a different inode is only legal if the old one is dying */
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted lock on @fid that
 * satisfies @policy/@mode, filling @lockh on success.
 *
 * Returns the matched lock mode (0 when no match - per ldlm_lock_match
 * convention; the return statement itself is elided in this chunk).
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused locks on @fid's resource that
 * match @policy/@mode, with the given cancel @flags and caller @opaque
 * cookie forwarded to the ldlm layer.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any inode cached on @fid's DLM resource by
 * clearing lr_lvb_inode. Used when the inode is going away so stale
 * pointers are not left behind on the resource.
 * NOTE(review): the NULL-resource early return and the res locking
 * around the store are elided in this chunk.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* lookup only - do not create the resource if it does not exist */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
/* drop the reference taken by ldlm_resource_get() */
189 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag() - drop a request from the replay list when it
 * failed (@rc != 0); failed requests must not be replayed on recovery.
 * The rq_replay clearing itself is elided here but happens under
 * rq_lock.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
/* a transno on an error reply is a server-side protocol anomaly */
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/*
 * @req:   request whose @field buffer receives a copy of @data.
 * @field: request-capsule field to grow/shrink to exactly @size.
 * @data:  LOV EA bytes from the reply.
 * @size:  length of @data in bytes.
 */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
223 struct lov_user_md *lmm;
/* grow the client buffer if the reply EA is larger than what we sent */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* buffer was larger than needed - trim it down */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
242 /* overwrite layout generation returned from the MDS */
243 lmm->lmm_stripe_offset =
244 (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
/*
 * mdc_intent_open_pack() - build an LDLM_INTENT_OPEN enqueue request.
 *
 * Cancels locks made conflicting by the open (child OPEN locks, parent
 * UPDATE lock on create), sizes all client/server capsule fields
 * (name, EA, security context, SELinux policy, ACL), packs the intent
 * and open body, and pre-reserves reply-buffer space for possible
 * Data-on-MDT inline data.
 *
 * @exp:         client export to the MDT.
 * @it:          the open intent (flags/mode consulted and adjusted).
 * @op_data:     operation description; op_data/op_data_size carry the
 *               LOV EA to send, fid1=parent, fid2=child (if known).
 * @acl_bufsize: reply-side buffer size to reserve for the ACL.
 *
 * Returns the prepared request, or ERR_PTR on failure.
 */
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252 struct md_op_data *op_data, __u32 acl_bufsize)
254 struct ptlrpc_request *req;
255 struct obd_device *obddev = class_exp2obd(exp);
256 struct ldlm_intent *lit;
257 const void *lmm = op_data->op_data;
258 __u32 lmmsize = op_data->op_data_size;
259 struct list_head cancels = LIST_HEAD_INIT(cancels);
263 int repsize, repsize_estimate;
/* force a regular-file type in the create mode */
267 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
269 /* XXX: openlock is not cancelled for cross-refs. */
270 /* If inode is known, cancel conflicting OPEN locks. */
271 if (fid_is_sane(&op_data->op_fid2)) {
272 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
273 if (it->it_flags & MDS_FMODE_WRITE)
278 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
281 else if (it->it_flags & FMODE_EXEC)
287 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
292 /* If CREATE, cancel parent's UPDATE lock. */
293 if (it->it_op & IT_CREAT)
297 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
299 MDS_INODELOCK_UPDATE);
301 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
302 &RQF_LDLM_INTENT_OPEN);
/* allocation failed: release the gathered cancel list */
304 ldlm_lock_list_put(&cancels, l_bl_ast, count);
305 RETURN(ERR_PTR(-ENOMEM));
308 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
309 op_data->op_namelen + 1);
310 if (cl_is_lov_delay_create(it->it_flags)) {
311 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
312 LASSERT(lmmsize == 0);
313 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
315 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
316 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
320 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
321 op_data->op_file_secctx_name_size : 0);
323 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
324 op_data->op_file_secctx_size);
326 /* get SELinux policy info if any */
327 rc = sptlrpc_get_sepol(req);
329 ptlrpc_request_free(req);
332 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
333 strlen(req->rq_sepol) ?
334 strlen(req->rq_sepol) + 1 : 0);
336 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
338 ptlrpc_request_free(req);
/* mark for replay if the import supports it (open is replayable) */
342 spin_lock(&req->rq_lock);
343 req->rq_replay = req->rq_import->imp_replayable;
344 spin_unlock(&req->rq_lock);
346 /* pack the intent */
347 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348 lit->opc = (__u64)it->it_op;
350 /* pack the intended request */
351 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
354 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
355 obddev->u.cli.cl_max_mds_easize);
356 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* open-by-name (not create): ask the server to return the file's
 * security xattr along with the reply */
358 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
359 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
361 op_data->op_file_secctx_name_size > 0 &&
362 op_data->op_file_secctx_name != NULL) {
365 secctx_name = req_capsule_client_get(&req->rq_pill,
366 &RMF_FILE_SECCTX_NAME);
367 memcpy(secctx_name, op_data->op_file_secctx_name,
368 op_data->op_file_secctx_name_size);
369 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
371 obddev->u.cli.cl_max_mds_easize);
373 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
374 op_data->op_file_secctx_name_size,
375 op_data->op_file_secctx_name);
378 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
383 * Inline buffer for possible data from Data-on-MDT files.
385 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
386 sizeof(struct niobuf_remote));
387 ptlrpc_request_set_replen(req);
389 /* Get real repbuf allocated size as rounded up power of 2 */
390 repsize = size_roundup_power2(req->rq_replen +
391 lustre_msg_early_size());
392 /* Estimate free space for DoM files in repbuf */
393 repsize_estimate = repsize - (req->rq_replen -
394 obddev->u.cli.cl_max_mds_easize +
395 sizeof(struct lov_comp_md_v1) +
396 sizeof(struct lov_comp_md_entry_v1) +
397 lov_mds_md_size(0, LOV_MAGIC_V3));
/* grow the inline-niobuf slot until the estimated DoM space meets the
 * configured minimum */
399 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
400 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
401 repsize_estimate + sizeof(struct niobuf_remote);
402 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
404 sizeof(struct niobuf_remote) + repsize);
405 ptlrpc_request_set_replen(req);
406 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
407 repsize, req->rq_replen);
408 repsize = size_roundup_power2(req->rq_replen +
409 lustre_msg_early_size());
411 /* The only way to report real allocated repbuf size to the server
412 * is the lm_repsize but it must be set prior buffer allocation itself
413 * due to security reasons - it is part of buffer used in signature
414 * calculation (see LU-11414). Therefore the saved size is predicted
415 * value as rq_replen rounded to the next higher power of 2.
416 * Such estimation is safe. Though the final allocated buffer might
417 * be even larger, it is not possible to know that at this point.
419 req->rq_reqmsg->lm_repsize = repsize;
423 #define GA_DEFAULT_EA_NAME_LEN 20
424 #define GA_DEFAULT_EA_VAL_LEN 250
425 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR enqueue
 * request that asks the MDT to return all xattrs for op_fid1 in one
 * round trip (names, values, and value lengths in separate reply
 * buffers, sized from the GA_DEFAULT_EA_* heuristics).
 *
 * Returns the prepared request, or ERR_PTR on failure.
 */
428 mdc_intent_getxattr_pack(struct obd_export *exp,
429 struct lookup_intent *it,
430 struct md_op_data *op_data)
432 struct ptlrpc_request *req;
433 struct ldlm_intent *lit;
435 struct list_head cancels = LIST_HEAD_INIT(cancels);
436 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
440 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
441 &RQF_LDLM_INTENT_GETXATTR);
443 RETURN(ERR_PTR(-ENOMEM));
445 /* get SELinux policy info if any */
446 rc = sptlrpc_get_sepol(req);
448 ptlrpc_request_free(req);
451 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
452 strlen(req->rq_sepol) ?
453 strlen(req->rq_sepol) + 1 : 0);
455 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
457 ptlrpc_request_free(req);
461 /* pack the intent */
462 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463 lit->opc = IT_GETXATTR;
464 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
465 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
467 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
468 /* If the supplied buffer is too small then the server will
469 * return -ERANGE and llite will fallback to using non cached
470 * xattr operations. On servers before 2.10.1 a (non-cached)
471 * listxattr RPC for an orphan or dead file causes an oops. So
472 * let's try to avoid sending too small a buffer to too old a
473 * server. This is effectively undoing the memory conservation
474 * of LU-9417 when it would be *more* likely to crash the
475 * server. See LU-9856. */
476 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
477 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
478 exp->exp_connect_data.ocd_max_easize)
481 /* pack the intended request */
482 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
483 ea_vals_buf_size, -1, 0);
485 /* get SELinux policy info if any */
486 mdc_file_sepol_pack(req);
/* reply buffers: xattr names, values, and per-value lengths */
488 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
489 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
491 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
494 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
495 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* no ACL data is wanted on a getxattr intent */
497 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
499 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR enqueue
 * request for lookup/getattr intents, asking for attributes, EA,
 * ACL, and (for LOOKUP/GETATTR with a named security xattr) the file
 * security context in one round trip.
 *
 * Returns the prepared request, or ERR_PTR on failure.
 */
504 static struct ptlrpc_request *
505 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
506 struct md_op_data *op_data, __u32 acl_bufsize)
508 struct ptlrpc_request *req;
509 struct obd_device *obddev = class_exp2obd(exp);
510 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
511 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
512 OBD_MD_MEA | OBD_MD_FLACL;
513 struct ldlm_intent *lit;
516 bool have_secctx = false;
519 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
520 &RQF_LDLM_INTENT_GETATTR);
522 RETURN(ERR_PTR(-ENOMEM));
524 /* send name of security xattr to get upon intent */
525 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
526 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
528 op_data->op_file_secctx_name_size > 0 &&
529 op_data->op_file_secctx_name != NULL) {
531 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
533 op_data->op_file_secctx_name_size);
536 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
537 op_data->op_namelen + 1);
539 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
541 ptlrpc_request_free(req);
545 /* pack the intent */
546 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
547 lit->opc = (__u64)it->it_op;
/* prefer the tuned default EA size; fall back to the maximum */
549 if (obddev->u.cli.cl_default_mds_easize > 0)
550 easize = obddev->u.cli.cl_default_mds_easize;
552 easize = obddev->u.cli.cl_max_mds_easize;
554 /* pack the intended request */
555 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
557 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
558 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* copy the requested security xattr name into the capsule
 * (presumably guarded by have_secctx - the branch is elided here) */
563 secctx_name = req_capsule_client_get(&req->rq_pill,
564 &RMF_FILE_SECCTX_NAME);
565 memcpy(secctx_name, op_data->op_file_secctx_name,
566 op_data->op_file_secctx_name_size);
568 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
571 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
572 op_data->op_file_secctx_name_size,
573 op_data->op_file_secctx_name);
575 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
579 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT enqueue
 * request; op_data->op_data must carry a struct layout_intent that is
 * copied verbatim into the request. The server-side DLM LVB buffer is
 * sized to the default MDS EA size to receive the layout.
 *
 * Returns the prepared request, or ERR_PTR on failure.
 */
583 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
584 struct lookup_intent *it,
585 struct md_op_data *op_data)
587 struct obd_device *obd = class_exp2obd(exp);
588 struct ptlrpc_request *req;
589 struct ldlm_intent *lit;
590 struct layout_intent *layout;
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
595 &RQF_LDLM_INTENT_LAYOUT);
597 RETURN(ERR_PTR(-ENOMEM));
/* no client EA payload is sent with a layout intent */
599 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
600 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
602 ptlrpc_request_free(req);
606 /* pack the intent */
607 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
608 lit->opc = (__u64)it->it_op;
610 /* pack the layout intent request */
611 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
612 LASSERT(op_data->op_data != NULL);
613 LASSERT(op_data->op_data_size == sizeof(*layout));
614 memcpy(layout, op_data->op_data, sizeof(*layout));
616 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
617 obd->u.cli.cl_default_mds_easize);
618 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM_ENQUEUE request
 * with a server-side LVB buffer of @lvb_len bytes. Used e.g. for
 * readdir locks. Returns the request or ERR_PTR on failure.
 */
622 static struct ptlrpc_request *
623 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
625 struct ptlrpc_request *req;
629 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
631 RETURN(ERR_PTR(-ENOMEM));
633 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
635 ptlrpc_request_free(req);
639 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
640 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process an intent enqueue reply.
 *
 * Fixes up replay flags, copies the server's disposition/status into
 * @it, handles a granted-lock mode change, saves the reply LOV EA /
 * layout LVB for replay, installs layout LVB data on the lock, and
 * validates Data-on-MDT size information.
 *
 * NOTE(review): this chunk elides several guards and the trailing
 * parameter of the signature (presumably "int rc"); annotations below
 * describe only the visible statements.
 */
644 static int mdc_finish_enqueue(struct obd_export *exp,
645 struct ptlrpc_request *req,
646 struct ldlm_enqueue_info *einfo,
647 struct lookup_intent *it,
648 struct lustre_handle *lockh,
651 struct req_capsule *pill = &req->rq_pill;
652 struct ldlm_request *lockreq;
653 struct ldlm_reply *lockrep;
654 struct ldlm_lock *lock;
655 struct mdt_body *body = NULL;
656 void *lvb_data = NULL;
662 /* Similarly, if we're going to replay this request, we don't want to
663 * actually get a lock, just perform the intent. */
664 if (req->rq_transno || req->rq_replay) {
665 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
666 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
669 if (rc == ELDLM_LOCK_ABORTED) {
/* intent executed but no lock granted - hand back an empty handle */
671 memset(lockh, 0, sizeof(*lockh));
673 } else { /* rc = 0 */
674 lock = ldlm_handle2lock(lockh);
675 LASSERT(lock != NULL);
677 /* If the server gave us back a different lock mode, we should
678 * fix up our variables. */
679 if (lock->l_req_mode != einfo->ei_mode) {
680 ldlm_lock_addref(lockh, lock->l_req_mode);
681 ldlm_lock_decref(lockh, einfo->ei_mode);
682 einfo->ei_mode = lock->l_req_mode;
687 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
688 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* propagate the server's verdict into the intent */
690 it->it_disposition = (int)lockrep->lock_policy_res1;
691 it->it_status = (int)lockrep->lock_policy_res2;
692 it->it_lock_mode = einfo->ei_mode;
693 it->it_lock_handle = lockh->cookie;
694 it->it_request = req;
696 /* Technically speaking rq_transno must already be zero if
697 * it_status is in error, so the check is a bit redundant */
698 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
699 mdc_clear_replay_flag(req, it->it_status);
701 /* If we're doing an IT_OPEN which did not result in an actual
702 * successful open, then we need to remove the bit which saves
703 * this request for unconditional replay.
705 * It's important that we do this first! Otherwise we might exit the
706 * function without doing so, and try to replay a failed create
708 if (it->it_op & IT_OPEN && req->rq_replay &&
709 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
710 mdc_clear_replay_flag(req, it->it_status);
712 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
713 it->it_op, it->it_disposition, it->it_status);
715 /* We know what to expect, so we do any byte flipping required here */
716 if (it_has_reply_body(it)) {
717 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
719 CERROR ("Can't swab mdt_body\n");
723 if (it_disposition(it, DISP_OPEN_OPEN) &&
724 !it_open_error(DISP_OPEN_OPEN, it)) {
726 * If this is a successful OPEN request, we need to set
727 * replay handler and data early, so that if replay
728 * happens immediately after swabbing below, new reply
729 * is swabbed by that handler correctly.
731 mdc_set_open_replay_data(NULL, NULL, it);
734 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
737 mdc_update_max_ea_from_body(exp, body);
740 * The eadata is opaque; just check that it is there.
741 * Eventually, obd_unpackmd() will check the contents.
743 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
744 body->mbo_eadatasize);
748 /* save lvb data and length in case this is for layout
751 lvb_len = body->mbo_eadatasize;
754 * We save the reply LOV EA in case we have to replay a
755 * create for recovery. If we didn't allocate a large
756 * enough request buffer above we need to reallocate it
757 * here to hold the actual LOV EA.
759 * To not save LOV EA if request is not going to replay
760 * (for example error one).
762 if ((it->it_op & IT_OPEN) && req->rq_replay) {
763 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
764 body->mbo_eadatasize);
/* save failed: forget the EA rather than replaying garbage */
766 body->mbo_valid &= ~OBD_MD_FLEASIZE;
767 body->mbo_eadatasize = 0;
772 } else if (it->it_op & IT_LAYOUT) {
773 /* maybe the lock was granted right away and layout
774 * is packed into RMF_DLM_LVB of req */
775 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
776 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
777 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
779 lvb_data = req_capsule_server_sized_get(pill,
780 &RMF_DLM_LVB, lvb_len);
781 if (lvb_data == NULL)
785 * save replied layout data to the request buffer for
786 * recovery consideration (lest MDS reinitialize
787 * another set of OST objects).
790 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
795 /* fill in stripe data for layout lock.
796 * LU-6581: trust layout data only if layout lock is granted. The MDT
797 * has stopped sending layout unless the layout lock is granted. The
798 * client still does this checking in case it's talking with an old
799 * server. - Jinshan */
800 lock = ldlm_handle2lock(lockh);
804 if (ldlm_has_layout(lock) && lvb_data != NULL &&
805 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
808 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
809 ldlm_it2str(it->it_op), lvb_len);
811 OBD_ALLOC_LARGE(lmm, lvb_len);
813 GOTO(out_lock, rc = -ENOMEM);
815 memcpy(lmm, lvb_data, lvb_len);
817 /* install lvb_data */
818 lock_res_and_lock(lock);
/* only install if no LVB is attached yet; otherwise free our copy */
819 if (lock->l_lvb_data == NULL) {
820 lock->l_lvb_type = LVB_T_LAYOUT;
821 lock->l_lvb_data = lmm;
822 lock->l_lvb_len = lvb_len;
825 unlock_res_and_lock(lock);
827 OBD_FREE_LARGE(lmm, lvb_len);
830 if (ldlm_has_dom(lock)) {
831 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
833 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* a DoM lock reply must carry the file size */
834 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
835 LDLM_ERROR(lock, "%s: DoM lock without size.",
836 exp->exp_obd->obd_name);
837 GOTO(out_lock, rc = -EPROTO);
840 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
841 ldlm_it2str(it->it_op), body->mbo_dom_size);
843 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
851 /* We always reserve enough space in the reply packet for a stripe MD, because
852 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base() - core intent-lock enqueue path.
 *
 * Selects the inodebits policy from the intent op, packs the matching
 * intent request, takes the modify-RPC and request slots, performs the
 * ldlm enqueue, and retries on -EINPROGRESS (same import generation)
 * and on -ERANGE (ACL buffer too small) before finishing via
 * mdc_finish_enqueue().
 *
 * NOTE(review): several guards/returns and loop labels are elided in
 * this chunk; comments below describe only the visible statements.
 */
853 static int mdc_enqueue_base(struct obd_export *exp,
854 struct ldlm_enqueue_info *einfo,
855 const union ldlm_policy_data *policy,
856 struct lookup_intent *it,
857 struct md_op_data *op_data,
858 struct lustre_handle *lockh,
859 __u64 extra_lock_flags)
861 struct obd_device *obddev = class_exp2obd(exp);
862 struct ptlrpc_request *req = NULL;
863 __u64 flags, saved_flags = extra_lock_flags;
864 struct ldlm_res_id res_id;
865 static const union ldlm_policy_data lookup_policy = {
866 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
867 static const union ldlm_policy_data update_policy = {
868 .l_inodebits = { MDS_INODELOCK_UPDATE } };
869 static const union ldlm_policy_data layout_policy = {
870 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
871 static const union ldlm_policy_data getxattr_policy = {
872 .l_inodebits = { MDS_INODELOCK_XATTR } };
873 int generation, resends = 0;
874 struct ldlm_reply *lockrep;
875 struct obd_import *imp = class_exp2cliimp(exp);
877 enum lvb_type lvb_type = 0;
881 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
883 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* with an intent, the policy is derived from the intent op */
886 LASSERT(policy == NULL);
888 saved_flags |= LDLM_FL_HAS_INTENT;
889 if (it->it_op & (IT_GETATTR | IT_READDIR))
890 policy = &update_policy;
891 else if (it->it_op & IT_LAYOUT)
892 policy = &layout_policy;
893 else if (it->it_op & IT_GETXATTR)
894 policy = &getxattr_policy;
896 policy = &lookup_policy;
/* remember the import generation to detect eviction across resends */
899 generation = obddev->u.cli.cl_import->imp_generation;
900 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
901 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
904 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
909 /* The only way right now is FLOCK. */
910 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
912 res_id.name[3] = LDLM_FLOCK;
913 } else if (it->it_op & IT_OPEN) {
914 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
915 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
916 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
917 } else if (it->it_op & IT_READDIR) {
918 req = mdc_enqueue_pack(exp, 0);
919 } else if (it->it_op & IT_LAYOUT) {
/* layout intents need LVB support on the server connection */
920 if (!imp_connect_lvb_type(imp))
922 req = mdc_intent_layout_pack(exp, it, op_data);
923 lvb_type = LVB_T_LAYOUT;
924 } else if (it->it_op & IT_GETXATTR) {
925 req = mdc_intent_getxattr_pack(exp, it, op_data);
932 RETURN(PTR_ERR(req));
/* pin the request to the current import generation for resends */
935 req->rq_generation_set = 1;
936 req->rq_import_generation = generation;
937 req->rq_sent = ktime_get_real_seconds() + resends;
940 /* It is important to obtain modify RPC slot first (if applicable), so
941 * that threads that are waiting for a modify RPC slot are not polluting
942 * our rpcs in flight counter.
943 * We do not do flock request limiting, though */
945 mdc_get_mod_rpc_slot(req, it);
946 rc = obd_get_request_slot(&obddev->u.cli);
/* slot acquisition failed: release and bail out */
948 mdc_put_mod_rpc_slot(req, it);
949 mdc_clear_replay_flag(req, 0);
950 ptlrpc_req_finished(req);
955 /* With Data-on-MDT the glimpse callback is needed too.
956 * It is set here in advance but not in mdc_finish_enqueue()
957 * to avoid possible races. It is safe to have glimpse handler
958 * for non-DOM locks and costs nothing.*/
959 if (einfo->ei_cb_gl == NULL)
960 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
962 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
963 0, lvb_type, lockh, 0);
965 /* For flock requests we immediatelly return without further
966 delay and let caller deal with the rest, since rest of
967 this function metadata processing makes no sense for flock
968 requests anyway. But in case of problem during comms with
969 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
970 can not rely on caller and this mainly for F_UNLCKs
971 (explicits or automatically generated by Kernel to clean
972 current FLocks upon exit) that can't be trashed */
973 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
974 (einfo->ei_type == LDLM_FLOCK) &&
975 (einfo->ei_mode == LCK_NL))
/* enqueue done - release both slots taken above */
980 obd_put_request_slot(&obddev->u.cli);
981 mdc_put_mod_rpc_slot(req, it);
985 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
986 obddev->obd_name, PFID(&op_data->op_fid1),
987 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
989 mdc_clear_replay_flag(req, rc);
990 ptlrpc_req_finished(req);
994 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
995 LASSERT(lockrep != NULL);
/* translate wire status to host errno space */
997 lockrep->lock_policy_res2 =
998 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1000 /* Retry infinitely when the server returns -EINPROGRESS for the
1001 * intent operation, when server returns -EINPROGRESS for acquiring
1002 * intent lock, we'll retry in after_reply(). */
1003 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1004 mdc_clear_replay_flag(req, rc)
1005 ptlrpc_req_finished(req);
/* only resend within the same import generation (no eviction) */
1006 if (generation == obddev->u.cli.cl_import->imp_generation) {
1007 if (signal_pending(current))
1011 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1012 obddev->obd_name, resends, it->it_op,
1013 PFID(&op_data->op_fid1),
1014 PFID(&op_data->op_fid2));
1017 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL reply buffer was too small: retry once with the large size */
1022 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1023 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1024 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1025 mdc_clear_replay_flag(req, -ERANGE);
1026 ptlrpc_req_finished(req);
1027 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1032 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* on failure, drop any lock reference we still hold and reset @it */
1034 if (lustre_handle_is_used(lockh)) {
1035 ldlm_lock_decref(lockh, einfo->ei_mode);
1036 memset(lockh, 0, sizeof(*lockh));
1038 ptlrpc_req_finished(req);
1040 it->it_lock_handle = 0;
1041 it->it_lock_mode = 0;
1042 it->it_request = NULL;
/*
 * mdc_enqueue() - intent-less wrapper around mdc_enqueue_base();
 * used for plain lock enqueues (e.g. flock) where no lookup_intent
 * is involved.
 */
1048 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1049 const union ldlm_policy_data *policy,
1050 struct md_op_data *op_data,
1051 struct lustre_handle *lockh, __u64 extra_lock_flags)
1053 return mdc_enqueue_base(exp, einfo, policy, NULL,
1054 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - final bookkeeping after an intent enqueue:
 * surface intent errors, pin the request for open/create reply
 * consumers, sanity-check the lock's resource against the reply fid,
 * and collapse a duplicate lock onto an existing matching one.
 *
 * NOTE(review): early returns and some closing braces are elided in
 * this chunk.
 */
1057 static int mdc_finish_intent_lock(struct obd_export *exp,
1058 struct ptlrpc_request *request,
1059 struct md_op_data *op_data,
1060 struct lookup_intent *it,
1061 struct lustre_handle *lockh)
1063 struct lustre_handle old_lock;
1064 struct ldlm_lock *lock;
1068 LASSERT(request != NULL);
1069 LASSERT(request != LP_POISON);
1070 LASSERT(request->rq_repmsg != LP_POISON);
/* readdir needs none of the processing below */
1072 if (it->it_op & IT_READDIR)
1075 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1076 if (it->it_status != 0)
1077 GOTO(out, rc = it->it_status);
1079 if (!it_disposition(it, DISP_IT_EXECD)) {
1080 /* The server failed before it even started executing
1081 * the intent, i.e. because it couldn't unpack the
1084 LASSERT(it->it_status != 0);
1085 GOTO(out, rc = it->it_status);
1087 rc = it_open_error(DISP_IT_EXECD, it);
1091 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1095 /* keep requests around for the multiple phases of the call
1096 * this shows the DISP_XX must guarantee we make it into the
1099 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1100 it_disposition(it, DISP_OPEN_CREATE) &&
1101 !it_open_error(DISP_OPEN_CREATE, it)) {
1102 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1103 /* balanced in ll_create_node */
1104 ptlrpc_request_addref(request);
1106 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1107 it_disposition(it, DISP_OPEN_OPEN) &&
1108 !it_open_error(DISP_OPEN_OPEN, it)) {
1109 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1110 /* balanced in ll_file_open */
1111 ptlrpc_request_addref(request);
1112 /* BUG 11546 - eviction in the middle of open rpc
1115 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1119 if (it->it_op & IT_CREAT) {
1120 /* XXX this belongs in ll_create_it */
1121 } else if (it->it_op == IT_OPEN) {
1122 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1124 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1128 /* If we already have a matching lock, then cancel the new
1129 * one. We have to set the data here instead of in
1130 * mdc_enqueue, because we need to use the child's inode as
1131 * the l_ast_data to match, and that's not available until
1132 * intent_finish has performed the iget().) */
1133 lock = ldlm_handle2lock(lockh);
1135 union ldlm_policy_data policy = lock->l_policy_data;
1136 LDLM_DEBUG(lock, "matching against this");
/* sanity: reply fid must name the same resource the lock covers */
1138 if (it_has_reply_body(it)) {
1139 struct mdt_body *body;
1141 body = req_capsule_server_get(&request->rq_pill,
1143 /* mdc_enqueue checked */
1144 LASSERT(body != NULL);
1145 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1146 &lock->l_resource->lr_name),
1147 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1148 PLDLMRES(lock->l_resource),
1149 PFID(&body->mbo_fid1));
1151 LDLM_LOCK_PUT(lock);
/* if an equivalent granted lock exists, drop ours and reuse it */
1153 memcpy(&old_lock, lockh, sizeof(*lockh));
1154 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1155 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1156 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1157 memcpy(lockh, &old_lock, sizeof(old_lock));
1158 it->it_lock_handle = lockh->cookie;
1164 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1165 (int)op_data->op_namelen, op_data->op_name,
1166 ldlm_it2str(it->it_op), it->it_status,
1167 it->it_disposition, rc);
/**
 * Re-validate a cached DLM lock for intent \a it against object \a fid.
 *
 * First tries the lock handle already stored in the intent; otherwise it
 * builds the resource name from \a fid and matches an already-granted
 * inodebits lock whose bits cover what the intent operation requires.
 * On a match, the lock handle and mode are stored back into the intent
 * so the caller can reuse the cached lock instead of a new enqueue RPC.
 *
 * \param exp	client export (connection) to the MDT
 * \param it	lookup intent whose lock state is being (re)validated
 * \param fid	FID of the inode the intent operates on
 * \param bits	optional out-parameter: inodebits granted under the lock
 *
 * NOTE(review): this chunk is an excerpt — some lines of the original
 * function (braces, case labels, the final return path) are elided, so
 * the control flow below is partial.
 */
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 * verify that the cached lock really covers this operation. */
	struct ldlm_res_id res_id;
	struct lustre_handle lockh;
	union ldlm_policy_data policy;
	enum ldlm_mode mode;
	/* Fast path: the intent already carries a lock handle from an
	 * earlier enqueue — revalidate that handle directly. */
	if (it->it_lock_handle) {
		lockh.cookie = it->it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	/* Slow path: derive the resource name from the FID and pick the
	 * inodebits required by the intent operation. */
	fid_build_reg_res_name(fid, &res_id);
	switch (it->it_op) {
	/* File attributes are held under multiple bits:
	 * nlink is under lookup lock, size and times are
	 * under UPDATE lock and recently we've also got
	 * a separate permissions lock for owner/group/acl that
	 * were protected by lookup lock before.
	 * Getattr must provide all of that information,
	 * so we need to ensure we have all of those locks.
	 * Unfortunately, if the bits are split across multiple
	 * locks, there's no easy way to match all of them here,
	 * so an extra RPC would be performed to fetch all
	 * of those bits at once for now. */
	/* For new MDTs(> 2.4), UPDATE|PERM should be enough,
	 * but for old MDTs (< 2.4), permission is covered
	 * by LOOKUP lock, so it needs to match all bits here.*/
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
				  MDS_INODELOCK_LOOKUP |
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
	policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
	policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
	/* Accept any mode a reader or writer could already hold. */
	mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
			      LDLM_IBITS, &policy,
			      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
	/* Matched: publish handle and mode through the intent. */
	it->it_lock_handle = lockh.cookie;
	it->it_lock_mode = mode;
	/* No usable lock: clear the intent's lock state. */
	it->it_lock_handle = 0;
	it->it_lock_mode = 0;
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * succeeded.
 *
 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * child lookup.
 *
 * NOTE(review): excerpt — several original lines (variable
 * declarations, braces, RETURN paths) are elided below.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
	struct ldlm_enqueue_info einfo = {
		.ei_type = LDLM_IBITS,
		.ei_mode = it_to_lock_mode(it),
		.ei_cb_bl = cb_blocking,
		.ei_cb_cp = ldlm_completion_ast,
		.ei_cb_gl = mdc_ldlm_glimpse_ast,
	struct lustre_handle lockh;
	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	/* Lookup-type intents on a sane child FID may be satisfied by a
	 * lock we already cache — try revalidation before enqueueing. */
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should
		 * only be called in revalidate_it if we already have a
		 * lock, let mdc_revalidate_lock() verify it. */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		 * (from inode_revalidate). */
		if (rc || op_data->op_namelen != 0)
	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
			CERROR("Can't alloc new fid, rc %d\n", rc);
	/* No cached lock matched — enqueue the intent lock with the MDT,
	 * then finish the intent bookkeeping on the reply. */
	rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
	*reqp = it->it_request;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the asynchronous getattr intent RPC
 * issued by mdc_intent_getattr_async(): finalizes the DLM enqueue,
 * fixes up the intent reply, and invokes the caller's completion
 * callback (minfo->mi_cb) with the result.
 *
 * NOTE(review): excerpt — some original lines are elided here (e.g.
 * the assignment initializing 'it', error-path labels and the final
 * return), so 'it' and 'rc' are set on lines not visible below.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
	struct mdc_getattr_args *ga = args;
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent *it;
	struct lustre_handle *lockh;
	struct obd_device *obddev;
	struct ldlm_reply *lockrep;
	__u64 flags = LDLM_FL_HAS_INTENT;
	lockh = &minfo->mi_lockh;
	obddev = class_exp2obd(exp);
	/* Release the request slot taken in mdc_intent_getattr_async(). */
	obd_put_request_slot(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
	/* Complete the client-side enqueue for the lock this RPC carried. */
	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	/* mdc_enqueue checked the reply buffer already. */
	LASSERT(lockrep != NULL);
	/* Convert the on-wire intent status to host errno convention. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);
	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
	/* Hand the final result to the sponsor of the async getattr. */
	minfo->mi_cb(req, minfo, rc);
1368 int mdc_intent_getattr_async(struct obd_export *exp,
1369 struct md_enqueue_info *minfo)
1371 struct md_op_data *op_data = &minfo->mi_data;
1372 struct lookup_intent *it = &minfo->mi_it;
1373 struct ptlrpc_request *req;
1374 struct mdc_getattr_args *ga;
1375 struct obd_device *obddev = class_exp2obd(exp);
1376 struct ldlm_res_id res_id;
1377 union ldlm_policy_data policy = {
1378 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1379 MDS_INODELOCK_UPDATE } };
1381 __u64 flags = LDLM_FL_HAS_INTENT;
1384 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1385 (int)op_data->op_namelen, op_data->op_name,
1386 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1388 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1389 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1390 * of the async getattr RPC will handle that by itself. */
1391 req = mdc_intent_getattr_pack(exp, it, op_data,
1392 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1394 RETURN(PTR_ERR(req));
1396 rc = obd_get_request_slot(&obddev->u.cli);
1398 ptlrpc_req_finished(req);
1402 /* With Data-on-MDT the glimpse callback is needed too.
1403 * It is set here in advance but not in mdc_finish_enqueue()
1404 * to avoid possible races. It is safe to have glimpse handler
1405 * for non-DOM locks and costs nothing.*/
1406 if (minfo->mi_einfo.ei_cb_gl == NULL)
1407 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1409 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1410 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1412 obd_put_request_slot(&obddev->u.cli);
1413 ptlrpc_req_finished(req);
1417 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1418 ga = ptlrpc_req_async_args(req);
1420 ga->ga_minfo = minfo;
1422 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1423 ptlrpcd_add_req(req);