1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch object body data for an existing object by delegating to
 * mdo_data_get(). Asserts (fatally) that the object exists, printing
 * its FID on failure. Interior lines are missing from this excerpt. */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
/* Read the object's lu_attr into @la under capability @capa, via the
 * lower-layer mdo_attr_get(). Asserts the object exists first. */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ), clearing both first so
 * the result reflects only @flags. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info attached to @env's lu_context
 * via mdd_thread_key. The key is expected to be registered, hence
 * the LASSERT rather than an error return. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
/* Wrap caller-provided memory (@area, @len) in the thread-local
 * mti_buf descriptor; no allocation takes place here. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by @buf (allocated with OBD_ALLOC_LARGE);
 * a NULL buf or NULL lb_buf is a no-op. */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
129 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const flavor of mdd_buf_get(): wrap read-only memory in the
 * thread-local mti_buf. The const is cast away for storage only. */
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135 const void *area, ssize_t len)
139 buf = &mdd_env_info(env)->mti_buf;
140 buf->lb_buf = (void *)area;
/* Return the thread-local mti_big_buf, (re)allocated to hold at least
 * @len bytes. An existing smaller buffer is freed and replaced; any
 * prior contents are NOT preserved (see mdd_buf_grow() for that). */
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
147 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
/* Too small: drop the old allocation before re-allocating. */
149 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
153 if (buf->lb_buf == NULL) {
155 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL)
162 /** Increase the size of the \a mti_big_buf.
163 * preserves old data in buffer
164 * old buffer remains unchanged on error
165 * \retval 0 or -ENOMEM
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
169 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Growing only: caller must never request a smaller size. */
172 LASSERT(len >= oldbuf->lb_len);
173 OBD_ALLOC_LARGE(buf.lb_buf, len);
175 if (buf.lb_buf == NULL)
/* Copy-then-free so the old buffer survives an allocation failure. */
179 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
181 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
/* Publish the new descriptor (pointer + length) in one struct copy. */
183 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the thread-local llog cookie buffer, sized for the current
 * maximum LOV cookie size (mdd_lov_cookiesize()). Grows lazily: an
 * undersized buffer is freed and a new one allocated; the buffer is
 * zeroed before being handed back. Returns NULL on allocation failure. */
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189 struct mdd_device *mdd)
191 struct mdd_thread_info *mti = mdd_env_info(env);
194 max_cookie_size = mdd_lov_cookiesize(env, mdd);
195 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196 if (mti->mti_max_cookie)
197 OBD_FREE_LARGE(mti->mti_max_cookie,
198 mti->mti_max_cookie_size);
199 mti->mti_max_cookie = NULL;
200 mti->mti_max_cookie_size = 0;
202 if (unlikely(mti->mti_max_cookie == NULL)) {
203 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204 if (likely(mti->mti_max_cookie != NULL))
205 mti->mti_max_cookie_size = max_cookie_size;
/* Hand back a zeroed buffer so stale cookies never leak through. */
207 if (likely(mti->mti_max_cookie != NULL))
208 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209 return mti->mti_max_cookie;
/* Return the thread-local LOV EA buffer sized for the current maximum
 * striping MD size (mdd_lov_mdsize()); same lazy-grow pattern as
 * mdd_max_cookie_get() but without zeroing. NULL on alloc failure. */
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213 struct mdd_device *mdd)
215 struct mdd_thread_info *mti = mdd_env_info(env);
218 max_lmm_size = mdd_lov_mdsize(env, mdd);
219 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220 if (mti->mti_max_lmm)
221 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222 mti->mti_max_lmm = NULL;
223 mti->mti_max_lmm_size = 0;
225 if (unlikely(mti->mti_max_lmm == NULL)) {
226 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227 if (likely(mti->mti_max_lmm != NULL))
228 mti->mti_max_lmm_size = max_lmm_size;
230 return mti->mti_max_lmm;
/* lu_device operation: allocate a new mdd_object and wire up its
 * md/dir operation vectors and lu_object ops. Returns the embedded
 * lu_object (NULL path on allocation failure is outside this excerpt). */
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234 const struct lu_object_header *hdr,
237 struct mdd_object *mdd_obj;
239 OBD_ALLOC_PTR(mdd_obj);
240 if (mdd_obj != NULL) {
243 o = mdd2lu_obj(mdd_obj);
244 lu_object_init(o, NULL, d);
245 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247 mdd_obj->mod_count = 0;
248 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the corresponding
 * object in the underlying (child) device and stack it below this
 * mdd object in the lu_object compound. Also resets the changelog
 * timestamp and initializes the pdir lock. */
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256 const struct lu_object_conf *unused)
258 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259 struct mdd_object *mdd_obj = lu2mdd_obj(o);
260 struct lu_object *below;
261 struct lu_device *under;
264 mdd_obj->mod_cltime = 0;
265 under = &d->mdd_child->dd_lu_dev;
266 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267 mdd_pdlock_init(mdd_obj);
271 lu_object_add(o, below);
/* loo_object_start: once the object stack is assembled, load the
 * cached flag state from disk for objects that already exist. */
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
278 if (lu_object_exists(o))
279 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd_object (body lines not shown in
 * this excerpt beyond the local fetch). */
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
286 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: emit a one-line debug description (open count,
 * valid mask, changelog time, flags) through printer @p. */
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293 lu_printer_t p, const struct lu_object *o)
295 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297 "valid=%x, cltime="LPU64", flags=%lx)",
298 mdd, mdd->mod_count, mdd->mod_valid,
299 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; referenced by the
 * forward declaration near the top of the file and installed in
 * mdd_object_alloc(). */
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303 .loo_object_init = mdd_object_init,
304 .loo_object_start = mdd_object_start,
305 .loo_object_free = mdd_object_free,
306 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd object for FID @f on device @d.
 * Thin wrapper over md_object_find_slice(); may return ERR_PTR —
 * callers in this file check IS_ERR on the result. */
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310 struct mdd_device *d,
311 const struct lu_fid *f)
313 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated @path, starting at the filesystem root FID,
 * into the FID of the final component (stored in *fid via *f).
 * Walks one component at a time: find the parent object by FID, then
 * mdd_lookup() the child name. -EREMOTE if a component lives on
 * another MDT; loop-control lines are missing from this excerpt. */
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317 const char *path, struct lu_fid *fid)
320 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321 struct mdd_object *obj;
322 struct lu_name *lname = &mdd_env_info(env)->mti_name;
327 /* temp buffer for path element */
328 buf = mdd_buf_alloc(env, PATH_MAX);
329 if (buf->lb_buf == NULL)
332 lname->ln_name = name = buf->lb_buf;
333 lname->ln_namelen = 0;
334 *f = mdd->mdd_root_fid;
/* Copy the next path component (up to '/' or NUL) into @name. */
341 while (*path != '/' && *path != '\0') {
349 /* find obj corresponding to fid */
350 obj = mdd_object_find(env, mdd, f);
352 GOTO(out, rc = -EREMOTE);
354 GOTO(out, rc = PTR_ERR(obj));
355 /* get child fid from parent and name */
356 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357 mdd_object_put(env, obj);
362 lname->ln_namelen = 0;
371 /** The maximum depth that fid2path() will search.
372 * This is limited only because we want to store the fids for
373 * historical path lookup purposes.
375 #define MAX_PATH_DEPTH 100
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379 __u64 pli_recno; /**< history point */
380 __u64 pli_currec; /**< current record */
381 struct lu_fid pli_fid;
382 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383 struct mdd_object *pli_mdd_obj;
384 char *pli_path; /**< full path */
386 int pli_linkno; /**< which hardlink to follow */
387 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path for pli->pli_mdd_obj by walking parent
 * links upward via each object's link EA until the root is reached,
 * packing component names right-to-left into pli->pli_path.
 * Afterwards the path is verified by resolving it forward again with
 * mdd_path2fid(); -EAGAIN tells the caller (mdd_path) to retry
 * because the object moved during the walk. */
390 static int mdd_path_current(const struct lu_env *env,
391 struct path_lookup_info *pli)
393 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394 struct mdd_object *mdd_obj;
395 struct lu_buf *buf = NULL;
396 struct link_ea_header *leh;
397 struct link_ea_entry *lee;
398 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the end of the path buffer; names are packed backwards. */
405 ptr = pli->pli_path + pli->pli_pathlen - 1;
408 pli->pli_fidcount = 0;
409 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
411 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412 mdd_obj = mdd_object_find(env, mdd,
413 &pli->pli_fids[pli->pli_fidcount]);
415 GOTO(out, rc = -EREMOTE);
417 GOTO(out, rc = PTR_ERR(mdd_obj));
418 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
420 mdd_object_put(env, mdd_obj);
424 /* Do I need to error out here? */
429 /* Get parent fid and object name */
430 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431 buf = mdd_links_get(env, mdd_obj);
432 mdd_read_unlock(env, mdd_obj);
433 mdd_object_put(env, mdd_obj);
435 GOTO(out, rc = PTR_ERR(buf));
438 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
441 /* If set, use link #linkno for path lookup, otherwise use
442 link #0. Only do this for the final path element. */
443 if ((pli->pli_fidcount == 0) &&
444 (pli->pli_linkno < leh->leh_reccount)) {
446 for (count = 0; count < pli->pli_linkno; count++) {
447 lee = (struct link_ea_entry *)
448 ((char *)lee + reclen);
449 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
451 if (pli->pli_linkno < leh->leh_reccount - 1)
452 /* indicate to user there are more links */
456 /* Pack the name in the end of the buffer */
457 ptr -= tmpname->ln_namelen;
458 if (ptr - 1 <= pli->pli_path)
459 GOTO(out, rc = -EOVERFLOW);
460 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
463 /* Store the parent fid for historic lookup */
464 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465 GOTO(out, rc = -EOVERFLOW);
466 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
469 /* Verify that our path hasn't changed since we started the lookup.
470 Record the current index, and verify the path resolves to the
471 same fid. If it does, then the path is correct as of this index. */
472 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473 pli->pli_currec = mdd->mdd_cl.mc_index;
474 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
477 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478 GOTO (out, rc = -EAGAIN);
480 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483 PFID(&pli->pli_fid));
484 GOTO(out, rc = -EAGAIN);
486 ptr++; /* skip leading / */
/* Shift the packed path to the front of the caller's buffer. */
487 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492 /* if we vmalloced a large buffer drop it */
/* Historic (changelog-based) path reconstruction; body not present in
 * this excerpt. Per the comment in mdd_path(), presumably a stub or
 * partial implementation — confirm against the full source. */
498 static int mdd_path_historic(const struct lu_env *env,
499 struct path_lookup_info *pli)
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506 char *path, int pathlen, __u64 *recno, int *linkno)
508 struct path_lookup_info *pli;
/* The root has no path components; handled specially. */
516 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
525 pli->pli_mdd_obj = md2mdd_obj(obj);
526 pli->pli_recno = *recno;
527 pli->pli_path = path;
528 pli->pli_pathlen = pathlen;
529 pli->pli_linkno = *linkno;
531 /* Retry multiple times in case file is being moved */
532 while (tries-- && rc == -EAGAIN)
533 rc = mdd_path_current(env, pli);
535 /* For historical path lookup, the current links may not have existed
536 * at "recno" time. We must switch over to earlier links/parents
537 * by using the changelog records. If the earlier parent doesn't
538 * exist, we must search back through the changelog to reconstruct
539 * its parents, then check if it exists, etc.
540 * We may ignore this problem for the initial implementation and
541 * state that an "original" hardlink must still exist for us to find
542 * historic path name. */
543 if (pli->pli_recno != -1) {
544 rc = mdd_path_historic(env, pli);
/* Report back the record index and next link for the caller. */
546 *recno = pli->pli_currec;
547 /* Return next link index to caller */
548 *linkno = pli->pli_linkno;
/* Refresh the object's cached mod_flags from its on-disk attributes:
 * translate la_flags via mdd_flags_xlate() and mark directories with
 * a single link as MNLINK_OBJ. */
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
558 struct lu_attr *la = &mdd_env_info(env)->mti_la;
562 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
564 mdd_flags_xlate(obj, la->la_flags);
565 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566 obj->mod_flags |= MNLINK_OBJ;
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the disk read if MA_INODE is already cached in ma_valid. */
578 if (ma->ma_valid & MA_INODE)
581 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582 mdd_object_capa(env, mdd_obj));
584 ma->ma_valid |= MA_INODE;
/* Fill @lmm (treated as a lov_user_md) with the filesystem-default
 * striping taken from the MDS LOV descriptor; returns sizeof(*lum)
 * as the EA size. */
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
590 struct lov_desc *ldesc;
591 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592 struct lov_user_md *lum = (struct lov_user_md*)lmm;
598 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599 LASSERT(ldesc != NULL);
601 lum->lmm_magic = LOV_MAGIC_V1;
602 lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603 lum->lmm_pattern = ldesc->ld_pattern;
604 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
608 RETURN(sizeof(*lum));
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613 struct mdd_object *mdd_obj, struct md_attr *ma)
618 if (ma->ma_valid & MA_LOV)
621 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* No striping on disk but caller wants defaults: synthesize them. */
623 if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
624 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* rc > 0 is the EA size here; record it and mark MA_LOV valid. */
626 ma->ma_lmm_size = rc;
627 ma->ma_valid |= MA_LOV;
/* Locked wrapper around __mdd_lmm_get(): takes the object read lock
 * for the duration of the LOV EA fetch. */
