/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/mdd/mdd_object.c
 *
 * Lustre Metadata Server (mdd) routines
 *
 * Author: Wang Di <wangdi@clusterfs.com>
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs_jbd2.h>
#else
#include <linux/jbd.h>
#endif
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>

#include <lustre_param.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs.h>
#else
#include <linux/ldiskfs_fs.h>
#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>

#include "mdd_internal.h"
static const struct lu_object_operations mdd_lu_obj_ops;

static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name);
int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                 void **data)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        mdo_data_get(env, obj, data);
        return 0;
}

int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        return mdo_attr_get(env, obj, la, capa);
}
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
}
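/* Illustration (editor's note, not in the original source): after
 *      mdd_flags_xlate(obj, LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL);
 * both APPEND_OBJ and IMMUTE_OBJ are set in obj->mod_flags, and any stale
 * APPEND_OBJ/IMMUTE_OBJ bits are cleared first, so the cached flags always
 * mirror the last on-disk la_flags value seen. */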
struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
{
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
        return info;
}
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}
void mdd_buf_put(struct lu_buf *buf)
{
        if (buf == NULL || buf->lb_buf == NULL)
                return;
        if (buf->lb_vmalloc)
                OBD_VFREE(buf->lb_buf, buf->lb_len);
        else
                OBD_FREE(buf->lb_buf, buf->lb_len);
        buf->lb_buf = NULL;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = (void *)area;
        buf->lb_len = len;
        return buf;
}
#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                if (buf->lb_vmalloc)
                        OBD_VFREE(buf->lb_buf, buf->lb_len);
                else
                        OBD_FREE(buf->lb_buf, buf->lb_len);
                buf->lb_buf = NULL;
        }
        if (buf->lb_buf == NULL) {
                buf->lb_len = len;
                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 0;
                }
                /* fall back to vmalloc for large buffers */
                if (buf->lb_buf == NULL) {
                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 1;
                }
                if (buf->lb_buf == NULL)
                        buf->lb_len = 0;
        }
        return buf;
}
/** Increase the size of the \a mti_big_buf.
 * Preserves the old data in the buffer; the old buffer remains
 * unchanged on error.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
        struct lu_buf buf;

        LASSERT(len >= oldbuf->lb_len);
        if (len > BUF_VMALLOC_SIZE) {
                OBD_VMALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 1;
        } else {
                OBD_ALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 0;
        }
        if (buf.lb_buf == NULL)
                return -ENOMEM;

        buf.lb_len = len;
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        if (oldbuf->lb_vmalloc)
                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
        else
                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);

        memcpy(oldbuf, &buf, sizeof(buf));
        return 0;
}
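/* Usage sketch (editor's illustration, assuming the mti_big_buf helpers
 * above): grow the per-thread buffer on demand while keeping its content.
 *
 *      struct lu_buf *buf = mdd_buf_alloc(env, 512);
 *      if (buf->lb_buf == NULL)
 *              return -ENOMEM;
 *      ... fill the first 512 bytes ...
 *      if (mdd_buf_grow(env, 2 * BUF_VMALLOC_SIZE) != 0)
 *              return -ENOMEM;  (the old 512 bytes are still intact)
 */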
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_cookie_size;

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;
        }
        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;
        }
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
}
struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                   struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_lmm_size;

        max_lmm_size = mdd_lov_mdsize(env, mdd);
        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
                if (mti->mti_max_lmm)
                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
                mti->mti_max_lmm = NULL;
                mti->mti_max_lmm_size = 0;
        }
        if (unlikely(mti->mti_max_lmm == NULL)) {
                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
                if (likely(mti->mti_max_lmm != NULL))
                        mti->mti_max_lmm_size = max_lmm_size;
        }
        return mti->mti_max_lmm;
}
struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d)
{
        struct mdd_object *mdd_obj;

        OBD_ALLOC_PTR(mdd_obj);
        if (mdd_obj != NULL) {
                struct lu_object *o;

                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
                return o;
        } else {
                return NULL;
        }
}
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
{
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object *below;
        struct lu_device *under;
        ENTRY;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);
        if (below == NULL)
                RETURN(-ENOMEM);

        lu_object_add(o, below);

        RETURN(0);
}

static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}

static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_FREE_PTR(mdd);
}
static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime="LPU64", flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
}

static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init    = mdd_object_init,
        .loo_object_start   = mdd_object_start,
        .loo_object_free    = mdd_object_free,
        .loo_object_print   = mdd_object_print,
};
struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
{
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
}
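/* Usage sketch (editor's illustration): mdd_object_find() must be paired
 * with mdd_object_put(), and callers have to handle both a NULL return
 * (remote object) and an ERR_PTR() return, as mdd_path2fid() below does:
 *
 *      obj = mdd_object_find(env, mdd, fid);
 *      if (obj == NULL)
 *              return -EREMOTE;
 *      if (IS_ERR(obj))
 *              return PTR_ERR(obj);
 *      ... use obj ...
 *      mdd_object_put(env, obj);
 */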
static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                        const char *path, struct lu_fid *fid)
{
        struct lu_buf *buf;
        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
        struct mdd_object *obj;
        struct lu_name *lname = &mdd_env_info(env)->mti_name;
        char *name;
        int rc = 0;
        ENTRY;

        /* temp buffer for path element */
        buf = mdd_buf_alloc(env, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        lname->ln_name = name = buf->lb_buf;
        lname->ln_namelen = 0;
        *f = mdd->mdd_root_fid;

        while (1) {
                while (*path == '/')
                        path++;
                if (*path == '\0')
                        break;
                while (*path != '/' && *path != '\0') {
                        *name++ = *path++;
                        lname->ln_namelen++;
                }
                *name = '\0';
                /* find obj corresponding to fid */
                obj = mdd_object_find(env, mdd, f);
                if (obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                /* get child fid from parent and name */
                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                mdd_object_put(env, obj);
                if (rc)
                        break;

                name = buf->lb_buf;
                lname->ln_namelen = 0;
        }

        if (!rc)
                *fid = *f;
out:
        RETURN(rc);
}
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100

/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64                pli_recno;    /**< history point */
        __u64                pli_currec;   /**< current record */
        struct lu_fid        pli_fid;
        struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object   *pli_mdd_obj;
        char                *pli_path;     /**< full path */
        int                  pli_pathlen;
        int                  pli_linkno;   /**< which hardlink to follow */
        int                  pli_fidcount; /**< number of \a pli_fids */
};
static int mdd_path_current(const struct lu_env *env,
                            struct path_lookup_info *pli)
{
        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
        struct mdd_object *mdd_obj;
        struct lu_buf *buf = NULL;
        struct link_ea_header *leh;
        struct link_ea_entry *lee;
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
        struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
        char *ptr;
        int reclen;
        int rc;
        ENTRY;

        ptr = pli->pli_path + pli->pli_pathlen - 1;
        *ptr = 0;
        --ptr;
        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);

        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
                mdd_obj = mdd_object_find(env, mdd,
                                          &pli->pli_fids[pli->pli_fidcount]);
                if (mdd_obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(mdd_obj))
                        GOTO(out, rc = PTR_ERR(mdd_obj));
                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                if (rc <= 0) {
                        mdd_object_put(env, mdd_obj);
                        if (rc == -1)
                                rc = -EREMOTE;
                        else if (rc == 0)
                                /* Do I need to error out here? */
                                rc = -ENOENT;
                        GOTO(out, rc);
                }

                /* Get parent fid and object name */
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
                buf = mdd_links_get(env, mdd_obj);
                mdd_read_unlock(env, mdd_obj);
                mdd_object_put(env, mdd_obj);
                if (IS_ERR(buf))
                        GOTO(out, rc = PTR_ERR(buf));

                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);

                /* If set, use link #linkno for path lookup, otherwise use
                   link #0.  Only do this for the final path element. */
                if ((pli->pli_fidcount == 0) &&
                    (pli->pli_linkno < leh->leh_reccount)) {
                        int count;
                        for (count = 0; count < pli->pli_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                      ((char *)lee + reclen);
                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
                        }
                        if (pli->pli_linkno < leh->leh_reccount - 1)
                                /* indicate to user there are more links */
                                pli->pli_linkno++;
                }

                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)
                        GOTO(out, rc = -EOVERFLOW);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
                *(--ptr) = '/';

                /* Store the parent fid for historic lookup */
                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
                        GOTO(out, rc = -EOVERFLOW);
                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
        }

        /* Verify that our path hasn't changed since we started the lookup.
           Record the current index, and verify the path resolves to the
           same fid.  If it does, then the path is correct as of this index. */
        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
        pli->pli_currec = mdd->mdd_cl.mc_index;
        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
        if (rc) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
                GOTO(out, rc = -EAGAIN);
        }
        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
                       PFID(&pli->pli_fid));
                GOTO(out, rc = -EAGAIN);
        }

        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);

        EXIT;
out:
        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
                /* if we vmalloced a large buffer drop it */
                mdd_buf_put(buf);

        return rc;
}
static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
{
        return 0;
}
/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
{
        struct path_lookup_info *pli;
        int tries = 3;
        int rc = -EAGAIN;
        ENTRY;

        if (pathlen < 3)
                RETURN(-EOVERFLOW);

        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
                path[0] = '\0';
                RETURN(0);
        }

        OBD_ALLOC_PTR(pli);
        if (pli == NULL)
                RETURN(-ENOMEM);

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
        }

        *recno = pli->pli_currec;
        /* Return next link index to caller */
        *linkno = pli->pli_linkno;

        OBD_FREE_PTR(pli);

        RETURN(rc);
}
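/* Caller's view (editor's illustration, hypothetical caller): mdd_path()
 * returns one hardlink's path per call.  A caller that wants every link
 * could loop until *linkno stops advancing:
 *
 *      __u64 recno = -1;
 *      int   linkno = 0, prev;
 *      do {
 *              prev = linkno;
 *              rc = mdd_path(env, obj, path, pathlen, &recno, &linkno);
 *      } while (rc == 0 && linkno != prev);
 *
 * mdd_path_current() increments pli_linkno while more links remain, so an
 * unchanged *linkno means the last link was returned. */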
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
{
        struct lu_attr *la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        if (rc == 0) {
                mdd_flags_xlate(obj, la->la_flags);
                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
                        obj->mod_flags |= MNLINK_OBJ;
        }
        RETURN(rc);
}

/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                  struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_valid & MA_INODE)
                RETURN(0);

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        if (rc == 0)
                ma->ma_valid |= MA_INODE;
        RETURN(rc);
}
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
                       int *size)
{
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
        ENTRY;

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        if (!lmm)
                RETURN(0);

        lmm->lmm_magic = LOV_MAGIC_V1;
        lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
        lmm->lmm_pattern = ldesc->ld_pattern;
        lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
        *size = sizeof(struct lov_mds_md);

        RETURN(sizeof(struct lov_mds_md));
}
/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LOV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                        XATTR_NAME_LOV);
        if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
                                        &ma->ma_lmm_size);
        }
        if (rc > 0) {
                ma->ma_valid |= MA_LOV;
                rc = 0;
        }
        RETURN(rc);
}

int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                       struct md_attr *ma)
{
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}

/* get lmv EA only */
static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LMV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                        XATTR_NAME_LMV);
        if (rc > 0) {
                ma->ma_valid |= MA_LMV;
                rc = 0;
        }
        RETURN(rc);
}
static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                         struct md_attr *ma)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lustre_mdt_attrs *lma =
                                (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lma_size;
        int rc;
        ENTRY;

        /* If all needed data are already valid, nothing to do */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
            (ma->ma_need & (MA_HSM | MA_SOM)))
                RETURN(0);

        /* Read LMA from disk EA */
        lma_size = sizeof(info->mti_xattr_buf);
        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
        if (rc <= 0)
                RETURN(rc);

        /* Useless to check LMA incompatibility because this is already done in
         * osd_ea_fid_get(), and this will fail long before this code is
         * called.
         * So, if we are here, LMA is compatible.
         */

        lustre_lma_swab(lma);

        /* Swab and copy LMA */
        if (ma->ma_need & MA_HSM) {
                if (lma->lma_compat & LMAC_HSM)
                        ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
                else
                        ma->ma_hsm_flags = 0;
                ma->ma_valid |= MA_HSM;
        }

        /* Copy SOM */
        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
                LASSERT(ma->ma_som != NULL);
                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
                ma->ma_som->msd_size    = lma->lma_som_size;
                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
                ma->ma_som->msd_mountid = lma->lma_som_mountid;
                ma->ma_valid |= MA_SOM;
        }

        RETURN(0);
}
static int mdd_attr_get_internal(const struct lu_env *env,
                                 struct mdd_object *mdd_obj,
                                 struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
                if (S_ISREG(mdd_object_type(mdd_obj)))
                        rc = __mdd_lma_get(env, mdd_obj, ma);
        }
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        }
#endif
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
               rc, ma->ma_valid);
        RETURN(rc);
}

int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        int needlock = ma->ma_need &
                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);

        if (needlock)
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        if (needlock)
                mdd_read_unlock(env, mdd_obj);
        return rc;
}
/*
 * No permission check is needed.
 */
static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                        struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
        RETURN(rc);
}

/*
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
/*
 * Permission check is done at open time,
 * no need to check again.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
                        struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        loff_t pos = 0;
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}

/*
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
                          struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
{
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;
        ENTRY;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);
                LASSERT(next);

                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                else
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);
                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
        } else
                rc = -EEXIST;

        RETURN(rc);
}
/**
 * Make sure the ctime is increased only.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                if (rc)
                        RETURN(rc);

                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
        }
        RETURN(0);
}
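/* Worked example (editor's note): if the stored ctime is 100 and a setattr
 * arrives with la_ctime == 90, both LA_MTIME and LA_CTIME are dropped so
 * the ctime never moves backwards; a pure LA_CTIME update with
 * la_ctime == 100 is dropped as a no-op. */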
int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
                          int needacl)
{
        int rc;
        ENTRY;

        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
#endif
        RETURN(rc);
}

int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
                                int needacl)
{
        int rc;
        ENTRY;

        rc = mdd_attr_check(env, obj, attr);
        if (rc)
                RETURN(rc);

        if (attr->la_valid)
                rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        RETURN(rc);
}

static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
                                        int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}

int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
                                       int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}
int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
                    const struct lu_buf *buf, const char *name,
                    int fl, struct thandle *handle)
{
        struct lustre_capa *capa = mdd_object_capa(env, obj);
        int rc = -EINVAL;
        ENTRY;

        if (buf->lb_buf && buf->lb_len > 0)
                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
        else if (buf->lb_buf == NULL && buf->lb_len == 0)
                rc = mdo_xattr_del(env, obj, name, handle, capa);

        RETURN(rc);
}
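/* Note (editor's illustration): the buffer itself encodes the operation --
 * a non-empty buffer sets the EA, a completely empty one deletes it:
 *
 *      __mdd_xattr_set(env, obj, mdd_buf_get(env, val, len), name, fl, th);
 *      __mdd_xattr_set(env, obj, mdd_buf_get(env, NULL, 0), name, fl, th);
 *
 * Any other combination (NULL buffer with non-zero length, or the reverse)
 * falls through and returns -EINVAL. */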
/*
 * This gives the same functionality as the code between
 * sys_chmod and inode_setattr
 * chown_common and inode_setattr
 * utimes and inode_setattr
 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
 */
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc;
        int rc;
        ENTRY;

        if (!la->la_valid)
                RETURN(0);

        /* Do not permit change file type */
        if (la->la_valid & LA_TYPE)
                RETURN(-EPERM);

        /* They should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                RETURN(-EPERM);

        /* export destroy does not have ->le_ses, but we may want
         * to drop LUSTRE_SOM_FL. */
        if (!env->le_ses)
                RETURN(0);

        uc = md_ucred(env);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);
        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for set ctime when rename's source is
                         * on remote MDS. */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;
                RETURN(rc);
        }

        if (la->la_valid == LA_ATIME) {
                /* This is atime only set for read atime update on close. */
                if (la->la_atime <= tmp_la->la_atime +
                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff)
                        la->la_valid &= ~LA_ATIME;
                RETURN(0);
        }
        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);

                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);

        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
                                                            MAY_WRITE,
                                                            MOR_TGT_CHILD);
                        if (rc)
                                RETURN(rc);
                }
        }
        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                /* Bypass la_valid == LA_MODE,
                 * this is for changing file with SUID or SGID. */
                if ((la->la_valid & ~LA_MODE) &&
                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                if (la->la_mode == (cfs_umode_t) -1)
                        la->la_mode = tmp_la->la_mode;
                else
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;
        } else {
                la->la_mode = tmp_la->la_mode;
        }

        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t) -1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races. This is the behavior we had in
                 * 2.0. The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t) -1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 * 19990830 SD. */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;
                }
        }
        /* For both the Size-on-MDS case and the truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * For the SOM case it is true: the MAY_WRITE perm has been checked
         * at open time, no need to check again.  For the truncate case it
         * is false: the MAY_WRITE perm should be checked here. */
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge coming
                 * attributes with the set in the inode. BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);
        } else {
                if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                              (uc->mu_fsuid == tmp_la->la_uid)) &&
                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal_locked(env, obj,
                                                            tmp_la, MAY_WRITE,
                                                            MOR_TGT_CHILD);
                                if (rc)
                                        RETURN(rc);
                        }
                }
                if (la->la_valid & LA_CTIME) {
                        /* The pure setattr, it has the priority over what is
                         * already set, do not drop it if ctime is equal. */
                        if (la->la_ctime < tmp_la->la_ctime)
                                la->la_valid &= ~(LA_ATIME | LA_MTIME |
                                                  LA_CTIME);
                }
        }

        RETURN(0);
}
/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
{
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
        int reclen;
        int rc;

        /* Not recording */
        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                RETURN(0);

        LASSERT(handle != NULL);
        LASSERT(mdd_obj != NULL);

        if ((type == CL_SETATTR) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra
                   entry */
                RETURN(0);
        }

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr.cr_flags = CLF_VERSION;
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_tfid = *tfid;
        rec->cr.cr_namelen = 0;
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle);
        if (rc < 0) {
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
                return -EFAULT;
        }

        return 0;
}
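/* Consumer sketch (editor's note): a data-change record carries only the
 * target fid (cr_namelen == 0), so a changelog consumer that wants a
 * pathname has to resolve it separately, e.g. via mdd_path() above, using
 * the record index it reads relative to mdd->mdd_cl.mc_index. */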
/**
 * Should be called with write lock held.
 *
 * \see mdd_lma_set_locked().
 */
static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                         const struct md_attr *ma, struct thandle *handle)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_buf *buf;
        struct lustre_mdt_attrs *lma =
                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
        int lmasize = sizeof(struct lustre_mdt_attrs);
        int rc = 0;

        ENTRY;

        /* Either HSM or SOM part is not valid, we need to read it before */
        if (!(ma->ma_valid & (MA_HSM | MA_SOM))) {
                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
                if (rc <= 0)
                        RETURN(rc);

                lustre_lma_swab(lma);
        } else {
                memset(lma, 0, lmasize);
        }

        /* Copy HSM data */
        if (ma->ma_valid & MA_HSM) {
                lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
                lma->lma_compat |= LMAC_HSM;
        }

        /* Copy SOM data */
        if (ma->ma_valid & MA_SOM) {
                LASSERT(ma->ma_som != NULL);
                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
                        lma->lma_compat     &= ~LMAC_SOM;
                } else {
                        lma->lma_compat     |= LMAC_SOM;
                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
                        lma->lma_som_size    = ma->ma_som->msd_size;
                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
                }
        }

        /* Copy FID */
        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));

        lustre_lma_swab(lma);
        buf = mdd_buf_get(env, lma, lmasize);
        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);

        RETURN(rc);
}
/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid;
 * if not, the LMA EA will first be read from disk, modified, and written
 * back.
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        ENTRY;

        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
        /*TODO: add lock here*/
        /* start a log journal handle if needed */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                if (lmm == NULL)
                        GOTO(cleanup, rc = -ENOMEM);

                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                       XATTR_NAME_LOV);
                if (rc < 0)
                        GOTO(cleanup, rc);
        }

        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);

        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        if (rc)
                GOTO(cleanup, rc);

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;

                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                if (!rc) {
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                                        NULL, 0);
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                        if (block_count) {
                                void *data = NULL;
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                block_count, NULL,
                                                LQUOTA_FLAGS_BLK, data, 1);
                        }
                }
        }
#endif

        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                if (rc == 0)
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size){
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);

                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
                                logcookies = NULL;
                }
        }

        if (rc == 0 && ma->ma_valid & MA_LOV) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        if (rc)
                                GOTO(cleanup, rc);

                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
                }
        }
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode))
                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        }
cleanup:
        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0)) {
                /*set obd attr, if needed*/
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
                                           logcookies);
        }
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      block_pending, 1);
                /* Trigger dqrel/dqacq for original owner and new owner.
                 * If failed, the next call for lquota_chkquota will
                 * process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        int rc;
        ENTRY;

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}

static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        int rc;
        ENTRY;

        if (mdd_is_immutable(obj) || mdd_is_append(obj))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
                         int fl)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
            mdd->mdd_sync_permission == 1)
                txn_param_sync(&mdd_env_info(env)->mti_param);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record user xattr changes */
        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
            (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                  const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);

        /* Only record user xattr changes */
        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
            (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
                                              handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/* partial unlink */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                       struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        int rc;
        ENTRY;

        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
                RETURN(-ENOENT);

        LASSERT(mdd_object_exists(mdd_obj) > 0);

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        if (rc)
                GOTO(cleanup, rc);

        __mdd_ref_del(env, mdd_obj, handle, 0);

        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* unlink dot */
                __mdd_ref_del(env, mdd_obj, handle, 1);
        }

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;

        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        if (rc)
                GOTO(cleanup, rc);

        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        }
#endif

        EXIT;
cleanup:
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        RETURN(rc);
}
/* partial operation */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
                               struct md_attr *ma)
{
        int rc;
        ENTRY;

        switch (ma->ma_attr.la_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
        case S_IFLNK:
        case S_IFCHR:
        case S_IFBLK:
        case S_IFIFO:
        case S_IFSOCK:
                rc = 0;
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);
}
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
                             struct md_attr *ma)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        int rc = 0;
        ENTRY;

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                                NULL, 0);
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFLNK:
                case S_IFDIR:
                        block_count = 2;
                        break;
                default:
                        block_count = 1;
                        break;
                }
                /* get block quota for child */
                if (block_count)
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        }
#endif

        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_pending, rc = PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        if (rc)
                GOTO(unlock, rc);

        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (rc)
                GOTO(unlock, rc);

        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);

                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                if (rc)
                        GOTO(unlock, rc);

                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
                                           handle, 0);
        } else {
#ifdef CONFIG_FS_POSIX_ACL
                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                        buf->lb_len = spec->u.sp_ea.eadatalen;
                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                    &ma->ma_attr.la_mode,
                                                    handle);
                                if (rc)
                                        GOTO(unlock, rc);
                                else
                                        ma->ma_attr.la_valid |= LA_MODE;
                        }

                        pfid = spec->u.sp_ea.fid;
                }
#endif
                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
                                           spec);
        }
        EXIT;
unlock:
        if (rc == 0)
                rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);

        mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      block_pending, 1);
                /* Trigger dqacq on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
/* partial link */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        if (rc == 0)
                __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc == 0) {
                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                la_copy->la_ctime = ma->ma_attr.la_ctime;

                la_copy->la_valid = LA_CTIME;
                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
                                                        handle, 0);
        }
        mdd_trans_stop(env, mdd, 0, handle);

        RETURN(rc);
}
/*
 * do NOT or the MAY_*'s, you'll get the weakest
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        int res = 0;

        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
                        return 0;
        }

        if (flags & FMODE_READ)
                res |= MAY_READ;
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                res |= MAY_WRITE;
        if (flags & MDS_FMODE_EXEC)
                res |= MAY_EXEC;
        return res;
}
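/* Worked example (editor's note): an open with
 * flags = FMODE_READ | MDS_OPEN_APPEND yields MAY_READ | MAY_WRITE, since
 * append implies data modification; MDS_OPEN_OWNEROVERRIDE short-circuits
 * to 0 only when the opener actually is the file owner. */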
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int mode, rc;
        ENTRY;

        /* EEXIST check */
        if (mdd_is_dead_obj(obj))
                RETURN(-ENOENT);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (S_ISLNK(tmp_la->la_mode))
                RETURN(-ELOOP);

        mode = accmode(env, tmp_la, flag);

        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
                RETURN(-EISDIR);

        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
                if (rc)
                        RETURN(rc);
        }

        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* For writing an append-only file, it must be opened in append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
                        RETURN(-EPERM);
        }

        /*
         * The O_NOATIME flag is not packed by the client for now.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
        }

        RETURN(0);
}
static int mdd_open(const struct lu_env *env, struct md_object *obj,
                    int flags)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc = 0;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        if (rc == 0)
                mdd_obj->mod_count++;

        mdd_write_unlock(env, mdd_obj);
        return rc;
}

/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean evth up.
                 * Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                            obj, ma);
        }
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        int reset = 1;

#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        ENTRY;

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);
        handle = mdd_trans_start(env, mdo2mdd(obj));
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        /* release open count */
        mdd_obj->mod_count--;

        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                if (rc == 0) {
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                } else {
                        CERROR("Object "DFID" can not be deleted from orphan "
                               "list, maybe cause OST objects can not be "
                               "destroyed (err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If object was not deleted from orphan list, do not
                         * destroy OSS objects, which will be done when next
                         * recovery. */
                        GOTO(out, rc);
                }
        }

        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* The object may not have been in the orphan list originally; this
         * is a rare case of mdd_finish_unlink() failure. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                }
#endif
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                } else {
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        if (rc == 0)
                                reset = 0;
                }

                if (rc != 0)
                        CERROR("Error when prepare to delete Object "DFID" , "
                               "which will cause OST objects can not be "
                               "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
        }
        EXIT;

out:
        if (reset)
                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);

        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        RETURN(rc);
}
/*
 * Permission check is done at open time,
 * no need to check again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        int rc;
        ENTRY;

        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
                rc = 0;
        else
                rc = -ENOTDIR;

        RETURN(rc);
}
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              int first, void *area, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u64 *start, __u64 *end,
                              struct lu_dirent **last, __u32 attr)
{
        int result;
        __u64 hash = 0;
        struct lu_dirent *ent;

        if (first) {
                memset(area, 0, sizeof (struct lu_dirpage));
                area += sizeof (struct lu_dirpage);
                nob  -= sizeof (struct lu_dirpage);
        }

        ent = area;
        do {
                int len;
                int recsize;

                len = iops->key_size(env, it);

                /* IAM iterator can return record with zero len. */
                if (len == 0)
                        goto next;

                hash = iops->store(env, it);
                if (unlikely(first)) {
                        first = 0;
                        *start = hash;
                }

                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);

                if (nob >= recsize) {
                        result = iops->rec(env, it, ent, attr);
                        if (result == -ESTALE)
                                goto next;
                        if (result != 0)
                                goto out;

                        /* osd might not able to pack all attributes,
                         * so recheck rec length */
                        recsize = le16_to_cpu(ent->lde_reclen);
                } else {
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        if (*last) {
                                (*last)->lde_reclen =
                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                                                    nob);
                                result = 0;
                        } else
                                result = -EINVAL;

                        goto out;
                }
                *last = ent;
                ent = (void *)ent + recsize;
                nob -= recsize;
next:
                result = iops->next(env, it);
                if (result == -ESTALE)
                        goto next;
        } while (result == 0);

out:
        *end = hash;
        return result;
}
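/* Page layout produced above (editor's note): each page begins with a
 * struct lu_dirpage header (hash range, flags) followed by variable-length
 * lu_dirent records.  When the next record does not fit, the previous
 * entry's lde_reclen is widened to cover the unused tail, and the caller
 * (__mdd_readpage) zeroes the final entry's lde_reclen to terminate the
 * walk. */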
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_it *it;
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct page *pg;
        struct lu_dirent *last = NULL;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        int i;
        int rc;
        int nob;
        __u64 hash_start;
        __u64 hash_end = 0;

        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
                return -EFAULT;

        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, mdd_object_capa(env, obj));
        if (it == NULL)
                return -ENOMEM;

        rc = iops->load(env, it, rdpg->rp_hash);

        if (rc == 0)
                /*
                 * Iterator didn't find record with exactly the key requested.
                 *
                 * It is currently either
                 *
                 *     - positioned above record with key less than
                 *     requested---skip it.
                 *
                 *     - or not positioned at all (is in IAM_IT_SKEWED
                 *     state)---position it on the next item.
                 */
                rc = iops->next(env, it);
        else if (rc > 0)
                rc = 0;

        /*
         * At this point and across for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         *  rc <  0 -> error.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
                                        it, &hash_start, &hash_end, &last,
                                        rdpg->rp_attrs);
                if (rc != 0 || i == rdpg->rp_npages - 1) {
                        if (last)
                                last->lde_reclen = 0;
                }
                cfs_kunmap(pg);
        }
        if (rc > 0) {
                /*
                 * end of directory.
                 */
                hash_end = DIR_END_OFF;
                rc = 0;
        }
        if (rc == 0) {
                struct lu_dirpage *dp;

                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(hash_end);
                if (i == 0)
                        /*
                         * No pages were processed, mark this.
                         */
                        dp->ldp_flags |= LDF_EMPTY;

                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(rdpg->rp_pages[0]);
        }
        iops->put(env, it);
        iops->fini(env, it);

        return rc;
}
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
        if (rc)
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                struct page *pg;
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));

                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage *)cfs_kmap(pg);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(pg);
                GOTO(out_unlock, rc = 0);
        }

        rc = __mdd_readpage(env, mdd_obj, rdpg);

        EXIT;
out_unlock:
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
}
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
}

static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        do_version_set(env, mdd_object_child(mdd_obj), version);
}
const struct md_object_operations mdd_obj_ops = {
        .moo_permission    = mdd_permission,
        .moo_attr_get      = mdd_attr_get,
        .moo_attr_set      = mdd_attr_set,
        .moo_xattr_get     = mdd_xattr_get,
        .moo_xattr_set     = mdd_xattr_set,
        .moo_xattr_list    = mdd_xattr_list,
        .moo_xattr_del     = mdd_xattr_del,
        .moo_object_create = mdd_object_create,
        .moo_ref_add       = mdd_ref_add,
        .moo_ref_del       = mdd_ref_del,
        .moo_open          = mdd_open,
        .moo_close         = mdd_close,
        .moo_readpage      = mdd_readpage,
        .moo_readlink      = mdd_readlink,
        .moo_capa_get      = mdd_capa_get,
        .moo_object_sync   = mdd_object_sync,
        .moo_version_get   = mdd_version_get,
        .moo_version_set   = mdd_version_set,
        .moo_path          = mdd_path,
};