4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2010, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_handler.c
33 * Lustre Metadata Target (mdt) request handler
35 * Author: Peter Braam <braam@clusterfs.com>
36 * Author: Andreas Dilger <adilger@clusterfs.com>
37 * Author: Phil Schwan <phil@clusterfs.com>
38 * Author: Mike Shaver <shaver@clusterfs.com>
39 * Author: Nikita Danilov <nikita@clusterfs.com>
40 * Author: Huang Hua <huanghua@clusterfs.com>
41 * Author: Yury Umanets <umka@clusterfs.com>
44 #define DEBUG_SUBSYSTEM S_MDS
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
66 #include <lustre_crypto.h>
68 #include "mdt_internal.h"
/*
 * Module parameter (type uint, permissions 0644, default 8): maximum number
 * of modify RPCs in flight allowed per client.
 */
70 static unsigned int max_mod_rpcs_per_client = 8;
71 module_param(max_mod_rpcs_per_client, uint, 0644);
72 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
/* Table indexed by LDLM lock mode (LCK_*) giving the matching MDL_* mode. */
74 mdl_mode_t mdt_mdl_lock_modes[] = {
75 [LCK_MINMODE] = MDL_MINMODE,
82 [LCK_GROUP] = MDL_GROUP
/* Table indexed by MDL_* lock mode giving the matching LDLM mode (LCK_*). */
85 enum ldlm_mode mdt_dlm_lock_modes[] = {
86 [MDL_MINMODE] = LCK_MINMODE,
93 [MDL_GROUP] = LCK_GROUP
/* Forward declarations of file-local symbols. */
96 static struct mdt_device *mdt_dev(struct lu_device *d);
98 static const struct lu_object_operations mdt_obj_ops;
/*
 * Slab cache pointers. Each one is referenced by an entry of mdt_caches[],
 * which records its name and object size.
 */
100 /* Slab for MDT object allocation */
101 static struct kmem_cache *mdt_object_kmem;
103 /* For HSM restore handles */
104 struct kmem_cache *mdt_hsm_cdt_kmem;
106 /* For HSM request handles */
107 struct kmem_cache *mdt_hsm_car_kmem;
/*
 * Slab cache descriptors: every entry names the kmem_cache pointer to fill
 * in (ckd_cache), the cache name (ckd_name) and the size of one object
 * (ckd_size).
 */
109 static struct lu_kmem_descr mdt_caches[] = {
111 .ckd_cache = &mdt_object_kmem,
112 .ckd_name = "mdt_obj",
113 .ckd_size = sizeof(struct mdt_object)
116 .ckd_cache = &mdt_hsm_cdt_kmem,
117 .ckd_name = "mdt_cdt_restore_handle",
118 .ckd_size = sizeof(struct cdt_restore_handle)
121 .ckd_cache = &mdt_hsm_car_kmem,
122 .ckd_name = "mdt_cdt_agent_req",
123 .ckd_size = sizeof(struct cdt_agent_req)
/**
 * Test disposition bits stored in an LDLM reply.
 *
 * \param[in] rep      LDLM reply whose lock_policy_res1 field holds the bits
 * \param[in] op_flag  mask of bits to test
 *
 * \retval the bits of \a op_flag that are set in rep->lock_policy_res1
 */
130 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
134 return rep->lock_policy_res1 & op_flag;
/**
 * Clear disposition bits.
 *
 * \a op_flag is removed from info->mti_opdata, handed to tgt_opdata_clear()
 * for the thread environment, and removed from rep->lock_policy_res1.
 *
 * \param[in] info     thread info holding mti_opdata and mti_env
 * \param[in] rep      LDLM reply holding lock_policy_res1
 * \param[in] op_flag  bits to clear
 */
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138 struct ldlm_reply *rep, __u64 op_flag)
141 info->mti_opdata &= ~op_flag;
142 tgt_opdata_clear(info->mti_env, op_flag);
145 rep->lock_policy_res1 &= ~op_flag;
/**
 * Set disposition bits.
 *
 * \a op_flag is OR-ed into info->mti_opdata, handed to tgt_opdata_set() for
 * the thread environment, and OR-ed into rep->lock_policy_res1.
 *
 * \param[in] info     thread info holding mti_opdata and mti_env
 * \param[in] rep      LDLM reply holding lock_policy_res1
 * \param[in] op_flag  bits to set
 */
148 void mdt_set_disposition(struct mdt_thread_info *info,
149 struct ldlm_reply *rep, __u64 op_flag)
152 info->mti_opdata |= op_flag;
153 tgt_opdata_set(info->mti_env, op_flag);
156 rep->lock_policy_res1 |= op_flag;
/**
 * Initialise a lock handle as a regular (MDT_REG_LOCK) lock.
 *
 * The PDO hash is zeroed and both mlh_reg_mode and mlh_rreg_mode are set to
 * \a lm.
 *
 * \param[out] lh  lock handle to initialise
 * \param[in]  lm  lock mode to record
 */
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
161 lh->mlh_pdo_hash = 0;
162 lh->mlh_reg_mode = lm;
163 lh->mlh_rreg_mode = lm;
164 lh->mlh_type = MDT_REG_LOCK;
/**
 * Initialise a regular lock handle from an existing LDLM lock.
 *
 * The handle takes the lock's requested mode (l_req_mode). For LCK_GROUP
 * locks the group id is copied from the lock's inodebits policy data.
 *
 * \param[out] lh    lock handle to initialise
 * \param[in]  lock  LDLM lock supplying the mode and, for group locks, gid
 */
167 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
169 mdt_lock_reg_init(lh, lock->l_req_mode);
170 if (lock->l_req_mode == LCK_GROUP)
171 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
/**
 * Initialise a lock handle as a PDO (MDT_PDO_LOCK) lock.
 *
 * mlh_reg_mode and mlh_rreg_mode are set to \a lock_mode and mlh_pdo_mode
 * starts as LCK_MINMODE. When \a lname passes lu_name_is_valid() the name is
 * hashed with ll_full_name_hash() into mlh_pdo_hash, and a hash of zero is
 * replaced by 1 (see the LU-2856 note in the body).
 * NOTE(review): the final "mlh_pdo_hash = 0" assignment is presumably the
 * branch taken for an invalid name - confirm.
 *
 * \param[out] lh         lock handle to initialise
 * \param[in]  lock_mode  regular lock mode to record
 * \param[in]  lname      name to hash for the PDO lock
 */
174 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
175 const struct lu_name *lname)
177 lh->mlh_reg_mode = lock_mode;
178 lh->mlh_pdo_mode = LCK_MINMODE;
179 lh->mlh_rreg_mode = lock_mode;
180 lh->mlh_type = MDT_PDO_LOCK;
182 if (lu_name_is_valid(lname)) {
183 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
185 /* XXX Workaround for LU-2856
187 * Zero is a valid return value of full_name_hash, but
188 * several users of mlh_pdo_hash assume a non-zero
189 * hash value. We therefore map zero onto an
190 * arbitrary, but consistent value (1) to avoid
191 * problems further down the road. */
192 if (unlikely(lh->mlh_pdo_hash == 0))
193 lh->mlh_pdo_hash = 1;
195 lh->mlh_pdo_hash = 0;
/**
 * Choose the PDO lock mode (lh->mlh_pdo_mode) for a directory access.
 *
 * On entry the handle must carry a regular mode other than LCK_MINMODE and
 * a PDO mode still equal to LCK_MINMODE (both asserted). The layer below is
 * asked through mdo_lock_mode() for its preferred mode; an answer other than
 * MDL_MINMODE is converted with mdt_mdl_mode2dlm_mode() and used. Otherwise
 * the mode is selected here from lh->mlh_reg_mode, yielding LCK_EX, LCK_CR
 * or LCK_CW, and an unexpected regular mode is reported with CERROR().
 * Before returning, the PDO mode is asserted to differ from LCK_MINMODE.
 *
 * \param[in]     info  thread info supplying the environment
 * \param[in]     o     object whose child layer is consulted
 * \param[in,out] lh    lock handle whose mlh_pdo_mode is filled in
 */
199 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
200 struct mdt_lock_handle *lh)
206 * Any dir access needs couple of locks:
208 * 1) on part of dir we gonna take lookup/modify;
210 * 2) on whole dir to protect it from concurrent splitting and/or to
211 * flush client's cache for readdir().
213 * so, for a given mode and object this routine decides what lock mode
214 * to use for lock #2:
216 * 1) if caller's gonna lookup in dir then we need to protect dir from
217 * being splitted only - LCK_CR
219 * 2) if caller's gonna modify dir then we need to protect dir from
220 * being splitted and to flush cache - LCK_CW
222 * 3) if caller's gonna modify dir and that dir seems ready for
223 * splitting then we need to protect it from any type of access
224 * (lookup/modify/split) - LCK_EX --bzzz
227 LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
228 LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
231 * Ask underlaying level its opinion about preferable PDO lock mode
232 * having access type passed as regular lock mode:
234 * - MDL_MINMODE means that lower layer does not want to specify lock
237 * - MDL_NL means that no PDO lock should be taken. This is used in some
238 * cases. Say, for non-splittable directories no need to use PDO locks
241 mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
242 mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
244 if (mode != MDL_MINMODE) {
245 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
248 * Lower layer does not want to specify locking mode. We do it
249 * our selves. No special protection is needed, just flush
250 * client's cache on modification and allow concurrent
253 switch (lh->mlh_reg_mode) {
255 lh->mlh_pdo_mode = LCK_EX;
258 lh->mlh_pdo_mode = LCK_CR;
261 lh->mlh_pdo_mode = LCK_CW;
264 CERROR("Not expected lock type (0x%x)\n",
265 (int)lh->mlh_reg_mode);
270 LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
275 * Check whether \a o is directory stripe object.
277 * \param[in] info thread environment
278 * \param[in] o MDT object
280 * \retval 1 is directory stripe.
281 * \retval 0 isn't directory stripe.
282 * \retval < 1 error code
/*
 * Implementation outline: the LMV xattr is fetched into info->mti_attr with
 * mdt_stripe_get(); the checks that follow look at whether MA_LMV became
 * valid, whether lmv_is_sane2() accepts the header, and whether the
 * little-endian lmv_magic equals LMV_MAGIC_STRIPE.
 */
284 static int mdt_is_dir_stripe(struct mdt_thread_info *info,
285 struct mdt_object *o)
287 struct md_attr *ma = &info->mti_attr;
288 struct lmv_mds_md_v1 *lmv;
291 rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
295 if (!(ma->ma_valid & MA_LMV))
298 lmv = &ma->ma_lmv->lmv_md_v1;
300 if (!lmv_is_sane2(lmv))
303 if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
/**
 * Resolve a fileset path to a FID, starting from the MDT root.
 *
 * \a *fid is first set to mdt_md_root_fid. The path is then consumed one
 * '/'-separated component at a time. Components longer than NAME_MAX and the
 * component ".." get dedicated handling (the latter is rejected, according
 * to the in-line comment). Any other component is copied into
 * info->mti_filename and looked up with mdo_lookup() in the object that the
 * current \a *fid names, which stores the child's FID back into \a *fid.
 *
 * The object named by the resulting FID is also inspected: its mode is
 * tested with S_ISDIR(), and when mdt_is_remote_object() reports it as
 * remote, a console warning is printed if mdt_enable_remote_subdir_mount is
 * off (the mount is refused); the "may be slow" console message is
 * presumably the branch for when it is on.
 *
 * \param[in]  info     thread info; mti_name and mti_filename are used as
 *                      scratch space and mti_cross_ref must be clear
 * \param[in]  fileset  path to resolve
 * \param[out] fid      receives the resolved FID
 */
309 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
312 struct mdt_device *mdt = info->mti_mdt;
313 struct lu_name *lname = &info->mti_name;
314 const char *start = fileset;
315 char *filename = info->mti_filename;
316 struct mdt_object *parent;
320 LASSERT(!info->mti_cross_ref);
323 * We may want to allow this to mount a completely separate
324 * fileset from the MDT in the future, but keeping it to
325 * ROOT/ only for now avoid potential security issues.
327 *fid = mdt->mdt_md_root_fid;
329 while (rc == 0 && start != NULL && *start != '\0') {
330 const char *s1 = start;
336 while (*s2 != '/' && *s2 != '\0')
344 lname->ln_namelen = s2 - s1;
345 if (lname->ln_namelen > NAME_MAX) {
350 /* reject .. as a path component */
351 if (lname->ln_namelen == 2 &&
352 strncmp(s1, "..", 2) == 0) {
/* strncpy() copies exactly ln_namelen bytes; the NUL is added by hand */
357 strncpy(filename, s1, lname->ln_namelen);
358 filename[lname->ln_namelen] = '\0';
359 lname->ln_name = filename;
361 parent = mdt_object_find(info->mti_env, mdt, fid);
362 if (IS_ERR(parent)) {
363 rc = PTR_ERR(parent);
366 /* Only got the fid of this obj by name */
368 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
369 fid, &info->mti_spec);
370 mdt_object_put(info->mti_env, parent);
373 parent = mdt_object_find(info->mti_env, mdt, fid);
375 rc = PTR_ERR(parent);
377 mode = lu_object_attr(&parent->mot_obj);
378 if (!S_ISDIR(mode)) {
380 } else if (mdt_is_remote_object(info, parent, parent)) {
381 if (!mdt->mdt_enable_remote_subdir_mount) {
383 LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
387 LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
392 mdt_object_put(info->mti_env, parent);
/**
 * Pack a root FID into the reply body (RMF_MDT_BODY).
 *
 * After mdt_check_ucred() and the OBD_FAIL_MDS_GET_ROOT_PACK fault-injection
 * point, an optional fileset is read from the request's RMF_NAME field. If
 * the export's nodemap supplies a non-empty fileset, the client's fileset is
 * formatted as "<nodemap fileset>/<client fileset>" into a PATH_MAX + 1 byte
 * buffer (-EINVAL when it does not fit), or the nodemap fileset is enforced
 * on its own. A fileset is resolved with mdt_lookup_fileset() straight into
 * repbody->mbo_fid1; the alternative is the MDT root FID. OBD_MD_FLID is
 * then set in the reply. On the way out the thread info is finalised and
 * the buffer is freed.
 *
 * Failures on these paths are wrapped with err_serious().
 *
 * \param[in] tsi  target session info for the request being handled
 */
399 static int mdt_get_root(struct tgt_session_info *tsi)
401 struct mdt_thread_info *info = tsi2mdt_info(tsi);
402 struct mdt_device *mdt = info->mti_mdt;
403 struct mdt_body *repbody;
404 char *fileset = NULL, *buffer = NULL;
406 struct obd_export *exp = info->mti_exp;
407 char *nodemap_fileset;
411 rc = mdt_check_ucred(info);
413 GOTO(out, rc = err_serious(rc));
415 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
416 GOTO(out, rc = err_serious(-ENOMEM));
418 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
419 if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
420 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
422 GOTO(out, rc = err_serious(-EFAULT));
425 nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
426 if (nodemap_fileset && nodemap_fileset[0]) {
427 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
429 /* consider fileset from client as a sub-fileset
430 * of the nodemap one */
431 OBD_ALLOC(buffer, PATH_MAX + 1);
433 GOTO(out, rc = err_serious(-ENOMEM));
/* snprintf() returning >= the buffer size means the path was truncated */
434 if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
435 nodemap_fileset, fileset) >= PATH_MAX + 1)
436 GOTO(out, rc = err_serious(-EINVAL));
439 /* enforce fileset as specified in the nodemap */
440 fileset = nodemap_fileset;
445 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
446 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
448 GOTO(out, rc = err_serious(rc));
450 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
452 repbody->mbo_valid |= OBD_MD_FLID;
456 mdt_thread_info_fini(info);
458 OBD_FREE(buffer, PATH_MAX+1);
/**
 * Fill the reply's RMF_OBD_STATFS buffer with filesystem statistics.
 *
 * Outline of the visible steps:
 * - OBD_FAIL_MDS_STATFS_LCW_SLEEP and OBD_FAIL_MDS_STATFS_PACK are
 *   fault-injection points; mdt_check_ucred() validates the caller.
 * - For a client accepted by mdt_is_sum_statfs_client() whose request size
 *   matches RQF_MDS_STATFS_NEW, the capsule is extended and the request body
 *   is read. OBD_MD_FLAGSTATFS in that body selects the mdt_sum_osfs cache
 *   instead of mdt_osfs.
 * - When msf_age + OBD_STATFS_CACHE_SECONDS is not later than the current
 *   time, fresh data is fetched (mdo_statfs() of the child device for the
 *   aggregate case, dt_statfs() of mdt_bottom for the other) and stored in
 *   the cache under mdt_lock; the cached copy is read under the same lock.
 * - The block-size shift is recomputed from os_bsize on every call, and
 *   os_bavail is reduced by tgd_tot_dirty + tgd_tot_pending rounded up to
 *   whole blocks, never going below zero (min_t).
 * - For exports without grant-parameter support and a shift larger than
 *   COMPAT_BSIZE_SHIFT, the block counts are rescaled and os_bsize is
 *   reported as 1 << COMPAT_BSIZE_SHIFT.
 * - The elapsed time is recorded under LPROC_MDT_STATFS and the thread info
 *   is finalised.
 *
 * \param[in] tsi  target session info for the request being handled
 */
462 static int mdt_statfs(struct tgt_session_info *tsi)
464 struct ptlrpc_request *req = tgt_ses_req(tsi);
465 struct mdt_thread_info *info = tsi2mdt_info(tsi);
466 struct mdt_device *mdt = info->mti_mdt;
467 struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
468 struct md_device *next = mdt->mdt_child;
469 struct ptlrpc_service_part *svcpt;
470 struct obd_statfs *osfs;
471 struct mdt_body *reqbody = NULL;
472 struct mdt_statfs_cache *msf;
473 ktime_t kstart = ktime_get();
474 int current_blockbits;
479 svcpt = req->rq_rqbd->rqbd_svcpt;
481 /* This will trigger a watchdog timeout */
482 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
483 (MDT_SERVICE_WATCHDOG_FACTOR *
484 at_get(&svcpt->scp_at_estimate)) + 1);
486 rc = mdt_check_ucred(info);
488 GOTO(out, rc = err_serious(rc));
490 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
491 GOTO(out, rc = err_serious(-ENOMEM));
493 osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
495 GOTO(out, rc = -EPROTO);
497 if (mdt_is_sum_statfs_client(req->rq_export) &&
498 lustre_packed_msg_size(req->rq_reqmsg) ==
499 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
500 &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
501 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
502 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
505 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
506 msf = &mdt->mdt_sum_osfs;
508 msf = &mdt->mdt_osfs;
510 if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
511 /** statfs data is too old, get up-to-date one */
512 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
513 rc = next->md_ops->mdo_statfs(info->mti_env,
516 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
520 spin_lock(&mdt->mdt_lock);
521 msf->msf_osfs = *osfs;
522 msf->msf_age = ktime_get_seconds();
523 spin_unlock(&mdt->mdt_lock);
525 /** use cached statfs data */
526 spin_lock(&mdt->mdt_lock);
527 *osfs = msf->msf_osfs;
528 spin_unlock(&mdt->mdt_lock);
531 /* tgd_blockbit is recordsize bits set during mkfs.
532 * This once set does not change. However, 'zfs set'
533 * can be used to change the MDT blocksize. Instead
534 * of using cached value of 'tgd_blockbit' always
535 * calculate the blocksize bits which may have
538 current_blockbits = fls64(osfs->os_bsize) - 1;
540 /* at least try to account for cached pages. its still racy and
541 * might be under-reporting if clients haven't announced their
542 * caches with brw recently */
543 CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
544 " pending %llu free %llu avail %llu\n",
545 tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
546 tgd->tgd_tot_pending,
547 osfs->os_bfree << current_blockbits,
548 osfs->os_bavail << current_blockbits);
550 osfs->os_bavail -= min_t(u64, osfs->os_bavail,
551 ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
552 osfs->os_bsize - 1) >> current_blockbits));
554 tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
555 CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
556 "%llu objects: %llu free; state %x\n",
557 osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
558 osfs->os_files, osfs->os_ffree, osfs->os_state);
560 if (!exp_grant_param_supp(tsi->tsi_exp) &&
561 current_blockbits > COMPAT_BSIZE_SHIFT) {
562 /* clients which don't support OBD_CONNECT_GRANT_PARAM
563 * should not see a block size > page size, otherwise
564 * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
565 * block size which is the biggest block size known to work
566 * with all client's page size. */
567 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
568 osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT;
569 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
570 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
573 mdt_counter_incr(req, LPROC_MDT_STATFS,
574 ktime_us_delta(ktime_get(), kstart));
576 mdt_thread_info_fini(info);
/**
 * Look for a DoM (LOV_PATTERN_MDT) component in a layout and report its
 * stripe size.
 *
 * Only layouts whose little-endian magic is LOV_MAGIC_COMP_V1 are examined.
 * A fast check covers layouts with a mirror count of zero, where a DoM entry
 * has to be the first component. Otherwise every component with
 * LCME_FL_INIT set is visited: a component's pattern is tested, recording
 * lmm_stripe_size as the DoM stripe size or noting that other stripes
 * exist. As soon as both have been seen the DoM stripe size is returned.
 *
 * \param[in]  lmm          layout to examine (fields are little-endian)
 * \param[out] is_dom_only  optional; consulted only when a DoM stripe was
 *                          found and no other stripes were
 *                          NOTE(review): presumably set to flag a DoM-only
 *                          layout - confirm
 *
 * \retval DoM stripe size, 0 as initialised when no DoM component was seen
 */
580 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
582 struct lov_comp_md_v1 *comp_v1;
583 struct lov_mds_md *v1;
585 __u32 dom_stripesize = 0;
587 bool has_ost_stripes = false;
594 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
597 comp_v1 = (struct lov_comp_md_v1 *)lmm;
598 off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
599 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
601 /* Fast check for DoM entry with no mirroring, should be the first */
602 if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
603 lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
606 /* check all entries otherwise */
607 for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
608 struct lov_comp_md_entry_v1 *lcme;
610 lcme = &comp_v1->lcm_entries[i];
611 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
614 off = le32_to_cpu(lcme->lcme_offset);
615 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
617 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
619 dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
621 has_ost_stripes = true;
623 if (dom_stripesize && has_ost_stripes)
624 RETURN(dom_stripesize);
626 /* DoM-only case exits here */
627 if (is_dom_only && dom_stripesize)
629 RETURN(dom_stripesize);
633 * Pack size attributes into the reply.
/*
 * Details: info->mti_attr must already have LA_MODE valid (asserted). The
 * function only proceeds for regular files with a valid LOV layout
 * (MA_LOV set and ma_lmm non-NULL) that has a DoM stripe according to
 * mdt_lmm_dom_stripesize(). When \a lh is in use, the lock is looked up
 * with ldlm_handle2lock() and ldlm_has_dom() decides whether it is a DoM
 * lock. The size is finally packed by mdt_dom_object_size() into the
 * reply's RMF_MDT_BODY.
 *
 * \param[in] info  thread info (mti_attr, mti_pill, mti_env, mti_mdt)
 * \param[in] fid   FID of the object whose size is packed
 * \param[in] lh    lock handle checked for a DoM lock
 */
635 int mdt_pack_size2body(struct mdt_thread_info *info,
636 const struct lu_fid *fid, struct lustre_handle *lh)
639 struct md_attr *ma = &info->mti_attr;
641 bool dom_lock = false;
645 LASSERT(ma->ma_attr.la_valid & LA_MODE);
647 if (!S_ISREG(ma->ma_attr.la_mode) ||
648 !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
651 dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
652 /* no DoM stripe, no size in reply */
656 if (lustre_handle_is_used(lh)) {
657 struct ldlm_lock *lock;
659 lock = ldlm_handle2lock(lh);
661 dom_lock = ldlm_has_dom(lock);
666 /* no DoM lock, no size in reply */
670 /* Either DoM lock exists or LMM has only DoM stripe then
671 * return size on body. */
672 b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
674 mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
678 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
680 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
682 * \param info thread info object
683 * \param repbody reply to pack ACLs into
684 * \param o mdt object of file to examine
685 * \param nodemap nodemap of client to reply to
687 * \retval -errno error getting or parsing ACL from disk
/*
 * Implementation outline: the access ACL (XATTR_NAME_ACL_ACCESS) is read
 * into the reply's RMF_ACL buffer. -ENODATA is reported to the client as an
 * ACL of size 0 with OBD_MD_FLACL set. On -ERANGE, a client for which
 * exp_connect_large_acl() holds gets the per-thread big ACL buffer
 * (info->mti_big_acl, allocated on first use) substituted for the reply
 * buffer and mti_big_acl_used is set. A successfully read ACL is passed
 * through nodemap_map_acl() in the NODEMAP_FS_TO_CLIENT direction; its
 * result becomes mbo_aclsize, and a mapping failure clears OBD_MD_FLACL.
 */
689 int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
690 struct mdt_object *o, struct lu_nodemap *nodemap)
692 const struct lu_env *env = info->mti_env;
693 struct md_object *next = mdt_object_child(o);
694 struct lu_buf *buf = &info->mti_buf;
695 struct mdt_device *mdt = info->mti_mdt;
696 struct req_capsule *pill = info->mti_pill;
701 buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
702 buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
703 if (buf->lb_len == 0)
706 LASSERT(!info->mti_big_acl_used);
708 rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
710 if (rc == -ENODATA) {
711 repbody->mbo_aclsize = 0;
712 repbody->mbo_valid |= OBD_MD_FLACL;
714 } else if (rc == -EOPNOTSUPP) {
716 } else if (rc == -ERANGE) {
717 if (exp_connect_large_acl(info->mti_exp) &&
718 !info->mti_big_acl_used) {
719 if (info->mti_big_acl == NULL) {
720 info->mti_big_aclsize =
722 mdt->mdt_max_ea_size,
724 OBD_ALLOC_LARGE(info->mti_big_acl,
725 info->mti_big_aclsize);
726 if (info->mti_big_acl == NULL) {
727 info->mti_big_aclsize = 0;
728 CERROR("%s: unable to grow "
731 PFID(mdt_object_fid(o)));
736 CDEBUG(D_INODE, "%s: grow the "DFID
737 " ACL buffer to size %d\n",
739 PFID(mdt_object_fid(o)),
740 info->mti_big_aclsize);
742 buf->lb_buf = info->mti_big_acl;
743 buf->lb_len = info->mti_big_aclsize;
744 info->mti_big_acl_used = 1;
747 /* FS has ACL bigger than our limits */
748 CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
749 mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
750 info->mti_big_aclsize);
753 CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
754 mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
757 rc = nodemap_map_acl(nodemap, buf->lb_buf,
758 rc, NODEMAP_FS_TO_CLIENT);
759 /* if all ACLs mapped out, rc is still >= 0 */
761 CERROR("%s: nodemap_map_acl unable to parse "DFID
762 " ACL: rc = %d\n", mdt_obd_name(mdt),
763 PFID(mdt_object_fid(o)), rc);
764 repbody->mbo_aclsize = 0;
765 repbody->mbo_valid &= ~OBD_MD_FLACL;
767 repbody->mbo_aclsize = rc;
768 repbody->mbo_valid |= OBD_MD_FLACL;
777 /* XXX Look into layout in MDT layer. */
/*
 * Tell whether a layout carries the LOV_PATTERN_F_RELEASED flag.
 *
 * For a LOV_MAGIC_COMP_V1 layout each component's lmm_pattern is tested,
 * since partial release is not supported; for any other layout the
 * top-level lmm_pattern is tested.
 *
 * NOTE(review): lmm_magic, lcm_entry_count, lcme_offset and lmm_pattern are
 * read here without le32_to_cpu()/le16_to_cpu(), unlike
 * mdt_lmm_dom_entry_check(), which converts the same fields - confirm the
 * expected byte order of \a lmm at this point.
 */
778 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
780 struct lov_comp_md_v1 *comp_v1;
781 struct lov_mds_md *v1;
784 if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
785 comp_v1 = (struct lov_comp_md_v1 *)lmm;
787 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
788 v1 = (struct lov_mds_md *)((char *)comp_v1 +
789 comp_v1->lcm_entries[i].lcme_offset);
790 /* We don't support partial release for now */
791 if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
796 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
/**
 * Copy object attributes into a reply body.
 *
 * info->mti_attr must have MA_INODE valid (asserted). For each of atime,
 * mtime, ctime, btime, flags and nlink that is flagged in attr->la_valid,
 * the value is copied to \a b and the matching OBD_MD_FL* bit is set.
 * UID, GID and project ID are translated with nodemap_map_id() in the
 * NODEMAP_FS_TO_CLIENT direction using the export's nodemap. The mode is
 * copied unconditionally; OBD_MD_FLMODE and OBD_MD_FLTYPE follow la_valid.
 *
 * rdev, size and blocks are copied as well. Which size/blocks validity bits
 * are set depends on the object:
 * - anything but a regular file: OBD_MD_FLSIZE, OBD_MD_FLBLOCKS and
 *   OBD_MD_FLRDEV;
 * - a regular file for which MA_LOV was requested but is not valid:
 *   OBD_MD_FLSIZE and OBD_MD_FLBLOCKS;
 * - a regular file with a layout: the same two bits when the file is
 *   released (mdt_hsm_is_released()) or when info->mti_som_valid is set;
 *   with MA_SOM valid instead, the lazy variants (OBD_MD_FLLAZYSIZE,
 *   OBD_MD_FLLAZYBLOCKS) are set and size/blocks come from ma_som.
 *
 * The nodemap reference, if one was taken, is dropped before returning.
 *
 * \param[in]  info  thread info (mti_attr, mti_exp, mti_som_valid)
 * \param[out] b     reply body to fill
 * \param[in]  attr  attributes to pack
 * \param[in]  fid   object FID used in the debug messages; the size message
 *                   is skipped when it is NULL
 */
801 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
802 const struct lu_attr *attr, const struct lu_fid *fid)
804 struct md_attr *ma = &info->mti_attr;
805 struct obd_export *exp = info->mti_exp;
806 struct lu_nodemap *nodemap = NULL;
808 LASSERT(ma->ma_valid & MA_INODE);
810 if (attr->la_valid & LA_ATIME) {
811 b->mbo_atime = attr->la_atime;
812 b->mbo_valid |= OBD_MD_FLATIME;
814 if (attr->la_valid & LA_MTIME) {
815 b->mbo_mtime = attr->la_mtime;
816 b->mbo_valid |= OBD_MD_FLMTIME;
818 if (attr->la_valid & LA_CTIME) {
819 b->mbo_ctime = attr->la_ctime;
820 b->mbo_valid |= OBD_MD_FLCTIME;
822 if (attr->la_valid & LA_BTIME) {
823 b->mbo_btime = attr->la_btime;
824 b->mbo_valid |= OBD_MD_FLBTIME;
826 if (attr->la_valid & LA_FLAGS) {
827 b->mbo_flags = attr->la_flags;
828 b->mbo_valid |= OBD_MD_FLFLAGS;
830 if (attr->la_valid & LA_NLINK) {
831 b->mbo_nlink = attr->la_nlink;
832 b->mbo_valid |= OBD_MD_FLNLINK;
/* the nodemap is only looked up when an id actually has to be mapped */
834 if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) {
835 nodemap = nodemap_get_from_exp(exp);
839 if (attr->la_valid & LA_UID) {
840 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
841 NODEMAP_FS_TO_CLIENT,
843 b->mbo_valid |= OBD_MD_FLUID;
845 if (attr->la_valid & LA_GID) {
846 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
847 NODEMAP_FS_TO_CLIENT,
849 b->mbo_valid |= OBD_MD_FLGID;
852 if (attr->la_valid & LA_PROJID) {
853 b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
854 NODEMAP_FS_TO_CLIENT,
856 b->mbo_valid |= OBD_MD_FLPROJID;
859 b->mbo_mode = attr->la_mode;
860 if (attr->la_valid & LA_MODE)
861 b->mbo_valid |= OBD_MD_FLMODE;
862 if (attr->la_valid & LA_TYPE)
863 b->mbo_valid |= OBD_MD_FLTYPE;
867 b->mbo_valid |= OBD_MD_FLID;
868 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
869 PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
872 if (!(attr->la_valid & LA_TYPE))
875 b->mbo_rdev = attr->la_rdev;
876 b->mbo_size = attr->la_size;
877 b->mbo_blocks = attr->la_blocks;
879 if (!S_ISREG(attr->la_mode)) {
880 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
881 } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
882 /* means no objects are allocated on osts. */
883 LASSERT(!(ma->ma_valid & MA_LOV));
884 /* just ignore blocks occupied by extend attributes on MDS */
886 /* if no object is allocated on osts, the size on mds is valid.
888 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
889 } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
890 if (mdt_hsm_is_released(ma->ma_lmm)) {
891 /* A released file stores its size on MDS. */
892 /* But return 1 block for released file, otherwise tools
893 * like tar will consider it fully sparse. (LU-3864)
895 if (unlikely(b->mbo_size == 0))
899 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
900 } else if (info->mti_som_valid) { /* som is valid */
901 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
902 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
903 b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
904 b->mbo_size = ma->ma_som.ms_size;
905 b->mbo_blocks = ma->ma_som.ms_blocks;
909 if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
910 b->mbo_valid & OBD_MD_FLLAZYSIZE))
911 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
912 PFID(fid), (unsigned long long)b->mbo_size);
915 if (!IS_ERR_OR_NULL(nodemap))
916 nodemap_putref(nodemap);
/**
 * Tell whether a reply body advertises layout data for the object.
 *
 * \param[in] la    attributes supplying the file mode
 * \param[in] body  reply body whose mbo_valid bits are tested
 *
 * \retval non-zero for a regular file with OBD_MD_FLEASIZE set or a
 *         directory with OBD_MD_FLDIREA set
 * \retval 0 otherwise
 */
919 static inline int mdt_body_has_lov(const struct lu_attr *la,
920 const struct mdt_body *body)
922 return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
923 (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
/**
 * Adjust a reply for clients that do not support the layout lock.
 *
 * Nothing needs doing for exports where exp_connect_layout() holds, nor for
 * replies without layout data (mdt_body_has_lov()). Otherwise, for regular
 * files, the layout generation in info->mti_attr.ma_lmm is reset to 0.
 *
 * \param[in] info  thread info (request, reply capsule and mti_attr)
 */
926 void mdt_client_compatibility(struct mdt_thread_info *info)
928 struct mdt_body *body;
929 struct ptlrpc_request *req = mdt_info_req(info);
930 struct obd_export *exp = req->rq_export;
931 struct md_attr *ma = &info->mti_attr;
932 struct lu_attr *la = &ma->ma_attr;
935 if (exp_connect_layout(exp))
936 /* the client can deal with 16-bit lmm_stripe_count */
939 body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
941 if (!mdt_body_has_lov(la, body))
944 /* now we have a reply with a lov for a client not compatible with the
945 * layout lock so we have to clean the layout generation number */
946 if (S_ISREG(la->la_mode))
947 ma->ma_lmm->lmm_layout_gen = 0;
/**
 * Probe how much buffer space the object's striping xattrs need.
 *
 * Sizes are queried by calling mo_xattr_get() with LU_BUF_NULL. For
 * directories the LMV xattrs (including XATTR_NAME_DEFAULT_LMV) are probed
 * too; a probe result is compared against the size found so far, and a
 * probe error other than -ENODATA is singled out by the same test.
 *
 * \param[in] info  thread info supplying the environment
 * \param[in] o     object to probe
 */
951 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
952 struct mdt_object *o)
954 const struct lu_env *env = info->mti_env;
957 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
966 /* Is it a directory? Let's check for the LMV as well */
967 if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
968 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
972 rc2 = mo_xattr_get(env, mdt_object_child(o),
974 XATTR_NAME_DEFAULT_LMV);
976 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
/**
 * Read an xattr into the per-thread big buffer (info->mti_big_lmm).
 *
 * The buffer must not be in use (mti_big_lmm_used == 0, asserted). The xattr
 * size is probed first with LU_BUF_NULL. If the current buffer is smaller
 * than that, any old buffer is freed and a new one of
 * size_roundup_power2(size) bytes is allocated. The xattr is then read
 * through info->mti_buf, which is left pointing at the big buffer.
 *
 * \param[in] info  thread info owning mti_big_lmm / mti_big_lmmsize
 * \param[in] o     object to read the xattr from
 */
984 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
987 const struct lu_env *env = info->mti_env;
991 LASSERT(info->mti_big_lmm_used == 0);
992 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
996 /* big_lmm may need to be grown */
997 if (info->mti_big_lmmsize < rc) {
998 int size = size_roundup_power2(rc);
1000 if (info->mti_big_lmmsize > 0) {
1001 /* free old buffer */
1002 LASSERT(info->mti_big_lmm);
1003 OBD_FREE_LARGE(info->mti_big_lmm,
1004 info->mti_big_lmmsize);
1005 info->mti_big_lmm = NULL;
1006 info->mti_big_lmmsize = 0;
1009 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
1010 if (info->mti_big_lmm == NULL)
1012 info->mti_big_lmmsize = size;
1014 LASSERT(info->mti_big_lmmsize >= rc);
1016 info->mti_buf.lb_buf = info->mti_big_lmm;
1017 info->mti_buf.lb_len = info->mti_big_lmmsize;
1018 rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
/**
 * Read one striping xattr (LOV, LMV or default LMV) into \a ma.
 *
 * \a name selects the destination buffer and size inside \a ma (ma_lmm,
 * ma_lmv or ma_default_lmv); the matching MA_* valid bit must not be set
 * yet (asserted) and the buffer must be non-NULL. After the read:
 * - for LOV, a layout with LOV_PATTERN_F_HOLE gets special handling when
 *   the client did not connect with OBD_CONNECT_LFSCK; otherwise the size
 *   is recorded and MA_LOV is set;
 * - for LMV and default LMV, the size is recorded and MA_LMV / MA_LMV_DEF
 *   is set;
 * - mdt_max_mdsize is raised when the xattr is larger than its current
 *   value;
 * - -ERANGE is retried through mdt_big_xattr_get(), except for the
 *   fixed-size default LMV, and mti_big_lmm_used is set to 1. When that flag
 *   is set, ma_lmm / ma_lmv are pointed at info->mti_big_lmm.
 *
 * \param[in]     info  thread info (mti_buf, mti_big_lmm, mti_exp, mti_mdt)
 * \param[in]     o     object to read from
 * \param[in,out] ma    attribute container providing buffers and receiving
 *                      sizes and valid bits
 * \param[in]     name  XATTR_NAME_LOV, XATTR_NAME_LMV or
 *                      XATTR_NAME_DEFAULT_LMV
 */
1023 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1024 struct md_attr *ma, const char *name)
1026 struct md_object *next = mdt_object_child(o);
1027 struct lu_buf *buf = &info->mti_buf;
1030 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1031 buf->lb_buf = ma->ma_lmm;
1032 buf->lb_len = ma->ma_lmm_size;
1033 LASSERT(!(ma->ma_valid & MA_LOV));
1034 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1035 buf->lb_buf = ma->ma_lmv;
1036 buf->lb_len = ma->ma_lmv_size;
1037 LASSERT(!(ma->ma_valid & MA_LMV));
1038 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1039 buf->lb_buf = ma->ma_default_lmv;
1040 buf->lb_len = ma->ma_default_lmv_size;
1041 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1046 LASSERT(buf->lb_buf);
1048 rc = mo_xattr_get(info->mti_env, next, buf, name);
1052 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1053 if (info->mti_big_lmm_used)
1054 ma->ma_lmm = info->mti_big_lmm;
1056 /* NOT return LOV EA with hole to old client. */
1057 if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1058 LOV_PATTERN_F_HOLE) &&
1059 !(exp_connect_flags(info->mti_exp) &
1060 OBD_CONNECT_LFSCK)) {
1063 ma->ma_lmm_size = rc;
1064 ma->ma_valid |= MA_LOV;
1066 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1067 if (info->mti_big_lmm_used)
1068 ma->ma_lmv = info->mti_big_lmm;
1070 ma->ma_lmv_size = rc;
1071 ma->ma_valid |= MA_LMV;
1072 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1073 ma->ma_default_lmv_size = rc;
1074 ma->ma_valid |= MA_LMV_DEF;
1077 /* Update mdt_max_mdsize so all clients will be aware that */
1078 if (info->mti_mdt->mdt_max_mdsize < rc)
1079 info->mti_mdt->mdt_max_mdsize = rc;
1082 } else if (rc == -ENODATA) {
1085 } else if (rc == -ERANGE) {
1086 /* Default LMV has fixed size, so it must be able to fit
1087 * in the original buffer */
1088 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1090 rc = mdt_big_xattr_get(info, o, name);
1092 info->mti_big_lmm_used = 1;
/**
 * Read a LOV or LMV xattr using the per-thread big buffer as storage.
 *
 * info->mti_big_lmm is allocated with PAGE_SIZE bytes on first use. For
 * XATTR_NAME_LOV / XATTR_NAME_LMV the matching buffer pointer and size in
 * \a ma are aimed at that buffer and the matching valid bit is cleared, then
 * __mdt_stripe_get() does the read. mti_big_lmm_used must be clear on entry
 * (asserted) and is cleared again afterwards.
 *
 * \param[in]     info  thread info owning mti_big_lmm
 * \param[in]     o     object to read from
 * \param[in,out] ma    attribute container to fill
 * \param[in]     name  xattr name
 */
1100 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1101 struct md_attr *ma, const char *name)
1105 if (!info->mti_big_lmm) {
1106 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1107 if (!info->mti_big_lmm)
1109 info->mti_big_lmmsize = PAGE_SIZE;
1112 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1113 ma->ma_lmm = info->mti_big_lmm;
1114 ma->ma_lmm_size = info->mti_big_lmmsize;
1115 ma->ma_valid &= ~MA_LOV;
1116 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1117 ma->ma_lmv = info->mti_big_lmm;
1118 ma->ma_lmv_size = info->mti_big_lmmsize;
1119 ma->ma_valid &= ~MA_LMV;
1124 LASSERT(!info->mti_big_lmm_used);
1125 rc = __mdt_stripe_get(info, o, ma, name);
1126 /* since big_lmm is always used here, clear 'used' flag to avoid
1127 * assertion in mdt_big_xattr_get().
1129 info->mti_big_lmm_used = 0;
/**
 * Extract the parent FID from the first record of the object's link xattr.
 *
 * XATTR_NAME_LINK is read into info->mti_big_lmm; on -ERANGE, or when that
 * buffer has zero length, the read is redone through mdt_big_xattr_get().
 * A result shorter than the link EA header is logged as a short LinkEA. A
 * header whose magic is the byte-swapped LINK_EA_MAGIC is converted in
 * place; the magic and a non-zero record count are then checked. Finally
 * the first entry's lee_parent_fid is copied to \a pfid and converted from
 * big-endian with fid_be_to_cpu().
 *
 * \param[in]  info  thread info (mti_buf, mti_big_lmm)
 * \param[in]  o     object whose link xattr is read
 * \param[out] pfid  receives the parent FID
 */
1134 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1135 struct lu_fid *pfid)
1137 struct lu_buf *buf = &info->mti_buf;
1138 struct link_ea_header *leh;
1139 struct link_ea_entry *lee;
1143 buf->lb_buf = info->mti_big_lmm;
1144 buf->lb_len = info->mti_big_lmmsize;
1145 rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1146 buf, XATTR_NAME_LINK);
1147 /* ignore errors, MA_PFID won't be set and it is
1148 * up to the caller to treat this as an error */
1149 if (rc == -ERANGE || buf->lb_len == 0) {
1150 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1151 buf->lb_buf = info->mti_big_lmm;
1152 buf->lb_len = info->mti_big_lmmsize;
1157 if (rc < sizeof(*leh)) {
1158 CERROR("short LinkEA on "DFID": rc = %d\n",
1159 PFID(mdt_object_fid(o)), rc);
1163 leh = (struct link_ea_header *) buf->lb_buf;
/* the first entry starts right after the header */
1164 lee = (struct link_ea_entry *)(leh + 1);
1165 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1166 leh->leh_magic = LINK_EA_MAGIC;
1167 leh->leh_reccount = __swab32(leh->leh_reccount);
1168 leh->leh_len = __swab64(leh->leh_len);
1170 if (leh->leh_magic != LINK_EA_MAGIC)
1172 if (leh->leh_reccount == 0)
1175 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1176 fid_be_to_cpu(pfid, pfid);
/**
 * Extract the parent FID and name from the first record of the object's
 * link xattr.
 *
 * The xattr is read into info->mti_xattr_buf; on -ERANGE it is read again
 * through mdt_big_xattr_get() and info->mti_buf is pointed at the big
 * buffer. A result shorter than the link EA header is logged as a short
 * LinkEA. A header whose magic is the byte-swapped LINK_EA_MAGIC is
 * converted in place; the magic and a non-zero record count are then
 * checked. The first entry is decoded with linkea_entry_unpack().
 *
 * \param[in]  info   thread info (mti_buf, mti_xattr_buf, mti_big_lmm)
 * \param[in]  o      object whose link xattr is read
 * \param[out] pfid   receives the parent FID
 * \param[out] lname  receives the name stored in the first link entry
 */
1181 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1182 struct lu_fid *pfid, struct lu_name *lname)
1184 struct lu_buf *buf = &info->mti_buf;
1185 struct link_ea_header *leh;
1186 struct link_ea_entry *lee;
1190 buf->lb_buf = info->mti_xattr_buf;
1191 buf->lb_len = sizeof(info->mti_xattr_buf);
1192 rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1194 if (rc == -ERANGE) {
1195 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1196 buf->lb_buf = info->mti_big_lmm;
1197 buf->lb_len = info->mti_big_lmmsize;
1202 if (rc < sizeof(*leh)) {
1203 CERROR("short LinkEA on "DFID": rc = %d\n",
1204 PFID(mdt_object_fid(o)), rc);
1208 leh = (struct link_ea_header *)buf->lb_buf;
/* the first entry starts right after the header */
1209 lee = (struct link_ea_entry *)(leh + 1);
1210 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1211 leh->leh_magic = LINK_EA_MAGIC;
1212 leh->leh_reccount = __swab32(leh->leh_reccount);
1213 leh->leh_len = __swab64(leh->leh_len);
1215 if (leh->leh_magic != LINK_EA_MAGIC)
1218 if (leh->leh_reccount == 0)
1221 linkea_entry_unpack(lee, &reclen, lname, pfid);
/**
 * Gather the attribute groups requested in ma->ma_need.
 *
 * A non-existent object yields -ENOENT. The request bits are then served
 * in this order:
 * - MA_INODE: mo_attr_get() with ma_need narrowed to MA_INODE; SOM is also
 *   fetched with its result ignored, and MA_INODE is marked valid;
 * - MA_PFID: mdt_attr_get_pfid(); a failure here is not fatal because the
 *   parent FID is not mandatory;
 * - MA_LOV (regular files and directories), MA_LMV and MA_LMV_DEF
 *   (directories): __mdt_stripe_get() with the matching xattr name;
 * - MA_SOM (regular files, unless already valid): mdt_get_som();
 * - MA_HSM (regular files): XATTR_NAME_HSM is read into mti_xattr_buf and
 *   decoded by lustre_buf2hsm(); errors other than -ENODATA abort;
 * - MA_ACL_DEF (directories, only with CONFIG_LUSTRE_FS_POSIX_ACL): the
 *   default ACL is read into ma_acl; -ENODATA gives ma_acl_size = 0 and
 *   any other failure aborts.
 *
 * \param[in]     info  thread info (mti_env, mti_buf, mti_xattr_buf)
 * \param[in]     o     object to query
 * \param[in,out] ma    ma_need selects the groups; results and ma_valid
 *                      bits are stored here
 */
1226 int mdt_attr_get_complex(struct mdt_thread_info *info,
1227 struct mdt_object *o, struct md_attr *ma)
1229 const struct lu_env *env = info->mti_env;
1230 struct md_object *next = mdt_object_child(o);
1231 struct lu_buf *buf = &info->mti_buf;
1232 int need = ma->ma_need;
1239 if (mdt_object_exists(o) == 0)
1240 GOTO(out, rc = -ENOENT);
1241 mode = lu_object_attr(&next->mo_lu);
1243 if (need & MA_INODE) {
1244 ma->ma_need = MA_INODE;
1245 rc = mo_attr_get(env, next, ma);
1250 (void) mdt_get_som(info, o, ma);
1251 ma->ma_valid |= MA_INODE;
1254 if (need & MA_PFID) {
1255 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1257 ma->ma_valid |= MA_PFID;
1258 /* ignore this error, parent fid is not mandatory */
1262 if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1263 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1268 if (need & MA_LMV && S_ISDIR(mode)) {
1269 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1274 if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1275 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1281 * In the handle of MA_INODE, we may already get the SOM attr.
1283 if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1284 rc = mdt_get_som(info, o, ma);
1289 if (need & MA_HSM && S_ISREG(mode)) {
1290 buf->lb_buf = info->mti_xattr_buf;
1291 buf->lb_len = sizeof(info->mti_xattr_buf);
1292 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1293 sizeof(info->mti_xattr_buf));
1294 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1295 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1297 ma->ma_valid |= MA_HSM;
1298 else if (rc2 < 0 && rc2 != -ENODATA)
1299 GOTO(out, rc = rc2);
1302 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1303 if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1304 buf->lb_buf = ma->ma_acl;
1305 buf->lb_len = ma->ma_acl_size;
1306 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1308 ma->ma_acl_size = rc2;
1309 ma->ma_valid |= MA_ACL_DEF;
1310 } else if (rc2 == -ENODATA) {
1312 ma->ma_acl_size = 0;
1314 GOTO(out, rc = rc2);
1319 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1320 rc, ma->ma_valid, ma->ma_lmm);
/**
 * Fill the getattr reply body for object \a o.
 *
 * \param[in] info     thread info; the request body, the reply capsule and
 *                     the scratch md_attr (info->mti_attr) come from it
 * \param[in] o        object to describe
 * \param[in] ma_need  extra MA_* bits OR-ed into ma->ma_need on top of the
 *                     bits derived from the request
 *
 * Outline of the logic:
 *  - OBD_FAIL_MDS_GETATTR_PACK fault injection returns
 *    err_serious(-ENOMEM);
 *  - a remote object is answered with its FID only (OBD_MD_FLID |
 *    OBD_MD_MDS); an export that is not DNE-aware gets -ENOTSUPP;
 *  - when the request has mbo_eadatasize > 0 the RMF_MDT_MD reply field
 *    becomes the EA buffer (-EPROTO if it is missing); otherwise the buffer
 *    is cleared and MA_LOV | MA_LMV are masked out of \a ma_need;
 *  - for directories the OBD_MD_MEA / OBD_MD_DEFAULT_MEA request bits
 *    select MA_LMV and/or MA_LMV_DEF; in the remaining case the buffer is
 *    used for the LOV EA with MA_INODE | MA_HSM, plus MA_LOV when the
 *    buffer is not empty;
 *  - an overstriped layout is refused with -EOPNOTSUPP for exports without
 *    overstriping support;
 *  - mdt_attr_get_complex() fetches the attributes, which are then packed:
 *    HSM restore state, inode attributes, LOV / LMV / default LMV sizes and
 *    valid bits, symlink target via mo_readlink(), max MD size, and ACLs
 *    when CONFIG_LUSTRE_FS_POSIX_ACL is set and the client asked for them;
 *  - a directory may be queued for restripe or auto-split as a side effect;
 *  - LPROC_MDT_GETATTR is updated with the elapsed time.
 *
 * \retval negative  errno on failure. NOTE(review): the success path and
 *                   the out label are not fully visible here - confirm
 */
1324 static int mdt_getattr_internal(struct mdt_thread_info *info,
1325 struct mdt_object *o, int ma_need)
1327 struct mdt_device *mdt = info->mti_mdt;
1328 struct md_object *next = mdt_object_child(o);
1329 const struct mdt_body *reqbody = info->mti_body;
1330 struct ptlrpc_request *req = mdt_info_req(info);
1331 struct md_attr *ma = &info->mti_attr;
1332 struct lu_attr *la = &ma->ma_attr;
1333 struct req_capsule *pill = info->mti_pill;
1334 const struct lu_env *env = info->mti_env;
1335 struct mdt_body *repbody;
1336 struct lu_buf *buffer = &info->mti_buf;
1337 struct obd_export *exp = info->mti_exp;
1338 ktime_t kstart = ktime_get();
/* fault injection hook: simulate a failure while packing the reply */
1343 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1344 RETURN(err_serious(-ENOMEM));
1346 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1350 if (mdt_object_remote(o)) {
1351 /* This object is located on remote node.*/
1352 /* Return -ENOTSUPP for old client */
1353 if (!mdt_is_dne_client(req->rq_export))
1354 GOTO(out, rc = -ENOTSUPP);
1356 repbody->mbo_fid1 = *mdt_object_fid(o);
1357 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1361 if (reqbody->mbo_eadatasize > 0) {
1362 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1363 if (buffer->lb_buf == NULL)
1364 GOTO(out, rc = -EPROTO);
1365 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1368 buffer->lb_buf = NULL;
1370 ma_need &= ~(MA_LOV | MA_LMV);
1371 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1372 mdt_obd_name(info->mti_mdt),
1373 req->rq_export->exp_client_uuid.uuid);
1376 /* from 2.12.58 intent_getattr pack default LMV in reply */
1377 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1378 ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1379 (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1380 req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1382 ma->ma_lmv = buffer->lb_buf;
1383 ma->ma_lmv_size = buffer->lb_len;
1384 ma->ma_default_lmv = req_capsule_server_get(pill,
1385 &RMF_DEFAULT_MDT_MD);
1386 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1387 &RMF_DEFAULT_MDT_MD,
1389 ma->ma_need = MA_INODE;
1390 if (ma->ma_lmv_size > 0)
1391 ma->ma_need |= MA_LMV;
1392 if (ma->ma_default_lmv_size > 0)
1393 ma->ma_need |= MA_LMV_DEF;
1394 } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1395 (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1396 /* If it is dir and client require MEA, then we got MEA */
1397 /* Assumption: MDT_MD size is enough for lmv size. */
1398 ma->ma_lmv = buffer->lb_buf;
1399 ma->ma_lmv_size = buffer->lb_len;
1400 ma->ma_need = MA_INODE;
1401 if (ma->ma_lmv_size > 0) {
1402 if (reqbody->mbo_valid & OBD_MD_MEA) {
1403 ma->ma_need |= MA_LMV;
1404 } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1405 ma->ma_need |= MA_LMV_DEF;
1406 ma->ma_default_lmv = buffer->lb_buf;
1408 ma->ma_default_lmv_size = buffer->lb_len;
1409 ma->ma_lmv_size = 0;
1413 ma->ma_lmm = buffer->lb_buf;
1414 ma->ma_lmm_size = buffer->lb_len;
1415 ma->ma_need = MA_INODE | MA_HSM;
1416 if (ma->ma_lmm_size > 0) {
1417 ma->ma_need |= MA_LOV;
1418 /* Older clients may crash if they getattr overstriped
1421 if (!exp_connect_overstriping(exp) &&
1422 mdt_lmm_is_overstriping(ma->ma_lmm))
1423 RETURN(-EOPNOTSUPP);
1427 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1428 reqbody->mbo_valid & OBD_MD_FLDIREA &&
1429 lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1430 /* get default stripe info for this dir. */
1431 ma->ma_need |= MA_LOV_DEF;
/* add the attribute bits requested by the caller */
1433 ma->ma_need |= ma_need;
1435 rc = mdt_attr_get_complex(info, o, ma);
1437 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1438 "%s: getattr error for "DFID": rc = %d\n",
1439 mdt_obd_name(info->mti_mdt),
1440 PFID(mdt_object_fid(o)), rc);
1444 /* if file is released, check if a restore is running */
1445 if (ma->ma_valid & MA_HSM) {
1446 repbody->mbo_valid |= OBD_MD_TSTATE;
1447 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1448 mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1449 repbody->mbo_t_state = MS_RESTORE;
1452 if (unlikely(!(ma->ma_valid & MA_INODE)))
1455 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1457 if (mdt_body_has_lov(la, reqbody)) {
1458 u32 stripe_count = 1;
1459 bool fixed_layout = false;
1461 if (ma->ma_valid & MA_LOV) {
1462 LASSERT(ma->ma_lmm_size);
1463 repbody->mbo_eadatasize = ma->ma_lmm_size;
1464 if (S_ISDIR(la->la_mode))
1465 repbody->mbo_valid |= OBD_MD_FLDIREA;
1467 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1468 mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1470 if (ma->ma_valid & MA_LMV) {
1471 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1472 u32 magic = le32_to_cpu(lmv->lmv_magic);
1474 /* Return -ENOTSUPP for old client */
1475 if (!mdt_is_striped_client(req->rq_export))
1478 LASSERT(S_ISDIR(la->la_mode));
1479 mdt_dump_lmv(D_INFO, ma->ma_lmv);
1480 repbody->mbo_eadatasize = ma->ma_lmv_size;
1481 repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1483 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1484 fixed_layout = lmv_is_fixed(lmv);
1485 if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1486 mdt_restripe_migrate_add(info, o);
1487 else if (magic == LMV_MAGIC_V1 &&
1488 lmv_is_restriping(lmv))
1489 mdt_restripe_update_add(info, o);
1491 if (ma->ma_valid & MA_LMV_DEF) {
1492 /* Return -ENOTSUPP for old client */
1493 if (!mdt_is_striped_client(req->rq_export))
1495 LASSERT(S_ISDIR(la->la_mode));
1497 * when ll_dir_getstripe() gets default LMV, it
1498 * checks mbo_eadatasize.
1500 if (!(ma->ma_valid & MA_LMV))
1501 repbody->mbo_eadatasize =
1502 ma->ma_default_lmv_size;
1503 repbody->mbo_valid |= (OBD_MD_FLDIREA |
1504 OBD_MD_DEFAULT_MEA);
1507 "dirent count %llu stripe count %u MDT count %d\n",
1508 ma->ma_attr.la_dirent_count, stripe_count,
1509 atomic_read(&mdt->mdt_mds_mds_conns) + 1);
/*
 * Queue the directory for auto-split only when the dirent count is known
 * and above the configured threshold, the directory is not the root, auto
 * split is enabled, no restripe is in progress and the stripe count is
 * still below the number of MDTs.
 */
1510 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1511 ma->ma_attr.la_dirent_count >
1512 mdt->mdt_restriper.mdr_dir_split_count &&
1513 !fid_is_root(mdt_object_fid(o)) &&
1514 mdt->mdt_enable_dir_auto_split &&
1515 !o->mot_restriping &&
1516 stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
1518 mdt_auto_split_add(info, o);
1519 } else if (S_ISLNK(la->la_mode) &&
1520 reqbody->mbo_valid & OBD_MD_LINKNAME) {
1521 buffer->lb_buf = ma->ma_lmm;
1522 /* eadatasize from client includes NULL-terminator, so
1523 * there is no need to read it */
1524 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1525 rc = mo_readlink(env, next, buffer);
1526 if (unlikely(rc <= 0)) {
1527 CERROR("%s: readlink failed for "DFID": rc = %d\n",
1528 mdt_obd_name(info->mti_mdt),
1529 PFID(mdt_object_fid(o)), rc);
1532 int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1534 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1536 repbody->mbo_valid |= OBD_MD_LINKNAME;
1537 /* we need to report back size with NULL-terminator
1538 * because client expects that */
1539 repbody->mbo_eadatasize = rc + 1;
1540 if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1541 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1542 "on "DFID ", expected %d\n",
1543 mdt_obd_name(info->mti_mdt),
1544 rc, PFID(mdt_object_fid(o)),
1545 reqbody->mbo_eadatasize - 1);
1546 /* NULL terminate */
1547 ((char *)ma->ma_lmm)[rc] = 0;
1549 /* If the total CDEBUG() size is larger than a page, it
1550 * will print a warning to the console, avoid this by
1551 * printing just the last part of the symlink. */
1552 CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1553 print_limit < rc ? "..." : "", print_limit,
1554 (char *)ma->ma_lmm + rc - print_limit, rc);
1559 if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1560 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1561 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1562 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1563 repbody->mbo_max_mdsize);
1566 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1567 if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1568 (reqbody->mbo_valid & OBD_MD_FLACL)) {
1569 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1570 if (IS_ERR(nodemap))
1571 RETURN(PTR_ERR(nodemap));
1573 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1574 nodemap_putref(nodemap);
1580 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1581 ktime_us_delta(ktime_get(), kstart));
/**
 * Handler for a plain getattr request.
 *
 * \param[in] tsi  target session info; converted with tsi2mdt_info()
 *
 * Outline of the logic:
 *  - a missing info->mti_object is rejected early;
 *  - a request with OBD_MD_FLDATAVERSION is routed to
 *    mdt_data_version_get();
 *  - the RMF_MDT_MD reply buffer is sized before packing: a dedicated size
 *    for OBD_MD_LINKNAME requests, DEF_REP_MD_SIZE for an OBD_MD_FLDIREA
 *    getattr on the filesystem root, otherwise the on-disk EA size from
 *    mdt_attr_get_eabuf_size();
 *  - RMF_ACL is sized to LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
 *  - after req_capsule_server_pack() the reply EA and ACL sizes are zeroed,
 *    credentials are checked with mdt_check_ucred() and the attributes are
 *    packed by mdt_getattr_internal();
 *  - mdt_client_compatibility(), mdt_fix_reply() and
 *    mdt_thread_info_fini() finish the request.
 *
 * \retval negative  errno on failure; sizing and packing failures are
 *                   wrapped with err_serious(). NOTE(review): how rc and
 *                   rc2 are combined at the end is not visible - confirm
 */
1586 static int mdt_getattr(struct tgt_session_info *tsi)
1588 struct mdt_thread_info *info = tsi2mdt_info(tsi);
1589 struct mdt_object *obj = info->mti_object;
1590 struct req_capsule *pill = info->mti_pill;
1591 struct mdt_body *reqbody;
1592 struct mdt_body *repbody;
1596 if (unlikely(info->mti_object == NULL))
1599 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1601 LASSERT(lu_object_assert_exists(&obj->mot_obj));
1603 /* Special case for Data-on-MDT files to get data version */
1604 if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1605 rc = mdt_data_version_get(tsi);
1609 /* Unlike intent case where we need to pre-fill out buffers early on
1610 * in intent policy for ldlm reasons, here we can have a much better
1611 * guess at EA size by just reading it from disk.
1612 * Exceptions are readdir and (missing) directory striping */
1614 if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1615 /* No easy way to know how long is the symlink, but it cannot
1616 * be more than PATH_MAX, so we allocate +1 */
1618 /* A special case for fs ROOT: getattr there might fetch
1619 * default EA for entire fs, not just for this dir!
1621 } else if (lu_fid_eq(mdt_object_fid(obj),
1622 &info->mti_mdt->mdt_md_root_fid) &&
1623 (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1624 (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1626 /* Should the default strping be bigger, mdt_fix_reply
1627 * will reallocate */
1628 rc = DEF_REP_MD_SIZE;
1630 /* Read the actual EA size from disk */
1631 rc = mdt_attr_get_eabuf_size(info, obj);
1635 GOTO(out, rc = err_serious(rc));
/* at this point rc holds the EA buffer size chosen above */
1637 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1639 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1640 * by default. If the target object has more ACL entries, then
1641 * enlarge the buffer when necessary. */
1642 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1643 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1645 rc = req_capsule_server_pack(pill);
1646 if (unlikely(rc != 0))
1647 GOTO(out, rc = err_serious(rc));
1649 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1650 LASSERT(repbody != NULL);
/* reset the variable-size counters before any attribute is packed */
1651 repbody->mbo_eadatasize = 0;
1652 repbody->mbo_aclsize = 0;
1654 rc = mdt_check_ucred(info);
1656 GOTO(out_shrink, rc);
1658 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1660 rc = mdt_getattr_internal(info, obj, 0);
1663 mdt_client_compatibility(info);
1664 rc2 = mdt_fix_reply(info);
1668 mdt_thread_info_fini(info);
1673 * Handler of layout intent RPC requiring the layout modification
1675 * \param[in] info thread environment
1676 * \param[in] obj object
1677 * \param[out] lhc object ldlm lock handle
1678 * \param[in] layout layout change descriptor
1680 * \retval 0 on success
1681 * \retval < 0 error code
/*
 * Implementation notes for mdt_layout_change():
 *  - the object must exist and be a regular file, and the caller must pass
 *    the mo_permission() check;
 *  - mdt_check_resent_lock() is consulted first; otherwise an LCK_EX lock
 *    is taken through mdt_reint_object_lock() on MDS_INODELOCK_LAYOUT, plus
 *    MDS_INODELOCK_UPDATE when layout->mlc_opc is MD_LAYOUT_WRITE;
 *  - mo_layout_change() runs with obj->mot_som_mutex held;
 *  - mdt_object_unlock() is called with 1 as its last argument.
 *    NOTE(review): which paths reach that unlock is not visible - confirm.
 */
1683 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1684 struct mdt_lock_handle *lhc,
1685 struct md_layout_change *layout)
1691 if (!mdt_object_exists(obj))
1694 if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1697 rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1702 rc = mdt_check_resent_lock(info, obj, lhc);
1708 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1710 /* take layout lock to prepare layout change */
1711 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1712 lockpart |= MDS_INODELOCK_UPDATE;
1714 mdt_lock_handle_init(lhc);
1715 mdt_lock_reg_init(lhc, LCK_EX);
1716 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
/* the layout is changed while holding the per-object SOM mutex */
1721 mutex_lock(&obj->mot_som_mutex);
1722 rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1723 mutex_unlock(&obj->mot_som_mutex);
1726 mdt_object_unlock(info, obj, lhc, 1);
1732 * Exchange MOF_LOV_CREATED flags between two objects after a
1733 * layout swap. No assumption is made on whether o1 or o2 have
1734 * created objects or not.
1736 * \param[in,out] o1 First swap layout object
1737 * \param[in,out] o2 Second swap layout object
1739 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1741 unsigned int o1_lov_created = o1->mot_lov_created;
1743 mutex_lock(&o1->mot_lov_mutex);
1744 mutex_lock(&o2->mot_lov_mutex);
1746 o1->mot_lov_created = o2->mot_lov_created;
1747 o2->mot_lov_created = o1_lov_created;
1749 mutex_unlock(&o2->mot_lov_mutex);
1750 mutex_unlock(&o1->mot_lov_mutex);
/**
 * Handler for the swap layouts request: exchange the layouts of the
 * request object (info->mti_object) and the object named by mbo_fid2.
 *
 * \param[in] tsi  target session info
 *
 * Outline of the logic:
 *  - exports without layout lock support get -EOPNOTSUPP before any thread
 *    info is set up;
 *  - early lock cancels carried in info->mti_dlm_req are processed;
 *  - the second object is looked up by mbo_fid2; it must be local and
 *    exist, otherwise -ENOENT;
 *  - the two FIDs are compared with lu_fid_cmp(); identical FIDs are
 *    treated as a no-op;
 *  - mo_permission() is checked on both objects;
 *  - the RMF_SWAP_LAYOUTS descriptor is fetched (-EPROTO on failure);
 *  - both objects are locked LCK_EX on MDS_INODELOCK_LAYOUT |
 *    MDS_INODELOCK_XATTR, o1 first, and unlocked in reverse order;
 *  - mo_swap_layouts() performs the swap and mdt_swap_lov_flag() exchanges
 *    the in-memory LOV created flags.
 *
 * The extra pointer o keeps the looked-up object for the final
 * mdt_object_put(). NOTE(review): presumably because o1 and o2 may be
 * reordered after the FID comparison - confirm.
 *
 * \retval -EOPNOTSUPP  client does not support layout locks
 * \retval negative     other errno on failure
 */
1753 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1755 struct mdt_thread_info *info;
1756 struct ptlrpc_request *req = tgt_ses_req(tsi);
1757 struct obd_export *exp = req->rq_export;
1758 struct mdt_object *o1, *o2, *o;
1759 struct mdt_lock_handle *lh1, *lh2;
1760 struct mdc_swap_layouts *msl;
1764 /* client does not support layout lock, so layout swaping
1766 * FIXME: there is a problem for old clients which don't support
1767 * layout lock yet. If those clients have already opened the file
1768 * they won't be notified at all so that old layout may still be
1769 * used to do IO. This can be fixed after file release is landed by
1770 * doing exclusive open and taking full EX ibits lock. - Jinshan */
1771 if (!exp_connect_layout(exp))
1772 RETURN(-EOPNOTSUPP);
1774 info = tsi2mdt_info(tsi);
1775 if (unlikely(info->mti_object == NULL))
1778 if (info->mti_dlm_req != NULL)
1779 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1781 o1 = info->mti_object;
1782 o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1783 &info->mti_body->mbo_fid2);
1785 GOTO(out, rc = PTR_ERR(o));
1787 if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1788 GOTO(put, rc = -ENOENT);
1790 rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1791 if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1797 /* permission check. Make sure the calling process having permission
1798 * to write both files. */
1799 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1804 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1809 msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1811 GOTO(put, rc = -EPROTO);
/* both objects are locked exclusively on their LAYOUT and XATTR bits */
1813 lh1 = &info->mti_lh[MDT_LH_NEW];
1814 mdt_lock_reg_init(lh1, LCK_EX);
1815 lh2 = &info->mti_lh[MDT_LH_OLD];
1816 mdt_lock_reg_init(lh2, LCK_EX);
1818 rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1819 MDS_INODELOCK_XATTR);
1823 rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1824 MDS_INODELOCK_XATTR);
1828 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1829 mdt_object_child(o2), msl->msl_flags);
1833 mdt_swap_lov_flag(o1, o2);
1836 mdt_object_unlock(info, o2, lh2, rc);
1838 mdt_object_unlock(info, o1, lh1, rc);
1840 mdt_object_put(info->mti_env, o);
1842 mdt_thread_info_fini(info);
/**
 * Resolve \a lname under \a parent to a FID only, without taking locks or
 * fetching attributes, and return it in the reply body.
 *
 * \param[in] info    thread info; info->mti_tmp_fid1 receives the FID
 * \param[in] parent  directory to search
 * \param[in] lname   name to look up
 *
 * Cross-ref requests are not expected here (asserted). A lookup of the
 * two-character name made of two dots is always permitted (sp_permitted is
 * set) and gets extra handling for striped directories: the parent is
 * re-resolved with mdt_object_find() on the looked-up FID and checked with
 * mdt_is_dir_stripe(). NOTE(review): the loop that repeats the lookup is
 * not fully visible here - confirm.
 *
 * On success the reply body carries the FID in mbo_fid1 and mbo_valid is
 * set to OBD_MD_FLID.
 *
 * \retval negative  errno from mdo_lookup() or mdt_object_find()
 */
1846 static int mdt_raw_lookup(struct mdt_thread_info *info,
1847 struct mdt_object *parent,
1848 const struct lu_name *lname)
1850 struct lu_fid *fid = &info->mti_tmp_fid1;
1851 struct mdt_body *repbody;
1852 bool is_dotdot = false;
1853 bool is_old_parent_stripe = false;
1854 bool is_new_parent_checked = false;
1859 LASSERT(!info->mti_cross_ref);
1860 /* Always allow to lookup ".." */
1861 if (lname->ln_namelen == 2 &&
1862 lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
1863 info->mti_spec.sp_permitted = 1;
1865 if (mdt_is_dir_stripe(info, parent) == 1)
1866 is_old_parent_stripe = true;
/* hold a reference on the parent for the duration of the lookup */
1869 mdt_object_get(info->mti_env, parent);
1871 /* Only got the fid of this obj by name */
1873 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
1875 mdt_object_put(info->mti_env, parent);
1879 /* getattr_name("..") should return master object FID for striped dir */
1880 if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
1881 parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1883 RETURN(PTR_ERR(parent));
1885 /* old client getattr_name("..") with stripe FID */
1886 if (unlikely(is_old_parent_stripe)) {
1887 is_old_parent_stripe = false;
1891 /* ".." may be a stripe */
1892 if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
1893 is_new_parent_checked = true;
1897 mdt_object_put(info->mti_env, parent);
1900 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1901 repbody->mbo_fid1 = *fid;
1902 repbody->mbo_valid = OBD_MD_FLID;
1908 * Find name matching hash
1910 * We search \a child LinkEA for a name whose hash matches \a lname
1911 * (it contains an encoded hash).
1913 * \param info mdt thread info
1914 * \param lname encoded hash to find
1915 * \param parent parent object
1916 * \param child object to search with LinkEA
1917 * \param force_check true to check hash even if LinkEA has only one entry
1919 * \retval 1 match found
1920 * \retval 0 no match found
1921 * \retval -ev negative errno upon error
/*
 * Implementation notes for find_name_matching_hash():
 *  - names shorter than LLCRYPT_FNAME_DIGEST_SIZE are rejected up front;
 *  - info->mti_big_buf is grown to at least PATH_MAX and the LinkEA of
 *    \a child is read into it with mdt_links_read();
 *  - when \a force_check is set or the LinkEA holds more than one record,
 *    a kmalloc()ed buffer of ln_namelen bytes receives the
 *    critical_decode()d form of \a lname;
 *  - every LinkEA record is unpacked with linkea_entry_unpack(); with a
 *    single record and no \a force_check that record is accepted as is;
 *  - otherwise a record is considered only if \a parent is NULL or its
 *    parent FID equals the FID of \a parent; its name is decoded into a
 *    temporary lu_buf and the LLCRYPT_FNAME_DIGEST() of that is compared
 *    with the decoded request hash over LLCRYPT_FNAME_DIGEST_SIZE bytes;
 *  - reaching the end of the record list (count == leh_reccount) is the
 *    no-match case.
 * NOTE(review): freeing of the hash and link buffers happens on lines not
 * visible here - confirm.
 */
1923 int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
1924 struct mdt_object *parent, struct mdt_object *child,
1927 /* Here, lname is an encoded hash of on-disk name, and
1928 * client is doing access without encryption key.
1929 * So we need to get LinkEA, check parent fid is correct and
1930 * compare name hash with the one in the request.
1932 struct lu_buf *buf = &info->mti_big_buf;
1933 struct lu_name name;
1935 struct linkea_data ldata = { NULL };
1936 struct link_ea_header *leh;
1937 struct link_ea_entry *lee;
1938 struct lu_buf link = { 0 };
1940 int reclen, count, rc;
1944 if (lname->ln_namelen < LLCRYPT_FNAME_DIGEST_SIZE)
1947 buf = lu_buf_check_and_alloc(buf, PATH_MAX);
1952 rc = mdt_links_read(info, child, &ldata);
1957 if (force_check || leh->leh_reccount > 1) {
1958 hash = kmalloc(lname->ln_namelen, GFP_NOFS);
1961 rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
1963 lee = (struct link_ea_entry *)(leh + 1);
1964 for (count = 0; count < leh->leh_reccount; count++) {
1965 linkea_entry_unpack(lee, &reclen, &name, &pfid);
1966 if (!force_check && leh->leh_reccount == 1) {
1967 /* if there is only one rec, it has to be it */
1971 if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
1972 lu_buf_check_and_alloc(&link, name.ln_namelen);
1974 GOTO(out_match, rc = -ENOMEM);
1975 rc = critical_decode(name.ln_name, name.ln_namelen,
1978 if (memcmp(LLCRYPT_FNAME_DIGEST(link.lb_buf, rc),
1979 hash, LLCRYPT_FNAME_DIGEST_SIZE) == 0) {
1984 lee = (struct link_ea_entry *) ((char *)lee + reclen);
1986 if (count == leh->leh_reccount)
1999 * UPDATE lock should be taken against parent, and be released before exit;
2000 * child_bits lock should be taken against child, and be returned back:
2001 * (1)normal request should release the child lock;
2002 * (2)intent request will grant the lock to client.
/**
 * Find the child named in a getattr-by-name request, lock it and pack its
 * attributes into the reply.
 *
 * \param[in] info        thread info; info->mti_object is the parent
 * \param[in] lhc         lock handle for the child; if it is already in use
 *                        on entry the request is handled as a resend
 * \param[in] child_bits  inodebits requested for the child lock
 * \param[in] ldlm_rep    reply whose disposition bits are updated; it is
 *                        compared against NULL before layout bits are tried
 *
 * Outline of the logic:
 *  - cross-ref request (info->mti_cross_ref): only the child is handled.
 *    Unless a resent lock is reused, an LCK_PR lock is taken with the
 *    LOOKUP and LAYOUT bits removed and the PERM bit added. The attributes
 *    and the security and encryption contexts are then packed, and the
 *    child lock is dropped on any failure;
 *  - otherwise the child is identified in one of three ways: by mbo_fid2
 *    when OBD_MD_NAMEHASH is set, by name when the unpacked name is valid
 *    (the parent must not be remote), or by mbo_fid2 again when there is no
 *    valid name (the child must not be remote, and the LOOKUP bit is
 *    dropped depending on mdt_is_remote_object());
 *  - a missing parent on a by-name request yields -ESTALE;
 *  - on a resend without a child, the child FID is recovered from the
 *    resource name of the already granted lock;
 *  - a by-name request whose mbo_valid is exactly OBD_MD_FLID is served by
 *    mdt_raw_lookup();
 *  - for a normal by-name request the parent, if it is a directory, is
 *    locked with an LCK_PR PDO lock on MDS_INODELOCK_UPDATE, then
 *    mdo_lookup() resolves the child FID; DISP_LOOKUP_NEG or
 *    DISP_LOOKUP_POS records the outcome;
 *  - with OBD_MD_NAMEHASH, find_name_matching_hash() must confirm the name,
 *    otherwise the lookup is reported negative with -ENOENT;
 *  - the child must exist (-ENOENT otherwise). Unless a resent lock is
 *    reused, an LCK_PR lock is prepared. If UPDATE was not requested and
 *    the child is local, UPDATE is added when the ctime is older than
 *    ns_ctime_age_limit. For a local regular file with a non-NULL
 *    \a ldlm_rep the LAYOUT bit (and DOM when child differs from parent)
 *    is tried through mdt_object_lock_try(); otherwise mdt_object_lock()
 *    is used, without UPDATE for a remote child;
 *  - the attributes and the security and encryption contexts are packed,
 *    and the child lock is dropped on failure;
 *  - for a local regular file that is not the parent, the size is packed
 *    with mdt_pack_size2body(); if that fails while the DOM bit was
 *    granted, the DOM bit is dropped from the lock and rc is reset to 0;
 *  - the child reference is put and the parent lock released on exit.
 *
 * \retval negative  errno on failure; malformed requests are wrapped with
 *                   err_serious()
 */
2004 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
2005 struct mdt_lock_handle *lhc,
2007 struct ldlm_reply *ldlm_rep)
2009 struct ptlrpc_request *req = mdt_info_req(info);
2010 struct mdt_body *reqbody = NULL;
2011 struct mdt_object *parent = info->mti_object;
2012 struct mdt_object *child = NULL;
2013 struct lu_fid *child_fid = &info->mti_tmp_fid1;
2014 struct lu_name *lname = NULL;
2015 struct mdt_lock_handle *lhp = NULL;
2016 struct ldlm_lock *lock;
2017 struct req_capsule *pill = info->mti_pill;
/* a child lock handle that is already in use marks a resent request */
2025 is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
2026 LASSERT(ergo(is_resent,
2027 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
2032 if (info->mti_cross_ref) {
2033 /* Only getattr on the child. Parent is on another node. */
2034 mdt_set_disposition(info, ldlm_rep,
2035 DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
2037 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
2039 PFID(mdt_object_fid(child)), ldlm_rep);
2041 rc = mdt_check_resent_lock(info, child, lhc);
2044 } else if (rc > 0) {
2045 mdt_lock_handle_init(lhc);
2046 mdt_lock_reg_init(lhc, LCK_PR);
2049 * Object's name entry is on another MDS, it will
2050 * request PERM lock only because LOOKUP lock is owned
2051 * by the MDS where name entry resides.
2053 * TODO: it should try layout lock too. - Jinshan
2055 child_bits &= ~(MDS_INODELOCK_LOOKUP |
2056 MDS_INODELOCK_LAYOUT);
2057 child_bits |= MDS_INODELOCK_PERM;
2059 rc = mdt_object_lock(info, child, lhc, child_bits);
2064 /* Finally, we can get attr for child. */
2065 if (!mdt_object_exists(child)) {
2066 LU_OBJECT_DEBUG(D_INFO, info->mti_env,
2068 "remote object doesn't exist.");
2069 mdt_object_unlock(info, child, lhc, 1);
2073 rc = mdt_getattr_internal(info, child, 0);
2074 if (unlikely(rc != 0)) {
2075 mdt_object_unlock(info, child, lhc, 1);
2079 rc = mdt_pack_secctx_in_reply(info, child);
2081 mdt_object_unlock(info, child, lhc, 1);
2085 rc = mdt_pack_encctx_in_reply(info, child);
2087 mdt_object_unlock(info, child, lhc, 1);
2091 lname = &info->mti_name;
2092 mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
2094 if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2095 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2096 if (unlikely(reqbody == NULL))
2097 RETURN(err_serious(-EPROTO));
2099 *child_fid = reqbody->mbo_fid2;
2100 if (unlikely(!fid_is_sane(child_fid)))
2101 RETURN(err_serious(-EINVAL));
2103 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2104 mdt_object_get(info->mti_env, parent);
2107 child = mdt_object_find(info->mti_env, info->mti_mdt,
2110 RETURN(PTR_ERR(child));
2113 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2115 PFID(mdt_object_fid(parent)),
2116 PFID(&reqbody->mbo_fid2), ldlm_rep);
2117 } else if (lu_name_is_valid(lname)) {
2118 if (mdt_object_remote(parent)) {
2119 CERROR("%s: parent "DFID" is on remote target\n",
2120 mdt_obd_name(info->mti_mdt),
2121 PFID(mdt_object_fid(parent)));
2125 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
2126 "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
2127 PNAME(lname), ldlm_rep);
2129 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2130 if (unlikely(reqbody == NULL))
2131 RETURN(err_serious(-EPROTO));
2133 *child_fid = reqbody->mbo_fid2;
2134 if (unlikely(!fid_is_sane(child_fid)))
2135 RETURN(err_serious(-EINVAL));
2137 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2138 mdt_object_get(info->mti_env, parent);
2141 child = mdt_object_find(info->mti_env, info->mti_mdt,
2144 RETURN(PTR_ERR(child));
2147 if (mdt_object_remote(child)) {
2148 CERROR("%s: child "DFID" is on remote target\n",
2149 mdt_obd_name(info->mti_mdt),
2150 PFID(mdt_object_fid(child)));
2151 GOTO(out_child, rc = -EPROTO);
2154 /* don't fetch LOOKUP lock if it's remote object */
2155 rc = mdt_is_remote_object(info, parent, child);
2157 GOTO(out_child, rc);
2159 child_bits &= ~MDS_INODELOCK_LOOKUP;
2161 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2163 PFID(mdt_object_fid(parent)),
2164 PFID(&reqbody->mbo_fid2), ldlm_rep);
2167 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
2169 if (unlikely(!mdt_object_exists(parent)) &&
2170 !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2171 lu_name_is_valid(lname)) {
2172 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2174 "Parent doesn't exist!");
2175 GOTO(out_child, rc = -ESTALE);
2178 if (!child && is_resent) {
2179 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2181 /* Lock is pinned by ldlm_handle_enqueue0() as it is
2182 * a resend case, however, it could be already destroyed
2183 * due to client eviction or a raced cancel RPC.
2185 LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
2186 lhc->mlh_reg_lh.cookie);
2189 fid_extract_from_res_name(child_fid,
2190 &lock->l_resource->lr_name);
2191 LDLM_LOCK_PUT(lock);
2192 child = mdt_object_find(info->mti_env, info->mti_mdt,
2195 RETURN(PTR_ERR(child));
2196 } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2197 lu_name_is_valid(lname)) {
2198 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
2199 rc = mdt_raw_lookup(info, parent, lname);
2204 /* step 1: lock parent only if parent is a directory */
2205 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2206 lhp = &info->mti_lh[MDT_LH_PARENT];
2207 mdt_lock_pdo_init(lhp, LCK_PR, lname);
2208 rc = mdt_object_lock(info, parent, lhp,
2209 MDS_INODELOCK_UPDATE);
2210 if (unlikely(rc != 0))
2214 /* step 2: lookup child's fid by name */
2215 fid_zero(child_fid);
2216 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2217 child_fid, &info->mti_spec);
2219 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2222 GOTO(unlock_parent, rc);
2224 child = mdt_object_find(info->mti_env, info->mti_mdt,
2226 if (unlikely(IS_ERR(child)))
2227 GOTO(unlock_parent, rc = PTR_ERR(child));
2230 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2232 /* step 3: lock child regardless if it is local or remote. */
2235 if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2236 /* Here, lname is an encoded hash of on-disk name, and
2237 * client is doing access without encryption key.
2238 * So we need to compare name hash with the one in the request.
2240 if (!find_name_matching_hash(info, lname, parent,
2242 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2243 mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2244 GOTO(out_child, rc = -ENOENT);
2248 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2249 if (!mdt_object_exists(child)) {
2250 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2252 "Object doesn't exist!");
2253 GOTO(out_child, rc = -ENOENT);
2256 rc = mdt_check_resent_lock(info, child, lhc);
2258 GOTO(out_child, rc);
2259 } else if (rc > 0) {
2260 mdt_lock_handle_init(lhc);
2261 mdt_lock_reg_init(lhc, LCK_PR);
2263 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2264 !mdt_object_remote(child)) {
2265 struct md_attr *ma = &info->mti_attr;
2268 ma->ma_need = MA_INODE;
2269 rc = mdt_attr_get_complex(info, child, ma);
2270 if (unlikely(rc != 0))
2271 GOTO(out_child, rc);
2273 /* If the file has not been changed for some time, we
2274 * return not only a LOOKUP lock, but also an UPDATE
2275 * lock and this might save us RPC on later STAT. For
2276 * directories, it also let negative dentry cache start
2277 * working for this dir. */
2278 if (ma->ma_valid & MA_INODE &&
2279 ma->ma_attr.la_valid & LA_CTIME &&
2280 info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2281 ma->ma_attr.la_ctime < ktime_get_real_seconds())
2282 child_bits |= MDS_INODELOCK_UPDATE;
2285 /* layout lock must be granted in a best-effort way
2286 * for IT operations */
2287 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2288 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2289 !mdt_object_remote(child) && ldlm_rep != NULL) {
2290 if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2291 exp_connect_layout(info->mti_exp)) {
2292 /* try to grant layout lock for regular file. */
2293 try_bits = MDS_INODELOCK_LAYOUT;
2295 /* Acquire DOM lock in advance for data-on-mdt file */
2296 if (child != parent)
2297 try_bits |= MDS_INODELOCK_DOM;
2300 if (try_bits != 0) {
2301 /* try layout lock, it may fail to be granted due to
2302 * contention at LOOKUP or UPDATE */
2303 rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2305 if (child_bits & MDS_INODELOCK_LAYOUT)
2308 /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2309 * client will enqueue the lock to the remote MDT */
2310 if (mdt_object_remote(child))
2311 child_bits &= ~MDS_INODELOCK_UPDATE;
2312 rc = mdt_object_lock(info, child, lhc, child_bits);
2314 if (unlikely(rc != 0))
2315 GOTO(out_child, rc);
2318 /* finally, we can get attr for child. */
2319 rc = mdt_getattr_internal(info, child, ma_need);
2320 if (unlikely(rc != 0)) {
2322 mdt_object_unlock(info, child, lhc, 1);
2323 GOTO(out_child, rc);
2326 rc = mdt_pack_secctx_in_reply(info, child);
2329 mdt_object_unlock(info, child, lhc, 1);
2330 GOTO(out_child, rc);
2333 rc = mdt_pack_encctx_in_reply(info, child);
2336 mdt_object_unlock(info, child, lhc, 1);
2337 GOTO(out_child, rc);
2340 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2342 /* Debugging code. */
2343 LDLM_DEBUG(lock, "Returning lock to client");
2344 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2345 &lock->l_resource->lr_name),
2346 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2347 PLDLMRES(lock->l_resource),
2348 PFID(mdt_object_fid(child)));
2350 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
2351 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2352 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
2354 req->rq_arrival_time.tv_sec +
2356 /* Put the lock to the waiting list and force the cancel */
2357 ldlm_set_ast_sent(lock);
2360 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2361 !mdt_object_remote(child) && child != parent) {
2362 mdt_object_put(info->mti_env, child);
2363 rc = mdt_pack_size2body(info, child_fid,
2365 if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2366 /* DOM lock was taken in advance but this is
2367 * not DoM file. Drop the lock.
2369 lock_res_and_lock(lock);
2370 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2371 unlock_res_and_lock(lock);
2373 LDLM_LOCK_PUT(lock);
2374 GOTO(unlock_parent, rc = 0);
2376 LDLM_LOCK_PUT(lock);
2382 mdt_object_put(info->mti_env, child);
2385 mdt_object_unlock(info, parent, lhp, 1);
2389 /* normal handler: should release the child lock */
/**
 * Non-intent getattr-by-name handler.
 *
 * \param[in] tsi  target session info
 *
 * Both the request and the reply mdt_body are asserted to be present. The
 * reply EA and ACL sizes are zeroed, credentials are set up with
 * mdt_init_ucred() and the work is delegated to mdt_getattr_name_lock()
 * with MDS_INODELOCK_UPDATE as the child bits and no ldlm reply. Because
 * this is not an intent request, a child lock left in the MDT_LH_CHILD
 * handle is dropped with ldlm_lock_decref() and its cookie cleared before
 * returning. mdt_client_compatibility(), mdt_fix_reply() and
 * mdt_thread_info_fini() finish the request.
 *
 * \retval negative  errno on failure. NOTE(review): how rc and rc2 are
 *                   combined at the end is not visible - confirm
 */
2390 static int mdt_getattr_name(struct tgt_session_info *tsi)
2392 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2393 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2394 struct mdt_body *reqbody;
2395 struct mdt_body *repbody;
2400 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2401 LASSERT(reqbody != NULL);
2402 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2403 LASSERT(repbody != NULL);
2405 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2406 repbody->mbo_eadatasize = 0;
2407 repbody->mbo_aclsize = 0;
2409 rc = mdt_init_ucred(info, reqbody);
2411 GOTO(out_shrink, rc);
2413 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
/* the lock is not handed to the client here, so release it */
2414 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2415 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2416 lhc->mlh_reg_lh.cookie = 0;
2418 mdt_exit_ucred(info);
2421 mdt_client_compatibility(info);
2422 rc2 = mdt_fix_reply(info);
2425 mdt_thread_info_fini(info);
/**
 * Remove one name of \a obj: unlink entry \a name from the directory
 * identified by \a pfid.
 *
 * \param[in] info   thread info
 * \param[in] pfid   FID of the parent directory
 * \param[in] name   entry name inside the parent
 * \param[in] obj    object the entry is expected to reference
 * \param[in] ctime  change time stored in the attributes given to
 *                   mdo_unlink()
 *
 * Outline of the logic:
 *  - the parent is looked up and locked with an LCK_PW PDO lock on
 *    MDS_INODELOCK_UPDATE; a remote parent sets cos_incompat;
 *  - the name is looked up again under the lock and must still resolve to
 *    \a obj, otherwise -EREMCHG;
 *  - the child is locked LCK_EX on LOOKUP | UPDATE through
 *    mdt_reint_striped_lock();
 *  - an object with a non-zero open count is skipped with -EBUSY;
 *  - mdo_unlink() runs with obj->mot_lov_mutex held;
 *  - locks are released in reverse order and the parent reference is put.
 *
 * \retval -EREMCHG  the name no longer points at \a obj
 * \retval -EBUSY    the object is open
 * \retval negative  other errno on failure
 */
2429 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2430 const struct lu_fid *pfid,
2431 const struct lu_name *name,
2432 struct mdt_object *obj, s64 ctime)
2434 struct lu_fid *child_fid = &info->mti_tmp_fid1;
2435 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2436 struct mdt_device *mdt = info->mti_mdt;
2437 struct md_attr *ma = &info->mti_attr;
2438 struct mdt_lock_handle *parent_lh;
2439 struct mdt_lock_handle *child_lh;
2440 struct mdt_object *pobj;
2441 bool cos_incompat = false;
2445 pobj = mdt_object_find(info->mti_env, mdt, pfid);
2447 GOTO(out, rc = PTR_ERR(pobj));
2449 parent_lh = &info->mti_lh[MDT_LH_PARENT];
2450 mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2451 rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2453 GOTO(put_parent, rc);
2455 if (mdt_object_remote(pobj))
2456 cos_incompat = true;
2458 rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2459 name, child_fid, &info->mti_spec);
2461 GOTO(unlock_parent, rc);
/* the entry must still reference the object we were asked to remove */
2463 if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2464 GOTO(unlock_parent, rc = -EREMCHG);
2466 child_lh = &info->mti_lh[MDT_LH_CHILD];
2467 mdt_lock_reg_init(child_lh, LCK_EX);
2468 rc = mdt_reint_striped_lock(info, obj, child_lh,
2469 MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2470 einfo, cos_incompat);
2472 GOTO(unlock_parent, rc);
2474 if (atomic_read(&obj->mot_open_count)) {
2475 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2476 PFID(mdt_object_fid(obj)));
2477 GOTO(unlock_child, rc = -EBUSY);
/* hand the caller-supplied ctime to mdo_unlink() through ma */
2481 ma->ma_valid = MA_INODE;
2482 ma->ma_attr.la_valid = LA_CTIME;
2483 ma->ma_attr.la_ctime = ctime;
2485 mutex_lock(&obj->mot_lov_mutex);
2487 rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2488 mdt_object_child(obj), name, ma, 0);
2490 mutex_unlock(&obj->mot_lov_mutex);
2493 mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2495 mdt_object_unlock(info, pobj, parent_lh, 1);
2497 mdt_object_put(info->mti_env, pobj);
/*
 * Permission check applied to @obj before it is removed by FID.
 *
 * Visible checks, in order: the inode attributes are fetched with
 * mo_attr_get(MA_INODE); the LUSTRE_IMMUTABLE_FL flag is tested; the
 * CAP_DAC_OVERRIDE capability of the current credentials is tested; finally
 * the write bit of the matching class is tested (owner when uc_fsuid equals
 * la_uid, otherwise group when uc_fsgid equals la_gid, otherwise other).
 *
 * NOTE(review): the statements executed when each test fires (presumably
 * the return values) are missing from this listing - confirm the actual
 * error codes against the full source.
 */
2502 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2503 struct mdt_object *obj)
2505 struct lu_ucred *uc = lu_ucred(info->mti_env);
2506 struct md_attr *ma = &info->mti_attr;
2507 struct lu_attr *la = &ma->ma_attr;
/* fetch the inode attributes the checks below rely on */
2511 ma->ma_need = MA_INODE;
2512 rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2516 if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2519 if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
/* classic owner / group / other selection of the write bit */
2521 if (uc->uc_fsuid == la->la_uid) {
2522 if ((la->la_mode & S_IWUSR) == 0)
2524 } else if (uc->uc_fsgid == la->la_gid) {
2525 if ((la->la_mode & S_IWGRP) == 0)
2527 } else if ((la->la_mode & S_IWOTH) == 0) {
/*
 * Remove a single object given by @fid, unlinking every name recorded in
 * its link EA.
 *
 * Visible sequence: reject FIDs that are not sane or not namespace-visible
 * (-EINVAL), find the object, reject remote objects (-EREMOTE) and objects
 * that do not exist or are dying (-ENOENT), run
 * mdt_rmfid_check_permission(), make sure info->mti_big_buf holds at least
 * PATH_MAX bytes, read the link EA with mdt_links_read(), then walk
 * leh_reccount entries and call mdt_rmfid_unlink() for each one.  On the
 * way out the object reference is dropped and mti_big_buf is freed.
 *
 * NOTE(review): the second line of the parameter list is missing from this
 * listing; the body uses an identifier named ctime, presumably the missing
 * parameter.  Several guard conditions, the assignments of ldata/leh and
 * the final return are also not visible.
 */
2535 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2538 struct mdt_device *mdt = info->mti_mdt;
2539 struct mdt_object *obj = NULL;
2540 struct linkea_data ldata = { NULL };
2541 struct lu_buf *buf = &info->mti_big_buf;
2542 struct lu_name *name = &info->mti_name;
/* same storage as child_fid inside mdt_rmfid_unlink() */
2543 struct lu_fid *pfid = &info->mti_tmp_fid1;
2544 struct link_ea_header *leh;
2545 struct link_ea_entry *lee;
2546 int reclen, count, rc = 0;
2549 if (!fid_is_sane(fid))
2550 GOTO(out, rc = -EINVAL);
2552 if (!fid_is_namespace_visible(fid))
2553 GOTO(out, rc = -EINVAL);
2555 obj = mdt_object_find(info->mti_env, mdt, fid);
2557 GOTO(out, rc = PTR_ERR(obj));
/* only local, existing, non-dying objects are handled here */
2559 if (mdt_object_remote(obj))
2560 GOTO(out, rc = -EREMOTE);
2561 if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2562 GOTO(out, rc = -ENOENT);
2564 rc = mdt_rmfid_check_permission(info, obj);
/* the link EA is read into the per-thread big buffer */
2569 buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2571 GOTO(out, rc = -ENOMEM);
2574 rc = mdt_links_read(info, obj, &ldata);
/* the first entry starts right after the link EA header */
2579 lee = (struct link_ea_entry *)(leh + 1);
2580 for (count = 0; count < leh->leh_reccount; count++) {
2581 /* remove every hardlink */
2582 linkea_entry_unpack(lee, &reclen, name, pfid);
2583 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2584 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
/* obj may still be NULL or an ERR_PTR when an early check failed */
2590 if (obj && !IS_ERR(obj))
2591 mdt_object_put(info->mti_env, obj);
2592 if (info->mti_big_buf.lb_buf)
2593 lu_buf_free(&info->mti_big_buf);
/*
 * Request handler that removes a batch of objects given as an array of FIDs.
 *
 * Visible sequence: fetch the request body, derive the number of FIDs from
 * the size of the RMF_FID_ARRAY buffer (the size must be an exact multiple
 * of sizeof(struct lu_fid)), size the reply so that it carries one __u32
 * result per FID (RMF_RCS) plus a FID array of the same length, pack the
 * reply, then call mdt_rmfid_one() for every FID and store its result in
 * rcs[i].  The per-FID calls are bracketed by mdt_init_ucred() and
 * mdt_exit_ucred().
 *
 * NOTE(review): on the visible line the return value of mdt_init_ucred() is
 * not stored or tested, unlike the "rc = mdt_init_ucred(...)" pattern used
 * elsewhere in this file - worth confirming whether a failure should abort
 * the request.
 * NOTE(review): the statements that follow the NULL/size checks, the line
 * that fills rfids[] and the final return are missing from this listing.
 */
2598 static int mdt_rmfid(struct tgt_session_info *tsi)
2600 struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2601 struct mdt_body *reqbody;
2602 struct lu_fid *fids, *rfids;
2608 reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2609 if (reqbody == NULL)
2611 bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
/* the client buffer must hold a whole number of FIDs */
2613 nr = bufsize / sizeof(struct lu_fid);
2614 if (nr * sizeof(struct lu_fid) != bufsize)
/* reply: one result code and one FID slot per requested FID */
2616 req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2617 RCL_SERVER, nr * sizeof(__u32));
2618 req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2619 RCL_SERVER, nr * sizeof(struct lu_fid));
2620 rc = req_capsule_server_pack(tsi->tsi_pill);
2622 GOTO(out, rc = err_serious(rc));
2623 fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2626 rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2628 rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2631 mdt_init_ucred(mti, reqbody);
2632 for (i = 0; i < nr; i++) {
/* each FID gets its own result code; one failure does not stop the loop */
2634 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2636 mdt_exit_ucred(mti);
/*
 * Forward declaration; mdt_set_info() below calls this with
 * OBD_IOC_CHANGELOG_CLEAR.
 */
2642 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2643 void *karg, void __user *uarg);
/*
 * set_info handler that operates on struct ost_body.
 *
 * Visible behaviour: the key and value buffers and their sizes are read
 * from the request; the "no set_info key" / "no set_info val" paths return
 * err_serious(-EFAULT).  When the key is KEY_GRANT_SHRINK the request format
 * is extended to RQF_OST_SET_GRANT_INFO before the reply is packed, the
 * request and reply ost_body are fetched and tgt_grant_prepare_read() is
 * called.  Any other key is reported through CERROR as unsupported.
 *
 * NOTE(review): the conditions guarding the early returns, the handling of
 * the pack result and the final return are missing from this listing.
 */
2645 int mdt_io_set_info(struct tgt_session_info *tsi)
2647 struct ptlrpc_request *req = tgt_ses_req(tsi);
2648 struct ost_body *body = NULL, *repbody;
2649 void *key, *val = NULL;
2650 int keylen, vallen, rc = 0;
2651 bool is_grant_shrink;
2655 key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2657 DEBUG_REQ(D_HA, req, "no set_info key");
2658 RETURN(err_serious(-EFAULT));
2660 keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2663 val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2665 DEBUG_REQ(D_HA, req, "no set_info val");
2666 RETURN(err_serious(-EFAULT));
2668 vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
/* the request format must be extended before the reply is packed */
2671 is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
2672 if (is_grant_shrink)
2673 /* In this case the value is actually an RMF_OST_BODY, so we
2674 * transmutate the type of this PTLRPC */
2675 req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
2677 rc = req_capsule_server_pack(tsi->tsi_pill);
2681 if (is_grant_shrink) {
2682 body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
2684 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2687 /** handle grant shrink, similar to a read request */
2688 tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
2691 CERROR("%s: Unsupported key %s\n",
2692 tgt_name(tsi->tsi_tgt), (char *)key);
/*
 * MDT set_info handler: dispatches on the key carried in the request.
 *
 * Visible keys:
 *  - KEY_READ_ONLY: modifies the export connect flags under exp_lock; one
 *    branch ORs a value in and the other clears OBD_CONNECT_RDONLY.
 *  - KEY_CHANGELOG_CLEAR: the value must be exactly one struct
 *    changelog_setinfo; cs_recno and cs_id are byte-swapped when the request
 *    needs swabbing, mdt_is_rootadmin() is consulted, and the work is handed
 *    to mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, ...).
 *  - KEY_EVICT_BY_NID: the value is passed to obd_export_evict_by_nid().
 * The "no set_info key" / "no set_info val" paths return
 * err_serious(-EFAULT).
 *
 * NOTE(review): the test selecting set vs. clear for KEY_READ_ONLY, the
 * value ORed in, the statements after the size and root-admin checks, and
 * the final return are missing from this listing.
 */
2700 static int mdt_set_info(struct tgt_session_info *tsi)
2702 struct ptlrpc_request *req = tgt_ses_req(tsi);
2705 int keylen, vallen, rc = 0;
2709 key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2711 DEBUG_REQ(D_HA, req, "no set_info key");
2712 RETURN(err_serious(-EFAULT));
2715 keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2718 val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2720 DEBUG_REQ(D_HA, req, "no set_info val");
2721 RETURN(err_serious(-EFAULT));
2724 vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2727 /* Swab any part of val you need to here */
2728 if (KEY_IS(KEY_READ_ONLY)) {
/* connect flags are only changed with the export spinlock held */
2729 spin_lock(&req->rq_export->exp_lock);
2731 *exp_connect_flags_ptr(req->rq_export) |=
2734 *exp_connect_flags_ptr(req->rq_export) &=
2735 ~OBD_CONNECT_RDONLY;
2736 spin_unlock(&req->rq_export->exp_lock);
2737 } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2738 struct changelog_setinfo *cs = val;
/* reject values that are not exactly one changelog_setinfo */
2740 if (vallen != sizeof(*cs)) {
2741 CERROR("%s: bad changelog_clear setinfo size %d\n",
2742 tgt_name(tsi->tsi_tgt), vallen);
/* the value is opaque to the RPC layer, so it is swabbed here by hand */
2745 if (req_capsule_req_need_swab(&req->rq_pill)) {
2746 __swab64s(&cs->cs_recno);
2747 __swab32s(&cs->cs_id);
2750 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2752 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2754 } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2756 obd_export_evict_by_nid(req->rq_export->exp_obd, val);
/*
 * Readpage handler: reads directory pages of the request's object and sends
 * them to the client.
 *
 * Visible sequence: honour the OBD_FAIL_MDS_READPAGE_PACK fail point
 * (-ENOMEM), require both reply and request bodies (-EFAULT), fill the
 * lu_rdpg descriptor from the request (start hash from mbo_size, attributes
 * from mbo_mode, byte count from mbo_nlink capped by exp_max_brw_size()),
 * allocate the page array and the pages, let mo_readpage() fill them, and
 * pass the result to tgt_sendpage().  All allocated pages and the page
 * array are freed before returning.
 *
 * NOTE(review): the error statements after the hash and allocation checks,
 * the shift amount used to compute rp_npages, the cleanup labels and the
 * final return are missing from this listing.
 */
2763 static int mdt_readpage(struct tgt_session_info *tsi)
2765 struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
2766 struct mdt_object *object = mdt_obj(tsi->tsi_corpus);
2767 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
2768 const struct mdt_body *reqbody = tsi->tsi_mdt_body;
2769 struct mdt_body *repbody;
2775 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2776 RETURN(err_serious(-ENOMEM));
2778 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2779 if (repbody == NULL || reqbody == NULL)
2780 RETURN(err_serious(-EFAULT));
2783 * prepare @rdpg before calling lower layers and transfer itself. Here
2784 * reqbody->size contains offset of where to start to read and
2785 * reqbody->nlink contains number bytes to read.
2787 rdpg->rp_hash = reqbody->mbo_size;
2788 if (rdpg->rp_hash != reqbody->mbo_size) {
2789 CERROR("Invalid hash: %#llx != %#llx\n",
2790 rdpg->rp_hash, reqbody->mbo_size);
2794 rdpg->rp_attrs = reqbody->mbo_mode;
2795 if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2796 rdpg->rp_attrs |= LUDA_64BITHASH;
2797 rdpg->rp_count = min_t(unsigned int, reqbody->mbo_nlink,
2798 exp_max_brw_size(tsi->tsi_exp));
2799 rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2801 OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2802 if (rdpg->rp_pages == NULL)
2805 for (i = 0; i < rdpg->rp_npages; ++i) {
2806 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2807 if (rdpg->rp_pages[i] == NULL)
2808 GOTO(free_rdpg, rc = -ENOMEM);
2811 /* call lower layers to fill allocated pages with directory data */
2812 rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2814 GOTO(free_rdpg, rc);
2816 /* send pages to client */
2817 rc = tgt_sendpage(tsi, rdpg, rc);
/* cleanup copes with a partly filled array: NULL slots are skipped */
2822 for (i = 0; i < rdpg->rp_npages; i++)
2823 if (rdpg->rp_pages[i] != NULL)
2824 __free_page(rdpg->rp_pages[i]);
2825 OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2827 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
/*
 * Override uid/gid in the reint attributes with the caller's credentials.
 *
 * For every operation other than REINT_SETATTR: when LA_UID is valid and
 * la_uid is not -1, la_uid is replaced by uc_fsuid; when LA_GID is valid and
 * la_gid is not -1, la_gid is replaced by uc_fsgid.
 *
 * NOTE(review): any check on the result of mdt_ucred_check() and the return
 * statement are missing from this listing.
 */
2833 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2835 struct lu_ucred *uc = mdt_ucred_check(info);
2836 struct lu_attr *attr = &info->mti_attr.ma_attr;
/* setattr keeps the ids supplied in the request */
2841 if (op != REINT_SETATTR) {
2842 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2843 attr->la_uid = uc->uc_fsuid;
2844 /* for S_ISGID, inherit gid from his parent, such work will be
2845 * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2846 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2847 attr->la_gid = uc->uc_fsgid;
/*
 * Return true when @op is REINT_OPEN and the open flags in
 * info->mti_spec.sp_cr_flags contain neither MDS_FMODE_WRITE nor
 * MDS_OPEN_CREAT.
 */
2853 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2855 return op == REINT_OPEN &&
2856 !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
/*
 * Pre-set the size of the RMF_FILE_SECCTX field before the reply is packed.
 *
 * Visible behaviour: only acts when the capsule has both RMF_FILE_SECCTX and
 * RMF_FILE_SECCTX_NAME; depending on the size of RMF_FILE_SECCTX_NAME the
 * field is sized to OBD_MAX_DEFAULT_EA_SIZE or to another value.
 *
 * NOTE(review): the RCL_* location arguments, the comparison applied to the
 * name size and the size used in the second branch are missing from this
 * listing.
 */
2859 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2861 struct req_capsule *pill = info->mti_pill;
2863 if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2865 req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2867 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2869 /* pre-set size in server part with max size */
2870 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2872 OBD_MAX_DEFAULT_EA_SIZE);
2874 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
/*
 * Pre-set the size of the RMF_FILE_ENCCTX field to the device's
 * mdt_max_mdsize when the capsule has that field.
 *
 * NOTE(review): the RCL_* location arguments are missing from this listing.
 */
2879 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2881 struct req_capsule *pill = info->mti_pill;
2883 if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2885 /* pre-set size in server part with max size */
2886 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2888 info->mti_mdt->mdt_max_mdsize);
/*
 * Common body of a reint (metadata update) request.
 *
 * Visible sequence: unpack the reint record for @op; refuse with -EROFS on
 * a read-only export unless the operation is a read-only open; size the
 * optional reply fields (RMF_MDT_MD, RMF_LOGCOOKIES, RMF_ACL, security and
 * encryption contexts); pack the reply; zero mbo_eadatasize/mbo_aclsize in
 * the reply body when present; set up credentials; fix up uid/gid with
 * mdt_fix_attr_ucred(); handle resent requests through mdt_check_resent();
 * run mdt_reint_rec(); then drop credentials, apply
 * mdt_client_compatibility() and mdt_fix_reply().  For a successful,
 * non-replay REINT_OPEN, mdt_dom_read_on_open() is called afterwards.
 *
 * NOTE(review): the third parameter (used as op in the body), most error
 * guards, the cleanup labels and the final return are missing from this
 * listing.
 */
2891 static int mdt_reint_internal(struct mdt_thread_info *info,
2892 struct mdt_lock_handle *lhc,
2895 struct req_capsule *pill = info->mti_pill;
2896 struct mdt_body *repbody;
2901 rc = mdt_reint_unpack(info, op);
2903 CERROR("Can't unpack reint, rc %d\n", rc);
2904 RETURN(err_serious(rc));
2908 /* check if the file system is set to readonly. O_RDONLY open
2909 * is still allowed even the file system is set to readonly mode */
2910 if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2911 RETURN(err_serious(-EROFS));
2913 /* for replay (no_create) lmm is not needed, client has it already */
2914 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2915 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2918 /* llog cookies are always 0, the field is kept for compatibility */
2919 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2920 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2922 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2923 * by default. If the target object has more ACL entries, then
2924 * enlarge the buffer when necessary. */
2925 if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2926 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2927 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2929 mdt_preset_secctx_size(info);
2930 mdt_preset_encctx_size(info);
/* all reply field sizes above must be final before the reply is packed */
2932 rc = req_capsule_server_pack(pill);
2934 CERROR("Can't pack response, rc %d\n", rc);
2935 RETURN(err_serious(rc));
2938 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2939 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2941 repbody->mbo_eadatasize = 0;
2942 repbody->mbo_aclsize = 0;
2945 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2947 /* for replay no cookie / lmm is needed, because the client has them already */
2948 if (info->mti_spec.no_create)
2949 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2950 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2952 rc = mdt_init_ucred_reint(info);
2954 GOTO(out_shrink, rc);
2956 rc = mdt_fix_attr_ucred(info, op);
2958 GOTO(out_ucred, rc = err_serious(rc));
/* rc == 1 from mdt_check_resent() takes the status from the existing reply */
2960 rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2962 GOTO(out_ucred, rc);
2963 } else if (rc == 1) {
2964 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2965 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2966 GOTO(out_ucred, rc);
2968 rc = mdt_reint_rec(info, lhc);
2971 mdt_exit_ucred(info);
2973 mdt_client_compatibility(info);
2975 rc2 = mdt_fix_reply(info);
2980 * Data-on-MDT optimization - read data along with OPEN and return it
2981 * in reply when possible.
2983 if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2984 rc = mdt_dom_read_on_open(info, info->mti_mdt,
/*
 * Read the reint opcode from the request and extend the request capsule
 * with the matching format from @fmt.
 *
 * Visible behaviour: the opcode is taken from rr_opcode of the
 * RMF_REC_REINT record; when it is below REINT_MAX and @fmt has an entry
 * for it, the capsule is extended with that format.  An unsupported opcode
 * is logged with CERROR and replaced by err_serious(-EFAULT); a second path
 * also yields err_serious(-EFAULT).
 *
 * NOTE(review): the NULL test on the record, the else keywords and the
 * final return are missing from this listing.
 */
2990 static long mdt_reint_opcode(struct ptlrpc_request *req,
2991 const struct req_format **fmt)
2993 struct mdt_device *mdt;
2994 struct mdt_rec_reint *rec;
2997 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2999 opc = rec->rr_opcode;
3000 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
/* the opcode doubles as the index into the caller's format table */
3001 if (opc < REINT_MAX && fmt[opc] != NULL)
3002 req_capsule_extend(&req->rq_pill, fmt[opc]);
3004 mdt = mdt_exp2dev(req->rq_export);
3005 CERROR("%s: Unsupported opcode '%ld' from client '%s':"
3006 " rc = %d\n", req->rq_export->exp_obd->obd_name,
3007 opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
3008 opc = err_serious(-EFAULT);
3011 opc = err_serious(-EFAULT);
/*
 * Top-level handler for reint requests.
 *
 * Visible behaviour: the opcode is resolved through mdt_reint_opcode() with
 * a table mapping each reint operation to its request format, then
 * mdt_reint_internal() is run with a NULL lock handle and the thread info
 * is finalised.  tsi_reply_fail_id is set to OBD_FAIL_MDS_REINT_NET_REP.
 *
 * NOTE(review): the test on the resolved opcode, the error branch and the
 * final return are missing from this listing.
 */
3016 static int mdt_reint(struct tgt_session_info *tsi)
/* REINT_RMENTRY shares the unlink request format */
3020 static const struct req_format *reint_fmts[REINT_MAX] = {
3021 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
3022 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
3023 [REINT_LINK] = &RQF_MDS_REINT_LINK,
3024 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
3025 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
3026 [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
3027 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
3028 [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
3029 [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE,
3030 [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC,
3035 opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
3037 struct mdt_thread_info *info = tsi2mdt_info(tsi);
3039 * No lock possible here from client to pass it to reint code
3042 rc = mdt_reint_internal(info, NULL, opc);
3043 mdt_thread_info_fini(info);
3048 tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
/*
 * Sync the whole device: calls the dt_sync() operation of the bottom
 * dt_device (mdt->mdt_bottom).
 *
 * NOTE(review): any early-exit checks and the return statement are missing
 * from this listing.
 */
3052 /* this should sync the whole device */
3053 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
3055 struct dt_device *dt = mdt->mdt_bottom;
3059 rc = dt->dd_ops->dt_sync(env, dt);
/*
 * Sync a single object @mo.
 *
 * Visible behaviour: a non-existing object is reported with CWARN.  For a
 * regular file the object version from dt_version_get() is compared with
 * the target's obd_last_committed and mo_object_sync() is only called when
 * the version is newer; a second mo_object_sync() call exists for the other
 * case (presumably the non-regular-file branch).
 *
 * NOTE(review): the error code used for a missing object, the else keyword
 * separating the two sync calls and the return statement are missing from
 * this listing.
 */
3063 /* this should sync this object */
3064 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
3065 struct mdt_object *mo)
3071 if (!mdt_object_exists(mo)) {
3072 CWARN("%s: non existing object "DFID": rc = %d\n",
3073 exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
3078 if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
3079 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
3080 dt_obj_version_t version;
/* skip the sync when the object's version is not newer than last_committed */
3082 version = dt_version_get(env, mdt_obj2dt(mo));
3083 if (version > tgt->lut_obd->obd_last_committed)
3084 rc = mo_object_sync(env, mdt_object_child(mo));
3086 rc = mo_object_sync(env, mdt_object_child(mo));
3092 static int mdt_sync(struct tgt_session_info *tsi)
3094 struct ptlrpc_request *req = tgt_ses_req(tsi);
3095 struct req_capsule *pill = tsi->tsi_pill;
3096 struct mdt_body *body;
3097 ktime_t kstart = ktime_get();
3102 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
3103 RETURN(err_serious(-ENOMEM));
3105 if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
3106 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
3108 struct mdt_thread_info *info = tsi2mdt_info(tsi);
3110 if (unlikely(info->mti_object == NULL))
3113 /* sync an object */
3114 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
3117 const struct lu_fid *fid;
3118 struct lu_attr *la = &info->mti_attr.ma_attr;
3120 info->mti_attr.ma_need = MA_INODE;
3121 info->mti_attr.ma_valid = 0;
3122 rc = mdt_attr_get_complex(info, info->mti_object,
3125 body = req_capsule_server_get(pill,
3127 fid = mdt_object_fid(info->mti_object);
3128 mdt_pack_attr2body(info, body, la, fid);
3131 mdt_thread_info_fini(info);