1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
/* NOTE(review): this chunk appears to be a lossy extraction -- function
 * braces, ENTRY/RETURN lines and some declarations are missing, and each
 * line carries a stale original line-number prefix.  Comments below are
 * limited to what the visible code shows. */

/* lu_object operations vector for mdd objects; forward-declared so the
 * allocator below can reference it, defined later in this file. */
72 static const struct lu_object_operations mdd_lu_obj_ops;

/* Forward declaration of the md_object xattr getter defined below. */
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,

/* Fetch object body data via the underlying mdo layer.
 * The object must already exist (asserted with its FID). */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);

/* Read attributes of an existing object into \a la under capability
 * \a capa; thin wrapper around mdo_attr_get(). */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into in-core mod_flags bits
 * (APPEND_OBJ / IMMUTE_OBJ); other mod_flags bits are preserved. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;

/* Return the per-thread mdd context stored in the lu_env; it must have
 * been registered under mdd_thread_key (asserted non-NULL). */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);

/* Wrap caller-provided memory \a area/\a len in the thread-local
 * mti_buf descriptor; no allocation is performed. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;

/* Free the backing memory of \a buf, if any; tolerates NULL. */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
129 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);

/* Const flavour of mdd_buf_get(); const is cast away because lu_buf
 * has a non-const lb_buf member -- callers must not write through it. */
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135 const void *area, ssize_t len)
139 buf = &mdd_env_info(env)->mti_buf;
140 buf->lb_buf = (void *)area;
/* Return the thread-local "big buffer" (mti_big_buf), re-allocating it
 * when the cached one is smaller than \a len.  The buffer is reused
 * across calls on the same thread, so callers must not free it. */
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
147 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
149 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
153 if (buf->lb_buf == NULL) {
155 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL)

162 /** Increase the size of the \a mti_big_buf.
163 * preserves old data in buffer
164 * old buffer remains unchanged on error
165 * \retval 0 or -ENOMEM
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
169 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
172 LASSERT(len >= oldbuf->lb_len);
173 OBD_ALLOC_LARGE(buf.lb_buf, len);
175 if (buf.lb_buf == NULL)
/* Copy existing contents before releasing the old allocation. */
179 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
181 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
/* Install the new descriptor over the thread-local one. */
183 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a per-thread cached llog cookie buffer sized for the current
 * LOV cookie size, growing (free + realloc) the cache when the cached
 * size is too small.  Returns NULL on allocation failure. */
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189 struct mdd_device *mdd)
191 struct mdd_thread_info *mti = mdd_env_info(env);
194 max_cookie_size = mdd_lov_cookiesize(env, mdd);
195 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196 if (mti->mti_max_cookie)
197 OBD_FREE_LARGE(mti->mti_max_cookie,
198 mti->mti_max_cookie_size);
199 mti->mti_max_cookie = NULL;
200 mti->mti_max_cookie_size = 0;
202 if (unlikely(mti->mti_max_cookie == NULL)) {
203 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204 if (likely(mti->mti_max_cookie != NULL))
205 mti->mti_max_cookie_size = max_cookie_size;
/* Hand back a zeroed buffer so stale cookies never leak through. */
207 if (likely(mti->mti_max_cookie != NULL))
208 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209 return mti->mti_max_cookie;

/* Same per-thread caching scheme for the maximal LOV EA (lmm) buffer.
 * Note: unlike the cookie buffer above, the result is NOT zeroed. */
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213 struct mdd_device *mdd)
215 struct mdd_thread_info *mti = mdd_env_info(env);
218 max_lmm_size = mdd_lov_mdsize(env, mdd);
219 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220 if (mti->mti_max_lmm)
221 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222 mti->mti_max_lmm = NULL;
223 mti->mti_max_lmm_size = 0;
225 if (unlikely(mti->mti_max_lmm == NULL)) {
226 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227 if (likely(mti->mti_max_lmm != NULL))
228 mti->mti_max_lmm_size = max_lmm_size;
230 return mti->mti_max_lmm;
/* Allocate an mdd_object for header \a hdr on device \a d and wire up
 * its md/dir operation vectors.  Returns the embedded lu_object
 * (NULL-return path presumably in elided lines -- TODO confirm). */
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234 const struct lu_object_header *hdr,
237 struct mdd_object *mdd_obj;
239 OBD_ALLOC_PTR(mdd_obj);
240 if (mdd_obj != NULL) {
243 o = mdd2lu_obj(mdd_obj);
244 lu_object_init(o, NULL, d);
245 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247 mdd_obj->mod_count = 0;
248 o->lo_ops = &mdd_lu_obj_ops;

/* lu_object_operations::loo_object_init -- allocate the underlying
 * (child device) object and stack it below this one. */
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256 const struct lu_object_conf *unused)
258 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259 struct mdd_object *mdd_obj = lu2mdd_obj(o);
260 struct lu_object *below;
261 struct lu_device *under;
264 mdd_obj->mod_cltime = 0;
265 under = &d->mdd_child->dd_lu_dev;
266 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267 mdd_pdlock_init(mdd_obj);
271 lu_object_add(o, below);

/* loo_object_start -- for existing objects, load flags from disk. */
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
278 if (lu_object_exists(o))
279 return mdd_get_flags(env, lu2mdd_obj(o));

/* loo_object_free -- release the mdd_object (body partly elided). */
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
286 struct mdd_object *mdd = lu2mdd_obj(o);

/* loo_object_print -- one-line debug dump of the object state. */
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293 lu_printer_t p, const struct lu_object *o)
295 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297 "valid=%x, cltime="LPU64", flags=%lx)",
298 mdd, mdd->mod_count, mdd->mod_valid,
299 mdd->mod_cltime, mdd->mod_flags);

/* Operations vector referenced by mdd_object_alloc() above. */
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303 .loo_object_init = mdd_object_init,
304 .loo_object_start = mdd_object_start,
305 .loo_object_free = mdd_object_free,
306 .loo_object_print = mdd_object_print,

