4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/*
 * Per-request context for an asynchronous getattr intent enqueue:
 * carries the export and the caller's enqueue bookkeeping to the
 * completion side.
 * NOTE(review): this extract is elided — the closing brace (and any
 * further members) are not visible here.
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp; /* export the enqueue was issued on */
52 struct md_enqueue_info *ga_minfo; /* caller's md_enqueue_info */
/*
 * it_open_error() - report the error status of an intent for a given
 * open phase.
 *
 * Checks the intent's disposition bits from the most specific phase
 * (lease) down to the most generic (intent executed).  For each
 * disposition that is set, if the requested @phase is at or beyond
 * that disposition the per-phase status is reported (the returned
 * values themselves are on lines elided from this extract).
 * NOTE(review): branch bodies and returns are missing from this view;
 * the ordering below is the only behavior that can be stated with
 * certainty.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No known disposition matched: log the raw state for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach a VFS inode to a DLM lock's resource
 * LVB slot and optionally report the lock's inodebits.
 *
 * Under the resource lock, if the resource already points at a
 * *different* inode, that old inode must be on its way out
 * (I_FREEING) — otherwise two live inodes would claim the same lock
 * resource, which the LASSERTF below treats as fatal.
 * NOTE(review): extract is elided — early-return paths and the
 * conditional guarding the *bits store are not fully visible.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data; /* caller passes the inode as opaque data */
/* Nothing to do for a handle that no longer references a lock. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would be a cache-coherency bug. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
/* Report which inodebits this lock covers to the caller. */
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted DLM lock on the
 * regular resource derived from @fid that satisfies @type/@policy/@mode.
 * Returns the matched lock mode (result handling elided from this
 * extract); fills *lockh on a match per ldlm_lock_match() semantics.
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused DLM locks on the resource
 * for @fid matching @policy/@mode; @opaque is passed through for
 * caller-side filtering.  Thin wrapper over
 * ldlm_cli_cancel_unused_resource() (return handling elided).
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any inode pointer from the lock resource
 * of @fid, so a dying inode is no longer reachable through the DLM
 * resource LVB slot.
 * NOTE(review): extract is elided — the NULL-resource early return and
 * the res lock/unlock around the lr_lvb_inode store are not visible
 * here; confirm against the full source.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (creation flag 0): absent resource means nothing to clear. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag() - drop a request from the replay list when
 * it failed (@rc != 0) or must not be replayed; logs loudly if the
 * server assigned a transno to an errored request, since that should
 * not happen.  (The rq_replay clearing between the lock/unlock pair is
 * elided from this extract.)
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
223 struct lov_user_md *lmm;
/* Grow the client-side buffer if the reply EA is larger than what was sent. */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Buffer was larger than needed: shrink to the exact EA size. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
242 /* overwrite layout generation returned from the MDS */
243 lmm->lmm_stripe_offset =
244 (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
/*
 * mdc_intent_open_pack() - build an LDLM_INTENT_OPEN RPC for an open
 * (and possibly create) intent.
 *
 * Cancels locally conflicting locks (open locks on the target, the
 * parent's UPDATE lock on create), sizes all client- and server-side
 * capsule fields (name, EA, security context, SELinux policy, ACL),
 * packs the intent and open body, and finally pre-grows the reply
 * buffer so small Data-on-MDT file content can be returned inline.
 * Returns the prepared request, or ERR_PTR() on failure.
 * NOTE(review): this extract is elided — error-branch bodies, some
 * else-arms and closing braces are missing from this view.
 */
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252 struct md_op_data *op_data, __u32 acl_bufsize)
254 struct ptlrpc_request *req;
255 struct obd_device *obddev = class_exp2obd(exp);
256 struct ldlm_intent *lit;
257 const void *lmm = op_data->op_data; /* layout EA supplied by caller */
258 __u32 lmmsize = op_data->op_data_size;
259 __u32 mdt_md_capsule_size;
260 struct list_head cancels = LIST_HEAD_INIT(cancels);
264 int repsize, repsize_estimate;
268 mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize;
/* Open via intent only creates regular files: force S_IFREG. */
270 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272 /* XXX: openlock is not cancelled for cross-refs. */
273 /* If inode is known, cancel conflicting OPEN locks. */
274 if (fid_is_sane(&op_data->op_fid2)) {
275 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276 if (it->it_flags & MDS_FMODE_WRITE)
281 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
284 else if (it->it_flags & FMODE_EXEC)
290 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295 /* If CREATE, cancel parent's UPDATE lock. */
296 if (it->it_op & IT_CREAT)
300 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302 MDS_INODELOCK_UPDATE);
304 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
307 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308 RETURN(ERR_PTR(-ENOMEM));
311 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312 op_data->op_namelen + 1);
313 if (cl_is_lov_delay_create(it->it_flags)) {
314 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
315 LASSERT(lmmsize == 0);
316 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
318 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
319 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
322 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
323 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
324 op_data->op_file_secctx_name_size : 0);
326 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
327 op_data->op_file_secctx_size);
329 /* get SELinux policy info if any */
330 rc = sptlrpc_get_sepol(req);
332 ptlrpc_request_free(req);
335 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336 strlen(req->rq_sepol) ?
337 strlen(req->rq_sepol) + 1 : 0);
339 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341 ptlrpc_request_free(req);
/* Mark the request replayable if the import supports recovery. */
345 spin_lock(&req->rq_lock);
346 req->rq_replay = req->rq_import->imp_replayable;
347 spin_unlock(&req->rq_lock);
349 /* pack the intent */
350 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351 lit->opc = (__u64)it->it_op;
353 /* pack the intended request */
354 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
357 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358 mdt_md_capsule_size);
359 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Open-by-name (not create): ask the server to return the named
 * security xattr along with the reply. */
