1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
/* Forward declaration; full definition appears near the bottom of this
 * section together with the mdd_lu_obj_ops vector that references it. */
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch the object's body data via the underlying layer.
 * Asserts that the object exists on disk before dispatching. */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
/* Read the lu_attr attributes of \a obj from the underlying device.
 * Asserts existence first; the capability \a capa is passed through to
 * mdo_attr_get() for permission/capa checking at the lower layer. */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * mod_flags representation (APPEND_OBJ / IMMUTE_OBJ).  Both target
 * bits are cleared first so stale state cannot survive. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to \a env.
 * The key must already be populated, hence the LASSERT. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
/* Wrap caller-provided memory (\a area, \a len) in the thread-local
 * mti_buf descriptor; no allocation is performed here. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
/* Free the memory held by \a buf, choosing OBD_VFREE or OBD_FREE to
 * match how it was allocated (vmalloc vs. kmalloc); no-op for NULL. */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
130 OBD_VFREE(buf->lb_buf, buf->lb_len);
132 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in the
 * thread-local mti_buf.  The const is cast away because lu_buf has a
 * single non-const lb_buf field; callers must not write through it. */
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
/* Allocation threshold: buffers up to 4 pages use kmalloc, larger
 * ones fall back to vmalloc (see mdd_buf_alloc/mdd_buf_grow). */
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the thread-local mti_big_buf grown to at least \a len bytes.
 * An existing smaller buffer is freed first (with the allocator that
 * matches it), then a new one is obtained: kmalloc for sizes up to
 * BUF_VMALLOC_SIZE, vmalloc as fallback/for larger sizes.
 * Contents are NOT preserved across reallocation (cf. mdd_buf_grow). */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
