1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or have any questions.
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_to_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf, const char *name);
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
130 OBD_VFREE(buf->lb_buf, buf->lb_len);
132 OBD_FREE(buf->lb_buf, buf->lb_len);
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
151 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
153 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
155 OBD_VFREE(buf->lb_buf, buf->lb_len);
157 OBD_FREE(buf->lb_buf, buf->lb_len);
160 if (buf->lb_buf == NULL) {
162 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163 OBD_ALLOC(buf->lb_buf, buf->lb_len);
166 if (buf->lb_buf == NULL) {
167 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
170 if (buf->lb_buf == NULL)
176 /** Increase the size of the \a mti_big_buf.
177 * preserves old data in buffer
178 * old buffer remains unchanged on error
179 * \retval 0 or -ENOMEM
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
183 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
186 LASSERT(len >= oldbuf->lb_len);
187 if (len > BUF_VMALLOC_SIZE) {
188 OBD_VMALLOC(buf.lb_buf, len);
191 OBD_ALLOC(buf.lb_buf, len);
194 if (buf.lb_buf == NULL)
198 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
200 if (oldbuf->lb_vmalloc)
201 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
203 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
205 memcpy(oldbuf, &buf, sizeof(buf));
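/*
 * Usage sketch (illustrative, not from the original source): mdd_buf_alloc()
 * returns the cached per-thread mti_big_buf, reallocating it when the
 * requested length grows (OBD_ALLOC() for requests up to BUF_VMALLOC_SIZE,
 * OBD_VMALLOC() otherwise), while mdd_buf_grow() enlarges it in place,
 * preserving the bytes already stored:
 *
 *      struct lu_buf *buf = mdd_buf_alloc(env, PATH_MAX);
 *      if (buf->lb_buf == NULL)
 *              return -ENOMEM;
 *      ... fill buf->lb_buf ...
 *      if (needed > buf->lb_len && mdd_buf_grow(env, needed) != 0)
 *              return -ENOMEM;  (the old contents remain valid on failure)
 */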
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211 struct mdd_device *mdd)
213 struct mdd_thread_info *mti = mdd_env_info(env);
216 max_cookie_size = mdd_lov_cookiesize(env, mdd);
217 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218 if (mti->mti_max_cookie)
219 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220 mti->mti_max_cookie = NULL;
221 mti->mti_max_cookie_size = 0;
223 if (unlikely(mti->mti_max_cookie == NULL)) {
224 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225 if (likely(mti->mti_max_cookie != NULL))
226 mti->mti_max_cookie_size = max_cookie_size;
228 if (likely(mti->mti_max_cookie != NULL))
229 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230 return mti->mti_max_cookie;
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234 struct mdd_device *mdd)
236 struct mdd_thread_info *mti = mdd_env_info(env);
239 max_lmm_size = mdd_lov_mdsize(env, mdd);
240 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241 if (mti->mti_max_lmm)
242 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243 mti->mti_max_lmm = NULL;
244 mti->mti_max_lmm_size = 0;
246 if (unlikely(mti->mti_max_lmm == NULL)) {
247 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248 if (likely(mti->mti_max_lmm != NULL))
249 mti->mti_max_lmm_size = max_lmm_size;
251 return mti->mti_max_lmm;
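/*
 * Usage note (illustrative): mdd_max_cookie_get() and mdd_max_lmm_get() both
 * cache a maximum-sized buffer in mdd_thread_info and only reallocate when
 * the required size grows, so callers use the returned pointer directly and
 * never free it, e.g. as in mdd_attr_set() below:
 *
 *      lmm = mdd_max_lmm_get(env, mdd);
 *      if (lmm == NULL)
 *              GOTO(cleanup, rc = -ENOMEM);
 */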
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255 const struct lu_object_header *hdr,
258 struct mdd_object *mdd_obj;
260 OBD_ALLOC_PTR(mdd_obj);
261 if (mdd_obj != NULL) {
264 o = mdd2lu_obj(mdd_obj);
265 lu_object_init(o, NULL, d);
266 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268 mdd_obj->mod_count = 0;
269 o->lo_ops = &mdd_lu_obj_ops;
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277 const struct lu_object_conf *unused)
279 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280 struct mdd_object *mdd_obj = lu2mdd_obj(o);
281 struct lu_object *below;
282 struct lu_device *under;
285 mdd_obj->mod_cltime = 0;
286 under = &d->mdd_child->dd_lu_dev;
287 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288 mdd_pdlock_init(mdd_obj);
292 lu_object_add(o, below);
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
299 if (lu_object_exists(o))
300 return mdd_get_flags(env, lu2mdd_obj(o));
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
307 struct mdd_object *mdd = lu2mdd_obj(o);
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314 lu_printer_t p, const struct lu_object *o)
316 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318 "valid=%x, cltime="LPU64", flags=%lx)",
319 mdd, mdd->mod_count, mdd->mod_valid,
320 mdd->mod_cltime, mdd->mod_flags);
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324 .loo_object_init = mdd_object_init,
325 .loo_object_start = mdd_object_start,
326 .loo_object_free = mdd_object_free,
327 .loo_object_print = mdd_object_print,
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331 struct mdd_device *d,
332 const struct lu_fid *f)
334 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338 const char *path, struct lu_fid *fid)
341 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342 struct mdd_object *obj;
343 struct lu_name *lname = &mdd_env_info(env)->mti_name;
348 /* temp buffer for path element */
349 buf = mdd_buf_alloc(env, PATH_MAX);
350 if (buf->lb_buf == NULL)
353 lname->ln_name = name = buf->lb_buf;
354 lname->ln_namelen = 0;
355 *f = mdd->mdd_root_fid;
362 while (*path != '/' && *path != '\0') {
370 /* find obj corresponding to fid */
371 obj = mdd_object_find(env, mdd, f);
373 GOTO(out, rc = -EREMOTE);
375 GOTO(out, rc = PTR_ERR(obj));
376 /* get child fid from parent and name */
377 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378 mdd_object_put(env, obj);
383 lname->ln_namelen = 0;
392 /** The maximum depth that fid2path() will search.
393 * This is limited only because we want to store the fids for
394 * historical path lookup purposes.
396 #define MAX_PATH_DEPTH 100
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400 __u64 pli_recno; /**< history point */
401 __u64 pli_currec; /**< current record */
402 struct lu_fid pli_fid;
403 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404 struct mdd_object *pli_mdd_obj;
405 char *pli_path; /**< full path */
407 int pli_linkno; /**< which hardlink to follow */
408 int pli_fidcount; /**< number of \a pli_fids */
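/*
 * Worked example (illustrative): for an object reachable as /a/b/c,
 * mdd_path_current() below fills this structure leaf-first, so that
 *
 *      pli_fids[0] = FID of c,  pli_fids[1] = FID of b,
 *      pli_fids[2] = FID of a,  pli_fids[3] = FID of the root,
 *      pli_fidcount = 3,
 *
 * while the names "c", "b", "a" are packed right-to-left at the tail of
 * pli_path and the finished string is shifted to the front with memmove().
 */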
411 static int mdd_path_current(const struct lu_env *env,
412 struct path_lookup_info *pli)
414 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415 struct mdd_object *mdd_obj;
416 struct lu_buf *buf = NULL;
417 struct link_ea_header *leh;
418 struct link_ea_entry *lee;
419 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
426 ptr = pli->pli_path + pli->pli_pathlen - 1;
429 pli->pli_fidcount = 0;
430 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
432 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433 mdd_obj = mdd_object_find(env, mdd,
434 &pli->pli_fids[pli->pli_fidcount]);
436 GOTO(out, rc = -EREMOTE);
438 GOTO(out, rc = PTR_ERR(mdd_obj));
439 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
441 mdd_object_put(env, mdd_obj);
445 /* Do I need to error out here? */
450 /* Get parent fid and object name */
451 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452 buf = mdd_links_get(env, mdd_obj);
453 mdd_read_unlock(env, mdd_obj);
454 mdd_object_put(env, mdd_obj);
456 GOTO(out, rc = PTR_ERR(buf));
459 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
462 /* If set, use link #linkno for path lookup, otherwise use
463 link #0. Only do this for the final path element. */
464 if ((pli->pli_fidcount == 0) &&
465 (pli->pli_linkno < leh->leh_reccount)) {
467 for (count = 0; count < pli->pli_linkno; count++) {
468 lee = (struct link_ea_entry *)
469 ((char *)lee + reclen);
470 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
472 if (pli->pli_linkno < leh->leh_reccount - 1)
473 /* indicate to user there are more links */
477 /* Pack the name in the end of the buffer */
478 ptr -= tmpname->ln_namelen;
479 if (ptr - 1 <= pli->pli_path)
480 GOTO(out, rc = -EOVERFLOW);
481 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
484 /* Store the parent fid for historic lookup */
485 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486 GOTO(out, rc = -EOVERFLOW);
487 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
490 /* Verify that our path hasn't changed since we started the lookup.
491 Record the current index, and verify the path resolves to the
492 same fid. If it does, then the path is correct as of this index. */
493 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494 pli->pli_currec = mdd->mdd_cl.mc_index;
495 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
498 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499 GOTO (out, rc = -EAGAIN);
501 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504 PFID(&pli->pli_fid));
505 GOTO(out, rc = -EAGAIN);
508 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
512 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513 /* if we vmalloced a large buffer drop it */
519 static int mdd_path_historic(const struct lu_env *env,
520 struct path_lookup_info *pli)
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527 char *path, int pathlen, __u64 *recno, int *linkno)
529 struct path_lookup_info *pli;
537 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
547 pli->pli_mdd_obj = md2mdd_obj(obj);
548 pli->pli_recno = *recno;
549 pli->pli_path = path;
550 pli->pli_pathlen = pathlen;
551 pli->pli_linkno = *linkno;
553 /* Retry multiple times in case file is being moved */
554 while (tries-- && rc == -EAGAIN)
555 rc = mdd_path_current(env, pli);
557 /* For historical path lookup, the current links may not have existed
558 * at "recno" time. We must switch over to earlier links/parents
559 * by using the changelog records. If the earlier parent doesn't
560 * exist, we must search back through the changelog to reconstruct
561 * its parents, then check if it exists, etc.
562 * We may ignore this problem for the initial implementation and
563 * state that an "original" hardlink must still exist for us to find
564 * historic path name. */
565 if (pli->pli_recno != -1) {
566 rc = mdd_path_historic(env, pli);
568 *recno = pli->pli_currec;
569 /* Return next link index to caller */
570 *linkno = pli->pli_linkno;
578 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
580 struct lu_attr *la = &mdd_env_info(env)->mti_la;
584 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
586 mdd_flags_xlate(obj, la->la_flags);
587 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
588 obj->mod_flags |= MNLINK_OBJ;
593 /* get only inode attributes */
594 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
600 if (ma->ma_valid & MA_INODE)
603 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
604 mdd_object_capa(env, mdd_obj));
606 ma->ma_valid |= MA_INODE;
610 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
613 struct lov_desc *ldesc;
614 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
617 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
618 LASSERT(ldesc != NULL);
619 LASSERT(size != NULL);
626 lmm->lmm_magic = LOV_MAGIC_V1;
627 lmm->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
628 lmm->lmm_pattern = ldesc->ld_pattern;
629 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
630 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
631 *size = sizeof(struct lov_mds_md);
633 RETURN(sizeof(struct lov_mds_md));
636 /* get lov EA only */
637 static int __mdd_lmm_get(const struct lu_env *env,
638 struct mdd_object *mdd_obj, struct md_attr *ma)
643 if (ma->ma_valid & MA_LOV)
646 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
648 if (rc == 0 && ma->ma_need & MA_LOV_DEF)
649 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
652 ma->ma_valid |= MA_LOV;
658 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
664 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
665 rc = __mdd_lmm_get(env, mdd_obj, ma);
666 mdd_read_unlock(env, mdd_obj);
671 static int __mdd_lmv_get(const struct lu_env *env,
672 struct mdd_object *mdd_obj, struct md_attr *ma)
677 if (ma->ma_valid & MA_LMV)
680 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
683 ma->ma_valid |= MA_LMV;
689 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
692 struct mdd_thread_info *info = mdd_env_info(env);
693 struct lustre_mdt_attrs *lma =
694 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
699 /* If all needed data are already valid, nothing to do */
700 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
701 (ma->ma_need & (MA_HSM | MA_SOM)))
704 /* Read LMA from disk EA */
705 lma_size = sizeof(info->mti_xattr_buf);
706 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
710 /* Useless to check LMA incompatibility because this is already done in
711 * osd_ea_fid_get(), and this will fail long before this code is called.
713 * So, if we are here, LMA is compatible.
716 lustre_lma_swab(lma);
718 /* Swab and copy LMA */
719 if (ma->ma_need & MA_HSM) {
720 if (lma->lma_compat & LMAC_HSM)
721 ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
723 ma->ma_hsm_flags = 0;
724 ma->ma_valid |= MA_HSM;
728 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
729 LASSERT(ma->ma_som != NULL);
730 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
731 ma->ma_som->msd_size = lma->lma_som_size;
732 ma->ma_som->msd_blocks = lma->lma_som_blocks;
733 ma->ma_som->msd_mountid = lma->lma_som_mountid;
734 ma->ma_valid |= MA_SOM;
740 static int mdd_attr_get_internal(const struct lu_env *env,
741 struct mdd_object *mdd_obj,
747 if (ma->ma_need & MA_INODE)
748 rc = mdd_iattr_get(env, mdd_obj, ma);
750 if (rc == 0 && ma->ma_need & MA_LOV) {
751 if (S_ISREG(mdd_object_type(mdd_obj)) ||
752 S_ISDIR(mdd_object_type(mdd_obj)))
753 rc = __mdd_lmm_get(env, mdd_obj, ma);
755 if (rc == 0 && ma->ma_need & MA_LMV) {
756 if (S_ISDIR(mdd_object_type(mdd_obj)))
757 rc = __mdd_lmv_get(env, mdd_obj, ma);
759 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
760 if (S_ISREG(mdd_object_type(mdd_obj)))
761 rc = __mdd_lma_get(env, mdd_obj, ma);
763 #ifdef CONFIG_FS_POSIX_ACL
764 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
765 if (S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = mdd_def_acl_get(env, mdd_obj, ma);
769 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
774 int mdd_attr_get_internal_locked(const struct lu_env *env,
775 struct mdd_object *mdd_obj, struct md_attr *ma)
778 int needlock = ma->ma_need &
779 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
782 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
783 rc = mdd_attr_get_internal(env, mdd_obj, ma);
785 mdd_read_unlock(env, mdd_obj);
790 * No permission check is needed.
792 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
795 struct mdd_object *mdd_obj = md2mdd_obj(obj);
799 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
804 * No permission check is needed.
806 static int mdd_xattr_get(const struct lu_env *env,
807 struct md_object *obj, struct lu_buf *buf, const char *name)
810 struct mdd_object *mdd_obj = md2mdd_obj(obj);
815 LASSERT(mdd_object_exists(mdd_obj));
817 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
818 rc = mdo_xattr_get(env, mdd_obj, buf, name,
819 mdd_object_capa(env, mdd_obj));
820 mdd_read_unlock(env, mdd_obj);
826 * Permission check is done at open time,
827 * so there is no need to check again.
829 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
832 struct mdd_object *mdd_obj = md2mdd_obj(obj);
833 struct dt_object *next;
838 LASSERT(mdd_object_exists(mdd_obj));
840 next = mdd_object_child(mdd_obj);
841 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
842 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
843 mdd_object_capa(env, mdd_obj));
844 mdd_read_unlock(env, mdd_obj);
849 * No permission check is needed.
851 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
854 struct mdd_object *mdd_obj = md2mdd_obj(obj);
859 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
860 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
861 mdd_read_unlock(env, mdd_obj);
866 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
867 struct mdd_object *c, struct md_attr *ma,
868 struct thandle *handle,
869 const struct md_op_spec *spec)
871 struct lu_attr *attr = &ma->ma_attr;
872 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
873 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
874 const struct dt_index_features *feat = spec->sp_feat;
878 if (!mdd_object_exists(c)) {
879 struct dt_object *next = mdd_object_child(c);
882 if (feat != &dt_directory_features && feat != NULL)
883 dof->dof_type = DFT_INDEX;
885 dof->dof_type = dt_mode_to_dft(attr->la_mode);
887 dof->u.dof_idx.di_feat = feat;
889 /* @hint will be initialized by underlying device. */
890 next->do_ops->do_ah_init(env, hint,
891 p ? mdd_object_child(p) : NULL,
892 attr->la_mode & S_IFMT);
894 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
895 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
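/*
 * Example (illustrative) of the format selection above: a plain directory
 * (feat == &dt_directory_features) or an object created with no index
 * features (feat == NULL) takes its format from dt_mode_to_dft(attr->la_mode);
 * any other index object is created as DFT_INDEX, with spec->sp_feat forwarded
 * through dof->u.dof_idx.di_feat.
 */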
903 * Make sure the ctime only ever increases.
905 static inline int mdd_attr_check(const struct lu_env *env,
906 struct mdd_object *obj,
907 struct lu_attr *attr)
909 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
913 if (attr->la_valid & LA_CTIME) {
914 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
918 if (attr->la_ctime < tmp_la->la_ctime)
919 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
920 else if (attr->la_valid == LA_CTIME &&
921 attr->la_ctime == tmp_la->la_ctime)
922 attr->la_valid &= ~LA_CTIME;
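/*
 * Worked example (illustrative) for mdd_attr_check() above: with an on-disk
 * ctime of 100, a setattr carrying ctime == 99 has LA_MTIME and LA_CTIME
 * stripped from la_valid, and a pure LA_CTIME setattr carrying ctime == 100
 * is reduced to a no-op, so the stored ctime never moves backwards.
 */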
927 int mdd_attr_set_internal(const struct lu_env *env,
928 struct mdd_object *obj,
929 struct lu_attr *attr,
930 struct thandle *handle,
936 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
937 #ifdef CONFIG_FS_POSIX_ACL
938 if (!rc && (attr->la_valid & LA_MODE) && needacl)
939 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
944 int mdd_attr_check_set_internal(const struct lu_env *env,
945 struct mdd_object *obj,
946 struct lu_attr *attr,
947 struct thandle *handle,
953 rc = mdd_attr_check(env, obj, attr);
958 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
962 static int mdd_attr_set_internal_locked(const struct lu_env *env,
963 struct mdd_object *obj,
964 struct lu_attr *attr,
965 struct thandle *handle,
971 needacl = needacl && (attr->la_valid & LA_MODE);
973 mdd_write_lock(env, obj, MOR_TGT_CHILD);
974 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
976 mdd_write_unlock(env, obj);
980 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
981 struct mdd_object *obj,
982 struct lu_attr *attr,
983 struct thandle *handle,
989 needacl = needacl && (attr->la_valid & LA_MODE);
991 mdd_write_lock(env, obj, MOR_TGT_CHILD);
992 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
994 mdd_write_unlock(env, obj);
998 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
999 const struct lu_buf *buf, const char *name,
1000 int fl, struct thandle *handle)
1002 struct lustre_capa *capa = mdd_object_capa(env, obj);
1006 if (buf->lb_buf && buf->lb_len > 0)
1007 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1008 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1009 rc = mdo_xattr_del(env, obj, name, handle, capa);
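/*
 * Usage sketch (illustrative) for __mdd_xattr_set() above: a non-empty buffer
 * stores the xattr, while a buffer with lb_buf == NULL and lb_len == 0
 * deletes it.  __mdd_lma_set() below uses the first form:
 *
 *      buf = mdd_buf_get(env, lma, lmasize);
 *      rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
 */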
1015 * This gives the same functionality as the code between
1016 * sys_chmod and inode_setattr
1017 * chown_common and inode_setattr
1018 * utimes and inode_setattr
1019 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
1021 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1022 struct lu_attr *la, const struct md_attr *ma)
1024 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1025 struct md_ucred *uc;
1032 /* Do not permit changing the file type */
1033 if (la->la_valid & LA_TYPE)
1036 /* They should not be processed by setattr */
1037 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1040 /* export destroy does not have ->le_ses, but we may want
1041 * to drop LUSTRE_SOM_FL. */
1047 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1051 if (la->la_valid == LA_CTIME) {
1052 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1053 /* This is only for setting ctime when rename's source is on a remote MDS. */
1055 rc = mdd_may_delete(env, NULL, obj,
1056 (struct md_attr *)ma, 1, 0);
1057 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1058 la->la_valid &= ~LA_CTIME;
1062 if (la->la_valid == LA_ATIME) {
1063 /* This is an atime-only setattr, used for atime updates on close. */
1064 if (la->la_atime <= tmp_la->la_atime +
1065 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1066 la->la_valid &= ~LA_ATIME;
1070 /* Check if flags change. */
1071 if (la->la_valid & LA_FLAGS) {
1072 unsigned int oldflags = 0;
1073 unsigned int newflags = la->la_flags &
1074 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1076 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1077 !mdd_capable(uc, CFS_CAP_FOWNER))
1080 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1081 * only be changed by the relevant capability. */
1082 if (mdd_is_immutable(obj))
1083 oldflags |= LUSTRE_IMMUTABLE_FL;
1084 if (mdd_is_append(obj))
1085 oldflags |= LUSTRE_APPEND_FL;
1086 if ((oldflags ^ newflags) &&
1087 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1090 if (!S_ISDIR(tmp_la->la_mode))
1091 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1094 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1095 (la->la_valid & ~LA_FLAGS) &&
1096 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1099 /* Check for setting the obj time. */
1100 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1101 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1102 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1103 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1104 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1112 /* Make sure a caller can chmod. */
1113 if (la->la_valid & LA_MODE) {
1114 /* Bypass la_valid == LA_MODE;
1115 * this is for changing a file with SUID or SGID. */
1116 if ((la->la_valid & ~LA_MODE) &&
1117 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1118 (uc->mu_fsuid != tmp_la->la_uid) &&
1119 !mdd_capable(uc, CFS_CAP_FOWNER))
1122 if (la->la_mode == (cfs_umode_t) -1)
1123 la->la_mode = tmp_la->la_mode;
1125 la->la_mode = (la->la_mode & S_IALLUGO) |
1126 (tmp_la->la_mode & ~S_IALLUGO);
1128 /* Also check the setgid bit! */
1129 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1130 la->la_gid : tmp_la->la_gid) &&
1131 !mdd_capable(uc, CFS_CAP_FSETID))
1132 la->la_mode &= ~S_ISGID;
1134 la->la_mode = tmp_la->la_mode;
1137 /* Make sure a caller can chown. */
1138 if (la->la_valid & LA_UID) {
1139 if (la->la_uid == (uid_t) -1)
1140 la->la_uid = tmp_la->la_uid;
1141 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1142 (la->la_uid != tmp_la->la_uid)) &&
1143 !mdd_capable(uc, CFS_CAP_CHOWN))
1146 /* If the user or group of a non-directory has been
1147 * changed by a non-root user, remove the setuid bit.
1148 * 19981026 David C Niemi <niemi@tux.org>
1150 * Changed this to apply to all users, including root,
1151 * to avoid some races. This is the behavior we had in
1152 * 2.0. The check for non-root was definitely wrong
1153 * for 2.2 anyway, as it should have been using
1154 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1155 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1156 !S_ISDIR(tmp_la->la_mode)) {
1157 la->la_mode &= ~S_ISUID;
1158 la->la_valid |= LA_MODE;
1162 /* Make sure caller can chgrp. */
1163 if (la->la_valid & LA_GID) {
1164 if (la->la_gid == (gid_t) -1)
1165 la->la_gid = tmp_la->la_gid;
1166 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1167 ((la->la_gid != tmp_la->la_gid) &&
1168 !lustre_in_group_p(uc, la->la_gid))) &&
1169 !mdd_capable(uc, CFS_CAP_CHOWN))
1172 /* Likewise, if the user or group of a non-directory
1173 * has been changed by a non-root user, remove the
1174 * setgid bit UNLESS there is no group execute bit
1175 * (this would be a file marked for mandatory
1176 * locking). 19981026 David C Niemi <niemi@tux.org>
1178 * Removed the fsuid check (see the comment above) --
1180 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1181 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1182 la->la_mode &= ~S_ISGID;
1183 la->la_valid |= LA_MODE;
1187 /* For both the Size-on-MDS case and the truncate case,
1188 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
1189 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1190 * In the SOM case it is set and the MAY_WRITE permission has already been
1191 * checked at open time, so there is no need to check again. In the truncate
1192 * case it is clear, so the MAY_WRITE permission must be checked here. */
1193 if (ma->ma_attr_flags & MDS_SOM) {
1194 /* For the "Size-on-MDS" setattr update, merge the incoming
1195 * attributes with those already set in the inode. BUG 10641 */
1196 if ((la->la_valid & LA_ATIME) &&
1197 (la->la_atime <= tmp_la->la_atime))
1198 la->la_valid &= ~LA_ATIME;
1200 /* OST attributes do not have a priority over MDS attributes,
1201 * so drop times if ctime is equal. */
1202 if ((la->la_valid & LA_CTIME) &&
1203 (la->la_ctime <= tmp_la->la_ctime))
1204 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1206 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1207 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1208 (uc->mu_fsuid == tmp_la->la_uid)) &&
1209 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1210 rc = mdd_permission_internal_locked(env, obj,
1217 if (la->la_valid & LA_CTIME) {
1218 /* A pure setattr has priority over what is
1219 * already set; do not drop it if ctime is equal. */
1220 if (la->la_ctime < tmp_la->la_ctime)
1221 la->la_valid &= ~(LA_ATIME | LA_MTIME | LA_CTIME);
1229 /** Store a data change changelog record
1230 * If this fails, we must fail the whole transaction; we don't
1231 * want the change to commit without the log entry.
1232 * \param mdd_obj - mdd_object of change
1233 * \param handle - transaction handle
1235 static int mdd_changelog_data_store(const struct lu_env *env,
1236 struct mdd_device *mdd,
1237 enum changelog_rec_type type,
1238 struct mdd_object *mdd_obj,
1239 struct thandle *handle)
1241 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1242 struct llog_changelog_rec *rec;
1247 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1250 LASSERT(handle != NULL);
1251 LASSERT(mdd_obj != NULL);
1253 if ((type == CL_TIME) &&
1254 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1255 /* Don't need multiple updates in this log */
1256 /* Don't check under lock - no big deal if we get an extra entry */
1261 reclen = llog_data_len(sizeof(*rec));
1262 buf = mdd_buf_alloc(env, reclen);
1263 if (buf->lb_buf == NULL)
1265 rec = (struct llog_changelog_rec *)buf->lb_buf;
1267 rec->cr.cr_flags = CLF_VERSION;
1268 rec->cr.cr_type = (__u32)type;
1269 rec->cr.cr_tfid = *tfid;
1270 rec->cr.cr_namelen = 0;
1271 mdd_obj->mod_cltime = cfs_time_current_64();
1273 rc = mdd_changelog_llog_write(mdd, rec, handle);
1275 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1276 rc, type, PFID(tfid));
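/*
 * Usage sketch (illustrative) for mdd_changelog_data_store() above: callers
 * invoke it inside an open transaction and pick the record type from the
 * operation; mdd_attr_set() below logs CL_SETATTR for real attribute changes
 * and CL_TIME for pure timestamp updates, while the xattr paths log CL_XATTR:
 *
 *      rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj, handle);
 */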
1284 * Should be called with write lock held.
1286 * \see mdd_lma_set_locked().
1288 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1289 const struct md_attr *ma, struct thandle *handle)
1291 struct mdd_thread_info *info = mdd_env_info(env);
1293 struct lustre_mdt_attrs *lma =
1294 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1295 int lmasize = sizeof(struct lustre_mdt_attrs);
1300 /* If either the HSM or the SOM part is not valid, read the LMA from disk first */
1301 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1302 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1306 lustre_lma_swab(lma);
1308 memset(lma, 0, lmasize);
1312 if (ma->ma_valid & MA_HSM) {
1313 lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1314 lma->lma_compat |= LMAC_HSM;
1318 if (ma->ma_valid & MA_SOM) {
1319 LASSERT(ma->ma_som != NULL);
1320 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1321 lma->lma_compat &= ~LMAC_SOM;
1323 lma->lma_compat |= LMAC_SOM;
1324 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1325 lma->lma_som_size = ma->ma_som->msd_size;
1326 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1327 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1332 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1334 lustre_lma_swab(lma);
1335 buf = mdd_buf_get(env, lma, lmasize);
1336 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1342 * Save LMA extended attributes with data from \a ma.
1344 * HSM and Size-on-MDS data will be extracted from \a ma if they are valid; if
1345 * not, the LMA EA will first be read from disk, modified and written back.
1348 static int mdd_lma_set_locked(const struct lu_env *env,
1349 struct mdd_object *mdd_obj,
1350 const struct md_attr *ma, struct thandle *handle)
1354 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1355 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1356 mdd_write_unlock(env, mdd_obj);
1360 /* set attr and LOV EA at once, return updated attr */
1361 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1362 const struct md_attr *ma)
1364 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1365 struct mdd_device *mdd = mdo2mdd(obj);
1366 struct thandle *handle;
1367 struct lov_mds_md *lmm = NULL;
1368 struct llog_cookie *logcookies = NULL;
1369 int rc, lmm_size = 0, cookie_size = 0;
1370 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1371 #ifdef HAVE_QUOTA_SUPPORT
1372 struct obd_device *obd = mdd->mdd_obd_dev;
1373 struct mds_obd *mds = &obd->u.mds;
1374 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1375 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1376 int quota_opc = 0, block_count = 0;
1377 int inode_pending[MAXQUOTAS] = { 0, 0 };
1378 int block_pending[MAXQUOTAS] = { 0, 0 };
1382 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1383 MDD_TXN_ATTR_SET_OP);
1384 handle = mdd_trans_start(env, mdd);
1386 RETURN(PTR_ERR(handle));
1387 /*TODO: add lock here*/
1388 /* start a log journal handle if needed */
1389 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1390 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1391 lmm_size = mdd_lov_mdsize(env, mdd);
1392 lmm = mdd_max_lmm_get(env, mdd);
1394 GOTO(cleanup, rc = -ENOMEM);
1396 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1403 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1404 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1405 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1407 *la_copy = ma->ma_attr;
1408 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1412 #ifdef HAVE_QUOTA_SUPPORT
1413 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1414 struct obd_export *exp = md_quota(env)->mq_exp;
1415 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1417 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1419 quota_opc = FSFILT_OP_SETATTR;
1420 mdd_quota_wrapper(la_copy, qnids);
1421 mdd_quota_wrapper(la_tmp, qoids);
1422 /* get file quota for new owner */
1423 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1424 qnids, inode_pending, 1, NULL, 0,
1426 block_count = (la_tmp->la_blocks + 7) >> 3;
1429 mdd_data_get(env, mdd_obj, &data);
1430 /* get block quota for new owner */
1431 lquota_chkquota(mds_quota_interface_ref, obd,
1432 exp, qnids, block_pending,
1434 LQUOTA_FLAGS_BLK, data, 1);
1440 if (la_copy->la_valid & LA_FLAGS) {
1441 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1444 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1445 } else if (la_copy->la_valid) { /* setattr */
1446 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1448 /* journal chown/chgrp in llog, just like unlink */
1449 if (rc == 0 && lmm_size){
1450 cookie_size = mdd_lov_cookiesize(env, mdd);
1451 logcookies = mdd_max_cookie_get(env, mdd);
1452 if (logcookies == NULL)
1453 GOTO(cleanup, rc = -ENOMEM);
1455 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1456 logcookies, cookie_size) <= 0)
1461 if (rc == 0 && ma->ma_valid & MA_LOV) {
1464 mode = mdd_object_type(mdd_obj);
1465 if (S_ISREG(mode) || S_ISDIR(mode)) {
1466 rc = mdd_lsm_sanity_check(env, mdd_obj);
1470 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1471 ma->ma_lmm_size, handle, 1);
1475 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1478 mode = mdd_object_type(mdd_obj);
1480 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1485 rc = mdd_changelog_data_store(env, mdd,
1486 (ma->ma_attr.la_valid &
1487 ~(LA_MTIME|LA_CTIME|LA_ATIME)) ?
1488 CL_SETATTR : CL_TIME,
1490 mdd_trans_stop(env, mdd, rc, handle);
1491 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1492 /*set obd attr, if needed*/
1493 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1496 #ifdef HAVE_QUOTA_SUPPORT
1498 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1500 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1502 /* Trigger dqrel/dqacq for the original owner and the new owner.
1503 * If it fails, the next call to lquota_chkquota will process it. */
1505 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1512 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1513 const struct lu_buf *buf, const char *name, int fl,
1514 struct thandle *handle)
1519 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1520 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1521 mdd_write_unlock(env, obj);
1526 static int mdd_xattr_sanity_check(const struct lu_env *env,
1527 struct mdd_object *obj)
1529 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1530 struct md_ucred *uc = md_ucred(env);
1534 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1537 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1541 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1542 !mdd_capable(uc, CFS_CAP_FOWNER))
1549 * The caller should guarantee to update the object ctime
1550 * after xattr_set if needed.
1552 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1553 const struct lu_buf *buf, const char *name,
1556 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1557 struct mdd_device *mdd = mdo2mdd(obj);
1558 struct thandle *handle;
1562 rc = mdd_xattr_sanity_check(env, mdd_obj);
1566 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1567 /* security-related changes may require sync */
1568 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1569 mdd->mdd_sync_permission == 1)
1570 txn_param_sync(&mdd_env_info(env)->mti_param);
1572 handle = mdd_trans_start(env, mdd);
1574 RETURN(PTR_ERR(handle));
1576 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1578 /* Only record user xattr changes */
1579 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1580 (strncmp("user.", name, 5) == 0))
1581 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1583 mdd_trans_stop(env, mdd, rc, handle);
1589 * The caller should guarantee to update the object ctime
1590 * after xattr_set if needed.
1592 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1595 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1596 struct mdd_device *mdd = mdo2mdd(obj);
1597 struct thandle *handle;
1601 rc = mdd_xattr_sanity_check(env, mdd_obj);
1605 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1606 handle = mdd_trans_start(env, mdd);
1608 RETURN(PTR_ERR(handle));
1610 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1611 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1612 mdd_object_capa(env, mdd_obj));
1613 mdd_write_unlock(env, mdd_obj);
1615 /* Only record user xattr changes */
1616 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1617 (strncmp("user.", name, 5) == 0))
1618 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1621 mdd_trans_stop(env, mdd, rc, handle);
1626 /* partial unlink */
1627 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1630 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1631 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1632 struct mdd_device *mdd = mdo2mdd(obj);
1633 struct thandle *handle;
1634 #ifdef HAVE_QUOTA_SUPPORT
1635 struct obd_device *obd = mdd->mdd_obd_dev;
1636 struct mds_obd *mds = &obd->u.mds;
1637 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1644 * Check -ENOENT early here because we need to get object type
1645 * to calculate credits before transaction start
1647 if (!mdd_object_exists(mdd_obj))
1650 LASSERT(mdd_object_exists(mdd_obj) > 0);
1652 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1656 handle = mdd_trans_start(env, mdd);
1660 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1662 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1666 __mdd_ref_del(env, mdd_obj, handle, 0);
1668 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1670 __mdd_ref_del(env, mdd_obj, handle, 1);
1673 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1674 la_copy->la_ctime = ma->ma_attr.la_ctime;
1676 la_copy->la_valid = LA_CTIME;
1677 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1681 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1682 #ifdef HAVE_QUOTA_SUPPORT
1683 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1684 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1685 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1686 mdd_quota_wrapper(&ma->ma_attr, qids);
1693 mdd_write_unlock(env, mdd_obj);
1694 mdd_trans_stop(env, mdd, rc, handle);
1695 #ifdef HAVE_QUOTA_SUPPORT
1697 /* Trigger dqrel on the owner of the child. If it fails,
1698 * the next call to lquota_chkquota will process it */
1699 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1705 /* partial operation */
1706 static int mdd_oc_sanity_check(const struct lu_env *env,
1707 struct mdd_object *obj,
1713 switch (ma->ma_attr.la_mode & S_IFMT) {
1730 static int mdd_object_create(const struct lu_env *env,
1731 struct md_object *obj,
1732 const struct md_op_spec *spec,
1736 struct mdd_device *mdd = mdo2mdd(obj);
1737 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1738 const struct lu_fid *pfid = spec->u.sp_pfid;
1739 struct thandle *handle;
1740 #ifdef HAVE_QUOTA_SUPPORT
1741 struct obd_device *obd = mdd->mdd_obd_dev;
1742 struct obd_export *exp = md_quota(env)->mq_exp;
1743 struct mds_obd *mds = &obd->u.mds;
1744 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1745 int quota_opc = 0, block_count = 0;
1746 int inode_pending[MAXQUOTAS] = { 0, 0 };
1747 int block_pending[MAXQUOTAS] = { 0, 0 };
1752 #ifdef HAVE_QUOTA_SUPPORT
1753 if (mds->mds_quota) {
1754 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1755 mdd_quota_wrapper(&ma->ma_attr, qids);
1756 /* get file quota for child */
1757 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1758 qids, inode_pending, 1, NULL, 0,
1760 switch (ma->ma_attr.la_mode & S_IFMT) {
1769 /* get block quota for child */
1771 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1772 qids, block_pending, block_count,
1773 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1777 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1778 handle = mdd_trans_start(env, mdd);
1780 GOTO(out_pending, rc = PTR_ERR(handle));
1782 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1783 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1787 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1791 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1792 /* If creating the slave object, set slave EA here. */
1793 int lmv_size = spec->u.sp_ea.eadatalen;
1794 struct lmv_stripe_md *lmv;
1796 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1797 LASSERT(lmv != NULL && lmv_size > 0);
1799 rc = __mdd_xattr_set(env, mdd_obj,
1800 mdd_buf_get_const(env, lmv, lmv_size),
1801 XATTR_NAME_LMV, 0, handle);
1805 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1808 #ifdef CONFIG_FS_POSIX_ACL
1809 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1810 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1812 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1813 buf->lb_len = spec->u.sp_ea.eadatalen;
1814 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1815 rc = __mdd_acl_init(env, mdd_obj, buf,
1816 &ma->ma_attr.la_mode,
1821 ma->ma_attr.la_valid |= LA_MODE;
1824 pfid = spec->u.sp_ea.fid;
1827 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1833 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1834 mdd_write_unlock(env, mdd_obj);
1836 mdd_trans_stop(env, mdd, rc, handle);
1838 #ifdef HAVE_QUOTA_SUPPORT
1840 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1842 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1844 /* Trigger dqacq on the owner of the child. If it fails,
1845 * the next call to lquota_chkquota will process it. */
1846 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1854 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1855 const struct md_attr *ma)
1857 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1858 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1859 struct mdd_device *mdd = mdo2mdd(obj);
1860 struct thandle *handle;
1864 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1865 handle = mdd_trans_start(env, mdd);
1869 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1870 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1872 __mdd_ref_add(env, mdd_obj, handle);
1873 mdd_write_unlock(env, mdd_obj);
1875 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1876 la_copy->la_ctime = ma->ma_attr.la_ctime;
1878 la_copy->la_valid = LA_CTIME;
1879 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1882 mdd_trans_stop(env, mdd, 0, handle);
1888 * do NOT or the MAY_*'s, you'll get the weakest
1890 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1894 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1895 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1896 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1897 * owner can write to a file even if it is marked readonly to hide
1898 * its brokenness. (bug 5781) */
1899 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1900 struct md_ucred *uc = md_ucred(env);
1902 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1903 (la->la_uid == uc->mu_fsuid))
1907 if (flags & FMODE_READ)
1909 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1911 if (flags & MDS_FMODE_EXEC)
1916 static int mdd_open_sanity_check(const struct lu_env *env,
1917 struct mdd_object *obj, int flag)
1919 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1924 if (mdd_is_dead_obj(obj))
1927 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1931 if (S_ISLNK(tmp_la->la_mode))
1934 mode = accmode(env, tmp_la, flag);
1936 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1939 if (!(flag & MDS_OPEN_CREATED)) {
1940 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1945 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1946 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1947 flag &= ~MDS_OPEN_TRUNC;
1949 /* To write to an append-only file, it must be opened in append mode. */
1950 if (mdd_is_append(obj)) {
1951 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1953 if (flag & MDS_OPEN_TRUNC)
1959 * Currently the O_NOATIME flag is not packed by the client.
1961 if (flag & O_NOATIME) {
1962 struct md_ucred *uc = md_ucred(env);
1964 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1965 (uc->mu_valid == UCRED_NEW)) &&
1966 (uc->mu_fsuid != tmp_la->la_uid) &&
1967 !mdd_capable(uc, CFS_CAP_FOWNER))
1975 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1978 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1981 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1983 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1985 mdd_obj->mod_count++;
1987 mdd_write_unlock(env, mdd_obj);
1991 /* return md_attr back;
1992 * if it is the last unlink, also return the LOV EA + llog cookies */
1993 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1999 if (S_ISREG(mdd_object_type(obj))) {
2000 /* Return LOV & COOKIES unconditionally here. We clean everything up.
2001 * Caller must be ready for that. */
2003 rc = __mdd_lmm_get(env, obj, ma);
2004 if ((ma->ma_valid & MA_LOV))
2005 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2012 * No permission check is needed.
2014 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2017 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2018 struct mdd_device *mdd = mdo2mdd(obj);
2019 struct thandle *handle;
2023 #ifdef HAVE_QUOTA_SUPPORT
2024 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2025 struct mds_obd *mds = &obd->u.mds;
2026 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2031 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2034 handle = mdd_trans_start(env, mdo2mdd(obj));
2036 RETURN(PTR_ERR(handle));
2038 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2039 /* release open count */
2040 mdd_obj->mod_count--;
2042 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2043 /* remove link to object from orphan index */
2044 rc = __mdd_orphan_del(env, mdd_obj, handle);
2046 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2047 "list, OSS objects to be destroyed.\n",
2048 PFID(mdd_object_fid(mdd_obj)));
2050 CERROR("Object "DFID" can not be deleted from orphan "
2051 "list, maybe cause OST objects can not be "
2052 "destroyed (err: %d).\n",
2053 PFID(mdd_object_fid(mdd_obj)), rc);
2054 /* If object was not deleted from orphan list, do not
2055 * destroy OSS objects, which will be done when next
2061 rc = mdd_iattr_get(env, mdd_obj, ma);
2062 /* The object may not have been in the orphan list originally; that is a
2063 * rare case of mdd_finish_unlink() failure. */
2064 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2065 #ifdef HAVE_QUOTA_SUPPORT
2066 if (mds->mds_quota) {
2067 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2068 mdd_quota_wrapper(&ma->ma_attr, qids);
2071 /* MDS_CLOSE_CLEANUP means the OSS objects are destroyed by the MDS. */
2072 if (ma->ma_valid & MA_FLAGS &&
2073 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2074 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2076 rc = mdd_object_kill(env, mdd_obj, ma);
2082 CERROR("Error when prepare to delete Object "DFID" , "
2083 "which will cause OST objects can not be "
2084 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2090 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2092 mdd_write_unlock(env, mdd_obj);
2093 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2094 #ifdef HAVE_QUOTA_SUPPORT
2096 /* Trigger dqrel on the owner of the child. If it fails,
2097 * the next call to lquota_chkquota will process it */
2098 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2105 * Permission check is done at open time,
2106 * so there is no need to check again.
2108 static int mdd_readpage_sanity_check(const struct lu_env *env,
2109 struct mdd_object *obj)
2111 struct dt_object *next = mdd_object_child(obj);
2115 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2123 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2124 int first, void *area, int nob,
2125 const struct dt_it_ops *iops, struct dt_it *it,
2126 __u64 *start, __u64 *end,
2127 struct lu_dirent **last, __u32 attr)
2131 struct lu_dirent *ent;
2134 memset(area, 0, sizeof (struct lu_dirpage));
2135 area += sizeof (struct lu_dirpage);
2136 nob -= sizeof (struct lu_dirpage);
2144 len = iops->key_size(env, it);
2146 /* IAM iterator can return record with zero len. */
2150 hash = iops->store(env, it);
2151 if (unlikely(first)) {
2156 /* calculate max space required for lu_dirent */
2157 recsize = lu_dirent_calc_size(len, attr);
2159 if (nob >= recsize) {
2160 result = iops->rec(env, it, ent, attr);
2161 if (result == -ESTALE)
2166 /* osd might not be able to pack all attributes,
2167 * so recheck the record length */
2168 recsize = le16_to_cpu(ent->lde_reclen);
2171 * record doesn't fit into page, enlarge previous one.
2174 (*last)->lde_reclen =
2175 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2184 ent = (void *)ent + recsize;
2188 result = iops->next(env, it);
2189 if (result == -ESTALE)
2191 } while (result == 0);
2198 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2199 const struct lu_rdpg *rdpg)
2202 struct dt_object *next = mdd_object_child(obj);
2203 const struct dt_it_ops *iops;
2205 struct lu_dirent *last = NULL;
2206 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2213 LASSERT(rdpg->rp_pages != NULL);
2214 LASSERT(next->do_index_ops != NULL);
2216 if (rdpg->rp_count <= 0)
2220 * iterate through directory and fill pages from @rdpg
2222 iops = &next->do_index_ops->dio_it;
2223 it = iops->init(env, next, mdd_object_capa(env, obj));
2227 rc = iops->load(env, it, rdpg->rp_hash);
2231 * Iterator didn't find record with exactly the key requested.
2233 * It is currently either
2235 * - positioned above record with key less than
2236 * requested---skip it.
2238 * - or not positioned at all (is in IAM_IT_SKEWED
2239 * state)---position it on the next item.
2241 rc = iops->next(env, it);
2246 * At this point and across for-loop:
2248 * rc == 0 -> ok, proceed.
2249 * rc > 0 -> end of directory.
2252 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2253 i++, nob -= CFS_PAGE_SIZE) {
2254 LASSERT(i < rdpg->rp_npages);
2255 pg = rdpg->rp_pages[i];
2256 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2257 min_t(int, nob, CFS_PAGE_SIZE), iops,
2258 it, &hash_start, &hash_end, &last,
2260 if (rc != 0 || i == rdpg->rp_npages - 1) {
2262 last->lde_reclen = 0;
2270 hash_end = DIR_END_OFF;
2274 struct lu_dirpage *dp;
2276 dp = cfs_kmap(rdpg->rp_pages[0]);
2277 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2278 dp->ldp_hash_end = cpu_to_le64(hash_end);
2281 * No pages were processed, mark this.
2283 dp->ldp_flags |= LDF_EMPTY;
2285 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2286 cfs_kunmap(rdpg->rp_pages[0]);
2289 iops->fini(env, it);
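/*
 * Layout sketch (illustrative) of a page filled by mdd_dir_page_build() and
 * __mdd_readpage() above:
 *
 *      struct lu_dirpage   ldp_hash_start, ldp_hash_end, ldp_flags
 *      struct lu_dirent    entry #0  (lde_reclen = offset to the next entry)
 *      struct lu_dirent    entry #1
 *      ...
 *      struct lu_dirent    last entry written (lde_reclen = 0 terminates)
 *
 * When a record does not fit in the remaining space, the previous entry's
 * lde_reclen is enlarged to cover the unused tail and filling continues on
 * the next page; if nothing was emitted at all, LDF_EMPTY is set in ldp_flags.
 */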
2294 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2295 const struct lu_rdpg *rdpg)
2297 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2301 LASSERT(mdd_object_exists(mdd_obj));
2303 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2304 rc = mdd_readpage_sanity_check(env, mdd_obj);
2306 GOTO(out_unlock, rc);
2308 if (mdd_is_dead_obj(mdd_obj)) {
2310 struct lu_dirpage *dp;
2313 * According to POSIX, please do not return any entry to client:
2314 * even dot and dotdot should not be returned.
2316 CWARN("readdir from dead object: "DFID"\n",
2317 PFID(mdd_object_fid(mdd_obj)));
2319 if (rdpg->rp_count <= 0)
2320 GOTO(out_unlock, rc = -EFAULT);
2321 LASSERT(rdpg->rp_pages != NULL);
2323 pg = rdpg->rp_pages[0];
2324 dp = (struct lu_dirpage*)cfs_kmap(pg);
2325 memset(dp, 0 , sizeof(struct lu_dirpage));
2326 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2327 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2328 dp->ldp_flags |= LDF_EMPTY;
2329 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2331 GOTO(out_unlock, rc = 0);
2334 rc = __mdd_readpage(env, mdd_obj, rdpg);
2338 mdd_read_unlock(env, mdd_obj);
2342 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2344 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2345 struct dt_object *next;
2347 LASSERT(mdd_object_exists(mdd_obj));
2348 next = mdd_object_child(mdd_obj);
2349 return next->do_ops->do_object_sync(env, next);
2352 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2353 struct md_object *obj)
2355 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2357 LASSERT(mdd_object_exists(mdd_obj));
2358 return do_version_get(env, mdd_object_child(mdd_obj));
2361 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2362 dt_obj_version_t version)
2364 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2366 LASSERT(mdd_object_exists(mdd_obj));
2367 return do_version_set(env, mdd_object_child(mdd_obj), version);
2370 const struct md_object_operations mdd_obj_ops = {
2371 .moo_permission = mdd_permission,
2372 .moo_attr_get = mdd_attr_get,
2373 .moo_attr_set = mdd_attr_set,
2374 .moo_xattr_get = mdd_xattr_get,
2375 .moo_xattr_set = mdd_xattr_set,
2376 .moo_xattr_list = mdd_xattr_list,
2377 .moo_xattr_del = mdd_xattr_del,
2378 .moo_object_create = mdd_object_create,
2379 .moo_ref_add = mdd_ref_add,
2380 .moo_ref_del = mdd_ref_del,
2381 .moo_open = mdd_open,
2382 .moo_close = mdd_close,
2383 .moo_readpage = mdd_readpage,
2384 .moo_readlink = mdd_readlink,
2385 .moo_capa_get = mdd_capa_get,
2386 .moo_object_sync = mdd_object_sync,
2387 .moo_version_get = mdd_version_get,
2388 .moo_version_set = mdd_version_set,
2389 .moo_path = mdd_path,