361 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364 op_data->op_file_secctx_name_size > 0 &&
365 op_data->op_file_secctx_name != NULL) {
368 secctx_name = req_capsule_client_get(&req->rq_pill,
369 &RMF_FILE_SECCTX_NAME);
370 memcpy(secctx_name, op_data->op_file_secctx_name,
371 op_data->op_file_secctx_name_size);
372 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374 obddev->u.cli.cl_max_mds_easize);
376 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377 op_data->op_file_secctx_name_size,
378 op_data->op_file_secctx_name);
381 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
386 * Inline buffer for possible data from Data-on-MDT files.
388 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389 sizeof(struct niobuf_remote));
390 ptlrpc_request_set_replen(req);
392 /* Get real repbuf allocated size as rounded up power of 2 */
393 repsize = size_roundup_power2(req->rq_replen +
394 lustre_msg_early_size());
395 /* Estimate free space for DoM files in repbuf */
396 repsize_estimate = repsize - (req->rq_replen -
397 mdt_md_capsule_size +
398 sizeof(struct lov_comp_md_v1) +
399 sizeof(struct lov_comp_md_entry_v1) +
400 lov_mds_md_size(0, LOV_MAGIC_V3));
/* Too little slack for inline DoM data: grow the inline niobuf so at
 * least cl_dom_min_inline_repsize bytes of file data can come back. */
402 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
403 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
404 repsize_estimate + sizeof(struct niobuf_remote);
405 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
407 sizeof(struct niobuf_remote) + repsize);
408 ptlrpc_request_set_replen(req);
409 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
410 repsize, req->rq_replen);
411 repsize = size_roundup_power2(req->rq_replen +
412 lustre_msg_early_size());
414 /* The only way to report real allocated repbuf size to the server
415 * is the lm_repsize but it must be set prior buffer allocation itself
416 * due to security reasons - it is part of buffer used in signature
417 * calculation (see LU-11414). Therefore the saved size is predicted
418 * value as rq_replen rounded to the next higher power of 2.
419 * Such estimation is safe. Though the final allocated buffer might
420 * be even larger, it is not possible to know that at this point.
422 req->rq_reqmsg->lm_repsize = repsize;
426 #define GA_DEFAULT_EA_NAME_LEN 20
427 #define GA_DEFAULT_EA_VAL_LEN 250
428 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR RPC that
 * fetches all xattrs of the object in one round trip.
 *
 * Reply buffers are sized for GA_DEFAULT_EA_NUM xattrs with
 * GA_DEFAULT_EA_NAME_LEN-byte names and GA_DEFAULT_EA_VAL_LEN-byte
 * values.  Returns the prepared request or ERR_PTR() on failure.
 * NOTE(review): extract is elided — error-branch bodies and the final
 * RETURN are not visible here.
 */
