4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2010, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_handler.c
33 * Lustre Metadata Target (mdt) request handler
35 * Author: Peter Braam <braam@clusterfs.com>
36 * Author: Andreas Dilger <adilger@clusterfs.com>
37 * Author: Phil Schwan <phil@clusterfs.com>
38 * Author: Mike Shaver <shaver@clusterfs.com>
39 * Author: Nikita Danilov <nikita@clusterfs.com>
40 * Author: Huang Hua <huanghua@clusterfs.com>
41 * Author: Yury Umanets <umka@clusterfs.com>
44 #define DEBUG_SUBSYSTEM S_MDS
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
67 #include "mdt_internal.h"
/* Tunable (perms 0644, writable at runtime via sysfs): upper bound on the
 * number of modifying RPCs a single client may have in flight at once. */
69 static unsigned int max_mod_rpcs_per_client = 8;
70 module_param(max_mod_rpcs_per_client, uint, 0644);
71 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
/* Translation table: LDLM lock modes (LCK_*) -> metadata lock modes (MDL_*).
 * NOTE(review): the extraction dropped the interior initializers (orig lines
 * 75-80); only the first and last entries are visible here. */
73 mdl_mode_t mdt_mdl_lock_modes[] = {
74 [LCK_MINMODE] = MDL_MINMODE,
81 [LCK_GROUP] = MDL_GROUP
/* Inverse translation table: metadata lock modes (MDL_*) -> LDLM modes (LCK_*).
 * NOTE(review): interior initializers (orig lines 86-91) were dropped by the
 * extraction; only the first and last entries are visible. */
84 enum ldlm_mode mdt_dlm_lock_modes[] = {
85 [MDL_MINMODE] = LCK_MINMODE,
92 [MDL_GROUP] = LCK_GROUP
/* Forward declaration: container accessor from a generic lu_device. */
95 static struct mdt_device *mdt_dev(struct lu_device *d);
/* lu_object operations vector for MDT objects; defined later in this file. */
97 static const struct lu_object_operations mdt_obj_ops;
99 /* Slab for MDT object allocation */
100 static struct kmem_cache *mdt_object_kmem;
102 /* For HSM restore handles */
103 struct kmem_cache *mdt_hsm_cdt_kmem;
105 /* For HSM request handles */
106 struct kmem_cache *mdt_hsm_car_kmem;
/* Descriptor table driving creation/teardown of the three slab caches
 * declared above: mdt_object, HSM restore handle, and HSM agent request. */
108 static struct lu_kmem_descr mdt_caches[] = {
110 .ckd_cache = &mdt_object_kmem,
111 .ckd_name = "mdt_obj",
112 .ckd_size = sizeof(struct mdt_object)
115 .ckd_cache = &mdt_hsm_cdt_kmem,
116 .ckd_name = "mdt_cdt_restore_handle",
117 .ckd_size = sizeof(struct cdt_restore_handle)
120 .ckd_cache = &mdt_hsm_car_kmem,
121 .ckd_name = "mdt_cdt_agent_req",
122 .ckd_size = sizeof(struct cdt_agent_req)
/* Return which of the @op_flag disposition bits are currently recorded in
 * the DLM reply's lock_policy_res1 field. */
129 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
133 return rep->lock_policy_res1 & op_flag;
/* Clear disposition bits @op_flag in the per-thread opdata, the target
 * session opdata, and the DLM reply's lock_policy_res1.
 * NOTE(review): a NULL check on @rep (orig lines 142-143) appears to have
 * been dropped by the extraction — confirm against the full source. */
136 void mdt_clear_disposition(struct mdt_thread_info *info,
137 struct ldlm_reply *rep, __u64 op_flag)
140 info->mti_opdata &= ~op_flag;
141 tgt_opdata_clear(info->mti_env, op_flag);
144 rep->lock_policy_res1 &= ~op_flag;
/* Set disposition bits @op_flag in the per-thread opdata, the target
 * session opdata, and the DLM reply's lock_policy_res1 (mirror of
 * mdt_clear_disposition above).
 * NOTE(review): a NULL check on @rep (orig lines 153-154) appears to have
 * been dropped by the extraction — confirm against the full source. */
147 void mdt_set_disposition(struct mdt_thread_info *info,
148 struct ldlm_reply *rep, __u64 op_flag)
151 info->mti_opdata |= op_flag;
152 tgt_opdata_set(info->mti_env, op_flag);
155 rep->lock_policy_res1 |= op_flag;
/* Initialize @lh as a regular (non-PDO) lock handle: no pdirops hash, and
 * the same mode @lm for both local and remote (cross-MDT) regular locks. */
158 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 lh->mlh_pdo_hash = 0;
161 lh->mlh_reg_mode = lm;
162 lh->mlh_rreg_mode = lm;
163 lh->mlh_type = MDT_REG_LOCK;
/* Initialize @lh from an existing LDLM lock, reusing its requested mode;
 * for group locks, also carry the group id over from the inodebits policy. */
166 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
168 mdt_lock_reg_init(lh, lock->l_req_mode);
169 if (lock->l_req_mode == LCK_GROUP)
170 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
/* Initialize @lh as a parallel-directory-operations (PDO) lock handle.
 * When a valid @lname is supplied, hash the name so operations on distinct
 * entries of the same directory can proceed concurrently; hash 0 is reserved
 * (see LU-2856 workaround below). With no name, hash 0 is used. */
173 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
174 const struct lu_name *lname)
176 lh->mlh_reg_mode = lock_mode;
177 lh->mlh_pdo_mode = LCK_MINMODE;
178 lh->mlh_rreg_mode = lock_mode;
179 lh->mlh_type = MDT_PDO_LOCK;
181 if (lu_name_is_valid(lname)) {
/* NOTE(review): the hash-length argument line (orig 183) was dropped by
 * the extraction; presumably lname->ln_namelen — confirm. */
182 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
184 /* XXX Workaround for LU-2856
186 * Zero is a valid return value of full_name_hash, but
187 * several users of mlh_pdo_hash assume a non-zero
188 * hash value. We therefore map zero onto an
189 * arbitrary, but consistent value (1) to avoid
190 * problems further down the road. */
191 if (unlikely(lh->mlh_pdo_hash == 0))
192 lh->mlh_pdo_hash = 1;
/* No name supplied: hash 0 conventionally covers the whole directory. */
194 lh->mlh_pdo_hash = 0;
/* Choose the PDO ("whole directory") lock mode to pair with the already
 * chosen per-entry lock mode in @lh (rationale in the comment below).
 * NOTE(review): the extraction dropped this function's local declarations
 * and the switch's case labels (only the case bodies survive), so the exact
 * mode mapping must be confirmed against the full source. */
198 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
199 struct mdt_lock_handle *lh)
205 * Any dir access needs couple of locks:
207 * 1) on part of dir we gonna take lookup/modify;
209 * 2) on whole dir to protect it from concurrent splitting and/or to
210 * flush client's cache for readdir().
212 * so, for a given mode and object this routine decides what lock mode
213 * to use for lock #2:
215 * 1) if caller's gonna lookup in dir then we need to protect dir from
216 * being splitted only - LCK_CR
218 * 2) if caller's gonna modify dir then we need to protect dir from
219 * being splitted and to flush cache - LCK_CW
221 * 3) if caller's gonna modify dir and that dir seems ready for
222 * splitting then we need to protect it from any type of access
223 * (lookup/modify/split) - LCK_EX --bzzz
226 LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
227 LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
230 * Ask underlaying level its opinion about preferable PDO lock mode
231 * having access type passed as regular lock mode:
233 * - MDL_MINMODE means that lower layer does not want to specify lock
236 * - MDL_NL means that no PDO lock should be taken. This is used in some
237 * cases. Say, for non-splittable directories no need to use PDO locks
240 mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
241 mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
243 if (mode != MDL_MINMODE) {
/* Lower layer expressed a preference: translate it back to an LDLM mode. */
244 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
247 * Lower layer does not want to specify locking mode. We do it
248 * our selves. No special protection is needed, just flush
249 * client's cache on modification and allow concurrent
252 switch (lh->mlh_reg_mode) {
254 lh->mlh_pdo_mode = LCK_EX;
257 lh->mlh_pdo_mode = LCK_CR;
260 lh->mlh_pdo_mode = LCK_CW;
263 CERROR("Not expected lock type (0x%x)\n",
264 (int)lh->mlh_reg_mode);
269 LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
/* Resolve a '/'-separated @fileset path, starting from the MDT root FID,
 * into the FID of its final component (written through @fid). Each
 * component is validated (length <= NAME_MAX, ".." rejected) and looked up
 * with mdo_lookup(). Remote (DNE) final objects are allowed only when
 * mdt_enable_remote_subdir_mount is set.
 * NOTE(review): several lines (braces, some declarations, error GOTOs)
 * were dropped by the extraction; comments below describe visible code. */
273 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
276 struct mdt_device *mdt = info->mti_mdt;
277 struct lu_name *lname = &info->mti_name;
278 const char *start = fileset;
279 char *filename = info->mti_filename;
280 struct mdt_object *parent;
284 LASSERT(!info->mti_cross_ref);
287 * We may want to allow this to mount a completely separate
288 * fileset from the MDT in the future, but keeping it to
289 * ROOT/ only for now avoid potential security issues.
291 *fid = mdt->mdt_md_root_fid;
/* Walk the path one component at a time until end-of-string or error. */
293 while (rc == 0 && start != NULL && *start != '\0') {
294 const char *s1 = start;
300 while (*s2 != '/' && *s2 != '\0')
308 lname->ln_namelen = s2 - s1;
309 if (lname->ln_namelen > NAME_MAX) {
314 /* reject .. as a path component */
315 if (lname->ln_namelen == 2 &&
316 strncmp(s1, "..", 2) == 0) {
/* Bounded copy: length already checked <= NAME_MAX above, and NUL is
 * terminated explicitly on the next line. */
321 strncpy(filename, s1, lname->ln_namelen);
322 filename[lname->ln_namelen] = '\0';
323 lname->ln_name = filename;
325 parent = mdt_object_find(info->mti_env, mdt, fid);
326 if (IS_ERR(parent)) {
327 rc = PTR_ERR(parent);
330 /* Only got the fid of this obj by name */
332 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
333 fid, &info->mti_spec);
334 mdt_object_put(info->mti_env, parent);
/* Re-find the final object to validate it is a (possibly remote) dir. */
337 parent = mdt_object_find(info->mti_env, mdt, fid);
339 rc = PTR_ERR(parent);
341 mode = lu_object_attr(&parent->mot_obj);
342 if (!S_ISDIR(mode)) {
344 } else if (mdt_is_remote_object(info, parent, parent)) {
345 if (!mdt->mdt_enable_remote_subdir_mount) {
347 LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
351 LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
356 mdt_object_put(info->mti_env, parent);
/* MDS_GET_ROOT handler: return the filesystem root FID in the reply body.
 * If the client's nodemap defines a fileset, the root is remapped to that
 * subdirectory; a client-requested fileset is treated as a sub-path of the
 * nodemap fileset (joined with snprintf into a PATH_MAX buffer).
 * NOTE(review): extraction dropped braces/labels and some NULL checks;
 * comments describe only the visible code. */
363 static int mdt_get_root(struct tgt_session_info *tsi)
365 struct mdt_thread_info *info = tsi2mdt_info(tsi);
366 struct mdt_device *mdt = info->mti_mdt;
367 struct mdt_body *repbody;
368 char *fileset = NULL, *buffer = NULL;
370 struct obd_export *exp = info->mti_exp;
371 char *nodemap_fileset;
375 rc = mdt_check_ucred(info);
377 GOTO(out, rc = err_serious(rc));
/* Fault-injection point for testing reply-pack failures. */
379 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
380 GOTO(out, rc = err_serious(-ENOMEM));
382 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
383 if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
384 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
386 GOTO(out, rc = err_serious(-EFAULT));
389 nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
390 if (nodemap_fileset && nodemap_fileset[0]) {
391 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
393 /* consider fileset from client as a sub-fileset
394 * of the nodemap one */
395 OBD_ALLOC(buffer, PATH_MAX + 1);
397 GOTO(out, rc = err_serious(-ENOMEM));
/* snprintf return >= size means the joined path was truncated: reject. */
398 if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
399 nodemap_fileset, fileset) >= PATH_MAX + 1)
400 GOTO(out, rc = err_serious(-EINVAL));
403 /* enforce fileset as specified in the nodemap */
404 fileset = nodemap_fileset;
409 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
410 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
412 GOTO(out, rc = err_serious(rc));
/* No fileset: plain root FID. */
414 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
416 repbody->mbo_valid |= OBD_MD_FLID;
420 mdt_thread_info_fini(info);
/* buffer was OBD_ALLOC'd only on the nodemap-fileset path; OBD_FREE of
 * NULL is tolerated by the visible flow only if guarded upstream —
 * confirm against the full source. */
422 OBD_FREE(buffer, PATH_MAX+1);
/* MDS_STATFS handler: fill the reply's obd_statfs, using a short-lived
 * cache (mdt_osfs / mdt_sum_osfs, OBD_STATFS_CACHE_SECONDS old at most),
 * subtract not-yet-committed grant space from the available blocks, and
 * emulate a 4KB block size for clients without OBD_CONNECT_GRANT_PARAM.
 * NOTE(review): extraction dropped braces, `rc`'s declaration and several
 * statement fragments; comments describe only the visible code. */
426 static int mdt_statfs(struct tgt_session_info *tsi)
428 struct ptlrpc_request *req = tgt_ses_req(tsi);
429 struct mdt_thread_info *info = tsi2mdt_info(tsi);
430 struct mdt_device *mdt = info->mti_mdt;
431 struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
432 struct md_device *next = mdt->mdt_child;
433 struct ptlrpc_service_part *svcpt;
434 struct obd_statfs *osfs;
435 struct mdt_body *reqbody = NULL;
436 struct mdt_statfs_cache *msf;
437 ktime_t kstart = ktime_get();
438 int current_blockbits;
443 svcpt = req->rq_rqbd->rqbd_svcpt;
445 /* This will trigger a watchdog timeout */
446 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
447 (MDT_SERVICE_WATCHDOG_FACTOR *
448 at_get(&svcpt->scp_at_estimate)) + 1);
450 rc = mdt_check_ucred(info);
452 GOTO(out, rc = err_serious(rc));
454 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
455 GOTO(out, rc = err_serious(-ENOMEM));
457 osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
459 GOTO(out, rc = -EPROTO);
/* Newer clients may send an extended STATFS request body (sum-statfs);
 * detect it by exact message size and extend the capsule accordingly. */
461 if (mdt_is_sum_statfs_client(req->rq_export) &&
462 lustre_packed_msg_size(req->rq_reqmsg) ==
463 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
464 &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
465 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
466 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
/* Select the aggregate (sum) cache or the local-device cache. */
469 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
470 msf = &mdt->mdt_sum_osfs;
472 msf = &mdt->mdt_osfs;
474 if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
475 /** statfs data is too old, get up-to-date one */
476 if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
477 rc = next->md_ops->mdo_statfs(info->mti_env,
480 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
/* Refresh the cache under mdt_lock. */
484 spin_lock(&mdt->mdt_lock);
485 msf->msf_osfs = *osfs;
486 msf->msf_age = ktime_get_seconds();
487 spin_unlock(&mdt->mdt_lock);
489 /** use cached statfs data */
490 spin_lock(&mdt->mdt_lock);
491 *osfs = msf->msf_osfs;
492 spin_unlock(&mdt->mdt_lock);
495 /* tgd_blockbit is recordsize bits set during mkfs.
496 * This once set does not change. However, 'zfs set'
497 * can be used to change the MDT blocksize. Instead
498 * of using cached value of 'tgd_blockbit' always
499 * calculate the blocksize bits which may have
502 current_blockbits = fls64(osfs->os_bsize) - 1;
504 /* at least try to account for cached pages. its still racy and
505 * might be under-reporting if clients haven't announced their
506 * caches with brw recently */
507 CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
508 " pending %llu free %llu avail %llu\n",
509 tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
510 tgd->tgd_tot_pending,
511 osfs->os_bfree << current_blockbits,
512 osfs->os_bavail << current_blockbits);
/* Reserve dirty+pending grant space from bavail, clamped at zero. */
514 osfs->os_bavail -= min_t(u64, osfs->os_bavail,
515 ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
516 osfs->os_bsize - 1) >> current_blockbits));
518 tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
519 CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
520 "%llu objects: %llu free; state %x\n",
521 osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
522 osfs->os_files, osfs->os_ffree, osfs->os_state);
524 if (!exp_grant_param_supp(tsi->tsi_exp) &&
525 current_blockbits > COMPAT_BSIZE_SHIFT) {
526 /* clients which don't support OBD_CONNECT_GRANT_PARAM
527 * should not see a block size > page size, otherwise
528 * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
529 * block size which is the biggest block size known to work
530 * with all client's page size. */
531 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
532 osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT;
533 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
534 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
537 mdt_counter_incr(req, LPROC_MDT_STATFS,
538 ktime_us_delta(ktime_get(), kstart));
540 mdt_thread_info_fini(info);
/* Scan a composite LOV layout for a Data-on-MDT (DoM) component.
 * Returns the DoM stripe size (0 if the layout has no DoM component) and,
 * via @is_dom_only, whether the file has a DoM component but no OST
 * stripes. Non-composite layouts return 0 immediately.
 * NOTE(review): extraction dropped several lines (declarations of off/i,
 * RETURN(0) paths, the *is_dom_only assignment) — comments describe only
 * the visible code. */
544 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
546 struct lov_comp_md_v1 *comp_v1;
547 struct lov_mds_md *v1;
549 __u32 dom_stripesize = 0;
551 bool has_ost_stripes = false;
558 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
561 comp_v1 = (struct lov_comp_md_v1 *)lmm;
562 off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
563 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
565 /* Fast check for DoM entry with no mirroring, should be the first */
566 if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
567 lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
570 /* check all entries otherwise */
571 for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
572 struct lov_comp_md_entry_v1 *lcme;
574 lcme = &comp_v1->lcm_entries[i];
/* Only initialized components are examined. */
575 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
578 off = le32_to_cpu(lcme->lcme_offset);
579 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
581 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
583 dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
585 has_ost_stripes = true;
/* Both kinds seen: no need to scan further. */
587 if (dom_stripesize && has_ost_stripes)
588 RETURN(dom_stripesize);
590 /* DoM-only case exits here */
591 if (is_dom_only && dom_stripesize)
593 RETURN(dom_stripesize);
597 * Pack size attributes into the reply.
/* Pack size attributes into the reply body for a Data-on-MDT file.
 * Size is returned only for regular files whose LOV layout contains a DoM
 * stripe, and only when either a DoM lock is held (@lh) or the file is
 * DoM-only. @fid identifies the object whose size is read.
 * NOTE(review): extraction dropped declarations (b, dom_stripe) and some
 * early-return paths; comments describe only the visible code. */
599 int mdt_pack_size2body(struct mdt_thread_info *info,
600 const struct lu_fid *fid, struct lustre_handle *lh)
603 struct md_attr *ma = &info->mti_attr;
605 bool dom_lock = false;
609 LASSERT(ma->ma_attr.la_valid & LA_MODE);
/* Only regular files with a valid LOV EA can carry a DoM component. */
611 if (!S_ISREG(ma->ma_attr.la_mode) ||
612 !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
615 dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
616 /* no DoM stripe, no size in reply */
620 if (lustre_handle_is_used(lh)) {
621 struct ldlm_lock *lock;
623 lock = ldlm_handle2lock(lh);
625 dom_lock = ldlm_has_dom(lock);
630 /* no DoM lock, no size in reply */
634 /* Either DoM lock exists or LMM has only DoM stripe then
635 * return size on body. */
636 b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
638 mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
642 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
644 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
646 * \param info thread info object
647 * \param repbody reply to pack ACLs into
648 * \param o mdt object of file to examine
649 * \param nodemap nodemap of client to reply to
651 * \retval -errno error getting or parsing ACL from disk
/* Read the object's access ACL xattr into the reply buffer, growing into
 * the thread-local "big ACL" buffer on -ERANGE (for large-ACL-capable
 * clients), then map the ACL's UIDs/GIDs through the client's nodemap.
 * See the block comment above for parameter documentation.
 * NOTE(review): extraction dropped braces, a retry GOTO for the grown
 * buffer, and some RETURN paths; comments describe only visible code. */
653 int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
654 struct mdt_object *o, struct lu_nodemap *nodemap)
656 const struct lu_env *env = info->mti_env;
657 struct md_object *next = mdt_object_child(o);
658 struct lu_buf *buf = &info->mti_buf;
659 struct mdt_device *mdt = info->mti_mdt;
660 struct req_capsule *pill = info->mti_pill;
665 buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
666 buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
/* Client did not reserve ACL space in the reply: nothing to do. */
667 if (buf->lb_len == 0)
670 LASSERT(!info->mti_big_acl_used);
672 rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
674 if (rc == -ENODATA) {
/* No ACL on disk: report an empty, valid ACL. */
675 repbody->mbo_aclsize = 0;
676 repbody->mbo_valid |= OBD_MD_FLACL;
678 } else if (rc == -EOPNOTSUPP) {
680 } else if (rc == -ERANGE) {
/* ACL larger than reply buffer: retry through the big ACL buffer if
 * the client negotiated large-ACL support. */
681 if (exp_connect_large_acl(info->mti_exp) &&
682 !info->mti_big_acl_used) {
683 if (info->mti_big_acl == NULL) {
684 info->mti_big_aclsize =
686 mdt->mdt_max_ea_size,
688 OBD_ALLOC_LARGE(info->mti_big_acl,
689 info->mti_big_aclsize);
690 if (info->mti_big_acl == NULL) {
691 info->mti_big_aclsize = 0;
692 CERROR("%s: unable to grow "
695 PFID(mdt_object_fid(o)));
700 CDEBUG(D_INODE, "%s: grow the "DFID
701 " ACL buffer to size %d\n",
703 PFID(mdt_object_fid(o)),
704 info->mti_big_aclsize);
706 buf->lb_buf = info->mti_big_acl;
707 buf->lb_len = info->mti_big_aclsize;
708 info->mti_big_acl_used = 1;
711 /* FS has ACL bigger that our limits */
712 CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
713 mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
714 info->mti_big_aclsize);
717 CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
718 mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
/* Translate FS-side UIDs/GIDs in the ACL to the client's ID space;
 * rc on entry is the ACL length from mo_xattr_get(). */
721 rc = nodemap_map_acl(nodemap, buf->lb_buf,
722 rc, NODEMAP_FS_TO_CLIENT);
723 /* if all ACLs mapped out, rc is still >= 0 */
725 CERROR("%s: nodemap_map_acl unable to parse "DFID
726 " ACL: rc = %d\n", mdt_obd_name(mdt),
727 PFID(mdt_object_fid(o)), rc);
728 repbody->mbo_aclsize = 0;
729 repbody->mbo_valid &= ~OBD_MD_FLACL;
731 repbody->mbo_aclsize = rc;
732 repbody->mbo_valid |= OBD_MD_FLACL;
741 /* XXX Look into layout in MDT layer. */
/* Return true if the file's layout marks it HSM-released. For composite
 * layouts every component must carry LOV_PATTERN_F_RELEASED (no partial
 * release); for plain layouts the top-level pattern flag decides.
 * NOTE(review): extraction dropped loop-variable declaration, an early
 * "return false" inside the loop, and the trailing true/false operands of
 * the final ternary — confirm against the full source. */
742 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
744 struct lov_comp_md_v1 *comp_v1;
745 struct lov_mds_md *v1;
748 if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
749 comp_v1 = (struct lov_comp_md_v1 *)lmm;
751 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
752 v1 = (struct lov_mds_md *)((char *)comp_v1 +
753 comp_v1->lcm_entries[i].lcme_offset);
754 /* We don't support partial release for now */
755 if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
760 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
/* Copy lu_attr fields into the wire mdt_body @b, setting the matching
 * OBD_MD_* validity flags; UIDs/GIDs are mapped through the export's
 * nodemap. Size/blocks handling for regular files depends on the layout:
 * released files, SOM/LSOM state, or no OST objects at all.
 * NOTE(review): extraction dropped braces, a nodemap error check, the
 * fid!=NULL guard around FLID, and block-count assignments for released
 * files; comments describe only the visible code. */
765 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
766 const struct lu_attr *attr, const struct lu_fid *fid)
768 struct md_attr *ma = &info->mti_attr;
769 struct obd_export *exp = info->mti_exp;
770 struct lu_nodemap *nodemap = NULL;
772 LASSERT(ma->ma_valid & MA_INODE);
/* Copy each timestamp/attribute only when the source marks it valid. */
774 if (attr->la_valid & LA_ATIME) {
775 b->mbo_atime = attr->la_atime;
776 b->mbo_valid |= OBD_MD_FLATIME;
778 if (attr->la_valid & LA_MTIME) {
779 b->mbo_mtime = attr->la_mtime;
780 b->mbo_valid |= OBD_MD_FLMTIME;
782 if (attr->la_valid & LA_CTIME) {
783 b->mbo_ctime = attr->la_ctime;
784 b->mbo_valid |= OBD_MD_FLCTIME;
786 if (attr->la_valid & LA_BTIME) {
787 b->mbo_btime = attr->la_btime;
788 b->mbo_valid |= OBD_MD_FLBTIME;
790 if (attr->la_valid & LA_FLAGS) {
791 b->mbo_flags = attr->la_flags;
792 b->mbo_valid |= OBD_MD_FLFLAGS;
794 if (attr->la_valid & LA_NLINK) {
795 b->mbo_nlink = attr->la_nlink;
796 b->mbo_valid |= OBD_MD_FLNLINK;
/* Owner IDs need the nodemap reference; take it only when needed. */
798 if (attr->la_valid & (LA_UID|LA_GID)) {
799 nodemap = nodemap_get_from_exp(exp);
803 if (attr->la_valid & LA_UID) {
804 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
805 NODEMAP_FS_TO_CLIENT,
807 b->mbo_valid |= OBD_MD_FLUID;
809 if (attr->la_valid & LA_GID) {
810 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
811 NODEMAP_FS_TO_CLIENT,
813 b->mbo_valid |= OBD_MD_FLGID;
816 if (attr->la_valid & LA_PROJID) {
817 /* TODO, nodemap for project id */
818 b->mbo_projid = attr->la_projid;
819 b->mbo_valid |= OBD_MD_FLPROJID;
822 b->mbo_mode = attr->la_mode;
823 if (attr->la_valid & LA_MODE)
824 b->mbo_valid |= OBD_MD_FLMODE;
825 if (attr->la_valid & LA_TYPE)
826 b->mbo_valid |= OBD_MD_FLTYPE;
830 b->mbo_valid |= OBD_MD_FLID;
831 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
832 PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
835 if (!(attr->la_valid & LA_TYPE))
838 b->mbo_rdev = attr->la_rdev;
839 b->mbo_size = attr->la_size;
840 b->mbo_blocks = attr->la_blocks;
842 if (!S_ISREG(attr->la_mode)) {
/* Non-regular files: size/blocks/rdev from the MDT are authoritative. */
843 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
844 } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
845 /* means no objects are allocated on osts. */
846 LASSERT(!(ma->ma_valid & MA_LOV));
847 /* just ignore blocks occupied by extend attributes on MDS */
849 /* if no object is allocated on osts, the size on mds is valid.
851 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
852 } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
853 if (mdt_hsm_is_released(ma->ma_lmm)) {
854 /* A released file stores its size on MDS. */
855 /* But return 1 block for released file, unless tools
856 * like tar will consider it fully sparse. (LU-3864)
858 if (unlikely(b->mbo_size == 0))
862 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
863 } else if (info->mti_som_valid) { /* som is valid */
864 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
865 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
866 b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
867 b->mbo_size = ma->ma_som.ms_size;
868 b->mbo_blocks = ma->ma_som.ms_blocks;
872 if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
873 b->mbo_valid & OBD_MD_FLLAZYSIZE))
874 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
875 PFID(fid), (unsigned long long)b->mbo_size);
/* Drop the nodemap reference taken above, if any. */
878 if (!IS_ERR_OR_NULL(nodemap))
879 nodemap_putref(nodemap);
/* True when the reply body carries striping info for this object: a LOV EA
 * for regular files, or directory striping (FLDIREA) for directories. */
882 static inline int mdt_body_has_lov(const struct lu_attr *la,
883 const struct mdt_body *body)
885 return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
886 (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
/* Adjust the pending reply for clients lacking layout-lock support:
 * such clients cannot interpret the layout generation, so zero it in the
 * LOV EA being returned for regular files. No-op for layout-capable
 * clients or replies without striping info.
 * NOTE(review): extraction dropped braces and the early RETURNs; comments
 * describe only the visible code. */
889 void mdt_client_compatibility(struct mdt_thread_info *info)
891 struct mdt_body *body;
892 struct ptlrpc_request *req = mdt_info_req(info);
893 struct obd_export *exp = req->rq_export;
894 struct md_attr *ma = &info->mti_attr;
895 struct lu_attr *la = &ma->ma_attr;
898 if (exp_connect_layout(exp))
899 /* the client can deal with 16-bit lmm_stripe_count */
902 body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
904 if (!mdt_body_has_lov(la, body))
907 /* now we have a reply with a lov for a client not compatible with the
908 * layout lock so we have to clean the layout generation number */
909 if (S_ISREG(la->la_mode))
910 ma->ma_lmm->lmm_layout_gen = 0;
/* Probe (with a NULL buffer) the size needed to hold the object's striping
 * EA: LOV for files; for directories also check LMV and default LMV and
 * return the largest of the probed sizes.
 * NOTE(review): extraction dropped rc2's declaration, the xattr name in
 * the first probe (presumably XATTR_NAME_LOV), error handling between
 * probes, and the final RETURN — confirm against the full source. */
914 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
915 struct mdt_object *o)
917 const struct lu_env *env = info->mti_env;
920 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
929 /* Is it a directory? Let's check for the LMV as well */
930 if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
931 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
935 rc2 = mo_xattr_get(env, mdt_object_child(o),
937 XATTR_NAME_DEFAULT_LMV);
/* Keep the larger size; -ENODATA (no such EA) is not an error here. */
939 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
/* Read xattr @name into the thread-local "big LMM" buffer, first probing
 * the required size with a NULL buffer and growing mti_big_lmm (rounded up
 * to a power of two) when too small. Caller must not already be using the
 * big buffer (asserted). Returns the xattr size read, or negative errno.
 * NOTE(review): extraction dropped braces, rc's declaration, an early
 * RETURN after the probe, and -ENOMEM on allocation failure — confirm. */
947 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
950 const struct lu_env *env = info->mti_env;
954 LASSERT(info->mti_big_lmm_used == 0);
/* Size probe: LU_BUF_NULL makes mo_xattr_get return the needed length. */
955 rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
959 /* big_lmm may need to be grown */
960 if (info->mti_big_lmmsize < rc) {
961 int size = size_roundup_power2(rc);
963 if (info->mti_big_lmmsize > 0) {
964 /* free old buffer */
965 LASSERT(info->mti_big_lmm);
966 OBD_FREE_LARGE(info->mti_big_lmm,
967 info->mti_big_lmmsize);
968 info->mti_big_lmm = NULL;
969 info->mti_big_lmmsize = 0;
972 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
973 if (info->mti_big_lmm == NULL)
975 info->mti_big_lmmsize = size;
977 LASSERT(info->mti_big_lmmsize >= rc);
979 info->mti_buf.lb_buf = info->mti_big_lmm;
980 info->mti_buf.lb_len = info->mti_big_lmmsize;
/* Actual read into the (possibly grown) big buffer. */
981 rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
/* Fetch a striping xattr (LOV, LMV, or default LMV, selected by @name)
 * into the md_attr buffers, retrying via mdt_big_xattr_get() on -ERANGE,
 * and set the matching MA_* validity flag plus the stored size. Also bumps
 * mdt_max_mdsize when a larger-than-known EA is seen.
 * NOTE(review): extraction dropped braces, early RETURNs (including the
 * rejection paths for unknown @name, LOV holes, and default-LMV -ERANGE)
 * and the -ENODATA handling body — comments describe only visible code. */
986 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
987 struct md_attr *ma, const char *name)
989 struct md_object *next = mdt_object_child(o);
990 struct lu_buf *buf = &info->mti_buf;
993 if (strcmp(name, XATTR_NAME_LOV) == 0) {
994 buf->lb_buf = ma->ma_lmm;
995 buf->lb_len = ma->ma_lmm_size;
996 LASSERT(!(ma->ma_valid & MA_LOV));
997 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
998 buf->lb_buf = ma->ma_lmv;
999 buf->lb_len = ma->ma_lmv_size;
1000 LASSERT(!(ma->ma_valid & MA_LMV));
1001 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1002 buf->lb_buf = ma->ma_default_lmv;
1003 buf->lb_len = ma->ma_default_lmv_size;
1004 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1009 LASSERT(buf->lb_buf);
1011 rc = mo_xattr_get(info->mti_env, next, buf, name);
/* rc > 0: xattr read, rc is its size; record it per xattr kind. */
1015 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1016 if (info->mti_big_lmm_used)
1017 ma->ma_lmm = info->mti_big_lmm;
1019 /* NOT return LOV EA with hole to old client. */
1020 if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1021 LOV_PATTERN_F_HOLE) &&
1022 !(exp_connect_flags(info->mti_exp) &
1023 OBD_CONNECT_LFSCK)) {
1026 ma->ma_lmm_size = rc;
1027 ma->ma_valid |= MA_LOV;
1029 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1030 if (info->mti_big_lmm_used)
1031 ma->ma_lmv = info->mti_big_lmm;
1033 ma->ma_lmv_size = rc;
1034 ma->ma_valid |= MA_LMV;
1035 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1036 ma->ma_default_lmv_size = rc;
1037 ma->ma_valid |= MA_LMV_DEF;
1040 /* Update mdt_max_mdsize so all clients will be aware that */
1041 if (info->mti_mdt->mdt_max_mdsize < rc)
1042 info->mti_mdt->mdt_max_mdsize = rc;
1045 } else if (rc == -ENODATA) {
1048 } else if (rc == -ERANGE) {
1049 /* Default LMV has fixed size, so it must be able to fit
1050 * in the original buffer */
1051 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
/* Buffer too small: retry into the thread-local big buffer. */
1053 rc = mdt_big_xattr_get(info, o, name);
1055 info->mti_big_lmm_used = 1;
/* Convenience wrapper around __mdt_stripe_get(): always point the md_attr
 * buffers at the thread-local big LMM buffer (allocating a PAGE_SIZE one
 * on first use), clear the relevant MA_* flag, fetch, then reset the
 * big_lmm_used flag since the big buffer is used unconditionally here.
 * NOTE(review): extraction dropped braces, -ENOMEM on allocation failure
 * and the handling of names other than LOV/LMV — confirm against source. */
1063 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1064 struct md_attr *ma, const char *name)
1068 if (!info->mti_big_lmm) {
1069 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1070 if (!info->mti_big_lmm)
1072 info->mti_big_lmmsize = PAGE_SIZE;
1075 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1076 ma->ma_lmm = info->mti_big_lmm;
1077 ma->ma_lmm_size = info->mti_big_lmmsize;
1078 ma->ma_valid &= ~MA_LOV;
1079 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1080 ma->ma_lmv = info->mti_big_lmm;
1081 ma->ma_lmv_size = info->mti_big_lmmsize;
1082 ma->ma_valid &= ~MA_LMV;
1087 LASSERT(!info->mti_big_lmm_used);
1088 rc = __mdt_stripe_get(info, o, ma, name);
1089 /* since big_lmm is always used here, clear 'used' flag to avoid
1090 * assertion in mdt_big_xattr_get().
1092 info->mti_big_lmm_used = 0;
/* Extract the parent FID of @o from its link EA (XATTR_NAME_LINK): read
 * the EA (growing into the big buffer on -ERANGE), byte-swap the header if
 * it was written on an opposite-endian host, validate magic and record
 * count, then copy the first entry's big-endian parent FID into @pfid.
 * NOTE(review): extraction dropped rc's declaration, an early RETURN after
 * the fallback read, and the error codes of the magic/reccount rejection
 * paths — comments describe only visible code. */
1097 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1098 struct lu_fid *pfid)
1100 struct lu_buf *buf = &info->mti_buf;
1101 struct link_ea_header *leh;
1102 struct link_ea_entry *lee;
1106 buf->lb_buf = info->mti_big_lmm;
1107 buf->lb_len = info->mti_big_lmmsize;
1108 rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1109 buf, XATTR_NAME_LINK);
1110 /* ignore errors, MA_PFID won't be set and it is
1111 * up to the caller to treat this as an error */
1112 if (rc == -ERANGE || buf->lb_len == 0) {
1113 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1114 buf->lb_buf = info->mti_big_lmm;
1115 buf->lb_len = info->mti_big_lmmsize;
/* A valid link EA must at least contain its header. */
1120 if (rc < sizeof(*leh)) {
1121 CERROR("short LinkEA on "DFID": rc = %d\n",
1122 PFID(mdt_object_fid(o)), rc);
1126 leh = (struct link_ea_header *) buf->lb_buf;
1127 lee = (struct link_ea_entry *)(leh + 1);
/* Opposite-endian magic: swab the header fields in place. */
1128 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1129 leh->leh_magic = LINK_EA_MAGIC;
1130 leh->leh_reccount = __swab32(leh->leh_reccount);
1131 leh->leh_len = __swab64(leh->leh_len);
1133 if (leh->leh_magic != LINK_EA_MAGIC)
1135 if (leh->leh_reccount == 0)
/* Parent FID is stored big-endian in the entry; convert to CPU order. */
1138 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1139 fid_be_to_cpu(pfid, pfid);
/* Like mdt_attr_get_pfid() but also return the entry name: read the link
 * EA (first into the fixed mti_xattr_buf, falling back to the big buffer
 * on -ERANGE), validate header magic/count with endian fixup, then unpack
 * the first entry into @pfid and @lname via linkea_entry_unpack().
 * NOTE(review): extraction dropped rc/reclen declarations and the error
 * codes of the rejection paths — comments describe only visible code. */
1144 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1145 struct lu_fid *pfid, struct lu_name *lname)
1147 struct lu_buf *buf = &info->mti_buf;
1148 struct link_ea_header *leh;
1149 struct link_ea_entry *lee;
1153 buf->lb_buf = info->mti_xattr_buf;
1154 buf->lb_len = sizeof(info->mti_xattr_buf);
1155 rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1157 if (rc == -ERANGE) {
/* Fixed buffer too small: retry via the growable big buffer. */
1158 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1159 buf->lb_buf = info->mti_big_lmm;
1160 buf->lb_len = info->mti_big_lmmsize;
1165 if (rc < sizeof(*leh)) {
1166 CERROR("short LinkEA on "DFID": rc = %d\n",
1167 PFID(mdt_object_fid(o)), rc);
1171 leh = (struct link_ea_header *)buf->lb_buf;
1172 lee = (struct link_ea_entry *)(leh + 1);
/* Opposite-endian magic: swab the header fields in place. */
1173 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1174 leh->leh_magic = LINK_EA_MAGIC;
1175 leh->leh_reccount = __swab32(leh->leh_reccount);
1176 leh->leh_len = __swab64(leh->leh_len);
1178 if (leh->leh_magic != LINK_EA_MAGIC)
1181 if (leh->leh_reccount == 0)
1184 linkea_entry_unpack(lee, &reclen, lname, pfid);
/* Gather the set of attributes requested in ma->ma_need for object @o:
 * inode attrs (plus SOM), parent FID, LOV/LMV/default-LMV striping, SOM
 * for regular files, HSM state, and (if POSIX ACLs are enabled) the
 * default ACL for directories. Each successfully fetched group sets its
 * MA_* bit in ma->ma_valid.
 * NOTE(review): extraction dropped braces, mode/rc/rc2 declarations and
 * some GOTO targets; comments describe only the visible code. */
1189 int mdt_attr_get_complex(struct mdt_thread_info *info,
1190 struct mdt_object *o, struct md_attr *ma)
1192 const struct lu_env *env = info->mti_env;
1193 struct md_object *next = mdt_object_child(o);
1194 struct lu_buf *buf = &info->mti_buf;
1195 int need = ma->ma_need;
1202 if (mdt_object_exists(o) == 0)
1203 GOTO(out, rc = -ENOENT);
1204 mode = lu_object_attr(&next->mo_lu);
1206 if (need & MA_INODE) {
1207 ma->ma_need = MA_INODE;
1208 rc = mo_attr_get(env, next, ma);
/* SOM result deliberately ignored here; MA_SOM is re-tried below. */
1213 (void) mdt_get_som(info, o, ma);
1214 ma->ma_valid |= MA_INODE;
1217 if (need & MA_PFID) {
1218 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1220 ma->ma_valid |= MA_PFID;
1221 /* ignore this error, parent fid is not mandatory */
1225 if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1226 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1231 if (need & MA_LMV && S_ISDIR(mode)) {
1232 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1237 if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1238 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1244 * In the handle of MA_INODE, we may already get the SOM attr.
1246 if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1247 rc = mdt_get_som(info, o, ma);
1252 if (need & MA_HSM && S_ISREG(mode)) {
1253 buf->lb_buf = info->mti_xattr_buf;
1254 buf->lb_len = sizeof(info->mti_xattr_buf);
1255 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1256 sizeof(info->mti_xattr_buf));
1257 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1258 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1260 ma->ma_valid |= MA_HSM;
/* Missing HSM EA (-ENODATA) is fine; other errors abort. */
1261 else if (rc2 < 0 && rc2 != -ENODATA)
1262 GOTO(out, rc = rc2);
1265 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1266 if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1267 buf->lb_buf = ma->ma_acl;
1268 buf->lb_len = ma->ma_acl_size;
1269 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1271 ma->ma_acl_size = rc2;
1272 ma->ma_valid |= MA_ACL_DEF;
1273 } else if (rc2 == -ENODATA) {
1275 ma->ma_acl_size = 0;
1277 GOTO(out, rc = rc2);
1282 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1283 rc, ma->ma_valid, ma->ma_lmm);
/*
 * Core getattr reply packing for object @o.
 *
 * Fills the reply MDT body with inode attributes and, depending on what the
 * client asked for in the request body, also packs striping (LOV/LMV,
 * including default LMV since 2.12.58), HSM state, symlink target and ACLs.
 *
 * NOTE(review): the embedded line numbering in this chunk is non-contiguous,
 * so some statements (labels, braces, RETURN paths) are elided here; verify
 * the full control flow against the upstream mdt_handler.c before relying on
 * this excerpt.
 *
 * \param[in] info     thread environment
 * \param[in] o        object to getattr
 * \param[in] ma_need  extra MA_* attribute bits the caller needs
 *
 * \retval 0 on success, negative errno otherwise
 */
1287 static int mdt_getattr_internal(struct mdt_thread_info *info,
1288 struct mdt_object *o, int ma_need)
1290 struct mdt_device *mdt = info->mti_mdt;
1291 struct md_object *next = mdt_object_child(o);
1292 const struct mdt_body *reqbody = info->mti_body;
1293 struct ptlrpc_request *req = mdt_info_req(info);
1294 struct md_attr *ma = &info->mti_attr;
1295 struct lu_attr *la = &ma->ma_attr;
1296 struct req_capsule *pill = info->mti_pill;
1297 const struct lu_env *env = info->mti_env;
1298 struct mdt_body *repbody;
1299 struct lu_buf *buffer = &info->mti_buf;
1300 struct obd_export *exp = info->mti_exp;
1301 ktime_t kstart = ktime_get();
1306 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1307 RETURN(err_serious(-ENOMEM));
1309 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* Remote object: only hand back the FID and tell the client which
 * MDS really holds it; old (non-DNE) clients cannot handle this. */
1313 if (mdt_object_remote(o)) {
1314 /* This object is located on remote node.*/
1315 /* Return -ENOTSUPP for old client */
1316 if (!mdt_is_dne_client(req->rq_export))
1317 GOTO(out, rc = -ENOTSUPP);
1319 repbody->mbo_fid1 = *mdt_object_fid(o);
1320 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
/* Locate the reply EA buffer; if the client sent no EA size we skip
 * LOV/LMV packing entirely. */
1324 if (reqbody->mbo_eadatasize > 0) {
1325 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1326 if (buffer->lb_buf == NULL)
1327 GOTO(out, rc = -EPROTO);
1328 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1331 buffer->lb_buf = NULL;
1333 ma_need &= ~(MA_LOV | MA_LMV);
1334 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1335 mdt_obd_name(info->mti_mdt),
1336 req->rq_export->exp_client_uuid.uuid);
1339 /* from 2.12.58 intent_getattr pack default LMV in reply */
1340 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1341 ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1342 (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1343 req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1345 ma->ma_lmv = buffer->lb_buf;
1346 ma->ma_lmv_size = buffer->lb_len;
1347 ma->ma_default_lmv = req_capsule_server_get(pill,
1348 &RMF_DEFAULT_MDT_MD);
1349 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1350 &RMF_DEFAULT_MDT_MD,
1352 ma->ma_need = MA_INODE;
1353 if (ma->ma_lmv_size > 0)
1354 ma->ma_need |= MA_LMV;
1355 if (ma->ma_default_lmv_size > 0)
1356 ma->ma_need |= MA_LMV_DEF;
/* Older protocol: a single buffer carries either the striped LMV or
 * the default LMV, never both. */
1357 } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1358 (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1359 /* If it is dir and client require MEA, then we got MEA */
1360 /* Assumption: MDT_MD size is enough for lmv size. */
1361 ma->ma_lmv = buffer->lb_buf;
1362 ma->ma_lmv_size = buffer->lb_len;
1363 ma->ma_need = MA_INODE;
1364 if (ma->ma_lmv_size > 0) {
1365 if (reqbody->mbo_valid & OBD_MD_MEA) {
1366 ma->ma_need |= MA_LMV;
1367 } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1368 ma->ma_need |= MA_LMV_DEF;
1369 ma->ma_default_lmv = buffer->lb_buf;
1371 ma->ma_default_lmv_size = buffer->lb_len;
1372 ma->ma_lmv_size = 0;
/* Regular file (or non-MEA dir): reuse the same reply buffer for LOV. */
1376 ma->ma_lmm = buffer->lb_buf;
1377 ma->ma_lmm_size = buffer->lb_len;
1378 ma->ma_need = MA_INODE | MA_HSM;
1379 if (ma->ma_lmm_size > 0) {
1380 ma->ma_need |= MA_LOV;
1381 /* Older clients may crash if they getattr overstriped
1384 if (!exp_connect_overstriping(exp) &&
1385 mdt_lmm_is_overstriping(ma->ma_lmm))
1386 RETURN(-EOPNOTSUPP);
1390 if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1391 reqbody->mbo_valid & OBD_MD_FLDIREA &&
1392 lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1393 /* get default stripe info for this dir. */
1394 ma->ma_need |= MA_LOV_DEF;
1396 ma->ma_need |= ma_need;
1398 rc = mdt_attr_get_complex(info, o, ma);
1400 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1401 "%s: getattr error for "DFID": rc = %d\n",
1402 mdt_obd_name(info->mti_mdt),
1403 PFID(mdt_object_fid(o)), rc);
1407 /* if file is released, check if a restore is running */
1408 if (ma->ma_valid & MA_HSM) {
1409 repbody->mbo_valid |= OBD_MD_TSTATE;
1410 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1411 mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1412 repbody->mbo_t_state = MS_RESTORE;
1415 if (unlikely(!(ma->ma_valid & MA_INODE)))
1418 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1420 if (mdt_body_has_lov(la, reqbody)) {
1421 u32 stripe_count = 1;
1423 if (ma->ma_valid & MA_LOV) {
1424 LASSERT(ma->ma_lmm_size);
1425 repbody->mbo_eadatasize = ma->ma_lmm_size;
1426 if (S_ISDIR(la->la_mode))
1427 repbody->mbo_valid |= OBD_MD_FLDIREA;
1429 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1430 mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1432 if (ma->ma_valid & MA_LMV) {
1433 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1434 u32 magic = le32_to_cpu(lmv->lmv_magic);
1436 /* Return -ENOTSUPP for old client */
1437 if (!mdt_is_striped_client(req->rq_export))
1440 LASSERT(S_ISDIR(la->la_mode));
1441 mdt_dump_lmv(D_INFO, ma->ma_lmv);
1442 repbody->mbo_eadatasize = ma->ma_lmv_size;
1443 repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1445 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
/* A directory caught mid-restripe gets queued so the restripe
 * (migration or layout update) can make progress. */
1446 if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1447 mdt_restripe_migrate_add(info, o);
1448 else if (magic == LMV_MAGIC_V1 &&
1449 lmv_is_restriping(lmv))
1450 mdt_restripe_update_add(info, o);
1452 if (ma->ma_valid & MA_LMV_DEF) {
1453 /* Return -ENOTSUPP for old client */
1454 if (!mdt_is_striped_client(req->rq_export))
1456 LASSERT(S_ISDIR(la->la_mode));
1458 * when ll_dir_getstripe() gets default LMV, it
1459 * checks mbo_eadatasize.
1461 if (!(ma->ma_valid & MA_LMV))
1462 repbody->mbo_eadatasize =
1463 ma->ma_default_lmv_size;
1464 repbody->mbo_valid |= (OBD_MD_FLDIREA |
1465 OBD_MD_DEFAULT_MEA);
1468 "dirent count %llu stripe count %u MDT count %d\n",
1469 ma->ma_attr.la_dirent_count, stripe_count,
1470 atomic_read(&mdt->mdt_mds_mds_conns) + 1);
/* Auto-split: queue a big, non-root, non-restriping dir for
 * splitting when its entry count exceeds the threshold and it
 * has fewer stripes than there are MDTs. */
1471 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1472 ma->ma_attr.la_dirent_count >
1473 mdt->mdt_restriper.mdr_dir_split_count &&
1474 !fid_is_root(mdt_object_fid(o)) &&
1475 mdt->mdt_enable_dir_auto_split &&
1476 !o->mot_restriping &&
1477 stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
1478 mdt_auto_split_add(info, o);
1479 } else if (S_ISLNK(la->la_mode) &&
1480 reqbody->mbo_valid & OBD_MD_LINKNAME) {
1481 buffer->lb_buf = ma->ma_lmm;
1482 /* eadatasize from client includes NULL-terminator, so
1483 * there is no need to read it */
1484 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1485 rc = mo_readlink(env, next, buffer);
1486 if (unlikely(rc <= 0)) {
1487 CERROR("%s: readlink failed for "DFID": rc = %d\n",
1488 mdt_obd_name(info->mti_mdt),
1489 PFID(mdt_object_fid(o)), rc);
1492 int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1494 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1496 repbody->mbo_valid |= OBD_MD_LINKNAME;
1497 /* we need to report back size with NULL-terminator
1498 * because client expects that */
1499 repbody->mbo_eadatasize = rc + 1;
1500 if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1501 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1502 "on "DFID ", expected %d\n",
1503 mdt_obd_name(info->mti_mdt),
1504 rc, PFID(mdt_object_fid(o)),
1505 reqbody->mbo_eadatasize - 1);
1506 /* NULL terminate */
1507 ((char *)ma->ma_lmm)[rc] = 0;
1509 /* If the total CDEBUG() size is larger than a page, it
1510 * will print a warning to the console, avoid this by
1511 * printing just the last part of the symlink. */
1512 CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1513 print_limit < rc ? "..." : "", print_limit,
1514 (char *)ma->ma_lmm + rc - print_limit, rc);
1519 if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1520 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1521 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1522 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1523 repbody->mbo_max_mdsize);
1526 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1527 if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1528 (reqbody->mbo_valid & OBD_MD_FLACL)) {
1529 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1530 if (IS_ERR(nodemap))
1531 RETURN(PTR_ERR(nodemap));
1533 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1534 nodemap_putref(nodemap);
1540 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1541 ktime_us_delta(ktime_get(), kstart));
/*
 * MDS_GETATTR request handler: size the reply buffers (EA, ACL), pack the
 * reply capsule, check credentials, then delegate to mdt_getattr_internal().
 *
 * NOTE(review): several lines are elided in this excerpt (RETURN paths,
 * braces, the `out`/`out_shrink` labels' surroundings); confirm against
 * upstream before editing.
 */
1546 static int mdt_getattr(struct tgt_session_info *tsi)
1548 struct mdt_thread_info *info = tsi2mdt_info(tsi);
1549 struct mdt_object *obj = info->mti_object;
1550 struct req_capsule *pill = info->mti_pill;
1551 struct mdt_body *reqbody;
1552 struct mdt_body *repbody;
1556 if (unlikely(info->mti_object == NULL))
1559 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1561 LASSERT(lu_object_assert_exists(&obj->mot_obj));
1563 /* Special case for Data-on-MDT files to get data version */
1564 if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1565 rc = mdt_data_version_get(tsi);
1569 /* Unlike intent case where we need to pre-fill out buffers early on
1570 * in intent policy for ldlm reasons, here we can have a much better
1571 * guess at EA size by just reading it from disk.
1572 * Exceptions are readdir and (missing) directory striping */
1574 if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1575 /* No easy way to know how long is the symlink, but it cannot
1576 * be more than PATH_MAX, so we allocate +1 */
1578 /* A special case for fs ROOT: getattr there might fetch
1579 * default EA for entire fs, not just for this dir!
1581 } else if (lu_fid_eq(mdt_object_fid(obj),
1582 &info->mti_mdt->mdt_md_root_fid) &&
1583 (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1584 (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1586 /* Should the default striping be bigger, mdt_fix_reply
1587 * will reallocate */
1588 rc = DEF_REP_MD_SIZE;
1590 /* Read the actual EA size from disk */
1591 rc = mdt_attr_get_eabuf_size(info, obj);
1595 GOTO(out, rc = err_serious(rc));
1597 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1599 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1600 * by default. If the target object has more ACL entries, then
1601 * enlarge the buffer when necessary. */
1602 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1603 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1605 rc = req_capsule_server_pack(pill);
1606 if (unlikely(rc != 0))
1607 GOTO(out, rc = err_serious(rc));
1609 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1610 LASSERT(repbody != NULL);
1611 repbody->mbo_eadatasize = 0;
1612 repbody->mbo_aclsize = 0;
1614 rc = mdt_check_ucred(info);
1616 GOTO(out_shrink, rc);
1618 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1620 rc = mdt_getattr_internal(info, obj, 0);
/* Shrink reply buffers to what was actually packed before returning. */
1623 mdt_client_compatibility(info);
1624 rc2 = mdt_fix_reply(info);
1628 mdt_thread_info_fini(info);
1633 * Handler of layout intent RPC requiring the layout modification
1635 * \param[in] info thread environment
1636 * \param[in] obj object
1637 * \param[out] lhc object ldlm lock handle
1638 * \param[in] layout layout change descriptor
1640 * \retval 0 on success
1641 * \retval < 0 error code
1643 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1644 struct mdt_lock_handle *lhc,
1645 struct md_layout_change *layout)
/* Only existing regular files may have their layout changed. */
1651 if (!mdt_object_exists(obj))
1654 if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1657 rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
/* A resent request may already own the lock; only enqueue a fresh
 * EX layout lock when mdt_check_resent_lock() says we must. */
1662 rc = mdt_check_resent_lock(info, obj, lhc);
1668 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1670 /* take layout lock to prepare layout change */
1671 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1672 lockpart |= MDS_INODELOCK_UPDATE;
1674 mdt_lock_handle_init(lhc);
1675 mdt_lock_reg_init(lhc, LCK_EX);
1676 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
/* Serialize against SOM updates while the lower layer swaps layouts. */
1681 mutex_lock(&obj->mot_som_mutex);
1682 rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1683 mutex_unlock(&obj->mot_som_mutex);
1686 mdt_object_unlock(info, obj, lhc, 1);
1692 * Exchange MOF_LOV_CREATED flags between two objects after a
1693 * layout swap. No assumption is made on whether o1 or o2 have
1694 * created objects or not.
1696 * \param[in,out] o1 First swap layout object
1697 * \param[in,out] o2 Second swap layout object
1699 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
/* NOTE(review): o1's flag is snapshotted before either mutex is taken,
 * and the mutexes are always acquired o1-then-o2 regardless of object
 * order — confirm callers serialize concurrent swaps of the same pair. */
1701 unsigned int o1_lov_created = o1->mot_lov_created;
1703 mutex_lock(&o1->mot_lov_mutex);
1704 mutex_lock(&o2->mot_lov_mutex);
1706 o1->mot_lov_created = o2->mot_lov_created;
1707 o2->mot_lov_created = o1_lov_created;
1709 mutex_unlock(&o2->mot_lov_mutex);
1710 mutex_unlock(&o1->mot_lov_mutex);
/*
 * MDS_SWAP_LAYOUTS handler: swap the file layouts of fid1 (mti_object) and
 * fid2 under EX LAYOUT|XATTR locks on both, after write-permission checks on
 * each. Requires a layout-lock-capable client.
 */
1713 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1715 struct mdt_thread_info *info;
1716 struct ptlrpc_request *req = tgt_ses_req(tsi);
1717 struct obd_export *exp = req->rq_export;
1718 struct mdt_object *o1, *o2, *o;
1719 struct mdt_lock_handle *lh1, *lh2;
1720 struct mdc_swap_layouts *msl;
1724 /* client does not support layout lock, so layout swapping
1726 * FIXME: there is a problem for old clients which don't support
1727 * layout lock yet. If those clients have already opened the file
1728 * they won't be notified at all so that old layout may still be
1729 * used to do IO. This can be fixed after file release is landed by
1730 * doing exclusive open and taking full EX ibits lock. - Jinshan */
1731 if (!exp_connect_layout(exp))
1732 RETURN(-EOPNOTSUPP);
1734 info = tsi2mdt_info(tsi);
1735 if (unlikely(info->mti_object == NULL))
1738 if (info->mti_dlm_req != NULL)
1739 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1741 o1 = info->mti_object;
1742 o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1743 &info->mti_body->mbo_fid2);
1745 GOTO(out, rc = PTR_ERR(o));
1747 if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1748 GOTO(put, rc = -ENOENT);
/* Same FID means a no-op; the fid comparison also fixes o1/o2 ordering
 * (elided here) so both nodes lock in a consistent order. */
1750 rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1751 if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1757 /* permission check. Make sure the calling process having permission
1758 * to write both files. */
1759 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1764 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1769 msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1771 GOTO(put, rc = -EPROTO);
1773 lh1 = &info->mti_lh[MDT_LH_NEW];
1774 mdt_lock_reg_init(lh1, LCK_EX);
1775 lh2 = &info->mti_lh[MDT_LH_OLD];
1776 mdt_lock_reg_init(lh2, LCK_EX);
1778 rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1779 MDS_INODELOCK_XATTR);
1783 rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1784 MDS_INODELOCK_XATTR);
1788 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1789 mdt_object_child(o2), msl->msl_flags);
/* Keep MOF_LOV_CREATED consistent with the swapped layouts. */
1793 mdt_swap_lov_flag(o1, o2);
1796 mdt_object_unlock(info, o2, lh2, rc);
1798 mdt_object_unlock(info, o1, lh1, rc);
1800 mdt_object_put(info->mti_env, o);
1802 mdt_thread_info_fini(info);
/*
 * FID-only lookup of @lname under @parent: resolve the name, pack just the
 * child FID into the reply body and record a positive/negative lookup
 * disposition in @ldlm_rep. No attributes, no locks on the child.
 */
1806 static int mdt_raw_lookup(struct mdt_thread_info *info,
1807 struct mdt_object *parent,
1808 const struct lu_name *lname,
1809 struct ldlm_reply *ldlm_rep)
1811 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1815 LASSERT(!info->mti_cross_ref);
1817 /* Only got the fid of this obj by name */
1818 fid_zero(child_fid);
1819 rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
1820 lname, child_fid, &info->mti_spec);
1822 struct mdt_body *repbody;
1824 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1825 repbody->mbo_fid1 = *child_fid;
1826 repbody->mbo_valid = OBD_MD_FLID;
1827 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1828 } else if (rc == -ENOENT) {
1829 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1836 * UPDATE lock should be taken against parent, and be released before exit;
1837 * child_bits lock should be taken against child, and be returned back:
1838 * (1)normal request should release the child lock;
1839 * (2)intent request will grant the lock to client.
/*
 * NOTE(review): this excerpt has non-contiguous line numbering — labels
 * (out_child/unlock_parent), braces and some RETURN paths are elided.
 * Verify the full function upstream before changing control flow.
 */
1841 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1842 struct mdt_lock_handle *lhc,
1844 struct ldlm_reply *ldlm_rep)
1846 struct ptlrpc_request *req = mdt_info_req(info);
1847 struct mdt_body *reqbody = NULL;
1848 struct mdt_object *parent = info->mti_object;
1849 struct mdt_object *child = NULL;
1850 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1851 struct lu_name *lname = NULL;
1852 struct mdt_lock_handle *lhp = NULL;
1853 struct ldlm_lock *lock;
1854 struct req_capsule *pill = info->mti_pill;
/* A resent request must carry its original (already used) lock handle. */
1862 is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1863 LASSERT(ergo(is_resent,
1864 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1869 if (info->mti_cross_ref) {
1870 /* Only getattr on the child. Parent is on another node. */
1871 mdt_set_disposition(info, ldlm_rep,
1872 DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1874 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1876 PFID(mdt_object_fid(child)), ldlm_rep);
1878 rc = mdt_check_resent_lock(info, child, lhc);
1881 } else if (rc > 0) {
1882 mdt_lock_handle_init(lhc);
1883 mdt_lock_reg_init(lhc, LCK_PR);
1886 * Object's name entry is on another MDS, it will
1887 * request PERM lock only because LOOKUP lock is owned
1888 * by the MDS where name entry resides.
1890 * TODO: it should try layout lock too. - Jinshan
1892 child_bits &= ~(MDS_INODELOCK_LOOKUP |
1893 MDS_INODELOCK_LAYOUT);
1894 child_bits |= MDS_INODELOCK_PERM;
1896 rc = mdt_object_lock(info, child, lhc, child_bits);
1901 /* Finally, we can get attr for child. */
1902 if (!mdt_object_exists(child)) {
1903 LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1905 "remote object doesn't exist.");
1906 mdt_object_unlock(info, child, lhc, 1);
1910 rc = mdt_getattr_internal(info, child, 0);
1911 if (unlikely(rc != 0)) {
1912 mdt_object_unlock(info, child, lhc, 1);
/* Security/encryption contexts ride along in the getattr reply. */
1916 rc = mdt_pack_secctx_in_reply(info, child);
1918 mdt_object_unlock(info, child, lhc, 1);
1922 rc = mdt_pack_encctx_in_reply(info, child);
1924 mdt_object_unlock(info, child, lhc, 1);
1928 lname = &info->mti_name;
1929 mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
1931 if (lu_name_is_valid(lname)) {
1932 if (mdt_object_remote(parent)) {
1933 CERROR("%s: parent "DFID" is on remote target\n",
1934 mdt_obd_name(info->mti_mdt),
1935 PFID(mdt_object_fid(parent)));
1939 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
1940 "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1941 PNAME(lname), ldlm_rep);
/* No name: getattr-by-fid — fid2 in the request body names the child. */
1943 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1944 if (unlikely(reqbody == NULL))
1945 RETURN(err_serious(-EPROTO));
1947 *child_fid = reqbody->mbo_fid2;
1948 if (unlikely(!fid_is_sane(child_fid)))
1949 RETURN(err_serious(-EINVAL));
1951 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
1952 mdt_object_get(info->mti_env, parent);
1955 child = mdt_object_find(info->mti_env, info->mti_mdt,
1958 RETURN(PTR_ERR(child));
1961 if (mdt_object_remote(child)) {
1962 CERROR("%s: child "DFID" is on remote target\n",
1963 mdt_obd_name(info->mti_mdt),
1964 PFID(mdt_object_fid(child)));
1965 GOTO(out_child, rc = -EPROTO);
1968 /* don't fetch LOOKUP lock if it's remote object */
1969 rc = mdt_is_remote_object(info, parent, child);
1971 GOTO(out_child, rc);
1973 child_bits &= ~MDS_INODELOCK_LOOKUP;
1975 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1977 PFID(mdt_object_fid(parent)),
1978 PFID(&reqbody->mbo_fid2), ldlm_rep);
1981 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1983 if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
1984 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1986 "Parent doesn't exist!");
1987 GOTO(out_child, rc = -ESTALE);
1990 if (lu_name_is_valid(lname)) {
1991 /* Always allow to lookup ".." */
1992 if (unlikely(lname->ln_namelen == 2 &&
1993 lname->ln_name[0] == '.' &&
1994 lname->ln_name[1] == '.'))
1995 info->mti_spec.sp_permitted = 1;
1997 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
1998 rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
2003 /* step 1: lock parent only if parent is a directory */
2004 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2005 lhp = &info->mti_lh[MDT_LH_PARENT];
2006 mdt_lock_pdo_init(lhp, LCK_PR, lname);
2007 rc = mdt_object_lock(info, parent, lhp,
2008 MDS_INODELOCK_UPDATE);
2009 if (unlikely(rc != 0))
2013 /* step 2: lookup child's fid by name */
2014 fid_zero(child_fid);
2015 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2016 child_fid, &info->mti_spec);
2018 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2021 GOTO(unlock_parent, rc);
2023 child = mdt_object_find(info->mti_env, info->mti_mdt,
2025 if (unlikely(IS_ERR(child)))
2026 GOTO(unlock_parent, rc = PTR_ERR(child));
2029 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2031 /* step 3: lock child regardless if it is local or remote. */
2034 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2035 if (!mdt_object_exists(child)) {
2036 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2038 "Object doesn't exist!");
2039 GOTO(out_child, rc = -ENOENT);
2042 rc = mdt_check_resent_lock(info, child, lhc);
2044 GOTO(out_child, rc);
2045 } else if (rc > 0) {
2046 mdt_lock_handle_init(lhc);
2047 mdt_lock_reg_init(lhc, LCK_PR);
2049 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2050 !mdt_object_remote(child)) {
2051 struct md_attr *ma = &info->mti_attr;
2054 ma->ma_need = MA_INODE;
2055 rc = mdt_attr_get_complex(info, child, ma);
2056 if (unlikely(rc != 0))
2057 GOTO(out_child, rc);
2059 /* If the file has not been changed for some time, we
2060 * return not only a LOOKUP lock, but also an UPDATE
2061 * lock and this might save us RPC on later STAT. For
2062 * directories, it also let negative dentry cache start
2063 * working for this dir. */
2064 if (ma->ma_valid & MA_INODE &&
2065 ma->ma_attr.la_valid & LA_CTIME &&
2066 info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2067 ma->ma_attr.la_ctime < ktime_get_real_seconds())
2068 child_bits |= MDS_INODELOCK_UPDATE;
2071 /* layout lock must be granted in a best-effort way
2072 * for IT operations */
2073 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2074 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2075 !mdt_object_remote(child) && ldlm_rep != NULL) {
2076 if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2077 exp_connect_layout(info->mti_exp)) {
2078 /* try to grant layout lock for regular file. */
2079 try_bits = MDS_INODELOCK_LAYOUT;
2081 /* Acquire DOM lock in advance for data-on-mdt file */
2082 if (child != parent)
2083 try_bits |= MDS_INODELOCK_DOM;
2086 if (try_bits != 0) {
2087 /* try layout lock, it may fail to be granted due to
2088 * contention at LOOKUP or UPDATE */
2089 rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2091 if (child_bits & MDS_INODELOCK_LAYOUT)
2094 /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2095 * client will enqueue the lock to the remote MDT */
2096 if (mdt_object_remote(child))
2097 child_bits &= ~MDS_INODELOCK_UPDATE;
2098 rc = mdt_object_lock(info, child, lhc, child_bits);
2100 if (unlikely(rc != 0))
2101 GOTO(out_child, rc);
2104 /* finally, we can get attr for child. */
2105 rc = mdt_getattr_internal(info, child, ma_need);
2106 if (unlikely(rc != 0)) {
2107 mdt_object_unlock(info, child, lhc, 1);
2108 GOTO(out_child, rc);
2111 rc = mdt_pack_secctx_in_reply(info, child);
2113 mdt_object_unlock(info, child, lhc, 1);
2114 GOTO(out_child, rc);
2117 rc = mdt_pack_encctx_in_reply(info, child);
2119 mdt_object_unlock(info, child, lhc, 1);
2120 GOTO(out_child, rc);
2123 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2125 /* Debugging code. */
2126 LDLM_DEBUG(lock, "Returning lock to client");
2127 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2128 &lock->l_resource->lr_name),
2129 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2130 PLDLMRES(lock->l_resource),
2131 PFID(mdt_object_fid(child)));
2133 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2134 !mdt_object_remote(child) && child != parent) {
2135 mdt_object_put(info->mti_env, child);
/* Pack DoM size into the reply; if the file turns out not to be
 * data-on-mdt, drop the speculatively-taken DOM bit again. */
2136 rc = mdt_pack_size2body(info, child_fid,
2138 if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2139 /* DOM lock was taken in advance but this is
2140 * not DoM file. Drop the lock.
2142 lock_res_and_lock(lock);
2143 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2144 unlock_res_and_lock(lock);
2146 LDLM_LOCK_PUT(lock);
2147 GOTO(unlock_parent, rc = 0);
2149 LDLM_LOCK_PUT(lock);
2155 mdt_object_put(info->mti_env, child);
2158 mdt_object_unlock(info, parent, lhp, 1);
2162 /* normal handler: should release the child lock */
/*
 * MDS_GETATTR_NAME handler: getattr of a child by (parent, name). Unlike the
 * intent path, any child lock taken by mdt_getattr_name_lock() is dropped
 * before replying.
 */
2163 static int mdt_getattr_name(struct tgt_session_info *tsi)
2165 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2166 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2167 struct mdt_body *reqbody;
2168 struct mdt_body *repbody;
2173 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2174 LASSERT(reqbody != NULL);
2175 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2176 LASSERT(repbody != NULL);
2178 info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2179 repbody->mbo_eadatasize = 0;
2180 repbody->mbo_aclsize = 0;
2182 rc = mdt_init_ucred(info, reqbody);
2184 GOTO(out_shrink, rc);
2186 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
/* Non-intent request: release the child lock instead of granting it. */
2187 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2188 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2189 lhc->mlh_reg_lh.cookie = 0;
2191 mdt_exit_ucred(info);
2194 mdt_client_compatibility(info);
2195 rc2 = mdt_fix_reply(info);
2198 mdt_thread_info_fini(info);
/*
 * Unlink one name entry (@pfid/@name) of @obj as part of an rmfid operation.
 * Takes a PW PDO lock on the parent and a striped EX lock on the child,
 * verifies the name still resolves to @obj, refuses if the object is open,
 * then performs the unlink with the supplied ctime.
 */
2202 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2203 const struct lu_fid *pfid,
2204 const struct lu_name *name,
2205 struct mdt_object *obj, s64 ctime)
2207 struct lu_fid *child_fid = &info->mti_tmp_fid1;
2208 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2209 struct mdt_device *mdt = info->mti_mdt;
2210 struct md_attr *ma = &info->mti_attr;
2211 struct mdt_lock_handle *parent_lh;
2212 struct mdt_lock_handle *child_lh;
2213 struct mdt_object *pobj;
2214 bool cos_incompat = false;
2218 pobj = mdt_object_find(info->mti_env, mdt, pfid);
2220 GOTO(out, rc = PTR_ERR(pobj));
2222 parent_lh = &info->mti_lh[MDT_LH_PARENT];
2223 mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2224 rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2226 GOTO(put_parent, rc);
/* Cross-MDT parent disables commit-on-sharing compatibility. */
2228 if (mdt_object_remote(pobj))
2229 cos_incompat = true;
2231 rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2232 name, child_fid, &info->mti_spec);
2234 GOTO(unlock_parent, rc);
/* The linkea entry may be stale; the name must still point at @obj. */
2236 if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2237 GOTO(unlock_parent, rc = -EREMCHG);
2239 child_lh = &info->mti_lh[MDT_LH_CHILD];
2240 mdt_lock_reg_init(child_lh, LCK_EX);
2241 rc = mdt_reint_striped_lock(info, obj, child_lh,
2242 MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2243 einfo, cos_incompat);
2245 GOTO(unlock_parent, rc);
2247 if (atomic_read(&obj->mot_open_count)) {
2248 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2249 PFID(mdt_object_fid(obj)));
2250 GOTO(unlock_child, rc = -EBUSY);
2254 ma->ma_valid = MA_INODE;
2255 ma->ma_attr.la_valid = LA_CTIME;
2256 ma->ma_attr.la_ctime = ctime;
2258 mutex_lock(&obj->mot_lov_mutex);
2260 rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2261 mdt_object_child(obj), name, ma, 0);
2263 mutex_unlock(&obj->mot_lov_mutex);
2266 mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2268 mdt_object_unlock(info, pobj, parent_lh, 1);
2270 mdt_object_put(info->mti_env, pobj);
/*
 * Permission check for rmfid: the caller must be able to "write" @obj —
 * i.e. the object is not immutable and the caller either has
 * CAP_DAC_OVERRIDE or matching owner/group/other write permission.
 */
2275 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2276 struct mdt_object *obj)
2278 struct lu_ucred *uc = lu_ucred(info->mti_env);
2279 struct md_attr *ma = &info->mti_attr;
2280 struct lu_attr *la = &ma->ma_attr;
2284 ma->ma_need = MA_INODE;
2285 rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2289 if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2292 if (md_capable(uc, CAP_DAC_OVERRIDE))
/* Classic owner/group/other write-bit check against fsuid/fsgid. */
2294 if (uc->uc_fsuid == la->la_uid) {
2295 if ((la->la_mode & S_IWUSR) == 0)
2297 } else if (uc->uc_fsgid == la->la_gid) {
2298 if ((la->la_mode & S_IWGRP) == 0)
2300 } else if ((la->la_mode & S_IWOTH) == 0) {
/*
 * Remove every hardlink of the object identified by @fid: validate the FID,
 * check permission, read the link xattr (linkea) and unlink each recorded
 * (parent, name) entry via mdt_rmfid_unlink().
 */
2308 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2311 struct mdt_device *mdt = info->mti_mdt;
2312 struct mdt_object *obj = NULL;
2313 struct linkea_data ldata = { NULL };
2314 struct lu_buf *buf = &info->mti_big_buf;
2315 struct lu_name *name = &info->mti_name;
2316 struct lu_fid *pfid = &info->mti_tmp_fid1;
2317 struct link_ea_header *leh;
2318 struct link_ea_entry *lee;
2319 int reclen, count, rc = 0;
2322 if (!fid_is_sane(fid))
2323 GOTO(out, rc = -EINVAL);
/* Refuse special/internal FIDs that have no namespace presence. */
2325 if (!fid_is_namespace_visible(fid))
2326 GOTO(out, rc = -EINVAL);
2328 obj = mdt_object_find(info->mti_env, mdt, fid);
2330 GOTO(out, rc = PTR_ERR(obj));
2332 if (mdt_object_remote(obj))
2333 GOTO(out, rc = -EREMOTE);
2334 if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2335 GOTO(out, rc = -ENOENT);
2337 rc = mdt_rmfid_check_permission(info, obj);
2342 buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2344 GOTO(out, rc = -ENOMEM);
2347 rc = mdt_links_read(info, obj, &ldata);
/* Walk linkea records; each entry unpacks into (pfid, name). */
2352 lee = (struct link_ea_entry *)(leh + 1);
2353 for (count = 0; count < leh->leh_reccount; count++) {
2354 /* remove every hardlink */
2355 linkea_entry_unpack(lee, &reclen, name, pfid);
2356 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2357 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2363 if (obj && !IS_ERR(obj))
2364 mdt_object_put(info->mti_env, obj);
2365 if (info->mti_big_buf.lb_buf)
2366 lu_buf_free(&info->mti_big_buf);
/*
 * MDS_RMFID handler: batch-remove a client-supplied array of FIDs. The reply
 * carries a per-FID result code array (RMF_RCS) plus the FID array echoed
 * back; individual failures do not abort the batch.
 */
2371 static int mdt_rmfid(struct tgt_session_info *tsi)
2373 struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2374 struct mdt_body *reqbody;
2375 struct lu_fid *fids, *rfids;
2381 reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2382 if (reqbody == NULL)
/* Validate the FID array size is an exact multiple of sizeof(lu_fid). */
2384 bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2386 nr = bufsize / sizeof(struct lu_fid);
2387 if (nr * sizeof(struct lu_fid) != bufsize)
2389 req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2390 RCL_SERVER, nr * sizeof(__u32));
2391 req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2392 RCL_SERVER, nr * sizeof(struct lu_fid));
2393 rc = req_capsule_server_pack(tsi->tsi_pill);
2395 GOTO(out, rc = err_serious(rc));
2396 fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2399 rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2401 rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2404 mdt_init_ucred(mti, reqbody);
2405 for (i = 0; i < nr; i++) {
2407 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2409 mdt_exit_ucred(mti);
2415 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2416 void *karg, void __user *uarg);
/*
 * MDS_SET_INFO handler: dispatch on the key — toggle read-only connect flag,
 * clear changelog records (root/admin only), or evict a client by NID.
 */
2418 static int mdt_set_info(struct tgt_session_info *tsi)
2420 struct ptlrpc_request *req = tgt_ses_req(tsi);
2423 int keylen, vallen, rc = 0;
2427 key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2429 DEBUG_REQ(D_HA, req, "no set_info key");
2430 RETURN(err_serious(-EFAULT));
2433 keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2436 val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2438 DEBUG_REQ(D_HA, req, "no set_info val");
2439 RETURN(err_serious(-EFAULT));
2442 vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2445 /* Swab any part of val you need to here */
2446 if (KEY_IS(KEY_READ_ONLY)) {
2447 spin_lock(&req->rq_export->exp_lock);
2449 *exp_connect_flags_ptr(req->rq_export) |=
2452 *exp_connect_flags_ptr(req->rq_export) &=
2453 ~OBD_CONNECT_RDONLY;
2454 spin_unlock(&req->rq_export->exp_lock);
2455 } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2456 struct changelog_setinfo *cs = val;
2458 if (vallen != sizeof(*cs)) {
2459 CERROR("%s: bad changelog_clear setinfo size %d\n",
2460 tgt_name(tsi->tsi_tgt), vallen);
/* Fix byte order of the changelog-clear payload if the peer swabs. */
2463 if (ptlrpc_req_need_swab(req)) {
2464 __swab64s(&cs->cs_recno);
2465 __swab32s(&cs->cs_id);
/* NOTE(review): tsi2mdt_info() is called inline here with no visible
 * matching mdt_thread_info_fini() — confirm upstream that the thread
 * info is released on this path. */
2468 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2470 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2472 } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2474 obd_export_evict_by_nid(req->rq_export->exp_obd, val);
/*
 * MDS_READPAGE handler: read directory pages. Builds an lu_rdpg descriptor
 * from the request body (hash offset in mbo_size, byte count in mbo_nlink),
 * allocates a page array, has the lower layer fill it, and bulk-sends the
 * pages back to the client.
 */
2481 static int mdt_readpage(struct tgt_session_info *tsi)
2483 struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env);
2484 struct mdt_object *object = mdt_obj(tsi->tsi_corpus);
2485 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
2486 const struct mdt_body *reqbody = tsi->tsi_mdt_body;
2487 struct mdt_body *repbody;
2493 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2494 RETURN(err_serious(-ENOMEM));
2496 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2497 if (repbody == NULL || reqbody == NULL)
2498 RETURN(err_serious(-EFAULT));
2501 * prepare @rdpg before calling lower layers and transfer itself. Here
2502 * reqbody->size contains offset of where to start to read and
2503 * reqbody->nlink contains number bytes to read.
2505 rdpg->rp_hash = reqbody->mbo_size;
/* rp_hash may be narrower than mbo_size; detect truncation. */
2506 if (rdpg->rp_hash != reqbody->mbo_size) {
2507 CERROR("Invalid hash: %#llx != %#llx\n",
2508 rdpg->rp_hash, reqbody->mbo_size);
2512 rdpg->rp_attrs = reqbody->mbo_mode;
2513 if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2514 rdpg->rp_attrs |= LUDA_64BITHASH;
2515 rdpg->rp_count = min_t(unsigned int, reqbody->mbo_nlink,
2516 exp_max_brw_size(tsi->tsi_exp));
2517 rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2519 OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2520 if (rdpg->rp_pages == NULL)
2523 for (i = 0; i < rdpg->rp_npages; ++i) {
2524 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2525 if (rdpg->rp_pages[i] == NULL)
2526 GOTO(free_rdpg, rc = -ENOMEM);
2529 /* call lower layers to fill allocated pages with directory data */
2530 rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2532 GOTO(free_rdpg, rc);
2534 /* send pages to client */
2535 rc = tgt_sendpage(tsi, rdpg, rc);
/* free_rdpg: release pages on both success and failure paths. */
2540 for (i = 0; i < rdpg->rp_npages; i++)
2541 if (rdpg->rp_pages[i] != NULL)
2542 __free_page(rdpg->rp_pages[i]);
2543 OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2545 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2551 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2553 struct lu_ucred *uc = mdt_ucred_check(info);
2554 struct lu_attr *attr = &info->mti_attr.ma_attr;
2559 if (op != REINT_SETATTR) {
2560 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2561 attr->la_uid = uc->uc_fsuid;
2562 /* for S_ISGID, inherit gid from his parent, such work will be
2563 * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2564 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2565 attr->la_gid = uc->uc_fsgid;
2571 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2573 return op == REINT_OPEN &&
2574 !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2577 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2579 struct req_capsule *pill = info->mti_pill;
2581 if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2583 req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2585 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2587 /* pre-set size in server part with max size */
2588 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2590 OBD_MAX_DEFAULT_EA_SIZE);
2592 req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2597 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2599 struct req_capsule *pill = info->mti_pill;
2601 if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2603 /* pre-set size in server part with max size */
2604 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2606 info->mti_mdt->mdt_max_mdsize);
2609 static int mdt_reint_internal(struct mdt_thread_info *info,
2610 struct mdt_lock_handle *lhc,
2613 struct req_capsule *pill = info->mti_pill;
2614 struct mdt_body *repbody;
2619 rc = mdt_reint_unpack(info, op);
2621 CERROR("Can't unpack reint, rc %d\n", rc);
2622 RETURN(err_serious(rc));
2626 /* check if the file system is set to readonly. O_RDONLY open
2627 * is still allowed even the file system is set to readonly mode */
2628 if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2629 RETURN(err_serious(-EROFS));
2631 /* for replay (no_create) lmm is not needed, client has it already */
2632 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2633 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2636 /* llog cookies are always 0, the field is kept for compatibility */
2637 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2638 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2640 /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2641 * by default. If the target object has more ACL entries, then
2642 * enlarge the buffer when necessary. */
2643 if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2644 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2645 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2647 mdt_preset_secctx_size(info);
2648 mdt_preset_encctx_size(info);
2650 rc = req_capsule_server_pack(pill);
2652 CERROR("Can't pack response, rc %d\n", rc);
2653 RETURN(err_serious(rc));
2656 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2657 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2659 repbody->mbo_eadatasize = 0;
2660 repbody->mbo_aclsize = 0;
2663 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2665 /* for replay no cookkie / lmm need, because client have this already */
2666 if (info->mti_spec.no_create)
2667 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2668 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2670 rc = mdt_init_ucred_reint(info);
2672 GOTO(out_shrink, rc);
2674 rc = mdt_fix_attr_ucred(info, op);
2676 GOTO(out_ucred, rc = err_serious(rc));
2678 rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2680 GOTO(out_ucred, rc);
2681 } else if (rc == 1) {
2682 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2683 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2684 GOTO(out_ucred, rc);
2686 rc = mdt_reint_rec(info, lhc);
2689 mdt_exit_ucred(info);
2691 mdt_client_compatibility(info);
2693 rc2 = mdt_fix_reply(info);
2698 * Data-on-MDT optimization - read data along with OPEN and return it
2699 * in reply when possible.
2701 if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2702 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2708 static long mdt_reint_opcode(struct ptlrpc_request *req,
2709 const struct req_format **fmt)
2711 struct mdt_device *mdt;
2712 struct mdt_rec_reint *rec;
2715 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2717 opc = rec->rr_opcode;
2718 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2719 if (opc < REINT_MAX && fmt[opc] != NULL)
2720 req_capsule_extend(&req->rq_pill, fmt[opc]);
2722 mdt = mdt_exp2dev(req->rq_export);
2723 CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2724 " rc = %d\n", req->rq_export->exp_obd->obd_name,
2725 opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2726 opc = err_serious(-EFAULT);
2729 opc = err_serious(-EFAULT);
2734 static int mdt_reint(struct tgt_session_info *tsi)
2738 static const struct req_format *reint_fmts[REINT_MAX] = {
2739 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
2740 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
2741 [REINT_LINK] = &RQF_MDS_REINT_LINK,
2742 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
2743 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
2744 [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
2745 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2746 [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
2747 [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE,
2748 [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC,
2753 opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2755 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2757 * No lock possible here from client to pass it to reint code
2760 rc = mdt_reint_internal(info, NULL, opc);
2761 mdt_thread_info_fini(info);
2766 tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2770 /* this should sync the whole device */
2771 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2773 struct dt_device *dt = mdt->mdt_bottom;
2777 rc = dt->dd_ops->dt_sync(env, dt);
2781 /* this should sync this object */
2782 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2783 struct mdt_object *mo)
2789 if (!mdt_object_exists(mo)) {
2790 CWARN("%s: non existing object "DFID": rc = %d\n",
2791 exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2796 if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2797 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2798 dt_obj_version_t version;
2800 version = dt_version_get(env, mdt_obj2dt(mo));
2801 if (version > tgt->lut_obd->obd_last_committed)
2802 rc = mo_object_sync(env, mdt_object_child(mo));
2804 rc = mo_object_sync(env, mdt_object_child(mo));
2810 static int mdt_sync(struct tgt_session_info *tsi)
2812 struct ptlrpc_request *req = tgt_ses_req(tsi);
2813 struct req_capsule *pill = tsi->tsi_pill;
2814 struct mdt_body *body;
2815 ktime_t kstart = ktime_get();
2820 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2821 RETURN(err_serious(-ENOMEM));
2823 if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2824 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2826 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2828 if (unlikely(info->mti_object == NULL))
2831 /* sync an object */
2832 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2835 const struct lu_fid *fid;
2836 struct lu_attr *la = &info->mti_attr.ma_attr;
2838 info->mti_attr.ma_need = MA_INODE;
2839 info->mti_attr.ma_valid = 0;
2840 rc = mdt_attr_get_complex(info, info->mti_object,
2843 body = req_capsule_server_get(pill,
2845 fid = mdt_object_fid(info->mti_object);
2846 mdt_pack_attr2body(info, body, la, fid);
2849 mdt_thread_info_fini(info);
2852 mdt_counter_incr(req, LPROC_MDT_SYNC,
2853 ktime_us_delta(ktime_get(), kstart));
2858 static int mdt_data_sync(struct tgt_session_info *tsi)
2860 struct mdt_thread_info *info;
2861 struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2862 struct ost_body *body = tsi->tsi_ost_body;
2863 struct ost_body *repbody;
2864 struct mdt_object *mo = NULL;
2870 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2872 /* if no fid is specified then do nothing,
2873 * device sync is done via MDS_SYNC */
2874 if (fid_is_zero(&tsi->tsi_fid))
2877 mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2879 RETURN(PTR_ERR(mo));
2881 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
2885 repbody->oa.o_oi = body->oa.o_oi;
2886 repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2888 info = tsi2mdt_info(tsi);
2889 ma = &info->mti_attr;
2890 ma->ma_need = MA_INODE;
2892 rc = mdt_attr_get_complex(info, mo, ma);
2894 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2897 mdt_thread_info_fini(info);
2902 mdt_object_put(tsi->tsi_env, mo);
2907 * Handle quota control requests to consult current usage/limit, but also
2908 * to configure quota enforcement
2910 static int mdt_quotactl(struct tgt_session_info *tsi)
2912 struct obd_export *exp = tsi->tsi_exp;
2913 struct req_capsule *pill = tsi->tsi_pill;
2914 struct obd_quotactl *oqctl, *repoqc;
2916 struct mdt_device *mdt = mdt_exp2dev(exp);
2917 struct lu_device *qmt = mdt->mdt_qmt_dev;
2918 struct lu_nodemap *nodemap;
2921 oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2923 RETURN(err_serious(-EPROTO));
2925 rc = req_capsule_server_pack(pill);
2927 RETURN(err_serious(rc));
2929 nodemap = nodemap_get_from_exp(exp);
2930 if (IS_ERR(nodemap))
2931 RETURN(PTR_ERR(nodemap));
2933 switch (oqctl->qc_cmd) {
2934 /* master quotactl */
2937 case LUSTRE_Q_SETDEFAULT:
2938 case LUSTRE_Q_SETQUOTAPOOL:
2939 case LUSTRE_Q_SETINFOPOOL:
2940 if (!nodemap_can_setquota(nodemap))
2941 GOTO(out_nodemap, rc = -EPERM);
2945 case LUSTRE_Q_GETDEFAULT:
2946 case LUSTRE_Q_GETQUOTAPOOL:
2947 case LUSTRE_Q_GETINFOPOOL:
2949 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2950 /* slave quotactl */
2957 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
2958 mdt_obd_name(mdt), oqctl->qc_cmd, rc);
2959 GOTO(out_nodemap, rc);
2963 switch (oqctl->qc_type) {
2965 id = nodemap_map_id(nodemap, NODEMAP_UID,
2966 NODEMAP_CLIENT_TO_FS, id);
2969 id = nodemap_map_id(nodemap, NODEMAP_GID,
2970 NODEMAP_CLIENT_TO_FS, id);
2973 /* todo: check/map project id */
2977 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2979 repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
2981 GOTO(out_nodemap, rc = err_serious(-EFAULT));
2983 if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
2984 barrier_exit(tsi->tsi_tgt->lut_bottom);
2986 if (oqctl->qc_id != id)
2987 swap(oqctl->qc_id, id);
2989 if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
2990 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
2991 RETURN(-EINPROGRESS);
2994 switch (oqctl->qc_cmd) {
3000 case LUSTRE_Q_SETDEFAULT:
3001 case LUSTRE_Q_GETDEFAULT:
3002 case LUSTRE_Q_SETQUOTAPOOL:
3003 case LUSTRE_Q_GETQUOTAPOOL:
3004 case LUSTRE_Q_SETINFOPOOL:
3005 case LUSTRE_Q_GETINFOPOOL:
3006 /* forward quotactl request to QMT */
3007 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
3012 /* slave quotactl */
3013 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
3018 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
3019 GOTO(out_nodemap, rc = -EFAULT);
3022 if (oqctl->qc_id != id)
3023 swap(oqctl->qc_id, id);
3025 QCTL_COPY(repoqc, oqctl);
3029 nodemap_putref(nodemap);
3034 /** clone llog ctxt from child (mdd)
3035 * This allows remote llog (replicator) access.
3036 * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
3037 * context was originally set up, or we can handle them directly.
3038 * I choose the latter, but that means I need any llog
3039 * contexts set up by child to be accessable by the mdt. So we clone the
3040 * context into our context list here.
3042 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3045 struct md_device *next = mdt->mdt_child;
3046 struct llog_ctxt *ctxt;
3049 if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
3052 rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3053 if (rc || ctxt == NULL) {
3057 rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3059 CERROR("Can't set mdt ctxt %d\n", rc);
3064 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3065 struct mdt_device *mdt, int idx)
3067 struct llog_ctxt *ctxt;
3069 ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3072 /* Put once for the get we just did, and once for the clone */
3073 llog_ctxt_put(ctxt);
3074 llog_ctxt_put(ctxt);
3079 * sec context handlers
3081 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
3083 CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3089 * quota request handlers
3091 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3093 struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
3094 struct lu_device *qmt = mdt->mdt_qmt_dev;
3099 RETURN(err_serious(-EOPNOTSUPP));
3101 rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
3105 struct mdt_object *mdt_object_new(const struct lu_env *env,
3106 struct mdt_device *d,
3107 const struct lu_fid *f)
3109 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3110 struct lu_object *o;
3111 struct mdt_object *m;
3114 CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3115 o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3116 if (unlikely(IS_ERR(o)))
3117 m = (struct mdt_object *)o;
3123 struct mdt_object *mdt_object_find(const struct lu_env *env,
3124 struct mdt_device *d,
3125 const struct lu_fid *f)
3127 struct lu_object *o;
3128 struct mdt_object *m;
3131 CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3132 o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3133 if (unlikely(IS_ERR(o)))
3134 m = (struct mdt_object *)o;
3142 * Asyncronous commit for mdt device.
3144 * Pass asynchonous commit call down the MDS stack.
3146 * \param env environment
3147 * \param mdt the mdt device
3149 static void mdt_device_commit_async(const struct lu_env *env,
3150 struct mdt_device *mdt)
3152 struct dt_device *dt = mdt->mdt_bottom;
3156 rc = dt->dd_ops->dt_commit_async(env, dt);
3157 if (unlikely(rc != 0))
3158 CWARN("%s: async commit start failed: rc = %d\n",
3159 mdt_obd_name(mdt), rc);
3160 atomic_inc(&mdt->mdt_async_commit_count);
3165 * Mark the lock as "synchronous".