/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/mdd/mdd_object.c
 *
 * Lustre Metadata Server (mdd) routines
 *
 * Author: Wang Di <wangdi@clusterfs.com>
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS
#include <linux/module.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs_jbd2.h>
#else
#include <linux/jbd.h>
#endif
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>

#include <lustre_param.h>
#ifdef HAVE_EXT4_LDISKFS
#include <ldiskfs/ldiskfs.h>
#else
#include <linux/ldiskfs_fs.h>
#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>

#include "mdd_internal.h"
static const struct lu_object_operations mdd_lu_obj_ops;
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name);
int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                 void **data)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        mdo_data_get(env, obj, data);
        return 0;
}
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        return mdo_attr_get(env, obj, la, capa);
}
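
/* Translate on-disk LUSTRE_*_FL inode flags into the in-core mod_flags
 * bits cached on the mdd_object.  Only the append-only and immutable
 * flags are tracked here; other flags stay with the underlying layer. */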
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
}
struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
{
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
        return info;
}
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}
void mdd_buf_put(struct lu_buf *buf)
{
        if (buf == NULL || buf->lb_buf == NULL)
                return;
        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
        buf->lb_buf = NULL;
        buf->lb_len = 0;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = (void *)area;
        buf->lb_len = len;
        return buf;
}
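
/* Return the per-thread "big buffer", growing it to at least \a len bytes.
 * The buffer belongs to the mdd_thread_info and is reused across calls on
 * the same thread, so callers must not free it and must not assume its
 * contents survive a later mdd_buf_alloc() call. */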
struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
                buf->lb_buf = NULL;
        }
        if (buf->lb_buf == NULL) {
                buf->lb_len = len;
                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
                if (buf->lb_buf == NULL)
                        buf->lb_len = 0;
        }
        return buf;
}
/** Increase the size of the \a mti_big_buf.
 * The old data is preserved in the new buffer; on allocation failure the
 * old buffer remains unchanged.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
        struct lu_buf buf;

        LASSERT(len >= oldbuf->lb_len);
        OBD_ALLOC_LARGE(buf.lb_buf, len);

        if (buf.lb_buf == NULL)
                return -ENOMEM;

        buf.lb_len = len;
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);

        memcpy(oldbuf, &buf, sizeof(buf));
        return 0;
}
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_cookie_size;

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE_LARGE(mti->mti_max_cookie,
                                       mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;
        }
        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;
        }
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
}
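
/* As with mdd_max_cookie_get() above, return a per-thread scratch buffer
 * large enough for the widest possible LOV EA on this device, reallocating
 * it only when the required size grows. */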
struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                   struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_lmm_size;

        max_lmm_size = mdd_lov_mdsize(env, mdd);
        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
                if (mti->mti_max_lmm)
                        OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
                mti->mti_max_lmm = NULL;
                mti->mti_max_lmm_size = 0;
        }
        if (unlikely(mti->mti_max_lmm == NULL)) {
                OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
                if (likely(mti->mti_max_lmm != NULL))
                        mti->mti_max_lmm_size = max_lmm_size;
        }
        return mti->mti_max_lmm;
}
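
/* lu_object_operations glue: allocate the mdd slice of a compound
 * lu_object and wire up its md_object operation vectors. */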
struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d)
{
        struct mdd_object *mdd_obj;

        OBD_ALLOC_PTR(mdd_obj);
        if (mdd_obj != NULL) {
                struct lu_object *o;

                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
                return o;
        } else {
                return NULL;
        }
}
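
/* Initialize the mdd slice and allocate the slice of the underlying
 * (child) device below it in the lu_object stack. */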
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
{
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object *below;
        struct lu_device *under;
        ENTRY;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);
        if (below == NULL)
                RETURN(-ENOMEM);

        lu_object_add(o, below);

        RETURN(0);
}
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}
static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_FREE_PTR(mdd);
}
static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime="LPU64", flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
}
static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init    = mdd_object_init,
        .loo_object_start   = mdd_object_start,
        .loo_object_free    = mdd_object_free,
        .loo_object_print   = mdd_object_print,
};
struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
{
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
}
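
/* Resolve a pathname, relative to the filesystem root, into a FID by
 * looking up each path component in its parent directory.  Used below by
 * mdd_path_current() to verify a reconstructed path; a lookup that lands
 * on a remote object yields -EREMOTE. */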
static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                        const char *path, struct lu_fid *fid)
{
        struct lu_buf *buf;
        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
        struct mdd_object *obj;
        struct lu_name *lname = &mdd_env_info(env)->mti_name;
        char *name;
        int rc = 0;
        ENTRY;

        /* temp buffer for path element */
        buf = mdd_buf_alloc(env, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        lname->ln_name = name = buf->lb_buf;
        lname->ln_namelen = 0;
        *f = mdd->mdd_root_fid;

        while (1) {
                while (*path == '/')
                        path++;
                if (*path == '\0')
                        break;
                while (*path != '/' && *path != '\0') {
                        *name = *path;
                        path++;
                        name++;
                        lname->ln_namelen++;
                }
                *name = '\0';
                /* find obj corresponding to fid */
                obj = mdd_object_find(env, mdd, f);
                if (obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                /* get child fid from parent and name */
                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                mdd_object_put(env, obj);
                if (rc)
                        break;

                name = buf->lb_buf;
                lname->ln_namelen = 0;
        }

        if (!rc)
                *fid = *f;
out:
        RETURN(rc);
}
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100
/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64              pli_recno;     /**< history point */
        __u64              pli_currec;    /**< current record */
        struct lu_fid      pli_fid;
        struct lu_fid      pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object *pli_mdd_obj;
        char              *pli_path;      /**< full path */
        int                pli_pathlen;
        int                pli_linkno;    /**< which hardlink to follow */
        int                pli_fidcount;  /**< number of \a pli_fids */
};
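
/* Reconstruct the current pathname of \a pli->pli_mdd_obj by walking the
 * link EA (parent FID + name) chain upward until the root is reached,
 * packing each name, right to left, into the tail of pli_path.  The result
 * is then verified by resolving it forward with mdd_path2fid(); if the
 * FIDs disagree, the tree changed underneath us and the caller retries
 * (-EAGAIN). */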
static int mdd_path_current(const struct lu_env *env,
                            struct path_lookup_info *pli)
{
        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
        struct mdd_object *mdd_obj;
        struct lu_buf     *buf = NULL;
        struct link_ea_header *leh;
        struct link_ea_entry  *lee;
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
        char *ptr;
        int reclen;
        int rc;
        ENTRY;

        ptr = pli->pli_path + pli->pli_pathlen - 1;
        *ptr = 0;
        --ptr;
        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);

        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
                mdd_obj = mdd_object_find(env, mdd,
                                          &pli->pli_fids[pli->pli_fidcount]);
                if (mdd_obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(mdd_obj))
                        GOTO(out, rc = PTR_ERR(mdd_obj));
                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                if (rc <= 0) {
                        mdd_object_put(env, mdd_obj);
                        if (rc == -1)
                                rc = -EREMOTE;
                        else if (rc == 0)
                                /* Do I need to error out here? */
                                rc = -ENOENT;
                        GOTO(out, rc);
                }

                /* Get parent fid and object name */
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
                buf = mdd_links_get(env, mdd_obj);
                mdd_read_unlock(env, mdd_obj);
                mdd_object_put(env, mdd_obj);
                if (IS_ERR(buf))
                        GOTO(out, rc = PTR_ERR(buf));

                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);

                /* If set, use link #linkno for path lookup, otherwise use
                   link #0.  Only do this for the final path element. */
                if ((pli->pli_fidcount == 0) &&
                    (pli->pli_linkno < leh->leh_reccount)) {
                        int count;
                        for (count = 0; count < pli->pli_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                      ((char *)lee + reclen);
                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
                        }
                        if (pli->pli_linkno < leh->leh_reccount - 1)
                                /* indicate to user there are more links */
                                pli->pli_linkno++;
                }

                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)
                        GOTO(out, rc = -EOVERFLOW);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
                *(--ptr) = '/';

                /* Store the parent fid for historic lookup */
                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
                        GOTO(out, rc = -EOVERFLOW);
                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
        }

        /* Verify that our path hasn't changed since we started the lookup.
           Record the current index, and verify the path resolves to the
           same fid.  If it does, then the path is correct as of this index. */
        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
        pli->pli_currec = mdd->mdd_cl.mc_index;
        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
        if (rc) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
                GOTO(out, rc = -EAGAIN);
        }
        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
                       PFID(&pli->pli_fid));
                GOTO(out, rc = -EAGAIN);
        }
        ptr++; /* skip leading / */
        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);

        EXIT;
out:
        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
                /* if we vmalloced a large buffer drop it */
                mdd_buf_put(buf);

        return rc;
}
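
/* Historical path lookup (path as of changelog record pli_recno) is not
 * implemented here; this stub preserves the call site.  See the comment
 * in mdd_path() below. */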
static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
{
        return 0;
}
/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
{
        struct path_lookup_info *pli;
        int tries = 3;
        int rc = -EAGAIN;
        ENTRY;

        if (pathlen < 3)
                RETURN(-EOVERFLOW);

        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
                path[0] = '\0';
                RETURN(0);
        }

        OBD_ALLOC_PTR(pli);
        if (pli == NULL)
                RETURN(-ENOMEM);

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
        } else {
                *recno = pli->pli_currec;
                /* Return next link index to caller */
                *linkno = pli->pli_linkno;
        }

        OBD_FREE_PTR(pli);

        RETURN(rc);
}
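
/* Load la_flags from the underlying object and cache the translation in
 * mod_flags; called when the object is first brought into core (see
 * mdd_object_start() above). */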
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
{
        struct lu_attr *la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        if (rc == 0) {
                mdd_flags_xlate(obj, la->la_flags);
                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
                        obj->mod_flags |= MNLINK_OBJ;
        }
        RETURN(rc);
}
/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                  struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_valid & MA_INODE)
                RETURN(0);

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        if (rc == 0)
                ma->ma_valid |= MA_INODE;
        RETURN(rc);
}
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
{
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
        struct lov_user_md *lum = (struct lov_user_md *)lmm;
        ENTRY;

        if (!lum)
                RETURN(0);

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        lum->lmm_magic = LOV_MAGIC_V1;
        lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
        lum->lmm_pattern = ldesc->ld_pattern;
        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;

        RETURN(sizeof(*lum));
}
static int is_rootdir(struct mdd_object *mdd_obj)
{
        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
        const struct lu_fid *fid = mdo2fid(mdd_obj);

        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
}
/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LOV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                        XATTR_NAME_LOV);
        if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
        if (rc > 0) {
                ma->ma_lmm_size = rc;
                ma->ma_valid |= MA_LOV;
                rc = 0;
        }
        RETURN(rc);
}
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                       struct md_attr *ma)
{
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}
/* get LMV EA only */
static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LMV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                        XATTR_NAME_LMV);
        if (rc > 0) {
                ma->ma_valid |= MA_LMV;
                rc = 0;
        }
        RETURN(rc);
}
static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                         struct md_attr *ma)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lustre_mdt_attrs *lma =
                                 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lma_size;
        int rc;
        ENTRY;

        /* If all needed data are already valid, nothing to do */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
            (ma->ma_need & (MA_HSM | MA_SOM)))
                RETURN(0);

        /* Read LMA from disk EA */
        lma_size = sizeof(info->mti_xattr_buf);
        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
        if (rc <= 0)
                RETURN(rc);

        /* Useless to check LMA incompatibility because this is already done in
         * osd_ea_fid_get(), and this will fail long before this code is
         * called.
         * So, if we are here, LMA is compatible.
         */

        lustre_lma_swab(lma);

        /* Swab and copy LMA */
        if (ma->ma_need & MA_HSM) {
                if (lma->lma_compat & LMAC_HSM)
                        ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
                else
                        ma->ma_hsm.mh_flags = 0;
                ma->ma_valid |= MA_HSM;
        }

        /* Copy SOM */
        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
                LASSERT(ma->ma_som != NULL);
                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
                ma->ma_som->msd_size    = lma->lma_som_size;
                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
                ma->ma_som->msd_mountid = lma->lma_som_mountid;
                ma->ma_valid |= MA_SOM;
        }

        RETURN(0);
}
int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                          struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
                if (S_ISREG(mdd_object_type(mdd_obj)))
                        rc = __mdd_lma_get(env, mdd_obj, ma);
        }
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        }
#endif
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
}
int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        int needlock = ma->ma_need &
                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);

        if (needlock)
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        if (needlock)
                mdd_read_unlock(env, mdd_obj);
        return rc;
}
/*
 * No permission check is needed.
 */
static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                        struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int                rc;

        ENTRY;
        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;

        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
/*
 * Permission check is done on open; no need to check it again.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
                        struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object  *next;
        loff_t             pos = 0;
        int                rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
                          struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;

        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
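
/* Create the on-disk object for \a c inside transaction \a handle.  The
 * object format is derived from the creation mode (or from the supplied
 * index features for an index object), and an allocation hint is taken
 * from the parent \a p when one is given. */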
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
{
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;
        ENTRY;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);
                LASSERT(next);

                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                else
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);

                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
        } else
                rc = -EEXIST;

        RETURN(rc);
}
/**
 * Make sure the ctime only ever increases.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                if (rc)
                        RETURN(rc);

                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
        }
        RETURN(0);
}
int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
                          int needacl)
{
        int rc;
        ENTRY;

        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
#endif
        RETURN(rc);
}
int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
                                int needacl)
{
        int rc;
        ENTRY;

        rc = mdd_attr_check(env, obj, attr);
        if (rc)
                RETURN(rc);

        if (attr->la_valid)
                rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        RETURN(rc);
}
static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
                                        int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}
int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
                                       int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}
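
/* Set or delete an extended attribute inside an open transaction: a
 * non-empty buffer stores the EA, while an empty buffer (NULL data and
 * zero length) deletes it; anything else is -EINVAL.  Illustrative call
 * for the delete case (sketch only):
 *
 *     __mdd_xattr_set(env, obj, mdd_buf_get(env, NULL, 0),
 *                     XATTR_NAME_LOV, 0, handle);
 */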
int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
                    const struct lu_buf *buf, const char *name,
                    int fl, struct thandle *handle)
{
        struct lustre_capa *capa = mdd_object_capa(env, obj);
        int rc = -EINVAL;
        ENTRY;

        if (buf->lb_buf && buf->lb_len > 0)
                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
        else if (buf->lb_buf == NULL && buf->lb_len == 0)
                rc = mdo_xattr_del(env, obj, name, handle, capa);

        RETURN(rc);
}
/*
 * This gives the same functionality as the code between
 * sys_chmod and inode_setattr
 * chown_common and inode_setattr
 * utimes and inode_setattr
 * This API is ported from mds_fix_attr, with some unnecessary parts removed.
 */
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
{
        struct lu_attr   *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred  *uc;
        int               rc;
        ENTRY;

        if (!la->la_valid)
                RETURN(0);

        /* Do not permit change file type */
        if (la->la_valid & LA_TYPE)
                RETURN(-EPERM);

        /* They should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                RETURN(-EPERM);

        /* export destroy does not have ->le_ses, but we may want
         * to drop LUSTRE_SOM_FL. */
        if (!env->le_ses)
                RETURN(0);

        uc = md_ucred(env);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for set ctime when rename's source is
                         * on remote MDS. */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;
                RETURN(rc);
        }

        if (la->la_valid == LA_ATIME) {
                /* This is atime only set for read atime update on close. */
                if (la->la_atime >= tmp_la->la_atime &&
                    la->la_atime < (tmp_la->la_atime +
                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                        la->la_valid &= ~LA_ATIME;
                RETURN(0);
        }

        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);

                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);

        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
                                                            MAY_WRITE,
                                                            MOR_TGT_CHILD);
                        if (rc)
                                RETURN(rc);
                }
        }

        if (la->la_valid & LA_KILL_SUID) {
                la->la_valid &= ~LA_KILL_SUID;
                if ((tmp_la->la_mode & S_ISUID) &&
                    !(la->la_valid & LA_MODE)) {
                        la->la_mode = tmp_la->la_mode;
                        la->la_valid |= LA_MODE;
                }
                la->la_mode &= ~S_ISUID;
        }

        if (la->la_valid & LA_KILL_SGID) {
                la->la_valid &= ~LA_KILL_SGID;
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                                        (S_ISGID | S_IXGRP)) &&
                    !(la->la_valid & LA_MODE)) {
                        la->la_mode = tmp_la->la_mode;
                        la->la_valid |= LA_MODE;
                }
                la->la_mode &= ~S_ISGID;
        }

        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                if (la->la_mode == (cfs_umode_t) -1)
                        la->la_mode = tmp_la->la_mode;
                else
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;
        } else {
                la->la_mode = tmp_la->la_mode;
        }

        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t) -1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races. This is the behavior we had in
                 * 2.0. The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t) -1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 * 19990830 SD. */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* For both Size-on-MDS case and truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * For SOM case, it is true, the MAY_WRITE perm has been checked
         * when open, no need check again. For truncate case, it is false,
         * the MAY_WRITE perm should be checked here. */
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge coming
                 * attributes with the set in the inode. BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);
        } else {
                if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                              (uc->mu_fsuid == tmp_la->la_uid)) &&
                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal_locked(env, obj,
                                                            tmp_la, MAY_WRITE,
                                                            MOR_TGT_CHILD);
                                if (rc)
                                        RETURN(rc);
                        }
                }
                if (la->la_valid & LA_CTIME) {
                        /* The pure setattr, it has the priority over what is
                         * already set, do not drop it if ctime is equal. */
                        if (la->la_ctime < tmp_la->la_ctime)
                                la->la_valid &= ~(LA_ATIME | LA_MTIME |
                                                  LA_CTIME);
                }
        }

        RETURN(0);
}
/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    int flags,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
{
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
        int reclen;
        int rc;

        /* Not recording */
        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                RETURN(0);
        if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
                RETURN(0);

        LASSERT(handle != NULL);
        LASSERT(mdd_obj != NULL);

        if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra
                   entry */
                RETURN(0);
        }

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_tfid = *tfid;
        rec->cr.cr_namelen = 0;
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle);
        if (rc < 0) {
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
                return -EFAULT;
        }

        return 0;
}
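
/* Public wrapper: record a changelog entry of \a type for \a obj in its
 * own one-off transaction, for callers that are not already inside one. */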
int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
                  int flags, struct md_object *obj)
{
        struct thandle *handle;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        int rc;
        ENTRY;

        handle = mdd_trans_start(env, mdd);

        if (IS_ERR(handle))
                return(PTR_ERR(handle));

        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                      handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * Should be called with write lock held.
 *
 * \see mdd_lma_set_locked().
 */
static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                         const struct md_attr *ma, struct thandle *handle)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_buf *buf;
        struct lustre_mdt_attrs *lma =
                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
        int lmasize = sizeof(struct lustre_mdt_attrs);
        int rc = 0;

        ENTRY;

        /* Either HSM or SOM part is not valid, we need to read it before */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
                if (rc <= 0)
                        RETURN(rc);

                lustre_lma_swab(lma);
        } else {
                memset(lma, 0, lmasize);
        }

        /* Copy HSM data */
        if (ma->ma_valid & MA_HSM) {
                lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
                lma->lma_compat |= LMAC_HSM;
        }

        /* Copy SOM data */
        if (ma->ma_valid & MA_SOM) {
                LASSERT(ma->ma_som != NULL);
                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
                        lma->lma_compat     &= ~LMAC_SOM;
                } else {
                        lma->lma_compat     |= LMAC_SOM;
                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
                        lma->lma_som_size    = ma->ma_som->msd_size;
                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
                }
        }

        /* Copy FID */
        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));

        lustre_lma_swab(lma);
        buf = mdd_buf_get(env, lma, lmasize);
        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);

        RETURN(rc);
}
/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-On-MDS data are extracted from \a ma if they are valid; if
 * not, the LMA EA is first read from disk, modified and written back.
 *
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
                                  __u64 valid)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        int bits, type = 0;

        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        bits = bits & mdd->mdd_cl.mc_mask;
        if (bits == 0)
                return 0;

        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
                bits = bits >> 1;
                type++;
        }

        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
}
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        ENTRY;

        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        if (rc != 0)
                RETURN(rc);

        /* setattr on "close" only change atime, or do nothing */
        if (ma->ma_valid == MA_INODE &&
            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                RETURN(0);

        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
        /*TODO: add lock here*/
        /* start a log journal handle if needed */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                if (lmm == NULL)
                        GOTO(cleanup, rc = -ENOMEM);

                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                       XATTR_NAME_LOV);

                if (rc < 0)
                        GOTO(cleanup, rc);
        }

        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;

                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                if (!rc) {
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                                        NULL, 0);
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                        if (block_count) {
                                void *data = NULL;
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                block_count, NULL,
                                                LQUOTA_FLAGS_BLK, data, 1);
                        }
                }
        }
#endif

        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                if (rc == 0)
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size) {
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);

                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
                                logcookies = NULL;
                }
        }

        if (rc == 0 && ma->ma_valid & MA_LOV) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        if (rc)
                                GOTO(cleanup, rc);

                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
                }
        }
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                cfs_umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode))
                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        }
cleanup:
        if (rc == 0)
                rc = mdd_attr_set_changelog(env, obj, handle,
                                            ma->ma_attr.la_valid);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0)) {
                /* set obd attr, if needed */
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
                                           logcookies);
        }
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      block_pending, 1);
                /* Trigger dqrel/dqacq for original owner and new owner.
                 * If failed, the next call for lquota_chkquota will
                 * process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        int rc;
        ENTRY;

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc     = md_ucred(env);
        int rc;
        ENTRY;

        if (mdd_is_immutable(obj) || mdd_is_append(obj))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
                         int fl)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
            mdd->mdd_sync_permission == 1)
                txn_param_sync(&mdd_env_info(env)->mti_param);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record user xattr changes */
        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                  const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);

        /* Only record user xattr changes */
        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/* partial unlink */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                       struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        int rc;
        ENTRY;

        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
                RETURN(-ENOENT);

        LASSERT(mdd_object_exists(mdd_obj) > 0);

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(-ENOMEM);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        if (rc)
                GOTO(cleanup, rc);

        __mdd_ref_del(env, mdd_obj, handle, 0);

        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* unlink dot */
                __mdd_ref_del(env, mdd_obj, handle, 1);
        }

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;

        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        if (rc)
                GOTO(cleanup, rc);

        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        }
#endif

        EXIT;
cleanup:
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        RETURN(rc);
}
/* partial operation */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
                               struct md_attr *ma)
{
        int rc;
        ENTRY;

        switch (ma->ma_attr.la_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
        case S_IFLNK:
        case S_IFCHR:
        case S_IFBLK:
        case S_IFIFO:
        case S_IFSOCK:
                rc = 0;
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);
}
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
                             struct md_attr *ma)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        int rc = 0;
        ENTRY;

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                                NULL, 0);
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFLNK:
                case S_IFDIR:
                        block_count = 2;
                        break;
                default:
                        block_count = 1;
                        break;
                }
                /* get block quota for child */
                if (block_count)
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        }
#endif

        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_pending, rc = PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        if (rc)
                GOTO(unlock, rc);

        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (rc)
                GOTO(unlock, rc);

        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);

                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                if (rc)
                        GOTO(unlock, rc);

                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
                                           handle, 0);
        } else {
#ifdef CONFIG_FS_POSIX_ACL
                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                        buf->lb_len = spec->u.sp_ea.eadatalen;
                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                    &ma->ma_attr.la_mode,
                                                    handle);
                                if (rc)
                                        GOTO(unlock, rc);
                                else
                                        ma->ma_attr.la_valid |= LA_MODE;
                        }

                        pfid = spec->u.sp_ea.fid;
                }
#endif
                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
                                           spec);
        }
        EXIT;
unlock:
        if (rc == 0)
                rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);

        mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      block_pending, 1);
                /* Trigger dqacq on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(-ENOMEM);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        if (rc == 0)
                __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc == 0) {
                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                la_copy->la_ctime = ma->ma_attr.la_ctime;

                la_copy->la_valid = LA_CTIME;
                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
                                                        handle, 0);
        }
        mdd_trans_stop(env, mdd, 0, handle);

        RETURN(rc);
}
/*
 * Do NOT simply OR the MAY_* flags together; you would get the weakest
 * access mode.
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        int res = 0;

        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
                        return 0;
        }

        if (flags & FMODE_READ)
                res = MAY_READ;
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                res |= MAY_WRITE;
        if (flags & MDS_FMODE_EXEC)
                res = MAY_EXEC;
        return res;
}
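
/* Validate an open request against the current inode attributes: dead
 * objects, symlinks, write-opens of directories, append-only and device
 * special file rules, and the optional O_NOATIME ownership check. */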
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int mode, rc;
        ENTRY;

        /* EEXIST check */
        if (mdd_is_dead_obj(obj))
                RETURN(-ENOENT);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (S_ISLNK(tmp_la->la_mode))
                RETURN(-ELOOP);

        mode = accmode(env, tmp_la, flag);

        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
                RETURN(-EISDIR);

        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
                if (rc)
                        RETURN(rc);
        }

        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* For writing, an append-only file must be opened in append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
                        RETURN(-EPERM);
        }

        /*
         * Note: the O_NOATIME flag is currently not packed by the client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
        }

        RETURN(0);
}
static int mdd_open(const struct lu_env *env, struct md_object *obj,
                    int flags)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc = 0;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        if (rc == 0)
                mdd_obj->mod_count++;

        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean
                 * everything up. Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                            obj, ma);
        }
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle    *handle = NULL;
        int rc;
        int reset = 1;

#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        ENTRY;

        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
                mdd_obj->mod_count--;

                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
                               "list\n", PFID(mdd_object_fid(mdd_obj)));
                RETURN(0);
        }

        /* check without any lock */
        if (mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
again:
                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
                if (rc)
                        RETURN(rc);
                handle = mdd_trans_start(env, mdo2mdd(obj));
                if (IS_ERR(handle))
                        RETURN(PTR_ERR(handle));
        }

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        if (handle == NULL && mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
                mdd_write_unlock(env, mdd_obj);
                goto again;
        }

        /* release open count */
        mdd_obj->mod_count--;

        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                if (rc == 0) {
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                } else {
                        CERROR("Object "DFID" cannot be deleted from orphan "
                               "list; OST objects may not be destroyed "
                               "(err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If the object was not deleted from the orphan list,
                         * do not destroy OSS objects, which will be done upon
                         * the next recovery. */
                        GOTO(out, rc);
                }
        }

        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* The object may not have been in the orphan list originally; that is
         * a rare mdd_finish_unlink() failure case. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                }
#endif
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                } else {
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        if (rc == 0)
                                reset = 0;
                }

                if (rc != 0)
                        CERROR("Failed to prepare to delete object "DFID"; "
                               "OST objects may not be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
        }
        EXIT;

out:
        if (reset)
                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);

        mdd_write_unlock(env, mdd_obj);
        if (handle != NULL)
                mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        RETURN(rc);
}
/*
 * Permission check is done on open; no need to check it again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        int rc;
        ENTRY;

        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
                rc = 0;
        else
                rc = -ENOTDIR;

        RETURN(rc);
}
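
/* Fill one page with lu_dirent records pulled from the dt iterator \a it,
 * starting at the iterator's current position.  The first and last hash
 * values seen are returned through \a start and \a end so the caller can
 * chain pages into a hash-ordered sequence. */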
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              int first, void *area, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u64 *start, __u64 *end,
                              struct lu_dirent **last, __u32 attr)
{
        int result;
        __u64 hash = 0;
        struct lu_dirent *ent;

        if (first) {
                memset(area, 0, sizeof (struct lu_dirpage));
                area += sizeof (struct lu_dirpage);
                nob -= sizeof (struct lu_dirpage);
        }

        ent = area;
        do {
                int len;
                int recsize;

                len = iops->key_size(env, it);

                /* IAM iterator can return record with zero len. */
                if (len == 0)
                        goto next;

                hash = iops->store(env, it);
                if (unlikely(first)) {
                        first = 0;
                        *start = hash;
                }

                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);

                if (nob >= recsize) {
                        result = iops->rec(env, it, ent, attr);
                        if (result == -ESTALE)
                                goto next;
                        if (result != 0)
                                goto out;

                        /* osd might not able to pack all attributes,
                         * so recheck rec length */
                        recsize = le16_to_cpu(ent->lde_reclen);
                } else {
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        if (*last) {
                                (*last)->lde_reclen =
                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                                                    recsize);
                                result = 0;
                        } else
                                result = -EINVAL;

                        goto out;
                }
                *last = ent;
                ent = (void *)ent + recsize;
                nob -= recsize;

next:
                result = iops->next(env, it);
                if (result == -ESTALE)
                        goto next;
        } while (result == 0);

out:
        *end = hash;
        return result;
}
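
/* Iterate over the directory and fill rdpg->rp_pages with directory
 * entries, page by page, recording the covered hash range in the first
 * page's lu_dirpage header. */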
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct dt_it *it;
        cfs_page_t *pg;
        struct lu_dirent *last = NULL;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        __u64 hash_start;
        __u64 hash_end = 0;
        int i;
        int rc;
        int nob;

        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
                return -EFAULT;

        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
        if (IS_ERR(it))
                return PTR_ERR(it);

        rc = iops->load(env, it, rdpg->rp_hash);

        if (rc == 0)
                /*
                 * Iterator didn't find record with exactly the key requested.
                 *
                 * It is currently either
                 *
                 *     - positioned above record with key less than
                 *     requested---skip it.
                 *
                 *     - or not positioned at all (is in IAM_IT_SKEWED
                 *     state)---position it on the next item.
                 */
                rc = iops->next(env, it);
        else if (rc > 0)
                rc = 0;

        /*
         * At this point and across for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         *  rc <  0 -> error.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
                                        it, &hash_start, &hash_end, &last,
                                        rdpg->rp_attrs);
                if (rc != 0 || i == rdpg->rp_npages - 1) {
                        if (last)
                                last->lde_reclen = 0;
                }
                cfs_kunmap(pg);
        }
        if (rc > 0) {
                /*
                 * end of directory.
                 */
                hash_end = DIR_END_OFF;
                rc = 0;
        }
        if (rc == 0) {
                struct lu_dirpage *dp;

                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(hash_end);
                if (i == 0)
                        /*
                         * No pages were processed, mark this.
                         */
                        dp->ldp_flags |= LDF_EMPTY;

                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(rdpg->rp_pages[0]);
        }
        iops->put(env, it);
        iops->fini(env, it);

        return rc;
}
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
        if (rc)
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                cfs_page_t *pg;
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));

                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage *)cfs_kmap(pg);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(pg);
                GOTO(out_unlock, rc = 0);
        }

        rc = __mdd_readpage(env, mdd_obj, rdpg);

        EXIT;
out_unlock:
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
}
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
}

static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        do_version_set(env, mdd_object_child(mdd_obj), version);
}
const struct md_object_operations mdd_obj_ops = {
        .moo_permission    = mdd_permission,
        .moo_attr_get      = mdd_attr_get,
        .moo_attr_set      = mdd_attr_set,
        .moo_xattr_get     = mdd_xattr_get,
        .moo_xattr_set     = mdd_xattr_set,
        .moo_xattr_list    = mdd_xattr_list,
        .moo_xattr_del     = mdd_xattr_del,
        .moo_object_create = mdd_object_create,
        .moo_ref_add       = mdd_ref_add,
        .moo_ref_del       = mdd_ref_del,
        .moo_open          = mdd_open,
        .moo_close         = mdd_close,
        .moo_readpage      = mdd_readpage,
        .moo_readlink      = mdd_readlink,
        .moo_changelog     = mdd_changelog,
        .moo_capa_get      = mdd_capa_get,
        .moo_object_sync   = mdd_object_sync,
        .moo_version_get   = mdd_version_get,
        .moo_version_set   = mdd_version_set,
        .moo_path          = mdd_path,
        .moo_file_lock     = mdd_file_lock,
        .moo_file_unlock   = mdd_file_unlock,
};