4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2010, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_handler.c
33 * Lustre Metadata Target (mdt) request handler
35 * Author: Peter Braam <braam@clusterfs.com>
36 * Author: Andreas Dilger <adilger@clusterfs.com>
37 * Author: Phil Schwan <phil@clusterfs.com>
38 * Author: Mike Shaver <shaver@clusterfs.com>
39 * Author: Nikita Danilov <nikita@clusterfs.com>
40 * Author: Huang Hua <huanghua@clusterfs.com>
41 * Author: Yury Umanets <umka@clusterfs.com>
44 #define DEBUG_SUBSYSTEM S_MDS
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
67 #include "mdt_internal.h"
/*
 * Module parameter: maximum number of modify RPCs in flight allowed per
 * client.  Initial value is 8; registered as a uint parameter with mode 0644.
 */
69 static unsigned int max_mod_rpcs_per_client = 8;
70 module_param(max_mod_rpcs_per_client, uint, 0644);
71 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
/* Table indexed by LDLM lock mode (LCK_*) giving the matching MDL_* mode. */
73 mdl_mode_t mdt_mdl_lock_modes[] = {
74 [LCK_MINMODE] = MDL_MINMODE,
81 [LCK_GROUP] = MDL_GROUP
/* Reverse table: indexed by MDL_* mode, giving the matching LDLM lock mode. */
84 enum ldlm_mode mdt_dlm_lock_modes[] = {
85 [MDL_MINMODE] = LCK_MINMODE,
92 [MDL_GROUP] = LCK_GROUP
/* Forward declarations. */
95 static struct mdt_device *mdt_dev(struct lu_device *d);
97 static const struct lu_object_operations mdt_obj_ops;
99 /* Slab for MDT object allocation */
100 static struct kmem_cache *mdt_object_kmem;
102 /* For HSM restore handles */
103 struct kmem_cache *mdt_hsm_cdt_kmem;
105 /* For HSM request handles */
106 struct kmem_cache *mdt_hsm_car_kmem;
/*
 * Descriptors for the slab caches used by the MDT: each entry points at the
 * kmem_cache pointer it describes and gives the cache name and object size.
 */
108 static struct lu_kmem_descr mdt_caches[] = {
110 .ckd_cache = &mdt_object_kmem,
111 .ckd_name = "mdt_obj",
112 .ckd_size = sizeof(struct mdt_object)
115 .ckd_cache = &mdt_hsm_cdt_kmem,
116 .ckd_name = "mdt_cdt_restore_handle",
117 .ckd_size = sizeof(struct cdt_restore_handle)
120 .ckd_cache = &mdt_hsm_car_kmem,
121 .ckd_name = "mdt_cdt_agent_req",
122 .ckd_size = sizeof(struct cdt_agent_req)
/* Return the bits of \a op_flag that are set in rep->lock_policy_res1. */
129 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
133 return rep->lock_policy_res1 & op_flag;
/*
 * Clear the disposition bits \a op_flag: in info->mti_opdata, in the target
 * opdata via tgt_opdata_clear(), and in rep->lock_policy_res1.
 */
136 void mdt_clear_disposition(struct mdt_thread_info *info,
137 struct ldlm_reply *rep, __u64 op_flag)
140 info->mti_opdata &= ~op_flag;
141 tgt_opdata_clear(info->mti_env, op_flag);
144 rep->lock_policy_res1 &= ~op_flag;
/*
 * Set the disposition bits \a op_flag: in info->mti_opdata, in the target
 * opdata via tgt_opdata_set(), and in rep->lock_policy_res1.
 */
147 void mdt_set_disposition(struct mdt_thread_info *info,
148 struct ldlm_reply *rep, __u64 op_flag)
151 info->mti_opdata |= op_flag;
152 tgt_opdata_set(info->mti_env, op_flag);
155 rep->lock_policy_res1 |= op_flag;
/*
 * Initialize \a lh as a regular lock handle (MDT_REG_LOCK): the PDO hash is
 * zeroed and both mlh_reg_mode and mlh_rreg_mode are set to \a lm.
 */
158 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 lh->mlh_pdo_hash = 0;
161 lh->mlh_reg_mode = lm;
162 lh->mlh_rreg_mode = lm;
163 lh->mlh_type = MDT_REG_LOCK;
/*
 * Initialize \a lh as a regular lock handle using the request mode of
 * \a lock.  For an LCK_GROUP lock the group id is also copied from the
 * lock's inodebits policy data into lh->mlh_gid.
 */
166 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
168 mdt_lock_reg_init(lh, lock->l_req_mode);
169 if (lock->l_req_mode == LCK_GROUP)
170 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
/*
 * Initialize \a lh as a PDO lock handle (MDT_PDO_LOCK) whose regular mode is
 * \a lock_mode.  When \a lname is a valid name, mlh_pdo_hash is computed from
 * it with ll_full_name_hash().
 */
173 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
174 const struct lu_name *lname)
176 lh->mlh_reg_mode = lock_mode;
/* left at LCK_MINMODE here; mdt_lock_pdo_mode() asserts this before it
 * chooses the real PDO mode */
177 lh->mlh_pdo_mode = LCK_MINMODE;
178 lh->mlh_rreg_mode = lock_mode;
179 lh->mlh_type = MDT_PDO_LOCK;
181 if (lu_name_is_valid(lname)) {
182 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
184 /* XXX Workaround for LU-2856
186 * Zero is a valid return value of full_name_hash, but
187 * several users of mlh_pdo_hash assume a non-zero
188 * hash value. We therefore map zero onto an
189 * arbitrary, but consistent value (1) to avoid
190 * problems further down the road. */
191 if (unlikely(lh->mlh_pdo_hash == 0))
192 lh->mlh_pdo_hash = 1;
194 lh->mlh_pdo_hash = 0;
/*
 * Choose the PDO lock mode (lh->mlh_pdo_mode) for an access to directory \a o.
 *
 * The lower layer is consulted first through mdo_lock_mode(), passing the
 * regular lock mode translated to an MDL_* mode.  A result other than
 * MDL_MINMODE is translated back and used as the PDO mode; otherwise the mode
 * is selected here by a switch on lh->mlh_reg_mode.
 *
 * On entry lh->mlh_reg_mode must not be LCK_MINMODE and lh->mlh_pdo_mode must
 * be LCK_MINMODE; on exit lh->mlh_pdo_mode is asserted to have been set.
 */
198 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
199 struct mdt_lock_handle *lh)
205 * Any dir access needs couple of locks:
207 * 1) on part of dir we gonna take lookup/modify;
209 * 2) on whole dir to protect it from concurrent splitting and/or to
210 * flush client's cache for readdir().
212 * so, for a given mode and object this routine decides what lock mode
213 * to use for lock #2:
215 * 1) if caller's gonna lookup in dir then we need to protect dir from
216 * being splitted only - LCK_CR
218 * 2) if caller's gonna modify dir then we need to protect dir from
219 * being splitted and to flush cache - LCK_CW
221 * 3) if caller's gonna modify dir and that dir seems ready for
222 * splitting then we need to protect it from any type of access
223 * (lookup/modify/split) - LCK_EX --bzzz
226 LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
227 LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
230 * Ask underlaying level its opinion about preferable PDO lock mode
231 * having access type passed as regular lock mode:
233 * - MDL_MINMODE means that lower layer does not want to specify lock
236 * - MDL_NL means that no PDO lock should be taken. This is used in some
237 * cases. Say, for non-splittable directories no need to use PDO locks
240 mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
241 mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
243 if (mode != MDL_MINMODE) {
244 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
247 * Lower layer does not want to specify locking mode. We do it
248 * our selves. No special protection is needed, just flush
249 * client's cache on modification and allow concurrent
252 switch (lh->mlh_reg_mode) {
254 lh->mlh_pdo_mode = LCK_EX;
257 lh->mlh_pdo_mode = LCK_CR;
260 lh->mlh_pdo_mode = LCK_CW;
263 CERROR("Not expected lock type (0x%x)\n",
264 (int)lh->mlh_reg_mode);
269 LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
274 * Check whether \a o is directory stripe object.
276 * \param[in] info thread environment
277 * \param[in] o MDT object
279 * \retval 1 is directory stripe.
280 * \retval 0 isn't directory stripe.
281 * \retval < 0 error code
283 static int mdt_is_dir_stripe(struct mdt_thread_info *info,
284 struct mdt_object *o)
286 struct md_attr *ma = &info->mti_attr;
287 struct lmv_mds_md_v1 *lmv;
/* load the LMV xattr of the object into info->mti_attr */
290 rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
/* MA_LMV is only set when an LMV EA was actually read */
294 if (!(ma->ma_valid & MA_LMV))
297 lmv = &ma->ma_lmv->lmv_md_v1;
299 if (!lmv_is_sane2(lmv))
/* lmv_magic is stored little-endian; LMV_MAGIC_STRIPE marks a stripe */
302 if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
/*
 * Resolve the path \a fileset to a FID, one '/'-separated component at a
 * time, starting from the MDT root FID (mdt->mdt_md_root_fid).
 *
 * Each component is copied into info->mti_filename and looked up with
 * mdo_lookup() in the object found for the current FID, which updates *fid.
 * Component length is checked against NAME_MAX and ".." is rejected.  After
 * the walk the final object is fetched again and examined: its mode is
 * tested with S_ISDIR(), and for a remote object the setting
 * mdt_enable_remote_subdir_mount decides between a console warning
 * ("refused") and an informational message ("may be slow").
 *
 * Must not be called for cross-ref requests (asserted on mti_cross_ref).
 */
308 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
311 struct mdt_device *mdt = info->mti_mdt;
312 struct lu_name *lname = &info->mti_name;
313 const char *start = fileset;
314 char *filename = info->mti_filename;
315 struct mdt_object *parent;
319 LASSERT(!info->mti_cross_ref);
322 * We may want to allow this to mount a completely separate
323 * fileset from the MDT in the future, but keeping it to
324 * ROOT/ only for now avoid potential security issues.
326 *fid = mdt->mdt_md_root_fid;
328 while (rc == 0 && start != NULL && *start != '\0') {
329 const char *s1 = start;
335 while (*s2 != '/' && *s2 != '\0')
343 lname->ln_namelen = s2 - s1;
344 if (lname->ln_namelen > NAME_MAX) {
349 /* reject .. as a path component */
350 if (lname->ln_namelen == 2 &&
351 strncmp(s1, "..", 2) == 0) {
/* strncpy() copies exactly ln_namelen bytes here and adds no NUL;
 * the terminator is written explicitly on the next line */
356 strncpy(filename, s1, lname->ln_namelen);
357 filename[lname->ln_namelen] = '\0';
358 lname->ln_name = filename;
360 parent = mdt_object_find(info->mti_env, mdt, fid);
361 if (IS_ERR(parent)) {
362 rc = PTR_ERR(parent);
365 /* Only got the fid of this obj by name */
367 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
368 fid, &info->mti_spec);
369 mdt_object_put(info->mti_env, parent);
372 parent = mdt_object_find(info->mti_env, mdt, fid);
374 rc = PTR_ERR(parent);
376 mode = lu_object_attr(&parent->mot_obj);
377 if (!S_ISDIR(mode)) {
379 } else if (mdt_is_remote_object(info, parent, parent)) {
380 if (!mdt->mdt_enable_remote_subdir_mount) {
382 LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
386 LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
391 mdt_object_put(info->mti_env, parent);
/*
 * Handle a get-root request: fill mbo_fid1 of the reply body and set
 * OBD_MD_FLID.
 *
 * A fileset name may come from the client (RMF_NAME, when its size is
 * non-zero) and from the nodemap of the export.  When both exist the client
 * fileset is treated as a sub-path of the nodemap one and the two are joined
 * in a temporary PATH_MAX + 1 byte buffer.  The reply FID is produced either
 * by mdt_lookup_fileset() or taken from mdt->mdt_md_root_fid.
 *
 * Errors from the credential check, allocation, path joining and fileset
 * lookup are wrapped with err_serious().
 */
398 static int mdt_get_root(struct tgt_session_info *tsi)
400 struct mdt_thread_info *info = tsi2mdt_info(tsi);
401 struct mdt_device *mdt = info->mti_mdt;
402 struct mdt_body *repbody;
403 char *fileset = NULL, *buffer = NULL;
405 struct obd_export *exp = info->mti_exp;
406 char *nodemap_fileset;
410 rc = mdt_check_ucred(info);
412 GOTO(out, rc = err_serious(rc));
414 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
415 GOTO(out, rc = err_serious(-ENOMEM));
417 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
418 if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
419 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
421 GOTO(out, rc = err_serious(-EFAULT));
424 nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
425 if (nodemap_fileset && nodemap_fileset[0]) {
426 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
428 /* consider fileset from client as a sub-fileset
429 * of the nodemap one */
430 OBD_ALLOC(buffer, PATH_MAX + 1);
432 GOTO(out, rc = err_serious(-ENOMEM));
/* snprintf() returns the length it wanted to write, so a result
 * >= the buffer size means the joined path was truncated */
433 if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
434 nodemap_fileset, fileset) >= PATH_MAX + 1)
435 GOTO(out, rc = err_serious(-EINVAL));
438 /* enforce fileset as specified in the nodemap */
439 fileset = nodemap_fileset;
444 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
445 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
447 GOTO(out, rc = err_serious(rc));
449 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
451 repbody->mbo_valid |= OBD_MD_FLID;
455 mdt_thread_info_fini(info);
457 OBD_FREE(buffer, PATH_MAX+1);
/*
 * Handle a statfs request and fill the RMF_OBD_STATFS reply buffer.
 *
 * Results are cached in the mdt device: mdt_sum_osfs is used when the
 * request body carries OBD_MD_FLAGSTATFS, mdt_osfs otherwise.  A cache entry
 * is refreshed once its age reaches OBD_STATFS_CACHE_SECONDS (through
 * mdo_statfs() of the child device or dt_statfs() of the bottom device);
 * cache reads and writes are done under mdt->mdt_lock.
 *
 * After that os_bavail is reduced by the dirty and pending grant totals
 * (rounded up to blocks), and for clients without grant-param support a
 * block size above 1 << COMPAT_BSIZE_SHIFT is rescaled down to that size.
 * The elapsed time is accounted under LPROC_MDT_STATFS.
 */
461 static int mdt_statfs(struct tgt_session_info *tsi)
463 struct ptlrpc_request *req = tgt_ses_req(tsi);
464 struct mdt_thread_info *info = tsi2mdt_info(tsi);
465 struct mdt_device *mdt = info->mti_mdt;
466 struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
467 struct md_device *next = mdt->mdt_child;
468 struct ptlrpc_service_part *svcpt;
469 struct obd_statfs *osfs;
470 struct mdt_body *reqbody = NULL;
471 struct mdt_statfs_cache *msf;
472 ktime_t kstart = ktime_get();
473 int current_blockbits;
478 svcpt = req->rq_rqbd->rqbd_svcpt;
480 /* This will trigger a watchdog timeout */
481 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
482 (MDT_SERVICE_WATCHDOG_FACTOR *
483 at_get(&svcpt->scp_at_estimate)) + 1);
485 rc = mdt_check_ucred(info);
487 GOTO(out, rc = err_serious(rc));
489 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
490 GOTO(out, rc = err_serious(-ENOMEM));
492 osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
494 GOTO(out, rc = -EPROTO);
/* the request body is only parsed for sum-statfs capable clients whose
 * request has exactly the size of the RQF_MDS_STATFS_NEW format */
496 if (mdt_is_sum_statfs_client(req->rq_export) &&
497 lustre_packed_msg_size(req->rq_reqmsg) ==
498 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
499 &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
500 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
501 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
504 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
505 msf = &mdt->mdt_sum_osfs;
507 msf = &mdt->mdt_osfs;
509 if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
510 /** statfs data is too old, get up-to-date one */
511 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
512 rc = next->md_ops->mdo_statfs(info->mti_env,
515 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
519 spin_lock(&mdt->mdt_lock);
520 msf->msf_osfs = *osfs;
521 msf->msf_age = ktime_get_seconds();
522 spin_unlock(&mdt->mdt_lock);
524 /** use cached statfs data */
525 spin_lock(&mdt->mdt_lock);
526 *osfs = msf->msf_osfs;
527 spin_unlock(&mdt->mdt_lock);
530 /* tgd_blockbit is recordsize bits set during mkfs.
531 * This once set does not change. However, 'zfs set'
532 * can be used to change the MDT blocksize. Instead
533 * of using cached value of 'tgd_blockbit' always
534 * calculate the blocksize bits which may have
537 current_blockbits = fls64(osfs->os_bsize) - 1;
539 /* at least try to account for cached pages. its still racy and
540 * might be under-reporting if clients haven't announced their
541 * caches with brw recently */
542 CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
543 " pending %llu free %llu avail %llu\n",
544 tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
545 tgd->tgd_tot_pending,
546 osfs->os_bfree << current_blockbits,
547 osfs->os_bavail << current_blockbits);
549 osfs->os_bavail -= min_t(u64, osfs->os_bavail,
550 ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
551 osfs->os_bsize - 1) >> current_blockbits));
553 tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
554 CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
555 "%llu objects: %llu free; state %x\n",
556 osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
557 osfs->os_files, osfs->os_ffree, osfs->os_state);
559 if (!exp_grant_param_supp(tsi->tsi_exp) &&
560 current_blockbits > COMPAT_BSIZE_SHIFT) {
561 /* clients which don't support OBD_CONNECT_GRANT_PARAM
562 * should not see a block size > page size, otherwise
563 * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
564 * block size which is the biggest block size known to work
565 * with all client's page size. */
566 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
567 osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT;
568 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
569 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
572 mdt_counter_incr(req, LPROC_MDT_STATFS,
573 ktime_us_delta(ktime_get(), kstart));
575 mdt_thread_info_fini(info);
/*
 * Inspect a composite layout for a Data-on-MDT (LOV_PATTERN_MDT) component
 * and report its stripe size.
 *
 * Only LOV_MAGIC_COMP_V1 layouts are examined.  All layout fields are read
 * through le16_to_cpu()/le32_to_cpu().  The first component is checked as a
 * fast path when the layout has no mirrors; otherwise every entry is
 * visited, tested for LCME_FL_INIT, and classified by its pattern.
 *
 * \param[in] lmm		layout EA to inspect
 * \param[in,out] is_dom_only	optional (may be NULL); only consulted when a
 *				DoM stripe size was found and the loop did not
 *				also see an OST-striped component
 *
 * \retval dom_stripesize, which starts at 0 and is taken from
 *	   lmm_stripe_size of the matching component
 */
579 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
581 struct lov_comp_md_v1 *comp_v1;
582 struct lov_mds_md *v1;
584 __u32 dom_stripesize = 0;
586 bool has_ost_stripes = false;
593 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
596 comp_v1 = (struct lov_comp_md_v1 *)lmm;
/* lcme_offset is a byte offset from the start of the composite header */
597 off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
598 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
600 /* Fast check for DoM entry with no mirroring, should be the first */
601 if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
602 lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
605 /* check all entries otherwise */
606 for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
607 struct lov_comp_md_entry_v1 *lcme;
609 lcme = &comp_v1->lcm_entries[i];
610 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
613 off = le32_to_cpu(lcme->lcme_offset);
614 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
616 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
618 dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
620 has_ost_stripes = true;
/* stop early once both kinds of component have been seen */
622 if (dom_stripesize && has_ost_stripes)
623 RETURN(dom_stripesize);
625 /* DoM-only case exits here */
626 if (is_dom_only && dom_stripesize)
628 RETURN(dom_stripesize);
632 * Pack size attributes into the reply.
634 int mdt_pack_size2body(struct mdt_thread_info *info,
635 const struct lu_fid *fid, struct lustre_handle *lh)
638 struct md_attr *ma = &info->mti_attr;
640 bool dom_lock = false;
/* the caller must already have the file mode in info->mti_attr */
644 LASSERT(ma->ma_attr.la_valid & LA_MODE);
/* only regular files with a valid, non-NULL LOV EA are considered */
646 if (!S_ISREG(ma->ma_attr.la_mode) ||
647 !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
650 dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
651 /* no DoM stripe, no size in reply */
/* a lock handle is optional: only a used handle is resolved to a lock
 * and tested with ldlm_has_dom() */
655 if (lustre_handle_is_used(lh)) {
656 struct ldlm_lock *lock;
658 lock = ldlm_handle2lock(lh);
660 dom_lock = ldlm_has_dom(lock);
665 /* no DoM lock, no size in reply */
669 /* Either DoM lock exists or LMM has only DoM stripe then
670 * return size on body. */
671 b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
673 mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
677 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
679 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
681 * \param info thread info object
682 * \param repbody reply to pack ACLs into
683 * \param o mdt object of file to examine
684 * \param nodemap nodemap of client to reply to
686 * \retval -errno error getting or parsing ACL from disk
/*
 * Implementation notes: the access ACL (XATTR_NAME_ACL_ACCESS) is first read
 * into the RMF_ACL buffer reserved in the reply.  If that buffer is too
 * small (-ERANGE) and the client supports large ACLs, the per-thread big ACL
 * buffer (info->mti_big_acl) is allocated on demand and used instead.  The
 * ACL is then passed through nodemap_map_acl() and its resulting size is
 * stored in repbody->mbo_aclsize with OBD_MD_FLACL set.
 */
688 int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
689 struct mdt_object *o, struct lu_nodemap *nodemap)
691 const struct lu_env *env = info->mti_env;
692 struct md_object *next = mdt_object_child(o);
693 struct lu_buf *buf = &info->mti_buf;
694 struct mdt_device *mdt = info->mti_mdt;
695 struct req_capsule *pill = info->mti_pill;
/* start with the ACL buffer reserved in the reply message */
700 buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
701 buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
702 if (buf->lb_len == 0)
705 LASSERT(!info->mti_big_acl_used);
707 rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
/* no ACL stored on the object: report an empty but valid ACL */
709 if (rc == -ENODATA) {
710 repbody->mbo_aclsize = 0;
711 repbody->mbo_valid |= OBD_MD_FLACL;
713 } else if (rc == -EOPNOTSUPP) {
715 } else if (rc == -ERANGE) {
716 if (exp_connect_large_acl(info->mti_exp) &&
717 !info->mti_big_acl_used) {
718 if (info->mti_big_acl == NULL) {
719 info->mti_big_aclsize =
721 mdt->mdt_max_ea_size,
723 OBD_ALLOC_LARGE(info->mti_big_acl,
724 info->mti_big_aclsize);
725 if (info->mti_big_acl == NULL) {
726 info->mti_big_aclsize = 0;
727 CERROR("%s: unable to grow "
730 PFID(mdt_object_fid(o)));
735 CDEBUG(D_INODE, "%s: grow the "DFID
736 " ACL buffer to size %d\n",
738 PFID(mdt_object_fid(o)),
739 info->mti_big_aclsize);
741 buf->lb_buf = info->mti_big_acl;
742 buf->lb_len = info->mti_big_aclsize;
743 info->mti_big_acl_used = 1;
746 /* FS has ACL bigger than our limits */
747 CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
748 mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
749 info->mti_big_aclsize);
752 CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
753 mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
756 rc = nodemap_map_acl(nodemap, buf->lb_buf,
757 rc, NODEMAP_FS_TO_CLIENT);
758 /* if all ACLs mapped out, rc is still >= 0 */
760 CERROR("%s: nodemap_map_acl unable to parse "DFID
761 " ACL: rc = %d\n", mdt_obd_name(mdt),
762 PFID(mdt_object_fid(o)), rc);
763 repbody->mbo_aclsize = 0;
764 repbody->mbo_valid &= ~OBD_MD_FLACL;
766 repbody->mbo_aclsize = rc;
767 repbody->mbo_valid |= OBD_MD_FLACL;
776 /* XXX Look into layout in MDT layer. */
777 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
779 struct lov_comp_md_v1 *comp_v1;
780 struct lov_mds_md *v1;
783 if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
784 comp_v1 = (struct lov_comp_md_v1 *)lmm;
786 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
787 v1 = (struct lov_mds_md *)((char *)comp_v1 +
788 comp_v1->lcm_entries[i].lcme_offset);
789 /* We don't support partial release for now */
790 if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
795 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
/*
 * Fill the reply body \a b from the attributes in \a attr.
 *
 * Times, flags, nlink, uid, gid and projid are copied when the matching
 * LA_* bit is set in attr->la_valid, and the corresponding OBD_MD_* bit is
 * then set in b->mbo_valid.  UID and GID are translated with
 * nodemap_map_id() (NODEMAP_FS_TO_CLIENT) using the nodemap of
 * info->mti_exp; the nodemap reference is dropped before returning.
 *
 * Which size/blocks bits are reported depends on the file type and on the
 * layout state in info->mti_attr: non-regular file, no LOV EA, released
 * (HSM) layout, valid SOM (info->mti_som_valid) or lazy SOM (MA_SOM).
 *
 * info->mti_attr must have MA_INODE valid.  \a fid is tested for NULL before
 * it is used in the size trace message.
 */
800 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
801 const struct lu_attr *attr, const struct lu_fid *fid)
803 struct md_attr *ma = &info->mti_attr;
804 struct obd_export *exp = info->mti_exp;
805 struct lu_nodemap *nodemap = NULL;
807 LASSERT(ma->ma_valid & MA_INODE);
809 if (attr->la_valid & LA_ATIME) {
810 b->mbo_atime = attr->la_atime;
811 b->mbo_valid |= OBD_MD_FLATIME;
813 if (attr->la_valid & LA_MTIME) {
814 b->mbo_mtime = attr->la_mtime;
815 b->mbo_valid |= OBD_MD_FLMTIME;
817 if (attr->la_valid & LA_CTIME) {
818 b->mbo_ctime = attr->la_ctime;
819 b->mbo_valid |= OBD_MD_FLCTIME;
821 if (attr->la_valid & LA_BTIME) {
822 b->mbo_btime = attr->la_btime;
823 b->mbo_valid |= OBD_MD_FLBTIME;
825 if (attr->la_valid & LA_FLAGS) {
826 b->mbo_flags = attr->la_flags;
827 b->mbo_valid |= OBD_MD_FLFLAGS;
829 if (attr->la_valid & LA_NLINK) {
830 b->mbo_nlink = attr->la_nlink;
831 b->mbo_valid |= OBD_MD_FLNLINK;
/* the nodemap is only looked up when an id actually has to be mapped */
833 if (attr->la_valid & (LA_UID|LA_GID)) {
834 nodemap = nodemap_get_from_exp(exp);
838 if (attr->la_valid & LA_UID) {
839 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
840 NODEMAP_FS_TO_CLIENT,
842 b->mbo_valid |= OBD_MD_FLUID;
844 if (attr->la_valid & LA_GID) {
845 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
846 NODEMAP_FS_TO_CLIENT,
848 b->mbo_valid |= OBD_MD_FLGID;
851 if (attr->la_valid & LA_PROJID) {
852 /* TODO, nodemap for project id */
853 b->mbo_projid = attr->la_projid;
854 b->mbo_valid |= OBD_MD_FLPROJID;
857 b->mbo_mode = attr->la_mode;
858 if (attr->la_valid & LA_MODE)
859 b->mbo_valid |= OBD_MD_FLMODE;
860 if (attr->la_valid & LA_TYPE)
861 b->mbo_valid |= OBD_MD_FLTYPE;
865 b->mbo_valid |= OBD_MD_FLID;
866 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
867 PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
870 if (!(attr->la_valid & LA_TYPE))
873 b->mbo_rdev = attr->la_rdev;
874 b->mbo_size = attr->la_size;
875 b->mbo_blocks = attr->la_blocks;
877 if (!S_ISREG(attr->la_mode)) {
878 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
879 } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
880 /* means no objects are allocated on osts. */
881 LASSERT(!(ma->ma_valid & MA_LOV));
882 /* just ignore blocks occupied by extend attributes on MDS */
884 /* if no object is allocated on osts, the size on mds is valid.
886 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
887 } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
888 if (mdt_hsm_is_released(ma->ma_lmm)) {
889 /* A released file stores its size on MDS. */
890 /* But return 1 block for released file, otherwise tools
891 * like tar will consider it fully sparse. (LU-3864)
893 if (unlikely(b->mbo_size == 0))
897 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
898 } else if (info->mti_som_valid) { /* som is valid */
899 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
900 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
901 b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
902 b->mbo_size = ma->ma_som.ms_size;
903 b->mbo_blocks = ma->ma_som.ms_blocks;
907 if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
908 b->mbo_valid & OBD_MD_FLLAZYSIZE))
909 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
910 PFID(fid), (unsigned long long)b->mbo_size);
913 if (!IS_ERR_OR_NULL(nodemap))
914 nodemap_putref(nodemap);
/*
 * Non-zero when the reply body advertises a layout EA matching the object
 * type: OBD_MD_FLEASIZE for a regular file, OBD_MD_FLDIREA for a directory.
 */
917 static inline int mdt_body_has_lov(const struct lu_attr *la,
918 const struct mdt_body *body)
920 return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
921 (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
/*
 * Adjust a reply for clients that did not negotiate layout support
 * (tested with exp_connect_layout() on the request's export).
 *
 * When the reply body carries a layout EA (see mdt_body_has_lov()) for a
 * regular file, the layout generation in info->mti_attr.ma_lmm is reset
 * to 0.
 */
924 void mdt_client_compatibility(struct mdt_thread_info *info)
926 struct mdt_body *body;
927 struct ptlrpc_request *req = mdt_info_req(info);
928 struct obd_export *exp = req->rq_export;
929 struct md_attr *ma = &info->mti_attr;
930 struct lu_attr *la = &ma->ma_attr;
933 if (exp_connect_layout(exp))
934 /* the client can deal with 16-bit lmm_stripe_count */
937 body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
939 if (!mdt_body_has_lov(la, body))
942 /* now we have a reply with a lov for a client not compatible with the
943 * layout lock so we have to clean the layout generation number */
944 if (S_ISREG(la->la_mode))
945 ma->ma_lmm->lmm_layout_gen = 0;
/*
 * Query extended attribute sizes of \a o by calling mo_xattr_get() with
 * LU_BUF_NULL (no data buffer).  For directories further xattrs are queried
 * the same way, including XATTR_NAME_DEFAULT_LMV, and each result (rc2) is
 * compared with the size found so far (rc): an error other than -ENODATA or
 * a larger value is singled out by the final test.
 */
949 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
950 struct mdt_object *o)
952 const struct lu_env *env = info->mti_env;
955 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
964 /* Is it a directory? Let's check for the LMV as well */
965 if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
966 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
970 rc2 = mo_xattr_get(env, mdt_object_child(o),
972 XATTR_NAME_DEFAULT_LMV);
974 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
/*
 * Read xattr \a name of \a o into the per-thread big buffer
 * (info->mti_big_lmm), growing that buffer first when it is too small.
 *
 * The needed size is obtained with a LU_BUF_NULL query.  If it exceeds
 * info->mti_big_lmmsize, any existing buffer is freed and a new one of
 * size_roundup_power2() of that size is allocated.  The data is then read
 * through info->mti_buf, which is pointed at the big buffer.
 *
 * The big buffer must not be marked in use on entry (mti_big_lmm_used == 0).
 */
982 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
985 const struct lu_env *env = info->mti_env;
989 LASSERT(info->mti_big_lmm_used == 0);
/* size-only query: no data buffer is supplied */
990 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
994 /* big_lmm may need to be grown */
995 if (info->mti_big_lmmsize < rc) {
996 int size = size_roundup_power2(rc);
998 if (info->mti_big_lmmsize > 0) {
999 /* free old buffer */
1000 LASSERT(info->mti_big_lmm);
1001 OBD_FREE_LARGE(info->mti_big_lmm,
1002 info->mti_big_lmmsize);
1003 info->mti_big_lmm = NULL;
1004 info->mti_big_lmmsize = 0;
1007 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
1008 if (info->mti_big_lmm == NULL)
1010 info->mti_big_lmmsize = size;
1012 LASSERT(info->mti_big_lmmsize >= rc);
1014 info->mti_buf.lb_buf = info->mti_big_lmm;
1015 info->mti_buf.lb_len = info->mti_big_lmmsize;
1016 rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
/*
 * Fetch one striping xattr of \a o into the matching buffer of \a ma.
 *
 * \a name selects the target: XATTR_NAME_LOV uses ma_lmm/ma_lmm_size,
 * XATTR_NAME_LMV uses ma_lmv/ma_lmv_size and XATTR_NAME_DEFAULT_LMV uses
 * ma_default_lmv/ma_default_lmv_size.  The corresponding MA_* valid bit must
 * not be set on entry and the selected buffer must be non-NULL.
 *
 * After a successful read the size field is set to the returned length and
 * the valid bit is set; mdt_max_mdsize is raised when the EA is larger than
 * the current value.  A LOV EA whose pattern has LOV_PATTERN_F_HOLE is given
 * special treatment for clients without OBD_CONNECT_LFSCK.  On -ERANGE the
 * read goes through mdt_big_xattr_get() and mti_big_lmm_used is set, except
 * for the default LMV, which has a fixed size.
 */
1021 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1022 struct md_attr *ma, const char *name)
1024 struct md_object *next = mdt_object_child(o);
1025 struct lu_buf *buf = &info->mti_buf;
1028 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1029 buf->lb_buf = ma->ma_lmm;
1030 buf->lb_len = ma->ma_lmm_size;
1031 LASSERT(!(ma->ma_valid & MA_LOV));
1032 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1033 buf->lb_buf = ma->ma_lmv;
1034 buf->lb_len = ma->ma_lmv_size;
1035 LASSERT(!(ma->ma_valid & MA_LMV));
1036 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1037 buf->lb_buf = ma->ma_default_lmv;
1038 buf->lb_len = ma->ma_default_lmv_size;
1039 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1044 LASSERT(buf->lb_buf);
1046 rc = mo_xattr_get(info->mti_env, next, buf, name);
1050 if (strcmp(name, XATTR_NAME_LOV) == 0) {
/* after a big-buffer read the data lives in mti_big_lmm, so the
 * ma pointer is redirected there */
1051 if (info->mti_big_lmm_used)
1052 ma->ma_lmm = info->mti_big_lmm;
1054 /* NOT return LOV EA with hole to old client. */
1055 if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1056 LOV_PATTERN_F_HOLE) &&
1057 !(exp_connect_flags(info->mti_exp) &
1058 OBD_CONNECT_LFSCK)) {
1061 ma->ma_lmm_size = rc;
1062 ma->ma_valid |= MA_LOV;
1064 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1065 if (info->mti_big_lmm_used)
1066 ma->ma_lmv = info->mti_big_lmm;
1068 ma->ma_lmv_size = rc;
1069 ma->ma_valid |= MA_LMV;
1070 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1071 ma->ma_default_lmv_size = rc;
1072 ma->ma_valid |= MA_LMV_DEF;
1075 /* Update mdt_max_mdsize so all clients will be aware that */
1076 if (info->mti_mdt->mdt_max_mdsize < rc)
1077 info->mti_mdt->mdt_max_mdsize = rc;
1080 } else if (rc == -ENODATA) {
1083 } else if (rc == -ERANGE) {
1084 /* Default LMV has fixed size, so it must be able to fit
1085 * in the original buffer */
1086 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1088 rc = mdt_big_xattr_get(info, o, name);
1090 info->mti_big_lmm_used = 1;
/*
 * Variant of __mdt_stripe_get() that always reads into the per-thread big
 * buffer.
 *
 * The buffer (info->mti_big_lmm) is allocated with PAGE_SIZE bytes on first
 * use.  For XATTR_NAME_LOV it is installed as ma_lmm, for XATTR_NAME_LMV as
 * ma_lmv, and the matching valid bit is cleared before the read.  The
 * mti_big_lmm_used flag must be clear on entry and is reset to 0 afterwards.
 */
1098 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1099 struct md_attr *ma, const char *name)
1103 if (!info->mti_big_lmm) {
1104 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1105 if (!info->mti_big_lmm)
1107 info->mti_big_lmmsize = PAGE_SIZE;
1110 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1111 ma->ma_lmm = info->mti_big_lmm;
1112 ma->ma_lmm_size = info->mti_big_lmmsize;
1113 ma->ma_valid &= ~MA_LOV;
1114 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1115 ma->ma_lmv = info->mti_big_lmm;
1116 ma->ma_lmv_size = info->mti_big_lmmsize;
1117 ma->ma_valid &= ~MA_LMV;
1122 LASSERT(!info->mti_big_lmm_used);
1123 rc = __mdt_stripe_get(info, o, ma, name);
1124 /* since big_lmm is always used here, clear 'used' flag to avoid
1125 * assertion in mdt_big_xattr_get().
1127 info->mti_big_lmm_used = 0;
/*
 * Read the link EA (XATTR_NAME_LINK) of \a o and store the parent FID of its
 * first entry in \a pfid, converted with fid_be_to_cpu().
 *
 * The read first targets the current per-thread big buffer and goes through
 * mdt_big_xattr_get() when that returns -ERANGE or the buffer length is 0.
 * A header whose magic is the byte-swapped LINK_EA_MAGIC has its magic,
 * record count and length swapped in place before the magic and record
 * count are checked.
 */
1132 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1133 struct lu_fid *pfid)
1135 struct lu_buf *buf = &info->mti_buf;
1136 struct link_ea_header *leh;
1137 struct link_ea_entry *lee;
1141 buf->lb_buf = info->mti_big_lmm;
1142 buf->lb_len = info->mti_big_lmmsize;
1143 rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1144 buf, XATTR_NAME_LINK);
1145 /* ignore errors, MA_PFID won't be set and it is
1146 * up to the caller to treat this as an error */
1147 if (rc == -ERANGE || buf->lb_len == 0) {
1148 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
/* mdt_big_xattr_get() may have reallocated the big buffer, so
 * pick up the current pointer and size */
1149 buf->lb_buf = info->mti_big_lmm;
1150 buf->lb_len = info->mti_big_lmmsize;
/* rc is the EA length here; it must at least cover the header */
1155 if (rc < sizeof(*leh)) {
1156 CERROR("short LinkEA on "DFID": rc = %d\n",
1157 PFID(mdt_object_fid(o)), rc);
1161 leh = (struct link_ea_header *) buf->lb_buf;
/* the first entry starts right after the header */
1162 lee = (struct link_ea_entry *)(leh + 1);
1163 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1164 leh->leh_magic = LINK_EA_MAGIC;
1165 leh->leh_reccount = __swab32(leh->leh_reccount);
1166 leh->leh_len = __swab64(leh->leh_len);
1168 if (leh->leh_magic != LINK_EA_MAGIC)
1170 if (leh->leh_reccount == 0)
1173 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1174 fid_be_to_cpu(pfid, pfid);
/*
 * Read the link EA of \a o and unpack its first entry with
 * linkea_entry_unpack(), yielding both the parent FID (\a pfid) and the name
 * (\a lname).
 *
 * The read first targets info->mti_xattr_buf and goes through
 * mdt_big_xattr_get() on -ERANGE.  Header validation is the same as in
 * mdt_attr_get_pfid(): a byte-swapped header is fixed up in place, then the
 * magic and the record count are checked.
 */
1179 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1180 struct lu_fid *pfid, struct lu_name *lname)
1182 struct lu_buf *buf = &info->mti_buf;
1183 struct link_ea_header *leh;
1184 struct link_ea_entry *lee;
1188 buf->lb_buf = info->mti_xattr_buf;
1189 buf->lb_len = sizeof(info->mti_xattr_buf);
1190 rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1192 if (rc == -ERANGE) {
1193 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1194 buf->lb_buf = info->mti_big_lmm;
1195 buf->lb_len = info->mti_big_lmmsize;
/* rc is the EA length here; it must at least cover the header */
1200 if (rc < sizeof(*leh)) {
1201 CERROR("short LinkEA on "DFID": rc = %d\n",
1202 PFID(mdt_object_fid(o)), rc);
1206 leh = (struct link_ea_header *)buf->lb_buf;
1207 lee = (struct link_ea_entry *)(leh + 1);
1208 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1209 leh->leh_magic = LINK_EA_MAGIC;
1210 leh->leh_reccount = __swab32(leh->leh_reccount);
1211 leh->leh_len = __swab64(leh->leh_len);
1213 if (leh->leh_magic != LINK_EA_MAGIC)
1216 if (leh->leh_reccount == 0)
1219 linkea_entry_unpack(lee, &reclen, lname, pfid);
/*
 * Collect the attributes requested in ma->ma_need for object \a o.
 *
 * The request bits are handled one after another: MA_INODE (mo_attr_get()
 * plus a best-effort mdt_get_som()), MA_PFID, MA_LOV, MA_LMV, MA_LMV_DEF,
 * MA_SOM, MA_HSM and, with CONFIG_LUSTRE_FS_POSIX_ACL, MA_ACL_DEF.  Several
 * of them apply only to regular files and/or directories, as tested against
 * the object mode.  Bits for data actually obtained are set in ma->ma_valid.
 *
 * A missing parent FID is not treated as an error.  For HSM and default ACL
 * an -ENODATA result is tolerated, while other negative results are
 * returned.  An object that does not exist yields -ENOENT.
 */
1224 int mdt_attr_get_complex(struct mdt_thread_info *info,
1225 struct mdt_object *o, struct md_attr *ma)
1227 const struct lu_env *env = info->mti_env;
1228 struct md_object *next = mdt_object_child(o);
1229 struct lu_buf *buf = &info->mti_buf;
1230 int need = ma->ma_need;
1237 if (mdt_object_exists(o) == 0)
1238 GOTO(out, rc = -ENOENT);
1239 mode = lu_object_attr(&next->mo_lu);
1241 if (need & MA_INODE) {
/* ma_need is narrowed for mo_attr_get(); the full request was saved
 * in 'need' above */
1242 ma->ma_need = MA_INODE;
1243 rc = mo_attr_get(env, next, ma);
1248 (void) mdt_get_som(info, o, ma);
1249 ma->ma_valid |= MA_INODE;
1252 if (need & MA_PFID) {
1253 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1255 ma->ma_valid |= MA_PFID;
1256 /* ignore this error, parent fid is not mandatory */
1260 if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1261 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1266 if (need & MA_LMV && S_ISDIR(mode)) {
1267 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1272 if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1273 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1279 * In the handle of MA_INODE, we may already get the SOM attr.
1281 if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1282 rc = mdt_get_som(info, o, ma);
1287 if (need & MA_HSM && S_ISREG(mode)) {
1288 buf->lb_buf = info->mti_xattr_buf;
1289 buf->lb_len = sizeof(info->mti_xattr_buf);
1290 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1291 sizeof(info->mti_xattr_buf));
1292 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1293 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1295 ma->ma_valid |= MA_HSM;
1296 else if (rc2 < 0 && rc2 != -ENODATA)
1297 GOTO(out, rc = rc2);
1300 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1301 if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1302 buf->lb_buf = ma->ma_acl;
1303 buf->lb_len = ma->ma_acl_size;
1304 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1306 ma->ma_acl_size = rc2;
1307 ma->ma_valid |= MA_ACL_DEF;
1308 } else if (rc2 == -ENODATA) {
1310 ma->ma_acl_size = 0;
1312 GOTO(out, rc = rc2);
1317 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1318 rc, ma->ma_valid, ma->ma_lmm);
/*
 * Fill the getattr reply (RMF_MDT_BODY plus, when requested, the EA buffer
 * and ACL) for object @o.
 *
 * Visible flow:
 *  - a remote object only gets its FID packed; a non-DNE client gets
 *    -ENOTSUPP instead;
 *  - the EA reply buffer is used only if the client sent
 *    mbo_eadatasize > 0, otherwise MA_LOV and MA_LMV are cleared from
 *    @ma_need;
 *  - ma->ma_need is built from the object type and reqbody->mbo_valid and
 *    mdt_attr_get_complex() performs the reads;
 *  - the reply body is packed from the returned attributes, including the
 *    symlink target, the maximum MD size and the ACL when asked for;
 *  - the LPROC_MDT_GETATTR counter is updated with the elapsed time.
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (declarations, closing braces, some returns and "else" lines); the
 * comments cover only what is shown.
 *
 * \param[in] info	per-thread MDT state (request, reply capsule, scratch)
 * \param[in] o		object whose attributes are returned
 * \param[in] ma_need	extra MA_* bits OR-ed into ma->ma_need
 */
1322 static int mdt_getattr_internal(struct mdt_thread_info *info,
1323 struct mdt_object *o, int ma_need)
1325 struct mdt_device *mdt = info->mti_mdt;
1326 struct md_object *next = mdt_object_child(o);
1327 const struct mdt_body *reqbody = info->mti_body;
1328 struct ptlrpc_request *req = mdt_info_req(info);
1329 struct md_attr *ma = &info->mti_attr;
1330 struct lu_attr *la = &ma->ma_attr;
1331 struct req_capsule *pill = info->mti_pill;
1332 const struct lu_env *env = info->mti_env;
1333 struct mdt_body *repbody;
1334 struct lu_buf *buffer = &info->mti_buf;
1335 struct obd_export *exp = info->mti_exp;
/* Start time for the LPROC_MDT_GETATTR latency counter updated at the end. */
1336 ktime_t kstart = ktime_get();
/* Fault-injection point: fail with a serious -ENOMEM when armed. */
1341 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1342 RETURN(err_serious(-ENOMEM));
1344 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* Remote object: only the FID is packed, flagged with OBD_MD_MDS. */
1348 if (mdt_object_remote(o)) {
1349 /* This object is located on remote node.*/
1350 /* Return -ENOTSUPP for old client */
1351 if (!mdt_is_dne_client(req->rq_export))
1352 GOTO(out, rc = -ENOTSUPP);
1354 repbody->mbo_fid1 = *mdt_object_fid(o);
1355 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
/*
 * EA reply buffer: when the client announced room for an EA, point the
 * scratch lu_buf at the RMF_MDT_MD reply field (-EPROTO if it is missing).
 * Otherwise no buffer is used and the LOV/LMV bits are removed from ma_need.
 */
1359 if (reqbody->mbo_eadatasize > 0) {
1360 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1361 if (buffer->lb_buf == NULL)
1362 GOTO(out, rc = -EPROTO);
1363 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1366 buffer->lb_buf = NULL;
1368 ma_need &= ~(MA_LOV | MA_LMV);
1369 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1370 mdt_obd_name(info->mti_mdt),
1371 req->rq_export->exp_client_uuid.uuid);
/*
 * Directory, client asked for both MEA and DEFAULT_MEA, and the request
 * format has an RMF_DEFAULT_MDT_MD field: the LMV goes into the MDT_MD
 * buffer and the default LMV into its own reply field.  Each is requested
 * only if its buffer size is non-zero.
 */
1374 /* from 2.12.58 intent_getattr pack default LMV in reply */
1375 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1376 ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1377 (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1378 req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1380 ma->ma_lmv = buffer->lb_buf;
1381 ma->ma_lmv_size = buffer->lb_len;
1382 ma->ma_default_lmv = req_capsule_server_get(pill,
1383 &RMF_DEFAULT_MDT_MD);
1384 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1385 &RMF_DEFAULT_MDT_MD,
1387 ma->ma_need = MA_INODE;
1388 if (ma->ma_lmv_size > 0)
1389 ma->ma_need |= MA_LMV;
1390 if (ma->ma_default_lmv_size > 0)
1391 ma->ma_need |= MA_LMV_DEF;
/*
 * Directory with only one shared buffer available: OBD_MD_MEA takes
 * precedence; if only OBD_MD_DEFAULT_MEA is set the same buffer is handed
 * over to the default LMV and ma_lmv_size is reset to 0.
 */
1392 } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1393 (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1394 /* If it is dir and client require MEA, then we got MEA */
1395 /* Assumption: MDT_MD size is enough for lmv size. */
1396 ma->ma_lmv = buffer->lb_buf;
1397 ma->ma_lmv_size = buffer->lb_len;
1398 ma->ma_need = MA_INODE;
1399 if (ma->ma_lmv_size > 0) {
1400 if (reqbody->mbo_valid & OBD_MD_MEA) {
1401 ma->ma_need |= MA_LMV;
1402 } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1403 ma->ma_need |= MA_LMV_DEF;
1404 ma->ma_default_lmv = buffer->lb_buf;
1406 ma->ma_default_lmv_size = buffer->lb_len;
1407 ma->ma_lmv_size = 0;
/*
 * Presumably the remaining "else" case (that line is not in this listing):
 * the buffer is used for the LOV EA and HSM state is requested as well.
 * A client without overstriping support gets -EOPNOTSUPP when
 * mdt_lmm_is_overstriping() reports true for the buffer.
 */
1411 ma->ma_lmm = buffer->lb_buf;
1412 ma->ma_lmm_size = buffer->lb_len;
1413 ma->ma_need = MA_INODE | MA_HSM;
1414 if (ma->ma_lmm_size > 0) {
1415 ma->ma_need |= MA_LOV;
1416 /* Older clients may crash if they getattr overstriped
1419 if (!exp_connect_overstriping(exp) &&
1420 mdt_lmm_is_overstriping(ma->ma_lmm))
1421 RETURN(-EOPNOTSUPP);
1425 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1426 reqbody->mbo_valid & OBD_MD_FLDIREA &&
1427 lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1428 /* get default stripe info for this dir. */
1429 ma->ma_need |= MA_LOV_DEF;
/* Add the caller-supplied bits and read everything in a single call. */
1431 ma->ma_need |= ma_need;
1433 rc = mdt_attr_get_complex(info, o, ma);
/* -ENOENT is logged at D_OTHER, every other error at D_ERROR. */
1435 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1436 "%s: getattr error for "DFID": rc = %d\n",
1437 mdt_obd_name(info->mti_mdt),
1438 PFID(mdt_object_fid(o)), rc);
1442 /* if file is released, check if a restore is running */
1443 if (ma->ma_valid & MA_HSM) {
1444 repbody->mbo_valid |= OBD_MD_TSTATE;
1445 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1446 mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1447 repbody->mbo_t_state = MS_RESTORE;
/* The inode attributes must be valid before the body is packed from them. */
1450 if (unlikely(!(ma->ma_valid & MA_INODE)))
1453 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
/*
 * Striping information in the reply.  stripe_count and fixed_layout start
 * at 1 / false and are overwritten from the LMV when one was read; the
 * listing shows stripe_count being used by the auto-split check below.
 */
1455 if (mdt_body_has_lov(la, reqbody)) {
1456 u32 stripe_count = 1;
1457 bool fixed_layout = false;
/* LOV EA: report its size and flag it as dir or file striping. */
1459 if (ma->ma_valid & MA_LOV) {
1460 LASSERT(ma->ma_lmm_size);
1461 repbody->mbo_eadatasize = ma->ma_lmm_size;
1462 if (S_ISDIR(la->la_mode))
1463 repbody->mbo_valid |= OBD_MD_FLDIREA;
1465 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1466 mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
/*
 * LMV EA: report it and, if the directory is marked as restriping, queue
 * it with mdt_restripe_migrate_add() (LMV_MAGIC_STRIPE) or
 * mdt_restripe_update_add() (LMV_MAGIC_V1).
 */
1468 if (ma->ma_valid & MA_LMV) {
1469 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1470 u32 magic = le32_to_cpu(lmv->lmv_magic);
1472 /* Return -ENOTSUPP for old client */
1473 if (!mdt_is_striped_client(req->rq_export))
1476 LASSERT(S_ISDIR(la->la_mode));
1477 mdt_dump_lmv(D_INFO, ma->ma_lmv);
1478 repbody->mbo_eadatasize = ma->ma_lmv_size;
1479 repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1481 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1482 fixed_layout = lmv_is_fixed(lmv);
1483 if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1484 mdt_restripe_migrate_add(info, o);
1485 else if (magic == LMV_MAGIC_V1 &&
1486 lmv_is_restriping(lmv))
1487 mdt_restripe_update_add(info, o);
/*
 * Default LMV EA: mbo_eadatasize is set to its size only when no LMV was
 * returned, so a size already set for the LMV above is kept.
 */
1489 if (ma->ma_valid & MA_LMV_DEF) {
1490 /* Return -ENOTSUPP for old client */
1491 if (!mdt_is_striped_client(req->rq_export))
1493 LASSERT(S_ISDIR(la->la_mode));
1495 * when ll_dir_getstripe() gets default LMV, it
1496 * checks mbo_eadatasize.
1498 if (!(ma->ma_valid & MA_LMV))
1499 repbody->mbo_eadatasize =
1500 ma->ma_default_lmv_size;
1501 repbody->mbo_valid |= (OBD_MD_FLDIREA |
1502 OBD_MD_DEFAULT_MEA);
1505 "dirent count %llu stripe count %u MDT count %d\n",
1506 ma->ma_attr.la_dirent_count, stripe_count,
1507 atomic_read(&mdt->mdt_mds_mds_conns) + 1);
/*
 * Directory auto-split: queue the object when the dirent count is set and
 * exceeds mdr_dir_split_count, the object is not the root, auto-split is
 * enabled, no restripe is in progress and the stripe count is below
 * mdt_mds_mds_conns + 1.  (The last term of the condition is not visible
 * in this listing.)
 */
1508 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1509 ma->ma_attr.la_dirent_count >
1510 mdt->mdt_restriper.mdr_dir_split_count &&
1511 !fid_is_root(mdt_object_fid(o)) &&
1512 mdt->mdt_enable_dir_auto_split &&
1513 !o->mot_restriping &&
1514 stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
1516 mdt_auto_split_add(info, o);
/*
 * Symlink target: read into the EA buffer (ma_lmm) with room left for the
 * terminator, which is added by hand after a successful read.
 */
1517 } else if (S_ISLNK(la->la_mode) &&
1518 reqbody->mbo_valid & OBD_MD_LINKNAME) {
1519 buffer->lb_buf = ma->ma_lmm;
1520 /* eadatasize from client includes NULL-terminator, so
1521 * there is no need to read it */
1522 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1523 rc = mo_readlink(env, next, buffer);
/* A zero-length result is reported as a failure as well (rc <= 0). */
1524 if (unlikely(rc <= 0)) {
1525 CERROR("%s: readlink failed for "DFID": rc = %d\n",
1526 mdt_obd_name(info->mti_mdt),
1527 PFID(mdt_object_fid(o)), rc);
/* Cap on how many trailing bytes of the target the debug message prints. */
1530 int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1532 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1534 repbody->mbo_valid |= OBD_MD_LINKNAME;
1535 /* we need to report back size with NULL-terminator
1536 * because client expects that */
1537 repbody->mbo_eadatasize = rc + 1;
1538 if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1539 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1540 "on "DFID ", expected %d\n",
1541 mdt_obd_name(info->mti_mdt),
1542 rc, PFID(mdt_object_fid(o)),
1543 reqbody->mbo_eadatasize - 1);
1544 /* NULL terminate */
1545 ((char *)ma->ma_lmm)[rc] = 0;
1547 /* If the total CDEBUG() size is larger than a page, it
1548 * will print a warning to the console, avoid this by
1549 * printing just the last part of the symlink. */
1550 CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1551 print_limit < rc ? "..." : "", print_limit,
1552 (char *)ma->ma_lmm + rc - print_limit, rc);
/* Report the MDT's maximum MD size when the client asks for it. */
1557 if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1558 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1559 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1560 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1561 repbody->mbo_max_mdsize);
/*
 * ACL: packed only if the export negotiated OBD_CONNECT_ACL and the client
 * asked for OBD_MD_FLACL.  The nodemap reference taken here is dropped
 * right after mdt_pack_acl2body().
 */
1564 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1565 if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1566 (reqbody->mbo_valid & OBD_MD_FLACL)) {
1567 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1568 if (IS_ERR(nodemap))
1569 RETURN(PTR_ERR(nodemap));
1571 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1572 nodemap_putref(nodemap);
/* Account the request with the time elapsed since kstart, in microseconds. */
1578 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1579 ktime_us_delta(ktime_get(), kstart));
/*
 * Request handler that returns the attributes of info->mti_object.
 *
 * The handler sizes the RMF_MDT_MD and RMF_ACL reply buffers, packs the
 * reply, checks the user credentials and then lets mdt_getattr_internal()
 * fill the body.  A request carrying OBD_MD_FLDATAVERSION is handed to
 * mdt_data_version_get() instead.
 *
 * NOTE(review): some lines (local declarations, labels, returns) are not in
 * this listing; the label names "out" and "out_shrink" are known only from
 * the visible GOTO() calls.
 *
 * \param[in] tsi	target session info of the request
 */
1584 static int mdt_getattr(struct tgt_session_info *tsi)
1586 struct mdt_thread_info *info = tsi2mdt_info(tsi);
1587 struct mdt_object *obj = info->mti_object;
1588 struct req_capsule *pill = info->mti_pill;
1589 struct mdt_body *reqbody;
1590 struct mdt_body *repbody;
/* The request must have resolved to an object. */
1594 if (unlikely(info->mti_object == NULL))
1597 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1599 LASSERT(lu_object_assert_exists(&obj->mot_obj));
1601 /* Special case for Data-on-MDT files to get data version */
1602 if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1603 rc = mdt_data_version_get(tsi);
/*
 * Choose the size of the EA reply buffer; rc carries that size (or an
 * error) into req_capsule_set_size() below.
 */
1607 /* Unlike intent case where we need to pre-fill out buffers early on
1608 * in intent policy for ldlm reasons, here we can have a much better
1609 * guess at EA size by just reading it from disk.
1610 * Exceptions are readdir and (missing) directory striping */
1612 if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1613 /* No easy way to know how long is the symlink, but it cannot
1614 * be more than PATH_MAX, so we allocate +1 */
1616 /* A special case for fs ROOT: getattr there might fetch
1617 * default EA for entire fs, not just for this dir!
1619 } else if (lu_fid_eq(mdt_object_fid(obj),
1620 &info->mti_mdt->mdt_md_root_fid) &&
1621 (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1622 (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1624 /* Should the default strping be bigger, mdt_fix_reply
1625 * will reallocate */
1626 rc = DEF_REP_MD_SIZE;
1628 /* Read the actual EA size from disk */
1629 rc = mdt_attr_get_eabuf_size(info, obj);
1633 GOTO(out, rc = err_serious(rc));
1635 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1637 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1638 * by default. If the target object has more ACL entries, then
1639 * enlarge the buffer when necessary. */
1640 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1641 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
/* Allocate the reply now that all field sizes are set. */
1643 rc = req_capsule_server_pack(pill);
1644 if (unlikely(rc != 0))
1645 GOTO(out, rc = err_serious(rc));
/* Start from "no EA, no ACL"; both sizes are filled in only on success. */
1647 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1648 LASSERT(repbody != NULL);
1649 repbody->mbo_eadatasize = 0;
1650 repbody->mbo_aclsize = 0;
1652 rc = mdt_check_ucred(info);
1654 GOTO(out_shrink, rc);
/* Record whether the client flagged this as a cross-reference request. */
1656 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
/* No extra MA_* bits are requested from here. */
1658 rc = mdt_getattr_internal(info, obj, 0);
/* Post-process the reply; the result of mdt_fix_reply() is kept in rc2. */
1661 mdt_client_compatibility(info);
1662 rc2 = mdt_fix_reply(info);
1666 mdt_thread_info_fini(info);
1671 * Handler of layout intent RPC requiring the layout modification
1673 * \param[in] info thread environment
1674 * \param[in] obj object
1675 * \param[out] lhc object ldlm lock handle
1676 * \param[in] layout layout change descriptor
1678 * \retval 0 on success
1679 * \retval < 0 error code
1681 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1682 struct mdt_lock_handle *lhc,
1683 struct md_layout_change *layout)
/*
 * Visible steps: reject a missing or non-regular object, run a permission
 * check, reuse the lock of a resent request or take a new EX lock, then
 * apply the change under mot_som_mutex.
 *
 * NOTE(review): the return statements, the permission mask argument and
 * the condition guarding the new-lock branch are not in this listing.
 */
1689 if (!mdt_object_exists(obj))
1692 if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1695 rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1700 rc = mdt_check_resent_lock(info, obj, lhc);
/* A new lock always covers LAYOUT; a write intent adds the UPDATE bit. */
1706 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1708 /* take layout lock to prepare layout change */
1709 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1710 lockpart |= MDS_INODELOCK_UPDATE;
1712 mdt_lock_handle_init(lhc);
1713 mdt_lock_reg_init(lhc, LCK_EX);
1714 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
/* The layout change itself runs with the object's mot_som_mutex held. */
1719 mutex_lock(&obj->mot_som_mutex);
1720 rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1721 mutex_unlock(&obj->mot_som_mutex);
/* NOTE(review): presumably an error path; the line that guards it is missing. */
1724 mdt_object_unlock(info, obj, lhc, 1);
1730 * Exchange MOF_LOV_CREATED flags between two objects after a
1731 * layout swap. No assumption is made on whether o1 or o2 have
1732 * created objects or not.
1734 * \param[in,out] o1 First swap layout object
1735 * \param[in,out] o2 Second swap layout object
1737 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
/*
 * NOTE(review): this snapshot of o1's flag is taken before either
 * mot_lov_mutex is held, while both stores below happen under the mutexes.
 * Confirm that callers already exclude concurrent writers of
 * mot_lov_created, or move the read after the two mutex_lock() calls.
 */
1739 unsigned int o1_lov_created = o1->mot_lov_created;
/* Locks are taken in o1, o2 order and released in the reverse order. */
1741 mutex_lock(&o1->mot_lov_mutex);
1742 mutex_lock(&o2->mot_lov_mutex);
1744 o1->mot_lov_created = o2->mot_lov_created;
1745 o2->mot_lov_created = o1_lov_created;
1747 mutex_unlock(&o2->mot_lov_mutex);
1748 mutex_unlock(&o1->mot_lov_mutex);
/*
 * Request handler that swaps the layouts of two files: info->mti_object
 * and the object named by mbo_fid2 of the request body.
 *
 * Visible steps: refuse clients without layout lock support, look up the
 * second object, run a permission check on both, take EX locks with the
 * LAYOUT and XATTR bits on both, call mo_swap_layouts() and finally
 * exchange the objects' mot_lov_created flags.
 *
 * NOTE(review): labels, the permission mask arguments and several
 * "if (rc)" lines are not in this listing.
 *
 * \param[in] tsi	target session info of the request
 */
1751 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1753 struct mdt_thread_info *info;
1754 struct ptlrpc_request *req = tgt_ses_req(tsi);
1755 struct obd_export *exp = req->rq_export;
1756 struct mdt_object *o1, *o2, *o;
1757 struct mdt_lock_handle *lh1, *lh2;
1758 struct mdc_swap_layouts *msl;
1762 /* client does not support layout lock, so layout swaping
1764 * FIXME: there is a problem for old clients which don't support
1765 * layout lock yet. If those clients have already opened the file
1766 * they won't be notified at all so that old layout may still be
1767 * used to do IO. This can be fixed after file release is landed by
1768 * doing exclusive open and taking full EX ibits lock. - Jinshan */
1769 if (!exp_connect_layout(exp))
1770 RETURN(-EOPNOTSUPP);
1772 info = tsi2mdt_info(tsi);
1773 if (unlikely(info->mti_object == NULL))
/* Cancel the locks listed in the DLM request that came with this RPC, if any. */
1776 if (info->mti_dlm_req != NULL)
1777 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
/*
 * "o" keeps the reference obtained by mdt_object_find() so that it is the
 * pointer released by mdt_object_put() at the end.
 */
1779 o1 = info->mti_object;
1780 o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1781 &info->mti_body->mbo_fid2);
1783 GOTO(out, rc = PTR_ERR(o));
/* The second object must be local and must exist. */
1785 if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1786 GOTO(put, rc = -ENOENT);
1788 rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1789 if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1795 /* permission check. Make sure the calling process having permission
1796 * to write both files. */
1797 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1802 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1807 msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1809 GOTO(put, rc = -EPROTO);
/* Two EX locks, each covering the LAYOUT and XATTR inode bits. */
1811 lh1 = &info->mti_lh[MDT_LH_NEW];
1812 mdt_lock_reg_init(lh1, LCK_EX);
1813 lh2 = &info->mti_lh[MDT_LH_OLD];
1814 mdt_lock_reg_init(lh2, LCK_EX);
1816 rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1817 MDS_INODELOCK_XATTR);
1821 rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1822 MDS_INODELOCK_XATTR);
1826 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1827 mdt_object_child(o2), msl->msl_flags);
/* Keep the in-memory mot_lov_created flags in step with the swapped layouts. */
1831 mdt_swap_lov_flag(o1, o2);
/* Cleanup in reverse order of acquisition: locks, object reference, info. */
1834 mdt_object_unlock(info, o2, lh2, rc);
1836 mdt_object_unlock(info, o1, lh1, rc);
1838 mdt_object_put(info->mti_env, o);
1840 mdt_thread_info_fini(info);
/*
 * Resolve @lname under @parent to a FID and pack only that FID into the
 * reply body (mbo_valid = OBD_MD_FLID).
 *
 * A lookup of ".." gets extra handling for striped directories, tracked by
 * the is_old_parent_stripe / is_new_parent_checked flags.
 *
 * NOTE(review): the lines that set is_dotdot and the control flow between
 * the two ".." checks (probably a retry) are not in this listing, so that
 * part is described only as far as the visible statements allow.
 *
 * \param[in] info	per-thread MDT state; the FID is stored in mti_tmp_fid1
 * \param[in] parent	directory in which to look up
 * \param[in] lname	name to resolve
 */
1844 static int mdt_raw_lookup(struct mdt_thread_info *info,
1845 struct mdt_object *parent,
1846 const struct lu_name *lname)
1848 struct lu_fid *fid = &info->mti_tmp_fid1;
1849 struct mdt_body *repbody;
1850 bool is_dotdot = false;
1851 bool is_old_parent_stripe = false;
1852 bool is_new_parent_checked = false;
/* This path is not meant for cross-reference requests. */
1857 LASSERT(!info->mti_cross_ref);
1858 /* Always allow to lookup ".." */
1859 if (lname->ln_namelen == 2 &&
1860 lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
1861 info->mti_spec.sp_permitted = 1;
/* Remember whether ".." was asked on a directory stripe. */
1863 if (mdt_is_dir_stripe(info, parent) == 1)
1864 is_old_parent_stripe = true;
/* Hold a reference on the parent across the lookup; it is dropped right after. */
1867 mdt_object_get(info->mti_env, parent);
1869 /* Only got the fid of this obj by name */
1871 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
1873 mdt_object_put(info->mti_env, parent);
1877 /* getattr_name("..") should return master object FID for striped dir */
1878 if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
/* Continue from the object that the lookup just returned. */
1879 parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1881 RETURN(PTR_ERR(parent));
1883 /* old client getattr_name("..") with stripe FID */
1884 if (unlikely(is_old_parent_stripe)) {
1885 is_old_parent_stripe = false;
1889 /* ".." may be a stripe */
1890 if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
1891 is_new_parent_checked = true;
1895 mdt_object_put(info->mti_env, parent);
/* The reply carries nothing but the resolved FID. */
1898 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1899 repbody->mbo_fid1 = *fid;
1900 repbody->mbo_valid = OBD_MD_FLID;
/*
 * Find a child object, lock it and pack its attributes into the reply.
 *
 * The child is found in one of three ways, as far as this listing shows:
 *  - cross-reference request: the child is used directly and only its
 *    attributes are fetched;
 *  - valid name in RMF_NAME: looked up under the parent (info->mti_object)
 *    with mdo_lookup(), the parent being locked first if it is a directory;
 *  - no valid name: taken from reqbody->mbo_fid2.
 *
 * \param[in] info	per-thread MDT state; mti_object is the parent
 * \param[in,out] lhc	lock handle for the child lock
 * \param[in] ldlm_rep	intent reply that receives the DISP_* bits;
 *			mdt_getattr_name() passes NULL
 *
 * The child_bits parameter and the locals is_resent, try_bits and ma_need
 * are declared on lines that are not in this listing.
 *
 * NOTE(review): many other lines are missing too (closing braces, "else"
 * lines, labels, returns); the comments cover only what is shown.
 */
1906 * UPDATE lock should be taken against parent, and be released before exit;
1907 * child_bits lock should be taken against child, and be returned back:
1908 * (1)normal request should release the child lock;
1909 * (2)intent request will grant the lock to client.
1911 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1912 struct mdt_lock_handle *lhc,
1914 struct ldlm_reply *ldlm_rep)
1916 struct ptlrpc_request *req = mdt_info_req(info);
1917 struct mdt_body *reqbody = NULL;
1918 struct mdt_object *parent = info->mti_object;
1919 struct mdt_object *child = NULL;
1920 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1921 struct lu_name *lname = NULL;
1922 struct mdt_lock_handle *lhp = NULL;
1923 struct ldlm_lock *lock;
1924 struct req_capsule *pill = info->mti_pill;
/*
 * A lock handle that is already in use on entry marks a resent request;
 * the assertion checks that the request message carries MSG_RESENT then.
 */
1932 is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1933 LASSERT(ergo(is_resent,
1934 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1939 if (info->mti_cross_ref) {
1940 /* Only getattr on the child. Parent is on another node. */
1941 mdt_set_disposition(info, ldlm_rep,
1942 DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1944 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1946 PFID(mdt_object_fid(child)), ldlm_rep);
/*
 * When mdt_check_resent_lock() returns > 0 a fresh LCK_PR lock is set up
 * and taken, with LOOKUP and LAYOUT removed from the bits and PERM added.
 */
1948 rc = mdt_check_resent_lock(info, child, lhc);
1951 } else if (rc > 0) {
1952 mdt_lock_handle_init(lhc);
1953 mdt_lock_reg_init(lhc, LCK_PR);
1956 * Object's name entry is on another MDS, it will
1957 * request PERM lock only because LOOKUP lock is owned
1958 * by the MDS where name entry resides.
1960 * TODO: it should try layout lock too. - Jinshan
1962 child_bits &= ~(MDS_INODELOCK_LOOKUP |
1963 MDS_INODELOCK_LAYOUT);
1964 child_bits |= MDS_INODELOCK_PERM;
1966 rc = mdt_object_lock(info, child, lhc, child_bits);
1971 /* Finally, we can get attr for child. */
1972 if (!mdt_object_exists(child)) {
1973 LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1975 "remote object doesn't exist.");
1976 mdt_object_unlock(info, child, lhc, 1);
/* Every failure from here on in this branch drops the child lock first. */
1980 rc = mdt_getattr_internal(info, child, 0);
1981 if (unlikely(rc != 0)) {
1982 mdt_object_unlock(info, child, lhc, 1);
1986 rc = mdt_pack_secctx_in_reply(info, child);
1988 mdt_object_unlock(info, child, lhc, 1);
1992 rc = mdt_pack_encctx_in_reply(info, child);
1994 mdt_object_unlock(info, child, lhc, 1);
/* Not a cross-reference request: unpack the name sent by the client. */
1998 lname = &info->mti_name;
1999 mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
/* Lookup by name: the parent must be local. */
2001 if (lu_name_is_valid(lname)) {
2002 if (mdt_object_remote(parent)) {
2003 CERROR("%s: parent "DFID" is on remote target\n",
2004 mdt_obd_name(info->mti_mdt),
2005 PFID(mdt_object_fid(parent)));
2009 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
2010 "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
2011 PNAME(lname), ldlm_rep);
/*
 * Presumably the "else" branch (that line is not in this listing): the
 * child is identified by reqbody->mbo_fid2, which must be a sane FID.
 */
2013 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2014 if (unlikely(reqbody == NULL))
2015 RETURN(err_serious(-EPROTO));
2017 *child_fid = reqbody->mbo_fid2;
2018 if (unlikely(!fid_is_sane(child_fid)))
2019 RETURN(err_serious(-EINVAL));
/* If the child FID equals the parent FID, the parent gets another reference. */
2021 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2022 mdt_object_get(info->mti_env, parent);
2025 child = mdt_object_find(info->mti_env, info->mti_mdt,
2028 RETURN(PTR_ERR(child));
2031 if (mdt_object_remote(child)) {
2032 CERROR("%s: child "DFID" is on remote target\n",
2033 mdt_obd_name(info->mti_mdt),
2034 PFID(mdt_object_fid(child)));
2035 GOTO(out_child, rc = -EPROTO);
2038 /* don't fetch LOOKUP lock if it's remote object */
2039 rc = mdt_is_remote_object(info, parent, child);
2041 GOTO(out_child, rc);
2043 child_bits &= ~MDS_INODELOCK_LOOKUP;
2045 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2047 PFID(mdt_object_fid(parent)),
2048 PFID(&reqbody->mbo_fid2), ldlm_rep);
/* Record in the intent reply that the lookup was executed. */
2051 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
/* A by-name lookup under a parent that no longer exists yields -ESTALE. */
2053 if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
2054 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2056 "Parent doesn't exist!");
2057 GOTO(out_child, rc = -ESTALE);
/* A request for the FID only (mbo_valid == OBD_MD_FLID) goes to mdt_raw_lookup(). */
2060 if (lu_name_is_valid(lname)) {
2061 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
2062 rc = mdt_raw_lookup(info, parent, lname);
2067 /* step 1: lock parent only if parent is a directory */
2068 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2069 lhp = &info->mti_lh[MDT_LH_PARENT];
2070 mdt_lock_pdo_init(lhp, LCK_PR, lname);
2071 rc = mdt_object_lock(info, parent, lhp,
2072 MDS_INODELOCK_UPDATE);
2073 if (unlikely(rc != 0))
2077 /* step 2: lookup child's fid by name */
2078 fid_zero(child_fid);
2079 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2080 child_fid, &info->mti_spec);
/* NOTE(review): the condition guarding this negative disposition is missing. */
2082 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2085 GOTO(unlock_parent, rc);
2087 child = mdt_object_find(info->mti_env, info->mti_mdt,
2089 if (unlikely(IS_ERR(child)))
2090 GOTO(unlock_parent, rc = PTR_ERR(child));
2093 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2095 /* step 3: lock child regardless if it is local or remote. */
/* Fault-injection delay of 2 * obd_timeout when OBD_FAIL_MDS_RESEND is armed. */
2098 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2099 if (!mdt_object_exists(child)) {
2100 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2102 "Object doesn't exist!");
2103 GOTO(out_child, rc = -ENOENT);
/* As in the cross-ref branch: rc > 0 means a fresh LCK_PR lock is taken. */
2106 rc = mdt_check_resent_lock(info, child, lhc);
2108 GOTO(out_child, rc);
2109 } else if (rc > 0) {
2110 mdt_lock_handle_init(lhc);
2111 mdt_lock_reg_init(lhc, LCK_PR);
/*
 * UPDATE was not requested and the child is local: read its inode
 * attributes to decide whether UPDATE should be added anyway.
 */
2113 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2114 !mdt_object_remote(child)) {
2115 struct md_attr *ma = &info->mti_attr;
2118 ma->ma_need = MA_INODE;
2119 rc = mdt_attr_get_complex(info, child, ma);
2120 if (unlikely(rc != 0))
2121 GOTO(out_child, rc);
2123 /* If the file has not been changed for some time, we
2124 * return not only a LOOKUP lock, but also an UPDATE
2125 * lock and this might save us RPC on later STAT. For
2126 * directories, it also let negative dentry cache start
2127 * working for this dir. */
2128 if (ma->ma_valid & MA_INODE &&
2129 ma->ma_attr.la_valid & LA_CTIME &&
2130 info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2131 ma->ma_attr.la_ctime < ktime_get_real_seconds())
2132 child_bits |= MDS_INODELOCK_UPDATE;
2135 /* layout lock must be granted in a best-effort way
2136 * for IT operations */
2137 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
/*
 * Optional bits are considered only for a local regular file and only when
 * an intent reply is present (ldlm_rep != NULL).
 */
2138 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2139 !mdt_object_remote(child) && ldlm_rep != NULL) {
2140 if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2141 exp_connect_layout(info->mti_exp)) {
2142 /* try to grant layout lock for regular file. */
2143 try_bits = MDS_INODELOCK_LAYOUT;
2145 /* Acquire DOM lock in advance for data-on-mdt file */
2146 if (child != parent)
2147 try_bits |= MDS_INODELOCK_DOM;
/*
 * With optional bits the lock goes through mdt_object_lock_try();
 * the mdt_object_lock() call below is presumably the "else" path.
 */
2150 if (try_bits != 0) {
2151 /* try layout lock, it may fail to be granted due to
2152 * contention at LOOKUP or UPDATE */
2153 rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2155 if (child_bits & MDS_INODELOCK_LAYOUT)
2158 /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2159 * client will enqueue the lock to the remote MDT */
2160 if (mdt_object_remote(child))
2161 child_bits &= ~MDS_INODELOCK_UPDATE;
2162 rc = mdt_object_lock(info, child, lhc, child_bits);
2164 if (unlikely(rc != 0))
2165 GOTO(out_child, rc);
2168 /* finally, we can get attr for child. */
2169 rc = mdt_getattr_internal(info, child, ma_need);
2170 if (unlikely(rc != 0)) {
2171 mdt_object_unlock(info, child, lhc, 1);
2172 GOTO(out_child, rc);
2175 rc = mdt_pack_secctx_in_reply(info, child);
2177 mdt_object_unlock(info, child, lhc, 1);
2178 GOTO(out_child, rc);
2181 rc = mdt_pack_encctx_in_reply(info, child);
2183 mdt_object_unlock(info, child, lhc, 1);
2184 GOTO(out_child, rc);
/*
 * Get the lock behind the handle; this reference is released with
 * LDLM_LOCK_PUT() further down.
 */
2187 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2189 /* Debugging code. */
2190 LDLM_DEBUG(lock, "Returning lock to client");
2191 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2192 &lock->l_resource->lr_name),
2193 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2194 PLDLMRES(lock->l_resource),
2195 PFID(mdt_object_fid(child)));
/* Fault injection for enqueue resend testing. */
2197 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
2198 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2199 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
2201 req->rq_arrival_time.tv_sec +
2203 /* Put the lock to the waiting list and force the cancel */
2204 ldlm_set_ast_sent(lock);
/*
 * Local regular file that is not the parent itself: the child reference is
 * dropped first and the size is then packed through child_fid.  If that
 * fails while the DOM bit is held, the DOM bit is dropped from the lock
 * and rc is reset to 0.
 */
2207 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2208 !mdt_object_remote(child) && child != parent) {
2209 mdt_object_put(info->mti_env, child);
2210 rc = mdt_pack_size2body(info, child_fid,
2212 if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2213 /* DOM lock was taken in advance but this is
2214 * not DoM file. Drop the lock.
2216 lock_res_and_lock(lock);
2217 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2218 unlock_res_and_lock(lock);
2220 LDLM_LOCK_PUT(lock);
2221 GOTO(unlock_parent, rc = 0);
2223 LDLM_LOCK_PUT(lock);
2229 mdt_object_put(info->mti_env, child);
2232 mdt_object_unlock(info, parent, lhp, 1);
/*
 * Getattr-by-name request handler without an intent.
 *
 * Calls mdt_getattr_name_lock() asking for the UPDATE inode bit and with
 * no ldlm reply, then drops the child lock reference itself if the handle
 * is in use, since nothing is returned to the client.
 *
 * NOTE(review): local declarations, labels and the return are not in this
 * listing.
 *
 * \param[in] tsi	target session info of the request
 */
2236 /* normal handler: should release the child lock */
2237 static int mdt_getattr_name(struct tgt_session_info *tsi)
2239 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2240 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2241 struct mdt_body *reqbody;
2242 struct mdt_body *repbody;
/* Both bodies are asserted to be present. */
2247 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2248 LASSERT(reqbody != NULL);
2249 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2250 LASSERT(repbody != NULL);
/* Start from "no EA, no ACL" in the reply. */
2252 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2253 repbody->mbo_eadatasize = 0;
2254 repbody->mbo_aclsize = 0;
2256 rc = mdt_init_ucred(info, reqbody);
2258 GOTO(out_shrink, rc);
2260 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
/* Drop the child lock reference and clear the handle's cookie. */
2261 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2262 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2263 lhc->mlh_reg_lh.cookie = 0;
2265 mdt_exit_ucred(info);
/* Post-process the reply; the result of mdt_fix_reply() is kept in rc2. */
2268 mdt_client_compatibility(info);
2269 rc2 = mdt_fix_reply(info);
2272 mdt_thread_info_fini(info);
/*
 * Remove one name of @obj: the entry @name in the directory @pfid.
 *
 * Visible steps: find and PW-lock the parent, check that @name still
 * resolves to @obj (-EREMCHG otherwise), EX-lock the child, refuse open
 * files with -EBUSY, then call mdo_unlink() under the child's
 * mot_lov_mutex with @ctime as the change time.
 *
 * NOTE(review): labels, "if (rc)" lines and the return are not in this
 * listing.
 *
 * \param[in] info	per-thread MDT state
 * \param[in] pfid	FID of the parent directory
 * \param[in] name	name of the link to remove
 * \param[in] obj	object the name is expected to refer to
 * \param[in] ctime	change time recorded with the unlink
 */
2276 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2277 const struct lu_fid *pfid,
2278 const struct lu_name *name,
2279 struct mdt_object *obj, s64 ctime)
2281 struct lu_fid *child_fid = &info->mti_tmp_fid1;
2282 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2283 struct mdt_device *mdt = info->mti_mdt;
2284 struct md_attr *ma = &info->mti_attr;
2285 struct mdt_lock_handle *parent_lh;
2286 struct mdt_lock_handle *child_lh;
2287 struct mdt_object *pobj;
2288 bool cos_incompat = false;
2292 pobj = mdt_object_find(info->mti_env, mdt, pfid);
2294 GOTO(out, rc = PTR_ERR(pobj));
/* PW lock on the parent with the UPDATE bit, initialised for @name. */
2296 parent_lh = &info->mti_lh[MDT_LH_PARENT];
2297 mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2298 rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2300 GOTO(put_parent, rc);
/* A remote parent sets cos_incompat, which is passed to the child lock below. */
2302 if (mdt_object_remote(pobj))
2303 cos_incompat = true;
/* The name must still resolve to the object the caller expects. */
2305 rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2306 name, child_fid, &info->mti_spec);
2308 GOTO(unlock_parent, rc);
2310 if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2311 GOTO(unlock_parent, rc = -EREMCHG);
2313 child_lh = &info->mti_lh[MDT_LH_CHILD];
2314 mdt_lock_reg_init(child_lh, LCK_EX);
2315 rc = mdt_reint_striped_lock(info, obj, child_lh,
2316 MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2317 einfo, cos_incompat);
2319 GOTO(unlock_parent, rc);
/* An object with a non-zero open count is skipped. */
2321 if (atomic_read(&obj->mot_open_count)) {
2322 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2323 PFID(mdt_object_fid(obj)));
2324 GOTO(unlock_child, rc = -EBUSY);
/* Only the change time is marked valid in the attributes passed to mdo_unlink(). */
2328 ma->ma_valid = MA_INODE;
2329 ma->ma_attr.la_valid = LA_CTIME;
2330 ma->ma_attr.la_ctime = ctime;
2332 mutex_lock(&obj->mot_lov_mutex);
2334 rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2335 mdt_object_child(obj), name, ma, 0);
2337 mutex_unlock(&obj->mot_lov_mutex);
/* Cleanup in reverse order of acquisition. */
2340 mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2342 mdt_object_unlock(info, pobj, parent_lh, 1);
2344 mdt_object_put(info->mti_env, pobj);
/*
 * Check the caller's credentials against @obj before a remove-by-FID.
 *
 * The inode attributes are read and then tested in this order: the
 * immutable flag, the CAP_DAC_OVERRIDE capability, and finally the write
 * bit of the matching class (owner if fsuid matches, else group if fsgid
 * matches, else other).
 *
 * NOTE(review): none of the return statements are in this listing, so the
 * value returned for each test cannot be stated from here.
 *
 * \param[in] info	per-thread MDT state; supplies the env and user cred
 * \param[in] obj	object about to be removed
 */
2349 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2350 struct mdt_object *obj)
2352 struct lu_ucred *uc = lu_ucred(info->mti_env);
2353 struct md_attr *ma = &info->mti_attr;
2354 struct lu_attr *la = &ma->ma_attr;
2358 ma->ma_need = MA_INODE;
2359 rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
/* The immutable flag is tested before the capability. */
2363 if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2366 if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
/* Exactly one class is consulted: owner, else group, else other. */
2368 if (uc->uc_fsuid == la->la_uid) {
2369 if ((la->la_mode & S_IWUSR) == 0)
2371 } else if (uc->uc_fsgid == la->la_gid) {
2372 if ((la->la_mode & S_IWGRP) == 0)
2374 } else if ((la->la_mode & S_IWOTH) == 0) {
/*
 * Remove the object identified by @fid by unlinking each name recorded in
 * its link EA.
 *
 * Visible steps: validate the FID, find the object (it must be local,
 * existing and not dying), check permission, make sure the big buffer has
 * at least PATH_MAX bytes, read the link EA and call mdt_rmfid_unlink()
 * for each record in it.
 *
 * NOTE(review): the "ctime" parameter is declared on a line that is not in
 * this listing, as are the "out" label, several "if (rc)" lines, the
 * assignment of leh and the return.
 *
 * \param[in] info	per-thread MDT state
 * \param[in] fid	FID of the object to remove
 */
2382 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2385 struct mdt_device *mdt = info->mti_mdt;
2386 struct mdt_object *obj = NULL;
2387 struct linkea_data ldata = { NULL };
2388 struct lu_buf *buf = &info->mti_big_buf;
2389 struct lu_name *name = &info->mti_name;
2390 struct lu_fid *pfid = &info->mti_tmp_fid1;
2391 struct link_ea_header *leh;
2392 struct link_ea_entry *lee;
2393 int reclen, count, rc = 0;
/* Both FID checks fail with -EINVAL. */
2396 if (!fid_is_sane(fid))
2397 GOTO(out, rc = -EINVAL);
2399 if (!fid_is_namespace_visible(fid))
2400 GOTO(out, rc = -EINVAL);
2402 obj = mdt_object_find(info->mti_env, mdt, fid);
2404 GOTO(out, rc = PTR_ERR(obj));
/* Only a local, existing object that is not dying is processed. */
2406 if (mdt_object_remote(obj))
2407 GOTO(out, rc = -EREMOTE);
2408 if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2409 GOTO(out, rc = -ENOENT);
2411 rc = mdt_rmfid_check_permission(info, obj);
/* The big buffer is sized to at least PATH_MAX bytes before the link EA is read. */
2416 buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2418 GOTO(out, rc = -ENOMEM);
2421 rc = mdt_links_read(info, obj, &ldata);
/* The first record follows the header; each record is reclen bytes long. */
2426 lee = (struct link_ea_entry *)(leh + 1);
2427 for (count = 0; count < leh->leh_reccount; count++) {
2428 /* remove every hardlink */
2429 linkea_entry_unpack(lee, &reclen, name, pfid);
2430 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2431 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
/*
 * Cleanup: obj may be NULL or an error pointer when an early GOTO(out) was
 * taken, hence the double check before the put.
 */
2437 if (obj && !IS_ERR(obj))
2438 mdt_object_put(info->mti_env, obj);
2439 if (info->mti_big_buf.lb_buf)
2440 lu_buf_free(&info->mti_big_buf);
/*
 * Request handler that removes a batch of objects given by FID.
 *
 * The request carries an array of FIDs in RMF_FID_ARRAY.  The reply gets
 * one __u32 result per FID in RMF_RCS and a FID array of the same length.
 * mdt_rmfid_one() is called for each FID with the ctime from the request
 * body.
 *
 * NOTE(review): local declarations, several checks, the statement that
 * fills rfids inside the loop and the return are not in this listing.
 *
 * \param[in] tsi	target session info of the request
 */
2445 static int mdt_rmfid(struct tgt_session_info *tsi)
2447 struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2448 struct mdt_body *reqbody;
2449 struct lu_fid *fids, *rfids;
2455 reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2456 if (reqbody == NULL)
2458 bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
/* The request buffer must hold a whole number of FIDs. */
2460 nr = bufsize / sizeof(struct lu_fid);
2461 if (nr * sizeof(struct lu_fid) != bufsize)
/* Size both reply arrays for nr entries, then allocate the reply. */
2463 req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2464 RCL_SERVER, nr * sizeof(__u32));
2465 req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2466 RCL_SERVER, nr * sizeof(struct lu_fid));
2467 rc = req_capsule_server_pack(tsi->tsi_pill);
2469 GOTO(out, rc = err_serious(rc));
2470 fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2473 rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2475 rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
/*
 * NOTE(review): the return value of mdt_init_ucred() is not used on this
 * line, whereas mdt_getattr_name() stores it in rc and bails out; confirm
 * that ignoring a failure here is intended.
 */
2478 mdt_init_ucred(mti, reqbody);
/* Each FID gets its own result code in rcs[i]. */
2479 for (i = 0; i < nr; i++) {
2481 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2483 mdt_exit_ucred(mti);
/* Forward declaration: mdt_set_info() below calls mdt_iocontrol(). */
2489 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2490 void *karg, void __user *uarg);
/*
 * Request handler for "set info" key/value requests.
 *
 * The key and value are taken from RMF_SETINFO_KEY / RMF_SETINFO_VAL; a
 * missing key or value fails with a serious -EFAULT.  Keys handled in the
 * visible code:
 *  - KEY_READ_ONLY:       sets or clears OBD_CONNECT_RDONLY in the export's
 *                         connect flags under exp_lock;
 *  - KEY_CHANGELOG_CLEAR: validates the value size, byte-swaps it when the
 *                         request needs swabbing, checks mdt_is_rootadmin()
 *                         and forwards to mdt_iocontrol();
 *  - KEY_EVICT_BY_NID:    calls obd_export_evict_by_nid() with the value.
 *
 * NOTE(review): the declarations of key/val, the NULL checks, the condition
 * choosing between set and clear for KEY_READ_ONLY, and all returns after
 * the key dispatch are not in this listing.
 *
 * \param[in] tsi	target session info of the request
 */
2492 static int mdt_set_info(struct tgt_session_info *tsi)
2494 struct ptlrpc_request *req = tgt_ses_req(tsi);
2497 int keylen, vallen, rc = 0;
2501 key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2503 DEBUG_REQ(D_HA, req, "no set_info key");
2504 RETURN(err_serious(-EFAULT));
2507 keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2510 val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2512 DEBUG_REQ(D_HA, req, "no set_info val");
2513 RETURN(err_serious(-EFAULT));
2516 vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2519 /* Swab any part of val you need to here */
/* The connect flags are modified under the export's exp_lock. */
2520 if (KEY_IS(KEY_READ_ONLY)) {
2521 spin_lock(&req->rq_export->exp_lock);
2523 *exp_connect_flags_ptr(req->rq_export) |=
2526 *exp_connect_flags_ptr(req->rq_export) &=
2527 ~OBD_CONNECT_RDONLY;
2528 spin_unlock(&req->rq_export->exp_lock);
2529 } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2530 struct changelog_setinfo *cs = val;
/* The value must be exactly one struct changelog_setinfo. */
2532 if (vallen != sizeof(*cs)) {
2533 CERROR("%s: bad changelog_clear setinfo size %d\n",
2534 tgt_name(tsi->tsi_tgt), vallen);
/* Byte-swap the record number and id if the request needs swabbing. */
2537 if (req_capsule_req_need_swab(&req->rq_pill)) {
2538 __swab64s(&cs->cs_recno);
2539 __swab32s(&cs->cs_id);
/* mdt_is_rootadmin() is checked before the ioctl is issued. */
2542 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2544 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2546 } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2548 obd_export_evict_by_nid(req->rq_export->exp_obd, val);
/*
 * Handle a readpage request on the object in tsi->tsi_corpus.
 *
 * Fills the thread-local struct lu_rdpg from the request body (start hash
 * from mbo_size, attribute flags from mbo_mode, byte count from mbo_nlink
 * capped by exp_max_brw_size()), allocates the page array and pages, asks the
 * lower layer to fill them via mo_readpage(), and sends them to the client
 * with tgt_sendpage(). All allocated pages and the page array are released
 * before returning.
 *
 * \param tsi	target session info of the request being handled
 */
2555 static int mdt_readpage(struct tgt_session_info *tsi)
2557 struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
2558 struct mdt_object *object = mdt_obj(tsi->tsi_corpus);
2559 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
2560 const struct mdt_body *reqbody = tsi->tsi_mdt_body;
2561 struct mdt_body *repbody;
2567 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2568 RETURN(err_serious(-ENOMEM));
2570 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2571 if (repbody == NULL || reqbody == NULL)
2572 RETURN(err_serious(-EFAULT));
2575 * prepare @rdpg before calling lower layers and transfer itself. Here
2576 * reqbody->size contains offset of where to start to read and
2577 * reqbody->nlink contains number bytes to read.
/*
 * The value is stored and then compared back against the source field.
 * NOTE(review): presumably this detects truncation when rp_hash is
 * narrower than mbo_size -- confirm against the field types.
 */
2579 rdpg->rp_hash = reqbody->mbo_size;
2580 if (rdpg->rp_hash != reqbody->mbo_size) {
2581 CERROR("Invalid hash: %#llx != %#llx\n",
2582 rdpg->rp_hash, reqbody->mbo_size);
2586 rdpg->rp_attrs = reqbody->mbo_mode;
2587 if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2588 rdpg->rp_attrs |= LUDA_64BITHASH;
/* never read more than the export's maximum bulk size */
2589 rdpg->rp_count = min_t(unsigned int, reqbody->mbo_nlink,
2590 exp_max_brw_size(tsi->tsi_exp));
/* page count needed for rp_count bytes, rounded up */
2591 rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2593 OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2594 if (rdpg->rp_pages == NULL)
2597 for (i = 0; i < rdpg->rp_npages; ++i) {
2598 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2599 if (rdpg->rp_pages[i] == NULL)
2600 GOTO(free_rdpg, rc = -ENOMEM);
2603 /* call lower layers to fill allocated pages with directory data */
2604 rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2606 GOTO(free_rdpg, rc);
2608 /* send pages to client */
2609 rc = tgt_sendpage(tsi, rdpg, rc);
/*
 * Cleanup: the allocation loop can stop early, so some slots may still be
 * NULL; only non-NULL pages are freed.
 */
2614 for (i = 0; i < rdpg->rp_npages; i++)
2615 if (rdpg->rp_pages[i] != NULL)
2616 __free_page(rdpg->rp_pages[i]);
2617 OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2619 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
/*
 * Override owner ids in the request attributes with the caller's fs ids.
 *
 * For any operation other than REINT_SETATTR, la_uid/la_gid in
 * info->mti_attr.ma_attr are replaced by uc_fsuid/uc_fsgid from the ucred
 * returned by mdt_ucred_check(), but only when the matching LA_UID/LA_GID
 * bit is set in la_valid and the current value is not -1.
 *
 * \param info	thread info holding the attributes to fix
 * \param op	reint opcode of the request
 */
2625 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2627 struct lu_ucred *uc = mdt_ucred_check(info);
2628 struct lu_attr *attr = &info->mti_attr.ma_attr;
/* SETATTR keeps the ids supplied in the request */
2633 if (op != REINT_SETATTR) {
2634 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2635 attr->la_uid = uc->uc_fsuid;
2636 /* for S_ISGID, inherit gid from his parent, such work will be
2637 * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2638 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2639 attr->la_gid = uc->uc_fsgid;
/*
 * Tell whether a reint operation is an open that neither writes nor creates.
 *
 * \param info	thread info; info->mti_spec.sp_cr_flags holds the open flags
 * \param op	reint opcode
 *
 * \retval true	 op is REINT_OPEN and neither MDS_FMODE_WRITE nor
 *		 MDS_OPEN_CREAT is set in sp_cr_flags
 * \retval false otherwise
 */
2645 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2647 return op == REINT_OPEN &&
2648 !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
/*
 * Pre-set the reply size of the RMF_FILE_SECCTX field before the reply is
 * packed.
 *
 * The capsule is checked for the RMF_FILE_SECCTX and RMF_FILE_SECCTX_NAME
 * fields, and the size of RMF_FILE_SECCTX_NAME selects between two
 * req_capsule_set_size() calls; one of them reserves
 * OBD_MAX_DEFAULT_EA_SIZE bytes.
 *
 * \param info	thread info whose mti_pill is adjusted
 */
2651 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2653 struct req_capsule *pill = info->mti_pill;
2655 if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2657 req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2659 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2661 /* pre-set size in server part with max size */
2662 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2664 OBD_MAX_DEFAULT_EA_SIZE);
2666 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
/*
 * Pre-set the reply size of the RMF_FILE_ENCCTX field before the reply is
 * packed: when the capsule has that field, its size is set to the device's
 * mdt_max_mdsize.
 *
 * \param info	thread info whose mti_pill is adjusted
 */
2671 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2673 struct req_capsule *pill = info->mti_pill;
2675 if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2677 /* pre-set size in server part with max size */
2678 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2680 info->mti_mdt->mdt_max_mdsize);
/*
 * Common body of a reint (metadata modification) request.
 *
 * Visible sequence:
 *  1. unpack the request with mdt_reint_unpack();
 *  2. refuse with -EROFS when the export is read-only, unless the operation
 *     is a read-only open (mdt_is_readonly_open());
 *  3. pre-set reply field sizes (RMF_MDT_MD, RMF_LOGCOOKIES, RMF_ACL,
 *     security and encryption contexts) and pack the reply;
 *  4. zero mbo_eadatasize/mbo_aclsize in the reply body when present;
 *  5. set up the ucred, fix owner ids (mdt_fix_attr_ucred()), check for a
 *     resent request (mdt_check_resent()) and run mdt_reint_rec();
 *  6. on the way out: mdt_exit_ucred(), mdt_client_compatibility(),
 *     mdt_fix_reply(), and for a successful non-replay REINT_OPEN,
 *     mdt_dom_read_on_open().
 *
 * Unpack, read-only and pack failures are returned through err_serious().
 *
 * \param info	thread info for the request
 * \param lhc	lock handle passed through to mdt_check_resent() and
 *		mdt_reint_rec()
 */
2683 static int mdt_reint_internal(struct mdt_thread_info *info,
2684 struct mdt_lock_handle *lhc,
2687 struct req_capsule *pill = info->mti_pill;
2688 struct mdt_body *repbody;
2693 rc = mdt_reint_unpack(info, op);
2695 CERROR("Can't unpack reint, rc %d\n", rc);
2696 RETURN(err_serious(rc));
2700 /* check if the file system is set to readonly. O_RDONLY open
2701 * is still allowed even the file system is set to readonly mode */
2702 if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2703 RETURN(err_serious(-EROFS));
2705 /* for replay (no_create) lmm is not needed, client has it already */
2706 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2707 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2710 /* llog cookies are always 0, the field is kept for compatibility */
2711 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2712 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2714 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2715 * by default. If the target object has more ACL entries, then
2716 * enlarge the buffer when necessary. */
2717 if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2718 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2719 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2721 mdt_preset_secctx_size(info);
2722 mdt_preset_encctx_size(info);
/* all server-side field sizes must be set before this point */
2724 rc = req_capsule_server_pack(pill);
2726 CERROR("Can't pack response, rc %d\n", rc);
2727 RETURN(err_serious(rc));
2730 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2731 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2733 repbody->mbo_eadatasize = 0;
2734 repbody->mbo_aclsize = 0;
2737 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2739 /* for replay no cookie / lmm needed, because the client has them already */
2740 if (info->mti_spec.no_create)
2741 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2742 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2744 rc = mdt_init_ucred_reint(info);
2746 GOTO(out_shrink, rc);
2748 rc = mdt_fix_attr_ucred(info, op);
2750 GOTO(out_ucred, rc = err_serious(rc));
2752 rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2754 GOTO(out_ucred, rc);
2755 } else if (rc == 1) {
/* rc == 1: the status already stored in the reply message is returned */
2756 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2757 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2758 GOTO(out_ucred, rc);
2760 rc = mdt_reint_rec(info, lhc);
2763 mdt_exit_ucred(info);
2765 mdt_client_compatibility(info);
2767 rc2 = mdt_fix_reply(info);
2772 * Data-on-MDT optimization - read data along with OPEN and return it
2773 * in reply when possible.
2775 if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2776 rc = mdt_dom_read_on_open(info, info->mti_mdt,
/*
 * Read the reint opcode from a request and select the matching format.
 *
 * The opcode is taken from rr_opcode of the RMF_REC_REINT record. When it is
 * below REINT_MAX and has a non-NULL entry in \a fmt, the request capsule is
 * extended with that format. Otherwise an "Unsupported opcode" error is
 * logged and opc is set to err_serious(-EFAULT). The last visible assignment
 * also sets opc to err_serious(-EFAULT).
 * NOTE(review): presumably that one covers a missing record -- confirm.
 *
 * \param req	request to inspect
 * \param fmt	table of request formats indexed by reint opcode
 */
2782 static long mdt_reint_opcode(struct ptlrpc_request *req,
2783 const struct req_format **fmt)
2785 struct mdt_device *mdt;
2786 struct mdt_rec_reint *rec;
2789 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2791 opc = rec->rr_opcode;
2792 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
/* bounds check first, so fmt[] is never indexed past REINT_MAX */
2793 if (opc < REINT_MAX && fmt[opc] != NULL)
2794 req_capsule_extend(&req->rq_pill, fmt[opc]);
2796 mdt = mdt_exp2dev(req->rq_export);
2797 CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2798 " rc = %d\n", req->rq_export->exp_obd->obd_name,
2799 opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2800 opc = err_serious(-EFAULT);
2803 opc = err_serious(-EFAULT);
/*
 * Entry point for reint requests.
 *
 * Looks up the request format for the opcode with mdt_reint_opcode() using
 * the static reint_fmts[] table, then runs mdt_reint_internal() with no lock
 * handle and releases the thread info with mdt_thread_info_fini().
 * tsi_reply_fail_id is set to OBD_FAIL_MDS_REINT_NET_REP.
 *
 * \param tsi	target session info of the request being handled
 */
2808 static int mdt_reint(struct tgt_session_info *tsi)
/* request format per reint opcode; opcodes without an entry stay NULL */
2812 static const struct req_format *reint_fmts[REINT_MAX] = {
2813 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
2814 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
2815 [REINT_LINK] = &RQF_MDS_REINT_LINK,
2816 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
2817 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
2818 [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
2819 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
/* REINT_RMENTRY shares the unlink request format */
2820 [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
2821 [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE,
2822 [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC,
2827 opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2829 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2831 * No lock possible here from client to pass it to reint code
2834 rc = mdt_reint_internal(info, NULL, opc);
2835 mdt_thread_info_fini(info);
2840 tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2844 /* this should sync the whole device */
/*
 * Delegates to the dt_sync() method of the bottom dt_device
 * (mdt->mdt_bottom).
 *
 * \param env	execution environment
 * \param mdt	MDT device to sync
 */
2845 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2847 struct dt_device *dt = mdt->mdt_bottom;
2851 rc = dt->dd_ops->dt_sync(env, dt);
2855 /* this should sync this object */
/*
 * A non-existing object is reported with CWARN. For a regular file the
 * object version is compared with the target's obd_last_committed, and
 * mo_object_sync() is called only when the version is newer. The second
 * mo_object_sync() call below is presumably the branch for other object
 * types -- confirm.
 *
 * \param env	execution environment
 * \param exp	export, used for the device name in the warning
 * \param mo	object to sync
 */
2856 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2857 struct mdt_object *mo)
2863 if (!mdt_object_exists(mo)) {
2864 CWARN("%s: non existing object "DFID": rc = %d\n",
2865 exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2870 if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2871 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2872 dt_obj_version_t version;
/* nothing to flush when the object's version is already committed */
2874 version = dt_version_get(env, mdt_obj2dt(mo));
2875 if (version > tgt->lut_obd->obd_last_committed)
2876 rc = mo_object_sync(env, mdt_object_child(mo));
2878 rc = mo_object_sync(env, mdt_object_child(mo));
/*
 * Handle a sync request.
 *
 * When the sequence of mbo_fid1 in the request body is 0 the whole device is
 * synced with mdt_device_sync(). Otherwise the object in info->mti_object is
 * synced with mdt_object_sync(), its attributes are fetched with
 * mdt_attr_get_complex() and packed into the reply body. The elapsed time
 * since entry is added to the LPROC_MDT_SYNC counter.
 *
 * \param tsi	target session info of the request being handled
 */
2884 static int mdt_sync(struct tgt_session_info *tsi)
2886 struct ptlrpc_request *req = tgt_ses_req(tsi);
2887 struct req_capsule *pill = tsi->tsi_pill;
2888 struct mdt_body *body;
/* start time for the LPROC_MDT_SYNC counter updated at the end */
2889 ktime_t kstart = ktime_get();
2894 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2895 RETURN(err_serious(-ENOMEM));
/* a zero FID sequence selects a whole-device sync */
2897 if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2898 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2900 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2902 if (unlikely(info->mti_object == NULL))
2905 /* sync an object */
2906 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2909 const struct lu_fid *fid;
2910 struct lu_attr *la = &info->mti_attr.ma_attr;
/* return the object's current inode attributes in the reply body */
2912 info->mti_attr.ma_need = MA_INODE;
2913 info->mti_attr.ma_valid = 0;
2914 rc = mdt_attr_get_complex(info, info->mti_object,
2917 body = req_capsule_server_get(pill,
2919 fid = mdt_object_fid(info->mti_object);
2920 mdt_pack_attr2body(info, body, la, fid);
2923 mdt_thread_info_fini(info);
2926 mdt_counter_incr(req, LPROC_MDT_SYNC,
2927 ktime_us_delta(ktime_get(), kstart));
/*
 * Handle a sync request that carries an ost_body.
 *
 * With a zero FID in tsi->tsi_fid nothing is synced. Otherwise the object is
 * looked up with mdt_object_find(), synced with mdt_object_sync(), and the
 * reply obdo is filled with the request's o_oi plus attributes obtained from
 * mdt_attr_get_complex(). The object reference is dropped with
 * mdt_object_put() before returning.
 *
 * \param tsi	target session info of the request being handled
 */
2932 static int mdt_data_sync(struct tgt_session_info *tsi)
2934 struct mdt_thread_info *info;
2935 struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2936 struct ost_body *body = tsi->tsi_ost_body;
2937 struct ost_body *repbody;
2938 struct mdt_object *mo = NULL;
2944 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2946 /* if no fid is specified then do nothing,
2947 * device sync is done via MDS_SYNC */
2948 if (fid_is_zero(&tsi->tsi_fid))
2951 mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2953 RETURN(PTR_ERR(mo));
2955 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
/* echo the object id back and mark which obdo fields are valid */
2959 repbody->oa.o_oi = body->oa.o_oi;
2960 repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2962 info = tsi2mdt_info(tsi);
2963 ma = &info->mti_attr;
2964 ma->ma_need = MA_INODE;
2966 rc = mdt_attr_get_complex(info, mo, ma);
2968 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2971 mdt_thread_info_fini(info);
/* drop the reference taken by mdt_object_find() */
2976 mdt_object_put(tsi->tsi_env, mo);
2981 * Handle quota control requests to consult current usage/limit, but also
2982 * to configure quota enforcement
2984 static int mdt_quotactl(struct tgt_session_info *tsi)
/*
 * Visible flow: read the obd_quotactl request, pack the reply, take a nodemap
 * reference for the export, validate the command (set-type commands require
 * nodemap_can_setquota()), map the id through the nodemap according to
 * qc_type, run the command either on the QMT (qmth_quotactl) or on the
 * local slave (lquotactl_slv), copy the result into the reply with
 * QCTL_COPY() and drop the nodemap reference.
 *
 * oqctl->qc_id is swapped with the mapped id before the command runs and
 * swapped back afterwards.
 */
2986 struct obd_export *exp = tsi->tsi_exp;
2987 struct req_capsule *pill = tsi->tsi_pill;
2988 struct obd_quotactl *oqctl, *repoqc;
2990 struct mdt_device *mdt = mdt_exp2dev(exp);
2991 struct lu_device *qmt = mdt->mdt_qmt_dev;
2992 struct lu_nodemap *nodemap;
2995 oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2997 RETURN(err_serious(-EPROTO));
2999 rc = req_capsule_server_pack(pill);
3001 RETURN(err_serious(rc));
/* reference released by nodemap_putref() at the end of the function */
3003 nodemap = nodemap_get_from_exp(exp);
3004 if (IS_ERR(nodemap))
3005 RETURN(PTR_ERR(nodemap));
3007 switch (oqctl->qc_cmd) {
3008 /* master quotactl */
3011 case LUSTRE_Q_SETDEFAULT:
3012 case LUSTRE_Q_SETQUOTAPOOL:
3013 case LUSTRE_Q_SETINFOPOOL:
3014 case LUSTRE_Q_SETDEFAULT_POOL:
3015 if (!nodemap_can_setquota(nodemap))
3016 GOTO(out_nodemap, rc = -EPERM);
3020 case LUSTRE_Q_GETDEFAULT:
3021 case LUSTRE_Q_GETQUOTAPOOL:
3022 case LUSTRE_Q_GETINFOPOOL:
3023 case LUSTRE_Q_GETDEFAULT_POOL:
3025 GOTO(out_nodemap, rc = -EOPNOTSUPP);
3026 /* slave quotactl */
3033 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
3034 mdt_obd_name(mdt), oqctl->qc_cmd, rc);
3035 GOTO(out_nodemap, rc);
/* translate the client-side id to the filesystem id for this quota type */
3039 switch (oqctl->qc_type) {
3041 id = nodemap_map_id(nodemap, NODEMAP_UID,
3042 NODEMAP_CLIENT_TO_FS, id);
3045 id = nodemap_map_id(nodemap, NODEMAP_GID,
3046 NODEMAP_CLIENT_TO_FS, id);
3049 /* todo: check/map project id */
3053 GOTO(out_nodemap, rc = -EOPNOTSUPP);
3055 repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
3057 GOTO(out_nodemap, rc = err_serious(-EFAULT));
/*
 * NOTE(review): barrier_exit() is called here, before the barrier_entry()
 * call further down for the same commands. This looks inverted (exit is
 * normally expected after the protected operation) -- confirm.
 */
3059 if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
3060 barrier_exit(tsi->tsi_tgt->lut_bottom);
/* run the command with the mapped id; swapped back after the command */
3062 if (oqctl->qc_id != id)
3063 swap(oqctl->qc_id, id);
/*
 * NOTE(review): this RETURN leaves without reaching nodemap_putref() at the
 * end of the function, and with qc_id still swapped. It looks like a nodemap
 * reference leak on -EINPROGRESS -- confirm.
 */
3065 if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
3066 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
3067 RETURN(-EINPROGRESS);
3070 switch (oqctl->qc_cmd) {
3076 case LUSTRE_Q_SETDEFAULT:
3077 case LUSTRE_Q_GETDEFAULT:
3078 case LUSTRE_Q_SETQUOTAPOOL:
3079 case LUSTRE_Q_GETQUOTAPOOL:
3080 case LUSTRE_Q_SETINFOPOOL:
3081 case LUSTRE_Q_GETINFOPOOL:
3082 case LUSTRE_Q_SETDEFAULT_POOL:
3083 case LUSTRE_Q_GETDEFAULT_POOL:
3084 /* forward quotactl request to QMT */
3085 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
3090 /* slave quotactl */
3091 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
3096 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
3097 GOTO(out_nodemap, rc = -EFAULT);
/* restore the client-visible id before copying the result to the reply */
3100 if (oqctl->qc_id != id)
3101 swap(oqctl->qc_id, id);
3103 QCTL_COPY(repoqc, oqctl);
3107 nodemap_putref(nodemap);
3112 /** clone llog ctxt from child (mdd)
3113 * This allows remote llog (replicator) access.
3114 * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
3115 * context was originally set up, or we can handle them directly.
3116 * I choose the latter, but that means I need any llog
3117 * contexts set up by child to be accessible by the mdt. So we clone the
3118 * context into our context list here.
3120 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3123 struct md_device *next = mdt->mdt_child;
3124 struct llog_ctxt *ctxt;
/* the check below looks at whether the MDT obd already has a context at idx */
3127 if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
/* fetch the context from the child device ... */
3130 rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3131 if (rc || ctxt == NULL) {
/* ... and register it in the MDT obd's own llog group under the same index */
3135 rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3137 CERROR("Can't set mdt ctxt %d\n", rc);
/*
 * Undo mdt_llog_ctxt_clone() for the llog context at index \a idx: look the
 * context up on the MDT obd device and drop two references on it.
 *
 * \param env	execution environment
 * \param mdt	MDT device owning the cloned context
 * \param idx	llog context index
 */
3142 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3143 struct mdt_device *mdt, int idx)
3145 struct llog_ctxt *ctxt;
3147 ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3150 /* Put once for the get we just did, and once for the clone */
3151 llog_ctxt_put(ctxt);
3152 llog_ctxt_put(ctxt);
3157 * sec context handlers
3159 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
/* fault-injection hook: delay by cfs_fail_val when OBD_FAIL_SEC_CTX_HDL_PAUSE is armed */
3161 CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3167 * quota request handlers
/*
 * Forward a quota acquire/release request to the QMT device of this MDT via
 * qmt_hdls.qmth_dqacq(). One path returns err_serious(-EOPNOTSUPP).
 * NOTE(review): presumably that is when no QMT device is set up -- confirm.
 *
 * \param tsi	target session info of the request being handled
 */
3169 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3171 struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
3172 struct lu_device *qmt = mdt->mdt_qmt_dev;
3177 RETURN(err_serious(-EOPNOTSUPP));
3179 rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
/*
 * Look up an mdt_object for FID \a f with lu_object_find(), passing a
 * configuration whose loc_flags is LOC_F_NEW.
 *
 * The lu_object pointer returned by lu_object_find() is cast to
 * struct mdt_object *, including in the IS_ERR() case, so callers can test
 * the result with IS_ERR().
 *
 * \param env	execution environment
 * \param d	MDT device
 * \param f	FID of the object
 */
3183 struct mdt_object *mdt_object_new(const struct lu_env *env,
3184 struct mdt_device *d,
3185 const struct lu_fid *f)
3187 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3188 struct lu_object *o;
3189 struct mdt_object *m;
3192 CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3193 o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3194 if (unlikely(IS_ERR(o)))
3195 m = (struct mdt_object *)o;
/*
 * Look up the mdt_object for FID \a f with lu_object_find() and no object
 * configuration (NULL conf).
 *
 * The lu_object pointer returned by lu_object_find() is cast to
 * struct mdt_object *, including in the IS_ERR() case, so callers can test
 * the result with IS_ERR(). Callers in this file release the object with
 * mdt_object_put().
 *
 * \param env	execution environment
 * \param d	MDT device
 * \param f	FID of the object
 */
3201 struct mdt_object *mdt_object_find(const struct lu_env *env,
3202 struct mdt_device *d,
3203 const struct lu_fid *f)
3205 struct lu_object *o;
3206 struct mdt_object *m;
3209 CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3210 o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3211 if (unlikely(IS_ERR(o)))
3212 m = (struct mdt_object *)o;
3220 * Asynchronous commit for mdt device.
3222 * Pass asynchronous commit call down the MDS stack.
3224 * \param env environment
3225 * \param mdt the mdt device
3227 static void mdt_device_commit_async(const struct lu_env *env,
3228 struct mdt_device *mdt)
3230 struct dt_device *dt = mdt->mdt_bottom;
/* a failure to start the commit is only logged; nothing is returned */
3234 rc = dt->dd_ops->dt_commit_async(env, dt);
3235 if (unlikely(rc != 0))
3236 CWARN("%s: async commit start failed: rc = %d\n",
3237 mdt_obd_name(mdt), rc);
/* per-device count of async commit requests */
3238 atomic_inc(&mdt->mdt_async_commit_count);
3243 * Mark the lock as "synchronous".
3245 * Mark the lock to defer transaction commit to the unlock time.
3247 * \param lock the lock to mark as "synchronous"
3249 * \see mdt_is_lock_sync
3250 * \see mdt_save_lock
3252 static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
/* any non-NULL l_ast_data marks the lock; mdt_is_lock_sync() tests for that */
3254 lock->l_ast_data = (void*)1;
3258 * Check whether the lock is "synchronous" or not.
3260 * \param lock the lock to check
3261 * \retval 1 the lock is "synchronous"
3262 * \retval 0 the lock isn't "synchronous"
3264 * \see mdt_set_lock_sync
3265 * \see mdt_save_lock
3267 static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
/* the mark written by mdt_set_lock_sync() is a non-NULL l_ast_data */
3269 return lock->l_ast_data != NULL;
3273 * Blocking AST for mdt locks.
3275 * Starts a transaction commit in case of a COS lock conflict, or
3276 * defers such a commit to mdt_save_lock().
3278 * \param lock the lock which blocks a request or cancelling lock
3279 * \param desc unused
3280 * \param data l_ast_data or callback arguments (struct ldlm_cb_set_arg)
3281 * \param flag indicates whether this cancelling or blocking callback
3283 * \see ldlm_blocking_ast_nocheck
3285 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
3286 void *data, int flag)
/*
 * Overview of the visible flow: under lock_res_and_lock() the function bails
 * out if l_blocking_ast is no longer mdt_blocking_ast. Otherwise it decides
 * from the lock's l_req_mode and the callback's bl_desc whether to mark the
 * lock sync (mdt_set_lock_sync()) and/or to request an async commit, then
 * calls ldlm_blocking_ast_nocheck(). The commit_async flag records the
 * latter; mdt_device_commit_async() is called with a locally initialized
 * lu_env, and a failure to initialize that env is only logged.
 *
 * PW/EX locks: with COS enabled the lock is marked sync unless the conflict
 * comes from the same client; otherwise, with SLC enabled and a
 * COS-incompatible conflict, it is marked sync and an async commit is
 * requested. LCK_COS locks always request an async commit.
 */
3288 struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
3289 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
/* only meaningful when 'data' really is callback arguments; checked below */
3290 struct ldlm_cb_set_arg *arg = data;
3291 bool commit_async = false;
3295 if (flag == LDLM_CB_CANCELING)
3298 lock_res_and_lock(lock);
3299 if (lock->l_blocking_ast != mdt_blocking_ast) {
3300 unlock_res_and_lock(lock);
3304 /* A blocking ast may be sent from ldlm_lock_decref_internal
3305 * when the last reference to a local lock was released and
3306 * during blocking event from ldlm_work_bl_ast_lock().
3307 * The 'data' parameter is l_ast_data in the first case and
3308 * callback arguments in the second one. Distinguish them by that.
3310 if (!data || data == lock->l_ast_data || !arg->bl_desc)
3311 goto skip_cos_checks;
3313 if (lock->l_req_mode & (LCK_PW | LCK_EX)) {
3314 if (mdt_cos_is_enabled(mdt)) {
3315 if (!arg->bl_desc->bl_same_client)
3316 mdt_set_lock_sync(lock);
3317 } else if (mdt_slc_is_enabled(mdt) &&
3318 arg->bl_desc->bl_cos_incompat) {
3319 mdt_set_lock_sync(lock);
3321 * we may do extra commit here, but there is a small
3322 * window to miss a commit: lock was unlocked (saved),
3323 * then a conflict lock queued and we come here, but
3324 * REP-ACK not received, so lock was not converted to
3326 * Fortunately this window is quite small, so the
3327 * extra commit should be rare (not to say distributed
3328 * operation is rare too).
3330 commit_async = true;
3332 } else if (lock->l_req_mode == LCK_COS) {
3333 commit_async = true;
3337 rc = ldlm_blocking_ast_nocheck(lock);
3342 rc = lu_env_init(&env, LCT_LOCAL);
3343 if (unlikely(rc != 0))
3344 CWARN("%s: lu_env initialization failed, cannot "
3345 "start asynchronous commit: rc = %d\n",
3348 mdt_device_commit_async(&env, mdt);
3355 * Blocking AST for cross-MDT lock
3357 * Discard lock from uncommitted_slc_locks and cancel it.
3359 * \param lock the lock which blocks a request or cancelling lock
3360 * \param desc unused
3361 * \param data unused
3362 * \param flag indicates whether this cancelling or blocking callback
3363 * \retval 0 on success
3364 * \retval negative number on error
3366 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
3367 void *data, int flag)
/*
 * Two cases are handled by flag:
 *  - LDLM_CB_BLOCKING: cancel the lock through its handle with
 *    ldlm_cli_cancel(), asynchronously unless ldlm_is_atomic_cb(lock);
 *  - LDLM_CB_CANCELING: discard the lock from the target's SLC lock list and,
 *    if an mdt_object was attached through l_ast_data, optionally invalidate
 *    it and drop the object reference.
 */
3373 case LDLM_CB_BLOCKING: {
3374 struct lustre_handle lockh;
3376 ldlm_lock2handle(lock, &lockh);
3377 rc = ldlm_cli_cancel(&lockh,
3378 ldlm_is_atomic_cb(lock) ? 0 : LCF_ASYNC);
3380 CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
3385 case LDLM_CB_CANCELING: {
3386 struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
/* the MDT is the top device of the site this namespace's obd belongs to */
3387 struct mdt_device *mdt =
3388 mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
3390 LDLM_DEBUG(lock, "Revoke remote lock\n");
3392 /* discard slc lock here so that it can be cleaned anytime,
3393 * especially for cleanup_resource() */
3394 tgt_discard_slc_lock(&mdt->mdt_lut, lock);
3396 /* once we cache lock, l_ast_data is set to mdt_object */
3397 if (lock->l_ast_data != NULL) {
3398 struct mdt_object *mo = lock->l_ast_data;
/*
 * mdt_object_put() below needs an env; when it cannot be set up the warning
 * reports the object as leaked
 */
3401 rc = lu_env_init(&env, LCT_MD_THREAD);
3402 if (unlikely(rc != 0)) {
3403 CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n",
3405 PFID(mdt_object_fid(mo)), rc);
/* XATTR/UPDATE bits: invalidate the child object and clear mot_cache_attr */
3409 if (lock->l_policy_data.l_inodebits.bits &
3410 (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)) {
3411 rc = mo_invalidate(&env, mdt_object_child(mo));
3412 mo->mot_cache_attr = 0;
/* counterpart of the mdt_object_get() in mdt_remote_object_lock_created_cb() */
3414 mdt_object_put(&env, mo);
/*
 * Check whether a lock for \a mo was already obtained for a resent request.
 *
 * Only acts when lhc->mlh_reg_lh is in use. The handle is converted to a lock
 * and the request is asserted to carry MSG_RESENT. An invalid handle is
 * reported with LDLM_DEBUG_NOLOCK. Otherwise the lock's resource name is
 * compared with the FID of \a mo; a mismatch is logged with CWARN. The lock
 * reference taken by ldlm_handle2lock() is dropped on both paths.
 *
 * \param info	thread info for the request
 * \param mo	object the lock is expected to cover
 * \param lhc	lock handle to inspect
 */
3426 int mdt_check_resent_lock(struct mdt_thread_info *info,
3427 struct mdt_object *mo,
3428 struct mdt_lock_handle *lhc)
3430 /* the lock might already be gotten in ldlm_handle_enqueue() */
3431 if (unlikely(lustre_handle_is_used(&lhc->mlh_reg_lh))) {
3432 struct ptlrpc_request *req = mdt_info_req(info);
3433 struct ldlm_lock *lock;
3435 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
3436 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
3438 /* Lock is pinned by ldlm_handle_enqueue0() as it is
3439 * a resend case, however, it could be already destroyed
3440 * due to client eviction or a raced cancel RPC. */
3441 LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
3442 lhc->mlh_reg_lh.cookie);
/* the resent lock must be on the resource named after this object's FID */
3446 if (!fid_res_name_eq(mdt_object_fid(mo),
3447 &lock->l_resource->lr_name)) {
3448 CWARN("%s: Although resent, but still not "
3449 "get child lock:"DFID"\n",
3450 info->mti_exp->exp_obd->obd_name,
3451 PFID(mdt_object_fid(mo)));
3452 LDLM_LOCK_PUT(lock);
3455 LDLM_LOCK_PUT(lock);
/*
 * Lock-created callback installed as ei_cb_created by
 * mdt_remote_object_lock_try(): calls mdt_object_get() (with a NULL env) on
 * the object stored in the lock's l_ast_data. The matching
 * mdt_object_put() is in the LDLM_CB_CANCELING branch of
 * mdt_remote_blocking_ast().
 *
 * \param lock	newly created lock whose l_ast_data points to an mdt_object
 */
3461 static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock)
3463 mdt_object_get(NULL, lock->l_ast_data);
/*
 * Enqueue an inodebits lock on a remote object through the child layer.
 *
 * Builds the resource name from \a fid, fills the thread-local enqueue info
 * (LDLM_IBITS, requested mode, mdt_remote_blocking_ast as blocking callback,
 * ldlm_completion_ast as completion callback) and policy (*ibits and
 * trybits), then calls mo_object_lock(). Afterwards the bits recorded in the
 * lock are written back to *ibits.
 *
 * \param mti	  thread info providing the scratch einfo/policy/res_id
 * \param o	  object to lock; asserted to be remote
 * \param fid	  FID used to build the lock resource name
 * \param lh	  lock handle filled by the enqueue
 * \param mode	  requested lock mode
 * \param ibits	  in: requested inodebits; out: bits held by the lock
 * \param trybits bits to try for (stored in l_inodebits.try_bits)
 * \param cache	  per the in-body comment, whether the lock is cached and
 *		  coupled with the mdt_object
 */
3466 int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
3467 struct mdt_object *o, const struct lu_fid *fid,
3468 struct lustre_handle *lh, enum ldlm_mode mode,
3469 __u64 *ibits, __u64 trybits, bool cache)
3471 struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
3472 union ldlm_policy_data *policy = &mti->mti_policy;
3473 struct ldlm_res_id *res_id = &mti->mti_res_id;
3477 LASSERT(mdt_object_remote(o));
3479 fid_build_reg_res_name(fid, res_id);
/* start from a clean einfo; the scratch structure is reused across calls */
3481 memset(einfo, 0, sizeof(*einfo));
3482 einfo->ei_type = LDLM_IBITS;
3483 einfo->ei_mode = mode;
3484 einfo->ei_cb_bl = mdt_remote_blocking_ast;
3485 einfo->ei_cb_cp = ldlm_completion_ast;
3486 einfo->ei_enq_slave = 0;
3487 einfo->ei_res_id = res_id;
3491 * if we cache lock, couple lock with mdt_object, so that object
3492 * can be easily found in lock ASTs.
3494 einfo->ei_cbdata = o;
3495 einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
3498 memset(policy, 0, sizeof(*policy));
3499 policy->l_inodebits.bits = *ibits;
3500 policy->l_inodebits.try_bits = trybits;
3502 rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
3505 /* Return successfully acquired bits to a caller */
3507 struct ldlm_lock *lock = ldlm_handle2lock(lh);
3510 *ibits = lock->l_policy_data.l_inodebits.bits;
3511 LDLM_LOCK_PUT(lock);
/*
 * Convenience wrapper around mdt_remote_object_lock_try() with no try bits
 * (trybits == 0). \a ibits is passed by value here, so the bits reported
 * back by the callee are not visible to the caller.
 *
 * \param mti	thread info
 * \param o	remote object to lock
 * \param fid	FID used to build the lock resource name
 * \param lh	lock handle filled by the enqueue
 * \param mode	requested lock mode
 * \param ibits	requested inodebits
 * \param cache	forwarded to mdt_remote_object_lock_try()
 */
3516 int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
3517 const struct lu_fid *fid, struct lustre_handle *lh,
3518 enum ldlm_mode mode, __u64 ibits, bool cache)
3520 return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
/*
 * Take local LDLM lock(s) on an object in the MDT namespace.
 *
 * Preconditions asserted: neither handle in \a lh is in use, mlh_reg_mode is
 * not LCK_MINMODE and mlh_type is not MDT_NUL_LOCK.
 *
 * Visible steps:
 *  - choose DLM flags: LDLM_FL_COS_INCOMPAT (PW/EX modes only) or
 *    LDLM_FL_COS_ENABLED, LDLM_FL_CANCEL_ON_BLOCK for an EX regular lock on
 *    exactly MDS_INODELOCK_OPEN, and LDLM_FL_ATOMIC_CB;
 *  - build the resource name from the object's FID;
 *  - when mlh_pdo_hash is set, optionally take the PDO lock on the whole
 *    directory (UPDATE bit only) and then put the name hash into the
 *    resource id;
 *  - take the regular lock with LDLM_FL_LOCAL_ONLY via mdt_fid_lock();
 *  - one exit path releases the locks via mdt_object_unlock();
 *  - write the bits held by the regular lock back to *ibits.
 *
 * \param info		thread info providing namespace and scratch policy/res_id
 * \param o		object to lock
 * \param lh		lock handle describing type, modes, PDO hash and gid
 * \param ibits		in: requested inodebits; out: bits held by the lock
 * \param trybits	bits to try for
 * \param cos_incompat	presumably selects LDLM_FL_COS_INCOMPAT -- confirm
 */
3524 int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
3525 struct mdt_lock_handle *lh, __u64 *ibits,
3526 __u64 trybits, bool cos_incompat)
3528 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
3529 union ldlm_policy_data *policy = &info->mti_policy;
3530 struct ldlm_res_id *res_id = &info->mti_res_id;
3531 __u64 dlmflags = 0, *cookie = NULL;
3535 LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
3536 LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
3537 LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
3538 LASSERT(lh->mlh_type != MDT_NUL_LOCK);
3541 LASSERT(lh->mlh_reg_mode == LCK_PW ||
3542 lh->mlh_reg_mode == LCK_EX);
3543 dlmflags |= LDLM_FL_COS_INCOMPAT;
3544 } else if (mdt_cos_is_enabled(info->mti_mdt)) {
3545 dlmflags |= LDLM_FL_COS_ENABLED;
3548 /* Only enqueue LOOKUP lock for remote object */
3549 LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
3551 /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
3552 if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
3553 *ibits == MDS_INODELOCK_OPEN)
3554 dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
3556 if (lh->mlh_type == MDT_PDO_LOCK) {
3557 /* check for exists after object is locked */
3558 if (mdt_object_exists(o) == 0) {
3559 /* Non-existent object shouldn't have PDO lock */
3562 /* Non-dir object shouldn't have PDO lock */
3563 if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
3568 fid_build_reg_res_name(mdt_object_fid(o), res_id);
3569 dlmflags |= LDLM_FL_ATOMIC_CB;
/* cookie points at the export's handle cookie */
3572 cookie = &info->mti_exp->exp_handle.h_cookie;
3575 * Take PDO lock on whole directory and build correct @res_id for lock
3576 * on part of directory.
3578 if (lh->mlh_pdo_hash != 0) {
3579 LASSERT(lh->mlh_type == MDT_PDO_LOCK);
3580 mdt_lock_pdo_mode(info, o, lh);
3581 if (lh->mlh_pdo_mode != LCK_NL) {
3583 * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
3584 * is never going to be sent to client and we do not
3585 * want it slowed down due to possible cancels.
3587 policy->l_inodebits.bits =
3588 *ibits & MDS_INODELOCK_UPDATE;
3589 policy->l_inodebits.try_bits =
3590 trybits & MDS_INODELOCK_UPDATE;
3591 /* at least one of them should be set */
3592 LASSERT(policy->l_inodebits.bits |
3593 policy->l_inodebits.try_bits);
3594 rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
3595 lh->mlh_pdo_mode, policy, res_id,
3597 if (unlikely(rc != 0))
3598 GOTO(out_unlock, rc);
3602 * Finish res_id initializing by name hash marking part of
3603 * directory which is taking modification.
3605 res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
/* the regular lock uses the full requested bit sets and the handle's gid */
3608 policy->l_inodebits.bits = *ibits;
3609 policy->l_inodebits.try_bits = trybits;
3610 policy->l_inodebits.li_gid = lh->mlh_gid;
3613 * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
3614 * going to be sent to client. If it is - mdt_intent_policy() path will
3615 * fix it up and turn FL_LOCAL flag off.
3617 rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
3618 policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
3622 mdt_object_unlock(info, o, lh, 1);
/* fault injection: delay after taking a PW/EX lock with a PDO hash */
3623 else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
3624 lh->mlh_pdo_hash != 0 &&
3625 (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
3626 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
3628 /* Return successfully acquired bits to a caller */
3630 struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
3633 *ibits = lock->l_policy_data.l_inodebits.bits;
3634 LDLM_LOCK_PUT(lock);
3640 mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
3641 struct mdt_lock_handle *lh, __u64 *ibits,
3642 __u64 trybits, bool cos_incompat)
3644 struct mdt_lock_handle *local_lh = NULL;
3648 if (!mdt_object_remote(o)) {
3649 rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
3654 /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
3655 *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
3656 MDS_INODELOCK_XATTR);
3658 /* Only enqueue LOOKUP lock for remote object */