1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
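/* Note: mdd_flags_xlate() mirrors the on-disk LUSTRE_APPEND_FL and
 * LUSTRE_IMMUTABLE_FL bits into the in-memory mod_flags bits
 * (APPEND_OBJ, IMMUTE_OBJ) consumed by the mdd_is_append() and
 * mdd_is_immutable() checks used later in this file. */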
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
129 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135 const void *area, ssize_t len)
139 buf = &mdd_env_info(env)->mti_buf;
140 buf->lb_buf = (void *)area;
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
147 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
149 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
153 if (buf->lb_buf == NULL) {
155 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL)
162 /** Increase the size of the \a mti_big_buf.
163 * Preserves the existing data in the buffer.
164 * The old buffer remains unchanged on error.
165 * \retval 0 or -ENOMEM
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
169 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
172 LASSERT(len >= oldbuf->lb_len);
173 OBD_ALLOC_LARGE(buf.lb_buf, len);
175 if (buf.lb_buf == NULL)
179 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
181 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
183 memcpy(oldbuf, &buf, sizeof(buf));
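/* Illustrative (hypothetical) caller pattern for mdd_buf_grow(): grow the
 * per-thread mti_big_buf before appending more data to it.  "needed_len"
 * is a made-up name, not something defined in this file:
 *
 *      struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 *
 *      if (needed_len > buf->lb_len) {
 *              rc = mdd_buf_grow(env, needed_len);
 *              if (rc != 0)
 *                      return rc;
 *      }
 *
 * On success buf->lb_buf has room for needed_len bytes and the old
 * contents are preserved, as described above. */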
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189 struct mdd_device *mdd)
191 struct mdd_thread_info *mti = mdd_env_info(env);
194 max_cookie_size = mdd_lov_cookiesize(env, mdd);
195 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196 if (mti->mti_max_cookie)
197 OBD_FREE_LARGE(mti->mti_max_cookie,
198 mti->mti_max_cookie_size);
199 mti->mti_max_cookie = NULL;
200 mti->mti_max_cookie_size = 0;
202 if (unlikely(mti->mti_max_cookie == NULL)) {
203 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204 if (likely(mti->mti_max_cookie != NULL))
205 mti->mti_max_cookie_size = max_cookie_size;
207 if (likely(mti->mti_max_cookie != NULL))
208 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209 return mti->mti_max_cookie;
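/* mdd_max_lmm_get() below follows the same grow-on-demand pattern as
 * mdd_max_cookie_get() above: if the cached per-thread buffer is smaller
 * than the current maximum size, it is freed and reallocated at the
 * larger size before being returned. */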
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213 struct mdd_device *mdd)
215 struct mdd_thread_info *mti = mdd_env_info(env);
218 max_lmm_size = mdd_lov_mdsize(env, mdd);
219 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220 if (mti->mti_max_lmm)
221 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222 mti->mti_max_lmm = NULL;
223 mti->mti_max_lmm_size = 0;
225 if (unlikely(mti->mti_max_lmm == NULL)) {
226 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227 if (likely(mti->mti_max_lmm != NULL))
228 mti->mti_max_lmm_size = max_lmm_size;
230 return mti->mti_max_lmm;
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234 const struct lu_object_header *hdr,
237 struct mdd_object *mdd_obj;
239 OBD_ALLOC_PTR(mdd_obj);
240 if (mdd_obj != NULL) {
243 o = mdd2lu_obj(mdd_obj);
244 lu_object_init(o, NULL, d);
245 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247 mdd_obj->mod_count = 0;
248 o->lo_ops = &mdd_lu_obj_ops;
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256 const struct lu_object_conf *unused)
258 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259 struct mdd_object *mdd_obj = lu2mdd_obj(o);
260 struct lu_object *below;
261 struct lu_device *under;
264 mdd_obj->mod_cltime = 0;
265 under = &d->mdd_child->dd_lu_dev;
266 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267 mdd_pdlock_init(mdd_obj);
271 lu_object_add(o, below);
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
278 if (lu_object_exists(o))
279 return mdd_get_flags(env, lu2mdd_obj(o));
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
286 struct mdd_object *mdd = lu2mdd_obj(o);
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293 lu_printer_t p, const struct lu_object *o)
295 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297 "valid=%x, cltime="LPU64", flags=%lx)",
298 mdd, mdd->mod_count, mdd->mod_valid,
299 mdd->mod_cltime, mdd->mod_flags);
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303 .loo_object_init = mdd_object_init,
304 .loo_object_start = mdd_object_start,
305 .loo_object_free = mdd_object_free,
306 .loo_object_print = mdd_object_print,
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310 struct mdd_device *d,
311 const struct lu_fid *f)
313 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317 const char *path, struct lu_fid *fid)
320 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321 struct mdd_object *obj;
322 struct lu_name *lname = &mdd_env_info(env)->mti_name;
327 /* temp buffer for path element */
328 buf = mdd_buf_alloc(env, PATH_MAX);
329 if (buf->lb_buf == NULL)
332 lname->ln_name = name = buf->lb_buf;
333 lname->ln_namelen = 0;
334 *f = mdd->mdd_root_fid;
341 while (*path != '/' && *path != '\0') {
349 /* find obj corresponding to fid */
350 obj = mdd_object_find(env, mdd, f);
352 GOTO(out, rc = -EREMOTE);
354 GOTO(out, rc = PTR_ERR(obj));
355 /* get child fid from parent and name */
356 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357 mdd_object_put(env, obj);
362 lname->ln_namelen = 0;
371 /** The maximum depth that fid2path() will search.
372 * This is limited only because we want to store the fids for
373 * historical path lookup purposes.
375 #define MAX_PATH_DEPTH 100
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379 __u64 pli_recno; /**< history point */
380 __u64 pli_currec; /**< current record */
381 struct lu_fid pli_fid;
382 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383 struct mdd_object *pli_mdd_obj;
384 char *pli_path; /**< full path */
386 int pli_linkno; /**< which hardlink to follow */
387 int pli_fidcount; /**< number of \a pli_fids */
390 static int mdd_path_current(const struct lu_env *env,
391 struct path_lookup_info *pli)
393 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394 struct mdd_object *mdd_obj;
395 struct lu_buf *buf = NULL;
396 struct link_ea_header *leh;
397 struct link_ea_entry *lee;
398 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
405 ptr = pli->pli_path + pli->pli_pathlen - 1;
408 pli->pli_fidcount = 0;
409 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
411 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412 mdd_obj = mdd_object_find(env, mdd,
413 &pli->pli_fids[pli->pli_fidcount]);
415 GOTO(out, rc = -EREMOTE);
417 GOTO(out, rc = PTR_ERR(mdd_obj));
418 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
420 mdd_object_put(env, mdd_obj);
424 /* Do I need to error out here? */
429 /* Get parent fid and object name */
430 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431 buf = mdd_links_get(env, mdd_obj);
432 mdd_read_unlock(env, mdd_obj);
433 mdd_object_put(env, mdd_obj);
435 GOTO(out, rc = PTR_ERR(buf));
438 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
441 /* If set, use link #linkno for path lookup, otherwise use
442 link #0. Only do this for the final path element. */
443 if ((pli->pli_fidcount == 0) &&
444 (pli->pli_linkno < leh->leh_reccount)) {
446 for (count = 0; count < pli->pli_linkno; count++) {
447 lee = (struct link_ea_entry *)
448 ((char *)lee + reclen);
449 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
451 if (pli->pli_linkno < leh->leh_reccount - 1)
452 /* indicate to user there are more links */
456 /* Pack the name in the end of the buffer */
457 ptr -= tmpname->ln_namelen;
458 if (ptr - 1 <= pli->pli_path)
459 GOTO(out, rc = -EOVERFLOW);
460 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
463 /* Store the parent fid for historic lookup */
464 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465 GOTO(out, rc = -EOVERFLOW);
466 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
469 /* Verify that our path hasn't changed since we started the lookup.
470 Record the current index, and verify the path resolves to the
471 same fid. If it does, then the path is correct as of this index. */
472 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473 pli->pli_currec = mdd->mdd_cl.mc_index;
474 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
477 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478 GOTO (out, rc = -EAGAIN);
480 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483 PFID(&pli->pli_fid));
484 GOTO(out, rc = -EAGAIN);
486 ptr++; /* skip leading / */
487 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492 /* if we vmalloced a large buffer drop it */
498 static int mdd_path_historic(const struct lu_env *env,
499 struct path_lookup_info *pli)
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506 char *path, int pathlen, __u64 *recno, int *linkno)
508 struct path_lookup_info *pli;
516 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
525 pli->pli_mdd_obj = md2mdd_obj(obj);
526 pli->pli_recno = *recno;
527 pli->pli_path = path;
528 pli->pli_pathlen = pathlen;
529 pli->pli_linkno = *linkno;
531 /* Retry multiple times in case file is being moved */
532 while (tries-- && rc == -EAGAIN)
533 rc = mdd_path_current(env, pli);
535 /* For historical path lookup, the current links may not have existed
536 * at "recno" time. We must switch over to earlier links/parents
537 * by using the changelog records. If the earlier parent doesn't
538 * exist, we must search back through the changelog to reconstruct
539 * its parents, then check if it exists, etc.
540 * We may ignore this problem for the initial implementation and
541 * state that an "original" hardlink must still exist for us to find
542 * historic path name. */
543 if (pli->pli_recno != -1) {
544 rc = mdd_path_historic(env, pli);
546 *recno = pli->pli_currec;
547 /* Return next link index to caller */
548 *linkno = pli->pli_linkno;
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
558 struct lu_attr *la = &mdd_env_info(env)->mti_la;
562 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
564 mdd_flags_xlate(obj, la->la_flags);
565 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566 obj->mod_flags |= MNLINK_OBJ;
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
578 if (ma->ma_valid & MA_INODE)
581 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582 mdd_object_capa(env, mdd_obj));
584 ma->ma_valid |= MA_INODE;
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
590 struct lov_desc *ldesc;
591 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592 struct lov_user_md *lum = (struct lov_user_md*)lmm;
598 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599 LASSERT(ldesc != NULL);
601 lum->lmm_magic = LOV_MAGIC_V1;
602 lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603 lum->lmm_pattern = ldesc->ld_pattern;
604 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
608 RETURN(sizeof(*lum));
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613 struct mdd_object *mdd_obj, struct md_attr *ma)
618 if (ma->ma_valid & MA_LOV)
621 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
623 if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
624 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
626 ma->ma_lmm_size = rc;
627 ma->ma_valid |= MA_LOV;
633 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
639 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
640 rc = __mdd_lmm_get(env, mdd_obj, ma);
641 mdd_read_unlock(env, mdd_obj);
646 static int __mdd_lmv_get(const struct lu_env *env,
647 struct mdd_object *mdd_obj, struct md_attr *ma)
652 if (ma->ma_valid & MA_LMV)
655 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
658 ma->ma_valid |= MA_LMV;
664 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
667 struct mdd_thread_info *info = mdd_env_info(env);
668 struct lustre_mdt_attrs *lma =
669 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
674 /* If all needed data are already valid, nothing to do */
675 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
676 (ma->ma_need & (MA_HSM | MA_SOM)))
679 /* Read LMA from disk EA */
680 lma_size = sizeof(info->mti_xattr_buf);
681 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
685 /* Useless to check LMA incompatibility because this is already done in
686 * osd_ea_fid_get(), and this will fail long before this code is called.
688 * So, if we are here, LMA is compatible.
691 lustre_lma_swab(lma);
693 /* Swab and copy LMA */
694 if (ma->ma_need & MA_HSM) {
695 if (lma->lma_compat & LMAC_HSM)
696 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
698 ma->ma_hsm.mh_flags = 0;
699 ma->ma_valid |= MA_HSM;
703 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
704 LASSERT(ma->ma_som != NULL);
705 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
706 ma->ma_som->msd_size = lma->lma_som_size;
707 ma->ma_som->msd_blocks = lma->lma_som_blocks;
708 ma->ma_som->msd_mountid = lma->lma_som_mountid;
709 ma->ma_valid |= MA_SOM;
715 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
721 if (ma->ma_need & MA_INODE)
722 rc = mdd_iattr_get(env, mdd_obj, ma);
724 if (rc == 0 && ma->ma_need & MA_LOV) {
725 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726 S_ISDIR(mdd_object_type(mdd_obj)))
727 rc = __mdd_lmm_get(env, mdd_obj, ma);
729 if (rc == 0 && ma->ma_need & MA_LMV) {
730 if (S_ISDIR(mdd_object_type(mdd_obj)))
731 rc = __mdd_lmv_get(env, mdd_obj, ma);
733 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
734 if (S_ISREG(mdd_object_type(mdd_obj)))
735 rc = __mdd_lma_get(env, mdd_obj, ma);
737 #ifdef CONFIG_FS_POSIX_ACL
738 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
739 if (S_ISDIR(mdd_object_type(mdd_obj)))
740 rc = mdd_def_acl_get(env, mdd_obj, ma);
743 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
744 rc, ma->ma_valid, ma->ma_lmm);
748 int mdd_attr_get_internal_locked(const struct lu_env *env,
749 struct mdd_object *mdd_obj, struct md_attr *ma)
752 int needlock = ma->ma_need &
753 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
756 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
757 rc = mdd_attr_get_internal(env, mdd_obj, ma);
759 mdd_read_unlock(env, mdd_obj);
764 * No permission check is needed.
766 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
769 struct mdd_object *mdd_obj = md2mdd_obj(obj);
773 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
778 * No permission check is needed.
780 static int mdd_xattr_get(const struct lu_env *env,
781 struct md_object *obj, struct lu_buf *buf,
784 struct mdd_object *mdd_obj = md2mdd_obj(obj);
789 LASSERT(mdd_object_exists(mdd_obj));
791 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792 rc = mdo_xattr_get(env, mdd_obj, buf, name,
793 mdd_object_capa(env, mdd_obj));
794 mdd_read_unlock(env, mdd_obj);
800 * The permission check is done at open time;
801 * there is no need to check again.
803 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
806 struct mdd_object *mdd_obj = md2mdd_obj(obj);
807 struct dt_object *next;
812 LASSERT(mdd_object_exists(mdd_obj));
814 next = mdd_object_child(mdd_obj);
815 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
816 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
817 mdd_object_capa(env, mdd_obj));
818 mdd_read_unlock(env, mdd_obj);
823 * No permission check is needed.
825 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
828 struct mdd_object *mdd_obj = md2mdd_obj(obj);
833 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
835 mdd_read_unlock(env, mdd_obj);
840 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
841 struct mdd_object *c, struct md_attr *ma,
842 struct thandle *handle,
843 const struct md_op_spec *spec)
845 struct lu_attr *attr = &ma->ma_attr;
846 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
847 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
848 const struct dt_index_features *feat = spec->sp_feat;
852 if (!mdd_object_exists(c)) {
853 struct dt_object *next = mdd_object_child(c);
856 if (feat != &dt_directory_features && feat != NULL)
857 dof->dof_type = DFT_INDEX;
859 dof->dof_type = dt_mode_to_dft(attr->la_mode);
861 dof->u.dof_idx.di_feat = feat;
863 /* @hint will be initialized by underlying device. */
864 next->do_ops->do_ah_init(env, hint,
865 p ? mdd_object_child(p) : NULL,
866 attr->la_mode & S_IFMT);
868 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
869 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
877 * Make sure the ctime only moves forward.
879 static inline int mdd_attr_check(const struct lu_env *env,
880 struct mdd_object *obj,
881 struct lu_attr *attr)
883 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
887 if (attr->la_valid & LA_CTIME) {
888 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
892 if (attr->la_ctime < tmp_la->la_ctime)
893 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
894 else if (attr->la_valid == LA_CTIME &&
895 attr->la_ctime == tmp_la->la_ctime)
896 attr->la_valid &= ~LA_CTIME;
901 int mdd_attr_set_internal(const struct lu_env *env,
902 struct mdd_object *obj,
903 struct lu_attr *attr,
904 struct thandle *handle,
910 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
911 #ifdef CONFIG_FS_POSIX_ACL
912 if (!rc && (attr->la_valid & LA_MODE) && needacl)
913 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
918 int mdd_attr_check_set_internal(const struct lu_env *env,
919 struct mdd_object *obj,
920 struct lu_attr *attr,
921 struct thandle *handle,
927 rc = mdd_attr_check(env, obj, attr);
932 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
936 static int mdd_attr_set_internal_locked(const struct lu_env *env,
937 struct mdd_object *obj,
938 struct lu_attr *attr,
939 struct thandle *handle,
945 needacl = needacl && (attr->la_valid & LA_MODE);
947 mdd_write_lock(env, obj, MOR_TGT_CHILD);
948 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
950 mdd_write_unlock(env, obj);
954 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
955 struct mdd_object *obj,
956 struct lu_attr *attr,
957 struct thandle *handle,
963 needacl = needacl && (attr->la_valid & LA_MODE);
965 mdd_write_lock(env, obj, MOR_TGT_CHILD);
966 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
968 mdd_write_unlock(env, obj);
972 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
973 const struct lu_buf *buf, const char *name,
974 int fl, struct thandle *handle)
976 struct lustre_capa *capa = mdd_object_capa(env, obj);
980 if (buf->lb_buf && buf->lb_len > 0)
981 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
982 else if (buf->lb_buf == NULL && buf->lb_len == 0)
983 rc = mdo_xattr_del(env, obj, name, handle, capa);
989 * This gives the same functionality as the code between
990 * sys_chmod and inode_setattr
991 * chown_common and inode_setattr
992 * utimes and inode_setattr
994 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
995 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
996 struct lu_attr *la, const struct md_attr *ma)
998 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1006 /* Do not permit changing the file type */
1007 if (la->la_valid & LA_TYPE)
1010 /* They should not be processed by setattr */
1011 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1014 /* export destroy does not have ->le_ses, but we may want
1015 * to drop LUSTRE_SOM_FL. */
1021 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1025 if (la->la_valid == LA_CTIME) {
1026 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1027 /* This is only for setting ctime when the rename source is
1028  * on a remote MDS. */
1029 rc = mdd_may_delete(env, NULL, obj,
1030 (struct md_attr *)ma, 1, 0);
1031 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1032 la->la_valid &= ~LA_CTIME;
1036 if (la->la_valid == LA_ATIME) {
1037 /* Only atime is being set here, for the read-atime update on close. */
1038 if (la->la_atime >= tmp_la->la_atime &&
1039 la->la_atime < (tmp_la->la_atime +
1040 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1041 la->la_valid &= ~LA_ATIME;
1045 /* Check if flags change. */
1046 if (la->la_valid & LA_FLAGS) {
1047 unsigned int oldflags = 0;
1048 unsigned int newflags = la->la_flags &
1049 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1051 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1052 !mdd_capable(uc, CFS_CAP_FOWNER))
1055 /* XXX: the IMMUTABLE and APPEND_ONLY flags can only be
1056  * changed by a process with the relevant capability. */
1057 if (mdd_is_immutable(obj))
1058 oldflags |= LUSTRE_IMMUTABLE_FL;
1059 if (mdd_is_append(obj))
1060 oldflags |= LUSTRE_APPEND_FL;
1061 if ((oldflags ^ newflags) &&
1062 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1065 if (!S_ISDIR(tmp_la->la_mode))
1066 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1069 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1070 (la->la_valid & ~LA_FLAGS) &&
1071 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1074 /* Check for setting the obj time. */
1075 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1076 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1077 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1078 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1079 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1087 if (la->la_valid & LA_KILL_SUID) {
1088 la->la_valid &= ~LA_KILL_SUID;
1089 if ((tmp_la->la_mode & S_ISUID) &&
1090 !(la->la_valid & LA_MODE)) {
1091 la->la_mode = tmp_la->la_mode;
1092 la->la_valid |= LA_MODE;
1094 la->la_mode &= ~S_ISUID;
1097 if (la->la_valid & LA_KILL_SGID) {
1098 la->la_valid &= ~LA_KILL_SGID;
1099 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1100 (S_ISGID | S_IXGRP)) &&
1101 !(la->la_valid & LA_MODE)) {
1102 la->la_mode = tmp_la->la_mode;
1103 la->la_valid |= LA_MODE;
1105 la->la_mode &= ~S_ISGID;
1108 /* Make sure a caller can chmod. */
1109 if (la->la_valid & LA_MODE) {
1110 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1111 (uc->mu_fsuid != tmp_la->la_uid) &&
1112 !mdd_capable(uc, CFS_CAP_FOWNER))
1115 if (la->la_mode == (cfs_umode_t) -1)
1116 la->la_mode = tmp_la->la_mode;
1118 la->la_mode = (la->la_mode & S_IALLUGO) |
1119 (tmp_la->la_mode & ~S_IALLUGO);
1121 /* Also check the setgid bit! */
1122 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1123 la->la_gid : tmp_la->la_gid) &&
1124 !mdd_capable(uc, CFS_CAP_FSETID))
1125 la->la_mode &= ~S_ISGID;
1127 la->la_mode = tmp_la->la_mode;
1130 /* Make sure a caller can chown. */
1131 if (la->la_valid & LA_UID) {
1132 if (la->la_uid == (uid_t) -1)
1133 la->la_uid = tmp_la->la_uid;
1134 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1135 (la->la_uid != tmp_la->la_uid)) &&
1136 !mdd_capable(uc, CFS_CAP_CHOWN))
1139 /* If the user or group of a non-directory has been
1140 * changed by a non-root user, remove the setuid bit.
1141 * 19981026 David C Niemi <niemi@tux.org>
1143 * Changed this to apply to all users, including root,
1144 * to avoid some races. This is the behavior we had in
1145 * 2.0. The check for non-root was definitely wrong
1146 * for 2.2 anyway, as it should have been using
1147 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1148 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1149 !S_ISDIR(tmp_la->la_mode)) {
1150 la->la_mode &= ~S_ISUID;
1151 la->la_valid |= LA_MODE;
1155 /* Make sure caller can chgrp. */
1156 if (la->la_valid & LA_GID) {
1157 if (la->la_gid == (gid_t) -1)
1158 la->la_gid = tmp_la->la_gid;
1159 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1160 ((la->la_gid != tmp_la->la_gid) &&
1161 !lustre_in_group_p(uc, la->la_gid))) &&
1162 !mdd_capable(uc, CFS_CAP_CHOWN))
1165 /* Likewise, if the user or group of a non-directory
1166 * has been changed by a non-root user, remove the
1167 * setgid bit UNLESS there is no group execute bit
1168 * (this would be a file marked for mandatory
1169 * locking). 19981026 David C Niemi <niemi@tux.org>
1171 * Removed the fsuid check (see the comment above) --
1173 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1174 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1175 la->la_mode &= ~S_ISGID;
1176 la->la_valid |= LA_MODE;
1180 /* For both the Size-on-MDS case and the truncate case,
1181  * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
1182  * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1183  * In the SOM case it is set: the MAY_WRITE permission was already
1184  * checked at open time, so there is no need to check it again.
1185  * In the truncate case it is clear, so MAY_WRITE must be checked here. */
1186 if (ma->ma_attr_flags & MDS_SOM) {
1187 /* For the "Size-on-MDS" setattr update, merge coming
1188 * attributes with the set in the inode. BUG 10641 */
1189 if ((la->la_valid & LA_ATIME) &&
1190 (la->la_atime <= tmp_la->la_atime))
1191 la->la_valid &= ~LA_ATIME;
1193 /* OST attributes do not have a priority over MDS attributes,
1194 * so drop times if ctime is equal. */
1195 if ((la->la_valid & LA_CTIME) &&
1196 (la->la_ctime <= tmp_la->la_ctime))
1197 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1199 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1200 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1201 (uc->mu_fsuid == tmp_la->la_uid)) &&
1202 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1203 rc = mdd_permission_internal_locked(env, obj,
1210 if (la->la_valid & LA_CTIME) {
1211 /* A pure setattr has priority over what is already set;
1212  * do not drop it if ctime is equal. */
1213 if (la->la_ctime < tmp_la->la_ctime)
1214 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1222 /** Store a data change changelog record
1223 * If this fails, we must fail the whole transaction; we don't
1224 * want the change to commit without the log entry.
1225 * \param mdd_obj - mdd_object of change
1226 * \param handle - transaction handle
1228 static int mdd_changelog_data_store(const struct lu_env *env,
1229 struct mdd_device *mdd,
1230 enum changelog_rec_type type,
1232 struct mdd_object *mdd_obj,
1233 struct thandle *handle)
1235 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1236 struct llog_changelog_rec *rec;
1242 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1244 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1247 LASSERT(handle != NULL);
1248 LASSERT(mdd_obj != NULL);
1250 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1251 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1252 /* Don't need multiple updates in this log */
1253 /* Don't check under lock - no big deal if we get an extra
1254  * entry */
1258 reclen = llog_data_len(sizeof(*rec));
1259 buf = mdd_buf_alloc(env, reclen);
1260 if (buf->lb_buf == NULL)
1262 rec = (struct llog_changelog_rec *)buf->lb_buf;
1264 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1265 rec->cr.cr_type = (__u32)type;
1266 rec->cr.cr_tfid = *tfid;
1267 rec->cr.cr_namelen = 0;
1268 mdd_obj->mod_cltime = cfs_time_current_64();
1270 rc = mdd_changelog_llog_write(mdd, rec, handle);
1272 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1273 rc, type, PFID(tfid));
1280 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1281 int flags, struct md_object *obj)
1283 struct thandle *handle;
1284 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1285 struct mdd_device *mdd = mdo2mdd(obj);
1289 handle = mdd_trans_start(env, mdd);
1292 return(PTR_ERR(handle));
1294 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1297 mdd_trans_stop(env, mdd, rc, handle);
1303 * Should be called with write lock held.
1305 * \see mdd_lma_set_locked().
1307 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1308 const struct md_attr *ma, struct thandle *handle)
1310 struct mdd_thread_info *info = mdd_env_info(env);
1312 struct lustre_mdt_attrs *lma =
1313 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1314 int lmasize = sizeof(struct lustre_mdt_attrs);
1319 /* Either HSM or SOM part is not valid, we need to read it before */
1320 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1321 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1325 lustre_lma_swab(lma);
1327 memset(lma, 0, lmasize);
1331 if (ma->ma_valid & MA_HSM) {
1332 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1333 lma->lma_compat |= LMAC_HSM;
1337 if (ma->ma_valid & MA_SOM) {
1338 LASSERT(ma->ma_som != NULL);
1339 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1340 lma->lma_compat &= ~LMAC_SOM;
1342 lma->lma_compat |= LMAC_SOM;
1343 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1344 lma->lma_som_size = ma->ma_som->msd_size;
1345 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1346 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1351 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1353 lustre_lma_swab(lma);
1354 buf = mdd_buf_get(env, lma, lmasize);
1355 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1361 * Save LMA extended attributes with data from \a ma.
1363 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid;
1364 * if not, the LMA EA will first be read from disk, modified, and written back.
1367 static int mdd_lma_set_locked(const struct lu_env *env,
1368 struct mdd_object *mdd_obj,
1369 const struct md_attr *ma, struct thandle *handle)
1373 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1374 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1375 mdd_write_unlock(env, mdd_obj);
1379 /* Precedence for choosing record type when multiple
1380 * attributes change: setattr > mtime > ctime > atime
1381 * (ctime changes when mtime does, plus chmod/chown.
1382 * atime and ctime are independent.) */
1383 static int mdd_attr_set_changelog(const struct lu_env *env,
1384 struct md_object *obj, struct thandle *handle,
1387 struct mdd_device *mdd = mdo2mdd(obj);
1390 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1391 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1392 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1393 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1394 bits = bits & mdd->mdd_cl.mc_mask;
1398 /* The record type is the lowest non-masked set bit */
1399 while (bits && ((bits & 1) == 0)) {
1404 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1405 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1406 md2mdd_obj(obj), handle);
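/* Illustrative example of the precedence above: for
 * valid = LA_MTIME | LA_ATIME with nothing masked, "bits" has both the
 * CL_MTIME and CL_ATIME bits set; the loop stops at the lowest set bit,
 * so a single CL_MTIME record is stored, matching the
 * setattr > mtime > ctime > atime ordering. */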
1409 /* set attr and LOV EA at once, return updated attr */
1410 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1411 const struct md_attr *ma)
1413 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1414 struct mdd_device *mdd = mdo2mdd(obj);
1415 struct thandle *handle;
1416 struct lov_mds_md *lmm = NULL;
1417 struct llog_cookie *logcookies = NULL;
1418 int rc, lmm_size = 0, cookie_size = 0;
1419 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1420 #ifdef HAVE_QUOTA_SUPPORT
1421 struct obd_device *obd = mdd->mdd_obd_dev;
1422 struct mds_obd *mds = &obd->u.mds;
1423 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1424 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1425 int quota_opc = 0, block_count = 0;
1426 int inode_pending[MAXQUOTAS] = { 0, 0 };
1427 int block_pending[MAXQUOTAS] = { 0, 0 };
1431 *la_copy = ma->ma_attr;
1432 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1436 /* setattr on "close" only changes atime, or does nothing */
1437 if (ma->ma_valid == MA_INODE &&
1438 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1441 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1442 MDD_TXN_ATTR_SET_OP);
1443 handle = mdd_trans_start(env, mdd);
1445 RETURN(PTR_ERR(handle));
1446 /* TODO: add lock here */
1447 /* start a log journal handle if needed */
1448 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1449 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1450 lmm_size = mdd_lov_mdsize(env, mdd);
1451 lmm = mdd_max_lmm_get(env, mdd);
1453 GOTO(cleanup, rc = -ENOMEM);
1455 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1462 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1463 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1464 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1466 #ifdef HAVE_QUOTA_SUPPORT
1467 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1468 struct obd_export *exp = md_quota(env)->mq_exp;
1469 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1471 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1473 quota_opc = FSFILT_OP_SETATTR;
1474 mdd_quota_wrapper(la_copy, qnids);
1475 mdd_quota_wrapper(la_tmp, qoids);
1476 /* get file quota for new owner */
1477 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1478 qnids, inode_pending, 1, NULL, 0,
1480 block_count = (la_tmp->la_blocks + 7) >> 3;
1483 mdd_data_get(env, mdd_obj, &data);
1484 /* get block quota for new owner */
1485 lquota_chkquota(mds_quota_interface_ref, obd,
1486 exp, qnids, block_pending,
1488 LQUOTA_FLAGS_BLK, data, 1);
1494 if (la_copy->la_valid & LA_FLAGS) {
1495 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1498 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1499 } else if (la_copy->la_valid) { /* setattr */
1500 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1502 /* journal chown/chgrp in llog, just like unlink */
1503 if (rc == 0 && lmm_size){
1504 cookie_size = mdd_lov_cookiesize(env, mdd);
1505 logcookies = mdd_max_cookie_get(env, mdd);
1506 if (logcookies == NULL)
1507 GOTO(cleanup, rc = -ENOMEM);
1509 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1510 logcookies, cookie_size) <= 0)
1515 if (rc == 0 && ma->ma_valid & MA_LOV) {
1518 mode = mdd_object_type(mdd_obj);
1519 if (S_ISREG(mode) || S_ISDIR(mode)) {
1520 rc = mdd_lsm_sanity_check(env, mdd_obj);
1524 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1525 ma->ma_lmm_size, handle, 1);
1529 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1532 mode = mdd_object_type(mdd_obj);
1534 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1539 rc = mdd_attr_set_changelog(env, obj, handle,
1540 ma->ma_attr.la_valid);
1541 mdd_trans_stop(env, mdd, rc, handle);
1542 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1543 /* set obd attr, if needed */
1544 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1547 #ifdef HAVE_QUOTA_SUPPORT
1549 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1551 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1553 /* Trigger dqrel/dqacq for original owner and new owner.
1554 * If this fails, the next call to lquota_chkquota will
1555 * process it. */
1556 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1563 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1564 const struct lu_buf *buf, const char *name, int fl,
1565 struct thandle *handle)
1570 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1571 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1572 mdd_write_unlock(env, obj);
1577 static int mdd_xattr_sanity_check(const struct lu_env *env,
1578 struct mdd_object *obj)
1580 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1581 struct md_ucred *uc = md_ucred(env);
1585 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1588 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1592 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1593 !mdd_capable(uc, CFS_CAP_FOWNER))
1600 * The caller is responsible for updating the object ctime
1601 * after xattr_set if needed.
1603 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1604 const struct lu_buf *buf, const char *name,
1607 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1608 struct mdd_device *mdd = mdo2mdd(obj);
1609 struct thandle *handle;
1613 rc = mdd_xattr_sanity_check(env, mdd_obj);
1617 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1618 /* security-related changes may require sync */
1619 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1620 mdd->mdd_sync_permission == 1)
1621 txn_param_sync(&mdd_env_info(env)->mti_param);
1623 handle = mdd_trans_start(env, mdd);
1625 RETURN(PTR_ERR(handle));
1627 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1629 /* Only record user xattr changes */
1630 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1631 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1633 mdd_trans_stop(env, mdd, rc, handle);
1639 * The caller is responsible for updating the object ctime
1640 * after xattr_set if needed.
1642 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1645 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1646 struct mdd_device *mdd = mdo2mdd(obj);
1647 struct thandle *handle;
1651 rc = mdd_xattr_sanity_check(env, mdd_obj);
1655 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1656 handle = mdd_trans_start(env, mdd);
1658 RETURN(PTR_ERR(handle));
1660 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1661 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1662 mdd_object_capa(env, mdd_obj));
1663 mdd_write_unlock(env, mdd_obj);
1665 /* Only record user xattr changes */
1666 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1667 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1670 mdd_trans_stop(env, mdd, rc, handle);
1675 /* partial unlink */
1676 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1679 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1680 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1681 struct mdd_device *mdd = mdo2mdd(obj);
1682 struct thandle *handle;
1683 #ifdef HAVE_QUOTA_SUPPORT
1684 struct obd_device *obd = mdd->mdd_obd_dev;
1685 struct mds_obd *mds = &obd->u.mds;
1686 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1693 * Check -ENOENT early here because we need to get object type
1694 * to calculate credits before transaction start
1696 if (!mdd_object_exists(mdd_obj))
1699 LASSERT(mdd_object_exists(mdd_obj) > 0);
1701 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1705 handle = mdd_trans_start(env, mdd);
1709 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1711 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1715 __mdd_ref_del(env, mdd_obj, handle, 0);
1717 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1719 __mdd_ref_del(env, mdd_obj, handle, 1);
1722 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1723 la_copy->la_ctime = ma->ma_attr.la_ctime;
1725 la_copy->la_valid = LA_CTIME;
1726 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1730 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1731 #ifdef HAVE_QUOTA_SUPPORT
1732 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1733 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1734 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1735 mdd_quota_wrapper(&ma->ma_attr, qids);
1742 mdd_write_unlock(env, mdd_obj);
1743 mdd_trans_stop(env, mdd, rc, handle);
1744 #ifdef HAVE_QUOTA_SUPPORT
1746 /* Trigger dqrel on the owner of the child. If this fails,
1747  * the next call to lquota_chkquota will process it */
1748 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1754 /* partial operation */
1755 static int mdd_oc_sanity_check(const struct lu_env *env,
1756 struct mdd_object *obj,
1762 switch (ma->ma_attr.la_mode & S_IFMT) {
1779 static int mdd_object_create(const struct lu_env *env,
1780 struct md_object *obj,
1781 const struct md_op_spec *spec,
1785 struct mdd_device *mdd = mdo2mdd(obj);
1786 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1787 const struct lu_fid *pfid = spec->u.sp_pfid;
1788 struct thandle *handle;
1789 #ifdef HAVE_QUOTA_SUPPORT
1790 struct obd_device *obd = mdd->mdd_obd_dev;
1791 struct obd_export *exp = md_quota(env)->mq_exp;
1792 struct mds_obd *mds = &obd->u.mds;
1793 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1794 int quota_opc = 0, block_count = 0;
1795 int inode_pending[MAXQUOTAS] = { 0, 0 };
1796 int block_pending[MAXQUOTAS] = { 0, 0 };
1801 #ifdef HAVE_QUOTA_SUPPORT
1802 if (mds->mds_quota) {
1803 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1804 mdd_quota_wrapper(&ma->ma_attr, qids);
1805 /* get file quota for child */
1806 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1807 qids, inode_pending, 1, NULL, 0,
1809 switch (ma->ma_attr.la_mode & S_IFMT) {
1818 /* get block quota for child */
1820 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1821 qids, block_pending, block_count,
1822 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1826 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1827 handle = mdd_trans_start(env, mdd);
1829 GOTO(out_pending, rc = PTR_ERR(handle));
1831 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1832 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1836 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1840 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1841 /* If creating the slave object, set slave EA here. */
1842 int lmv_size = spec->u.sp_ea.eadatalen;
1843 struct lmv_stripe_md *lmv;
1845 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1846 LASSERT(lmv != NULL && lmv_size > 0);
1848 rc = __mdd_xattr_set(env, mdd_obj,
1849 mdd_buf_get_const(env, lmv, lmv_size),
1850 XATTR_NAME_LMV, 0, handle);
1854 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1857 #ifdef CONFIG_FS_POSIX_ACL
1858 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1859 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1861 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1862 buf->lb_len = spec->u.sp_ea.eadatalen;
1863 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1864 rc = __mdd_acl_init(env, mdd_obj, buf,
1865 &ma->ma_attr.la_mode,
1870 ma->ma_attr.la_valid |= LA_MODE;
1873 pfid = spec->u.sp_ea.fid;
1876 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1882 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1883 mdd_write_unlock(env, mdd_obj);
1885 mdd_trans_stop(env, mdd, rc, handle);
1887 #ifdef HAVE_QUOTA_SUPPORT
1889 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1891 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1893 /* Trigger dqacq on the owner of the child. If this fails,
1894  * the next call to lquota_chkquota will process it. */
1895 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1903 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1904 const struct md_attr *ma)
1906 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1907 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1908 struct mdd_device *mdd = mdo2mdd(obj);
1909 struct thandle *handle;
1913 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1914 handle = mdd_trans_start(env, mdd);
1918 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1919 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1921 __mdd_ref_add(env, mdd_obj, handle);
1922 mdd_write_unlock(env, mdd_obj);
1924 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1925 la_copy->la_ctime = ma->ma_attr.la_ctime;
1927 la_copy->la_valid = LA_CTIME;
1928 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1931 mdd_trans_stop(env, mdd, 0, handle);
1937 * do NOT simply OR the MAY_* flags together, you'll get the weakest check
1939 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1943 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1944 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1945 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1946 * owner can write to a file even if it is marked readonly to hide
1947 * its brokenness. (bug 5781) */
1948 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1949 struct md_ucred *uc = md_ucred(env);
1951 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1952 (la->la_uid == uc->mu_fsuid))
1956 if (flags & FMODE_READ)
1958 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1960 if (flags & MDS_FMODE_EXEC)
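/* Roughly, based on the checks above: FMODE_READ maps to MAY_READ;
 * FMODE_WRITE, MDS_OPEN_TRUNC or MDS_OPEN_APPEND add MAY_WRITE;
 * MDS_FMODE_EXEC adds MAY_EXEC.  With MDS_OPEN_OWNEROVERRIDE the owner
 * (or a missing/initial ucred) is exempted from these checks, per the
 * NFSD workaround described above. */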
1965 static int mdd_open_sanity_check(const struct lu_env *env,
1966 struct mdd_object *obj, int flag)
1968 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1973 if (mdd_is_dead_obj(obj))
1976 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1980 if (S_ISLNK(tmp_la->la_mode))
1983 mode = accmode(env, tmp_la, flag);
1985 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1988 if (!(flag & MDS_OPEN_CREATED)) {
1989 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1994 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1995 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1996 flag &= ~MDS_OPEN_TRUNC;
1998 /* To write to an append-only file, it must be opened in append mode. */
1999 if (mdd_is_append(obj)) {
2000 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2002 if (flag & MDS_OPEN_TRUNC)
2008 * For now, the O_NOATIME flag is not packed by the client.
2010 if (flag & O_NOATIME) {
2011 struct md_ucred *uc = md_ucred(env);
2013 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2014 (uc->mu_valid == UCRED_NEW)) &&
2015 (uc->mu_fsuid != tmp_la->la_uid) &&
2016 !mdd_capable(uc, CFS_CAP_FOWNER))
2024 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2027 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2030 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2032 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2034 mdd_obj->mod_count++;
2036 mdd_write_unlock(env, mdd_obj);
2040 /* return md_attr back;
2041  * if this is the last unlink, also return the lov ea + llog cookie */
2042 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2048 if (S_ISREG(mdd_object_type(obj))) {
2049 /* Return LOV & COOKIES unconditionally here. We clean everything up.
2050 * Caller must be ready for that. */
2052 rc = __mdd_lmm_get(env, obj, ma);
2053 if ((ma->ma_valid & MA_LOV))
2054 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2061 * No permission check is needed.
2063 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2066 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2067 struct mdd_device *mdd = mdo2mdd(obj);
2068 struct thandle *handle = NULL;
2072 #ifdef HAVE_QUOTA_SUPPORT
2073 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2074 struct mds_obd *mds = &obd->u.mds;
2075 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2080 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2081 mdd_obj->mod_count--;
2083 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2084 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2085 "list\n", PFID(mdd_object_fid(mdd_obj)));
2089 /* check without any lock */
2090 if (mdd_obj->mod_count == 1 &&
2091 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2093 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2096 handle = mdd_trans_start(env, mdo2mdd(obj));
2098 RETURN(PTR_ERR(handle));
2101 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2102 if (handle == NULL && mdd_obj->mod_count == 1 &&
2103 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2104 mdd_write_unlock(env, mdd_obj);
2108 /* release open count */
2109 mdd_obj->mod_count--;
2111 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2112 /* remove link to object from orphan index */
2113 rc = __mdd_orphan_del(env, mdd_obj, handle);
2115 CDEBUG(D_HA, "Object "DFID" was deleted from the orphan "
2116 "list; OSS objects will be destroyed.\n",
2117 PFID(mdd_object_fid(mdd_obj)));
2119 CERROR("Object "DFID" cannot be deleted from the orphan "
2120 "list; this may prevent its OST objects from being "
2121 "destroyed (err: %d).\n",
2122 PFID(mdd_object_fid(mdd_obj)), rc);
2123 /* If the object was not deleted from the orphan list, do not
2124  * destroy the OSS objects; that will be done during the next
2125  * recovery. */
2130 rc = mdd_iattr_get(env, mdd_obj, ma);
2131 /* The object may not have been in the orphan list originally; this is the
2132  * rare case of an mdd_finish_unlink() failure. */
2133 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2134 #ifdef HAVE_QUOTA_SUPPORT
2135 if (mds->mds_quota) {
2136 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2137 mdd_quota_wrapper(&ma->ma_attr, qids);
2140 /* MDS_CLOSE_CLEANUP means the MDS destroys the OSS objects. */
2141 if (ma->ma_valid & MA_FLAGS &&
2142 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2143 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2145 rc = mdd_object_kill(env, mdd_obj, ma);
2151 CERROR("Error while preparing to delete Object "DFID", "
2152 "so its OST objects cannot be "
2153 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2159 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2161 mdd_write_unlock(env, mdd_obj);
2163 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2164 #ifdef HAVE_QUOTA_SUPPORT
2166 /* Trigger dqrel on the owner of the child. If this fails,
2167  * the next call to lquota_chkquota will process it */
2168 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2175 * The permission check is done at open time;
2176 * there is no need to check again.
2178 static int mdd_readpage_sanity_check(const struct lu_env *env,
2179 struct mdd_object *obj)
2181 struct dt_object *next = mdd_object_child(obj);
2185 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2193 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2194 int first, void *area, int nob,
2195 const struct dt_it_ops *iops, struct dt_it *it,
2196 __u64 *start, __u64 *end,
2197 struct lu_dirent **last, __u32 attr)
2201 struct lu_dirent *ent;
2204 memset(area, 0, sizeof (struct lu_dirpage));
2205 area += sizeof (struct lu_dirpage);
2206 nob -= sizeof (struct lu_dirpage);
2214 len = iops->key_size(env, it);
2216 /* IAM iterator can return record with zero len. */
2220 hash = iops->store(env, it);
2221 if (unlikely(first)) {
2226 /* calculate max space required for lu_dirent */
2227 recsize = lu_dirent_calc_size(len, attr);
2229 if (nob >= recsize) {
2230 result = iops->rec(env, it, ent, attr);
2231 if (result == -ESTALE)
2236 /* the osd might not be able to pack all attributes,
2237  * so recheck the record length */
2238 recsize = le16_to_cpu(ent->lde_reclen);
2241 * record doesn't fit into page, enlarge previous one.
2244 (*last)->lde_reclen =
2245 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2254 ent = (void *)ent + recsize;
2258 result = iops->next(env, it);
2259 if (result == -ESTALE)
2261 } while (result == 0);
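/* Resulting page layout (as built above): a struct lu_dirpage header
 * followed by packed lu_dirent records.  Each record's lde_reclen gives
 * the offset of the next record; the caller later zeroes the last
 * record's lde_reclen on the final page so readers know where the
 * listing ends. */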
2268 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2269 const struct lu_rdpg *rdpg)
2272 struct dt_object *next = mdd_object_child(obj);
2273 const struct dt_it_ops *iops;
2275 struct lu_dirent *last = NULL;
2276 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2283 LASSERT(rdpg->rp_pages != NULL);
2284 LASSERT(next->do_index_ops != NULL);
2286 if (rdpg->rp_count <= 0)
2290 * iterate through directory and fill pages from @rdpg
2292 iops = &next->do_index_ops->dio_it;
2293 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2297 rc = iops->load(env, it, rdpg->rp_hash);
2301 * Iterator didn't find record with exactly the key requested.
2303 * It is currently either
2305 * - positioned above record with key less than
2306 * requested---skip it.
2308 * - or not positioned at all (is in IAM_IT_SKEWED
2309 * state)---position it on the next item.
2311 rc = iops->next(env, it);
2316 * At this point and across for-loop:
2318 * rc == 0 -> ok, proceed.
2319 * rc > 0 -> end of directory.
2322 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2323 i++, nob -= CFS_PAGE_SIZE) {
2324 LASSERT(i < rdpg->rp_npages);
2325 pg = rdpg->rp_pages[i];
2326 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2327 min_t(int, nob, CFS_PAGE_SIZE), iops,
2328 it, &hash_start, &hash_end, &last,
2330 if (rc != 0 || i == rdpg->rp_npages - 1) {
2332 last->lde_reclen = 0;
2340 hash_end = DIR_END_OFF;
2344 struct lu_dirpage *dp;
2346 dp = cfs_kmap(rdpg->rp_pages[0]);
2347 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2348 dp->ldp_hash_end = cpu_to_le64(hash_end);
2351 * No pages were processed, mark this.
2353 dp->ldp_flags |= LDF_EMPTY;
2355 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2356 cfs_kunmap(rdpg->rp_pages[0]);
2359 iops->fini(env, it);
2364 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2365 const struct lu_rdpg *rdpg)
2367 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2371 LASSERT(mdd_object_exists(mdd_obj));
2373 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2374 rc = mdd_readpage_sanity_check(env, mdd_obj);
2376 GOTO(out_unlock, rc);
2378 if (mdd_is_dead_obj(mdd_obj)) {
2380 struct lu_dirpage *dp;
2383 * According to POSIX, please do not return any entry to client:
2384 * even dot and dotdot should not be returned.
2386 CWARN("readdir from dead object: "DFID"\n",
2387 PFID(mdd_object_fid(mdd_obj)));
2389 if (rdpg->rp_count <= 0)
2390 GOTO(out_unlock, rc = -EFAULT);
2391 LASSERT(rdpg->rp_pages != NULL);
2393 pg = rdpg->rp_pages[0];
2394 dp = (struct lu_dirpage*)cfs_kmap(pg);
2395 memset(dp, 0, sizeof(struct lu_dirpage));
2396 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2397 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2398 dp->ldp_flags |= LDF_EMPTY;
2399 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2401 GOTO(out_unlock, rc = 0);
2404 rc = __mdd_readpage(env, mdd_obj, rdpg);
2408 mdd_read_unlock(env, mdd_obj);
2412 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2414 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2415 struct dt_object *next;
2417 LASSERT(mdd_object_exists(mdd_obj));
2418 next = mdd_object_child(mdd_obj);
2419 return next->do_ops->do_object_sync(env, next);
2422 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2423 struct md_object *obj)
2425 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2427 LASSERT(mdd_object_exists(mdd_obj));
2428 return do_version_get(env, mdd_object_child(mdd_obj));
2431 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2432 dt_obj_version_t version)
2434 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2436 LASSERT(mdd_object_exists(mdd_obj));
2437 do_version_set(env, mdd_object_child(mdd_obj), version);
2440 const struct md_object_operations mdd_obj_ops = {
2441 .moo_permission = mdd_permission,
2442 .moo_attr_get = mdd_attr_get,
2443 .moo_attr_set = mdd_attr_set,
2444 .moo_xattr_get = mdd_xattr_get,
2445 .moo_xattr_set = mdd_xattr_set,
2446 .moo_xattr_list = mdd_xattr_list,
2447 .moo_xattr_del = mdd_xattr_del,
2448 .moo_object_create = mdd_object_create,
2449 .moo_ref_add = mdd_ref_add,
2450 .moo_ref_del = mdd_ref_del,
2451 .moo_open = mdd_open,
2452 .moo_close = mdd_close,
2453 .moo_readpage = mdd_readpage,
2454 .moo_readlink = mdd_readlink,
2455 .moo_changelog = mdd_changelog,
2456 .moo_capa_get = mdd_capa_get,
2457 .moo_object_sync = mdd_object_sync,
2458 .moo_version_get = mdd_version_get,
2459 .moo_version_set = mdd_version_set,
2460 .moo_path = mdd_path,
2461 .moo_file_lock = mdd_file_lock,
2462 .moo_file_unlock = mdd_file_unlock,