430 static struct ptlrpc_request *
431 mdc_intent_getxattr_pack(struct obd_export *exp,
432 struct lookup_intent *it,
433 struct md_op_data *op_data)
435 struct ptlrpc_request *req;
436 struct ldlm_intent *lit;
438 struct list_head cancels = LIST_HEAD_INIT(cancels);
439 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
443 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
444 &RQF_LDLM_INTENT_GETXATTR);
446 RETURN(ERR_PTR(-ENOMEM));
448 /* get SELinux policy info if any */
449 rc = sptlrpc_get_sepol(req);
451 ptlrpc_request_free(req);
454 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
455 strlen(req->rq_sepol) ?
456 strlen(req->rq_sepol) + 1 : 0);
458 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
460 ptlrpc_request_free(req);
464 /* pack the intent */
465 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
466 lit->opc = IT_GETXATTR;
467 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
468 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
470 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
471 /* If the supplied buffer is too small then the server will
472 * return -ERANGE and llite will fallback to using non cached
473 * xattr operations. On servers before 2.10.1 a (non-cached)
474 * listxattr RPC for an orphan or dead file causes an oops. So
475 * let's try to avoid sending too small a buffer to too old a
476 * server. This is effectively undoing the memory conservation
477 * of LU-9417 when it would be *more* likely to crash the
478 * server. See LU-9856. */
479 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
480 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
481 exp->exp_connect_data.ocd_max_easize);
484 /* pack the intended request */
485 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
486 ea_vals_buf_size, -1, 0);
488 /* get SELinux policy info if any */
489 mdc_file_sepol_pack(req);
/* Reply-side sizing: xattr names, values, and per-value lengths. */
491 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
492 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
494 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
497 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
498 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is expected back on a getxattr intent. */
500 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
502 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR RPC for a
 * lookup/getattr intent, optionally piggy-backing a request for the
 * file's security xattr by name.
 * Returns the prepared request or ERR_PTR() on failure.
 * NOTE(review): extract is elided — error branches, the else-arm for
 * the non-secctx case, and the final RETURN are not visible here.
 */
507 static struct ptlrpc_request *
508 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
509 struct md_op_data *op_data, __u32 acl_bufsize)
511 struct ptlrpc_request *req;
512 struct obd_device *obddev = class_exp2obd(exp);
/* Everything a getattr may need: attrs, EA, ACL, dir EA, MEA, ... */
513 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
514 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
516 struct ldlm_intent *lit;
518 bool have_secctx = false;
523 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
524 &RQF_LDLM_INTENT_GETATTR);
526 RETURN(ERR_PTR(-ENOMEM));
528 /* send name of security xattr to get upon intent */
529 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
530 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
532 op_data->op_file_secctx_name_size > 0 &&
533 op_data->op_file_secctx_name != NULL) {
535 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
537 op_data->op_file_secctx_name_size);
540 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
541 op_data->op_namelen + 1);
543 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
545 ptlrpc_request_free(req);
549 /* pack the intent */
550 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
551 lit->opc = (__u64)it->it_op;
553 easize = obddev->u.cli.cl_default_mds_easize;
555 /* pack the intended request */
556 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
/* Reply-side sizing: layout EA, ACL, and default striping for dirs. */
558 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
559 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
560 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
561 sizeof(struct lmv_user_md));
/* have_secctx path (condition elided): copy the xattr name into the
 * request and reserve reply room for the returned security context. */
566 secctx_name = req_capsule_client_get(&req->rq_pill,
567 &RMF_FILE_SECCTX_NAME);
568 memcpy(secctx_name, op_data->op_file_secctx_name,
569 op_data->op_file_secctx_name_size);
571 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
574 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
575 op_data->op_file_secctx_name_size,
576 op_data->op_file_secctx_name);
578 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
582 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT RPC.  Copies
 * the caller-provided struct layout_intent (passed via op_data->op_data)
 * into the request and reserves an LVB reply buffer large enough for
 * the default MDS EA size, where the layout will be returned.
 * Returns the prepared request or ERR_PTR() on failure (error-branch
 * bodies elided from this extract).
 */
586 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
587 struct lookup_intent *it,
588 struct md_op_data *op_data)
590 struct obd_device *obd = class_exp2obd(exp);
591 struct ptlrpc_request *req;
592 struct ldlm_intent *lit;
593 struct layout_intent *layout;
597 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
598 &RQF_LDLM_INTENT_LAYOUT);
600 RETURN(ERR_PTR(-ENOMEM));
/* No EA data is sent with a layout intent. */
602 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
603 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
605 ptlrpc_request_free(req);
609 /* pack the intent */
610 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
611 lit->opc = (__u64)it->it_op;
613 /* pack the layout intent request */
614 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
615 LASSERT(op_data->op_data != NULL);
616 LASSERT(op_data->op_data_size == sizeof(*layout));
617 memcpy(layout, op_data->op_data, sizeof(*layout));
619 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
620 obd->u.cli.cl_default_mds_easize);
621 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM enqueue RPC
 * with an @lvb_len-byte server-side LVB reply buffer.  Used e.g. for
 * readdir locks.  Returns the request or ERR_PTR() on failure
 * (error-branch bodies elided from this extract).
 */
