4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
32 #define DEBUG_SUBSYSTEM S_MDC
34 #include <linux/module.h>
37 #include <obd_class.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_intent.h>
41 #include <lustre_mdc.h>
42 #include <lustre_net.h>
43 #include <lustre_req_layout.h>
44 #include <lustre_swab.h>
45 #include <lustre_acl.h>
47 #include "mdc_internal.h"
/* Context bundle for getattr completion: the export the RPC was sent on
 * and the metadata operation item being served.  NOTE(review): remainder
 * of the struct (if any) is not visible in this excerpt. */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp; /* export used for the getattr request */
51 struct md_op_item *ga_item; /* op item to complete when reply arrives */
/*
 * it_open_error() - report the error (if any) recorded for a given phase
 * of an intent-based open.
 *
 * The disposition flags are tested from most specific (OPEN_LEASE) down
 * to least specific (IT_EXECD); the first disposition that was executed
 * and is >= the requested @phase decides the result (return statements
 * are elided in this excerpt — confirm against full source).  Falling
 * through all checks is unexpected and is logged via CERROR below.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
/* None of the known dispositions matched: log the raw state for debug */
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach VFS inode @data to the DLM lock behind @lockh (stored in the
 * resource's lr_lvb_inode) and optionally report the lock's inodebits
 * via @bits.  If the resource already points at a different inode, that
 * inode must be on its way out (I_FREEING) — otherwise two live inodes
 * would claim the same resource, which the LASSERTF below catches.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
/* Nothing to do for an unused (zero/cleared) lock handle */
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
/* lr_lvb_inode is protected by the resource lock */
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would be a bug; only a dying one is OK */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %px/%lu/%u state %lu in lock: setting data to %px/%lu/%u\n",
121 old_inode, old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
/* Report granted inodebits to the caller when requested */
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on @fid matching @type/@policy/
 * @mode in this export's namespace.  Returns the matched lock mode (or
 * 0 for no match — final return elided in this excerpt) and fills
 * @lockh on success.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh);
/*
 * Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode, delegating to ldlm_cli_cancel_unused_resource().
 * @opaque is passed through for the caller's cancel-matching logic.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
161 fid_build_reg_res_name(fid, &res_id);
162 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163 policy, mode, flags, opaque);
/*
 * Detach the cached inode pointer from @fid's DLM resource (sets
 * lr_lvb_inode to NULL), so the resource no longer references a VFS
 * inode that is going away.  No-op if the resource does not exist.
 */
167 int mdc_null_inode(struct obd_export *exp,
168 const struct lu_fid *fid)
170 struct ldlm_res_id res_id;
171 struct ldlm_resource *res;
172 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175 LASSERTF(ns != NULL, "no namespace passed\n");
177 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0); resource lock handling elided here */
179 res = ldlm_resource_get(ns, &res_id, 0, 0);
184 res->lr_lvb_inode = NULL;
/* Drop the reference taken by ldlm_resource_get() */
187 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with error @rc so
 * it is not replayed after recovery.  A non-zero transno on an error
 * reply is unexpected and is logged below.
 */
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 /* Don't hold error requests for replay. */
194 if (req->rq_replay) {
/* rq_replay is protected by rq_lock; clearing elided in this excerpt */
195 spin_lock(&req->rq_lock);
197 spin_unlock(&req->rq_lock);
199 if (rc && req->rq_transno != 0) {
200 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
207 * Save a large LOV/LMV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (b=5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways...
219 int mdc_save_lmm(struct ptlrpc_request *req, void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
/* Grow the EADATA buffer if the current capsule slot is too small */
225 if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
/* Slot was larger than needed: shrink to the exact EA size */
234 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
237 req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, &RMF_EADATA);
/* Copy the EA in and normalize it for replay */
240 memcpy(lmm, data, size);
241 lov_fix_ea_for_replay(lmm);
/*
 * Build and size the LDLM_INTENT_OPEN request for an intent-based open:
 * cancel conflicting local locks, allocate the request, size all client
 * buffers (name, EA, security/encryption contexts, SELinux policy),
 * pack the intent and open body, then size the reply buffers —
 * including an inline buffer reserved for Data-on-MDT file content.
 * Returns the prepared request or an ERR_PTR on failure.
 */
247 static struct ptlrpc_request *
248 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
249 struct md_op_data *op_data, __u32 acl_bufsize)
251 struct ptlrpc_request *req;
252 struct obd_device *obd = class_exp2obd(exp);
253 struct ldlm_intent *lit;
254 const void *lmm = op_data->op_data;
255 __u32 lmmsize = op_data->op_data_size;
256 __u32 mdt_md_capsule_size;
260 int repsize, repsize_estimate;
261 struct sptlrpc_sepol *sepol;
266 mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
/* Intent opens always target regular files; force S_IFREG in the mode */
268 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
270 /* XXX: openlock is not cancelled for cross-refs. */
271 /* If inode is known, cancel conflicting OPEN locks. */
272 if (fid_is_sane(&op_data->op_fid2)) {
273 if (it->it_open_flags & MDS_OPEN_LEASE) { /* try to get lease */
274 if (it->it_open_flags & MDS_FMODE_WRITE)
279 if (it->it_open_flags & (MDS_FMODE_WRITE |
283 else if (it->it_open_flags & FMODE_EXEC)
289 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
294 /* If CREATE, cancel parent's UPDATE lock. */
295 if (it->it_op & IT_CREAT)
299 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
301 MDS_INODELOCK_UPDATE);
303 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
304 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before erroring */
306 ldlm_lock_list_put(&cancels, l_bl_ast, count);
307 RETURN(ERR_PTR(-ENOMEM));
310 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
311 op_data->op_namelen + 1);
312 if (cl_is_lov_delay_create(it->it_open_flags)) {
313 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
314 LASSERT(lmmsize == 0);
315 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
317 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
318 max(lmmsize, obd->u.cli.cl_default_mds_easize));
/* Security-context name/value and encryption-context client buffers */
321 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
322 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
323 op_data->op_file_secctx_name_size : 0);
325 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
326 op_data->op_file_secctx_size);
328 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
329 op_data->op_file_encctx_size);
331 /* get SELinux policy info if any */
332 sepol = sptlrpc_sepol_get(req);
334 GOTO(err_free_rq, rc = PTR_ERR(sepol));
336 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
337 sptlrpc_sepol_size(sepol));
339 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341 GOTO(err_put_sepol, rc);
/* Mark the request replayable if the import supports recovery */
343 spin_lock(&req->rq_lock);
344 req->rq_replay = req->rq_import->imp_replayable;
345 spin_unlock(&req->rq_lock);
347 /* pack the intent */
348 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349 lit->opc = (__u64)it->it_op;
351 /* pack the intended request */
352 mdc_open_pack(&req->rq_pill, op_data, it->it_create_mode, 0,
353 it->it_open_flags, lmm, lmmsize, sepol);
355 sptlrpc_sepol_put(sepol);
/* Server-side reply buffer sizing starts here */
357 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358 mdt_md_capsule_size);
359 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Ask the server for the file's security xattr by name on plain opens
 * (not creates), when the caller supplied a secctx name */
