/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/mdd/mdd_object.c
 *
 * Lustre Metadata Server (mdd) routines
 *
 * Author: Wang Di <wangdi@clusterfs.com>
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs_jbd2.h>
#else
#include <linux/jbd.h>
#endif
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>

#include <lustre_param.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs.h>
#else
#include <linux/ldiskfs_fs.h>
#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>

#include "mdd_internal.h"
static const struct lu_object_operations mdd_lu_obj_ops;

static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name);

int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                 void **data)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        mdo_data_get(env, obj, data);
        return 0;
}

int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        return mdo_attr_get(env, obj, la, capa);
}
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
        obj->mod_flags &= ~(APPEND_OBJ | IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
}
struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
{
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
        return info;
}
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}
void mdd_buf_put(struct lu_buf *buf)
{
        if (buf == NULL || buf->lb_buf == NULL)
                return;
        if (buf->lb_vmalloc)
                OBD_VFREE(buf->lb_buf, buf->lb_len);
        else
                OBD_FREE(buf->lb_buf, buf->lb_len);
        buf->lb_buf = NULL;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = (void *)area;
        buf->lb_len = len;
        return buf;
}
#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE << 2) /* 16KB with 4KB pages */

struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                if (buf->lb_vmalloc)
                        OBD_VFREE(buf->lb_buf, buf->lb_len);
                else
                        OBD_FREE(buf->lb_buf, buf->lb_len);
                buf->lb_buf = NULL;
        }
        if (buf->lb_buf == NULL) {
                buf->lb_len = len;
                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 0;
                }
                /* fall back to vmalloc for large or failed allocations */
                if (buf->lb_buf == NULL) {
                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 1;
                }
                if (buf->lb_buf == NULL)
                        buf->lb_len = 0;
        }
        return buf;
}
/**
 * Increase the size of the \a mti_big_buf.
 * Preserves old data in the buffer; the old buffer remains unchanged on
 * error.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
        struct lu_buf buf;

        LASSERT(len >= oldbuf->lb_len);
        if (len > BUF_VMALLOC_SIZE) {
                OBD_VMALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 1;
        } else {
                OBD_ALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 0;
        }
        if (buf.lb_buf == NULL)
                return -ENOMEM;

        buf.lb_len = len;
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        if (oldbuf->lb_vmalloc)
                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
        else
                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);

        memcpy(oldbuf, &buf, sizeof(buf));
        return 0;
}
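
/*
 * Usage sketch (illustrative, not part of the original source): callers
 * grow the per-thread scratch buffer in place and keep using it through
 * mdd_env_info(); on -ENOMEM the old buffer and its contents stay valid:
 *
 *      struct lu_buf *buf = mdd_buf_alloc(env, 1024);
 *      ... fill buf->lb_buf ...
 *      if (mdd_buf_grow(env, 8192) == 0)
 *              buf = &mdd_env_info(env)->mti_big_buf; (data preserved)
 */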
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_cookie_size;

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;
        }
        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;
        }
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
}
struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                   struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_lmm_size;

        max_lmm_size = mdd_lov_mdsize(env, mdd);
        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
                if (mti->mti_max_lmm)
                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
                mti->mti_max_lmm = NULL;
                mti->mti_max_lmm_size = 0;
        }
        if (unlikely(mti->mti_max_lmm == NULL)) {
                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
                if (likely(mti->mti_max_lmm != NULL))
                        mti->mti_max_lmm_size = max_lmm_size;
        }
        return mti->mti_max_lmm;
}
struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d)
{
        struct mdd_object *mdd_obj;

        OBD_ALLOC_PTR(mdd_obj);
        if (mdd_obj != NULL) {
                struct lu_object *o;

                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
                return o;
        }
        return NULL;
}
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
{
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object *below;
        struct lu_device *under;
        ENTRY;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);
        if (below == NULL)
                RETURN(-ENOMEM);

        lu_object_add(o, below);

        RETURN(0);
}
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}
static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_FREE_PTR(mdd);
}
static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);

        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime="LPU64", flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
}
static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init  = mdd_object_init,
        .loo_object_start = mdd_object_start,
        .loo_object_free  = mdd_object_free,
        .loo_object_print = mdd_object_print,
};
struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
{
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
}
static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                        const char *path, struct lu_fid *fid)
{
        struct lu_buf *buf;
        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
        struct mdd_object *obj;
        struct lu_name *lname = &mdd_env_info(env)->mti_name;
        char *name;
        int rc = 0;
        ENTRY;

        /* temp buffer for path element */
        buf = mdd_buf_alloc(env, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        lname->ln_name = name = buf->lb_buf;
        lname->ln_namelen = 0;
        *f = mdd->mdd_root_fid;

        while (1) {
                while (*path == '/')
                        path++;
                if (*path == '\0')
                        break;
                while (*path != '/' && *path != '\0') {
                        *name = *path;
                        path++;
                        name++;
                        lname->ln_namelen++;
                }
                *name = '\0';
                /* find obj corresponding to fid */
                obj = mdd_object_find(env, mdd, f);
                if (obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                /* get child fid from parent and name */
                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                mdd_object_put(env, obj);
                if (rc)
                        break;

                name = buf->lb_buf;
                lname->ln_namelen = 0;
        }

        if (rc == 0)
                *fid = *f;
out:
        RETURN(rc);
}
/**
 * The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100
/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64              pli_recno;     /**< history point */
        __u64              pli_currec;    /**< current record */
        struct lu_fid      pli_fid;
        struct lu_fid      pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object *pli_mdd_obj;
        char              *pli_path;      /**< full path */
        int                pli_pathlen;
        int                pli_linkno;    /**< which hardlink to follow */
        int                pli_fidcount;  /**< number of \a pli_fids */
};
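
/*
 * Illustrative walk (not part of the original source): when resolving the
 * path of /d1/d2/file, mdd_path_current() below seeds pli_fids[0] with the
 * object's own fid and climbs parent-by-parent via the link EA, while the
 * name components are packed right-to-left at the tail of pli_path:
 *
 *      pli_fids:  [0] = FID(file)   [1] = FID(d2)   [2] = FID(d1)
 *      pli_path:  "..........d1/d2/file"   (built from the end)
 */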
static int mdd_path_current(const struct lu_env *env,
                            struct path_lookup_info *pli)
{
        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
        struct mdd_object *mdd_obj;
        struct lu_buf *buf = NULL;
        struct link_ea_header *leh;
        struct link_ea_entry *lee;
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
        struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
        char *ptr;
        int reclen;
        int rc;
        ENTRY;

        ptr = pli->pli_path + pli->pli_pathlen - 1;
        *ptr = 0;
        --ptr;
        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);

        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
                mdd_obj = mdd_object_find(env, mdd,
                                          &pli->pli_fids[pli->pli_fidcount]);
                if (mdd_obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(mdd_obj))
                        GOTO(out, rc = PTR_ERR(mdd_obj));
                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                if (rc <= 0) {
                        mdd_object_put(env, mdd_obj);
                        if (rc == -1)
                                rc = -EREMOTE;
                        else if (rc == 0)
                                /* Do I need to error out here? */
                                rc = -ENOENT;
                        GOTO(out, rc);
                }

                /* Get parent fid and object name */
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
                buf = mdd_links_get(env, mdd_obj);
                mdd_read_unlock(env, mdd_obj);
                mdd_object_put(env, mdd_obj);
                if (IS_ERR(buf))
                        GOTO(out, rc = PTR_ERR(buf));

                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);

                /* If set, use link #linkno for path lookup, otherwise use
                   link #0.  Only do this for the final path element. */
                if ((pli->pli_fidcount == 0) &&
                    (pli->pli_linkno < leh->leh_reccount)) {
                        int count;
                        for (count = 0; count < pli->pli_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                      ((char *)lee + reclen);
                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
                        }
                        if (pli->pli_linkno < leh->leh_reccount - 1)
                                /* indicate to user there are more links */
                                pli->pli_linkno++;
                }

                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)
                        GOTO(out, rc = -EOVERFLOW);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
                *(--ptr) = '/';

                /* Store the parent fid for historic lookup */
                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
                        GOTO(out, rc = -EOVERFLOW);
                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
        }

        /* Verify that our path hasn't changed since we started the lookup.
           Record the current index, and verify the path resolves to the
           same fid.  If it does, then the path is correct as of this index. */
        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
        pli->pli_currec = mdd->mdd_cl.mc_index;
        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
        if (rc) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
                GOTO(out, rc = -EAGAIN);
        }
        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
                       PFID(&pli->pli_fid));
                GOTO(out, rc = -EAGAIN);
        }
        ptr++; /* skip leading / */
        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);

        EXIT;
out:
        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
                /* if we vmalloced a large buffer drop it */
                mdd_buf_put(buf);

        RETURN(rc);
}
static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
{
        return 0;
}
/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
{
        struct path_lookup_info *pli;
        int tries = 3;
        int rc = -EAGAIN;
        ENTRY;

        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
                path[0] = '\0';
                RETURN(0);
        }

        OBD_ALLOC_PTR(pli);
        if (pli == NULL)
                RETURN(-ENOMEM);

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
        }

        *recno = pli->pli_currec;
        /* Return next link index to caller */
        *linkno = pli->pli_linkno;

        OBD_FREE_PTR(pli);
        RETURN(rc);
}
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
{
        struct lu_attr *la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        if (rc == 0) {
                mdd_flags_xlate(obj, la->la_flags);
                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
                        obj->mod_flags |= MNLINK_OBJ;
        }
        RETURN(rc);
}
/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                  struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_valid & MA_INODE)
                RETURN(0);

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        if (rc == 0)
                ma->ma_valid |= MA_INODE;
        RETURN(rc);
}
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
{
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
        struct lov_user_md *lum = (struct lov_user_md *)lmm;
        ENTRY;

        if (!lum)
                RETURN(0);

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        lum->lmm_magic = LOV_MAGIC_V1;
        lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
        lum->lmm_pattern = ldesc->ld_pattern;
        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;

        RETURN(sizeof(*lum));
}
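
/*
 * Worked example (hypothetical values, not from the original source): with
 * ld_default_stripe_size = 1MB, ld_default_stripe_count = 1 and
 * ld_default_stripe_offset = -1 in the lov_desc, the lov_user_md filled in
 * above describes a one-stripe, 1MB-stripe-size default layout with no
 * fixed starting OST.
 */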
/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LOV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                        XATTR_NAME_LOV);
        if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
        if (rc > 0) {
                ma->ma_lmm_size = rc;
                ma->ma_valid |= MA_LOV;
                rc = 0;
        }
        RETURN(rc);
}
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                       struct md_attr *ma)
{
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LMV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                        XATTR_NAME_LMV);
        if (rc > 0) {
                ma->ma_valid |= MA_LMV;
                rc = 0;
        }
        RETURN(rc);
}
static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                         struct md_attr *ma)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lustre_mdt_attrs *lma =
                (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lma_size;
        int rc;
        ENTRY;

        /* If all needed data are already valid, nothing to do */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
            (ma->ma_need & (MA_HSM | MA_SOM)))
                RETURN(0);

        /* Read LMA from disk EA */
        lma_size = sizeof(info->mti_xattr_buf);
        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
        if (rc <= 0)
                RETURN(rc);

        /* Useless to check LMA incompatibility because this is already done
         * in osd_ea_fid_get(), and this will fail long before this code is
         * called.
         * So, if we are here, LMA is compatible.
         */
        lustre_lma_swab(lma);

        /* Swab and copy LMA */
        if (ma->ma_need & MA_HSM) {
                if (lma->lma_compat & LMAC_HSM)
                        ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
                else
                        ma->ma_hsm.mh_flags = 0;
                ma->ma_valid |= MA_HSM;
        }

        /* Copy SOM */
        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
                LASSERT(ma->ma_som != NULL);
                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
                ma->ma_som->msd_size    = lma->lma_som_size;
                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
                ma->ma_som->msd_mountid = lma->lma_som_mountid;
                ma->ma_valid |= MA_SOM;
        }

        RETURN(0);
}
int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                          struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
                if (S_ISREG(mdd_object_type(mdd_obj)))
                        rc = __mdd_lma_get(env, mdd_obj, ma);
        }
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        }
#endif
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
}
int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        int needlock = ma->ma_need &
                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);

        if (needlock)
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        if (needlock)
                mdd_read_unlock(env, mdd_obj);
        return rc;
}
/*
 * No permission check is needed.
 */
static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                        struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
/*
 * Permission check is done at open time; there is no need to check it
 * again here.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
                        struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        loff_t pos = 0;
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
                          struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
{
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;
        ENTRY;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);
                LASSERT(next);

                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                else
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);

                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
        } else
                rc = -EEXIST;

        RETURN(rc);
}
/**
 * Make sure ctime only increases.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                if (rc)
                        RETURN(rc);

                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
        }
        RETURN(0);
}
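
/*
 * Worked example (not part of the original source): if the object's ctime
 * is 100, a setattr carrying la_ctime = 90 together with LA_MTIME loses
 * both time updates, and a bare LA_CTIME with la_ctime = 100 is dropped as
 * a no-op, so ctime can never move backwards.
 */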
int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
                          int needacl)
{
        int rc;
        ENTRY;

        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
#endif
        RETURN(rc);
}
int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
                                int needacl)
{
        int rc;
        ENTRY;

        rc = mdd_attr_check(env, obj, attr);
        if (rc)
                RETURN(rc);

        if (attr->la_valid)
                rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        RETURN(rc);
}
static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
                                        int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}
int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
                                       int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}
int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
                    const struct lu_buf *buf, const char *name,
                    int fl, struct thandle *handle)
{
        struct lustre_capa *capa = mdd_object_capa(env, obj);
        int rc = 0;
        ENTRY;

        if (buf->lb_buf && buf->lb_len > 0)
                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
        else if (buf->lb_buf == NULL && buf->lb_len == 0)
                rc = mdo_xattr_del(env, obj, name, handle, capa);

        RETURN(rc);
}
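
/*
 * Usage sketch (assumed callers, not part of the original source): a
 * non-empty buffer stores the EA while a NULL, zero-length buffer removes
 * it, so set and delete share one transactional entry point:
 *
 *      buf = mdd_buf_get(env, value, value_len);
 *      rc  = __mdd_xattr_set(env, obj, buf, name, 0, handle);   (set)
 *      buf = mdd_buf_get(env, NULL, 0);
 *      rc  = __mdd_xattr_set(env, obj, buf, name, 0, handle);   (delete)
 */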
/*
 * This gives the same functionality as the code between
 * sys_chmod and inode_setattr
 * chown_common and inode_setattr
 * utimes and inode_setattr
 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
 */
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc;
        int rc;
        ENTRY;

        if (!la->la_valid)
                RETURN(0);
        /* Do not permit change file type */
        if (la->la_valid & LA_TYPE)
                RETURN(-EPERM);

        /* They should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                RETURN(-EPERM);

        /* export destroy does not have ->le_ses, but we may want
         * to drop LUSTRE_SOM_FL. */
        if (!env->le_ses)
                RETURN(0);

        uc = md_ucred(env);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for set ctime when rename's source is
                         * on remote MDS. */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;
                RETURN(rc);
        }

        if (la->la_valid == LA_ATIME) {
                /* This is an atime-only set, for the read-atime update
                 * on close. */
                if (la->la_atime > tmp_la->la_atime &&
                    la->la_atime <= (tmp_la->la_atime +
                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                        la->la_valid &= ~LA_ATIME;
                RETURN(0);
        }
        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);

                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);
        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
                                                            MAY_WRITE,
                                                            MOR_TGT_CHILD);
                        if (rc)
                                RETURN(rc);
                }
        }
        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                /* Bypass la_valid == LA_MODE,
                 * this is for changing file with SUID or SGID. */
                if ((la->la_valid & ~LA_MODE) &&
                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                if (la->la_mode == (cfs_umode_t) -1)
                        la->la_mode = tmp_la->la_mode;
                else
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;
        } else {
                la->la_mode = tmp_la->la_mode;
        }
        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t) -1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races.  This is the behavior we had in
                 * 2.0.  The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;
                }
        }
        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t) -1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 * 19990830 SD. */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;
                }
        }
        /* For both the Size-on-MDS case and the truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * For the SOM case it is true: the MAY_WRITE permission was checked
         * at open time, so there is no need to check it again.  For the
         * truncate case it is false, so the MAY_WRITE permission must be
         * checked here. */
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge coming
                 * attributes with the set in the inode. BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);
        } else {
                if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                              (uc->mu_fsuid == tmp_la->la_uid)) &&
                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal_locked(env, obj,
                                                            tmp_la, MAY_WRITE,
                                                            MOR_TGT_CHILD);
                                if (rc)
                                        RETURN(rc);
                        }
                }
                if (la->la_valid & LA_CTIME) {
                        /* The pure setattr, it has the priority over what is
                         * already set, do not drop it if ctime is equal. */
                        if (la->la_ctime < tmp_la->la_ctime)
                                la->la_valid &= ~(LA_ATIME | LA_MTIME |
                                                  LA_CTIME);
                }
        }

        RETURN(0);
}
/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    int flags,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
{
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
        int reclen;
        int rc;

        /* Not recording */
        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                RETURN(0);
        if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
                RETURN(0);

        LASSERT(handle != NULL);
        LASSERT(mdd_obj != NULL);

        if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra
                   entry */
                RETURN(0);
        }

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_tfid = *tfid;
        rec->cr.cr_namelen = 0;
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle);
        if (rc < 0) {
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
                return -EFAULT;
        }

        return 0;
}
int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
                  int flags, struct md_object *obj)
{
        struct thandle *handle;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        int rc;
        ENTRY;

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                return PTR_ERR(handle);

        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                      handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * Should be called with write lock held.
 *
 * \see mdd_lma_set_locked().
 */
static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                         const struct md_attr *ma, struct thandle *handle)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_buf *buf;
        struct lustre_mdt_attrs *lma =
                (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lmasize = sizeof(struct lustre_mdt_attrs);
        int rc = 0;
        ENTRY;

        /* Either HSM or SOM part is not valid, we need to read it before */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
                if (rc <= 0)
                        RETURN(rc);

                lustre_lma_swab(lma);
        } else {
                memset(lma, 0, lmasize);
        }

        /* Copy HSM data */
        if (ma->ma_valid & MA_HSM) {
                lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
                lma->lma_compat |= LMAC_HSM;
        }

        /* Copy SOM data */
        if (ma->ma_valid & MA_SOM) {
                LASSERT(ma->ma_som != NULL);
                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
                        lma->lma_compat     &= ~LMAC_SOM;
                } else {
                        lma->lma_compat     |= LMAC_SOM;
                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
                        lma->lma_som_size    = ma->ma_som->msd_size;
                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
                }
        }

        /* Copy FID */
        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));

        lustre_lma_swab(lma);
        buf = mdd_buf_get(env, lma, lmasize);
        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);

        RETURN(rc);
}

/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-on-MDS data will be extracted from \a ma if they are valid;
 * if not, the LMA EA will first be read from disk, modified and written
 * back.
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
                                  __u64 valid)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        int bits, type = 0;

        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        bits = bits & mdd->mdd_cl.mc_mask;
        if (bits == 0)
                return 0;

        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
                bits = bits >> 1;
                type++;
        }

        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
}
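
/*
 * Worked example (not part of the original source): for a chmod, valid
 * contains LA_MODE | LA_CTIME, so bits holds (1 << CL_SETATTR) and
 * (1 << CL_CTIME).  Assuming CL_SETATTR is the numerically smallest of the
 * four record types (which is what the precedence comment above relies
 * on), the shift loop stops at it first, and the change is logged as
 * CL_SETATTR rather than CL_CTIME.
 */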
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        ENTRY;

        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        if (rc)
                RETURN(rc);

        /* setattr on "close" only changes atime, or does nothing */
        if (ma->ma_valid == MA_INODE &&
            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                RETURN(0);

        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
        /*TODO: add lock here*/
        /* start a log journal handle if needed */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                if (lmm == NULL)
                        GOTO(cleanup, rc = -ENOMEM);

                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                       XATTR_NAME_LOV);
                if (rc < 0)
                        GOTO(cleanup, rc);
        }

        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;

                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                if (!rc) {
                        void *data = NULL;

                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                                        NULL, 0);
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                        if (block_count) {
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                block_count, NULL,
                                                LQUOTA_FLAGS_BLK, data, 1);
                        }
                }
        }
#endif
        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                if (rc == 0)
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size) {
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);

                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
                                logcookies = NULL;
                }
        }
        if (rc == 0 && ma->ma_valid & MA_LOV) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        if (rc)
                                GOTO(cleanup, rc);

                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
                }
        }
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode))
                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        }
cleanup:
        if (rc == 0)
                rc = mdd_attr_set_changelog(env, obj, handle,
                                            ma->ma_attr.la_valid);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0)) {
                /* set obd attr, if needed */
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
                                           logcookies);
        }
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      block_pending, 1);
                /* Trigger dqrel/dqacq for original owner and new owner.
                 * If failed, the next call for lquota_chkquota will
                 * process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        int rc;
        ENTRY;

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        int rc;
        ENTRY;

        if (mdd_is_immutable(obj) || mdd_is_append(obj))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
                         int fl)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
            mdd->mdd_sync_permission == 1)
                txn_param_sync(&mdd_env_info(env)->mti_param);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record user xattr changes */
        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                  const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);

        /* Only record user xattr changes */
        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/* partial unlink */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                       struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        int rc;
        ENTRY;

        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
                RETURN(-ENOENT);

        LASSERT(mdd_object_exists(mdd_obj) > 0);

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        if (rc)
                GOTO(cleanup, rc);

        __mdd_ref_del(env, mdd_obj, handle, 0);

        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* unlink dot */
                __mdd_ref_del(env, mdd_obj, handle, 1);
        }

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;

        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        if (rc)
                GOTO(cleanup, rc);

        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        }
#endif

        EXIT;
cleanup:
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        return rc;
}
/* partial operation */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
                               struct md_attr *ma)
{
        int rc;
        ENTRY;

        switch (ma->ma_attr.la_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
        case S_IFLNK:
        case S_IFCHR:
        case S_IFBLK:
        case S_IFIFO:
        case S_IFSOCK:
                rc = 0;
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);
}
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
                             struct md_attr *ma)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        int rc = 0;
        ENTRY;

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                                NULL, 0);
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFLNK:
                case S_IFDIR:
                        block_count = 2;
                        break;
                default:
                        block_count = 1;
                        break;
                }
                /* get block quota for child */
                if (block_count)
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        }
#endif

        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_pending, rc = PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        if (rc)
                GOTO(unlock, rc);

        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (rc)
                GOTO(unlock, rc);

        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);

                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                if (rc)
                        GOTO(unlock, rc);

                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
                                           handle, 0);
        } else {
#ifdef CONFIG_FS_POSIX_ACL
                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                        buf->lb_len = spec->u.sp_ea.eadatalen;
                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                    &ma->ma_attr.la_mode,
                                                    handle);
                                if (rc)
                                        GOTO(unlock, rc);
                                else
                                        ma->ma_attr.la_valid |= LA_MODE;
                        }

                        pfid = spec->u.sp_ea.fid;
                }
#endif
                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
                                           spec);
        }
        EXIT;
unlock:
        if (rc == 0)
                rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);

        mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      block_pending, 1);
                /* Trigger dqacq on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
        }
#endif
        return rc;
}
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        if (rc == 0)
                __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc == 0) {
                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                la_copy->la_ctime = ma->ma_attr.la_ctime;

                la_copy->la_valid = LA_CTIME;
                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
                                                        handle, 0);
        }
        mdd_trans_stop(env, mdd, 0, handle);

        RETURN(rc);
}
/*
 * Do NOT OR the MAY_*'s together; you'll get the weakest access allowed.
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        int res = 0;

        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
                        return 0;
        }

        if (flags & FMODE_READ)
                res |= MAY_READ;
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                res |= MAY_WRITE;
        if (flags & MDS_FMODE_EXEC)
                res = MAY_EXEC;
        return res;
}
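
/*
 * Worked example (not part of the original source): an open with
 * FMODE_READ | MDS_OPEN_APPEND yields MAY_READ | MAY_WRITE, since append
 * implies write access, while MDS_OPEN_OWNEROVERRIDE lets the file owner
 * bypass the computed access mask entirely (see the branch above).
 */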
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int mode, rc;
        ENTRY;

        if (mdd_is_dead_obj(obj))
                RETURN(-ENOENT);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (S_ISLNK(tmp_la->la_mode))
                RETURN(-ELOOP);

        mode = accmode(env, tmp_la, flag);

        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
                RETURN(-EISDIR);

        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
                if (rc)
                        RETURN(rc);
        }

        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* For writing an append-only file, it must be opened in append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
                        RETURN(-EPERM);
        }

        /*
         * Currently the O_NOATIME flag is not packed by the client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
        }

        RETURN(0);
}
static int mdd_open(const struct lu_env *env, struct md_object *obj,
                    int flags)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc = 0;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        if (rc == 0)
                mdd_obj->mod_count++;

        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* Return md_attr back; if it is the last unlink, then also return the
 * LOV EA and llog cookies. */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here.  We clean
                 * everything up; the caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                            obj, ma);
        }
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle = NULL;
        int rc;
        int reset = 1;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        ENTRY;

        /* check without any lock */
        if (mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
 again:
                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
                if (rc)
                        RETURN(rc);
                handle = mdd_trans_start(env, mdo2mdd(obj));
                if (IS_ERR(handle))
                        RETURN(PTR_ERR(handle));
        }

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        if (handle == NULL &&
            mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
                mdd_write_unlock(env, mdd_obj);
                goto again;
        }

        /* release open count */
        mdd_obj->mod_count--;

        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                if (rc == 0) {
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                } else {
                        CERROR("Object "DFID" can not be deleted from orphan "
                               "list, maybe cause OST objects can not be "
                               "destroyed (err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If object was not deleted from orphan list, do not
                         * destroy OSS objects, which will be done when next
                         * recovery. */
                        GOTO(out, rc);
                }
        }

        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* The object may not have been in the orphan list originally; this
         * is a rare case of mdd_finish_unlink() failure. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                }
#endif
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                } else {
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        if (rc == 0)
                                reset = 0;
                }

                if (rc != 0)
                        CERROR("Error when prepare to delete Object "DFID" , "
                               "which will cause OST objects can not be "
                               "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
        }
        EXIT;

out:
        if (reset)
                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);

        mdd_write_unlock(env, mdd_obj);
        if (handle != NULL)
                mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        return rc;
}
/*
 * Permission check is done at open time; there is no need to check it
 * again here.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        int rc;
        ENTRY;

        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
                rc = 0;
        else
                rc = -ENOTDIR;

        RETURN(rc);
}
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              int first, void *area, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u64 *start, __u64 *end,
                              struct lu_dirent **last, __u32 attr)
{
        int result;
        __u64 hash = 0;
        struct lu_dirent *ent;

        memset(area, 0, sizeof(struct lu_dirpage));
        area += sizeof(struct lu_dirpage);
        nob  -= sizeof(struct lu_dirpage);

        ent = area;
        do {
                int len;
                int recsize;

                len = iops->key_size(env, it);

                /* IAM iterator can return record with zero len. */
                if (len == 0)
                        goto next;

                hash = iops->store(env, it);
                if (unlikely(first)) {
                        first = 0;
                        *start = hash;
                }

                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);

                if (nob >= recsize) {
                        result = iops->rec(env, it, ent, attr);
                        if (result == -ESTALE)
                                goto next;
                        if (result != 0)
                                goto out;

                        /* osd might not able to pack all attributes,
                         * so recheck rec length */
                        recsize = le16_to_cpu(ent->lde_reclen);
                } else {
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        (*last)->lde_reclen =
                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                                            nob);
                        result = 0;
                        goto out;
                }
                *last = ent;
                ent = (void *)ent + recsize;
                nob -= recsize;

next:
                result = iops->next(env, it);
                if (result == -ESTALE)
                        goto next;
        } while (result == 0);

out:
        *end = hash;
        return result;
}
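
/*
 * Resulting page layout (illustrative, not part of the original source):
 *
 *      +------------+-------------+-------------+-----+-------------+
 *      | lu_dirpage | lu_dirent 0 | lu_dirent 1 | ... | lu_dirent N |
 *      +------------+-------------+-------------+-----+-------------+
 *
 * The final entry either has lde_reclen stretched to the page boundary
 * (when the next record would not fit) or is terminated by the caller
 * with lde_reclen = 0, so entries never straddle a CFS_PAGE_SIZE page.
 */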
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct page *pg;
        struct lu_dirent *last = NULL;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        int i, rc, nob;
        __u64 hash_start;
        __u64 hash_end = 0;
        struct dt_it *it;
        ENTRY;

        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
                RETURN(-EFAULT);

        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, mdd_object_capa(env, obj));
        if (it == NULL)
                RETURN(-ENOMEM);

        rc = iops->load(env, it, rdpg->rp_hash);

        if (rc == 0)
                /*
                 * Iterator didn't find record with exactly the key requested.
                 *
                 * It is currently either
                 *
                 *     - positioned above record with key less than
                 *     requested---skip it.
                 *
                 *     - or not positioned at all (is in IAM_IT_SKEWED
                 *     state)---position it on the next item.
                 */
                rc = iops->next(env, it);
        else if (rc > 0)
                rc = 0;

        /*
         * At this point and across for-loop:
         *
         *     rc == 0 -> ok, proceed.
         *     rc >  0 -> end of directory.
         *     rc <  0 -> error.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
                                        it, &hash_start, &hash_end, &last,
                                        rdpg->rp_attrs);
                if (rc != 0 || i == rdpg->rp_npages - 1) {
                        if (last)
                                last->lde_reclen = 0;
                }
                cfs_kunmap(pg);
        }
        if (rc > 0) {
                /*
                 * end of directory.
                 */
                hash_end = DIR_END_OFF;
                rc = 0;
        }
        if (rc == 0) {
                struct lu_dirpage *dp;

                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(hash_end);
                if (i == 0)
                        /*
                         * No pages were processed, mark this.
                         */
                        dp->ldp_flags |= LDF_EMPTY;

                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(rdpg->rp_pages[0]);
        }
        iops->put(env, it);
        iops->fini(env, it);

        RETURN(rc);
}
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
        if (rc)
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                struct page *pg;
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));

                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage *)cfs_kmap(pg);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(pg);
                GOTO(out_unlock, rc = 0);
        }

        rc = __mdd_readpage(env, mdd_obj, rdpg);

        EXIT;
out_unlock:
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
}
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
}

static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        do_version_set(env, mdd_object_child(mdd_obj), version);
}
const struct md_object_operations mdd_obj_ops = {
        .moo_permission    = mdd_permission,
        .moo_attr_get      = mdd_attr_get,
        .moo_attr_set      = mdd_attr_set,
        .moo_xattr_get     = mdd_xattr_get,
        .moo_xattr_set     = mdd_xattr_set,
        .moo_xattr_list    = mdd_xattr_list,
        .moo_xattr_del     = mdd_xattr_del,
        .moo_object_create = mdd_object_create,
        .moo_ref_add       = mdd_ref_add,
        .moo_ref_del       = mdd_ref_del,
        .moo_open          = mdd_open,
        .moo_close         = mdd_close,
        .moo_readpage      = mdd_readpage,
        .moo_readlink      = mdd_readlink,
        .moo_changelog     = mdd_changelog,
        .moo_capa_get      = mdd_capa_get,
        .moo_object_sync   = mdd_object_sync,
        .moo_version_get   = mdd_version_get,
        .moo_version_set   = mdd_version_set,
        .moo_path          = mdd_path,
        .moo_file_lock     = mdd_file_lock,
        .moo_file_unlock   = mdd_file_unlock,
};