1 // SPDX-License-Identifier: GPL-2.0
4 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
5 * Use is subject to license terms.
7 * Copyright (c) 2011, 2017, Intel Corporation.
11 * This file is part of Lustre, http://www.lustre.org/
14 #define DEBUG_SUBSYSTEM S_MDC
16 #include <linux/module.h>
19 #include <obd_class.h>
20 #include <lustre_dlm.h>
21 #include <lustre_fid.h>
22 #include <lustre_intent.h>
23 #include <lustre_mdc.h>
24 #include <lustre_net.h>
25 #include <lustre_req_layout.h>
26 #include <lustre_swab.h>
27 #include <lustre_acl.h>
29 #include "mdc_internal.h"
/* Context carried through an asynchronous getattr intent request. */
31 struct mdc_getattr_args {
	/* export (client-server connection) the getattr was issued on */
32 struct obd_export *ga_exp;
	/* metadata operation item describing the target and completion hook */
33 struct md_op_item *ga_item;
/* Context for an asynchronous LDLM enqueue (used by mdc_enqueue_async). */
36 struct mdc_enqueue_args {
	/* lock being enqueued; a reference is held until the interpret runs */
37 struct ldlm_lock *mea_lock;
	/* export the enqueue was sent on */
38 struct obd_export *mea_exp;
	/* requested lock mode */
39 enum ldlm_mode mea_mode;
	/* caller's completion upcall, invoked from the interpret callback */
41 obd_enqueue_update_f mea_upcall;
/*
 * Return the intent error status for the first disposition phase at or
 * beyond @phase that the server actually executed.
 *
 * Dispositions are checked from latest (lease/open) back to earliest
 * (intent executed); for each set disposition at or past @phase the
 * function presumably returns it->it_status (return statements are not
 * visible in this view — TODO confirm).  If no disposition matches, the
 * CERROR below reports an unexpected disposition/status combination.
 */
44 int it_open_error(int phase, struct lookup_intent *it)
46 if (it_disposition(it, DISP_OPEN_LEASE)) {
47 if (phase >= DISP_OPEN_LEASE)
52 if (it_disposition(it, DISP_OPEN_OPEN)) {
53 if (phase >= DISP_OPEN_OPEN)
59 if (it_disposition(it, DISP_OPEN_CREATE)) {
60 if (phase >= DISP_OPEN_CREATE)
66 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
67 if (phase >= DISP_LOOKUP_EXECD)
73 if (it_disposition(it, DISP_IT_EXECD)) {
74 if (phase >= DISP_IT_EXECD)
80 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
85 EXPORT_SYMBOL(it_open_error);
87 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach inode @data to the lock's resource LVB slot and report the
 * inodebits the lock covers via @bits.
 *
 * The resource's lr_lvb_inode may only be replaced if the previous inode
 * is being freed (I_FREEING) — otherwise two live inodes would claim the
 * same lock, which the LASSERTF below treats as fatal.
 */
88 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
89 void *data, enum mds_ibits_locks *bits)
91 struct ldlm_lock *lock;
92 struct inode *new_inode = data;
	/* nothing to do for an unused handle (early return elided in this view) */
98 if (!lustre_handle_is_used(lockh))
101 lock = ldlm_handle2lock(lockh);
103 LASSERT(lock != NULL);
104 lock_res_and_lock(lock);
105 if (lock->l_resource->lr_lvb_inode &&
106 lock->l_resource->lr_lvb_inode != data) {
107 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
	/* only a dying inode may be displaced from the resource */
109 LASSERTF(old_inode->i_state & I_FREEING,
110 "Found existing inode %px/%lu/%u state %lu in lock: setting data to %px/%lu/%u\n",
111 old_inode, old_inode->i_ino, old_inode->i_generation,
112 (unsigned long)old_inode->i_state,
113 new_inode, new_inode->i_ino, new_inode->i_generation);
115 lock->l_resource->lr_lvb_inode = new_inode;
	/* report which inodebits this lock protects back to the caller */
117 *bits = lock->l_policy_data.l_inodebits.bits;
119 unlock_res_and_lock(lock);
/*
 * Search the client namespace for an existing lock on @fid matching
 * @type/@policy/@mode.  On success the matched handle is stored in @lockh
 * and the matched mode is returned (via ldlm_lock_match; the return
 * statement itself is elided in this view).
 */
125 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
126 const struct lu_fid *fid, enum ldlm_type type,
127 union ldlm_policy_data *policy,
129 enum ldlm_match_flags match_flags,
130 struct lustre_handle *lockh)
132 struct ldlm_res_id res_id;
136 fid_build_reg_res_name(fid, &res_id);
137 /* LU-4405: Clear bits not supported by server */
138 policy->l_inodebits.bits &= exp_connect_ibits(exp);
139 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
140 &res_id, type, policy, mode, match_flags, lockh);
/*
 * Cancel all unused client-side locks on the resource named by @fid that
 * match @policy/@mode, honouring @flags (e.g. LCF_ASYNC) and passing
 * @opaque through to the cancel machinery for caller-side filtering.
 */
144 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
145 union ldlm_policy_data *policy, enum ldlm_mode mode,
146 enum ldlm_cancel_flags flags, void *opaque)
148 struct obd_device *obd = class_exp2obd(exp);
149 struct ldlm_res_id res_id;
153 fid_build_reg_res_name(fid, &res_id);
154 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
155 policy, mode, flags, opaque);
/*
 * Detach the cached inode pointer from @fid's LDLM resource, typically
 * before the inode is destroyed, so stale lr_lvb_inode references cannot
 * be dereferenced later.
 */
159 int mdc_null_inode(struct obd_export *exp,
160 const struct lu_fid *fid)
162 struct ldlm_res_id res_id;
163 struct ldlm_resource *res;
164 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
167 LASSERTF(ns != NULL, "no namespace passed\n");
169 fid_build_reg_res_name(fid, &res_id);
	/* look up the resource without creating it (create flag is 0) */
171 res = ldlm_resource_get(ns, &res_id, 0, 0);
176 res->lr_lvb_inode = NULL;
179 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a failed request so it is not replayed during
 * recovery; warn if the server assigned a transno despite the error.
 */
183 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
185 /* Don't hold error requests for replay. */
186 if (req->rq_replay) {
187 spin_lock(&req->rq_lock);
	/* rq_replay is cleared under rq_lock (clearing line elided here) */
189 spin_unlock(&req->rq_lock);
	/* a transno on an error reply is unexpected — log it */
191 if (rc && req->rq_transno != 0) {
192 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
199 * Save a large LOV/LMV EA into the request buffer so that it is available
200 * for replay. We don't do this in the initial request because the
201 * original request doesn't need this buffer (at most it sends just the
202 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
203 * buffer and may also be difficult to allocate and save a very large
204 * request buffer for each open. (b=5707)
206 * OOM here may cause recovery failure if lmm is needed (only for the
207 * original open if the MDS crashed just when this client also OOM'd)
208 * but this is incredibly unlikely, and questionable whether the client
209 * could do MDS recovery under OOM anyways...
211 int mdc_save_lmm(struct ptlrpc_request *req, void *data, u32 size)
213 struct req_capsule *pill = &req->rq_pill;
	/* grow the EADATA field if the current client buffer is too small */
217 if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
218 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
220 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
221 req->rq_export->exp_obd->obd_name,
	/* buffer already large enough — shrink it to the exact EA size */
226 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
229 req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
230 lmm = req_capsule_client_get(pill, &RMF_EADATA);
	/* copy the EA in and normalize it for replay (e.g. strip volatile bits) */
232 memcpy(lmm, data, size);
233 lov_fix_ea_for_replay(lmm);
/*
 * Build and pack an LDLM intent-open request for @op_data.
 *
 * Cancels conflicting OPEN locks on the child (if its fid is known) and,
 * for IT_CREAT, the parent's UPDATE lock; sizes all request/reply capsule
 * fields (EA data, security context, encryption context, SELinux policy,
 * ACL, Data-on-MDT inline buffer); and pre-computes the reply buffer size
 * so the server can inline DoM file data (LU-11414 constraint on
 * lm_repsize).  Returns the prepared request or an ERR_PTR on failure.
 */
239 static struct ptlrpc_request *
240 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
241 struct md_op_data *op_data, __u32 acl_bufsize)
243 struct ptlrpc_request *req;
244 struct obd_device *obd = class_exp2obd(exp);
245 struct ldlm_intent *lit;
246 const void *lmm = op_data->op_data;
247 __u32 lmmsize = op_data->op_data_size;
248 __u32 mdt_md_capsule_size;
252 int repsize, repsize_estimate;
253 struct sptlrpc_sepol *sepol;
258 mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
	/* open intents always target a regular file */
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_open_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_open_flags & MDS_FMODE_WRITE)
271 if (it->it_open_flags & (MDS_FMODE_WRITE |
275 else if (it->it_open_flags & FMODE_EXEC)
281 count = mdc_resource_cancel_unused(exp, &op_data->op_fid2,
286 /* If CREATE, cancel parent's UPDATE lock. */
287 if (it->it_op & IT_CREAT)
291 count += mdc_resource_cancel_unused(exp, &op_data->op_fid1,
293 MDS_INODELOCK_UPDATE);
295 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296 &RQF_LDLM_INTENT_OPEN);
	/* allocation failed: release the gathered cancel list before bailing */
298 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299 RETURN(ERR_PTR(-ENOMEM));
302 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303 op_data->op_namelen + 1);
304 if (cl_is_lov_delay_create(it->it_open_flags)) {
305 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306 LASSERT(lmmsize == 0);
307 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310 max(lmmsize, obd->u.cli.cl_default_mds_easize));
313 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315 op_data->op_file_secctx_name_size : 0);
317 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318 op_data->op_file_secctx_size);
320 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
321 op_data->op_file_encctx_size);
323 /* get SELinux policy info if any */
324 sepol = sptlrpc_sepol_get(req);
326 GOTO(err_free_rq, rc = PTR_ERR(sepol));
328 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329 sptlrpc_sepol_size(sepol));
331 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
333 GOTO(err_put_sepol, rc);
	/* mark replayable under rq_lock so recovery can re-send this open */