625 static struct ptlrpc_request *
626 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
628 struct ptlrpc_request *req;
632 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
634 RETURN(ERR_PTR(-ENOMEM));
636 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
638 ptlrpc_request_free(req);
642 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
643 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process the reply of an intent enqueue.
 *
 * Responsibilities visible in this extract:
 *  - mark replayed/replayable requests INTENT_ONLY so replay performs
 *    the intent without re-acquiring the lock;
 *  - reconcile the granted lock mode with einfo if the server changed it;
 *  - copy disposition/status/lock handle from the ldlm_reply into @it;
 *  - drop the replay flag for failed opens;
 *  - for replies carrying a EA (open/getattr), validate the EA buffer
 *    and save the LOV EA for open replay;
 *  - for IT_LAYOUT, extract the LVB and install it as the lock's
 *    LVB_T_LAYOUT data (only when the layout lock was actually granted);
 *  - for Data-on-MDT locks, require OBD_MD_DOM_SIZE and fill the OST LVB.
 * NOTE(review): this extract is elided — error paths, several closing
 * braces and the final RETURN are not visible; treat control flow
 * described above as partially inferred and verify against full source.
 */
647 static int mdc_finish_enqueue(struct obd_export *exp,
648 struct ptlrpc_request *req,
649 struct ldlm_enqueue_info *einfo,
650 struct lookup_intent *it,
651 struct lustre_handle *lockh,
654 struct req_capsule *pill = &req->rq_pill;
655 struct ldlm_request *lockreq;
656 struct ldlm_reply *lockrep;
657 struct ldlm_lock *lock;
658 struct mdt_body *body = NULL;
659 void *lvb_data = NULL;
665 /* Similarly, if we're going to replay this request, we don't want to
666 * actually get a lock, just perform the intent. */
667 if (req->rq_transno || req->rq_replay) {
668 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
669 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
672 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent was executed but no lock granted: clear the handle. */
674 memset(lockh, 0, sizeof(*lockh));
676 } else { /* rc = 0 */
677 lock = ldlm_handle2lock(lockh);
678 LASSERT(lock != NULL);
680 /* If the server gave us back a different lock mode, we should
681 * fix up our variables. */
682 if (lock->l_req_mode != einfo->ei_mode) {
683 ldlm_lock_addref(lockh, lock->l_req_mode);
684 ldlm_lock_decref(lockh, einfo->ei_mode);
685 einfo->ei_mode = lock->l_req_mode;
690 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
691 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's verdict into the lookup intent. */
693 it->it_disposition = (int)lockrep->lock_policy_res1;
694 it->it_status = (int)lockrep->lock_policy_res2;
695 it->it_lock_mode = einfo->ei_mode;
696 it->it_lock_handle = lockh->cookie;
697 it->it_request = req;
699 /* Technically speaking rq_transno must already be zero if
700 * it_status is in error, so the check is a bit redundant */
701 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
702 mdc_clear_replay_flag(req, it->it_status);
704 /* If we're doing an IT_OPEN which did not result in an actual
705 * successful open, then we need to remove the bit which saves
706 * this request for unconditional replay.
708 * It's important that we do this first! Otherwise we might exit the
709 * function without doing so, and try to replay a failed create
711 if (it->it_op & IT_OPEN && req->rq_replay &&
712 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
713 mdc_clear_replay_flag(req, it->it_status);
715 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
716 it->it_op, it->it_disposition, it->it_status);
718 /* We know what to expect, so we do any byte flipping required here */
719 if (it_has_reply_body(it)) {
720 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
722 CERROR ("Can't swab mdt_body\n");
726 if (it_disposition(it, DISP_OPEN_OPEN) &&
727 !it_open_error(DISP_OPEN_OPEN, it)) {
729 * If this is a successful OPEN request, we need to set
730 * replay handler and data early, so that if replay
731 * happens immediately after swabbing below, new reply
732 * is swabbed by that handler correctly.
734 mdc_set_open_replay_data(NULL, NULL, it);
737 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
740 mdc_update_max_ea_from_body(exp, body);
743 * The eadata is opaque; just check that it is there.
744 * Eventually, obd_unpackmd() will check the contents.
746 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
747 body->mbo_eadatasize);
751 /* save lvb data and length in case this is for layout
754 lvb_len = body->mbo_eadatasize;
757 * We save the reply LOV EA in case we have to replay a
758 * create for recovery. If we didn't allocate a large
759 * enough request buffer above we need to reallocate it
760 * here to hold the actual LOV EA.
762 * To not save LOV EA if request is not going to replay
763 * (for example error one).
765 if ((it->it_op & IT_OPEN) && req->rq_replay) {
766 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
767 body->mbo_eadatasize);
/* Could not save the EA: drop it from the body so replay
 * does not rely on data we failed to preserve. */
769 body->mbo_valid &= ~OBD_MD_FLEASIZE;
770 body->mbo_eadatasize = 0;
775 } else if (it->it_op & IT_LAYOUT) {
776 /* maybe the lock was granted right away and layout
777 * is packed into RMF_DLM_LVB of req */
778 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
779 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
780 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
782 lvb_data = req_capsule_server_sized_get(pill,
783 &RMF_DLM_LVB, lvb_len);
784 if (lvb_data == NULL)
788 * save replied layout data to the request buffer for
789 * recovery consideration (lest MDS reinitialize
790 * another set of OST objects).
/* Best-effort save; failure is tolerated here by design. */
793 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
798 /* fill in stripe data for layout lock.
799 * LU-6581: trust layout data only if layout lock is granted. The MDT
800 * has stopped sending layout unless the layout lock is granted. The
801 * client still does this checking in case it's talking with an old
802 * server. - Jinshan */
803 lock = ldlm_handle2lock(lockh);
807 if (ldlm_has_layout(lock) && lvb_data != NULL &&
808 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
811 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
812 ldlm_it2str(it->it_op), lvb_len);
814 OBD_ALLOC_LARGE(lmm, lvb_len);
816 GOTO(out_lock, rc = -ENOMEM);
818 memcpy(lmm, lvb_data, lvb_len);
820 /* install lvb_data */
821 lock_res_and_lock(lock);
822 if (lock->l_lvb_data == NULL) {
823 lock->l_lvb_type = LVB_T_LAYOUT;
824 lock->l_lvb_data = lmm;
825 lock->l_lvb_len = lvb_len;
828 unlock_res_and_lock(lock);
/* Another thread already installed an LVB: free our copy. */
830 OBD_FREE_LARGE(lmm, lvb_len);
833 if (ldlm_has_dom(lock)) {
834 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
836 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
837 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
838 LDLM_ERROR(lock, "%s: DoM lock without size.",
839 exp->exp_obd->obd_name);
840 GOTO(out_lock, rc = -EPROTO);
843 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
844 ldlm_it2str(it->it_op), body->mbo_dom_size);
846 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
854 /* We always reserve enough space in the reply packet for a stripe MD, because
855 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base() - central enqueue path for all MDC intent locks.
 *
 * Picks the inodebits policy from the intent op (getattr/readdir ->
 * UPDATE, layout -> LAYOUT, getxattr -> XATTR, else LOOKUP), packs the
 * matching intent request, takes the modify-RPC and request slots,
 * issues ldlm_cli_enqueue(), and then handles the reply: flock
 * short-circuit, -EINPROGRESS resend loop, ACL-buffer -ERANGE retry,
 * and final mdc_finish_enqueue() processing (releasing the lock ref on
 * failure).  Used for both intent (it != NULL) and plain (flock)
 * enqueues.
 * NOTE(review): this extract is elided — the resend loop structure,
 * several labels/braces and returns are not visible; verify control
 * flow against the full source.
 */
856 static int mdc_enqueue_base(struct obd_export *exp,
857 struct ldlm_enqueue_info *einfo,
858 const union ldlm_policy_data *policy,
859 struct lookup_intent *it,
860 struct md_op_data *op_data,
861 struct lustre_handle *lockh,
862 __u64 extra_lock_flags)
864 struct obd_device *obddev = class_exp2obd(exp);
865 struct ptlrpc_request *req = NULL;
866 __u64 flags, saved_flags = extra_lock_flags;
867 struct ldlm_res_id res_id;
868 static const union ldlm_policy_data lookup_policy = {
869 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
870 static const union ldlm_policy_data update_policy = {
871 .l_inodebits = { MDS_INODELOCK_UPDATE } };
872 static const union ldlm_policy_data layout_policy = {
873 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
874 static const union ldlm_policy_data getxattr_policy = {
875 .l_inodebits = { MDS_INODELOCK_XATTR } };
876 int generation, resends = 0;
877 struct ldlm_reply *lockrep;
878 struct obd_import *imp = class_exp2cliimp(exp);
880 enum lvb_type lvb_type = 0;
/* Intent locks are always inodebits locks. */
884 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
886 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived from it->it_op, never passed in. */
889 LASSERT(policy == NULL);
891 saved_flags |= LDLM_FL_HAS_INTENT;
892 if (it->it_op & (IT_GETATTR | IT_READDIR))
893 policy = &update_policy;
894 else if (it->it_op & IT_LAYOUT)
895 policy = &layout_policy;
896 else if (it->it_op & IT_GETXATTR)
897 policy = &getxattr_policy;
899 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
902 generation = obddev->u.cli.cl_import->imp_generation;
903 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
904 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
907 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
912 /* The only way right now is FLOCK. */
913 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
915 res_id.name[3] = LDLM_FLOCK;
916 } else if (it->it_op & IT_OPEN) {
917 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
918 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
919 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
920 } else if (it->it_op & IT_READDIR) {
921 req = mdc_enqueue_pack(exp, 0);
922 } else if (it->it_op & IT_LAYOUT) {
923 if (!imp_connect_lvb_type(imp))
925 req = mdc_intent_layout_pack(exp, it, op_data);
926 lvb_type = LVB_T_LAYOUT;
927 } else if (it->it_op & IT_GETXATTR) {
928 req = mdc_intent_getxattr_pack(exp, it, op_data);
935 RETURN(PTR_ERR(req));
/* Pin the import generation on the request so a resend after
 * eviction is detected below. */
938 req->rq_generation_set = 1;
939 req->rq_import_generation = generation;
940 req->rq_sent = ktime_get_real_seconds() + resends;
943 /* It is important to obtain modify RPC slot first (if applicable), so
944 * that threads that are waiting for a modify RPC slot are not polluting
945 * our rpcs in flight counter.
946 * We do not do flock request limiting, though */
948 mdc_get_mod_rpc_slot(req, it);
949 rc = obd_get_request_slot(&obddev->u.cli);
951 mdc_put_mod_rpc_slot(req, it);
952 mdc_clear_replay_flag(req, 0);
953 ptlrpc_req_finished(req);
958 /* With Data-on-MDT the glimpse callback is needed too.
959 * It is set here in advance but not in mdc_finish_enqueue()
960 * to avoid possible races. It is safe to have glimpse handler
961 * for non-DOM locks and costs nothing.*/
962 if (einfo->ei_cb_gl == NULL)
963 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
965 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
966 0, lvb_type, lockh, 0);
968 /* For flock requests we immediately return without further
969 delay and let caller deal with the rest, since rest of
970 this function metadata processing makes no sense for flock
971 requests anyway. But in case of problem during comms with
972 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
973 can not rely on caller and this mainly for F_UNLCKs
974 (explicits or automatically generated by Kernel to clean
975 current FLocks upon exit) that can't be trashed */
976 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
977 (einfo->ei_type == LDLM_FLOCK) &&
978 (einfo->ei_mode == LCK_NL))
983 obd_put_request_slot(&obddev->u.cli);
984 mdc_put_mod_rpc_slot(req, it);
988 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
989 obddev->obd_name, PFID(&op_data->op_fid1),
990 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
992 mdc_clear_replay_flag(req, rc);
993 ptlrpc_req_finished(req);
997 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
998 LASSERT(lockrep != NULL);
/* Wire status is network-byte-ordered ptlrpc status. */
1000 lockrep->lock_policy_res2 =
1001 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1003 /* Retry infinitely when the server returns -EINPROGRESS for the
1004 * intent operation, when server returns -EINPROGRESS for acquiring
1005 * intent lock, we'll retry in after_reply(). */
1006 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1007 mdc_clear_replay_flag(req, rc);
1008 ptlrpc_req_finished(req);
1009 if (generation == obddev->u.cli.cl_import->imp_generation) {
1010 if (signal_pending(current))
1014 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1015 obddev->obd_name, resends, it->it_op,
1016 PFID(&op_data->op_fid1),
1017 PFID(&op_data->op_fid2));
1020 CDEBUG(D_HA, "resend cross eviction\n");
/* Server's ACL reply did not fit in the old-size buffer: retry once
 * with the large ACL buffer. */
1025 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1026 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1027 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1028 mdc_clear_replay_flag(req, -ERANGE);
1029 ptlrpc_req_finished(req);
1030 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1035 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* finish_enqueue failed: release our lock reference and the intent's
 * request/lock state before propagating the error. */
1037 if (lustre_handle_is_used(lockh)) {
1038 ldlm_lock_decref(lockh, einfo->ei_mode);
1039 memset(lockh, 0, sizeof(*lockh));
1041 ptlrpc_req_finished(req);
1043 it->it_lock_handle = 0;
1044 it->it_lock_mode = 0;
1045 it->it_request = NULL;
/*
 * mdc_enqueue() - public non-intent enqueue entry point; delegates to
 * mdc_enqueue_base() with a NULL lookup intent.
 */
1051 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1052 const union ldlm_policy_data *policy,
1053 struct md_op_data *op_data,
1054 struct lustre_handle *lockh, __u64 extra_lock_flags)
1056 return mdc_enqueue_base(exp, einfo, policy, NULL,
1057 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - interpret the intent result after enqueue
 * and keep the pieces the upper layer (llite) will need.
 *
 * Visible responsibilities:
 *  - pass through for READDIR; surface it_status for GETXATTR/LAYOUT;
 *  - require DISP_IT_EXECD (the server at least started the intent);
 *  - take extra request references for successful CREATE and OPEN so
 *    llite can finish those multi-phase operations (released in
 *    ll_create_node / ll_file_open respectively);
 *  - sanity-check the reply fid against the granted lock's resource;
 *  - if an equivalent lock already exists, cancel the new one and keep
 *    the old handle in the intent.
 * NOTE(review): extract is elided — several returns, else-arms and the
 * final RETURN are not visible here.
 */
1060 static int mdc_finish_intent_lock(struct obd_export *exp,
1061 struct ptlrpc_request *request,
1062 struct md_op_data *op_data,
1063 struct lookup_intent *it,
1064 struct lustre_handle *lockh)
1066 struct lustre_handle old_lock;
1067 struct ldlm_lock *lock;
1071 LASSERT(request != NULL);
1072 LASSERT(request != LP_POISON);
1073 LASSERT(request->rq_repmsg != LP_POISON);
1075 if (it->it_op & IT_READDIR)
1078 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1079 if (it->it_status != 0)
1080 GOTO(out, rc = it->it_status);
1082 if (!it_disposition(it, DISP_IT_EXECD)) {
1083 /* The server failed before it even started executing
1084 * the intent, i.e. because it couldn't unpack the
1087 LASSERT(it->it_status != 0);
1088 GOTO(out, rc = it->it_status);
1090 rc = it_open_error(DISP_IT_EXECD, it);
1094 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1098 /* keep requests around for the multiple phases of the call
1099 * this shows the DISP_XX must guarantee we make it into the
1102 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1103 it_disposition(it, DISP_OPEN_CREATE) &&
1104 !it_open_error(DISP_OPEN_CREATE, it)) {
1105 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1106 /* balanced in ll_create_node */
1107 ptlrpc_request_addref(request);
1109 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1110 it_disposition(it, DISP_OPEN_OPEN) &&
1111 !it_open_error(DISP_OPEN_OPEN, it)) {
1112 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1113 /* balanced in ll_file_open */
1114 ptlrpc_request_addref(request);
1115 /* BUG 11546 - eviction in the middle of open rpc
1118 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1122 if (it->it_op & IT_CREAT) {
1123 /* XXX this belongs in ll_create_it */
1124 } else if (it->it_op == IT_OPEN) {
1125 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1127 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1131 /* If we already have a matching lock, then cancel the new
1132 * one. We have to set the data here instead of in
1133 * mdc_enqueue, because we need to use the child's inode as
1134 * the l_ast_data to match, and that's not available until
1135 * intent_finish has performed the iget().) */
1136 lock = ldlm_handle2lock(lockh)
1138 union ldlm_policy_data policy = lock->l_policy_data;
1139 LDLM_DEBUG(lock, "matching against this");
1141 if (it_has_reply_body(it)) {
1142 struct mdt_body *body;
1144 body = req_capsule_server_get(&request->rq_pill,
1146 /* mdc_enqueue checked */
1147 LASSERT(body != NULL);
/* The granted lock must be on the resource named by the reply fid. */
1148 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1149 &lock->l_resource->lr_name),
1150 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1151 PLDLMRES(lock->l_resource),
1152 PFID(&body->mbo_fid1));
1154 LDLM_LOCK_PUT(lock);
1156 memcpy(&old_lock, lockh, sizeof(*lockh));
1157 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1158 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop+cancel the new lock, keep the old handle. */
1159 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1160 memcpy(lockh, &old_lock, sizeof(old_lock));
1161 it->it_lock_handle = lockh->cookie;
1167 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1168 (int)op_data->op_namelen, op_data->op_name,
1169 ldlm_it2str(it->it_op), it->it_status,
1170 it->it_disposition, rc);
1174 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1175 struct lu_fid *fid, __u64 *bits)
1177 /* We could just return 1 immediately, but since we should only
1178 * be called in revalidate_it if we already have a lock, let's
1180 struct ldlm_res_id res_id;
1181 struct lustre_handle lockh;
1182 union ldlm_policy_data policy;
1183 enum ldlm_mode mode;
1186 if (it->it_lock_handle) {
1187 lockh.cookie = it->it_lock_handle;
1188 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1190 fid_build_reg_res_name(fid, &res_id);
1191 switch (it->it_op) {
1193 /* File attributes are held under multiple bits:
1194 * nlink is under lookup lock, size and times are
1195 * under UPDATE lock and recently we've also got
1196 * a separate permissions lock for owner/group/acl that
1197 * were protected by lookup lock before.
1198 * Getattr must provide all of that information,
1199 * so we need to ensure we have all of those locks.
1200 * Unfortunately, if the bits are split across multiple
1201 * locks, there's no easy way to match all of them here,
1202 * so an extra RPC would be performed to fetch all
1203 * of those bits at once for now. */
1204 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1205 * but for old MDTs (< 2.4), permission is covered
1206 * by LOOKUP lock, so it needs to match all bits here.*/
1207 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1208 MDS_INODELOCK_LOOKUP |
1212 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1215 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1218 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1222 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1223 LDLM_IBITS, &policy,
1224 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1229 it->it_lock_handle = lockh.cookie;
1230 it->it_lock_mode = mode;
1232 it->it_lock_handle = 0;
1233 it->it_lock_mode = 0;
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want to lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it status is the
 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * child lookup.
 */
1266 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1267 struct lookup_intent *it, struct ptlrpc_request **reqp,
1268 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1270 struct ldlm_enqueue_info einfo = {
1271 .ei_type = LDLM_IBITS,
1272 .ei_mode = it_to_lock_mode(it),
1273 .ei_cb_bl = cb_blocking,
1274 .ei_cb_cp = ldlm_completion_ast,
1275 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1277 struct lustre_handle lockh;
1282 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1283 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1284 op_data->op_name, PFID(&op_data->op_fid2),
1285 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1289 if (fid_is_sane(&op_data->op_fid2) &&
1290 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1291 /* We could just return 1 immediately, but since we should only
1292 * be called in revalidate_it if we already have a lock, let's
1294 it->it_lock_handle = 0;
1295 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1296 /* Only return failure if it was not GETATTR by cfid
1297 (from inode_revalidate) */
1298 if (rc || op_data->op_namelen != 0)
1302 /* For case if upper layer did not alloc fid, do it now. */
1303 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1304 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1306 CERROR("Can't alloc new fid, rc %d\n", rc);
1311 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1316 *reqp = it->it_request;
1317 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1321 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1322 struct ptlrpc_request *req,
1325 struct mdc_getattr_args *ga = args;
1326 struct obd_export *exp = ga->ga_exp;
1327 struct md_enqueue_info *minfo = ga->ga_minfo;
1328 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1329 struct lookup_intent *it;
1330 struct lustre_handle *lockh;
1331 struct obd_device *obddev;
1332 struct ldlm_reply *lockrep;
1333 __u64 flags = LDLM_FL_HAS_INTENT;
1337 lockh = &minfo->mi_lockh;
1339 obddev = class_exp2obd(exp);
1341 obd_put_request_slot(&obddev->u.cli);
1342 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1345 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1346 &flags, NULL, 0, lockh, rc);
1348 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1349 mdc_clear_replay_flag(req, rc);
1353 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1354 LASSERT(lockrep != NULL);
1356 lockrep->lock_policy_res2 =
1357 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1359 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1363 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1367 minfo->mi_cb(req, minfo, rc);
1371 int mdc_intent_getattr_async(struct obd_export *exp,
1372 struct md_enqueue_info *minfo)
1374 struct md_op_data *op_data = &minfo->mi_data;
1375 struct lookup_intent *it = &minfo->mi_it;
1376 struct ptlrpc_request *req;
1377 struct mdc_getattr_args *ga;
1378 struct obd_device *obddev = class_exp2obd(exp);
1379 struct ldlm_res_id res_id;
1380 union ldlm_policy_data policy = {
1381 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1382 MDS_INODELOCK_UPDATE } };
1384 __u64 flags = LDLM_FL_HAS_INTENT;
1387 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1388 (int)op_data->op_namelen, op_data->op_name,
1389 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1391 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1392 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1393 * of the async getattr RPC will handle that by itself. */
1394 req = mdc_intent_getattr_pack(exp, it, op_data,
1395 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1397 RETURN(PTR_ERR(req));
1399 rc = obd_get_request_slot(&obddev->u.cli);
1401 ptlrpc_req_finished(req);
1405 /* With Data-on-MDT the glimpse callback is needed too.
1406 * It is set here in advance but not in mdc_finish_enqueue()
1407 * to avoid possible races. It is safe to have glimpse handler
1408 * for non-DOM locks and costs nothing.*/
1409 if (minfo->mi_einfo.ei_cb_gl == NULL)
1410 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1412 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1413 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1415 obd_put_request_slot(&obddev->u.cli);
1416 ptlrpc_req_finished(req);
1420 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1421 ga = ptlrpc_req_async_args(req);
1423 ga->ga_minfo = minfo;
1425 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1426 ptlrpcd_add_req(req);