361 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364 op_data->op_file_secctx_name_size > 0 &&
365 op_data->op_file_secctx_name != NULL) {
368 secctx_name = req_capsule_client_get(&req->rq_pill,
369 &RMF_FILE_SECCTX_NAME);
370 memcpy(secctx_name, op_data->op_file_secctx_name,
371 op_data->op_file_secctx_name_size);
372 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374 obd->u.cli.cl_max_mds_easize);
376 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377 op_data->op_file_secctx_name_size,
378 op_data->op_file_secctx_name);
381 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
/* Reserve room for the encryption context only on encrypted opens */
385 if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
387 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
389 obd->u.cli.cl_max_mds_easize);
391 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
395 * Inline buffer for possible data from Data-on-MDT files.
397 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
398 sizeof(struct niobuf_remote));
399 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
400 sizeof(struct lmv_user_md));
401 ptlrpc_request_set_replen(req);
403 /* Get real repbuf allocated size as rounded up power of 2 */
404 repsize = size_roundup_power2(req->rq_replen +
405 lustre_msg_early_size);
406 /* Estimate free space for DoM files in repbuf */
407 repsize_estimate = repsize - (req->rq_replen -
408 mdt_md_capsule_size +
409 sizeof(struct lov_comp_md_v1) +
410 sizeof(struct lov_comp_md_entry_v1) +
411 lov_mds_md_size(0, LOV_MAGIC_V3));
/* If the leftover space is below the configured DoM inline minimum,
 * grow the inline niobuf so small DoM files can be returned in-reply */
413 if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
414 repsize = obd->u.cli.cl_dom_min_inline_repsize -
415 repsize_estimate + sizeof(struct niobuf_remote);
416 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
418 sizeof(struct niobuf_remote) + repsize);
419 ptlrpc_request_set_replen(req);
420 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
421 repsize, req->rq_replen);
422 repsize = size_roundup_power2(req->rq_replen +
423 lustre_msg_early_size);
425 /* The only way to report real allocated repbuf size to the server
426 * is the lm_repsize but it must be set prior buffer allocation itself
427 * due to security reasons - it is part of buffer used in signature
428 * calculation (see LU-11414). Therefore the saved size is predicted
429 * value as rq_replen rounded to the next higher power of 2.
430 * Such estimation is safe. Though the final allocated buffer might
431 * be even larger, it is not possible to know that at this point.
433 req->rq_reqmsg->lm_repsize = repsize;
/* Error unwind: drop sepol ref, then free the request */
437 sptlrpc_sepol_put(sepol);
439 ptlrpc_request_free(req);
/*
 * Build the LDLM_INTENT_CREATE request: cancel the parent's UPDATE lock
 * if the parent FID is known, allocate the request, size client buffers
 * (name, secctx, EA, encctx, SELinux policy), pack the intent and the
 * create body, then size the reply buffers.  Returns the prepared
 * request or an ERR_PTR on failure.
 */