335 spin_lock(&req->rq_lock);
336 req->rq_replay = req->rq_import->imp_replayable;
337 spin_unlock(&req->rq_lock);
339 /* pack the intent */
340 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
341 lit->opc = (__u64)it->it_op;
343 /* pack the intended request */
344 mdc_open_pack(&req->rq_pill, op_data, it->it_create_mode, 0,
345 it->it_open_flags, lmm, lmmsize, sepol);
347 sptlrpc_sepol_put(sepol);
349 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
350 mdt_md_capsule_size);
351 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
	/* on plain open (not create) ask the server for the security xattr */
353 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
354 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
356 op_data->op_file_secctx_name_size > 0 &&
357 op_data->op_file_secctx_name != NULL) {
360 secctx_name = req_capsule_client_get(&req->rq_pill,
361 &RMF_FILE_SECCTX_NAME);
362 memcpy(secctx_name, op_data->op_file_secctx_name,
363 op_data->op_file_secctx_name_size);
364 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
366 obd->u.cli.cl_max_mds_easize);
368 CDEBUG(D_SEC, "packed '"DNAME"' as security xattr name\n",
369 encode_fn_opdata(op_data));
372 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
	/* similarly reserve reply room for the encryption context on open */
376 if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
378 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
380 obd->u.cli.cl_max_mds_easize);
382 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
386 * Inline buffer for possible data from Data-on-MDT files.
388 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389 sizeof(struct niobuf_remote));
390 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
391 sizeof(struct lmv_user_md));
392 ptlrpc_request_set_replen(req);
394 /* Get real repbuf allocated size as rounded up power of 2 */
395 repsize = size_roundup_power2(req->rq_replen +
396 lustre_msg_early_size);
397 /* Estimate free space for DoM files in repbuf */
398 repsize_estimate = repsize - (req->rq_replen -
399 mdt_md_capsule_size +
400 sizeof(struct lov_comp_md_v1) +
401 sizeof(struct lov_comp_md_entry_v1) +
402 lov_mds_md_size(0, LOV_MAGIC_V3));
	/* grow the reply buffer until at least cl_dom_min_inline_repsize fits */
404 if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
405 repsize = obd->u.cli.cl_dom_min_inline_repsize -
406 repsize_estimate + sizeof(struct niobuf_remote);
407 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
409 sizeof(struct niobuf_remote) + repsize);
410 ptlrpc_request_set_replen(req);
411 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
412 repsize, req->rq_replen);
413 repsize = size_roundup_power2(req->rq_replen +
414 lustre_msg_early_size);
416 /* The only way to report real allocated repbuf size to the server
417 * is the lm_repsize but it must be set prior buffer allocation itself
418 * due to security reasons - it is part of buffer used in signature
419 * calculation (see LU-11414). Therefore the saved size is predicted
420 * value as rq_replen rounded to the next higher power of 2.
421 * Such estimation is safe. Though the final allocated buffer might
422 * be even larger, it is not possible to know that at this point.
424 if ((op_data->op_cli_flags & CLI_READ_ON_OPEN) != 0)
425 req->rq_reqmsg->lm_repsize = repsize;
427 req->rq_reqmsg->lm_repsize = 0;
	/* error paths: release sepol ref and/or free the request */