633 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
639 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
640 rc = __mdd_lmm_get(env, mdd_obj, ma);
641 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv, skipping the
 * read if MA_LMV is already valid. Caller holds the object lock. */
646 static int __mdd_lmv_get(const struct lu_env *env,
647 struct mdd_object *mdd_obj, struct md_attr *ma)
652 if (ma->ma_valid & MA_LMV)
655 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
658 ma->ma_valid |= MA_LMV;
/* Read the LMA (lustre_mdt_attrs) EA and extract the HSM and/or SOM
 * state requested in ma->ma_need into @ma. Uses the thread-local
 * mti_xattr_buf as scratch; swabs the on-disk LMA before use. */
664 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
667 struct mdd_thread_info *info = mdd_env_info(env);
668 struct lustre_mdt_attrs *lma =
669 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
674 /* If all needed data are already valid, nothing to do */
675 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
676 (ma->ma_need & (MA_HSM | MA_SOM)))
679 /* Read LMA from disk EA */
680 lma_size = sizeof(info->mti_xattr_buf);
681 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
685 /* Useless to check LMA incompatibility because this is already done in
686 * osd_ea_fid_get(), and this will fail long before this code is
688 * So, if we are here, LMA is compatible.
691 lustre_lma_swab(lma);
693 /* Swab and copy LMA */
694 if (ma->ma_need & MA_HSM) {
695 if (lma->lma_compat & LMAC_HSM)
696 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
698 ma->ma_hsm.mh_flags = 0;
699 ma->ma_valid |= MA_HSM;
/* SOM data copied only when the LMA says it is present (LMAC_SOM). */
703 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
704 LASSERT(ma->ma_som != NULL);
705 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
706 ma->ma_som->msd_size = lma->lma_som_size;
707 ma->ma_som->msd_blocks = lma->lma_som_blocks;
708 ma->ma_som->msd_mountid = lma->lma_som_mountid;
709 ma->ma_valid |= MA_SOM;
/* Dispatcher for attribute retrieval: fetch each attribute group
 * requested in ma->ma_need (inode, LOV, LMV, HSM/SOM, default ACL),
 * gated by object type (regular file vs directory), stopping at the
 * first error. Caller is responsible for locking. */
715 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
721 if (ma->ma_need & MA_INODE)
722 rc = mdd_iattr_get(env, mdd_obj, ma);
724 if (rc == 0 && ma->ma_need & MA_LOV) {
725 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726 S_ISDIR(mdd_object_type(mdd_obj)))
727 rc = __mdd_lmm_get(env, mdd_obj, ma);
729 if (rc == 0 && ma->ma_need & MA_LMV) {
730 if (S_ISDIR(mdd_object_type(mdd_obj)))
731 rc = __mdd_lmv_get(env, mdd_obj, ma);
733 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
734 if (S_ISREG(mdd_object_type(mdd_obj)))
735 rc = __mdd_lma_get(env, mdd_obj, ma);
737 #ifdef CONFIG_FS_POSIX_ACL
738 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
739 if (S_ISDIR(mdd_object_type(mdd_obj)))
740 rc = mdd_def_acl_get(env, mdd_obj, ma);
743 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
744 rc, ma->ma_valid, ma->ma_lmm);
/* Like mdd_attr_get_internal() but takes the object read lock when
 * any EA-backed attribute (LOV/LMV/ACL/HSM/SOM) is requested; plain
 * inode attributes do not need it. */
748 int mdd_attr_get_internal_locked(const struct lu_env *env,
749 struct mdd_object *mdd_obj, struct md_attr *ma)
752 int needlock = ma->ma_need &
753 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
756 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
757 rc = mdd_attr_get_internal(env, mdd_obj, ma);
759 mdd_read_unlock(env, mdd_obj);
764 * No permission check is needed.
/* md_object_operations entry point: getattr via the locked internal
 * dispatcher. */
766 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
769 struct mdd_object *mdd_obj = md2mdd_obj(obj);
773 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
778 * No permission check is needed.
/* md_object_operations entry point: read xattr @name into @buf under
 * the object read lock. Object must exist. */
780 static int mdd_xattr_get(const struct lu_env *env,
781 struct md_object *obj, struct lu_buf *buf,
784 struct mdd_object *mdd_obj = md2mdd_obj(obj);
789 LASSERT(mdd_object_exists(mdd_obj));
791 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792 rc = mdo_xattr_get(env, mdd_obj, buf, name,
793 mdd_object_capa(env, mdd_obj));
794 mdd_read_unlock(env, mdd_obj);
800 * Permission check is done when open,
801 * no need check again.
/* Read symlink target: body-read from the underlying dt object at
 * offset 0 (pos), under the object read lock. */
803 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
806 struct mdd_object *mdd_obj = md2mdd_obj(obj);
807 struct dt_object *next;
812 LASSERT(mdd_object_exists(mdd_obj));
814 next = mdd_object_child(mdd_obj);
815 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
816 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
817 mdd_object_capa(env, mdd_obj));
818 mdd_read_unlock(env, mdd_obj);
823 * No permission check is needed.
/* List the object's xattr names into @buf under the read lock. */
825 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
828 struct mdd_object *mdd_obj = md2mdd_obj(obj);
833 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
835 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for @c (child) during transaction @handle.
 * Picks the dt_object_format from the creation spec: a non-directory
 * index feature forces DFT_INDEX, otherwise the format follows the
 * requested file mode. The allocation hint is initialized by the
 * underlying device with the parent's dt object (if any). */
840 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
841 struct mdd_object *c, struct md_attr *ma,
842 struct thandle *handle,
843 const struct md_op_spec *spec)
845 struct lu_attr *attr = &ma->ma_attr;
846 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
847 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
848 const struct dt_index_features *feat = spec->sp_feat;
852 if (!mdd_object_exists(c)) {
853 struct dt_object *next = mdd_object_child(c);
856 if (feat != &dt_directory_features && feat != NULL)
857 dof->dof_type = DFT_INDEX;
859 dof->dof_type = dt_mode_to_dft(attr->la_mode);
861 dof->u.dof_idx.di_feat = feat;
863 /* @hint will be initialized by underlying device. */
864 next->do_ops->do_ah_init(env, hint,
865 p ? mdd_object_child(p) : NULL,
866 attr->la_mode & S_IFMT);
868 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
869 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
877 * Make sure the ctime is increased only.
879 static inline int mdd_attr_check(const struct lu_env *env,
880 struct mdd_object *obj,
881 struct lu_attr *attr)
883 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
887 if (attr->la_valid & LA_CTIME) {
888 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Drop time updates that would move ctime backwards, and drop a
 * pure-ctime update that would be a no-op (equal timestamps). */
892 if (attr->la_ctime < tmp_la->la_ctime)
893 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
894 else if (attr->la_valid == LA_CTIME &&
895 attr->la_ctime == tmp_la->la_ctime)
896 attr->la_valid &= ~LA_CTIME;
/* Apply @attr to @obj within transaction @handle; on a mode change
 * with @needacl set, also rewrite the POSIX ACL to match the new
 * permission bits. Caller handles locking. */
901 int mdd_attr_set_internal(const struct lu_env *env,
902 struct mdd_object *obj,
903 struct lu_attr *attr,
904 struct thandle *handle,
910 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
911 #ifdef CONFIG_FS_POSIX_ACL
912 if (!rc && (attr->la_valid & LA_MODE) && needacl)
913 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by
 * mdd_attr_set_internal(); the set step is presumably skipped when
 * the check leaves nothing valid — intermediate lines not shown. */
918 int mdd_attr_check_set_internal(const struct lu_env *env,
919 struct mdd_object *obj,
920 struct lu_attr *attr,
921 struct thandle *handle,
927 rc = mdd_attr_check(env, obj, attr);
932 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper around mdd_attr_set_internal(); ACL rewrite is
 * only relevant when the mode actually changes. */
936 static int mdd_attr_set_internal_locked(const struct lu_env *env,
937 struct mdd_object *obj,
938 struct lu_attr *attr,
939 struct thandle *handle,
945 needacl = needacl && (attr->la_valid & LA_MODE);
947 mdd_write_lock(env, obj, MOR_TGT_CHILD);
948 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
950 mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(); mirrors
 * mdd_attr_set_internal_locked() above. */
954 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
955 struct mdd_object *obj,
956 struct lu_attr *attr,
957 struct thandle *handle,
963 needacl = needacl && (attr->la_valid & LA_MODE);
965 mdd_write_lock(env, obj, MOR_TGT_CHILD);
966 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
968 mdd_write_unlock(env, obj);
/* Set or delete xattr @name within transaction @handle: a non-empty
 * buffer sets the value; a NULL buffer with zero length deletes it.
 * Note @fl is not forwarded to mdo_xattr_set here (0 is passed). */
972 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
973 const struct lu_buf *buf, const char *name,
974 int fl, struct thandle *handle)
976 struct lustre_capa *capa = mdd_object_capa(env, obj);
980 if (buf->lb_buf && buf->lb_len > 0)
981 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
982 else if (buf->lb_buf == NULL && buf->lb_len == 0)
983 rc = mdo_xattr_del(env, obj, name, handle, capa);
989 * This gives the same functionality as the code between
990 * sys_chmod and inode_setattr
991 * chown_common and inode_setattr
992 * utimes and inode_setattr
993 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
995 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
996 struct lu_attr *la, const struct md_attr *ma)
998 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1006 /* Do not permit change file type */
1007 if (la->la_valid & LA_TYPE)
1010 /* They should not be processed by setattr */
1011 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1014 /* export destroy does not have ->le_ses, but we may want
1015 * to drop LUSTRE_SOM_FL. */
/* Snapshot the current on-disk attributes for all checks below. */
1021 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: used when a rename touches the source's ctime;
 * requires delete permission unless bypassed, and never moves ctime
 * backwards. */
1025 if (la->la_valid == LA_CTIME) {
1026 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1027 /* This is only for set ctime when rename's source is
1029 rc = mdd_may_delete(env, NULL, obj,
1030 (struct md_attr *)ma, 1, 0);
1031 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1032 la->la_valid &= ~LA_CTIME;
1036 if (la->la_valid == LA_ATIME) {
1037 /* This is atime only set for read atime update on close. */
1038 if (la->la_atime > tmp_la->la_atime &&
1039 la->la_atime <= (tmp_la->la_atime +
1040 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1041 la->la_valid &= ~LA_ATIME;
1045 /* Check if flags change. */
1046 if (la->la_valid & LA_FLAGS) {
1047 unsigned int oldflags = 0;
1048 unsigned int newflags = la->la_flags &
1049 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1051 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1052 !mdd_capable(uc, CFS_CAP_FOWNER))
1055 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1056 * only be changed by the relevant capability. */
1057 if (mdd_is_immutable(obj))
1058 oldflags |= LUSTRE_IMMUTABLE_FL;
1059 if (mdd_is_append(obj))
1060 oldflags |= LUSTRE_APPEND_FL;
1061 if ((oldflags ^ newflags) &&
1062 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC only makes sense on directories. */
1065 if (!S_ISDIR(tmp_la->la_mode))
1066 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects refuse everything except a pure
 * flags change, unless permission checks are bypassed. */
1069 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1070 (la->la_valid & ~LA_FLAGS) &&
1071 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1074 /* Check for setting the obj time. */
1075 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1076 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1077 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1078 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1079 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1087 /* Make sure a caller can chmod. */
1088 if (la->la_valid & LA_MODE) {
1089 /* Bypass la_vaild == LA_MODE,
1090 * this is for changing file with SUID or SGID. */
1091 if ((la->la_valid & ~LA_MODE) &&
1092 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1093 (uc->mu_fsuid != tmp_la->la_uid) &&
1094 !mdd_capable(uc, CFS_CAP_FOWNER))
1097 if (la->la_mode == (cfs_umode_t) -1)
1098 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits; only permission bits change. */
1100 la->la_mode = (la->la_mode & S_IALLUGO) |
1101 (tmp_la->la_mode & ~S_IALLUGO);
1103 /* Also check the setgid bit! */
1104 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1105 la->la_gid : tmp_la->la_gid) &&
1106 !mdd_capable(uc, CFS_CAP_FSETID))
1107 la->la_mode &= ~S_ISGID;
1109 la->la_mode = tmp_la->la_mode;
1112 /* Make sure a caller can chown. */
1113 if (la->la_valid & LA_UID) {
1114 if (la->la_uid == (uid_t) -1)
1115 la->la_uid = tmp_la->la_uid;
1116 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1117 (la->la_uid != tmp_la->la_uid)) &&
1118 !mdd_capable(uc, CFS_CAP_CHOWN))
1121 /* If the user or group of a non-directory has been
1122 * changed by a non-root user, remove the setuid bit.
1123 * 19981026 David C Niemi <niemi@tux.org>
1125 * Changed this to apply to all users, including root,
1126 * to avoid some races. This is the behavior we had in
1127 * 2.0. The check for non-root was definitely wrong
1128 * for 2.2 anyway, as it should have been using
1129 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1130 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1131 !S_ISDIR(tmp_la->la_mode)) {
1132 la->la_mode &= ~S_ISUID;
1133 la->la_valid |= LA_MODE;
1137 /* Make sure caller can chgrp. */
1138 if (la->la_valid & LA_GID) {
1139 if (la->la_gid == (gid_t) -1)
1140 la->la_gid = tmp_la->la_gid;
1141 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1142 ((la->la_gid != tmp_la->la_gid) &&
1143 !lustre_in_group_p(uc, la->la_gid))) &&
1144 !mdd_capable(uc, CFS_CAP_CHOWN))
1147 /* Likewise, if the user or group of a non-directory
1148 * has been changed by a non-root user, remove the
1149 * setgid bit UNLESS there is no group execute bit
1150 * (this would be a file marked for mandatory
1151 * locking). 19981026 David C Niemi <niemi@tux.org>
1153 * Removed the fsuid check (see the comment above) --
1155 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1156 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1157 la->la_mode &= ~S_ISGID;
1158 la->la_valid |= LA_MODE;
1162 /* For both Size-on-MDS case and truncate case,
1163 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1164 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1165 * For SOM case, it is true, the MAY_WRITE perm has been checked
1166 * when open, no need check again. For truncate case, it is false,
1167 * the MAY_WRITE perm should be checked here. */
1168 if (ma->ma_attr_flags & MDS_SOM) {
1169 /* For the "Size-on-MDS" setattr update, merge coming
1170 * attributes with the set in the inode. BUG 10641 */
1171 if ((la->la_valid & LA_ATIME) &&
1172 (la->la_atime <= tmp_la->la_atime))
1173 la->la_valid &= ~LA_ATIME;
1175 /* OST attributes do not have a priority over MDS attributes,
1176 * so drop times if ctime is equal. */
1177 if ((la->la_valid & LA_CTIME) &&
1178 (la->la_ctime <= tmp_la->la_ctime))
1179 la->la_valid &= ~(LA_MTIME | LA_CTIME);
/* Truncate path: verify MAY_WRITE unless the owner-override open
 * flag or permission bypass applies. */
1181 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1182 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1183 (uc->mu_fsuid == tmp_la->la_uid)) &&
1184 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1185 rc = mdd_permission_internal_locked(env, obj,
1192 if (la->la_valid & LA_CTIME) {
1193 /* The pure setattr, it has the priority over what is
1194 * already set, do not drop it if ctime is equal. */
1195 if (la->la_ctime < tmp_la->la_ctime)
1196 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1204 /** Store a data change changelog record
1205 * If this fails, we must fail the whole transaction; we don't
1206 * want the change to commit without the log entry.
1207 * \param mdd_obj - mdd_object of change
1208 * \param handle - transacion handle
1210 static int mdd_changelog_data_store(const struct lu_env *env,
1211 struct mdd_device *mdd,
1212 enum changelog_rec_type type,
1214 struct mdd_object *mdd_obj,
1215 struct thandle *handle)
1217 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1218 struct llog_changelog_rec *rec;
/* Skip entirely if changelogs are off or this record type is masked. */
1224 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1226 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1229 LASSERT(handle != NULL);
1230 LASSERT(mdd_obj != NULL);
/* Suppress duplicate time records: if this object already logged a
 * time update since the changelog started, one record is enough. */
1232 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1233 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1234 /* Don't need multiple updates in this log */
1235 /* Don't check under lock - no big deal if we get an extra
1240 reclen = llog_data_len(sizeof(*rec));
1241 buf = mdd_buf_alloc(env, reclen);
1242 if (buf->lb_buf == NULL)
1244 rec = (struct llog_changelog_rec *)buf->lb_buf;
1246 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1247 rec->cr.cr_type = (__u32)type;
1248 rec->cr.cr_tfid = *tfid;
1249 rec->cr.cr_namelen = 0;
1250 mdd_obj->mod_cltime = cfs_time_current_64();
1252 rc = mdd_changelog_llog_write(mdd, rec, handle);
1254 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1255 rc, type, PFID(tfid));
/* Standalone changelog entry point: open a transaction on the device,
 * store one data-change record for @obj, and stop the transaction. */
1262 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1263 int flags, struct md_object *obj)
1265 struct thandle *handle;
1266 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1267 struct mdd_device *mdd = mdo2mdd(obj);
1271 handle = mdd_trans_start(env, mdd);
1274 return(PTR_ERR(handle));
1276 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1279 mdd_trans_stop(env, mdd, rc, handle);
1285 * Should be called with write lock held.
1287 * \see mdd_lma_set_locked().
1289 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1290 const struct md_attr *ma, struct thandle *handle)
1292 struct mdd_thread_info *info = mdd_env_info(env);
1294 struct lustre_mdt_attrs *lma =
1295 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1296 int lmasize = sizeof(struct lustre_mdt_attrs);
1301 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" looks like it
 * was meant to be "(~ma->ma_valid) & ..."; as written the logical-not
 * yields 0/1 before masking. Left untouched — confirm upstream. */
1302 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1303 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1307 lustre_lma_swab(lma);
1309 memset(lma, 0, lmasize);
/* Fold the valid HSM flags into the LMA and mark the compat bit. */
1313 if (ma->ma_valid & MA_HSM) {
1314 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1315 lma->lma_compat |= LMAC_HSM;
/* SOM: an invalid ioepoch clears the SOM state; otherwise copy it in. */
1319 if (ma->ma_valid & MA_SOM) {
1320 LASSERT(ma->ma_som != NULL);
1321 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1322 lma->lma_compat &= ~LMAC_SOM;
1324 lma->lma_compat |= LMAC_SOM;
1325 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1326 lma->lma_som_size = ma->ma_som->msd_size;
1327 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1328 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always refresh the self FID, swab to disk order, and write back. */
1333 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1335 lustre_lma_swab(lma);
1336 buf = mdd_buf_get(env, lma, lmasize);
1337 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1343 * Save LMA extended attributes with data from \a ma.
1345  * HSM and Size-On-MDS data will be extracted from \a ma if they are valid; if
1346  * not, the LMA EA will first be read from disk, modified, and written back.
/* Locked wrapper: take the object write lock, delegate to __mdd_lma_set(),
 * release the lock.  Returns the result of __mdd_lma_set(). */
1349 static int mdd_lma_set_locked(const struct lu_env *env,
1350 struct mdd_object *mdd_obj,
1351 const struct md_attr *ma, struct thandle *handle)
1355 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1356 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1357 mdd_write_unlock(env, mdd_obj);
1361 /* Precedence for choosing record type when multiple
1362 * attributes change: setattr > mtime > ctime > atime
1363 * (ctime changes when mtime does, plus chmod/chown.
1364 * atime and ctime are independent.) */
/* Translate the LA_* bits in \a valid into a single changelog record type
 * (precedence per the comment above: setattr > mtime > ctime > atime) and
 * store one record under \a handle. */
1365 static int mdd_attr_set_changelog(const struct lu_env *env,
1366 struct md_object *obj, struct thandle *handle,
1369 struct mdd_device *mdd = mdo2mdd(obj);
/* build a bitmask of candidate record types from the changed attributes:
 * any non-time attribute change counts as a full CL_SETATTR */
1372 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1373 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1374 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1375 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
/* drop record types the changelog mask has disabled */
1376 bits = bits & mdd->mdd_cl.mc_mask;
1380 /* The record type is the lowest non-masked set bit */
1381 while (bits && ((bits & 1) == 0)) {
1386 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1387 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1388 md2mdd_obj(obj), handle);
1391 /* set attr and LOV EA at once, return updated attr */
/* Set inode attributes (and LOV EA / HSM / SOM state when present in \a ma)
 * in one transaction, then emit a changelog record and, for chown/chgrp on
 * striped files, push the new ownership to the OSTs asynchronously.
 * NOTE(review): many lines (rc checks, GOTO labels, cleanup section) are
 * elided in this extraction; inline claims are limited to visible code. */
1392 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1393 const struct md_attr *ma)
1395 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1396 struct mdd_device *mdd = mdo2mdd(obj);
1397 struct thandle *handle;
1398 struct lov_mds_md *lmm = NULL;
1399 struct llog_cookie *logcookies = NULL;
1400 int rc, lmm_size = 0, cookie_size = 0;
1401 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1402 #ifdef HAVE_QUOTA_SUPPORT
1403 struct obd_device *obd = mdd->mdd_obd_dev;
1404 struct mds_obd *mds = &obd->u.mds;
/* qnids: new owner ids; qoids: original owner ids (for dqrel/dqacq) */
1405 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1406 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1407 int quota_opc = 0, block_count = 0;
1408 int inode_pending[MAXQUOTAS] = { 0, 0 };
1409 int block_pending[MAXQUOTAS] = { 0, 0 };
/* work on a fixed-up copy so the caller's ma_attr stays untouched */
1413 *la_copy = ma->ma_attr;
1414 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1418 /* setattr on "close" only changes atime, or does nothing */
1419 if (ma->ma_valid == MA_INODE &&
1420 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1423 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1424 MDD_TXN_ATTR_SET_OP)
1425 handle = mdd_trans_start(env, mdd);
1427 RETURN(PTR_ERR(handle));
1428 /*TODO: add lock here*/
1429 /* start a log journal handle if needed: chown/chgrp on a regular file
 * needs the LOV EA so the change can be journalled for the OSTs */
1430 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1431 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1432 lmm_size = mdd_lov_mdsize(env, mdd);
1433 lmm = mdd_max_lmm_get(env, mdd);
1435 GOTO(cleanup, rc = -ENOMEM);
1437 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1444 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1445 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1446 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1448 #ifdef HAVE_QUOTA_SUPPORT
/* ownership change: reserve quota for the new owner before the setattr */
1449 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1450 struct obd_export *exp = md_quota(env)->mq_exp;
1451 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1453 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1455 quota_opc = FSFILT_OP_SETATTR;
1456 mdd_quota_wrapper(la_copy, qnids);
1457 mdd_quota_wrapper(la_tmp, qoids);
1458 /* get file quota for new owner */
1459 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1460 qnids, inode_pending, 1, NULL, 0,
/* la_blocks is in 512-byte sectors; >>3 converts to 4K block count */
1462 block_count = (la_tmp->la_blocks + 7) >> 3;
1465 mdd_data_get(env, mdd_obj, &data);
1466 /* get block quota for new owner */
1467 lquota_chkquota(mds_quota_interface_ref, obd,
1468 exp, qnids, block_pending,
1470 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS is handled separately so the cached mdd flags stay in sync */
1476 if (la_copy->la_valid & LA_FLAGS) {
1477 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1480 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1481 } else if (la_copy->la_valid) { /* setattr */
1482 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1484 /* journal chown/chgrp in llog, just like unlink */
1485 if (rc == 0 && lmm_size){
1486 cookie_size = mdd_lov_cookiesize(env, mdd);
1487 logcookies = mdd_max_cookie_get(env, mdd);
1488 if (logcookies == NULL)
1489 GOTO(cleanup, rc = -ENOMEM);
1491 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1492 logcookies, cookie_size) <= 0)
/* replace the striping EA when the caller supplied a new one */
1497 if (rc == 0 && ma->ma_valid & MA_LOV) {
1500 mode = mdd_object_type(mdd_obj);
1501 if (S_ISREG(mode) || S_ISDIR(mode)) {
1502 rc = mdd_lsm_sanity_check(env, mdd_obj);
1506 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1507 ma->ma_lmm_size, handle, 1);
/* persist HSM/SOM state into the LMA EA */
1511 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1514 mode = mdd_object_type(mdd_obj);
1516 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1521 rc = mdd_attr_set_changelog(env, obj, handle,
1522 ma->ma_attr.la_valid);
1523 mdd_trans_stop(env, mdd, rc, handle);
/* after commit: push ownership change to OSS objects asynchronously */
1524 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1525 /*set obd attr, if needed*/
1526 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1529 #ifdef HAVE_QUOTA_SUPPORT
1531 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1533 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1535 /* Trigger dqrel/dqacq for original owner and new owner.
1536 * If failed, the next call for lquota_chkquota will
1538 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set extended attribute \a name from \a buf under the object write lock,
 * inside the caller's already-open transaction \a handle.
 * \a fl are the XATTR_{CREATE,REPLACE}-style flags passed through. */
1545 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1546 const struct lu_buf *buf, const char *name, int fl,
1547 struct thandle *handle)
1552 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1553 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1554 mdd_write_unlock(env, obj);
/* Permission check for xattr set/del: refuse on immutable or append-only
 * objects, and require the caller to be the owner or hold CAP_FOWNER. */
1559 static int mdd_xattr_sanity_check(const struct lu_env *env,
1560 struct mdd_object *obj)
1562 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1563 struct md_ucred *uc = md_ucred(env);
1567 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1570 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* non-owner without CAP_FOWNER is rejected (error value elided here) */
1574 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1575 !mdd_capable(uc, CFS_CAP_FOWNER))
1582 * The caller should guarantee to update the object ctime
1583 * after xattr_set if needed.
/* md_object_operations entry: set an extended attribute in its own
 * transaction.  ACL changes may be forced synchronous; only "user." xattr
 * changes are recorded in the changelog.
 * The caller should guarantee to update the object ctime after xattr_set
 * if needed (see comment above). */
1585 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1586 const struct lu_buf *buf, const char *name,
1589 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590 struct mdd_device *mdd = mdo2mdd(obj);
1591 struct thandle *handle;
1595 rc = mdd_xattr_sanity_check(env, mdd_obj);
1599 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1600 /* security-related changes may require sync */
1601 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1602 mdd->mdd_sync_permission == 1)
1603 txn_param_sync(&mdd_env_info(env)->mti_param);
1605 handle = mdd_trans_start(env, mdd);
1607 RETURN(PTR_ERR(handle));
1609 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1611 /* Only record user xattr changes */
1612 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1613 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1615 mdd_trans_stop(env, mdd, rc, handle);
1621 * The caller should guarantee to update the object ctime
1622 * after xattr_set if needed.
/*
 * md_object_operations entry: remove extended attribute \a name inside its
 * own transaction.  Only "user." xattr removals are recorded in the
 * changelog, mirroring mdd_xattr_set().
 * The caller should guarantee to update the object ctime afterwards if
 * needed (see comment above).
 */
1624 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1627 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1628 struct mdd_device *mdd = mdo2mdd(obj);
1629 struct thandle *handle;
1633 rc = mdd_xattr_sanity_check(env, mdd_obj);
1637 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1638 handle = mdd_trans_start(env, mdd);
1640 RETURN(PTR_ERR(handle));
1642 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1643 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1644 mdd_object_capa(env, mdd_obj));
1645 mdd_write_unlock(env, mdd_obj);
1647 /* Only record user xattr changes */
/* FIX(review): was "!= 0", which logged every xattr removal EXCEPT the
 * user.* ones -- inverted relative to the comment and to mdd_xattr_set() */
1648 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1649 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1652 mdd_trans_stop(env, mdd, rc, handle);
1657 /* partial unlink */
/* Partial unlink (see comment above): drop one nlink on \a obj (two for
 * directories), update ctime, finish the unlink bookkeeping, and release
 * quota for the last reference.  Elided lines (rc checks, GOTOs, RETURN)
 * are not documented here. */
1658 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1661 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1662 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1663 struct mdd_device *mdd = mdo2mdd(obj);
1664 struct thandle *handle;
1665 #ifdef HAVE_QUOTA_SUPPORT
1666 struct obd_device *obd = mdd->mdd_obd_dev;
1667 struct mds_obd *mds = &obd->u.mds;
1668 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1675 * Check -ENOENT early here because we need to get object type
1676 * to calculate credits before transaction start
1678 if (!mdd_object_exists(mdd_obj))
1681 LASSERT(mdd_object_exists(mdd_obj) > 0);
1683 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1687 handle = mdd_trans_start(env, mdd);
1691 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1693 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* drop one reference; directories lose a second one (the "." link) */
1697 __mdd_ref_del(env, mdd_obj, handle, 0);
1699 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1701 __mdd_ref_del(env, mdd_obj, handle, 1);
/* propagate the client-supplied ctime */
1704 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1705 la_copy->la_ctime = ma->ma_attr.la_ctime;
1707 la_copy->la_valid = LA_CTIME;
1708 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1712 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1713 #ifdef HAVE_QUOTA_SUPPORT
/* last link gone and not held open: schedule quota release for the owner */
1714 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1715 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1716 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1717 mdd_quota_wrapper(&ma->ma_attr, qids);
1724 mdd_write_unlock(env, mdd_obj);
1725 mdd_trans_stop(env, mdd, rc, handle);
1726 #ifdef HAVE_QUOTA_SUPPORT
1728 /* Trigger dqrel on the owner of child. If failed,
1729 * the next call for lquota_chkquota will process it */
1730 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1736 /* partial operation */
/* Sanity check for mdd_object_create(): dispatch on the S_IFMT type bits of
 * the requested mode.  The case arms are elided in this extraction, so the
 * accepted type set cannot be stated here. */
1737 static int mdd_oc_sanity_check(const struct lu_env *env,
1738 struct mdd_object *obj,
1744 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create (cross-MDS case): allocate the object body, set its
 * attributes, optional slave LMV EA, optional remote ACL, then initialize
 * it with parent \a pfid.  Quota is reserved before the transaction and
 * committed/adjusted after.  Elided lines (rc checks, labels, RETURN) are
 * not documented. */
1761 static int mdd_object_create(const struct lu_env *env,
1762 struct md_object *obj,
1763 const struct md_op_spec *spec,
1767 struct mdd_device *mdd = mdo2mdd(obj);
1768 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1769 const struct lu_fid *pfid = spec->u.sp_pfid;
1770 struct thandle *handle;
1771 #ifdef HAVE_QUOTA_SUPPORT
1772 struct obd_device *obd = mdd->mdd_obd_dev;
1773 struct obd_export *exp = md_quota(env)->mq_exp;
1774 struct mds_obd *mds = &obd->u.mds;
1775 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1776 int quota_opc = 0, block_count = 0;
1777 int inode_pending[MAXQUOTAS] = { 0, 0 };
1778 int block_pending[MAXQUOTAS] = { 0, 0 };
1783 #ifdef HAVE_QUOTA_SUPPORT
/* reserve inode (and, per type, block) quota before starting the txn */
1784 if (mds->mds_quota) {
1785 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1786 mdd_quota_wrapper(&ma->ma_attr, qids);
1787 /* get file quota for child */
1788 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1789 qids, inode_pending, 1, NULL, 0,
1791 switch (ma->ma_attr.la_mode & S_IFMT) {
1800 /* get block quota for child */
1802 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1803 qids, block_pending, block_count,
1804 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1808 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1809 handle = mdd_trans_start(env, mdd);
1811 GOTO(out_pending, rc = PTR_ERR(handle));
1813 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1814 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1818 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1822 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1823 /* If creating the slave object, set slave EA here. */
1824 int lmv_size = spec->u.sp_ea.eadatalen;
1825 struct lmv_stripe_md *lmv;
1827 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1828 LASSERT(lmv != NULL && lmv_size > 0);
1830 rc = __mdd_xattr_set(env, mdd_obj,
1831 mdd_buf_get_const(env, lmv, lmv_size),
1832 XATTR_NAME_LMV, 0, handle);
1836 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1839 #ifdef CONFIG_FS_POSIX_ACL
/* remote ACL supplied by the master MDS: apply it and refresh la_mode */
1840 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1841 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1843 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1844 buf->lb_len = spec->u.sp_ea.eadatalen;
1845 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1846 rc = __mdd_acl_init(env, mdd_obj, buf,
1847 &ma->ma_attr.la_mode,
1852 ma->ma_attr.la_valid |= LA_MODE;
/* in the RMT_ACL case the parent fid comes from the EA, not sp_pfid */
1855 pfid = spec->u.sp_ea.fid;
1858 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* read back the attributes so the caller sees the final state */
1864 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1865 mdd_write_unlock(env, mdd_obj);
1867 mdd_trans_stop(env, mdd, rc, handle);
1869 #ifdef HAVE_QUOTA_SUPPORT
1871 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1873 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1875 /* Trigger dqacq on the owner of child. If failed,
1876 * the next call for lquota_chkquota will process it. */
1877 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: add one nlink to \a obj in its own transaction and update
 * its ctime from \a ma. */
1885 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1886 const struct md_attr *ma)
1888 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1889 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1890 struct mdd_device *mdd = mdo2mdd(obj);
1891 struct thandle *handle;
1895 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1896 handle = mdd_trans_start(env, mdd);
1900 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1901 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1903 __mdd_ref_add(env, mdd_obj, handle);
1904 mdd_write_unlock(env, mdd_obj);
/* propagate the client-supplied ctime */
1906 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1907 la_copy->la_ctime = ma->ma_attr.la_ctime;
1909 la_copy->la_valid = LA_CTIME;
1910 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): transaction is stopped with rc 0 regardless of the ctime
 * update result above -- looks deliberate (best-effort ctime) but confirm */
1913 mdd_trans_stop(env, mdd, 0, handle);
1919  * do NOT bitwise-OR the MAY_*'s together, or you'll get the weakest
/* Translate MDS open \a flags into a MAY_* access mask for permission
 * checks against attributes \a la. */
1921 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1925 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1926 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1927 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1928 * owner can write to a file even if it is marked readonly to hide
1929 * its brokenness. (bug 5781) */
1930 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1931 struct md_ucred *uc = md_ucred(env);
/* owner (or unverified credentials) bypasses the access-mode check */
1933 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1934 (la->la_uid == uc->mu_fsuid))
/* map open flags onto read/write/exec permissions */
1938 if (flags & FMODE_READ)
1940 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1942 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's current attributes:
 * reject dead objects and symlinks, writable directory opens, permission
 * failures, bad append/truncate combinations and unauthorized O_NOATIME. */
1947 static int mdd_open_sanity_check(const struct lu_env *env,
1948 struct mdd_object *obj, int flag)
1950 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1955 if (mdd_is_dead_obj(obj))
1958 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1962 if (S_ISLNK(tmp_la->la_mode))
1965 mode = accmode(env, tmp_la, flag);
1967 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* MDS_OPEN_CREATED: file was just created, permission already granted */
1970 if (!(flag & MDS_OPEN_CREATED)) {
1971 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncate is meaningless on fifos/sockets/devices; silently drop it */
1976 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1977 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1978 flag &= ~MDS_OPEN_TRUNC;
1980 /* For writing append-only file must open it with append mode. */
1981 if (mdd_is_append(obj)) {
1982 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1984 if (flag & MDS_OPEN_TRUNC)
1990 * Note: the O_NOATIME flag is not packed by the client.
1992 if (flag & O_NOATIME) {
1993 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME requires ownership or CAP_FOWNER */
1995 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1996 (uc->mu_valid == UCRED_NEW)) &&
1997 (uc->mu_fsuid != tmp_la->la_uid) &&
1998 !mdd_capable(uc, CFS_CAP_FOWNER))
/* md_object_operations entry: validate the open and, on success, bump the
 * open count under the object write lock. */
2006 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2009 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2012 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2014 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2016 mdd_obj->mod_count++;
2018 mdd_write_unlock(env, mdd_obj);
/* Return md_attr back to the caller; if this is the last unlink of a
 * regular file, also return the LOV EA plus the llog unlink cookies so the
 * caller can destroy the OSS objects. */
2024 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2030 if (S_ISREG(mdd_object_type(obj))) {
2031 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2032 * Caller must be ready for that. */
2034 rc = __mdd_lmm_get(env, obj, ma);
2035 if ((ma->ma_valid & MA_LOV))
2036 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2043 * No permission check is needed.
/* md_object_operations entry: drop one open reference.  If this was the
 * last open of an orphaned/dead object, remove it from the orphan index
 * and prepare destruction of its OSS objects (inside a transaction opened
 * only for that case).  No permission check is needed (see above).
 * Elided lines (rc checks, GOTOs, RETURN) are not documented. */
2045 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2048 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2049 struct mdd_device *mdd = mdo2mdd(obj);
2050 struct thandle *handle = NULL;
2054 #ifdef HAVE_QUOTA_SUPPORT
2055 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2056 struct mds_obd *mds = &obd->u.mds;
2057 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2062 /* check without any lock */
/* fast pre-check: only the last close of an orphan/dead object needs a
 * transaction; re-checked under lock below */
2063 if (mdd_obj->mod_count == 1 &&
2064 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2066 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2069 handle = mdd_trans_start(env, mdo2mdd(obj));
2071 RETURN(PTR_ERR(handle));
2074 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* state changed between the unlocked check and taking the lock:
 * no transaction was opened but one is now needed */
2075 if (handle == NULL &&
2076 mdd_obj->mod_count == 1 &&
2077 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2078 mdd_write_unlock(env, mdd_obj);
2082 /* release open count */
2083 mdd_obj->mod_count --;
2085 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2086 /* remove link to object from orphan index */
2087 rc = __mdd_orphan_del(env, mdd_obj, handle);
2089 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2090 "list, OSS objects to be destroyed.\n",
2091 PFID(mdd_object_fid(mdd_obj)));
2093 CERROR("Object "DFID" can not be deleted from orphan "
2094 "list, maybe cause OST objects can not be "
2095 "destroyed (err: %d).\n",
2096 PFID(mdd_object_fid(mdd_obj)), rc);
2097 /* If object was not deleted from orphan list, do not
2098 * destroy OSS objects, which will be done when next
2104 rc = mdd_iattr_get(env, mdd_obj, ma);
2105 /* The object may not have been in the orphan list originally; that is
2106 * a rare case caused by mdd_finish_unlink() failure. */
2107 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2108 #ifdef HAVE_QUOTA_SUPPORT
2109 if (mds->mds_quota) {
2110 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2111 mdd_quota_wrapper(&ma->ma_attr, qids);
2114 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2115 if (ma->ma_valid & MA_FLAGS &&
2116 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2117 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2119 rc = mdd_object_kill(env, mdd_obj, ma);
2125 CERROR("Error when prepare to delete Object "DFID" , "
2126 "which will cause OST objects can not be "
2127 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* nothing for the caller to destroy: strip LOV/cookie validity */
2133 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2135 mdd_write_unlock(env, mdd_obj);
2137 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2138 #ifdef HAVE_QUOTA_SUPPORT
2140 /* Trigger dqrel on the owner of child. If failed,
2141 * the next call for lquota_chkquota will process it */
2142 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2149 * Permission check is done when open,
2150 * no need check again.
/* Readdir precondition: the object must be a directory whose underlying
 * dt object supports index operations.  Permission was checked at open
 * (see comment above), so none is repeated here. */
2152 static int mdd_readpage_sanity_check(const struct lu_env *env,
2153 struct mdd_object *obj)
2155 struct dt_object *next = mdd_object_child(obj);
2159 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one directory page: walk iterator \a it packing lu_dirent records
 * into \a area (size \a nob) after a lu_dirpage header.  \a first marks the
 * first page of the transfer; \a start/\a end receive the hash range and
 * \a last tracks the most recently packed entry so its reclen can absorb
 * trailing space.  Loop-control lines are partially elided here. */
2167 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2168 int first, void *area, int nob,
2169 const struct dt_it_ops *iops, struct dt_it *it,
2170 __u64 *start, __u64 *end,
2171 struct lu_dirent **last, __u32 attr)
2175 struct lu_dirent *ent;
/* reserve the page header before packing entries */
2178 memset(area, 0, sizeof (struct lu_dirpage));
2179 area += sizeof (struct lu_dirpage);
2180 nob -= sizeof (struct lu_dirpage);
2188 len = iops->key_size(env, it);
2190 /* IAM iterator can return record with zero len. */
2194 hash = iops->store(env, it);
2195 if (unlikely(first)) {
2200 /* calculate max space required for lu_dirent */
2201 recsize = lu_dirent_calc_size(len, attr);
2203 if (nob >= recsize) {
2204 result = iops->rec(env, it, ent, attr);
2205 if (result == -ESTALE)
2210 /* osd might not able to pack all attributes,
2211 * so recheck rec length */
2212 recsize = le16_to_cpu(ent->lde_reclen);
2215 * record doesn't fit into page, enlarge previous one.
2218 (*last)->lde_reclen =
2219 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
/* advance the output cursor past the packed record */
2228 ent = (void *)ent + recsize;
2232 result = iops->next(env, it);
2233 if (result == -ESTALE)
2235 } while (result == 0);
/* Iterate the directory index starting at rdpg->rp_hash and fill the pages
 * in \a rdpg via mdd_dir_page_build(); finally stamp the hash range (and
 * LDF_EMPTY when nothing was packed) into the first page's lu_dirpage
 * header.  Elided lines (iterator error checks, EXIT paths) are not
 * documented. */
2242 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2243 const struct lu_rdpg *rdpg)
2246 struct dt_object *next = mdd_object_child(obj);
2247 const struct dt_it_ops *iops;
2249 struct lu_dirent *last = NULL;
2250 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2257 LASSERT(rdpg->rp_pages != NULL);
2258 LASSERT(next->do_index_ops != NULL);
2260 if (rdpg->rp_count <= 0)
2264 * iterate through directory and fill pages from @rdpg
2266 iops = &next->do_index_ops->dio_it;
2267 it = iops->init(env, next, mdd_object_capa(env, obj));
/* position the iterator at (or just after) the requested hash */
2271 rc = iops->load(env, it, rdpg->rp_hash);
2275 * Iterator didn't find record with exactly the key requested.
2277 * It is currently either
2279 * - positioned above record with key less than
2280 * requested---skip it.
2282 * - or not positioned at all (is in IAM_IT_SKEWED
2283 * state)---position it on the next item.
2285 rc = iops->next(env, it);
2290 * At this point and across for-loop:
2292 * rc == 0 -> ok, proceed.
2293 * rc > 0 -> end of directory.
2296 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2297 i++, nob -= CFS_PAGE_SIZE) {
2298 LASSERT(i < rdpg->rp_npages);
2299 pg = rdpg->rp_pages[i];
2300 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2301 min_t(int, nob, CFS_PAGE_SIZE), iops,
2302 it, &hash_start, &hash_end, &last,
/* last page filled (or error): terminate the final entry */
2304 if (rc != 0 || i == rdpg->rp_npages - 1) {
2306 last->lde_reclen = 0;
/* end of directory reached: extend the range to DIR_END_OFF */
2314 hash_end = DIR_END_OFF;
2318 struct lu_dirpage *dp;
2320 dp = cfs_kmap(rdpg->rp_pages[0]);
2321 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2322 dp->ldp_hash_end = cpu_to_le64(hash_end);
2325 * No pages were processed, mark this.
2327 dp->ldp_flags |= LDF_EMPTY;
2329 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2330 cfs_kunmap(rdpg->rp_pages[0]);
2333 iops->fini(env, it);
/* md_object_operations entry for readdir: under the read lock, validate
 * the object and either return a single empty page (dead/unlinked
 * directory -- POSIX says return no entries at all) or delegate to
 * __mdd_readpage(). */
2338 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2339 const struct lu_rdpg *rdpg)
2341 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2345 LASSERT(mdd_object_exists(mdd_obj));
2347 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2348 rc = mdd_readpage_sanity_check(env, mdd_obj);
2350 GOTO(out_unlock, rc);
2352 if (mdd_is_dead_obj(mdd_obj)) {
2354 struct lu_dirpage *dp;
2357 * According to POSIX, please do not return any entry to client:
2358 * even dot and dotdot should not be returned.
2360 CWARN("readdir from dead object: "DFID"\n",
2361 PFID(mdd_object_fid(mdd_obj)));
2363 if (rdpg->rp_count <= 0)
2364 GOTO(out_unlock, rc = -EFAULT);
2365 LASSERT(rdpg->rp_pages != NULL);
/* hand back one all-empty page covering the whole hash range */
2367 pg = rdpg->rp_pages[0];
2368 dp = (struct lu_dirpage*)cfs_kmap(pg);
2369 memset(dp, 0 , sizeof(struct lu_dirpage));
2370 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2371 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2372 dp->ldp_flags |= LDF_EMPTY;
2373 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2375 GOTO(out_unlock, rc = 0);
2378 rc = __mdd_readpage(env, mdd_obj, rdpg);
2382 mdd_read_unlock(env, mdd_obj);
/* md_object_operations entry: flush the object by delegating to the
 * underlying dt object's do_object_sync(). */
2386 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2388 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2389 struct dt_object *next;
2391 LASSERT(mdd_object_exists(mdd_obj));
2392 next = mdd_object_child(mdd_obj);
2393 return next->do_ops->do_object_sync(env, next);
/* md_object_operations entry: read the object version from the underlying
 * dt layer (used for version-based recovery). */
2396 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2397 struct md_object *obj)
2399 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2401 LASSERT(mdd_object_exists(mdd_obj));
2402 return do_version_get(env, mdd_object_child(mdd_obj));
/* md_object_operations entry: store \a version on the underlying dt
 * object (counterpart to mdd_version_get()). */
2405 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2406 dt_obj_version_t version)
2408 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2410 LASSERT(mdd_object_exists(mdd_obj));
2411 do_version_set(env, mdd_object_child(mdd_obj), version);
2414 const struct md_object_operations mdd_obj_ops = {
2415 .moo_permission = mdd_permission,
2416 .moo_attr_get = mdd_attr_get,
2417 .moo_attr_set = mdd_attr_set,
2418 .moo_xattr_get = mdd_xattr_get,
2419 .moo_xattr_set = mdd_xattr_set,
2420 .moo_xattr_list = mdd_xattr_list,
2421 .moo_xattr_del = mdd_xattr_del,
2422 .moo_object_create = mdd_object_create,
2423 .moo_ref_add = mdd_ref_add,
2424 .moo_ref_del = mdd_ref_del,
2425 .moo_open = mdd_open,
2426 .moo_close = mdd_close,
2427 .moo_readpage = mdd_readpage,
2428 .moo_readlink = mdd_readlink,
2429 .moo_changelog = mdd_changelog,
2430 .moo_capa_get = mdd_capa_get,
2431 .moo_object_sync = mdd_object_sync,
2432 .moo_version_get = mdd_version_get,
2433 .moo_version_set = mdd_version_set,
2434 .moo_path = mdd_path,
2435 .moo_file_lock = mdd_file_lock,
2436 .moo_file_unlock = mdd_file_unlock,