443 static struct ptlrpc_request *
444 mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
445 struct md_op_data *op_data, __u32 acl_bufsize,
446 __u64 extra_lock_flags)
449 struct ptlrpc_request *req;
450 struct obd_device *obd = class_exp2obd(exp);
451 struct sptlrpc_sepol *sepol;
452 struct ldlm_intent *lit;
458 if (fid_is_sane(&op_data->op_fid1))
459 /* cancel parent's UPDATE lock. */
460 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
462 MDS_INODELOCK_UPDATE);
464 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
465 &RQF_LDLM_INTENT_CREATE);
/* Allocation failed: release the collected cancel list before erroring */
467 ldlm_lock_list_put(&cancels, l_bl_ast, count);
468 RETURN(ERR_PTR(-ENOMEM));
471 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472 op_data->op_namelen + 1);
473 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
474 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
475 strlen(op_data->op_file_secctx_name) + 1 : 0);
476 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
477 op_data->op_file_secctx_size);
478 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
479 op_data->op_data_size);
480 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
481 op_data->op_file_encctx_size);
483 /* get SELinux policy info if any */
484 sepol = sptlrpc_sepol_get(req);
486 ldlm_lock_list_put(&cancels, l_bl_ast, count);
487 GOTO(err_free_rq, rc = PTR_ERR(sepol));
489 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
490 sptlrpc_sepol_size(sepol));
492 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
494 GOTO(err_put_sepol, rc);
496 /* Pack the intent */
497 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498 lit->opc = (__u64)it->it_op;
500 /* Pack the intent request. */
501 mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
502 op_data->op_data_size, it->it_create_mode,
503 op_data->op_fsuid, op_data->op_fsgid,
504 op_data->op_cap, 0, sepol);
506 sptlrpc_sepol_put(sepol);
/* Server-side reply buffer sizing */
508 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
509 obd->u.cli.cl_default_mds_easize);
510 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
511 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
512 sizeof(struct lmv_user_md));
513 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
515 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
517 ptlrpc_request_set_replen(req);
/* Error unwind: drop sepol ref, then free the request */
521 sptlrpc_sepol_put(sepol);
523 ptlrpc_request_free(req);
/* Default per-xattr sizing assumptions used to pre-size the getxattr
 * reply buffers: up to GA_DEFAULT_EA_NUM xattrs, each with a name of up
 * to GA_DEFAULT_EA_NAME_LEN bytes and a value of GA_DEFAULT_EA_VAL_LEN */
527 #define GA_DEFAULT_EA_NAME_LEN 20
528 #define GA_DEFAULT_EA_VAL_LEN 250
529 #define GA_DEFAULT_EA_NUM 10
/*
 * Build the LDLM_INTENT_GETXATTR request used to fetch all of a file's
 * xattrs under a single XATTR lock: pack the intent and body, attach
 * the SELinux policy, and size the reply buffers for names, values and
 * value lengths using the GA_DEFAULT_* estimates above.
 */
531 static struct ptlrpc_request *
532 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
533 struct md_op_data *op_data)
535 struct ptlrpc_request *req;
536 struct ldlm_intent *lit;
537 struct sptlrpc_sepol *sepol;
540 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
543 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
544 &RQF_LDLM_INTENT_GETXATTR);
546 RETURN(ERR_PTR(-ENOMEM));
548 /* get SELinux policy info if any */
549 sepol = sptlrpc_sepol_get(req);
551 GOTO(err_free_rq, rc = PTR_ERR(sepol));
553 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
554 sptlrpc_sepol_size(sepol));
556 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
558 GOTO(err_put_sepol, rc);
560 /* pack the intent */
561 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
562 lit->opc = IT_GETXATTR;
563 /* Message below is checked in sanity-selinux test_20d
564 * and sanity-sec test_49
566 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
567 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
569 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
570 /* If the supplied buffer is too small then the server will return
571 * -ERANGE and llite will fallback to using non cached xattr
572 * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
573 * for an orphan or dead file causes an oops. So let's try to avoid
574 * sending too small a buffer to too old a server. This is effectively
575 * undoing the memory conservation of LU-9417 when it would be *more*
576 * likely to crash the server. See LU-9856.
578 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
579 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
580 exp->exp_connect_data.ocd_max_easize);
583 /* pack the intended request */
584 mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
585 ea_vals_buf_size, -1, 0);
587 /* get SELinux policy info if any */
588 mdc_file_sepol_pack(&req->rq_pill, sepol);
589 sptlrpc_sepol_put(sepol);
/* Reply buffers: xattr names, values, and per-value length array */
591 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
592 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
594 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
597 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
598 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is returned for getxattr intents */
600 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
602 ptlrpc_request_set_replen(req);
/* Error unwind: drop sepol ref, then free the request */
607 sptlrpc_sepol_put(sepol);
609 ptlrpc_request_free(req);
/*
 * Build the LDLM_INTENT_GETATTR request (also used for IT_LOOKUP):
 * optionally asks the server to return the file's security xattr in the
 * same round trip, packs the getattr body, and sizes reply buffers for
 * MD, ACL, default LMV and (when encryption is negotiated) the
 * encryption context.
 */
613 static struct ptlrpc_request *
614 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
615 struct md_op_data *op_data, __u32 acl_bufsize)
617 struct ptlrpc_request *req;
618 struct obd_device *obd = class_exp2obd(exp);
/* Attributes requested from the MDT for lookup/getattr intents */
619 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
620 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
622 struct ldlm_intent *lit;
624 bool have_secctx = false;
628 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
629 &RQF_LDLM_INTENT_GETATTR);
631 RETURN(ERR_PTR(-ENOMEM));
633 /* send name of security xattr to get upon intent */
634 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
635 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
637 op_data->op_file_secctx_name_size > 0 &&
638 op_data->op_file_secctx_name != NULL) {
640 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
642 op_data->op_file_secctx_name_size);
645 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
646 op_data->op_namelen + 1);
648 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
650 ptlrpc_request_free(req);
654 /* pack the intent */
655 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
656 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum */
658 if (obd->u.cli.cl_default_mds_easize > 0)
659 easize = obd->u.cli.cl_default_mds_easize;
661 easize = obd->u.cli.cl_max_mds_easize;
663 /* pack the intended request */
664 mdc_getattr_pack(&req->rq_pill, valid, it->it_open_flags, op_data,
667 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
668 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
669 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
670 sizeof(struct lmv_user_md));
/* Copy the security xattr name into the request (have_secctx path) */
675 secctx_name = req_capsule_client_get(&req->rq_pill,
676 &RMF_FILE_SECCTX_NAME);
677 memcpy(secctx_name, op_data->op_file_secctx_name,
678 op_data->op_file_secctx_name_size);
680 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
683 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
684 op_data->op_file_secctx_name_size,
685 op_data->op_file_secctx_name);
687 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
/* Reserve encryption-context reply space only when negotiated */
691 if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
692 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
695 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
698 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM_INTENT_LAYOUT request: for a write layout intent on a
 * known FID, first cancel unused local LAYOUT locks, then pack the
 * layout_intent supplied by the caller via op_data->op_data and size the
 * LVB reply buffer for the returned layout.
 */