431 sptlrpc_sepol_put(sepol);
433 ptlrpc_request_free(req);
/*
 * Build and pack an LDLM intent-create request (IT_CREAT without open).
 *
 * Cancels the parent's UPDATE lock when the parent fid is known, sizes the
 * capsule fields for name, security/encryption contexts, EA data and the
 * SELinux policy, then packs the create body.  Returns the request or an
 * ERR_PTR on failure.
 */
437 static struct ptlrpc_request *
438 mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
439 struct md_op_data *op_data, __u32 acl_bufsize,
440 __u64 extra_lock_flags)
443 struct ptlrpc_request *req;
444 struct obd_device *obd = class_exp2obd(exp);
445 struct sptlrpc_sepol *sepol;
446 struct ldlm_intent *lit;
452 if (fid_is_sane(&op_data->op_fid1))
453 /* cancel parent's UPDATE lock. */
454 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
456 MDS_INODELOCK_UPDATE);
458 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
459 &RQF_LDLM_INTENT_CREATE);
	/* allocation failed: drop gathered cancels before returning */
461 ldlm_lock_list_put(&cancels, l_bl_ast, count);
462 RETURN(ERR_PTR(-ENOMEM));
465 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
466 op_data->op_namelen + 1);
467 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
468 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
469 strlen(op_data->op_file_secctx_name) + 1 : 0);
470 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
471 op_data->op_file_secctx_size);
472 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
473 op_data->op_data_size);
474 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
475 op_data->op_file_encctx_size);
477 /* get SELinux policy info if any */
478 sepol = sptlrpc_sepol_get(req);
480 ldlm_lock_list_put(&cancels, l_bl_ast, count);
481 GOTO(err_free_rq, rc = PTR_ERR(sepol));
483 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
484 sptlrpc_sepol_size(sepol));
486 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
488 GOTO(err_put_sepol, rc);
490 /* Pack the intent */
491 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
492 lit->opc = (__u64)it->it_op;
494 /* Pack the intent request. */
495 mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
496 op_data->op_data_size, it->it_create_mode,
497 op_data->op_fsuid, op_data->op_fsgid,
498 op_data->op_cap, 0, sepol);
500 sptlrpc_sepol_put(sepol);
	/* size the reply: MD EA, ACL, default LMV; no sec/enc ctx expected back */
502 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
503 obd->u.cli.cl_default_mds_easize);
504 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
505 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
506 sizeof(struct lmv_user_md));
507 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
509 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
511 ptlrpc_request_set_replen(req);
	/* error paths */
515 sptlrpc_sepol_put(sepol);
517 ptlrpc_request_free(req);
521 #define GA_DEFAULT_EA_NAME_LEN 20
522 #define GA_DEFAULT_EA_VAL_LEN 250
523 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM intent-getxattr request that fetches all xattrs for
 * op_fid1 in one RPC, reserving reply room for GA_DEFAULT_EA_NUM entries
 * (names, values and per-value lengths).  Returns the request or ERR_PTR.
 */
525 static struct ptlrpc_request *
526 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
527 struct md_op_data *op_data)
529 struct ptlrpc_request *req;
530 struct ldlm_intent *lit;
531 struct sptlrpc_sepol *sepol;
534 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
537 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
538 &RQF_LDLM_INTENT_GETXATTR);
540 RETURN(ERR_PTR(-ENOMEM));
542 /* get SELinux policy info if any */
543 sepol = sptlrpc_sepol_get(req);
545 GOTO(err_free_rq, rc = PTR_ERR(sepol));
547 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
548 sptlrpc_sepol_size(sepol));
550 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
552 GOTO(err_put_sepol, rc);
554 /* pack the intent */
555 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
556 lit->opc = IT_GETXATTR;
557 /* Message below is checked in sanity-selinux test_20d
558 * and sanity-sec test_49
560 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
561 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
563 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
564 /* If the supplied buffer is too small then the server will return
565 * -ERANGE and llite will fallback to using non cached xattr
566 * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
567 * for an orphan or dead file causes an oops. So let's try to avoid
568 * sending too small a buffer to too old a server. This is effectively
569 * undoing the memory conservation of LU-9417 when it would be *more*
570 * likely to crash the server. See LU-9856.
572 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
573 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
574 exp->exp_connect_data.ocd_max_easize);
577 /* pack the intended request */
578 mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
579 ea_vals_buf_size, -1, 0, op_data->op_projid);
581 /* get SELinux policy info if any */
582 mdc_file_sepol_pack(&req->rq_pill, sepol);
583 sptlrpc_sepol_put(sepol);
	/* reply room: xattr names, values and per-entry length array */
585 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
586 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
588 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
591 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
592 sizeof(u32) * GA_DEFAULT_EA_NUM);
594 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
596 ptlrpc_request_set_replen(req);
	/* error paths */
601 sptlrpc_sepol_put(sepol);
603 ptlrpc_request_free(req);
/*
 * Build an LDLM intent-getattr (also used for IT_LOOKUP) request,
 * optionally asking the server to return the file's security xattr and
 * encryption context in the same reply.  Returns the request or ERR_PTR.
 */
607 static struct ptlrpc_request *
608 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
609 struct md_op_data *op_data, __u32 acl_bufsize)
611 struct ptlrpc_request *req;
612 struct obd_device *obd = class_exp2obd(exp);
613 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
614 OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
616 struct ldlm_intent *lit;
618 bool have_secctx = false;
622 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
623 &RQF_LDLM_INTENT_GETATTR);
625 RETURN(ERR_PTR(-ENOMEM));
627 /* send name of security xattr to get upon intent */
628 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
629 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
631 op_data->op_file_secctx_name_size > 0 &&
632 op_data->op_file_secctx_name != NULL) {
634 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
636 op_data->op_file_secctx_name_size);
639 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
640 op_data->op_namelen + 1);
	/* no locks to cancel for getattr/lookup intents */