/* Look up (or instantiate) the mdd object for FID \a f on device \a d. */
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310 struct mdd_device *d,
311 const struct lu_fid *f)
313 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a "/"-separated \a path, relative to the filesystem root
 * (mdd_root_fid), to the FID of its final component by iterated
 * mdd_lookup() of each name element.  -EREMOTE is returned when a
 * component lives on another MDT; several loop-control lines of the
 * walk are elided in this chunk. */
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317 const char *path, struct lu_fid *fid)
320 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321 struct mdd_object *obj;
322 struct lu_name *lname = &mdd_env_info(env)->mti_name;
327 /* temp buffer for path element */
328 buf = mdd_buf_alloc(env, PATH_MAX);
329 if (buf->lb_buf == NULL)
332 lname->ln_name = name = buf->lb_buf;
333 lname->ln_namelen = 0;
334 *f = mdd->mdd_root_fid;
/* Scan one path component (copy loop body elided in this chunk). */
341 while (*path != '/' && *path != '\0') {
349 /* find obj corresponding to fid */
350 obj = mdd_object_find(env, mdd, f);
352 GOTO(out, rc = -EREMOTE);
354 GOTO(out, rc = PTR_ERR(obj));
355 /* get child fid from parent and name */
356 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357 mdd_object_put(env, obj);
362 lname->ln_namelen = 0;

371 /** The maximum depth that fid2path() will search.
372 * This is limited only because we want to store the fids for
373 * historical path lookup purposes.
375 #define MAX_PATH_DEPTH 100

377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379 __u64 pli_recno; /**< history point */
380 __u64 pli_currec; /**< current record */
381 struct lu_fid pli_fid;
382 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383 struct mdd_object *pli_mdd_obj;
384 char *pli_path; /**< full path */
386 int pli_linkno; /**< which hardlink to follow */
387 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path for pli->pli_mdd_obj by walking the link
 * EA (parent fid + name) chain from the object up to the filesystem
 * root, packing names right-to-left into pli->pli_path.  Afterwards the
 * result is verified by resolving it back with mdd_path2fid(); -EAGAIN
 * is returned if the path changed underneath us (caller retries).
 * NOTE(review): several lines (ENTRY/RETURN, some error branches) are
 * elided in this chunk. */
390 static int mdd_path_current(const struct lu_env *env,
391 struct path_lookup_info *pli)
393 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394 struct mdd_object *mdd_obj;
395 struct lu_buf *buf = NULL;
396 struct link_ea_header *leh;
397 struct link_ea_entry *lee;
398 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start packing at the very end of the path buffer. */
405 ptr = pli->pli_path + pli->pli_pathlen - 1;
408 pli->pli_fidcount = 0;
409 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
/* Walk up one parent per iteration until the root fid is reached. */
411 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412 mdd_obj = mdd_object_find(env, mdd,
413 &pli->pli_fids[pli->pli_fidcount]);
415 GOTO(out, rc = -EREMOTE);
417 GOTO(out, rc = PTR_ERR(mdd_obj));
418 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
420 mdd_object_put(env, mdd_obj);
424 /* Do I need to error out here? */
429 /* Get parent fid and object name */
430 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431 buf = mdd_links_get(env, mdd_obj);
432 mdd_read_unlock(env, mdd_obj);
433 mdd_object_put(env, mdd_obj);
435 GOTO(out, rc = PTR_ERR(buf));
438 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
441 /* If set, use link #linkno for path lookup, otherwise use
442 link #0. Only do this for the final path element. */
443 if ((pli->pli_fidcount == 0) &&
444 (pli->pli_linkno < leh->leh_reccount)) {
446 for (count = 0; count < pli->pli_linkno; count++) {
447 lee = (struct link_ea_entry *)
448 ((char *)lee + reclen);
449 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
451 if (pli->pli_linkno < leh->leh_reccount - 1)
452 /* indicate to user there are more links */
456 /* Pack the name in the end of the buffer */
457 ptr -= tmpname->ln_namelen;
458 if (ptr - 1 <= pli->pli_path)
459 GOTO(out, rc = -EOVERFLOW);
460 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
463 /* Store the parent fid for historic lookup */
464 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465 GOTO(out, rc = -EOVERFLOW);
466 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
469 /* Verify that our path hasn't changed since we started the lookup.
470 Record the current index, and verify the path resolves to the
471 same fid. If it does, then the path is correct as of this index. */
472 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473 pli->pli_currec = mdd->mdd_cl.mc_index;
474 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
477 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478 GOTO (out, rc = -EAGAIN);
480 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483 PFID(&pli->pli_fid));
484 GOTO(out, rc = -EAGAIN);
/* Shift the packed path to the start of the caller's buffer. */
486 ptr++; /* skip leading / */
487 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492 /* if we vmalloced a large buffer drop it */
/* Historic (changelog-based) path lookup; body elided in this chunk --
 * from mdd_path() below it is only invoked when pli_recno != -1. */
498 static int mdd_path_historic(const struct lu_env *env,
499 struct path_lookup_info *pli)

504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506 char *path, int pathlen, __u64 *recno, int *linkno)
508 struct path_lookup_info *pli;
/* The root has no name; handled specially (body elided here). */
516 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
525 pli->pli_mdd_obj = md2mdd_obj(obj);
526 pli->pli_recno = *recno;
527 pli->pli_path = path;
528 pli->pli_pathlen = pathlen;
529 pli->pli_linkno = *linkno;
531 /* Retry multiple times in case file is being moved */
532 while (tries-- && rc == -EAGAIN)
533 rc = mdd_path_current(env, pli);
535 /* For historical path lookup, the current links may not have existed
536 * at "recno" time. We must switch over to earlier links/parents
537 * by using the changelog records. If the earlier parent doesn't
538 * exist, we must search back through the changelog to reconstruct
539 * its parents, then check if it exists, etc.
540 * We may ignore this problem for the initial implementation and
541 * state that an "original" hardlink must still exist for us to find
542 * historic path name. */
543 if (pli->pli_recno != -1) {
544 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path was valid at, and the
 * next hardlink index for iterating over multiple links. */
546 *recno = pli->pli_currec;
547 /* Return next link index to caller */
548 *linkno = pli->pli_linkno;
/* Refresh in-core flags from on-disk attributes: translate la_flags via
 * mdd_flags_xlate(), and tag single-link directories with MNLINK_OBJ. */
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
558 struct lu_attr *la = &mdd_env_info(env)->mti_la;
562 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
564 mdd_flags_xlate(obj, la->la_flags);
565 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566 obj->mod_flags |= MNLINK_OBJ;

571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the disk read when MA_INODE is already valid in \a ma. */
578 if (ma->ma_valid & MA_INODE)
581 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582 mdd_object_capa(env, mdd_obj));
584 ma->ma_valid |= MA_INODE;

/* Fill \a lmm with the filesystem-default striping (from the MDS LOV
 * descriptor) as a V1 lov_user_md; returns the number of bytes used. */
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
590 struct lov_desc *ldesc;
591 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592 struct lov_user_md *lum = (struct lov_user_md*)lmm;
598 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599 LASSERT(ldesc != NULL);
601 lum->lmm_magic = LOV_MAGIC_V1;
602 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
603 lum->lmm_pattern = ldesc->ld_pattern;
604 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
608 RETURN(sizeof(*lum));

/* True iff \a mdd_obj is the filesystem root object. */
611 static int is_rootdir(struct mdd_object *mdd_obj)
613 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
614 const struct lu_fid *fid = mdo2fid(mdd_obj);
616 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);

619 /* get lov EA only */
620 static int __mdd_lmm_get(const struct lu_env *env,
621 struct mdd_object *mdd_obj, struct md_attr *ma)
626 if (ma->ma_valid & MA_LOV)
629 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* Root with no explicit EA: synthesize the default striping. */
631 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
632 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* Positive rc from mdd_get_md() is the EA size (presumably -- the
 * rc>0 guard line looks elided here; TODO confirm). */
634 ma->ma_lmm_size = rc;
635 ma->ma_valid |= MA_LOV;
641 /* get the first parent fid from link EA */
642 static int mdd_pfid_get(const struct lu_env *env,
643 struct mdd_object *mdd_obj, struct md_attr *ma)
646 struct link_ea_header *leh;
647 struct link_ea_entry *lee;
648 struct lu_fid *pfid = &ma->ma_pfid;
/* Nothing to do when the parent fid is already cached in \a ma. */
651 if (ma->ma_valid & MA_PFID)
654 buf = mdd_links_get(env, mdd_obj);
656 RETURN(PTR_ERR(buf));
/* Take link #0's parent fid; link EA stores fids big-endian. */
659 lee = (struct link_ea_entry *)(leh + 1);
660 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
661 fid_be_to_cpu(pfid, pfid);
662 ma->ma_valid |= MA_PFID;
663 if (buf->lb_len > OBD_ALLOC_BIG)
664 /* if we vmalloced a large buffer drop it */

/* Read-locked wrapper around __mdd_lmm_get(). */
669 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
675 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
676 rc = __mdd_lmm_get(env, mdd_obj, ma);
677 mdd_read_unlock(env, mdd_obj);

/* Fetch the LMV (striped-dir) EA into ma->ma_lmv unless already valid. */
682 static int __mdd_lmv_get(const struct lu_env *env,
683 struct mdd_object *mdd_obj, struct md_attr *ma)
688 if (ma->ma_valid & MA_LMV)
691 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
694 ma->ma_valid |= MA_LMV;

/* Read HSM and Size-on-MDS state out of the on-disk LMA EA into \a ma,
 * as requested by ma_need; skipped when the needed bits are valid. */