702 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
703 struct lookup_intent *it,
704 struct md_op_data *op_data)
706 struct obd_device *obd = class_exp2obd(exp);
707 struct ptlrpc_request *req;
708 struct ldlm_intent *lit;
709 struct layout_intent *layout;
714 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
715 &RQF_LDLM_INTENT_LAYOUT);
717 RETURN(ERR_PTR(-ENOMEM));
/* Writers invalidate cached layout locks before requesting a new one */
719 if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
720 (it->it_open_flags & FMODE_WRITE)) {
721 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
723 MDS_INODELOCK_LAYOUT);
726 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
727 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
729 ptlrpc_request_free(req);
733 /* pack the intent */
734 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
735 lit->opc = (__u64)it->it_op;
737 /* pack the layout intent request */
738 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
/* Caller must supply a fully-formed layout_intent in op_data */
739 LASSERT(op_data->op_data != NULL);
740 LASSERT(op_data->op_data_size == sizeof(*layout));
741 memcpy(layout, op_data->op_data, sizeof(*layout));
743 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
744 obd->u.cli.cl_default_mds_easize);
745 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request with an LVB reply
 * buffer of @lvb_len bytes.  Used for lock types that carry no intent
 * payload (e.g. readdir and flock enqueues in mdc_enqueue_base()).
 */
749 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
752 struct ptlrpc_request *req;
756 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
758 RETURN(ERR_PTR(-ENOMEM));
760 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
762 ptlrpc_request_free(req);
766 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
767 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up replay flags, copy the
 * intent disposition/status into @it, swab and validate the reply body,
 * save LOV/layout EAs for replay, and install layout / Data-on-MDT LVB
 * data into the granted lock.  @rc is the enqueue result; ELDLM_LOCK_ABORTED
 * means the intent executed without granting a lock.
 */