642 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
644 ptlrpc_request_free(req);
648 /* pack the intent */
649 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
650 lit->opc = (__u64)it->it_op;
	/* prefer default EA size; fall back to max when default is unset */
652 if (obd->u.cli.cl_default_mds_easize > 0)
653 easize = obd->u.cli.cl_default_mds_easize;
655 easize = obd->u.cli.cl_max_mds_easize;
657 /* pack the intended request */
658 mdc_getattr_pack(&req->rq_pill, valid, it->it_open_flags, op_data,
661 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
662 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
663 req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
664 sizeof(struct lmv_user_md));
	/* if a security xattr name was packed, reserve reply room for it */
669 secctx_name = req_capsule_client_get(&req->rq_pill,
670 &RMF_FILE_SECCTX_NAME);
671 memcpy(secctx_name, op_data->op_file_secctx_name,
672 op_data->op_file_secctx_name_size);
674 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
677 CDEBUG(D_SEC, "packed '"DNAME"' as security xattr name\n",
678 encode_fn_opdata(op_data));
680 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
684 if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
685 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
688 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
691 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-layout request.  For a write open, conflicting
 * LAYOUT locks on the target are cancelled first; the caller-provided
 * struct layout_intent (op_data->op_data) is copied into the request and
 * reply room is reserved for the layout LVB.
 */
695 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
696 struct lookup_intent *it,
697 struct md_op_data *op_data)
699 struct obd_device *obd = class_exp2obd(exp);
700 struct ptlrpc_request *req;
701 struct ldlm_intent *lit;
702 struct layout_intent *layout;
707 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
708 &RQF_LDLM_INTENT_LAYOUT);
710 RETURN(ERR_PTR(-ENOMEM));
	/* a write open may change the layout — cancel our cached LAYOUT locks */
712 if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
713 (it->it_open_flags & FMODE_WRITE)) {
714 count = mdc_resource_cancel_unused(exp, &op_data->op_fid2,
716 MDS_INODELOCK_LAYOUT);
719 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
720 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
722 ptlrpc_request_free(req);
726 /* pack the intent */
727 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
728 lit->opc = (__u64)it->it_op;
730 /* pack the layout intent request */
731 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
732 LASSERT(op_data->op_data != NULL);
733 LASSERT(op_data->op_data_size == sizeof(*layout));
734 memcpy(layout, op_data->op_data, sizeof(*layout));
736 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
737 obd->u.cli.cl_default_mds_easize);
738 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM enqueue request with @lvb_len bytes of
 * reply LVB space.  Used for readdir and flock enqueues.
 */
742 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
745 struct ptlrpc_request *req;
749 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
751 RETURN(ERR_PTR(-ENOMEM));
753 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
755 ptlrpc_request_free(req);
759 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
760 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up lock mode, copy the server's
 * disposition/status into @it, manage replay flags, swab and validate the
 * reply body, save LOV/layout EAs for replay, install layout LVB data on
 * the lock, and initialize Data-on-MDT state when the lock has a DOM bit.
 *
 * @rc is the enqueue result; ELDLM_LOCK_ABORTED means the intent was
 * executed but no lock was granted.
 */
764 int mdc_finish_enqueue(struct obd_export *exp,
765 struct req_capsule *pill,
766 struct ldlm_enqueue_info *einfo,
767 struct lookup_intent *it,
768 struct lustre_handle *lockh, int rc)
770 struct ptlrpc_request *req = pill->rc_req;
771 struct ldlm_request *lockreq;
772 struct ldlm_reply *lockrep;
773 struct ldlm_lock *lock;
774 struct mdt_body *body = NULL;
775 void *lvb_data = NULL;
780 /* Similarly, if we're going to replay this request, we don't want to
781 * actually get a lock, just perform the intent.
783 if (req->rq_transno || req->rq_replay) {
784 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
785 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
788 if (rc == ELDLM_LOCK_ABORTED) {
	/* no lock granted: clear the handle so callers see it as unused */