151 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
153 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
155 OBD_VFREE(buf->lb_buf, buf->lb_len);
157 OBD_FREE(buf->lb_buf, buf->lb_len);
160 if (buf->lb_buf == NULL) {
162 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163 OBD_ALLOC(buf->lb_buf, buf->lb_len);
166 if (buf->lb_buf == NULL) {
167 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
170 if (buf->lb_buf == NULL)
176 /** Increase the size of the \a mti_big_buf.
177 * preserves old data in buffer
178 * old buffer remains unchanged on error
179 * \retval 0 or -ENOMEM
/* Unlike mdd_buf_alloc(), this copies the old contents into the new
 * allocation before releasing the old one.  Allocator choice follows
 * the BUF_VMALLOC_SIZE threshold; the lb_vmalloc flag records which
 * allocator owns the old buffer so the matching free is used. */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
183 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
186 LASSERT(len >= oldbuf->lb_len);
187 if (len > BUF_VMALLOC_SIZE) {
188 OBD_VMALLOC(buf.lb_buf, len);
191 OBD_ALLOC(buf.lb_buf, len);
194 if (buf.lb_buf == NULL)
198 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
200 if (oldbuf->lb_vmalloc)
201 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
203 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
/* Publish the new buffer descriptor in one shot. */
205 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, (re)allocated to hold the
 * current maximum cookie size reported by mdd_lov_cookiesize().  The
 * cached buffer is freed and replaced if it became too small.  Returns
 * a zeroed buffer, or NULL on allocation failure. */
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211 struct mdd_device *mdd)
213 struct mdd_thread_info *mti = mdd_env_info(env);
216 max_cookie_size = mdd_lov_cookiesize(env, mdd);
217 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218 if (mti->mti_max_cookie)
219 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220 mti->mti_max_cookie = NULL;
221 mti->mti_max_cookie_size = 0;
223 if (unlikely(mti->mti_max_cookie == NULL)) {
224 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225 if (likely(mti->mti_max_cookie != NULL))
226 mti->mti_max_cookie_size = max_cookie_size;
228 if (likely(mti->mti_max_cookie != NULL))
229 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230 return mti->mti_max_cookie;
/* Same caching pattern as mdd_max_cookie_get(), but for the striping
 * (LOV EA) buffer sized by mdd_lov_mdsize().  NOTE(review): unlike the
 * cookie variant, the returned buffer is not memset to zero here —
 * confirm callers fully overwrite it. */
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234 struct mdd_device *mdd)
236 struct mdd_thread_info *mti = mdd_env_info(env);
239 max_lmm_size = mdd_lov_mdsize(env, mdd);
240 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241 if (mti->mti_max_lmm)
242 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243 mti->mti_max_lmm = NULL;
244 mti->mti_max_lmm_size = 0;
246 if (unlikely(mti->mti_max_lmm == NULL)) {
247 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248 if (likely(mti->mti_max_lmm != NULL))
249 mti->mti_max_lmm_size = max_lmm_size;
251 return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for the mdd layer: allocate an
 * mdd_object, initialise its embedded lu_object and wire up the md
 * operation vectors.  Returns NULL (presumably; allocation-failure
 * path is elided here) when OBD_ALLOC_PTR fails. */
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255 const struct lu_object_header *hdr,
258 struct mdd_object *mdd_obj;
260 OBD_ALLOC_PTR(mdd_obj);
261 if (mdd_obj != NULL) {
264 o = mdd2lu_obj(mdd_obj);
265 lu_object_init(o, NULL, d);
266 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268 mdd_obj->mod_count = 0;
269 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the corresponding
 * object in the next (child/dt) layer and stack it below this one.
 * Also resets the changelog timestamp and the pdir lock. */
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277 const struct lu_object_conf *unused)
279 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280 struct mdd_object *mdd_obj = lu2mdd_obj(o);
281 struct lu_object *below;
282 struct lu_device *under;
285 mdd_obj->mod_cltime = 0;
286 under = &d->mdd_child->dd_lu_dev;
287 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288 mdd_pdlock_init(mdd_obj);
292 lu_object_add(o, below);
/* loo_object_start: for objects that already exist on disk, load the
 * flag bits (immutable/append/...) into the in-memory object. */
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
299 if (lu_object_exists(o))
300 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd_object allocated in
 * mdd_object_alloc() (teardown lines elided in this view). */
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
307 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: dump open count, valid mask, changelog time and
 * flags through the supplied lu_printer. */
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314 lu_printer_t p, const struct lu_object *o)
316 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318 "valid=%x, cltime="LPU64", flags=%lx)",
319 mdd, mdd->mod_count, mdd->mod_valid,
320 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; definition of the
 * forward declaration at the top of this file. */
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
/* Look up (or instantiate) the mdd object for FID \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); may return an ERR_PTR
 * value — callers check with IS_ERR(). */
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331 struct mdd_device *d,
332 const struct lu_fid *f)
334 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve pathname \a path (relative to the filesystem root FID) to a
 * FID by walking it component-by-component with mdd_lookup().
 * Used by mdd_path_current() to re-verify a constructed path. */
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338 const char *path, struct lu_fid *fid)
341 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342 struct mdd_object *obj;
343 struct lu_name *lname = &mdd_env_info(env)->mti_name;
348 /* temp buffer for path element */
349 buf = mdd_buf_alloc(env, PATH_MAX);
350 if (buf->lb_buf == NULL)
353 lname->ln_name = name = buf->lb_buf;
354 lname->ln_namelen = 0;
355 *f = mdd->mdd_root_fid;
/* Scan forward to the end of the current path component. */
362 while (*path != '/' && *path != '\0') {
370 /* find obj corresponding to fid */
371 obj = mdd_object_find(env, mdd, f);
373 GOTO(out, rc = -EREMOTE);
/* NOTE(review): PTR_ERR() already yields a negative errno; negating
 * it here produces a positive rc — confirm the intended sign. */
375 GOTO(out, rc = -PTR_ERR(obj));
376 /* get child fid from parent and name */
377 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378 mdd_object_put(env, obj);
383 lname->ln_namelen = 0;
392 /** The maximum depth that fid2path() will search.
393 * This is limited only because we want to store the fids for
394 * historical path lookup purposes.
396 #define MAX_PATH_DEPTH 100
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
/* Build the current full pathname of pli->pli_mdd_obj by walking the
 * link EA (parent fid + name) chain upward until the root FID is hit,
 * packing names right-to-left into pli->pli_path.  Afterwards the path
 * is re-resolved with mdd_path2fid() and compared against the starting
 * FID; a mismatch (concurrent rename) returns -EAGAIN so the caller
 * can retry.  Parent FIDs are recorded in pli_fids[] for historical
 * lookup. */
411 static int mdd_path_current(const struct lu_env *env,
412 struct path_lookup_info *pli)
414 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415 struct mdd_object *mdd_obj;
416 struct lu_buf *buf = NULL;
417 struct link_ea_header *leh;
418 struct link_ea_entry *lee;
419 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start writing at the end of the output buffer; names are prepended. */
426 ptr = pli->pli_path + pli->pli_pathlen - 1;
429 pli->pli_fidcount = 0;
430 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
432 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433 mdd_obj = mdd_object_find(env, mdd,
434 &pli->pli_fids[pli->pli_fidcount]);
436 GOTO(out, rc = -EREMOTE);
/* NOTE(review): same sign concern as mdd_path2fid() — PTR_ERR()
 * is already negative; -PTR_ERR() yields a positive rc. */
438 GOTO(out, rc = -PTR_ERR(mdd_obj));
439 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
441 mdd_object_put(env, mdd_obj);
445 /* Do I need to error out here? */
450 /* Get parent fid and object name */
451 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452 buf = mdd_links_get(env, mdd_obj);
453 mdd_read_unlock(env, mdd_obj);
454 mdd_object_put(env, mdd_obj);
456 GOTO(out, rc = PTR_ERR(buf));
459 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
462 /* If set, use link #linkno for path lookup, otherwise use
463 link #0. Only do this for the final path element. */
464 if ((pli->pli_fidcount == 0) &&
465 (pli->pli_linkno < leh->leh_reccount)) {
467 for (count = 0; count < pli->pli_linkno; count++) {
468 lee = (struct link_ea_entry *)
469 ((char *)lee + reclen);
470 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
472 if (pli->pli_linkno < leh->leh_reccount - 1)
473 /* indicate to user there are more links */
477 /* Pack the name in the end of the buffer */
478 ptr -= tmpname->ln_namelen;
479 if (ptr - 1 <= pli->pli_path)
480 GOTO(out, rc = -EOVERFLOW);
/* strncpy without explicit NUL is intentional: the component is
 * packed into the middle of the buffer; separators come from the
 * surrounding bytes. */
481 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
484 /* Store the parent fid for historic lookup */
485 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486 GOTO(out, rc = -EOVERFLOW);
487 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
490 /* Verify that our path hasn't changed since we started the lookup.
491 Record the current index, and verify the path resolves to the
492 same fid. If it does, then the path is correct as of this index. */
493 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494 pli->pli_currec = mdd->mdd_cl.mc_index;
495 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
498 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499 GOTO (out, rc = -EAGAIN);
501 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504 PFID(&pli->pli_fid));
505 GOTO(out, rc = -EAGAIN);
507 ptr++; /* skip leading / */
/* Shift the finished path to the start of the caller's buffer. */
508 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
512 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path reconstruction; body is a stub in
 * this view — see the discussion in mdd_path() about its limits. */
519 static int mdd_path_historic(const struct lu_env *env,
520 struct path_lookup_info *pli)
525 /* Returns the full path to this fid, as of changelog record recno. */
/* Entry point for fid2path: allocates a path_lookup_info, retries
 * mdd_path_current() while it reports -EAGAIN (object being renamed),
 * then optionally applies historical adjustment.  Outputs the record
 * number actually used (*recno) and the next link index (*linkno). */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527 char *path, int pathlen, __u64 *recno, int *linkno)
529 struct path_lookup_info *pli;
/* The root object has the empty path; handled up front. */
537 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
546 pli->pli_mdd_obj = md2mdd_obj(obj);
547 pli->pli_recno = *recno;
548 pli->pli_path = path;
549 pli->pli_pathlen = pathlen;
550 pli->pli_linkno = *linkno;
552 /* Retry multiple times in case file is being moved */
553 while (tries-- && rc == -EAGAIN)
554 rc = mdd_path_current(env, pli);
556 /* For historical path lookup, the current links may not have existed
557 * at "recno" time. We must switch over to earlier links/parents
558 * by using the changelog records. If the earlier parent doesn't
559 * exist, we must search back through the changelog to reconstruct
560 * its parents, then check if it exists, etc.
561 * We may ignore this problem for the initial implementation and
562 * state that an "original" hardlink must still exist for us to find
563 * historic path name. */
564 if (pli->pli_recno != -1) {
565 rc = mdd_path_historic(env, pli);
567 *recno = pli->pli_currec;
568 /* Return next link index to caller */
569 *linkno = pli->pli_linkno;
/* Load the object's attribute flags from disk and translate them into
 * mod_flags; also marks single-link directories with MNLINK_OBJ. */
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
579 struct lu_attr *la = &mdd_env_info(env)->mti_la;
583 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
585 mdd_flags_xlate(obj, la->la_flags);
586 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587 obj->mod_flags |= MNLINK_OBJ;
592 /* get only inode attributes */
/* Fill ma->ma_attr from disk unless MA_INODE is already valid; sets
 * MA_INODE in ma_valid on success. */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
599 if (ma->ma_valid & MA_INODE)
602 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603 mdd_object_capa(env, mdd_obj));
605 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV_MAGIC_V1 striping descriptor for \a mdd_obj
 * from the device-wide lov_desc defaults.  Returns the size written
 * (sizeof(struct lov_user_md)). */
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
611 struct lov_desc *ldesc;
612 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
613 struct lov_user_md *lum = (struct lov_user_md*)lmm;
619 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
620 LASSERT(ldesc != NULL);
622 lum->lmm_magic = LOV_MAGIC_V1;
623 lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
624 lum->lmm_pattern = ldesc->ld_pattern;
625 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
626 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
627 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
629 RETURN(sizeof(*lum));
632 /* get lov EA only */
/* Read the LOV EA into ma->ma_lmm (skipped if MA_LOV already valid);
 * if MA_LOV_DEF is requested and no EA exists, fall back to the
 * device defaults via mdd_get_default_md().  Caller holds the lock —
 * see mdd_lmm_get_locked() for the locking wrapper. */
633 static int __mdd_lmm_get(const struct lu_env *env,
634 struct mdd_object *mdd_obj, struct md_attr *ma)
639 if (ma->ma_valid & MA_LOV)
642 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
644 if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
645 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
647 ma->ma_lmm_size = rc;
648 ma->ma_valid |= MA_LOV;
/* Locked wrapper: take the object read lock around __mdd_lmm_get(). */
654 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
660 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
661 rc = __mdd_lmm_get(env, mdd_obj, ma);
662 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv, unless MA_LMV
 * is already valid; sets MA_LMV on success.  Caller holds the lock. */
667 static int __mdd_lmv_get(const struct lu_env *env,
668 struct mdd_object *mdd_obj, struct md_attr *ma)
673 if (ma->ma_valid & MA_LMV)
676 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
679 ma->ma_valid |= MA_LMV;
/* Read the LMA EA (lustre_mdt_attrs) and extract the HSM flags and/or
 * Size-on-MDS data requested in ma->ma_need, setting the matching
 * MA_HSM / MA_SOM bits in ma->ma_valid.  Skips the disk read when the
 * needed bits are already valid. */
685 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
688 struct mdd_thread_info *info = mdd_env_info(env);
689 struct lustre_mdt_attrs *lma =
690 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
695 /* If all needed data are already valid, nothing to do */
696 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
697 (ma->ma_need & (MA_HSM | MA_SOM)))
700 /* Read LMA from disk EA */
701 lma_size = sizeof(info->mti_xattr_buf);
702 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
706 /* Useless to check LMA incompatibility because this is already done in
707 * osd_ea_fid_get(), and this will fail long before this code is
709 * So, if we are here, LMA is compatible.
712 lustre_lma_swab(lma);
714 /* Swab and copy LMA */
715 if (ma->ma_need & MA_HSM) {
716 if (lma->lma_compat & LMAC_HSM)
717 ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
719 ma->ma_hsm_flags = 0;
720 ma->ma_valid |= MA_HSM;
/* SOM data copied only when the on-disk LMA actually carries it. */
724 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
725 LASSERT(ma->ma_som != NULL);
726 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
727 ma->ma_som->msd_size = lma->lma_som_size;
728 ma->ma_som->msd_blocks = lma->lma_som_blocks;
729 ma->ma_som->msd_mountid = lma->lma_som_mountid;
730 ma->ma_valid |= MA_SOM;
/* Dispatch attribute reads according to ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), LMV EA (dirs), HSM/SOM (regular
 * files) and default ACL (dirs).  Stops at the first failure. */
736 static int mdd_attr_get_internal(const struct lu_env *env,
737 struct mdd_object *mdd_obj,
743 if (ma->ma_need & MA_INODE)
744 rc = mdd_iattr_get(env, mdd_obj, ma);
746 if (rc == 0 && ma->ma_need & MA_LOV) {
747 if (S_ISREG(mdd_object_type(mdd_obj)) ||
748 S_ISDIR(mdd_object_type(mdd_obj)))
749 rc = __mdd_lmm_get(env, mdd_obj, ma);
751 if (rc == 0 && ma->ma_need & MA_LMV) {
752 if (S_ISDIR(mdd_object_type(mdd_obj)))
753 rc = __mdd_lmv_get(env, mdd_obj, ma);
755 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
756 if (S_ISREG(mdd_object_type(mdd_obj)))
757 rc = __mdd_lma_get(env, mdd_obj, ma);
759 #ifdef CONFIG_FS_POSIX_ACL
760 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
761 if (S_ISDIR(mdd_object_type(mdd_obj)))
762 rc = mdd_def_acl_get(env, mdd_obj, ma);
765 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* As mdd_attr_get_internal(), but takes the object read lock when any
 * EA-backed attribute (LOV/LMV/ACL/HSM/SOM) is requested; plain inode
 * attrs need no lock. */
770 int mdd_attr_get_internal_locked(const struct lu_env *env,
771 struct mdd_object *mdd_obj, struct md_attr *ma)
774 int needlock = ma->ma_need &
775 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
778 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
779 rc = mdd_attr_get_internal(env, mdd_obj, ma);
781 mdd_read_unlock(env, mdd_obj);
786 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
788 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
791 struct mdd_object *mdd_obj = md2mdd_obj(obj);
795 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
800 * No permission check is needed.
/* moo_xattr_get: read the named xattr into \a buf under the object
 * read lock. */
802 static int mdd_xattr_get(const struct lu_env *env,
803 struct md_object *obj, struct lu_buf *buf,
806 struct mdd_object *mdd_obj = md2mdd_obj(obj);
811 LASSERT(mdd_object_exists(mdd_obj));
813 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
814 rc = mdo_xattr_get(env, mdd_obj, buf, name,
815 mdd_object_capa(env, mdd_obj));
816 mdd_read_unlock(env, mdd_obj);
822 * Permission check is done when open,
823 * no need check again.
/* moo_readlink: read the symlink target by doing a body read on the
 * underlying dt object, under the object read lock. */
825 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
828 struct mdd_object *mdd_obj = md2mdd_obj(obj);
829 struct dt_object *next;
834 LASSERT(mdd_object_exists(mdd_obj));
836 next = mdd_object_child(mdd_obj);
837 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
838 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
839 mdd_object_capa(env, mdd_obj));
840 mdd_read_unlock(env, mdd_obj);
845 * No permission check is needed.
/* moo_xattr_list: enumerate xattr names under the object read lock. */
847 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
850 struct mdd_object *mdd_obj = md2mdd_obj(obj);
855 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
856 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
857 mdd_read_unlock(env, mdd_obj);
/* Create object \a c on disk within transaction \a handle, unless it
 * already exists.  The dt_object_format is derived from spec->sp_feat
 * (index features force DFT_INDEX) or from the mode bits; the
 * allocation hint is initialised by the underlying device using the
 * parent \a p (may be NULL). */
862 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
863 struct mdd_object *c, struct md_attr *ma,
864 struct thandle *handle,
865 const struct md_op_spec *spec)
867 struct lu_attr *attr = &ma->ma_attr;
868 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
869 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
870 const struct dt_index_features *feat = spec->sp_feat;
874 if (!mdd_object_exists(c)) {
875 struct dt_object *next = mdd_object_child(c);
878 if (feat != &dt_directory_features && feat != NULL)
879 dof->dof_type = DFT_INDEX;
881 dof->dof_type = dt_mode_to_dft(attr->la_mode);
883 dof->u.dof_idx.di_feat = feat;
885 /* @hint will be initialized by underlying device. */
886 next->do_ops->do_ah_init(env, hint,
887 p ? mdd_object_child(p) : NULL,
888 attr->la_mode & S_IFMT);
890 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
891 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
899 * Make sure the ctime is increased only.
/* Compare the incoming ctime against the on-disk one and strip time
 * bits from attr->la_valid that would move ctime backwards (or re-set
 * an identical lone ctime). */
901 static inline int mdd_attr_check(const struct lu_env *env,
902 struct mdd_object *obj,
903 struct lu_attr *attr)
905 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
909 if (attr->la_valid & LA_CTIME) {
910 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
914 if (attr->la_ctime < tmp_la->la_ctime)
915 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
916 else if (attr->la_valid == LA_CTIME &&
917 attr->la_ctime == tmp_la->la_ctime)
918 attr->la_valid &= ~LA_CTIME;
/* Write attributes to the underlying layer inside \a handle; when the
 * mode changes and \a needacl is set, propagate the mode into the
 * POSIX ACL as well. */
923 int mdd_attr_set_internal(const struct lu_env *env,
924 struct mdd_object *obj,
925 struct lu_attr *attr,
926 struct thandle *handle,
932 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
933 #ifdef CONFIG_FS_POSIX_ACL
934 if (!rc && (attr->la_valid & LA_MODE) && needacl)
935 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by
 * mdd_attr_set_internal(). */
940 int mdd_attr_check_set_internal(const struct lu_env *env,
941 struct mdd_object *obj,
942 struct lu_attr *attr,
943 struct thandle *handle,
949 rc = mdd_attr_check(env, obj, attr);
954 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper around mdd_attr_set_internal(); the ACL pass
 * is only relevant when the mode actually changes. */
958 static int mdd_attr_set_internal_locked(const struct lu_env *env,
959 struct mdd_object *obj,
960 struct lu_attr *attr,
961 struct thandle *handle,
967 needacl = needacl && (attr->la_valid & LA_MODE);
969 mdd_write_lock(env, obj, MOR_TGT_CHILD);
970 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
972 mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(); same
 * needacl refinement as the unchecked variant above. */
976 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
977 struct mdd_object *obj,
978 struct lu_attr *attr,
979 struct thandle *handle,
985 needacl = needacl && (attr->la_valid & LA_MODE);
987 mdd_write_lock(env, obj, MOR_TGT_CHILD);
988 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
990 mdd_write_unlock(env, obj);
/* Set or delete an xattr inside \a handle: non-empty buffer means set,
 * (NULL, 0) buffer means delete.  NOTE(review): the \a fl flag is
 * accepted but 0 is passed to mdo_xattr_set() — confirm whether fl is
 * intentionally ignored here. */
994 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
995 const struct lu_buf *buf, const char *name,
996 int fl, struct thandle *handle)
998 struct lustre_capa *capa = mdd_object_capa(env, obj);
1002 if (buf->lb_buf && buf->lb_len > 0)
1003 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1004 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1005 rc = mdo_xattr_del(env, obj, name, handle, capa);
1011 * This gives the same functionality as the code between
1012 * sys_chmod and inode_setattr
1013 * chown_common and inode_setattr
1014 * utimes and inode_setattr
1015 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalise an incoming setattr request against the
 * caller's credentials (md_ucred) and the object's current on-disk
 * attributes: rejects forbidden changes, clears redundant time
 * updates, enforces chmod/chown/chgrp permission rules and strips
 * setuid/setgid bits where POSIX requires. */
1017 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1018 struct lu_attr *la, const struct md_attr *ma)
1020 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1021 struct md_ucred *uc;
1028 /* Do not permit change file type */
1029 if (la->la_valid & LA_TYPE)
1032 /* They should not be processed by setattr */
1033 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1036 /* export destroy does not have ->le_ses, but we may want
1037 * to drop LUSTRE_SOM_FL. */
1043 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Lone-ctime update: used by rename to bump the source's ctime. */
1047 if (la->la_valid == LA_CTIME) {
1048 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1049 /* This is only for set ctime when rename's source is
1051 rc = mdd_may_delete(env, NULL, obj,
1052 (struct md_attr *)ma, 1, 0);
1053 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1054 la->la_valid &= ~LA_CTIME;
1058 if (la->la_valid == LA_ATIME) {
1059 /* This is atime only set for read atime update on close. */
1060 if (la->la_atime > tmp_la->la_atime &&
1061 la->la_atime <= (tmp_la->la_atime +
1062 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1063 la->la_valid &= ~LA_ATIME;
1067 /* Check if flags change. */
1068 if (la->la_valid & LA_FLAGS) {
1069 unsigned int oldflags = 0;
1070 unsigned int newflags = la->la_flags &
1071 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1073 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1074 !mdd_capable(uc, CFS_CAP_FOWNER))
1077 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1078 * only be changed by the relevant capability. */
1079 if (mdd_is_immutable(obj))
1080 oldflags |= LUSTRE_IMMUTABLE_FL;
1081 if (mdd_is_append(obj))
1082 oldflags |= LUSTRE_APPEND_FL;
1083 if ((oldflags ^ newflags) &&
1084 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningful only on directories. */
1087 if (!S_ISDIR(tmp_la->la_mode))
1088 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept only flag changes unless the
 * caller explicitly bypasses permission checks. */
1091 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1092 (la->la_valid & ~LA_FLAGS) &&
1093 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1096 /* Check for setting the obj time. */
1097 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1098 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1099 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1100 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1101 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1109 /* Make sure a caller can chmod. */
1110 if (la->la_valid & LA_MODE) {
1111 /* Bypass la_vaild == LA_MODE,
1112 * this is for changing file with SUID or SGID. */
1113 if ((la->la_valid & ~LA_MODE) &&
1114 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1115 (uc->mu_fsuid != tmp_la->la_uid) &&
1116 !mdd_capable(uc, CFS_CAP_FOWNER))
1119 if (la->la_mode == (cfs_umode_t) -1)
1120 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits; only permission bits may change. */
1122 la->la_mode = (la->la_mode & S_IALLUGO) |
1123 (tmp_la->la_mode & ~S_IALLUGO);
1125 /* Also check the setgid bit! */
1126 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1127 la->la_gid : tmp_la->la_gid) &&
1128 !mdd_capable(uc, CFS_CAP_FSETID))
1129 la->la_mode &= ~S_ISGID;
1131 la->la_mode = tmp_la->la_mode;
1134 /* Make sure a caller can chown. */
1135 if (la->la_valid & LA_UID) {
1136 if (la->la_uid == (uid_t) -1)
1137 la->la_uid = tmp_la->la_uid;
1138 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1139 (la->la_uid != tmp_la->la_uid)) &&
1140 !mdd_capable(uc, CFS_CAP_CHOWN))
1143 /* If the user or group of a non-directory has been
1144 * changed by a non-root user, remove the setuid bit.
1145 * 19981026 David C Niemi <niemi@tux.org>
1147 * Changed this to apply to all users, including root,
1148 * to avoid some races. This is the behavior we had in
1149 * 2.0. The check for non-root was definitely wrong
1150 * for 2.2 anyway, as it should have been using
1151 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1152 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1153 !S_ISDIR(tmp_la->la_mode)) {
1154 la->la_mode &= ~S_ISUID;
1155 la->la_valid |= LA_MODE;
1159 /* Make sure caller can chgrp. */
1160 if (la->la_valid & LA_GID) {
1161 if (la->la_gid == (gid_t) -1)
1162 la->la_gid = tmp_la->la_gid;
1163 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1164 ((la->la_gid != tmp_la->la_gid) &&
1165 !lustre_in_group_p(uc, la->la_gid))) &&
1166 !mdd_capable(uc, CFS_CAP_CHOWN))
1169 /* Likewise, if the user or group of a non-directory
1170 * has been changed by a non-root user, remove the
1171 * setgid bit UNLESS there is no group execute bit
1172 * (this would be a file marked for mandatory
1173 * locking). 19981026 David C Niemi <niemi@tux.org>
1175 * Removed the fsuid check (see the comment above) --
1177 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1178 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1179 la->la_mode &= ~S_ISGID;
1180 la->la_valid |= LA_MODE;
1184 /* For both Size-on-MDS case and truncate case,
1185 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1186 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1187 * For SOM case, it is true, the MAY_WRITE perm has been checked
1188 * when open, no need check again. For truncate case, it is false,
1189 * the MAY_WRITE perm should be checked here. */
1190 if (ma->ma_attr_flags & MDS_SOM) {
1191 /* For the "Size-on-MDS" setattr update, merge coming
1192 * attributes with the set in the inode. BUG 10641 */
1193 if ((la->la_valid & LA_ATIME) &&
1194 (la->la_atime <= tmp_la->la_atime))
1195 la->la_valid &= ~LA_ATIME;
1197 /* OST attributes do not have a priority over MDS attributes,
1198 * so drop times if ctime is equal. */
1199 if ((la->la_valid & LA_CTIME) &&
1200 (la->la_ctime <= tmp_la->la_ctime))
1201 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1203 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1204 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1205 (uc->mu_fsuid == tmp_la->la_uid)) &&
1206 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1207 rc = mdd_permission_internal_locked(env, obj,
1214 if (la->la_valid & LA_CTIME) {
1215 /* The pure setattr, it has the priority over what is
1216 * already set, do not drop it if ctime is equal. */
1217 if (la->la_ctime < tmp_la->la_ctime)
1218 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1226 /** Store a data change changelog record
1227 * If this fails, we must fail the whole transaction; we don't
1228 * want the change to commit without the log entry.
1229 * \param mdd_obj - mdd_object of change
1230 * \param handle - transacion handle
/* Skips silently when changelogs are off, the record type is masked
 * out, or a time-class record (CL_MTIME..CL_ATIME) was already logged
 * for this object since the changelog was started (mod_cltime). */
1232 static int mdd_changelog_data_store(const struct lu_env *env,
1233 struct mdd_device *mdd,
1234 enum changelog_rec_type type,
1236 struct mdd_object *mdd_obj,
1237 struct thandle *handle)
1239 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1240 struct llog_changelog_rec *rec;
1246 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1248 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1251 LASSERT(handle != NULL);
1252 LASSERT(mdd_obj != NULL);
1254 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1255 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1256 /* Don't need multiple updates in this log */
1257 /* Don't check under lock - no big deal if we get an extra
1262 reclen = llog_data_len(sizeof(*rec));
1263 buf = mdd_buf_alloc(env, reclen);
1264 if (buf->lb_buf == NULL)
1266 rec = (struct llog_changelog_rec *)buf->lb_buf;
1268 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1269 rec->cr.cr_type = (__u32)type;
1270 rec->cr.cr_tfid = *tfid;
1271 rec->cr.cr_namelen = 0;
/* Remember when we last logged this object (see the skip test above). */
1272 mdd_obj->mod_cltime = cfs_time_current_64();
1274 rc = mdd_changelog_llog_write(mdd, rec, handle);
1276 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1277 rc, type, PFID(tfid));
1285 * Should be called with write lock held.
1287 * \see mdd_lma_set_locked().
/* Merge HSM and/or SOM state from \a ma into the on-disk LMA EA:
 * read-modify-write of the lustre_mdt_attrs structure, then store it
 * via __mdd_xattr_set() inside \a handle. */
1289 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1290 const struct md_attr *ma, struct thandle *handle)
1292 struct mdd_thread_info *info = mdd_env_info(env);
1294 struct lustre_mdt_attrs *lma =
1295 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1296 int lmasize = sizeof(struct lustre_mdt_attrs);
1301 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): `(!ma->ma_valid) & (MA_HSM | MA_SOM)` evaluates
 * `!ma->ma_valid` (0 or 1) first and then ANDs it with the mask —
 * almost certainly meant `(~ma->ma_valid) & (MA_HSM | MA_SOM)` or
 * `!(ma->ma_valid & (MA_HSM | MA_SOM))`.  As written the read-before-
 * modify path is taken only if bit 0 is set in the mask.  Confirm and
 * fix upstream. */
1302 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1303 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1307 lustre_lma_swab(lma);
1309 memset(lma, 0, lmasize);
/* Fold in the HSM flags when the caller provides them. */
1313 if (ma->ma_valid & MA_HSM) {
1314 lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1315 lma->lma_compat |= LMAC_HSM;
/* SOM state: an invalid ioepoch clears LMAC_SOM, otherwise the full
 * size/blocks/epoch/mountid set is copied in. */
1319 if (ma->ma_valid & MA_SOM) {
1320 LASSERT(ma->ma_som != NULL);
1321 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1322 lma->lma_compat &= ~LMAC_SOM;
1324 lma->lma_compat |= LMAC_SOM;
1325 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1326 lma->lma_som_size = ma->ma_som->msd_size;
1327 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1328 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1333 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Swab back to disk byte order and write the EA. */
1335 lustre_lma_swab(lma);
1336 buf = mdd_buf_get(env, lma, lmasize);
1337 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1343 * Save LMA extended attributes with data from \a ma.
1345 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1346 * not, LMA EA will be first read from disk, modified and write back.
/* Locked wrapper around __mdd_lma_set(): takes the object write lock,
 * updates the LMA EA from \a ma, then drops the lock.  Returns the
 * result of __mdd_lma_set(). */
1349 static int mdd_lma_set_locked(const struct lu_env *env,
1350 struct mdd_object *mdd_obj,
1351 const struct md_attr *ma, struct thandle *handle)
1355 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1356 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1357 mdd_write_unlock(env, mdd_obj);
1361 /* Precedence for choosing record type when multiple
1362 * attributes change: setattr > mtime > ctime > atime
1363 * (ctime changes when mtime does, plus chmod/chown.
1364 * atime and ctime are independent.) */
/* Emit a single changelog record for an attribute change on \a obj,
 * choosing the record type from \a valid per the precedence above and
 * filtering it through the device-wide changelog mask. */
1365 static int mdd_attr_set_changelog(const struct lu_env *env,
1366 struct md_object *obj, struct thandle *handle,
1369 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmap of candidate record types: anything other than the
 * three time bits counts as a full setattr */
1372 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1373 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1374 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1375 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
/* Drop record types the administrator has masked out */
1376 bits = bits & mdd->mdd_cl.mc_mask;
1380 /* The record type is the lowest non-masked set bit */
1381 while (bits && ((bits & 1) == 0)) {
1386 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1387 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1388 md2mdd_obj(obj), handle);
1391 /* set attr and LOV EA at once, return updated attr */
/* Main .moo_attr_set entry: applies plain attributes, flags, LOV EA and
 * HSM/SOM LMA updates inside one transaction, adjusts quota for
 * chown/chgrp when enabled, and records a changelog entry. */
1392 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1393 const struct md_attr *ma)
1395 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1396 struct mdd_device *mdd = mdo2mdd(obj);
1397 struct thandle *handle;
1398 struct lov_mds_md *lmm = NULL;
1399 struct llog_cookie *logcookies = NULL;
1400 int rc, lmm_size = 0, cookie_size = 0;
1401 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1402 #ifdef HAVE_QUOTA_SUPPORT
1403 struct obd_device *obd = mdd->mdd_obd_dev;
1404 struct mds_obd *mds = &obd->u.mds;
1405 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1406 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1407 int quota_opc = 0, block_count = 0;
1408 int inode_pending[MAXQUOTAS] = { 0, 0 };
1409 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Size the transaction for a setattr (may include llog credits) */
1413 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1414 MDD_TXN_ATTR_SET_OP);
1415 handle = mdd_trans_start(env, mdd);
1417 RETURN(PTR_ERR(handle));
1418 /*TODO: add lock here*/
1419 /* start a log journal handle if needed */
/* chown/chgrp on a regular file must be journalled against the OST
 * objects, so fetch the LOV EA up front */
1420 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1421 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1422 lmm_size = mdd_lov_mdsize(env, mdd);
1423 lmm = mdd_max_lmm_get(env, mdd);
1425 GOTO(cleanup, rc = -ENOMEM);
1427 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1434 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1435 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1436 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Work on a copy so permission fixups don't modify the caller's attr */
1438 *la_copy = ma->ma_attr;
1439 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1443 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-acquire quota for the new owner (qnids) and
 * remember the old owner (qoids) for release after the transaction */
1444 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1445 struct obd_export *exp = md_quota(env)->mq_exp;
1446 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1448 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1450 quota_opc = FSFILT_OP_SETATTR;
1451 mdd_quota_wrapper(la_copy, qnids);
1452 mdd_quota_wrapper(la_tmp, qoids);
1453 /* get file quota for new owner */
1454 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1455 qnids, inode_pending, 1, NULL, 0,
/* convert 512-byte sectors to 4K blocks -- TODO confirm unit */
1457 block_count = (la_tmp->la_blocks + 7) >> 3;
1460 mdd_data_get(env, mdd_obj, &data);
1461 /* get block quota for new owner */
1462 lquota_chkquota(mds_quota_interface_ref, obd,
1463 exp, qnids, block_pending,
1465 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS is applied on its own and mirrored into mod_flags */
1471 if (la_copy->la_valid & LA_FLAGS) {
1472 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1475 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1476 } else if (la_copy->la_valid) { /* setattr */
1477 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1479 /* journal chown/chgrp in llog, just like unlink */
1480 if (rc == 0 && lmm_size){
1481 cookie_size = mdd_lov_cookiesize(env, mdd);
1482 logcookies = mdd_max_cookie_get(env, mdd);
1483 if (logcookies == NULL)
1484 GOTO(cleanup, rc = -ENOMEM);
/* a non-positive return means no llog record was written, so the
 * cookies must not be used */
1486 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1487 logcookies, cookie_size) <= 0)
/* Replace the LOV EA if the caller supplied one (regular files/dirs) */
1492 if (rc == 0 && ma->ma_valid & MA_LOV) {
1495 mode = mdd_object_type(mdd_obj);
1496 if (S_ISREG(mode) || S_ISDIR(mode)) {
1497 rc = mdd_lsm_sanity_check(env, mdd_obj);
1501 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1502 ma->ma_lmm_size, handle, 1);
/* Update the LMA EA when HSM or Size-on-MDS state changed */
1506 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1509 mode = mdd_object_type(mdd_obj);
1511 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1516 rc = mdd_attr_set_changelog(env, obj, handle,
1517 ma->ma_attr.la_valid);
1518 mdd_trans_stop(env, mdd, rc, handle);
/* Propagate the ownership change to the OST objects asynchronously */
1519 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1520 /*set obd attr, if needed*/
1521 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1524 #ifdef HAVE_QUOTA_SUPPORT
1526 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1528 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1530 /* Trigger dqrel/dqacq for original owner and new owner.
1531 * If failed, the next call for lquota_chkquota will
1533 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set an extended attribute under the object write lock, inside the
 * caller's already-started transaction \a handle. */
1540 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1541 const struct lu_buf *buf, const char *name, int fl,
1542 struct thandle *handle)
1547 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1548 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1549 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: reject immutable/append-only
 * objects, and require the caller to be the owner or hold CAP_FOWNER. */
1554 static int mdd_xattr_sanity_check(const struct lu_env *env,
1555 struct mdd_object *obj)
1557 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1558 struct md_ucred *uc = md_ucred(env);
1562 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1565 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* non-owner without CAP_FOWNER may not modify xattrs */
1569 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1570 !mdd_capable(uc, CFS_CAP_FOWNER))
1577 * The caller should guarantee to update the object ctime
1578 * after xattr_set if needed.
/* .moo_xattr_set entry: permission-check, start a transaction (synced
 * for ACL changes when mdd_sync_permission is set), set the EA, and
 * record a changelog entry for user.* xattrs. */
1580 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1581 const struct lu_buf *buf, const char *name,
1584 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1585 struct mdd_device *mdd = mdo2mdd(obj);
1586 struct thandle *handle;
1590 rc = mdd_xattr_sanity_check(env, mdd_obj);
1594 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1595 /* security-related changes may require sync */
1596 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1597 mdd->mdd_sync_permission == 1)
1598 txn_param_sync(&mdd_env_info(env)->mti_param);
1600 handle = mdd_trans_start(env, mdd);
1602 RETURN(PTR_ERR(handle));
1604 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1606 /* Only record user xattr changes */
1607 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1608 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1610 mdd_trans_stop(env, mdd, rc, handle);
1616 * The caller should guarantee to update the object ctime
1617 * after xattr_set if needed.
/* .moo_xattr_del entry: permission-check, delete the named EA inside a
 * transaction, and record a changelog entry for user.* xattrs. */
1619 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1622 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1623 struct mdd_device *mdd = mdo2mdd(obj);
1624 struct thandle *handle;
1628 rc = mdd_xattr_sanity_check(env, mdd_obj);
1632 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1633 handle = mdd_trans_start(env, mdd);
1635 RETURN(PTR_ERR(handle));
1637 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1638 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1639 mdd_object_capa(env, mdd_obj));
1640 mdd_write_unlock(env, mdd_obj);
1642 /* Only record user xattr changes */
/* FIX: this compared with "!= 0", which logged a changelog record for
 * every xattr EXCEPT user.* ones -- the opposite of the comment above
 * and of the matching "== 0" test in mdd_xattr_set(). */
1643 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1644 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1647 mdd_trans_stop(env, mdd, rc, handle);
1652 /* partial unlink */
/* .moo_ref_del entry: drop one link on \a obj (both links for a dir),
 * update ctime, finish the unlink (orphan handling), and release quota
 * for the owner when the last link and last open go away. */
1653 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1656 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1657 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1658 struct mdd_device *mdd = mdo2mdd(obj);
1659 struct thandle *handle;
1660 #ifdef HAVE_QUOTA_SUPPORT
1661 struct obd_device *obd = mdd->mdd_obd_dev;
1662 struct mds_obd *mds = &obd->u.mds;
1663 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1670 * Check -ENOENT early here because we need to get object type
1671 * to calculate credits before transaction start
1673 if (!mdd_object_exists(mdd_obj))
1676 LASSERT(mdd_object_exists(mdd_obj) > 0);
1678 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1682 handle = mdd_trans_start(env, mdd);
1686 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1688 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1692 __mdd_ref_del(env, mdd_obj, handle, 0);
/* a directory carries an extra link for "." */
1694 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1696 __mdd_ref_del(env, mdd_obj, handle, 1);
1699 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1700 la_copy->la_ctime = ma->ma_attr.la_ctime;
1702 la_copy->la_valid = LA_CTIME;
1703 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1707 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1708 #ifdef HAVE_QUOTA_SUPPORT
/* release quota only when the object is truly gone: zero links and
 * no remaining opens */
1709 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1710 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1711 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1712 mdd_quota_wrapper(&ma->ma_attr, qids);
1719 mdd_write_unlock(env, mdd_obj);
1720 mdd_trans_stop(env, mdd, rc, handle);
1721 #ifdef HAVE_QUOTA_SUPPORT
1723 /* Trigger dqrel on the owner of child. If failed,
1724 * the next call for lquota_chkquota will process it */
1725 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1731 /* partial operation */
/* Validate the file type requested for a partial object create; the
 * switch body (elided here) dispatches on the S_IFMT bits of la_mode. */
1732 static int mdd_oc_sanity_check(const struct lu_env *env,
1733 struct mdd_object *obj,
1739 switch (ma->ma_attr.la_mode & S_IFMT) {
/* .moo_object_create entry: create the object body only (no name entry)
 * for partial operations.  Acquires quota for the child, creates the
 * object in a transaction, optionally sets a slave LMV EA or a remote
 * ACL from \a spec, applies initial attributes, and initializes the
 * object with its parent FID. */
1756 static int mdd_object_create(const struct lu_env *env,
1757 struct md_object *obj,
1758 const struct md_op_spec *spec,
1762 struct mdd_device *mdd = mdo2mdd(obj);
1763 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1764 const struct lu_fid *pfid = spec->u.sp_pfid;
1765 struct thandle *handle;
1766 #ifdef HAVE_QUOTA_SUPPORT
1767 struct obd_device *obd = mdd->mdd_obd_dev;
1768 struct obd_export *exp = md_quota(env)->mq_exp;
1769 struct mds_obd *mds = &obd->u.mds;
1770 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1771 int quota_opc = 0, block_count = 0;
1772 int inode_pending[MAXQUOTAS] = { 0, 0 };
1773 int block_pending[MAXQUOTAS] = { 0, 0 };
1778 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire inode (and, per file type, block) quota for the child
 * before starting the transaction */
1779 if (mds->mds_quota) {
1780 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1781 mdd_quota_wrapper(&ma->ma_attr, qids);
1782 /* get file quota for child */
1783 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1784 qids, inode_pending, 1, NULL, 0,
1786 switch (ma->ma_attr.la_mode & S_IFMT) {
1795 /* get block quota for child */
1797 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1798 qids, block_pending, block_count,
1799 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1803 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1804 handle = mdd_trans_start(env, mdd);
1806 GOTO(out_pending, rc = PTR_ERR(handle));
1808 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1809 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1813 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1817 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1818 /* If creating the slave object, set slave EA here. */
1819 int lmv_size = spec->u.sp_ea.eadatalen;
1820 struct lmv_stripe_md *lmv;
1822 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1823 LASSERT(lmv != NULL && lmv_size > 0);
1825 rc = __mdd_xattr_set(env, mdd_obj,
1826 mdd_buf_get_const(env, lmv, lmv_size),
1827 XATTR_NAME_LMV, 0, handle);
1831 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1834 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL create: initialize the ACL from the EA data supplied by
 * the master MDT; this may adjust la_mode */
1835 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1836 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1838 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1839 buf->lb_len = spec->u.sp_ea.eadatalen;
1840 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1841 rc = __mdd_acl_init(env, mdd_obj, buf,
1842 &ma->ma_attr.la_mode,
1847 ma->ma_attr.la_valid |= LA_MODE;
/* parent FID comes from the EA in the remote-ACL case */
1850 pfid = spec->u.sp_ea.fid;
1853 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* return the freshly created attributes to the caller */
1859 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1860 mdd_write_unlock(env, mdd_obj);
1862 mdd_trans_stop(env, mdd, rc, handle);
1864 #ifdef HAVE_QUOTA_SUPPORT
1866 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1868 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1870 /* Trigger dqacq on the owner of child. If failed,
1871 * the next call for lquota_chkquota will process it. */
1872 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* .moo_ref_add entry (partial link): add one link on \a obj inside a
 * transaction and update its ctime from \a ma. */
1880 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1881 const struct md_attr *ma)
1883 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1884 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1885 struct mdd_device *mdd = mdo2mdd(obj);
1886 struct thandle *handle;
1890 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1891 handle = mdd_trans_start(env, mdd);
1895 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1896 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1898 __mdd_ref_add(env, mdd_obj, handle);
1899 mdd_write_unlock(env, mdd_obj);
1901 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1902 la_copy->la_ctime = ma->ma_attr.la_ctime;
1904 la_copy->la_valid = LA_CTIME;
1905 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with result 0 rather than
 * rc -- looks intentional (errors are still returned to the caller),
 * but confirm against mdd_trans_stop() semantics. */
1908 mdd_trans_stop(env, mdd, 0, handle);
1914 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate MDS open flags into MAY_* access-mode bits.  Owner override
 * (used by NFSD) yields no access requirement for the owner. */
1916 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1920 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1921 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1922 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1923 * owner can write to a file even if it is marked readonly to hide
1924 * its brokenness. (bug 5781) */
1925 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1926 struct md_ucred *uc = md_ucred(env);
1928 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1929 (la->la_uid == uc->mu_fsuid))
/* map open intent to the required permission bits */
1933 if (flags & FMODE_READ)
1935 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1937 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object: rejects dead objects and
 * symlinks, forbids writable opens of directories, runs the permission
 * check unless the object was just created, and enforces append-only
 * and O_NOATIME restrictions. */
1942 static int mdd_open_sanity_check(const struct lu_env *env,
1943 struct mdd_object *obj, int flag)
1945 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1950 if (mdd_is_dead_obj(obj))
1953 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1957 if (S_ISLNK(tmp_la->la_mode))
1960 mode = accmode(env, tmp_la, flag);
1962 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* permission already verified at create time for new files */
1965 if (!(flag & MDS_OPEN_CREATED)) {
1966 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncate is meaningless for FIFOs, sockets and device nodes */
1971 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1972 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1973 flag &= ~MDS_OPEN_TRUNC;
1975 /* For writing append-only file must open it with append mode. */
1976 if (mdd_is_append(obj)) {
1977 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1979 if (flag & MDS_OPEN_TRUNC)
1985 * Now, flag -- O_NOATIME does not be packed by client.
1987 if (flag & O_NOATIME) {
1988 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME is owner/CAP_FOWNER only */
1990 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1991 (uc->mu_valid == UCRED_NEW)) &&
1992 (uc->mu_fsuid != tmp_la->la_uid) &&
1993 !mdd_capable(uc, CFS_CAP_FOWNER))
/* .moo_open entry: sanity-check the open and bump the open count under
 * the object write lock. */
2001 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2004 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2007 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2009 rc = mdd_open_sanity_check(env, mdd_obj, flags);
/* count the open; balanced by mdd_close() -- the rc==0 guard for this
 * increment is elided in this listing */
2011 mdd_obj->mod_count++;
2013 mdd_write_unlock(env, mdd_obj);
2017 /* return md_attr back,
2018 * if it is last unlink then return lov ea + llog cookie*/
/* Prepare destruction data for a dying object: for regular files,
 * fetch the LOV EA into \a ma and write the unlink llog record so the
 * caller can destroy the OST objects. */
2019 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2025 if (S_ISREG(mdd_object_type(obj))) {
2026 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2027 * Caller must be ready for that. */
2029 rc = __mdd_lmm_get(env, obj, ma);
2030 if ((ma->ma_valid & MA_LOV))
2031 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2038 * No permission check is needed.
/* .moo_close entry: drop the open count; when this was the last open of
 * an already-unlinked object, remove it from the orphan index, release
 * quota, and prepare (or perform) destruction of its OSS objects. */
2040 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2043 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2044 struct mdd_device *mdd = mdo2mdd(obj);
2045 struct thandle *handle;
2049 #ifdef HAVE_QUOTA_SUPPORT
2050 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2051 struct mds_obd *mds = &obd->u.mds;
2052 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2057 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2060 handle = mdd_trans_start(env, mdo2mdd(obj));
2062 RETURN(PTR_ERR(handle));
2064 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2065 /* release open count */
2066 mdd_obj->mod_count --;
2068 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2069 /* remove link to object from orphan index */
2070 rc = __mdd_orphan_del(env, mdd_obj, handle);
2072 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2073 "list, OSS objects to be destroyed.\n",
2074 PFID(mdd_object_fid(mdd_obj)));
2076 CERROR("Object "DFID" can not be deleted from orphan "
2077 "list, maybe cause OST objects can not be "
2078 "destroyed (err: %d).\n",
2079 PFID(mdd_object_fid(mdd_obj)), rc);
2080 /* If object was not deleted from orphan list, do not
2081 * destroy OSS objects, which will be done when next
2087 rc = mdd_iattr_get(env, mdd_obj, ma);
2088 /* Object maybe not in orphan list originally, it is rare case for
2089 * mdd_finish_unlink() failure. */
2090 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2091 #ifdef HAVE_QUOTA_SUPPORT
/* last close of a zero-link object: release the owner's quota */
2092 if (mds->mds_quota) {
2093 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2094 mdd_quota_wrapper(&ma->ma_attr, qids);
2097 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2098 if (ma->ma_valid & MA_FLAGS &&
2099 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2100 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
/* otherwise hand LOV EA + cookies back so the client/MDS destroys them */
2102 rc = mdd_object_kill(env, mdd_obj, ma);
2108 CERROR("Error when prepare to delete Object "DFID" , "
2109 "which will cause OST objects can not be "
2110 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* object survives the close: don't hand back destruction data */
2116 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2118 mdd_write_unlock(env, mdd_obj);
2119 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2120 #ifdef HAVE_QUOTA_SUPPORT
2122 /* Trigger dqrel on the owner of child. If failed,
2123 * the next call for lquota_chkquota will process it */
2124 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2131 * Permission check is done when open,
2132 * no need check again.
/* Readpage precondition: the object must be a directory that the
 * underlying dt object can iterate. */
2134 static int mdd_readpage_sanity_check(const struct lu_env *env,
2135 struct mdd_object *obj)
2137 struct dt_object *next = mdd_object_child(obj);
2141 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one page (\a area, \a nob bytes) with lu_dirent records pulled
 * from directory iterator \a it.  Records the page's hash range in
 * \a start/\a end and the last entry written in \a *last so the caller
 * can extend it when the next record does not fit. */
2149 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2150 int first, void *area, int nob,
2151 const struct dt_it_ops *iops, struct dt_it *it,
2152 __u64 *start, __u64 *end,
2153 struct lu_dirent **last, __u32 attr)
2157 struct lu_dirent *ent;
/* the page starts with a lu_dirpage header; entries follow it */
2160 memset(area, 0, sizeof (struct lu_dirpage));
2161 area += sizeof (struct lu_dirpage);
2162 nob -= sizeof (struct lu_dirpage);
2170 len = iops->key_size(env, it);
2172 /* IAM iterator can return record with zero len. */
2176 hash = iops->store(env, it);
2177 if (unlikely(first)) {
2182 /* calculate max space required for lu_dirent */
2183 recsize = lu_dirent_calc_size(len, attr);
2185 if (nob >= recsize) {
2186 result = iops->rec(env, it, ent, attr);
2187 if (result == -ESTALE)
2192 /* osd might not able to pack all attributes,
2193 * so recheck rec length */
2194 recsize = le16_to_cpu(ent->lde_reclen);
2197 * record doesn't fit into page, enlarge previous one.
2200 (*last)->lde_reclen =
2201 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2210 ent = (void *)ent + recsize;
2214 result = iops->next(env, it);
2215 if (result == -ESTALE)
2217 } while (result == 0);
/* Iterate the directory underlying \a obj and fill the pages supplied
 * in \a rdpg with lu_dirent entries, starting at hash rp_hash.  Stamps
 * the hash range into the first page's lu_dirpage header. */
2224 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2225 const struct lu_rdpg *rdpg)
2228 struct dt_object *next = mdd_object_child(obj);
2229 const struct dt_it_ops *iops;
2231 struct lu_dirent *last = NULL;
2232 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2239 LASSERT(rdpg->rp_pages != NULL);
2240 LASSERT(next->do_index_ops != NULL);
2242 if (rdpg->rp_count <= 0)
2246 * iterate through directory and fill pages from @rdpg
2248 iops = &next->do_index_ops->dio_it;
2249 it = iops->init(env, next, mdd_object_capa(env, obj));
2253 rc = iops->load(env, it, rdpg->rp_hash);
2257 * Iterator didn't find record with exactly the key requested.
2259 * It is currently either
2261 * - positioned above record with key less than
2262 * requested---skip it.
2264 * - or not positioned at all (is in IAM_IT_SKEWED
2265 * state)---position it on the next item.
2267 rc = iops->next(env, it);
2272 * At this point and across for-loop:
2274 * rc == 0 -> ok, proceed.
2275 * rc > 0 -> end of directory.
2278 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2279 i++, nob -= CFS_PAGE_SIZE) {
2280 LASSERT(i < rdpg->rp_npages);
2281 pg = rdpg->rp_pages[i];
2282 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2283 min_t(int, nob, CFS_PAGE_SIZE), iops,
2284 it, &hash_start, &hash_end, &last,
/* terminate the entry chain on the last page we filled */
2286 if (rc != 0 || i == rdpg->rp_npages - 1) {
2288 last->lde_reclen = 0;
/* rc > 0 from the iterator means end of directory was reached */
2296 hash_end = DIR_END_OFF;
2300 struct lu_dirpage *dp;
/* write the covered hash range into the first page's header */
2302 dp = cfs_kmap(rdpg->rp_pages[0]);
2303 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2304 dp->ldp_hash_end = cpu_to_le64(hash_end);
2307 * No pages were processed, mark this.
2309 dp->ldp_flags |= LDF_EMPTY;
2311 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2312 cfs_kunmap(rdpg->rp_pages[0]);
2315 iops->fini(env, it);
/* .moo_readpage entry: under the object read lock, verify the object is
 * a readable directory and fill the caller's pages.  A dead (unlinked)
 * directory returns a single empty page per POSIX. */
2320 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2321 const struct lu_rdpg *rdpg)
2323 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2327 LASSERT(mdd_object_exists(mdd_obj));
2329 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2330 rc = mdd_readpage_sanity_check(env, mdd_obj);
2332 GOTO(out_unlock, rc);
2334 if (mdd_is_dead_obj(mdd_obj)) {
2336 struct lu_dirpage *dp;
2339 * According to POSIX, please do not return any entry to client:
2340 * even dot and dotdot should not be returned.
2342 CWARN("readdir from dead object: "DFID"\n",
2343 PFID(mdd_object_fid(mdd_obj)));
2345 if (rdpg->rp_count <= 0)
2346 GOTO(out_unlock, rc = -EFAULT);
2347 LASSERT(rdpg->rp_pages != NULL);
/* hand back one page marked empty, covering the whole hash space */
2349 pg = rdpg->rp_pages[0];
2350 dp = (struct lu_dirpage*)cfs_kmap(pg);
2351 memset(dp, 0 , sizeof(struct lu_dirpage));
2352 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2353 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2354 dp->ldp_flags |= LDF_EMPTY;
2355 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2357 GOTO(out_unlock, rc = 0);
2360 rc = __mdd_readpage(env, mdd_obj, rdpg);
2364 mdd_read_unlock(env, mdd_obj);
/* .moo_object_sync entry: delegate the sync to the underlying dt
 * object.  NOTE(review): do_object_sync is called unconditionally --
 * presumably every bottom device implements it; confirm. */
2368 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2370 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2371 struct dt_object *next;
2373 LASSERT(mdd_object_exists(mdd_obj));
2374 next = mdd_object_child(mdd_obj);
2375 return next->do_ops->do_object_sync(env, next);
/* .moo_version_get entry: read the object version from the underlying
 * dt object. */
2378 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2379 struct md_object *obj)
2381 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2383 LASSERT(mdd_object_exists(mdd_obj));
2384 return do_version_get(env, mdd_object_child(mdd_obj));
/* .moo_version_set entry: store \a version on the underlying dt
 * object. */
2387 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2388 dt_obj_version_t version)
2390 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2392 LASSERT(mdd_object_exists(mdd_obj));
2393 do_version_set(env, mdd_object_child(mdd_obj), version);
2396 const struct md_object_operations mdd_obj_ops = {
2397 .moo_permission = mdd_permission,
2398 .moo_attr_get = mdd_attr_get,
2399 .moo_attr_set = mdd_attr_set,
2400 .moo_xattr_get = mdd_xattr_get,
2401 .moo_xattr_set = mdd_xattr_set,
2402 .moo_xattr_list = mdd_xattr_list,
2403 .moo_xattr_del = mdd_xattr_del,
2404 .moo_object_create = mdd_object_create,
2405 .moo_ref_add = mdd_ref_add,
2406 .moo_ref_del = mdd_ref_del,
2407 .moo_open = mdd_open,
2408 .moo_close = mdd_close,
2409 .moo_readpage = mdd_readpage,
2410 .moo_readlink = mdd_readlink,
2411 .moo_capa_get = mdd_capa_get,
2412 .moo_object_sync = mdd_object_sync,
2413 .moo_version_get = mdd_version_get,
2414 .moo_version_set = mdd_version_set,
2415 .moo_path = mdd_path,