771 int mdc_finish_enqueue(struct obd_export *exp,
772 struct req_capsule *pill,
773 struct ldlm_enqueue_info *einfo,
774 struct lookup_intent *it,
775 struct lustre_handle *lockh, int rc)
777 struct ptlrpc_request *req = pill->rc_req;
778 struct ldlm_request *lockreq;
779 struct ldlm_reply *lockrep;
780 struct ldlm_lock *lock;
781 struct mdt_body *body = NULL;
782 void *lvb_data = NULL;
787 /* Similarly, if we're going to replay this request, we don't want to
788 * actually get a lock, just perform the intent.
790 if (req->rq_transno || req->rq_replay) {
791 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
792 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
795 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock was granted: clear the handle */
797 memset(lockh, 0, sizeof(*lockh));
799 } else { /* rc = 0 */
800 lock = ldlm_handle2lock(lockh);
801 LASSERT(lock != NULL);
803 /* If server returned a different lock mode, fix up variables */
804 if (lock->l_req_mode != einfo->ei_mode) {
805 ldlm_lock_addref(lockh, lock->l_req_mode);
806 ldlm_lock_decref(lockh, einfo->ei_mode);
807 einfo->ei_mode = lock->l_req_mode;
/* Copy the server's intent verdict into the lookup_intent */
812 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
813 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
815 it->it_disposition = (int)lockrep->lock_policy_res1;
816 it->it_status = (int)lockrep->lock_policy_res2;
817 it->it_lock_mode = einfo->ei_mode;
818 it->it_lock_handle = lockh->cookie;
819 it->it_request = req;
821 /* Technically speaking rq_transno must already be zero if
822 * it_status is in error, so the check is a bit redundant.
824 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
825 mdc_clear_replay_flag(req, it->it_status);
827 /* If we're doing an IT_OPEN which did not result in an actual
828 * successful open, then we need to remove the bit which saves
829 * this request for unconditional replay.
831 * It's important that we do this first! Otherwise we might exit the
832 * function without doing so, and try to replay a failed create.
835 if (it->it_op & IT_OPEN && req->rq_replay &&
836 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
837 mdc_clear_replay_flag(req, it->it_status);
839 DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
840 it->it_op, it->it_disposition, it->it_status);
842 /* We know what to expect, so we do any byte flipping required here */
843 if (it_has_reply_body(it)) {
844 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
847 CERROR("%s: cannot swab mdt_body: rc = %d\n",
848 exp->exp_obd->obd_name, rc);
/* On a successful open, hook up replay data before any further
 * swabbing so a replay racing with us sees consistent state */
852 if (it_disposition(it, DISP_OPEN_OPEN) &&
853 !it_open_error(DISP_OPEN_OPEN, it)) {
855 * If this is a successful OPEN request, we need to set
856 * replay handler and data early, so that if replay
857 * happens immediately after swabbing below, new reply
858 * is swabbed by that handler correctly.
860 mdc_set_open_replay_data(NULL, NULL, it);
863 if (it_disposition(it, DISP_OPEN_CREATE) &&
864 !it_open_error(DISP_OPEN_CREATE, it)) {
865 lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
869 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
872 mdc_update_max_ea_from_body(exp, body);
875 * The eadata is opaque; just check that it is there.
876 * Eventually, obd_unpackmd() will check the contents.
878 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
879 body->mbo_eadatasize);
883 /* save LVB data and length if for layout lock */
885 lvb_len = body->mbo_eadatasize;
888 * We save the reply LOV EA in case we have to replay a
889 * create for recovery. If we didn't allocate a large
890 * enough request buffer above we need to reallocate it
891 * here to hold the actual LOV EA.
893 * To not save LOV EA if request is not going to replay
894 * (for example error one).
896 if ((it->it_op & IT_OPEN) && req->rq_replay) {
897 rc = mdc_save_lmm(req, eadata,
898 body->mbo_eadatasize);
/* Saving failed: drop the EA from the body so replay
 * does not reference data we could not preserve */
900 body->mbo_valid &= ~OBD_MD_FLEASIZE;
901 body->mbo_eadatasize = 0;
906 } else if (it->it_op & IT_LAYOUT) {
907 /* maybe the lock was granted right away and layout
908 * is packed into RMF_DLM_LVB of req
910 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
911 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
912 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
914 lvb_data = req_capsule_server_sized_get(pill,
915 &RMF_DLM_LVB, lvb_len);
916 if (lvb_data == NULL)
920 * save replied layout data to the request buffer for
921 * recovery consideration (lest MDS reinitialize
922 * another set of OST objects).
925 mdc_save_lmm(req, lvb_data, lvb_len);
929 /* fill in stripe data for layout lock.
930 * LU-6581: trust layout data only if layout lock is granted. The MDT
931 * has stopped sending layout unless the layout lock is granted. The
932 * client still does this checking in case it's talking with an old
935 lock = ldlm_handle2lock(lockh);
939 if (ldlm_has_layout(lock) && lvb_data != NULL &&
940 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
943 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
944 ldlm_it2str(it->it_op), lvb_len);
946 OBD_ALLOC_LARGE(lmm, lvb_len);
948 GOTO(out_lock, rc = -ENOMEM);
950 memcpy(lmm, lvb_data, lvb_len);
952 /* install lvb_data */
953 lock_res_and_lock(lock);
/* Only install if no LVB is attached yet; otherwise free our copy */
954 if (lock->l_lvb_data == NULL) {
955 lock->l_lvb_type = LVB_T_LAYOUT;
956 lock->l_lvb_data = lmm;
957 lock->l_lvb_len = lvb_len;
960 unlock_res_and_lock(lock);
962 OBD_FREE_LARGE(lmm, lvb_len);
965 if (ldlm_has_dom(lock)) {
966 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
968 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* A DoM lock without a size attribute is a protocol violation */
969 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
970 LDLM_ERROR(lock, "%s: DoM lock without size.",
971 exp->exp_obd->obd_name);
972 GOTO(out_lock, rc = -EPROTO);
975 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
976 ldlm_it2str(it->it_op), body->mbo_dom_size);
/* Install the server-provided size/attrs and open the lock for match */
978 lock_res_and_lock(lock);
979 mdc_body2lvb(body, &lock->l_ost_lvb);
980 ldlm_lock_allow_match_locked(lock);
981 unlock_res_and_lock(lock);
/* True when the intent is a read-only operation (getattr/lookup/readdir/
 * getxattr, or a layout intent without write open flags) that does not
 * need to consume a "modify RPC" slot. */
989 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
992 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
993 it->it_op == IT_READDIR || it->it_op == IT_GETXATTR ||
994 (it->it_op == IT_LAYOUT && !(it->it_open_flags &
1000 /* We always reserve enough space in the reply packet for a stripe MD, because
1001 * we don't know in advance the file type.
/*
 * Core intent enqueue path: choose the inodebits policy from the intent
 * opcode, build the matching intent request, send it through
 * ldlm_cli_enqueue(), and retry on -EINPROGRESS (indefinitely, within
 * one import generation) or on -ERANGE with a larger ACL buffer.
 * On success the lock state is recorded in @it/@lockh by
 * mdc_finish_enqueue(); on failure the lock reference is dropped and
 * the intent's lock fields are cleared.
 */
1003 static int mdc_enqueue_base(struct obd_export *exp,
1004 struct ldlm_enqueue_info *einfo,
1005 const union ldlm_policy_data *policy,
1006 struct lookup_intent *it,
1007 struct md_op_data *op_data,
1008 struct lustre_handle *lockh,
1009 __u64 extra_lock_flags)
1011 struct obd_device *obd = class_exp2obd(exp);
1012 struct ptlrpc_request *req;
1013 __u64 flags, saved_flags = extra_lock_flags;
1014 struct ldlm_res_id res_id;
/* One static policy per inodebit class; selected from it->it_op below */
1015 static const union ldlm_policy_data lookup_policy = {
1016 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
1017 static const union ldlm_policy_data update_policy = {
1018 .l_inodebits = { MDS_INODELOCK_UPDATE } };
1019 static const union ldlm_policy_data layout_policy = {
1020 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
1021 static const union ldlm_policy_data getxattr_policy = {
1022 .l_inodebits = { MDS_INODELOCK_XATTR } };
1023 int generation, resends = 0;
1024 struct ldlm_reply *lockrep;
1025 struct obd_import *imp = class_exp2cliimp(exp);
1027 enum lvb_type lvb_type = 0;
1031 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
1033 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived here, never passed in */
1036 LASSERT(policy == NULL);
1038 saved_flags |= LDLM_FL_HAS_INTENT;
1039 if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
1040 policy = &update_policy;
1041 else if (it->it_op & IT_LAYOUT)
1042 policy = &layout_policy;
1043 else if (it->it_op & IT_GETXATTR)
1044 policy = &getxattr_policy;
1046 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction */
1049 generation = obd->u.cli.cl_import->imp_generation;
1050 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
1051 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1054 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
/* resend entry point: rebuild the request with current sizes */
1057 flags = saved_flags;
1059 /* The only way right now is FLOCK. */
1060 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
1062 res_id.name[3] = LDLM_FLOCK;
1063 req = ldlm_enqueue_pack(exp, 0);
1064 } else if (it->it_op & IT_OPEN) {
1065 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
1066 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
1067 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
1068 } else if (it->it_op & IT_READDIR) {
1069 req = mdc_enqueue_pack(exp, 0);
1070 } else if (it->it_op & IT_LAYOUT) {
1071 if (!imp_connect_lvb_type(imp))
1072 RETURN(-EOPNOTSUPP);
1073 req = mdc_intent_layout_pack(exp, it, op_data);
1074 lvb_type = LVB_T_LAYOUT;
1075 } else if (it->it_op & IT_GETXATTR) {
1076 req = mdc_intent_getxattr_pack(exp, it, op_data);
1077 } else if (it->it_op == IT_CREAT) {
1078 req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
1086 RETURN(PTR_ERR(req));
/* Pin the import generation and stagger resends in time */
1089 req->rq_generation_set = 1;
1090 req->rq_import_generation = generation;
1091 req->rq_sent = ktime_get_real_seconds() + resends;
1094 einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
1095 einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);
1097 /* With Data-on-MDT the glimpse callback is needed too.
1098 * It is set here in advance but not in mdc_finish_enqueue()
1099 * to avoid possible races. It is safe to have glimpse handler
1100 * for non-DOM locks and costs nothing.
1102 if (einfo->ei_cb_gl == NULL)
1103 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1105 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1106 0, lvb_type, lockh, 0);
1109 /* For flock requests we immediatelly return without further
1110 * delay and let caller deal with the rest, since rest of
1111 * this function metadata processing makes no sense for flock
1112 * requests anyway. But in case of problem during comms with
1113 * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1114 * we cannot rely on caller and this mainly for F_UNLCKs
1115 * (explicits or automatically generated by kernel to clean
1116 * current flocks upon exit) that can't be trashed.
1118 ptlrpc_req_put(req);
1119 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1120 (einfo->ei_type == LDLM_FLOCK) &&
1121 (einfo->ei_mode == LCK_NL))
1128 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1129 obd->obd_name, PFID(&op_data->op_fid1),
1130 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
/* Enqueue failed: make sure the request is not kept for replay */
1132 mdc_clear_replay_flag(req, rc);
1133 ptlrpc_req_put(req);
1137 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1138 LASSERT(lockrep != NULL);
/* Convert the wire status to host errno before inspecting it */
1140 lockrep->lock_policy_res2 =
1141 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1143 /* Retry infinitely when the server returns -EINPROGRESS for the
1144 * intent operation, when server returns -EINPROGRESS for acquiring
1145 * intent lock, we'll retry in after_reply().
1147 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1148 mdc_clear_replay_flag(req, rc);
1149 ptlrpc_req_put(req);
1150 if (generation == obd->u.cli.cl_import->imp_generation) {
1151 if (signal_pending(current))
1155 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1156 obd->obd_name, resends, it->it_op,
1157 PFID(&op_data->op_fid1),
1158 PFID(&op_data->op_fid2));
/* Import generation changed: we were evicted mid-resend */
1161 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL buffer was too small: retry once with the larger size */
1166 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1167 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1168 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1169 mdc_clear_replay_flag(req, -ERANGE);
1170 ptlrpc_req_put(req);
1171 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1176 rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
/* Failure cleanup: drop the lock ref and reset the intent's lock info */
1178 if (lustre_handle_is_used(lockh)) {
1179 ldlm_lock_decref(lockh, einfo->ei_mode);
1180 memset(lockh, 0, sizeof(*lockh));
1182 ptlrpc_req_put(req);
1184 it->it_lock_handle = 0;
1185 it->it_lock_mode = 0;
1186 it->it_request = NULL;
/*
 * mdc_enqueue() - plain (intent-less) DLM lock enqueue on the MDC.
 *
 * Thin wrapper around mdc_enqueue_base(): forwards every argument
 * unchanged and passes a NULL lookup intent, for callers that need a
 * metadata lock without any associated intent operation.
 */
1192 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1193 const union ldlm_policy_data *policy,
1194 struct md_op_data *op_data,
1195 struct lustre_handle *lockh, __u64 extra_lock_flags)
1197 return mdc_enqueue_base(exp, einfo, policy, NULL,
1198 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - interpret the server's intent-lock reply.
 *
 * Translates the per-disposition status bits carried in @it into a
 * return code for the caller, takes extra request references for the
 * open/create phases that complete later in llite, and - if the client
 * already holds an equivalent lock - cancels the newly granted one and
 * reuses the old handle.
 *
 * @exp:     export the request was sent on
 * @request: intent RPC reply being interpreted (must be valid, see
 *           LASSERTs below)
 * @op_data: operation data (name/FIDs) used for debug logging
 * @it:      lookup intent whose it_status/it_disposition are examined
 * @lockh:   handle of the lock granted by this enqueue
 */
1201 static int mdc_finish_intent_lock(struct obd_export *exp,
1202 struct ptlrpc_request *request,
1203 struct md_op_data *op_data,
1204 struct lookup_intent *it,
1205 struct lustre_handle *lockh)
1207 struct lustre_handle old_lock;
1208 struct ldlm_lock *lock;
1212 LASSERT(request != NULL);
1213 LASSERT(request != LP_POISON);
1214 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir intents need none of the disposition processing below. */
1216 if (it->it_op & IT_READDIR)
/* For getxattr/layout intents only the raw status matters. */
1219 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1220 if (it->it_status != 0)
1221 GOTO(out, rc = it->it_status);
1223 if (!it_disposition(it, DISP_IT_EXECD)) {
1224 /* The server failed before it even started executing
1225 * the intent, i.e. because it couldn't unpack the
1228 LASSERT(it->it_status != 0);
1229 GOTO(out, rc = it->it_status);
/* Walk the phases in execution order; it_open_error() returns the
 * first phase's error at or after the given disposition.
 * NOTE(review): the rc checks between these calls are not visible in
 * this listing - confirm against the full source. */
1231 rc = it_open_error(DISP_IT_EXECD, it);
1235 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1239 /* keep requests around for the multiple phases of the call
1240 * this shows the DISP_XX must guarantee we make it into the
1243 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1244 it_disposition(it, DISP_OPEN_CREATE) &&
1245 !it_open_error(DISP_OPEN_CREATE, it)) {
1246 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1247 /* balanced in ll_create_node */
1248 ptlrpc_request_addref(request);
1250 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1251 it_disposition(it, DISP_OPEN_OPEN) &&
1252 !it_open_error(DISP_OPEN_OPEN, it)) {
1253 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1254 /* balanced in ll_file_open */
1255 ptlrpc_request_addref(request);
1256 /* eviction in middle of open RPC processing b=11546 */
1257 CFS_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1261 if (it->it_op & IT_CREAT) {
1262 /* XXX this belongs in ll_create_it */
1263 } else if (it->it_op == IT_OPEN) {
1264 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1266 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1270 /* If we already have a matching lock, then cancel the new
1271 * one. We have to set the data here instead of in
1272 * mdc_enqueue, because we need to use the child's inode as
1273 * the l_ast_data to match, and that's not available until
1274 * intent_finish has performed the iget().
1276 lock = ldlm_handle2lock(lockh);
1278 union ldlm_policy_data policy = lock->l_policy_data;
1280 LDLM_DEBUG(lock, "matching against this");
1282 if (it_has_reply_body(it)) {
1283 struct mdt_body *body;
1285 body = req_capsule_server_get(&request->rq_pill,
1287 /* mdc_enqueue checked */
1288 LASSERT(body != NULL);
/* Sanity: the granted lock's resource must match the FID the
 * server replied with. */
1289 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1290 &lock->l_resource->lr_name),
1291 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1292 PLDLMRES(lock->l_resource),
1293 PFID(&body->mbo_fid1));
1295 LDLM_LOCK_PUT(lock);
1297 memcpy(&old_lock, lockh, sizeof(*lockh));
/* LCK_NL match: find any pre-existing lock on the same resource
 * bits; on success drop+cancel the new lock and keep the old one. */
1298 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1299 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
1300 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1301 memcpy(lockh, &old_lock, sizeof(old_lock));
1302 it->it_lock_handle = lockh->cookie;
1309 "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1310 (int)op_data->op_namelen, op_data->op_name,
1311 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether the client already holds a DLM
 * lock that covers the intent @it on @fid.
 *
 * If the intent carries a lock handle, that handle is revalidated
 * directly; otherwise an ibits lock matching the intent's required
 * inodebits is searched for.  On a hit, it_lock_handle/it_lock_mode
 * are filled in; on a miss both are cleared.
 *
 * @exp:  client export to search locks on
 * @fid:  FID whose resource the lock must cover
 * @bits: optional in/out inodebits (passed to the revalidate helper)
 */
1316 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1317 struct lu_fid *fid, __u64 *bits)
1319 /* We could just return 1 immediately, but as we should only be called
1320 * in revalidate_it if we already have a lock, let's verify that.
1322 struct ldlm_res_id res_id;
1323 struct lustre_handle lockh;
1324 union ldlm_policy_data policy;
1325 enum ldlm_mode mode;
/* Fast path: the intent already references a lock - revalidate it. */
1328 if (it->it_lock_handle) {
1329 lockh.cookie = it->it_lock_handle;
1330 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1332 fid_build_reg_res_name(fid, &res_id);
/* Pick the inodebits the intent needs.  NOTE(review): the case
 * labels between the bit assignments are elided in this listing -
 * verify the it_op -> bits mapping against the full source. */
1333 switch (it->it_op) {
1335 /* File attributes are held under multiple bits:
1336 * nlink is under lookup lock, size and times are
1337 * under UPDATE lock and recently we've also got
1338 * a separate permissions lock for owner/group/acl that
1339 * were protected by lookup lock before.
1340 * Getattr must provide all of that information,
1341 * so we need to ensure we have all of those locks.
1342 * Unfortunately, if the bits are split across multiple
1343 * locks, there's no easy way to match all of them here,
1344 * so an extra RPC would be performed to fetch all
1345 * of those bits at once for now.
1347 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1348 * but for old MDTs (< 2.4), permission is covered
1349 * by LOOKUP lock, so it needs to match all bits here.
1351 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1355 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1358 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1361 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any already-granted mode is acceptable for revalidation. */
1365 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1366 LDLM_IBITS, &policy,
1367 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Publish the match (or clear the intent's lock state on a miss). */
1372 it->it_lock_handle = lockh.cookie;
1373 it->it_lock_mode = mode;
1375 it->it_lock_handle = 0;
1376 it->it_lock_mode = 0;
1383 * This long block is all about fixing up the lock and request state
1384 * so that it is correct as of the moment _before_ the operation was
1385 * applied; that way, the VFS will think that everything is normal and
1386 * call Lustre's regular VFS methods.
1388 * If we're performing a creation, that means that unless the creation
1389 * failed with EEXIST, we should fake up a negative dentry.
1391 * For everything else, we want the lookup to succeed.
1393 * One additional note: if CREATE or OPEN succeeded, we add an extra
1394 * reference to the request because we need to keep it around until
1395 * ll_create/ll_open gets called.
1397 * The server will return to us, in it_disposition, an indication of
1398 * exactly what it_status refers to.
1400 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1401 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1402 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1403 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1406 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1409 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1410 struct lookup_intent *it, struct ptlrpc_request **reqp,
1411 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
/* Inodebits enqueue with the caller's blocking callback; completion
 * and glimpse handlers are the standard mdc/ldlm ones. */
1413 struct ldlm_enqueue_info einfo = {
1414 .ei_type = LDLM_IBITS,
1415 .ei_mode = it_to_lock_mode(it),
1416 .ei_cb_bl = cb_blocking,
1417 .ei_cb_cp = ldlm_completion_ast,
1418 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1420 struct lustre_handle lockh;
1425 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1426 ", intent: %s flags %#lo\n", (int)op_data->op_namelen,
1427 op_data->op_name, PFID(&op_data->op_fid2),
1428 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: a sane child FID plus a lookup/getattr/readdir
 * intent means we may already hold a covering lock. */
1432 /* MDS_FID_OP is not a revalidate case */
1433 if (fid_is_sane(&op_data->op_fid2) &&
1434 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR)) &&
1435 !(op_data->op_bias & MDS_FID_OP)) {
1436 /* We could just return 1 immediately, but since we should only
1437 * be called in revalidate_it if we already have a lock, let's
1440 it->it_lock_handle = 0;
1441 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1442 /* Only return failure if it was not GETATTR by cfid
1443 * (from inode_revalidate()).
1445 if (rc || op_data->op_namelen != 0)
1449 /* For case if upper layer did not alloc fid, do it now. */
1450 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1451 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1453 CERROR("%s: cannot allocate new FID: rc=%d\n",
1454 exp->exp_obd->obd_name, rc);
/* Slow path: send the intent enqueue, then interpret the reply and
 * hand the (referenced) request back to the caller via *reqp. */
1459 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1464 *reqp = it->it_request;
1465 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * mdc_intent_getattr_async_interpret() - ptlrpcd reply handler for the
 * async getattr intent enqueue started in mdc_intent_getattr_async().
 *
 * Finishes the ldlm enqueue, byte-swaps the intent status from the
 * DLM reply, runs the normal mdc_finish_enqueue()/
 * mdc_finish_intent_lock() post-processing, and finally invokes the
 * item's completion callback with the resulting rc.
 *
 * @env:  unused lu_env from the ptlrpc interpret machinery
 * @req:  the completed getattr intent request
 * (the third parameter - the async args - is elided in this listing)
 */
1469 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1470 struct ptlrpc_request *req,
1473 struct mdc_getattr_args *ga = args;
1474 struct obd_export *exp = ga->ga_exp;
1475 struct md_op_item *item = ga->ga_item;
1476 struct ldlm_enqueue_info *einfo = &item->mop_einfo;
1477 struct lookup_intent *it = &item->mop_it;
1478 struct lustre_handle *lockh = &item->mop_lockh;
1479 struct req_capsule *pill = &req->rq_pill;
1480 struct ldlm_reply *lockrep;
1481 __u64 flags = LDLM_FL_HAS_INTENT;
/* Fault-injection hook for testing the async getattr error path. */
1484 if (CFS_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1487 rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
1490 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1491 exp->exp_obd->obd_name, rc);
1492 mdc_clear_replay_flag(req, rc);
1496 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
1497 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2; convert from wire
 * (network) byte order / status encoding to host. */
1499 lockrep->lock_policy_res2 =
1500 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1502 rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
1506 rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
/* Hand the reply capsule and final rc to the item's owner. */
1510 item->mop_pill = pill;
1511 item->mop_cb(item, rc);
1515 int mdc_intent_getattr_async(struct obd_export *exp,
1516 struct md_op_item *item)
1518 struct md_op_data *op_data = &item->mop_data;
1519 struct lookup_intent *it = &item->mop_it;
1520 struct ptlrpc_request *req;
1521 struct mdc_getattr_args *ga;
1522 struct ldlm_res_id res_id;
1523 union ldlm_policy_data policy = {
1524 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1526 __u64 flags = LDLM_FL_HAS_INTENT;
1531 "name: %.*s in inode "DFID", intent: %s flags %#lo\n",
1532 (int)op_data->op_namelen, op_data->op_name,
1533 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1536 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1537 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1538 * of the async getattr RPC will handle that by itself.
1540 req = mdc_intent_getattr_pack(exp, it, op_data,
1541 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1543 RETURN(PTR_ERR(req));
1545 /* With Data-on-MDT the glimpse callback is needed too.
1546 * It is set here in advance but not in mdc_finish_enqueue()
1547 * to avoid possible races. It is safe to have glimpse handler
1548 * for non-DOM locks and costs nothing.
1550 if (item->mop_einfo.ei_cb_gl == NULL)
1551 item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1553 rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
1554 &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
1556 ptlrpc_req_put(req);
1560 ga = ptlrpc_req_async_args(ga, req);
1564 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1565 ptlrpcd_add_req(req);