790 memset(lockh, 0, sizeof(*lockh));
792 } else { /* rc = 0 */
793 lock = ldlm_handle2lock(lockh);
794 LASSERT(lock != NULL);
796 /* If server returned a different lock mode, fix up variables */
797 if (lock->l_req_mode != einfo->ei_mode) {
798 ldlm_lock_addref(lockh, lock->l_req_mode);
799 ldlm_lock_decref(lockh, einfo->ei_mode);
800 einfo->ei_mode = lock->l_req_mode;
805 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
806 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
	/* propagate the server's intent results to the caller's intent */
808 it->it_disposition = (int)lockrep->lock_policy_res1;
809 it->it_status = (int)lockrep->lock_policy_res2;
810 it->it_lock_mode = einfo->ei_mode;
811 it->it_lock_handle = lockh->cookie;
812 it->it_request = req;
814 /* Technically speaking rq_transno must already be zero if
815 * it_status is in error, so the check is a bit redundant.
817 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
818 mdc_clear_replay_flag(req, it->it_status);
820 /* If we're doing an IT_OPEN which did not result in an actual
821 * successful open, then we need to remove the bit which saves
822 * this request for unconditional replay.
824 * It's important that we do this first! Otherwise we might exit the
825 * function without doing so, and try to replay a failed create.
828 if (it->it_op & IT_OPEN && req->rq_replay &&
829 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
830 mdc_clear_replay_flag(req, it->it_status);
832 DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
833 it->it_op, it->it_disposition, it->it_status);
835 /* We know what to expect, so we do any byte flipping required here */
836 if (it_has_reply_body(it)) {
837 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
840 CERROR("%s: cannot swab mdt_body: rc = %d\n",
841 exp->exp_obd->obd_name, rc);
845 if (it_disposition(it, DISP_OPEN_OPEN) &&
846 !it_open_error(DISP_OPEN_OPEN, it)) {
848 * If this is a successful OPEN request, we need to set
849 * replay handler and data early, so that if replay
850 * happens immediately after swabbing below, new reply
851 * is swabbed by that handler correctly.
853 mdc_set_open_replay_data(NULL, NULL, it);
856 if (it_disposition(it, DISP_OPEN_CREATE) &&
857 !it_open_error(DISP_OPEN_CREATE, it)) {
858 lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
862 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
865 mdc_update_max_ea_from_body(exp, body);
868 * The eadata is opaque; just check that it is there.
869 * Eventually, obd_unpackmd() will check the contents.
871 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
872 body->mbo_eadatasize);
876 /* save LVB data and length if for layout lock */
878 lvb_len = body->mbo_eadatasize;
881 * We save the reply LOV EA in case we have to replay a
882 * create for recovery. If we didn't allocate a large
883 * enough request buffer above we need to reallocate it
884 * here to hold the actual LOV EA.
886 * To not save LOV EA if request is not going to replay
887 * (for example error one).
889 if ((it->it_op & IT_OPEN) && req->rq_replay) {
890 rc = mdc_save_lmm(req, eadata,
891 body->mbo_eadatasize);
	/* on save failure, drop the EA from the body rather than fail replay */
893 body->mbo_valid &= ~OBD_MD_FLEASIZE;
894 body->mbo_eadatasize = 0;
899 } else if (it->it_op & IT_LAYOUT) {
900 /* maybe the lock was granted right away and layout
901 * is packed into RMF_DLM_LVB of req
903 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
904 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
905 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
907 lvb_data = req_capsule_server_sized_get(pill,
908 &RMF_DLM_LVB, lvb_len);
909 if (lvb_data == NULL)
913 * save replied layout data to the request buffer for
914 * recovery consideration (lest MDS reinitialize
915 * another set of OST objects).
918 mdc_save_lmm(req, lvb_data, lvb_len);
922 /* fill in stripe data for layout lock.
923 * LU-6581: trust layout data only if layout lock is granted. The MDT
924 * has stopped sending layout unless the layout lock is granted. The
925 * client still does this checking in case it's talking with an old
928 lock = ldlm_handle2lock(lockh);
932 if (ldlm_has_layout(lock) && lvb_data != NULL &&
933 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
936 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
937 ldlm_it2str(it->it_op), lvb_len);
939 OBD_ALLOC_LARGE(lmm, lvb_len);
941 GOTO(out_lock, rc = -ENOMEM);
943 memcpy(lmm, lvb_data, lvb_len);
945 /* install lvb_data */
946 lock_res_and_lock(lock);
947 if (lock->l_lvb_data == NULL) {
948 lock->l_lvb_type = LVB_T_LAYOUT;
949 lock->l_lvb_data = lmm;
950 lock->l_lvb_len = lvb_len;
953 unlock_res_and_lock(lock);
	/* someone else installed LVB data first — free our copy */
955 OBD_FREE_LARGE(lmm, lvb_len);
958 if (ldlm_has_dom(lock)) {
959 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
961 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
962 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
963 LDLM_ERROR(lock, "%s: DoM lock without size.",
964 exp->exp_obd->obd_name);
965 GOTO(out_lock, rc = -EPROTO);
968 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
969 ldlm_it2str(it->it_op), body->mbo_dom_size);
971 /* l_ost_lvb is only in the LDLM_IBITS union **/
972 LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
973 lock_res_and_lock(lock);
974 mdc_body2lvb(body, &lock->l_ost_lvb);
975 ldlm_lock_allow_match_locked(lock);
976 unlock_res_and_lock(lock);
/*
 * Decide whether this intent does NOT need a "modify RPC" slot: read-only
 * operations (getattr/lookup/readdir/getxattr, and layout intents without
 * the write-flag condition that follows) skip the slot accounting.
 */
984 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
987 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
988 it->it_op == IT_READDIR || it->it_op == IT_GETXATTR ||
989 (it->it_op == IT_LAYOUT && !(it->it_open_flags &
995 /* We always reserve enough space in the reply packet for a stripe MD, because
996 * we don't know in advance the file type.
998 static int mdc_enqueue_base(struct obd_export *exp,
999 struct ldlm_enqueue_info *einfo,
1000 const union ldlm_policy_data *policy,
1001 struct lookup_intent *it,
1002 struct md_op_data *op_data,
1003 struct lustre_handle *lockh,
1004 __u64 extra_lock_flags)
1006 struct obd_device *obd = class_exp2obd(exp);
1007 struct ptlrpc_request *req;
1008 __u64 flags, saved_flags = extra_lock_flags;
1009 struct ldlm_res_id res_id;
	/* canned inodebits policies, selected below from the intent opcode */
1010 static const union ldlm_policy_data lookup_policy = {
1011 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
1012 static const union ldlm_policy_data update_policy = {
1013 .l_inodebits = { MDS_INODELOCK_UPDATE } };
1014 static const union ldlm_policy_data layout_policy = {
1015 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
1016 static const union ldlm_policy_data getxattr_policy = {
1017 .l_inodebits = { MDS_INODELOCK_XATTR } };
1018 int generation, resends = 0;
1019 struct ldlm_reply *lockrep;
1020 struct obd_import *imp = class_exp2cliimp(exp);
1022 enum lvb_type lvb_type = 0;
1026 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
1028 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	/* with an intent, the policy is derived from it — caller passes NULL */
1031 LASSERT(policy == NULL);
1033 saved_flags |= LDLM_FL_HAS_INTENT;
1034 if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
1035 policy = &update_policy;
1036 else if (it->it_op & IT_LAYOUT)
1037 policy = &layout_policy;
1038 else if (it->it_op & IT_GETXATTR)
1039 policy = &getxattr_policy;
1041 policy = &lookup_policy;
	/* remember the import generation to detect eviction across resends */
1044 generation = obd->u.cli.cl_import->imp_generation;
1045 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
1046 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1049 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
	/* resend entry point: rebuild the request from scratch each attempt */
1052 flags = saved_flags;
1054 /* The only way right now is FLOCK. */
1055 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
1057 res_id.name[3] = LDLM_FLOCK;
1058 req = ldlm_enqueue_pack(exp, 0);
1059 } else if (it->it_op & IT_OPEN) {
1060 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
1061 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
1062 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
1063 } else if (it->it_op & IT_READDIR) {
1064 req = mdc_enqueue_pack(exp, 0);
1065 } else if (it->it_op & IT_LAYOUT) {
1066 if (!imp_connect_lvb_type(imp))
1067 RETURN(-EOPNOTSUPP);
1068 req = mdc_intent_layout_pack(exp, it, op_data);
1069 lvb_type = LVB_T_LAYOUT;
1070 } else if (it->it_op & IT_GETXATTR) {
1071 req = mdc_intent_getxattr_pack(exp, it, op_data);
1072 } else if (it->it_op == IT_CREAT) {
1073 req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
1081 RETURN(PTR_ERR(req));
1083 lustre_msg_set_projid(req->rq_reqmsg, op_data->op_projid);
	/* pin the import generation on resends so eviction is detectable */
1086 req->rq_generation_set = 1;
1087 req->rq_import_generation = generation;
1088 req->rq_sent = ktime_get_real_seconds() + resends;
1091 einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
1092 einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);
1094 /* With Data-on-MDT the glimpse callback is needed too.
1095 * It is set here in advance but not in mdc_finish_enqueue()
1096 * to avoid possible races. It is safe to have glimpse handler
1097 * for non-DOM locks and costs nothing.
1099 if (einfo->ei_cb_gl == NULL)
1100 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1102 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1103 0, lvb_type, lockh, 0);
1106 /* For flock requests we immediatelly return without further
1107 * delay and let caller deal with the rest, since rest of
1108 * this function metadata processing makes no sense for flock
1109 * requests anyway. But in case of problem during comms with
1110 * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1111 * we cannot rely on caller and this mainly for F_UNLCKs
1112 * (explicits or automatically generated by kernel to clean
1113 * current flocks upon exit) that can't be trashed.
1115 ptlrpc_req_put(req);
1116 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1117 (einfo->ei_type == LDLM_FLOCK) &&
1118 (einfo->ei_mode == LCK_NL))
1125 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1126 obd->obd_name, PFID(&op_data->op_fid1),
1127 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1129 mdc_clear_replay_flag(req, rc);
1130 ptlrpc_req_put(req);
1134 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1135 LASSERT(lockrep != NULL);
	/* convert the intent status from wire (network) to host error code */
1137 lockrep->lock_policy_res2 =
1138 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1140 /* Retry infinitely when the server returns -EINPROGRESS for the
1141 * intent operation, when server returns -EINPROGRESS for acquiring
1142 * intent lock, we'll retry in after_reply().
1144 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1145 mdc_clear_replay_flag(req, rc);
1146 ptlrpc_req_put(req);
1147 if (generation == obd->u.cli.cl_import->imp_generation) {
1148 if (signal_pending(current))
1152 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1153 obd->obd_name, resends, it->it_op,
1154 PFID(&op_data->op_fid1),
1155 PFID(&op_data->op_fid2));
1158 CDEBUG(D_HA, "resend cross eviction\n");
	/* server's ACL was bigger than our buffer — retry once with max size */
1163 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1164 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1165 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1166 mdc_clear_replay_flag(req, -ERANGE);
1167 ptlrpc_req_put(req);
1168 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1173 rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
	/* on failure release any lock reference and reset intent lock state */
1175 if (lustre_handle_is_used(lockh)) {
1176 ldlm_lock_decref(lockh, einfo->ei_mode);
1177 memset(lockh, 0, sizeof(*lockh));
1179 ptlrpc_req_put(req);
1181 it->it_lock_handle = 0;
1182 it->it_lock_mode = 0;
1183 it->it_request = NULL;
/*
 * mdc_enqueue() - synchronous metadata lock enqueue with no intent attached.
 *
 * Thin convenience wrapper: delegates to mdc_enqueue_base() with a NULL
 * lookup_intent, so all request packing, enqueue and reply handling happen
 * there.  On success the granted lock handle is returned in @lockh.
 */
1189 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1190 const union ldlm_policy_data *policy,
1191 struct md_op_data *op_data,
1192 struct lustre_handle *lockh, __u64 extra_lock_flags)
1194 return mdc_enqueue_base(exp, einfo, policy, NULL,
1195 op_data, lockh, extra_lock_flags);
/*
 * Interpret callback for an asynchronous flock enqueue started by
 * mdc_enqueue_async().  Runs in ptlrpcd context when the enqueue RPC
 * completes.
 *
 * Finishes the enqueue via ldlm_cli_enqueue_fini() (which takes over
 * reply processing and, on failure, lock cleanup), then notifies the
 * caller through the saved mea_upcall.  Two ldlm_lock_put() calls are
 * visible: one balances the reference taken by ldlm_handle2lock() in
 * mdc_enqueue_async(), the other the lock's base reference -- the exact
 * pairing depends on lines elided from this view, so confirm against the
 * full source before changing the refcounting.
 */
1198 static int mdc_enqueue_async_interpret(const struct lu_env *env,
1199 struct ptlrpc_request *req,
1202 struct mdc_enqueue_args *mea = args;
1203 struct obd_export *exp = mea->mea_exp;
1204 struct ldlm_lock *lock = mea->mea_lock;
1205 struct lustre_handle lockh;
/* Rebuild an enqueue_info matching the original request; only flock
 * locks are enqueued through this path (see mdc_enqueue_async()). */
1206 struct ldlm_enqueue_info einfo = {
1207 .ei_type = LDLM_FLOCK,
1208 .ei_mode = mea->mea_mode,
1212 CDEBUG(D_INFO, "req=%p rc=%d\n", req, rc);
1214 ldlm_lock2handle(lock, &lockh);
1215 rc = ldlm_cli_enqueue_fini(exp, &req->rq_pill, &einfo, 1,
1216 &mea->mea_flags, NULL, 0, &lockh, rc, true);
/* Drop the reference taken when the lock was stashed in mea_lock. */
1218 ldlm_lock_put(lock);
1220 /* we expect failed_lock_cleanup() to destroy lock */
1222 LASSERT(list_empty(&lock->l_res_link));
/* Report the final enqueue result to the caller-supplied upcall. */
1224 if (mea->mea_upcall != NULL)
1225 mea->mea_upcall(lock, rc);
1227 ldlm_lock_put(lock);
/*
 * mdc_enqueue_async() - start a non-blocking LDLM_FLOCK enqueue.
 *
 * Packs an enqueue request for the file named by op_data->op_fid1,
 * issues it through ldlm_cli_enqueue() in async mode, and queues the
 * request to ptlrpcd.  Completion is handled by
 * mdc_enqueue_async_interpret(), which invokes @upcall with the result.
 *
 * Only flock enqueues are supported here (enforced by the LASSERTF on
 * ei_type below).
 */
1232 int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1233 obd_enqueue_update_f upcall, struct md_op_data *op_data,
1234 const union ldlm_policy_data *policy, __u64 flags)
1236 struct mdc_enqueue_args *mea;
1237 struct ptlrpc_request *req;
1239 struct ldlm_res_id res_id;
1240 struct lustre_handle lockh;
1243 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1245 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
/* Tag the resource name so flock locks live in their own namespace
 * within the FID's resource. */
1247 res_id.name[3] = LDLM_FLOCK;
1249 req = ldlm_enqueue_pack(exp, 0);
1251 RETURN(PTR_ERR(req));
/* Account for this RPC in the request/modify slot limits. */
1253 einfo->ei_req_slot = 1;
1254 einfo->ei_mod_slot = 1;
1256 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1259 ptlrpc_req_put(req);
/* Stash completion state in the request's async-args area; the
 * interpret callback releases the lock reference taken here. */
1263 mea = ptlrpc_req_async_args(mea, req);
1265 mea->mea_lock = ldlm_handle2lock(&lockh);
1266 LASSERT(mea->mea_lock != NULL);
1268 mea->mea_mode = einfo->ei_mode;
1269 mea->mea_flags = flags;
1270 mea->mea_upcall = upcall;
1272 req->rq_interpret_reply = mdc_enqueue_async_interpret;
1273 ptlrpcd_add_req(req);
/*
 * mdc_finish_intent_lock() - post-process an intent enqueue reply.
 *
 * Fixes up intent/request state after the server has executed the
 * intent so the VFS can proceed as if the operation ran locally:
 *  - translates the server dispositions into an error code via
 *    it_open_error() for the EXECD phases;
 *  - for successful OPEN/CREATE intents, takes an extra reference on
 *    @request so it survives until ll_file_open()/ll_create_node()
 *    consume it (DISP_ENQ_*_REF guards make this idempotent);
 *  - if an equivalent lock is already cached locally, cancels the
 *    newly granted one and rewrites @lockh to the old lock.
 */
1278 static int mdc_finish_intent_lock(struct obd_export *exp,
1279 struct ptlrpc_request *request,
1280 struct md_op_data *op_data,
1281 struct lookup_intent *it,
1282 struct lustre_handle *lockh)
1284 struct lustre_handle old_lock;
1285 struct ldlm_lock *lock;
/* Catch use of a freed/poisoned request early. */
1289 LASSERT(request != NULL);
1290 LASSERT(request != LP_POISON);
1291 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR carries no intent reply body to process. */
1293 if (it->it_op & IT_READDIR)
1296 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1297 if (it->it_status != 0)
1298 GOTO(out, rc = it->it_status);
1300 if (!it_disposition(it, DISP_IT_EXECD)) {
1301 /* The server failed before it even started executing
1302 * the intent, i.e. because it couldn't unpack the
1305 LASSERT(it->it_status != 0);
1306 GOTO(out, rc = it->it_status);
1308 rc = it_open_error(DISP_IT_EXECD, it);
1312 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1316 /* keep requests around for the multiple phases of the call
1317 * this shows the DISP_XX must guarantee we make it into the
1320 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1321 it_disposition(it, DISP_OPEN_CREATE) &&
1322 !it_open_error(DISP_OPEN_CREATE, it)) {
1323 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1324 /* balanced in ll_create_node */
1325 ptlrpc_request_addref(request);
1327 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1328 it_disposition(it, DISP_OPEN_OPEN) &&
1329 !it_open_error(DISP_OPEN_OPEN, it)) {
1330 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1331 /* balanced in ll_file_open */
1332 ptlrpc_request_addref(request);
1333 /* eviction in middle of open RPC processing b=11546 */
1334 CFS_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1338 if (it->it_op & IT_CREAT) {
1339 /* XXX this belongs in ll_create_it */
1340 } else if (it->it_op == IT_OPEN) {
/* A plain IT_OPEN that got here must not also claim CREATE. */
1341 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1343 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1347 /* If we already have a matching lock, then cancel the new
1348 * one. We have to set the data here instead of in
1349 * mdc_enqueue, because we need to use the child's inode as
1350 * the l_ast_data to match, and that's not available until
1351 * intent_finish has performed the iget().
1353 lock = ldlm_handle2lock(lockh);
1355 union ldlm_policy_data policy = lock->l_policy_data;
1357 LDLM_DEBUG(lock, "matching against this");
1359 if (it_has_reply_body(it)) {
1360 struct mdt_body *body;
1362 body = req_capsule_server_get(&request->rq_pill,
1364 /* mdc_enqueue checked */
1365 LASSERT(body != NULL);
/* Sanity: the granted lock's resource must name the same FID
 * the server reported in the reply body. */
1366 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1367 &lock->l_resource->lr_name),
1368 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1369 PLDLMRES(lock->l_resource),
1370 PFID(&body->mbo_fid1));
1372 ldlm_lock_put(lock);
1374 memcpy(&old_lock, lockh, sizeof(*lockh));
/* Prefer an already-cached equivalent lock: drop/cancel the fresh
 * one and point the intent at the old handle instead. */
1375 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1376 LDLM_IBITS, &policy, LCK_NL, 0, &old_lock)) {
1377 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1378 memcpy(lockh, &old_lock, sizeof(old_lock));
1379 it->it_lock_handle = lockh->cookie;
1386 "D_IT dentry="DNAME" intent=%s status=%d disp=%x: rc = %d\n",
1387 encode_fn_opdata(op_data), ldlm_it2str(it->it_op),
1388 it->it_status, it->it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether a cached lock already covers @fid
 * with the inodebits the intent requires.
 *
 * If the intent already carries a lock handle, that specific lock is
 * revalidated via ldlm_revalidate_lock_handle().  Otherwise the local
 * namespace is searched by resource with a per-operation bit mask.
 * On a match, the handle and mode are recorded in the intent; on a
 * miss, both are cleared so the caller falls back to a full enqueue.
 */
1393 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1394 struct lu_fid *fid, enum mds_ibits_locks *bits)
1396 /* We could just return 1 immediately, but as we should only be called
1397 * in revalidate_it if we already have a lock, let's verify that.
1399 struct ldlm_res_id res_id;
1400 struct lustre_handle lockh;
1401 union ldlm_policy_data policy;
1402 enum ldlm_mode mode;
/* Fast path: the intent already names a lock -- just revalidate it. */
1405 if (it->it_lock_handle) {
1406 lockh.cookie = it->it_lock_handle;
1407 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1409 fid_build_reg_res_name(fid, &res_id);
/* Pick the inodebits that must be covered for this intent type. */
1410 switch (it->it_op) {
1412 /* File attributes are held under multiple bits:
1413 * nlink is under lookup lock, size and times are
1414 * under UPDATE lock and recently we've also got
1415 * a separate permissions lock for owner/group/acl that
1416 * were protected by lookup lock before.
1417 * Getattr must provide all of that information,
1418 * so we need to ensure we have all of those locks.
1419 * Unfortunately, if the bits are split across multiple
1420 * locks, there's no easy way to match all of them here,
1421 * so an extra RPC would be performed to fetch all
1422 * of those bits at once for now.
1424 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1425 * but for old MDTs (< 2.4), permission is covered
1426 * by LOOKUP lock, so it needs to match all bits here.
1428 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1432 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1435 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1438 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read/write mode already granted on the resource. */
1442 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1443 LDLM_IBITS, &policy,
1444 LCK_CR | LCK_CW | LCK_PR | LCK_PW, 0,
/* Record the match (or clear on miss) so callers can test
 * it_lock_mode to decide whether an enqueue is needed. */
1449 it->it_lock_handle = lockh.cookie;
1450 it->it_lock_mode = mode;
1452 it->it_lock_handle = 0;
1453 it->it_lock_mode = 0;
1460 * This long block is all about fixing up the lock and request state
1461 * so that it is correct as of the moment _before_ the operation was
1462 * applied; that way, the VFS will think that everything is normal and
1463 * call Lustre's regular VFS methods.
1465 * If we're performing a creation, that means that unless the creation
1466 * failed with EEXIST, we should fake up a negative dentry.
1468 * For everything else, we want the lookup to succeed.
1470 * One additional note: if CREATE or OPEN succeeded, we add an extra
1471 * reference to the request because we need to keep it around until
1472 * ll_create/ll_open gets called.
1474 * The server will return to us, in it_disposition, an indication of
1475 * exactly what it_status refers to.
1477 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1478 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1479 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1480 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1483 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * mdc_intent_lock() - acquire a metadata lock, executing the intent on
 * the server as a side effect.  See the long block comment above for
 * the disposition/status contract of the reply.
 *
 * Tries lock revalidation first for lookup/getattr/readdir on a sane
 * child FID, allocates a FID for creations if the caller did not, then
 * falls through to a full mdc_enqueue_base() and finishes via
 * mdc_finish_intent_lock().  The reply request is handed back to the
 * caller through *reqp.
 */
1486 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1487 struct lookup_intent *it, struct ptlrpc_request **reqp,
1488 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1490 struct ldlm_enqueue_info einfo = {
1491 .ei_type = LDLM_IBITS,
1492 .ei_mode = it_to_lock_mode(it),
1493 .ei_cb_bl = cb_blocking,
1494 .ei_cb_cp = ldlm_completion_ast,
1495 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1497 struct lustre_handle lockh;
1503 "(name: "DNAME","DFID") in obj "DFID", intent: %s flags %#lo\n",
1504 encode_fn_opdata(op_data), PFID(&op_data->op_fid2),
1505 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1509 /* MDS_FID_OP is not a revalidate case */
1510 if (fid_is_sane(&op_data->op_fid2) &&
1511 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR)) &&
1512 !(op_data->op_bias & MDS_FID_OP)) {
1513 /* We could just return 1 immediately, but since we should only
1514 * be called in revalidate_it if we already have a lock, let's
/* Force a fresh match: ignore any stale handle in the intent. */
1517 it->it_lock_handle = 0;
1518 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1519 /* Only return failure if it was not GETATTR by cfid
1520 * (from inode_revalidate()).
1522 if (rc || op_data->op_namelen != 0)
1526 /* For case if upper layer did not alloc fid, do it now. */
1527 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1528 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1530 CERROR("%s: cannot allocate new FID: rc=%d\n",
1531 exp->exp_obd->obd_name, rc);
/* No cached lock usable: do the real intent enqueue. */
1536 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request to the caller and finish intent fixups. */
1541 *reqp = it->it_request;
1542 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for the async statahead getattr enqueue started
 * by mdc_intent_getattr_async().  Runs in ptlrpc(d) context when the
 * RPC completes.
 *
 * Finishes the enqueue, byte-swaps the intent status out of the DLM
 * reply, runs the usual mdc_finish_enqueue()/mdc_finish_intent_lock()
 * pipeline, then reports the final result to the item's mop_cb.
 */
1546 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1547 struct ptlrpc_request *req,
1550 struct mdc_getattr_args *ga = args;
1551 struct obd_export *exp = ga->ga_exp;
1552 struct md_op_item *item = ga->ga_item;
1553 struct ldlm_enqueue_info *einfo = &item->mop_einfo;
1554 struct lookup_intent *it = &item->mop_it;
1555 struct lustre_handle *lockh = &item->mop_lockh;
1556 struct req_capsule *pill = &req->rq_pill;
1557 struct ldlm_reply *lockrep;
1558 __u64 flags = LDLM_FL_HAS_INTENT;
/* Fault-injection hook for testing the error path. */
1561 if (CFS_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1564 rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
1567 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1568 exp->exp_obd->obd_name, rc);
1569 mdc_clear_replay_flag(req, rc);
1573 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
1574 LASSERT(lockrep != NULL);
/* The intent status rides in lock_policy_res2 in network order. */
1576 lockrep->lock_policy_res2 =
1577 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1579 rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
1583 rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
/* Deliver the result to the statahead machinery via the item callback. */
1587 item->mop_pill = pill;
1588 item->mop_cb(item, rc);
1592 int mdc_intent_getattr_async(struct obd_export *exp,
1593 struct md_op_item *item)
1595 struct md_op_data *op_data = &item->mop_data;
1596 struct lookup_intent *it = &item->mop_it;
1597 struct ptlrpc_request *req;
1598 struct mdc_getattr_args *ga;
1599 struct ldlm_res_id res_id;
1600 union ldlm_policy_data policy = {
1601 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1603 __u64 flags = LDLM_FL_HAS_INTENT;
1608 "name: "DNAME" in inode "DFID", intent: %s flags %#lo\n",
1609 encode_fn_opdata(op_data), PFID(&op_data->op_fid1),
1610 ldlm_it2str(it->it_op), it->it_open_flags);
1612 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1613 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1614 * of the async getattr RPC will handle that by itself.
1616 req = mdc_intent_getattr_pack(exp, it, op_data,
1617 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1619 RETURN(PTR_ERR(req));
1621 /* With Data-on-MDT the glimpse callback is needed too.
1622 * It is set here in advance but not in mdc_finish_enqueue()
1623 * to avoid possible races. It is safe to have glimpse handler
1624 * for non-DOM locks and costs nothing.
1626 if (item->mop_einfo.ei_cb_gl == NULL)
1627 item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1629 rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
1630 &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
1632 ptlrpc_req_put(req);
1636 ga = ptlrpc_req_async_args(ga, req);
1640 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1641 ptlrpcd_add_req(req);