700 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
703 struct mdd_thread_info *info = mdd_env_info(env);
704 struct lustre_mdt_attrs *lma =
705 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
710 /* If all needed data are already valid, nothing to do */
711 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
712 (ma->ma_need & (MA_HSM | MA_SOM)))
715 /* Read LMA from disk EA */
716 lma_size = sizeof(info->mti_xattr_buf);
717 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
721 /* Useless to check LMA incompatibility because this is already done in
722 * osd_ea_fid_get(), and this will fail long before this code is
724 * So, if we are here, LMA is compatible.
727 lustre_lma_swab(lma);
729 /* Swab and copy LMA */
730 if (ma->ma_need & MA_HSM) {
731 if (lma->lma_compat & LMAC_HSM)
732 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
734 ma->ma_hsm.mh_flags = 0;
735 ma->ma_valid |= MA_HSM;
/* SOM data only copied when the LMA says it is present (LMAC_SOM). */
739 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
740 LASSERT(ma->ma_som != NULL);
741 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
742 ma->ma_som->msd_size = lma->lma_som_size;
743 ma->ma_som->msd_blocks = lma->lma_som_blocks;
744 ma->ma_som->msd_mountid = lma->lma_som_mountid;
745 ma->ma_valid |= MA_SOM;
/* Dispatch attribute fetches according to ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), parent fid, LMV EA (dirs only),
 * HSM/SOM from LMA (regular files), and default ACL (dirs).  Stops at
 * the first failure. */
751 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
757 if (ma->ma_need & MA_INODE)
758 rc = mdd_iattr_get(env, mdd_obj, ma);
760 if (rc == 0 && ma->ma_need & MA_LOV) {
761 if (S_ISREG(mdd_object_type(mdd_obj)) ||
762 S_ISDIR(mdd_object_type(mdd_obj)))
763 rc = __mdd_lmm_get(env, mdd_obj, ma);
765 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
766 if (S_ISREG(mdd_object_type(mdd_obj)))
767 rc = mdd_pfid_get(env, mdd_obj, ma);
769 if (rc == 0 && ma->ma_need & MA_LMV) {
770 if (S_ISDIR(mdd_object_type(mdd_obj)))
771 rc = __mdd_lmv_get(env, mdd_obj, ma);
773 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
774 if (S_ISREG(mdd_object_type(mdd_obj)))
775 rc = __mdd_lma_get(env, mdd_obj, ma);
777 #ifdef CONFIG_FS_POSIX_ACL
778 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
779 if (S_ISDIR(mdd_object_type(mdd_obj)))
780 rc = mdd_def_acl_get(env, mdd_obj, ma);
783 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
784 rc, ma->ma_valid, ma->ma_lmm);

/* As above, but takes the object read lock when any EA-backed
 * attribute (LOV/LMV/ACL/HSM/SOM/PFID) is requested. */
788 int mdd_attr_get_internal_locked(const struct lu_env *env,
789 struct mdd_object *mdd_obj, struct md_attr *ma)
792 int needlock = ma->ma_need &
793 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
796 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
797 rc = mdd_attr_get_internal(env, mdd_obj, ma);
799 mdd_read_unlock(env, mdd_obj);

804 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
806 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
809 struct mdd_object *mdd_obj = md2mdd_obj(obj);
813 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);

818 * No permission check is needed.
/* moo_xattr_get: read extended attribute \a name under read lock. */
820 static int mdd_xattr_get(const struct lu_env *env,
821 struct md_object *obj, struct lu_buf *buf,
824 struct mdd_object *mdd_obj = md2mdd_obj(obj);
829 LASSERT(mdd_object_exists(mdd_obj));
831 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
832 rc = mdo_xattr_get(env, mdd_obj, buf, name,
833 mdd_object_capa(env, mdd_obj));
834 mdd_read_unlock(env, mdd_obj);
840 * Permission check is done when open,
841 * no need check again.
/* moo_readlink: read symlink target bytes into \a buf via the child
 * dt object's body read method, starting at offset 0. */
843 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
846 struct mdd_object *mdd_obj = md2mdd_obj(obj);
847 struct dt_object *next;
852 LASSERT(mdd_object_exists(mdd_obj));
854 next = mdd_object_child(mdd_obj);
855 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
856 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
857 mdd_object_capa(env, mdd_obj));
858 mdd_read_unlock(env, mdd_obj);

863 * No permission check is needed.
/* moo_xattr_list: enumerate xattr names under read lock. */
865 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
868 struct mdd_object *mdd_obj = md2mdd_obj(obj);
873 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
874 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
875 mdd_read_unlock(env, mdd_obj);

/* Create the on-disk object for \a c inside transaction \a handle:
 * choose a dt_object_format from the creation spec (index feature or
 * file mode), let the underlying device fill the allocation hint from
 * the parent \a p (may be NULL), then create via mdo_create_obj(). */
880 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
881 struct mdd_object *c, struct md_attr *ma,
882 struct thandle *handle,
883 const struct md_op_spec *spec)
885 struct lu_attr *attr = &ma->ma_attr;
886 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
887 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
888 const struct dt_index_features *feat = spec->sp_feat;
892 if (!mdd_object_exists(c)) {
893 struct dt_object *next = mdd_object_child(c);
896 if (feat != &dt_directory_features && feat != NULL)
897 dof->dof_type = DFT_INDEX;
899 dof->dof_type = dt_mode_to_dft(attr->la_mode);
901 dof->u.dof_idx.di_feat = feat;
903 /* @hint will be initialized by underlying device. */
904 next->do_ops->do_ah_init(env, hint,
905 p ? mdd_object_child(p) : NULL,
906 attr->la_mode & S_IFMT);
908 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
909 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
917 * Make sure the ctime is increased only.
/* Drop backwards time updates: if the incoming ctime is older than the
 * stored one, strip MTIME/CTIME; if it is the only bit set and equal,
 * strip CTIME so no-op updates are avoided. */
919 static inline int mdd_attr_check(const struct lu_env *env,
920 struct mdd_object *obj,
921 struct lu_attr *attr)
923 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
927 if (attr->la_valid & LA_CTIME) {
928 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
932 if (attr->la_ctime < tmp_la->la_ctime)
933 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
934 else if (attr->la_valid == LA_CTIME &&
935 attr->la_ctime == tmp_la->la_ctime)
936 attr->la_valid &= ~LA_CTIME;

/* Write attributes to disk; on a mode change also rewrite the ACL
 * (chmod semantics) when \a needacl is set. */
941 int mdd_attr_set_internal(const struct lu_env *env,
942 struct mdd_object *obj,
943 struct lu_attr *attr,
944 struct thandle *handle,
950 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
951 #ifdef CONFIG_FS_POSIX_ACL
952 if (!rc && (attr->la_valid & LA_MODE) && needacl)
953 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);

/* mdd_attr_check() followed by mdd_attr_set_internal(). */
958 int mdd_attr_check_set_internal(const struct lu_env *env,
959 struct mdd_object *obj,
960 struct lu_attr *attr,
961 struct thandle *handle,
967 rc = mdd_attr_check(env, obj, attr);
972 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);

/* Write-locked wrapper around mdd_attr_set_internal(); the ACL pass is
 * only needed when the mode actually changes. */
976 static int mdd_attr_set_internal_locked(const struct lu_env *env,
977 struct mdd_object *obj,
978 struct lu_attr *attr,
979 struct thandle *handle,
985 needacl = needacl && (attr->la_valid & LA_MODE);
987 mdd_write_lock(env, obj, MOR_TGT_CHILD);
988 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
990 mdd_write_unlock(env, obj);

/* Write-locked wrapper around mdd_attr_check_set_internal(). */
994 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
995 struct mdd_object *obj,
996 struct lu_attr *attr,
997 struct thandle *handle,
1003 needacl = needacl && (attr->la_valid & LA_MODE);
1005 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1006 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1008 mdd_write_unlock(env, obj);

/* Set or delete an xattr: a non-empty buffer sets \a name, an empty
 * (NULL,0) buffer deletes it. */
1012 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1013 const struct lu_buf *buf, const char *name,
1014 int fl, struct thandle *handle)
1016 struct lustre_capa *capa = mdd_object_capa(env, obj);
1020 if (buf->lb_buf && buf->lb_len > 0)
1021 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1022 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1023 rc = mdo_xattr_del(env, obj, name, handle, capa);
1029 * This gives the same functionality as the code between
1030 * sys_chmod and inode_setattr
1031 * chown_common and inode_setattr
1032 * utimes and inode_setattr
1033 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize an incoming setattr against the object's
 * current attributes and the caller's credentials (md_ucred): performs
 * the permission checks for chmod/chown/chgrp/utimes/flags, clears
 * setuid/setgid where POSIX requires, and strips la_valid bits for
 * updates that turn out to be no-ops.  NOTE(review): the chunk elides
 * several lines here (RETURNs, the uc assignment, some braces). */
1035 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1036 struct lu_attr *la, const struct md_attr *ma)
1038 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1039 struct md_ucred *uc;
1046 /* Do not permit change file type */
1047 if (la->la_valid & LA_TYPE)
1050 /* They should not be processed by setattr */
1051 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1054 /* export destroy does not have ->le_ses, but we may want
1055 * to drop LUSTRE_SOM_FL. */
1061 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* ctime-only update: used by rename on its source; drop it when not
 * newer than the stored ctime. */
1065 if (la->la_valid == LA_CTIME) {
1066 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1067 /* This is only for set ctime when rename's source is
1069 rc = mdd_may_delete(env, NULL, obj,
1070 (struct md_attr *)ma, 1, 0);
1071 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1072 la->la_valid &= ~LA_CTIME;
1076 if (la->la_valid == LA_ATIME) {
1077 /* This is atime only set for read atime update on close. */
1078 if (la->la_atime >= tmp_la->la_atime &&
1079 la->la_atime < (tmp_la->la_atime +
1080 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1081 la->la_valid &= ~LA_ATIME;
1085 /* Check if flags change. */
1086 if (la->la_valid & LA_FLAGS) {
1087 unsigned int oldflags = 0;
1088 unsigned int newflags = la->la_flags &
1089 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1091 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1092 !mdd_capable(uc, CFS_CAP_FOWNER))
1095 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1096 * only be changed by the relevant capability. */
1097 if (mdd_is_immutable(obj))
1098 oldflags |= LUSTRE_IMMUTABLE_FL;
1099 if (mdd_is_append(obj))
1100 oldflags |= LUSTRE_APPEND_FL;
1101 if ((oldflags ^ newflags) &&
1102 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningful only on directories. */
1105 if (!S_ISDIR(tmp_la->la_mode))
1106 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept nothing but flag changes,
 * unless the server explicitly bypasses permission checks. */
1109 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1110 (la->la_valid & ~LA_FLAGS) &&
1111 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1114 /* Check for setting the obj time. */
1115 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1116 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1117 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1118 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1119 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID: internal bits asking us to strip set-id bits,
 * e.g. after a write by a non-owner; converted into a mode update. */
1127 if (la->la_valid & LA_KILL_SUID) {
1128 la->la_valid &= ~LA_KILL_SUID;
1129 if ((tmp_la->la_mode & S_ISUID) &&
1130 !(la->la_valid & LA_MODE)) {
1131 la->la_mode = tmp_la->la_mode;
1132 la->la_valid |= LA_MODE;
1134 la->la_mode &= ~S_ISUID;
1137 if (la->la_valid & LA_KILL_SGID) {
1138 la->la_valid &= ~LA_KILL_SGID;
1139 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1140 (S_ISGID | S_IXGRP)) &&
1141 !(la->la_valid & LA_MODE)) {
1142 la->la_mode = tmp_la->la_mode;
1143 la->la_valid |= LA_MODE;
1145 la->la_mode &= ~S_ISGID;
1148 /* Make sure a caller can chmod. */
1149 if (la->la_valid & LA_MODE) {
1150 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1151 (uc->mu_fsuid != tmp_la->la_uid) &&
1152 !mdd_capable(uc, CFS_CAP_FOWNER))
1155 if (la->la_mode == (cfs_umode_t) -1)
1156 la->la_mode = tmp_la->la_mode;
/* Keep the file-type bits from the stored mode. */
1158 la->la_mode = (la->la_mode & S_IALLUGO) |
1159 (tmp_la->la_mode & ~S_IALLUGO);
1161 /* Also check the setgid bit! */
1162 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1163 la->la_gid : tmp_la->la_gid) &&
1164 !mdd_capable(uc, CFS_CAP_FSETID))
1165 la->la_mode &= ~S_ISGID;
1167 la->la_mode = tmp_la->la_mode;
1170 /* Make sure a caller can chown. */
1171 if (la->la_valid & LA_UID) {
1172 if (la->la_uid == (uid_t) -1)
1173 la->la_uid = tmp_la->la_uid;
1174 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1175 (la->la_uid != tmp_la->la_uid)) &&
1176 !mdd_capable(uc, CFS_CAP_CHOWN))
1179 /* If the user or group of a non-directory has been
1180 * changed by a non-root user, remove the setuid bit.
1181 * 19981026 David C Niemi <niemi@tux.org>
1183 * Changed this to apply to all users, including root,
1184 * to avoid some races. This is the behavior we had in
1185 * 2.0. The check for non-root was definitely wrong
1186 * for 2.2 anyway, as it should have been using
1187 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1188 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1189 !S_ISDIR(tmp_la->la_mode)) {
1190 la->la_mode &= ~S_ISUID;
1191 la->la_valid |= LA_MODE;
1195 /* Make sure caller can chgrp. */
1196 if (la->la_valid & LA_GID) {
1197 if (la->la_gid == (gid_t) -1)
1198 la->la_gid = tmp_la->la_gid;
1199 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1200 ((la->la_gid != tmp_la->la_gid) &&
1201 !lustre_in_group_p(uc, la->la_gid))) &&
1202 !mdd_capable(uc, CFS_CAP_CHOWN))
1205 /* Likewise, if the user or group of a non-directory
1206 * has been changed by a non-root user, remove the
1207 * setgid bit UNLESS there is no group execute bit
1208 * (this would be a file marked for mandatory
1209 * locking). 19981026 David C Niemi <niemi@tux.org>
1211 * Removed the fsuid check (see the comment above) --
1213 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1214 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1215 la->la_mode &= ~S_ISGID;
1216 la->la_valid |= LA_MODE;
1220 /* For both Size-on-MDS case and truncate case,
1221 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1222 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1223 * For SOM case, it is true, the MAY_WRITE perm has been checked
1224 * when open, no need check again. For truncate case, it is false,
1225 * the MAY_WRITE perm should be checked here. */
1226 if (ma->ma_attr_flags & MDS_SOM) {
1227 /* For the "Size-on-MDS" setattr update, merge coming
1228 * attributes with the set in the inode. BUG 10641 */
1229 if ((la->la_valid & LA_ATIME) &&
1230 (la->la_atime <= tmp_la->la_atime))
1231 la->la_valid &= ~LA_ATIME;
1233 /* OST attributes do not have a priority over MDS attributes,
1234 * so drop times if ctime is equal. */
1235 if ((la->la_valid & LA_CTIME) &&
1236 (la->la_ctime <= tmp_la->la_ctime))
1237 la->la_valid &= ~(LA_MTIME | LA_CTIME);
/* Truncate path: owner-override at open or explicit bypass skips
 * the write-permission check. */
1239 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1240 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1241 (uc->mu_fsuid == tmp_la->la_uid)) &&
1242 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1243 rc = mdd_permission_internal_locked(env, obj,
1250 if (la->la_valid & LA_CTIME) {
1251 /* The pure setattr, it has the priority over what is
1252 * already set, do not drop it if ctime is equal. */
1253 if (la->la_ctime < tmp_la->la_ctime)
1254 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1262 /** Store a data change changelog record
1263 * If this fails, we must fail the whole transaction; we don't
1264 * want the change to commit without the log entry.
1265 * \param mdd_obj - mdd_object of change
1266 * \param handle - transacion handle
1268 static int mdd_changelog_data_store(const struct lu_env *env,
1269 struct mdd_device *mdd,
1270 enum changelog_rec_type type,
1272 struct mdd_object *mdd_obj,
1273 struct thandle *handle)
1275 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1276 struct llog_changelog_rec *rec;
/* Skip entirely when changelogs are off or this record type is
 * masked out. */
1282 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1284 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1287 LASSERT(handle != NULL);
1288 LASSERT(mdd_obj != NULL);
/* Time-update records (MTIME..ATIME) are deduplicated: skip if this
 * object was already logged since the changelog started. */
1290 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1291 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1292 /* Don't need multiple updates in this log */
1293 /* Don't check under lock - no big deal if we get an extra
1298 reclen = llog_data_len(sizeof(*rec));
1299 buf = mdd_buf_alloc(env, reclen);
1300 if (buf->lb_buf == NULL)
1302 rec = (struct llog_changelog_rec *)buf->lb_buf;
1304 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1305 rec->cr.cr_type = (__u32)type;
1306 rec->cr.cr_tfid = *tfid;
1307 rec->cr.cr_namelen = 0;
1308 mdd_obj->mod_cltime = cfs_time_current_64();
1310 rc = mdd_changelog_llog_write(mdd, rec, handle);
1312 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1313 rc, type, PFID(tfid));

/* Public entry: open a transaction, store one changelog data record
 * for \a obj, and close the transaction. */
1320 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1321 int flags, struct md_object *obj)
1323 struct thandle *handle;
1324 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1325 struct mdd_device *mdd = mdo2mdd(obj);
1329 handle = mdd_trans_start(env, mdd);
1332 return(PTR_ERR(handle));
1334 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1337 mdd_trans_stop(env, mdd, rc, handle);
/* Write the LMA (Lustre Metadata Attributes) EA for \a mdd_obj from
 * the HSM/SOM data in \a ma. */
1343 * Should be called with write lock held.
1345 * \see mdd_lma_set_locked().
1347 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1348 const struct md_attr *ma, struct thandle *handle)
1350 struct mdd_thread_info *info = mdd_env_info(env)
1352 struct lustre_mdt_attrs *lma =
1353 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1354 int lmasize = sizeof(struct lustre_mdt_attrs);
1359 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" applies logical
 * NOT (yielding 0 or 1) before the bitwise AND, so it is true only if
 * bit 0 is set in MA_HSM|MA_SOM and ma_valid == 0.  The comment above
 * suggests the intent was "(~ma->ma_valid) & (MA_HSM | MA_SOM)"
 * (either part missing) -- confirm against upstream before changing. */
1360 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1361 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* On-disk LMA is little-endian; swab to CPU order after reading. */
1365 lustre_lma_swab(lma);
/* No existing LMA on disk: start from a zeroed structure. */
1367 memset(lma, 0, lmasize);
/* Merge the valid HSM flags into the LMA and mark HSM data present. */
1371 if (ma->ma_valid & MA_HSM) {
1372 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1373 lma->lma_compat |= LMAC_HSM;
/* Merge Size-on-MDS data; an invalid ioepoch clears the SOM flag. */
1377 if (ma->ma_valid & MA_SOM) {
1378 LASSERT(ma->ma_som != NULL);
1379 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1380 lma->lma_compat &= ~LMAC_SOM;
1382 lma->lma_compat |= LMAC_SOM;
1383 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1384 lma->lma_som_size = ma->ma_som->msd_size;
1385 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1386 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always stamp the object's own FID into the LMA. */
1391 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Swab back to disk (little-endian) order and write the EA. */
1393 lustre_lma_swab(lma);
1394 buf = mdd_buf_get(env, lma, lmasize);
1395 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1401 * Save LMA extended attributes with data from \a ma.
1403 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1404 * not, LMA EA will be first read from disk, modified and written back.
/* Locked wrapper around __mdd_lma_set(): takes the object write lock
 * for the duration of the EA update. */
1407 static int mdd_lma_set_locked(const struct lu_env *env,
1408 struct mdd_object *mdd_obj,
1409 const struct md_attr *ma, struct thandle *handle)
1413 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1414 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1415 mdd_write_unlock(env, mdd_obj);
1419 /* Precedence for choosing record type when multiple
1420 * attributes change: setattr > mtime > ctime > atime
1421 * (ctime changes when mtime does, plus chmod/chown.
1422 * atime and ctime are independent.) */
1423 static int mdd_attr_set_changelog(const struct lu_env *env,
1424 struct md_object *obj, struct thandle *handle,
1427 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a candidate-type bitmask from which attributes changed:
 * any non-time attribute implies CL_SETATTR; each time attribute
 * maps to its own record type. Then apply the changelog mask. */
1430 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1431 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1432 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1433 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1434 bits = bits & mdd->mdd_cl.mc_mask;
1438 /* The record type is the lowest non-masked set bit */
1439 while (bits && ((bits & 1) == 0)) {
1444 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1445 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1446 md2mdd_obj(obj), handle);
1449 /* set attr and LOV EA at once, return updated attr */
/* Main setattr entry point for an MDD object.  Fixes up the requested
 * attributes, starts a transaction, handles quota accounting for
 * chown/chgrp, applies plain attributes / flags / LOV EA / LMA (HSM+SOM)
 * as indicated by ma->ma_valid, records a changelog entry, and finally
 * pushes UID/GID changes to the OSTs asynchronously. */
1450 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1451 const struct md_attr *ma)
1453 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1454 struct mdd_device *mdd = mdo2mdd(obj);
1455 struct thandle *handle;
1456 struct lov_mds_md *lmm = NULL;
1457 struct llog_cookie *logcookies = NULL;
1458 int rc, lmm_size = 0, cookie_size = 0;
1459 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1460 #ifdef HAVE_QUOTA_SUPPORT
1461 struct obd_device *obd = mdd->mdd_obd_dev;
1462 struct mds_obd *mds = &obd->u.mds;
1463 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1464 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1465 int quota_opc = 0, block_count = 0;
1466 int inode_pending[MAXQUOTAS] = { 0, 0 };
1467 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a private copy so mdd_fix_attr() can adjust validity bits
 * without mutating the caller's md_attr. */
1471 *la_copy = ma->ma_attr;
1472 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1476 /* setattr on "close" only change atime, or do nothing */
1477 if (ma->ma_valid == MA_INODE &&
1478 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1481 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1482 MDD_TXN_ATTR_SET_OP);
1483 handle = mdd_trans_start(env, mdd);
1485 RETURN(PTR_ERR(handle));
1486 /*TODO: add lock here*/
1487 /* start a log journal handle if needed */
/* chown/chgrp on a regular file must be journalled (like unlink) so
 * the ownership change can be replayed to the OSTs; fetch the LOV EA
 * now while we still can. */
1488 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1489 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1490 lmm_size = mdd_lov_mdsize(env, mdd);
1491 lmm = mdd_max_lmm_get(env, mdd);
1493 GOTO(cleanup, rc = -ENOMEM);
1495 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1502 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1503 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1504 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1506 #ifdef HAVE_QUOTA_SUPPORT
/* On ownership change, pre-acquire quota for the new owner (both
 * inode and block quota) before the attributes are applied. */
1507 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1508 struct obd_export *exp = md_quota(env)->mq_exp;
1509 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1511 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1513 quota_opc = FSFILT_OP_SETATTR;
1514 mdd_quota_wrapper(la_copy, qnids);
1515 mdd_quota_wrapper(la_tmp, qoids);
1516 /* get file quota for new owner */
1517 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1518 qnids, inode_pending, 1, NULL, 0,
/* Convert 512-byte sectors to 4KB quota blocks (>> 3, rounded up). */
1520 block_count = (la_tmp->la_blocks + 7) >> 3;
1523 mdd_data_get(env, mdd_obj, &data);
1524 /* get block quota for new owner */
1525 lquota_chkquota(mds_quota_interface_ref, obd,
1526 exp, qnids, block_pending,
1528 LQUOTA_FLAGS_BLK, data, 1);
/* Apply the attribute change proper: flags get their own path so the
 * in-core mdd flag cache can be updated afterwards. */
1534 if (la_copy->la_valid & LA_FLAGS) {
1535 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1538 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1539 } else if (la_copy->la_valid) { /* setattr */
1540 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1542 /* journal chown/chgrp in llog, just like unlink */
1543 if (rc == 0 && lmm_size){
1544 cookie_size = mdd_lov_cookiesize(env, mdd);
1545 logcookies = mdd_max_cookie_get(env, mdd);
1546 if (logcookies == NULL)
1547 GOTO(cleanup, rc = -ENOMEM);
1549 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1550 logcookies, cookie_size) <= 0)
/* Replace the LOV EA when the caller supplied one (regular files
 * and directories only, after sanity-checking the stripe data). */
1555 if (rc == 0 && ma->ma_valid & MA_LOV) {
1558 mode = mdd_object_type(mdd_obj);
1559 if (S_ISREG(mode) || S_ISDIR(mode)) {
1560 rc = mdd_lsm_sanity_check(env, mdd_obj);
1564 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1565 ma->ma_lmm_size, handle, 1);
/* Persist HSM / Size-on-MDS state into the LMA EA. */
1569 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1572 mode = mdd_object_type(mdd_obj);
1574 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1579 rc = mdd_attr_set_changelog(env, obj, handle,
1580 ma->ma_attr.la_valid);
1581 mdd_trans_stop(env, mdd, rc, handle);
/* Ownership changed and we have striping info: propagate the new
 * UID/GID to the OST objects asynchronously. */
1582 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1583 /*set obd attr, if needed*/
1584 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1587 #ifdef HAVE_QUOTA_SUPPORT
/* Commit the pending quota reservations taken above, then trigger
 * dqrel/dqacq for the old and new owners. */
1589 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1591 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1593 /* Trigger dqrel/dqacq for original owner and new owner.
1594 * If failed, the next call for lquota_chkquota will
1596 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set an extended attribute within an already-started transaction,
 * serialized under the object write lock. */
1603 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1604 const struct lu_buf *buf, const char *name, int fl,
1605 struct thandle *handle)
1610 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1611 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1612 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: reject immutable/append-only
 * objects, and require the caller to be the owner or hold CAP_FOWNER. */
1617 static int mdd_xattr_sanity_check(const struct lu_env *env,
1618 struct mdd_object *obj)
1620 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1621 struct md_ucred *uc = md_ucred(env);
1625 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1628 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1632 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1633 !mdd_capable(uc, CFS_CAP_FOWNER))
1640 * The caller should guarantee to update the object ctime
1641 * after xattr_set if needed.
1643 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1644 const struct lu_buf *buf, const char *name,
1647 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1648 struct mdd_device *mdd = mdo2mdd(obj);
1649 struct thandle *handle;
1653 rc = mdd_xattr_sanity_check(env, mdd_obj);
1657 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1658 /* security-related changes may require sync */
/* ACL updates are committed synchronously when the admin has enabled
 * mdd_sync_permission, so permission changes survive a crash. */
1659 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1660 mdd->mdd_sync_permission == 1)
1661 txn_param_sync(&mdd_env_info(env)->mti_param);
1663 handle = mdd_trans_start(env, mdd);
1665 RETURN(PTR_ERR(handle));
1667 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1669 /* Only record user xattr changes */
1670 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1671 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1673 mdd_trans_stop(env, mdd, rc, handle);
1679 * The caller should guarantee to update the object ctime
1680 * after xattr_set if needed.
1682 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1685 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1686 struct mdd_device *mdd = mdo2mdd(obj);
1687 struct thandle *handle;
1691 rc = mdd_xattr_sanity_check(env, mdd_obj);
1695 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1696 handle = mdd_trans_start(env, mdd);
1698 RETURN(PTR_ERR(handle));
1700 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1701 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1702 mdd_object_capa(env, mdd_obj));
1703 mdd_write_unlock(env, mdd_obj);
1705 /* Only record user xattr changes */
/* NOTE(review): this tests "!= 0", i.e. it logs every xattr EXCEPT
 * "user." ones -- the opposite of the comment above and of the
 * "== 0" test in mdd_xattr_set().  Likely should be "== 0"; confirm
 * against upstream Lustre before changing. */
1706 if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1707 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1710 mdd_trans_stop(env, mdd, rc, handle);
1715 /* partial unlink */
/* Drop one link from \a obj (two for directories, covering "." ),
 * update ctime, finish the unlink bookkeeping, and release quota for
 * the owner when the last link and last open go away. */
1716 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1719 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1720 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1721 struct mdd_device *mdd = mdo2mdd(obj);
1722 struct thandle *handle;
1723 #ifdef HAVE_QUOTA_SUPPORT
1724 struct obd_device *obd = mdd->mdd_obd_dev;
1725 struct mds_obd *mds = &obd->u.mds;
1726 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1733 * Check -ENOENT early here because we need to get object type
1734 * to calculate credits before transaction start
1736 if (!mdd_object_exists(mdd_obj))
1739 LASSERT(mdd_object_exists(mdd_obj) > 0);
1741 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1745 handle = mdd_trans_start(env, mdd);
1749 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1751 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1755 __mdd_ref_del(env, mdd_obj, handle, 0);
/* Directories carry an extra link for "."; drop it too. */
1757 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1759 __mdd_ref_del(env, mdd_obj, handle, 1);
/* Unlink updates ctime; the caller must have supplied it. */
1762 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1763 la_copy->la_ctime = ma->ma_attr.la_ctime;
1765 la_copy->la_valid = LA_CTIME;
1766 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1770 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1771 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and nobody has it open: release the owner's quota. */
1772 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1773 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1774 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1775 mdd_quota_wrapper(&ma->ma_attr, qids);
1782 mdd_write_unlock(env, mdd_obj);
1783 mdd_trans_stop(env, mdd, rc, handle);
1784 #ifdef HAVE_QUOTA_SUPPORT
1786 /* Trigger dqrel on the owner of child. If failed,
1787 * the next call for lquota_chkquota will process it */
1788 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1794 /* partial operation */
/* Validate the requested file type (from la_mode) before a partial
 * object create; body of the switch is elided in this excerpt. */
1795 static int mdd_oc_sanity_check(const struct lu_env *env,
1796 struct mdd_object *obj,
1802 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: allocate and initialize the MDD object itself
 * (attributes, optional LMV slave EA, optional remote ACL), with quota
 * pre-acquisition for the new child.  Name insertion is done elsewhere. */
1819 static int mdd_object_create(const struct lu_env *env,
1820 struct md_object *obj,
1821 const struct md_op_spec *spec,
1825 struct mdd_device *mdd = mdo2mdd(obj);
1826 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1827 const struct lu_fid *pfid = spec->u.sp_pfid;
1828 struct thandle *handle;
1829 #ifdef HAVE_QUOTA_SUPPORT
1830 struct obd_device *obd = mdd->mdd_obd_dev;
1831 struct obd_export *exp = md_quota(env)->mq_exp;
1832 struct mds_obd *mds = &obd->u.mds;
1833 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1834 int quota_opc = 0, block_count = 0;
1835 int inode_pending[MAXQUOTAS] = { 0, 0 };
1836 int block_pending[MAXQUOTAS] = { 0, 0 };
1841 #ifdef HAVE_QUOTA_SUPPORT
/* Reserve inode (and, per file type, block) quota for the child
 * before starting the transaction. */
1842 if (mds->mds_quota) {
1843 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1844 mdd_quota_wrapper(&ma->ma_attr, qids);
1845 /* get file quota for child */
1846 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1847 qids, inode_pending, 1, NULL, 0,
1849 switch (ma->ma_attr.la_mode & S_IFMT) {
1858 /* get block quota for child */
1860 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861 qids, block_pending, block_count,
1862 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1866 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1867 handle = mdd_trans_start(env, mdd);
1869 GOTO(out_pending, rc = PTR_ERR(handle));
1871 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1872 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1876 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1880 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1881 /* If creating the slave object, set slave EA here. */
1882 int lmv_size = spec->u.sp_ea.eadatalen;
1883 struct lmv_stripe_md *lmv;
1885 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1886 LASSERT(lmv != NULL && lmv_size > 0);
1888 rc = __mdd_xattr_set(env, mdd_obj,
1889 mdd_buf_get_const(env, lmv, lmv_size),
1890 XATTR_NAME_LMV, 0, handle);
1894 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1897 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL: initialize the new object's ACL from the EA data
 * supplied by the client; this may also adjust la_mode. */
1898 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1899 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1901 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1902 buf->lb_len = spec->u.sp_ea.eadatalen;
1903 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1904 rc = __mdd_acl_init(env, mdd_obj, buf,
1905 &ma->ma_attr.la_mode,
1910 ma->ma_attr.la_valid |= LA_MODE;
1913 pfid = spec->u.sp_ea.fid;
1916 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Read back the attributes so the caller gets the final state. */
1922 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1923 mdd_write_unlock(env, mdd_obj);
1925 mdd_trans_stop(env, mdd, rc, handle);
1927 #ifdef HAVE_QUOTA_SUPPORT
/* Commit the pending quota reservations and trigger dqacq for the
 * new child's owner. */
1929 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1931 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1933 /* Trigger dqacq on the owner of child. If failed,
1934 * the next call for lquota_chkquota will process it. */
1935 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1944 const struct md_attr *ma)
1946 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1947 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1948 struct mdd_device *mdd = mdo2mdd(obj);
1949 struct thandle *handle;
1953 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1954 handle = mdd_trans_start(env, mdd);
1958 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1959 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1961 __mdd_ref_add(env, mdd_obj, handle);
1962 mdd_write_unlock(env, mdd_obj);
1964 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1965 la_copy->la_ctime = ma->ma_attr.la_ctime;
1967 la_copy->la_valid = LA_CTIME;
1968 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1971 mdd_trans_stop(env, mdd, 0, handle);
/* Translate open flags into MAY_* access-mode bits for permission
 * checking. */
1977 * do NOT or the MAY_*'s, you'll get the weakest
1979 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1983 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1984 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1985 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1986 * owner can write to a file even if it is marked readonly to hide
1987 * its brokenness. (bug 5781) */
1988 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1989 struct md_ucred *uc = md_ucred(env);
/* Owner (or uninitialized credentials) bypasses the mode check. */
1991 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1992 (la->la_uid == uc->mu_fsuid))
1996 if (flags & FMODE_READ)
/* Truncate and append both imply a write. */
1998 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2000 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's current attributes:
 * reject dead objects, symlinks, writable directory opens, append-mode
 * violations, and unauthorized O_NOATIME. */
2005 static int mdd_open_sanity_check(const struct lu_env *env,
2006 struct mdd_object *obj, int flag)
2008 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2013 if (mdd_is_dead_obj(obj))
2016 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2020 if (S_ISLNK(tmp_la->la_mode))
2023 mode = accmode(env, tmp_la, flag);
2025 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* MDS_OPEN_CREATED means permission was already granted at create
 * time; otherwise check it now. */
2028 if (!(flag & MDS_OPEN_CREATED)) {
2029 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncation is meaningless for FIFOs, sockets, and device nodes. */
2034 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2035 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2036 flag &= ~MDS_OPEN_TRUNC;
2038 /* For writing append-only file must open it with append mode. */
2039 if (mdd_is_append(obj)) {
2040 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2042 if (flag & MDS_OPEN_TRUNC)
2048 * Now, flag -- O_NOATIME does not be packed by client.
/* O_NOATIME is restricted to the file owner or CAP_FOWNER holders. */
2050 if (flag & O_NOATIME) {
2051 struct md_ucred *uc = md_ucred(env);
2053 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2054 (uc->mu_valid == UCRED_NEW)) &&
2055 (uc->mu_fsuid != tmp_la->la_uid) &&
2056 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open the object: sanity-check the request and, on success, bump the
 * open count under the write lock. */
2064 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2067 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2070 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2072 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2074 mdd_obj->mod_count++;
2076 mdd_write_unlock(env, mdd_obj);
2080 /* return md_attr back,
2081 * if it is last unlink then return lov ea + llog cookie*/
2082 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
/* Only regular files carry LOV striping / unlink llog cookies. */
2088 if (S_ISREG(mdd_object_type(obj))) {
2089 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2090 * Caller must be ready for that. */
2092 rc = __mdd_lmm_get(env, obj, ma);
2093 if ((ma->ma_valid & MA_LOV))
2094 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/* Close handler: drop one open count; when the last open on an
 * orphaned/dead object goes away, remove it from the orphan index and
 * prepare destruction of its OST objects, releasing quota as needed. */
2101 * No permission check is needed.
2103 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2106 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2107 struct mdd_device *mdd = mdo2mdd(obj);
2108 struct thandle *handle = NULL;
2112 #ifdef HAVE_QUOTA_SUPPORT
2113 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2114 struct mds_obd *mds = &obd->u.mds;
2115 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: just drop the open count and leave the object on
 * the orphan list (e.g. for HSM restore). */
2120 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2121 mdd_obj->mod_count--;
2123 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2124 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2125 "list\n", PFID(mdd_object_fid(mdd_obj)));
2129 /* check without any lock */
/* Lock-free pre-check: if this looks like the last close of an orphan
 * or dead object, start the unlink transaction before taking the lock. */
2130 if (mdd_obj->mod_count == 1 &&
2131 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2133 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2136 handle = mdd_trans_start(env, mdo2mdd(obj));
2138 RETURN(PTR_ERR(handle));
2141 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Re-check under the lock: the object may have become a last-close
 * orphan between the unlocked check and here (retry path elided). */
2142 if (handle == NULL && mdd_obj->mod_count == 1 &&
2143 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2144 mdd_write_unlock(env, mdd_obj);
2148 /* release open count */
2149 mdd_obj->mod_count --;
2151 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2152 /* remove link to object from orphan index */
2153 rc = __mdd_orphan_del(env, mdd_obj, handle);
2155 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2156 "list, OSS objects to be destroyed.\n",
2157 PFID(mdd_object_fid(mdd_obj)));
2159 CERROR("Object "DFID" can not be deleted from orphan "
2160 "list, maybe cause OST objects can not be "
2161 "destroyed (err: %d).\n",
2162 PFID(mdd_object_fid(mdd_obj)), rc);
2163 /* If object was not deleted from orphan list, do not
2164 * destroy OSS objects, which will be done when next
2170 rc = mdd_iattr_get(env, mdd_obj, ma);
2171 /* Object maybe not in orphan list originally, it is rare case for
2172 * mdd_finish_unlink() failure. */
2173 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2174 #ifdef HAVE_QUOTA_SUPPORT
2175 if (mds->mds_quota) {
2176 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2177 mdd_quota_wrapper(&ma->ma_attr, qids);
2180 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2181 if (ma->ma_valid & MA_FLAGS &&
2182 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2183 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2185 rc = mdd_object_kill(env, mdd_obj, ma);
2191 CERROR("Error when prepare to delete Object "DFID" , "
2192 "which will cause OST objects can not be "
2193 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* Nothing is being destroyed: don't hand stale LOV/cookie data back. */
2199 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2201 mdd_write_unlock(env, mdd_obj);
2203 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2204 #ifdef HAVE_QUOTA_SUPPORT
2206 /* Trigger dqrel on the owner of child. If failed,
2207 * the next call for lquota_chkquota will process it */
2208 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2215 * Permission check is done when open,
2216 * no need check again.
/* Readpage is only valid on directories that support index (iterator)
 * operations. */
2218 static int mdd_readpage_sanity_check(const struct lu_env *env,
2219 struct mdd_object *obj)
2221 struct dt_object *next = mdd_object_child(obj);
2225 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one page-sized buffer (\a area, \a nob bytes) with lu_dirent
 * records pulled from the directory iterator \a it.  Tracks the hash
 * range covered via \a start/\a end and remembers the last entry in
 * \a *last so a record that doesn't fit can extend it to page end. */
2233 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2234 int first, void *area, int nob,
2235 const struct dt_it_ops *iops, struct dt_it *it,
2236 __u64 *start, __u64 *end,
2237 struct lu_dirent **last, __u32 attr)
2241 struct lu_dirent *ent;
/* Reserve the lu_dirpage header at the start of the area. */
2244 memset(area, 0, sizeof (struct lu_dirpage));
2245 area += sizeof (struct lu_dirpage);
2246 nob -= sizeof (struct lu_dirpage);
2254 len = iops->key_size(env, it);
2256 /* IAM iterator can return record with zero len. */
2260 hash = iops->store(env, it);
2261 if (unlikely(first)) {
2266 /* calculate max space required for lu_dirent */
2267 recsize = lu_dirent_calc_size(len, attr);
2269 if (nob >= recsize) {
2270 result = iops->rec(env, it, ent, attr);
2271 if (result == -ESTALE)
2276 /* osd might not able to pack all attributes,
2277 * so recheck rec length */
2278 recsize = le16_to_cpu(ent->lde_reclen);
2281 * record doesn't fit into page, enlarge previous one.
2284 (*last)->lde_reclen =
2285 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
/* Advance to where the next record will be packed. */
2294 ent = (void *)ent + recsize;
2298 result = iops->next(env, it);
2299 if (result == -ESTALE)
2301 } while (result == 0);
/* Iterate the directory starting at rdpg->rp_hash and fill the pages
 * in \a rdpg with directory entries; stamps the hash range (and
 * LDF_EMPTY when nothing was read) into the first page's lu_dirpage. */
2308 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2309 const struct lu_rdpg *rdpg)
2312 struct dt_object *next = mdd_object_child(obj);
2313 const struct dt_it_ops *iops;
2315 struct lu_dirent *last = NULL;
2316 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2323 LASSERT(rdpg->rp_pages != NULL);
2324 LASSERT(next->do_index_ops != NULL);
2326 if (rdpg->rp_count <= 0)
2330 * iterate through directory and fill pages from @rdpg
2332 iops = &next->do_index_ops->dio_it;
2333 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2337 rc = iops->load(env, it, rdpg->rp_hash);
2341 * Iterator didn't find record with exactly the key requested.
2343 * It is currently either
2345 * - positioned above record with key less than
2346 * requested---skip it.
2348 * - or not positioned at all (is in IAM_IT_SKEWED
2349 * state)---position it on the next item.
2351 rc = iops->next(env, it);
2356 * At this point and across for-loop:
2358 * rc == 0 -> ok, proceed.
2359 * rc > 0 -> end of directory.
2362 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2363 i++, nob -= CFS_PAGE_SIZE) {
2364 LASSERT(i < rdpg->rp_npages);
2365 pg = rdpg->rp_pages[i];
2366 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2367 min_t(int, nob, CFS_PAGE_SIZE), iops,
2368 it, &hash_start, &hash_end, &last,
/* Last page (or error): terminate the entry chain. */
2370 if (rc != 0 || i == rdpg->rp_npages - 1) {
2372 last->lde_reclen = 0;
/* Reached end of directory: extend the covered range to the
 * maximum so the client knows there is nothing beyond. */
2380 hash_end = MDS_DIR_END_OFF;
2384 struct lu_dirpage *dp;
/* Record the hash window actually covered in the first page header. */
2386 dp = cfs_kmap(rdpg->rp_pages[0]);
2387 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2388 dp->ldp_hash_end = cpu_to_le64(hash_end);
2391 * No pages were processed, mark this.
2393 dp->ldp_flags |= LDF_EMPTY;
2395 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2396 cfs_kunmap(rdpg->rp_pages[0]);
2399 iops->fini(env, it);
/* Public readdir entry point: under the read lock, sanity-check the
 * object, short-circuit dead (unlinked-but-open) directories with an
 * empty page per POSIX, otherwise delegate to __mdd_readpage(). */
2404 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2405 const struct lu_rdpg *rdpg)
2407 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2411 LASSERT(mdd_object_exists(mdd_obj));
2413 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2414 rc = mdd_readpage_sanity_check(env, mdd_obj);
2416 GOTO(out_unlock, rc);
2418 if (mdd_is_dead_obj(mdd_obj)) {
2420 struct lu_dirpage *dp;
2423 * According to POSIX, please do not return any entry to client:
2424 * even dot and dotdot should not be returned.
2426 CWARN("readdir from dead object: "DFID"\n",
2427 PFID(mdd_object_fid(mdd_obj)));
2429 if (rdpg->rp_count <= 0)
2430 GOTO(out_unlock, rc = -EFAULT);
2431 LASSERT(rdpg->rp_pages != NULL);
/* Hand back a single page marked empty covering the full hash range. */
2433 pg = rdpg->rp_pages[0];
2434 dp = (struct lu_dirpage*)cfs_kmap(pg);
2435 memset(dp, 0 , sizeof(struct lu_dirpage));
2436 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2437 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2438 dp->ldp_flags |= LDF_EMPTY;
2439 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2441 GOTO(out_unlock, rc = 0);
2444 rc = __mdd_readpage(env, mdd_obj, rdpg);
2448 mdd_read_unlock(env, mdd_obj);
/* Sync the object's backing dt_object to stable storage. */
2452 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2454 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2455 struct dt_object *next;
2457 LASSERT(mdd_object_exists(mdd_obj));
2458 next = mdd_object_child(mdd_obj);
2459 return next->do_ops->do_object_sync(env, next);
/* Read the object's version (for version-based recovery) from the
 * underlying dt layer. */
2462 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2463 struct md_object *obj)
2465 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2467 LASSERT(mdd_object_exists(mdd_obj));
2468 return do_version_get(env, mdd_object_child(mdd_obj));
/* Store \a version on the object via the underlying dt layer. */
2471 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2472 dt_obj_version_t version)
2474 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2476 LASSERT(mdd_object_exists(mdd_obj));
2477 do_version_set(env, mdd_object_child(mdd_obj), version);
2480 const struct md_object_operations mdd_obj_ops = {
2481 .moo_permission = mdd_permission,
2482 .moo_attr_get = mdd_attr_get,
2483 .moo_attr_set = mdd_attr_set,
2484 .moo_xattr_get = mdd_xattr_get,
2485 .moo_xattr_set = mdd_xattr_set,
2486 .moo_xattr_list = mdd_xattr_list,
2487 .moo_xattr_del = mdd_xattr_del,
2488 .moo_object_create = mdd_object_create,
2489 .moo_ref_add = mdd_ref_add,
2490 .moo_ref_del = mdd_ref_del,
2491 .moo_open = mdd_open,
2492 .moo_close = mdd_close,
2493 .moo_readpage = mdd_readpage,
2494 .moo_readlink = mdd_readlink,
2495 .moo_changelog = mdd_changelog,
2496 .moo_capa_get = mdd_capa_get,
2497 .moo_object_sync = mdd_object_sync,
2498 .moo_version_get = mdd_version_get,
2499 .moo_version_set = mdd_version_set,
2500 .moo_path = mdd_path,
2501 .moo_file_lock = mdd_file_lock,
2502 .moo_file_unlock = mdd_file_unlock,