1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
55 #include <linux/jbd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
68 #include <linux/ldiskfs_fs.h>
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
73 #include "mdd_internal.h"
75 static const struct lu_object_operations mdd_lu_obj_ops;
77 static int mdd_xattr_get(const struct lu_env *env,
78 struct md_object *obj, struct lu_buf *buf,
/* Read the object's body data via the lower (osd) layer.
 * The object must already exist on disk — asserted with its FID for
 * easier debugging. */
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 mdo_data_get(env, obj, data);
/* Fetch the lu_attr attributes of an existing object, subject to the
 * given capability (BYPASS_CAPA skips the capability check at callers). */
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91 struct lu_attr *la, struct lustre_capa *capa)
93 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94 PFID(mdd_object_fid(obj)));
95 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flags into in-memory mod_flags bits.
 * Clears APPEND_OBJ/IMMUTE_OBJ first so stale bits never survive. */
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
100 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
102 if (flags & LUSTRE_APPEND_FL)
103 obj->mod_flags |= APPEND_OBJ;
105 if (flags & LUSTRE_IMMUTABLE_FL)
106 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to this lu_env.
 * The key is registered elsewhere; a NULL result would be a programming
 * error, hence the assertion. */
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
111 struct mdd_thread_info *info;
113 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114 LASSERT(info != NULL);
/* Wrap caller-owned memory (area/len) in the per-thread mti_buf
 * descriptor; no allocation, no ownership transfer. */
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 buf = &mdd_env_info(env)->mti_buf;
/* Free the memory owned by a lu_buf, tolerating NULL buffer or NULL
 * payload (no-op in either case). */
128 void mdd_buf_put(struct lu_buf *buf)
130 if (buf == NULL || buf->lb_buf == NULL)
132 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in mti_buf.
 * The const is cast away only to fit the shared lu_buf type; callers
 * must not write through the result. */
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf, (re)allocated to hold at least
 * \a len bytes. An existing smaller buffer is freed first; contents are
 * NOT preserved (see mdd_buf_grow() for the preserving variant). */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL) {
158 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL)
165 /** Increase the size of the \a mti_big_buf.
166 * preserves old data in buffer
167 * old buffer remains unchanged on error
168 * \retval 0 or -ENOMEM
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
172 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175 LASSERT(len >= oldbuf->lb_len);
/* allocate-copy-swap: a new buffer is filled from the old one, the old
 * storage is freed, and the descriptor is overwritten in one step so
 * the old buffer stays intact if the allocation fails. */
176 OBD_ALLOC_LARGE(buf.lb_buf, len);
178 if (buf.lb_buf == NULL)
182 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
184 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
186 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed, per-thread llog cookie buffer large enough for the
 * current LOV cookie size. The cached buffer is grown lazily: a
 * too-small buffer is freed and reallocated at the new size.
 * Returns NULL on allocation failure. */
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192 struct mdd_device *mdd)
194 struct mdd_thread_info *mti = mdd_env_info(env);
197 max_cookie_size = mdd_lov_cookiesize(env, mdd);
198 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199 if (mti->mti_max_cookie)
200 OBD_FREE_LARGE(mti->mti_max_cookie,
201 mti->mti_max_cookie_size);
202 mti->mti_max_cookie = NULL;
203 mti->mti_max_cookie_size = 0;
205 if (unlikely(mti->mti_max_cookie == NULL)) {
206 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207 if (likely(mti->mti_max_cookie != NULL))
208 mti->mti_max_cookie_size = max_cookie_size;
/* memset on every call: the buffer is reused across requests */
210 if (likely(mti->mti_max_cookie != NULL))
211 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212 return mti->mti_max_cookie;
/* Return the per-thread LOV EA (lmm) buffer, grown lazily to the
 * device's current maximum stripe-MD size. Unlike mdd_max_cookie_get()
 * the contents are NOT cleared here. Returns NULL on allocation
 * failure. */
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216 struct mdd_device *mdd)
218 struct mdd_thread_info *mti = mdd_env_info(env);
221 max_lmm_size = mdd_lov_mdsize(env, mdd);
222 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223 if (mti->mti_max_lmm)
224 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225 mti->mti_max_lmm = NULL;
226 mti->mti_max_lmm_size = 0;
228 if (unlikely(mti->mti_max_lmm == NULL)) {
229 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230 if (likely(mti->mti_max_lmm != NULL))
231 mti->mti_max_lmm_size = max_lmm_size;
233 return mti->mti_max_lmm;
/* lu_device_operations: allocate and minimally initialize a new
 * mdd_object, wiring in the mdd md/dir/lu operation vectors.
 * Returns the embedded lu_object (NULL on allocation failure is implied
 * by the mdd_obj != NULL guard; tail not visible here). */
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237 const struct lu_object_header *hdr,
240 struct mdd_object *mdd_obj;
242 OBD_ALLOC_PTR(mdd_obj);
243 if (mdd_obj != NULL) {
246 o = mdd2lu_obj(mdd_obj);
247 lu_object_init(o, NULL, d);
248 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250 mdd_obj->mod_count = 0;
251 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the corresponding
 * object in the child (osd) device and stack it below this one so the
 * lu framework delegates storage operations downward. */
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259 const struct lu_object_conf *unused)
261 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262 struct mdd_object *mdd_obj = lu2mdd_obj(o);
263 struct lu_object *below;
264 struct lu_device *under;
267 mdd_obj->mod_cltime = 0;
268 under = &d->mdd_child->dd_lu_dev;
269 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270 mdd_pdlock_init(mdd_obj);
274 lu_object_add(o, below);
/* lu_object_operations::loo_object_start: for objects that exist on
 * disk, load their flags into mod_flags; new objects need nothing. */
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 if (lu_object_exists(o))
282 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object_operations::loo_object_free: release the mdd_object
 * allocated in mdd_object_alloc(). */
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object_operations::loo_object_print: one-line debug dump of the
 * object's open count, valid mask, changelog time and flags. */
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296 lu_printer_t p, const struct lu_object *o)
298 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300 "valid=%x, cltime="LPU64", flags=%lx)",
301 mdd, mdd->mod_count, mdd->mod_valid,
302 mdd->mod_cltime, mdd->mod_flags);
/* lu-object operation vector installed on every mdd_object by
 * mdd_object_alloc(). */
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306 .loo_object_init = mdd_object_init,
307 .loo_object_start = mdd_object_start,
308 .loo_object_free = mdd_object_free,
309 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); result may be an ERR_PTR —
 * callers are expected to check. */
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313 struct mdd_device *d,
314 const struct lu_fid *f)
316 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated pathname (relative to the filesystem root)
 * to a FID by walking component by component with mdd_lookup().
 * Used by mdd_path_current() to verify a reconstructed path.
 * -EREMOTE is returned when a component lives on another MDT. */
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320 const char *path, struct lu_fid *fid)
323 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324 struct mdd_object *obj;
325 struct lu_name *lname = &mdd_env_info(env)->mti_name;
330 /* temp buffer for path element */
331 buf = mdd_buf_alloc(env, PATH_MAX);
332 if (buf->lb_buf == NULL)
335 lname->ln_name = name = buf->lb_buf;
336 lname->ln_namelen = 0;
337 *f = mdd->mdd_root_fid;
/* scan one component up to the next '/' or end of string */
344 while (*path != '/' && *path != '\0') {
352 /* find obj corresponding to fid */
353 obj = mdd_object_find(env, mdd, f);
355 GOTO(out, rc = -EREMOTE);
357 GOTO(out, rc = PTR_ERR(obj));
358 /* get child fid from parent and name */
359 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360 mdd_object_put(env, obj);
365 lname->ln_namelen = 0;
374 /** The maximum depth that fid2path() will search.
375 * This is limited only because we want to store the fids for
376 * historical path lookup purposes.
378 #define MAX_PATH_DEPTH 100
380 /** mdd_path() lookup structure. */
/* Scratch state shared between mdd_path(), mdd_path_current() and
 * mdd_path_historic(); pli_fids[0] is the target, higher indices are
 * successive parents up to the root. */
381 struct path_lookup_info {
382 __u64 pli_recno; /**< history point */
383 __u64 pli_currec; /**< current record */
384 struct lu_fid pli_fid;
385 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386 struct mdd_object *pli_mdd_obj;
387 char *pli_path; /**< full path */
389 int pli_linkno; /**< which hardlink to follow */
390 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current pathname of pli->pli_mdd_obj by walking parent
 * links (the link EA) from the object up to the filesystem root,
 * packing each name backwards into pli->pli_path. Afterwards the path
 * is re-resolved with mdd_path2fid() to detect a concurrent rename;
 * -EAGAIN asks the caller (mdd_path()) to retry. */
393 static int mdd_path_current(const struct lu_env *env,
394 struct path_lookup_info *pli)
396 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397 struct mdd_object *mdd_obj;
398 struct lu_buf *buf = NULL;
399 struct link_ea_header *leh;
400 struct link_ea_entry *lee;
401 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* start writing at the very end of the buffer and move backwards */
408 ptr = pli->pli_path + pli->pli_pathlen - 1;
411 pli->pli_fidcount = 0;
412 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415 mdd_obj = mdd_object_find(env, mdd,
416 &pli->pli_fids[pli->pli_fidcount]);
418 GOTO(out, rc = -EREMOTE);
420 GOTO(out, rc = PTR_ERR(mdd_obj));
421 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423 mdd_object_put(env, mdd_obj);
427 /* Do I need to error out here? */
432 /* Get parent fid and object name */
433 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434 buf = mdd_links_get(env, mdd_obj);
435 mdd_read_unlock(env, mdd_obj);
436 mdd_object_put(env, mdd_obj);
438 GOTO(out, rc = PTR_ERR(buf));
441 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 /* If set, use link #linkno for path lookup, otherwise use
445 link #0. Only do this for the final path element. */
446 if ((pli->pli_fidcount == 0) &&
447 (pli->pli_linkno < leh->leh_reccount)) {
449 for (count = 0; count < pli->pli_linkno; count++) {
450 lee = (struct link_ea_entry *)
451 ((char *)lee + reclen);
452 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454 if (pli->pli_linkno < leh->leh_reccount - 1)
455 /* indicate to user there are more links */
459 /* Pack the name in the end of the buffer */
460 ptr -= tmpname->ln_namelen;
461 if (ptr - 1 <= pli->pli_path)
462 GOTO(out, rc = -EOVERFLOW);
/* deliberately unterminated copy: the name is packed into the middle
 * of the buffer and separated from the next component by '/', not NUL */
463 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466 /* Store the parent fid for historic lookup */
467 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468 GOTO(out, rc = -EOVERFLOW);
469 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472 /* Verify that our path hasn't changed since we started the lookup.
473 Record the current index, and verify the path resolves to the
474 same fid. If it does, then the path is correct as of this index. */
475 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476 pli->pli_currec = mdd->mdd_cl.mc_index;
477 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481 GOTO (out, rc = -EAGAIN);
483 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486 PFID(&pli->pli_fid));
487 GOTO(out, rc = -EAGAIN);
489 ptr++; /* skip leading / */
/* slide the finished path to the front of the caller's buffer */
490 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
494 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495 /* if we vmalloced a large buffer drop it */
/* Placeholder for historical (changelog-based) path reconstruction;
 * see the discussion in mdd_path(). Body not implemented here. */
501 static int mdd_path_historic(const struct lu_env *env,
502 struct path_lookup_info *pli)
507 /* Returns the full path to this fid, as of changelog record recno. */
/* Entry point for fid2path: allocates a path_lookup_info, retries
 * mdd_path_current() while it reports -EAGAIN (concurrent renames),
 * then optionally refines via mdd_path_historic() for recno != -1.
 * On return *recno holds the changelog index the path is valid at and
 * *linkno the next hardlink index for the caller to iterate. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509 char *path, int pathlen, __u64 *recno, int *linkno)
511 struct path_lookup_info *pli;
/* the root has no name; handled specially */
519 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
528 pli->pli_mdd_obj = md2mdd_obj(obj);
529 pli->pli_recno = *recno;
530 pli->pli_path = path;
531 pli->pli_pathlen = pathlen;
532 pli->pli_linkno = *linkno;
534 /* Retry multiple times in case file is being moved */
535 while (tries-- && rc == -EAGAIN)
536 rc = mdd_path_current(env, pli);
538 /* For historical path lookup, the current links may not have existed
539 * at "recno" time. We must switch over to earlier links/parents
540 * by using the changelog records. If the earlier parent doesn't
541 * exist, we must search back through the changelog to reconstruct
542 * its parents, then check if it exists, etc.
543 * We may ignore this problem for the initial implementation and
544 * state that an "original" hardlink must still exist for us to find
545 * historic path name. */
546 if (pli->pli_recno != -1) {
547 rc = mdd_path_historic(env, pli);
549 *recno = pli->pli_currec;
550 /* Return next link index to caller */
551 *linkno = pli->pli_linkno;
/* Refresh obj->mod_flags from the on-disk attributes: translate
 * LUSTRE_*_FL flags and mark directories with nlink == 1 as MNLINK_OBJ
 * (nlink not maintained normally for them). */
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 struct lu_attr *la = &mdd_env_info(env)->mti_la;
565 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567 mdd_flags_xlate(obj, la->la_flags);
568 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569 obj->mod_flags |= MNLINK_OBJ;
574 /* get only inode attributes */
/* Fill ma->ma_attr from disk unless MA_INODE is already valid;
 * sets MA_INODE in ma_valid on success. */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
581 if (ma->ma_valid & MA_INODE)
584 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585 mdd_object_capa(env, mdd_obj));
587 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV_MAGIC_V1 striping descriptor from the MDS
 * LOV descriptor defaults (used when the root directory has no stored
 * default). Returns the number of bytes written (sizeof lov_user_md). */
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
593 struct lov_desc *ldesc;
594 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595 struct lov_user_md *lum = (struct lov_user_md*)lmm;
601 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602 LASSERT(ldesc != NULL);
604 lum->lmm_magic = LOV_MAGIC_V1;
605 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606 lum->lmm_pattern = ldesc->ld_pattern;
607 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
611 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root directory (FID compare
 * against the device's mdd_root_fid). */
614 static int is_rootdir(struct mdd_object *mdd_obj)
616 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617 const struct lu_fid *fid = mdo2fid(mdd_obj);
619 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
622 /* get lov EA only */
/* Read the LOV striping EA into ma->ma_lmm (no locking — callers hold
 * the read lock, see mdd_lmm_get_locked()). For the root directory a
 * missing default is synthesized via mdd_get_default_md() when
 * MA_LOV_DEF is requested. */
623 static int __mdd_lmm_get(const struct lu_env *env,
624 struct mdd_object *mdd_obj, struct md_attr *ma)
629 if (ma->ma_valid & MA_LOV)
632 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
634 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* positive rc is the EA size; record it and mark MA_LOV valid */
637 ma->ma_lmm_size = rc;
638 ma->ma_valid |= MA_LOV;
644 /* get the first parent fid from link EA */
/* Extract the first parent FID (link #0) from the object's link EA
 * into ma->ma_pfid; converts from big-endian disk order. */
645 static int mdd_pfid_get(const struct lu_env *env,
646 struct mdd_object *mdd_obj, struct md_attr *ma)
649 struct link_ea_header *leh;
650 struct link_ea_entry *lee;
651 struct lu_fid *pfid = &ma->ma_pfid;
654 if (ma->ma_valid & MA_PFID)
657 buf = mdd_links_get(env, mdd_obj);
659 RETURN(PTR_ERR(buf));
662 lee = (struct link_ea_entry *)(leh + 1);
663 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664 fid_be_to_cpu(pfid, pfid);
665 ma->ma_valid |= MA_PFID;
666 if (buf->lb_len > OBD_ALLOC_BIG)
667 /* if we vmalloced a large buffer drop it */
/* Locked wrapper around __mdd_lmm_get(): takes the object read lock
 * for the duration of the LOV EA read. */
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
678 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679 rc = __mdd_lmm_get(env, mdd_obj, ma);
680 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv; no-op when
 * MA_LMV is already valid. Caller handles locking. */
685 static int __mdd_lmv_get(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
691 if (ma->ma_valid & MA_LMV)
694 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
697 ma->ma_valid |= MA_LMV;
/* Read the LMA EA and populate the HSM (ma_hsm) and/or SOM (ma_som)
 * parts of \a ma as requested by ma_need. The on-disk LMA is stored in
 * a fixed byte order, hence the swab after reading. */
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
706 struct mdd_thread_info *info = mdd_env_info(env);
707 struct lustre_mdt_attrs *lma =
708 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
713 /* If all needed data are already valid, nothing to do */
714 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715 (ma->ma_need & (MA_HSM | MA_SOM)))
718 /* Read LMA from disk EA */
719 lma_size = sizeof(info->mti_xattr_buf);
720 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
724 /* Useless to check LMA incompatibility because this is already done in
725 * osd_ea_fid_get(), and this will fail long before this code is
727 * So, if we are here, LMA is compatible.
730 lustre_lma_swab(lma);
732 /* Swab and copy LMA */
733 if (ma->ma_need & MA_HSM) {
734 if (lma->lma_compat & LMAC_HSM)
735 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
737 ma->ma_hsm.mh_flags = 0;
738 ma->ma_valid |= MA_HSM;
/* SOM fields are copied only when the on-disk LMA carries them */
742 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743 LASSERT(ma->ma_som != NULL);
744 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745 ma->ma_som->msd_size = lma->lma_som_size;
746 ma->ma_som->msd_blocks = lma->lma_som_blocks;
747 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748 ma->ma_valid |= MA_SOM;
/* Dispatcher that fills every attribute group requested in ma_need:
 * inode attrs, LOV/LMV EAs, parent FID, HSM/SOM, default ACL — each
 * gated by the object's file type. Stops at the first error. */
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
760 if (ma->ma_need & MA_INODE)
761 rc = mdd_iattr_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_LOV) {
764 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765 S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = __mdd_lmm_get(env, mdd_obj, ma);
768 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769 if (S_ISREG(mdd_object_type(mdd_obj)))
770 rc = mdd_pfid_get(env, mdd_obj, ma);
772 if (rc == 0 && ma->ma_need & MA_LMV) {
773 if (S_ISDIR(mdd_object_type(mdd_obj)))
774 rc = __mdd_lmv_get(env, mdd_obj, ma);
776 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777 if (S_ISREG(mdd_object_type(mdd_obj)))
778 rc = __mdd_lma_get(env, mdd_obj, ma);
780 #ifdef CONFIG_FS_POSIX_ACL
781 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782 if (S_ISDIR(mdd_object_type(mdd_obj)))
783 rc = mdd_def_acl_get(env, mdd_obj, ma);
786 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787 rc, ma->ma_valid, ma->ma_lmm);
/* Same as mdd_attr_get_internal() but takes the object read lock when
 * any EA-backed group (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain
 * inode attrs need no lock. */
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792 struct mdd_object *mdd_obj, struct md_attr *ma)
795 int needlock = ma->ma_need &
796 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
799 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800 rc = mdd_attr_get_internal(env, mdd_obj, ma);
802 mdd_read_unlock(env, mdd_obj);
807 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
812 struct mdd_object *mdd_obj = md2mdd_obj(obj);
816 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
821 * No permission check is needed.
/* md_object_operations::moo_xattr_get: read one named extended
 * attribute under the object read lock. */
823 static int mdd_xattr_get(const struct lu_env *env,
824 struct md_object *obj, struct lu_buf *buf,
827 struct mdd_object *mdd_obj = md2mdd_obj(obj);
832 LASSERT(mdd_object_exists(mdd_obj));
834 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835 rc = mdo_xattr_get(env, mdd_obj, buf, name,
836 mdd_object_capa(env, mdd_obj));
837 mdd_read_unlock(env, mdd_obj);
843 * Permission check is done when open,
844 * no need check again.
/* md_object_operations::moo_readlink: read symlink target bytes from
 * the object body (offset 0) via the lower layer's body ops. */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
850 struct dt_object *next;
855 LASSERT(mdd_object_exists(mdd_obj));
857 next = mdd_object_child(mdd_obj);
858 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860 mdd_object_capa(env, mdd_obj));
861 mdd_read_unlock(env, mdd_obj);
866 * No permission check is needed.
/* md_object_operations::moo_xattr_list: list xattr names under the
 * object read lock. */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
871 struct mdd_object *mdd_obj = md2mdd_obj(obj);
876 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle:
 * choose the dt object format from the creation spec (index features
 * override the mode-derived format), let the underlying device fill in
 * the allocation hint from the parent \a p, then create. Idempotent if
 * the object already exists (guarded by mdd_object_exists). */
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884 struct mdd_object *c, struct md_attr *ma,
885 struct thandle *handle,
886 const struct md_op_spec *spec)
888 struct lu_attr *attr = &ma->ma_attr;
889 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891 const struct dt_index_features *feat = spec->sp_feat;
895 if (!mdd_object_exists(c)) {
896 struct dt_object *next = mdd_object_child(c);
899 if (feat != &dt_directory_features && feat != NULL)
900 dof->dof_type = DFT_INDEX;
902 dof->dof_type = dt_mode_to_dft(attr->la_mode);
904 dof->u.dof_idx.di_feat = feat;
906 /* @hint will be initialized by underlying device. */
907 next->do_ops->do_ah_init(env, hint,
908 p ? mdd_object_child(p) : NULL,
909 attr->la_mode & S_IFMT);
911 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
920 * Make sure the ctime is increased only.
/* Drop time updates that would move ctime backwards: an older incoming
 * ctime cancels both MTIME and CTIME; an equal ctime with nothing else
 * to set is a no-op and is dropped too. */
922 static inline int mdd_attr_check(const struct lu_env *env,
923 struct mdd_object *obj,
924 struct lu_attr *attr)
926 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
930 if (attr->la_valid & LA_CTIME) {
931 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
935 if (attr->la_ctime < tmp_la->la_ctime)
936 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937 else if (attr->la_valid == LA_CTIME &&
938 attr->la_ctime == tmp_la->la_ctime)
939 attr->la_valid &= ~LA_CTIME;
/* Write attributes to disk inside \a handle; when the mode changed and
 * \a needacl is set, propagate the new mode into the POSIX ACL. */
944 int mdd_attr_set_internal(const struct lu_env *env,
945 struct mdd_object *obj,
946 struct lu_attr *attr,
947 struct thandle *handle,
953 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955 if (!rc && (attr->la_valid & LA_MODE) && needacl)
956 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by
 * mdd_attr_set_internal(); the set is skipped if the check drops
 * everything. */
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962 struct mdd_object *obj,
963 struct lu_attr *attr,
964 struct thandle *handle,
970 rc = mdd_attr_check(env, obj, attr);
975 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(); ACL propagation is
 * only relevant when the mode is actually changing. */
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980 struct mdd_object *obj,
981 struct lu_attr *attr,
982 struct thandle *handle,
988 needacl = needacl && (attr->la_valid & LA_MODE);
990 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
993 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); mirrors
 * mdd_attr_set_internal_locked(). */
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998 struct mdd_object *obj,
999 struct lu_attr *attr,
1000 struct thandle *handle,
1006 needacl = needacl && (attr->la_valid & LA_MODE);
1008 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1011 mdd_write_unlock(env, obj);
/* Set or delete an extended attribute inside \a handle: a non-empty
 * buffer sets the xattr; a NULL buffer with zero length deletes it.
 * (Other buf combinations fall through — presumably an error path in
 * the lines not visible here.) */
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016 const struct lu_buf *buf, const char *name,
1017 int fl, struct thandle *handle)
1019 struct lustre_capa *capa = mdd_object_capa(env, obj);
1023 if (buf->lb_buf && buf->lb_len > 0)
1024 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026 rc = mdo_xattr_del(env, obj, name, handle, capa);
1032 * This gives the same functionality as the code between
1033 * sys_chmod and inode_setattr
1034 * chown_common and inode_setattr
1035 * utimes and inode_setattr
1036 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Normalize and permission-check an incoming setattr request against
 * the object's current attributes (tmp_la) and the caller's
 * credentials (uc): rejects forbidden changes, drops redundant time
 * updates, clears setuid/setgid where POSIX chown/chgrp semantics
 * require, and enforces FOWNER/CHOWN/FSETID/IMMUTABLE capabilities. */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039 struct lu_attr *la, const struct md_attr *ma)
1041 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1042 struct md_ucred *uc;
1049 /* Do not permit change file type */
1050 if (la->la_valid & LA_TYPE)
1053 /* They should not be processed by setattr */
1054 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1057 /* export destroy does not have ->le_ses, but we may want
1058 * to drop LUSTRE_SOM_FL. */
1064 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* ctime-only updates come from rename source handling */
1068 if (la->la_valid == LA_CTIME) {
1069 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070 /* This is only for set ctime when rename's source is
1072 rc = mdd_may_delete(env, NULL, obj,
1073 (struct md_attr *)ma, 1, 0);
1074 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075 la->la_valid &= ~LA_CTIME;
1079 if (la->la_valid == LA_ATIME) {
1080 /* This is atime only set for read atime update on close. */
1081 if (la->la_atime >= tmp_la->la_atime &&
1082 la->la_atime < (tmp_la->la_atime +
1083 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084 la->la_valid &= ~LA_ATIME;
1088 /* Check if flags change. */
1089 if (la->la_valid & LA_FLAGS) {
1090 unsigned int oldflags = 0;
1091 unsigned int newflags = la->la_flags &
1092 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1094 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095 !mdd_capable(uc, CFS_CAP_FOWNER))
1098 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099 * only be changed by the relevant capability. */
1100 if (mdd_is_immutable(obj))
1101 oldflags |= LUSTRE_IMMUTABLE_FL;
1102 if (mdd_is_append(obj))
1103 oldflags |= LUSTRE_APPEND_FL;
1104 if ((oldflags ^ newflags) &&
1105 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningful only for directories */
1108 if (!S_ISDIR(tmp_la->la_mode))
1109 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* immutable/append objects accept only flag changes */
1112 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113 (la->la_valid & ~LA_FLAGS) &&
1114 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1117 /* Check for setting the obj time. */
1118 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID: server-side request to strip set-id bits */
1130 if (la->la_valid & LA_KILL_SUID) {
1131 la->la_valid &= ~LA_KILL_SUID;
1132 if ((tmp_la->la_mode & S_ISUID) &&
1133 !(la->la_valid & LA_MODE)) {
1134 la->la_mode = tmp_la->la_mode;
1135 la->la_valid |= LA_MODE;
1137 la->la_mode &= ~S_ISUID;
1140 if (la->la_valid & LA_KILL_SGID) {
1141 la->la_valid &= ~LA_KILL_SGID;
1142 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143 (S_ISGID | S_IXGRP)) &&
1144 !(la->la_valid & LA_MODE)) {
1145 la->la_mode = tmp_la->la_mode;
1146 la->la_valid |= LA_MODE;
1148 la->la_mode &= ~S_ISGID;
1151 /* Make sure a caller can chmod. */
1152 if (la->la_valid & LA_MODE) {
1153 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154 (uc->mu_fsuid != tmp_la->la_uid) &&
1155 !mdd_capable(uc, CFS_CAP_FOWNER))
1158 if (la->la_mode == (cfs_umode_t) -1)
1159 la->la_mode = tmp_la->la_mode;
/* only permission bits may change; file-type bits come from disk */
1161 la->la_mode = (la->la_mode & S_IALLUGO) |
1162 (tmp_la->la_mode & ~S_IALLUGO);
1164 /* Also check the setgid bit! */
1165 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166 la->la_gid : tmp_la->la_gid) &&
1167 !mdd_capable(uc, CFS_CAP_FSETID))
1168 la->la_mode &= ~S_ISGID;
1170 la->la_mode = tmp_la->la_mode;
1173 /* Make sure a caller can chown. */
1174 if (la->la_valid & LA_UID) {
1175 if (la->la_uid == (uid_t) -1)
1176 la->la_uid = tmp_la->la_uid;
1177 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178 (la->la_uid != tmp_la->la_uid)) &&
1179 !mdd_capable(uc, CFS_CAP_CHOWN))
1182 /* If the user or group of a non-directory has been
1183 * changed by a non-root user, remove the setuid bit.
1184 * 19981026 David C Niemi <niemi@tux.org>
1186 * Changed this to apply to all users, including root,
1187 * to avoid some races. This is the behavior we had in
1188 * 2.0. The check for non-root was definitely wrong
1189 * for 2.2 anyway, as it should have been using
1190 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192 !S_ISDIR(tmp_la->la_mode)) {
1193 la->la_mode &= ~S_ISUID;
1194 la->la_valid |= LA_MODE;
1198 /* Make sure caller can chgrp. */
1199 if (la->la_valid & LA_GID) {
1200 if (la->la_gid == (gid_t) -1)
1201 la->la_gid = tmp_la->la_gid;
1202 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203 ((la->la_gid != tmp_la->la_gid) &&
1204 !lustre_in_group_p(uc, la->la_gid))) &&
1205 !mdd_capable(uc, CFS_CAP_CHOWN))
1208 /* Likewise, if the user or group of a non-directory
1209 * has been changed by a non-root user, remove the
1210 * setgid bit UNLESS there is no group execute bit
1211 * (this would be a file marked for mandatory
1212 * locking). 19981026 David C Niemi <niemi@tux.org>
1214 * Removed the fsuid check (see the comment above) --
1216 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218 la->la_mode &= ~S_ISGID;
1219 la->la_valid |= LA_MODE;
1223 /* For both Size-on-MDS case and truncate case,
1224 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226 * For SOM case, it is true, the MAY_WRITE perm has been checked
1227 * when open, no need check again. For truncate case, it is false,
1228 * the MAY_WRITE perm should be checked here. */
1229 if (ma->ma_attr_flags & MDS_SOM) {
1230 /* For the "Size-on-MDS" setattr update, merge coming
1231 * attributes with the set in the inode. BUG 10641 */
1232 if ((la->la_valid & LA_ATIME) &&
1233 (la->la_atime <= tmp_la->la_atime))
1234 la->la_valid &= ~LA_ATIME;
1236 /* OST attributes do not have a priority over MDS attributes,
1237 * so drop times if ctime is equal. */
1238 if ((la->la_valid & LA_CTIME) &&
1239 (la->la_ctime <= tmp_la->la_ctime))
1240 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1242 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244 (uc->mu_fsuid == tmp_la->la_uid)) &&
1245 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246 rc = mdd_permission_internal_locked(env, obj,
1253 if (la->la_valid & LA_CTIME) {
1254 /* The pure setattr, it has the priority over what is
1255 * already set, do not drop it if ctime is equal. */
1256 if (la->la_ctime < tmp_la->la_ctime)
1257 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1265 /** Store a data change changelog record
1266 * If this fails, we must fail the whole transaction; we don't
1267 * want the change to commit without the log entry.
1268 * \param mdd_obj - mdd_object of change
1269 * \param handle - transacion handle
1271 static int mdd_changelog_data_store(const struct lu_env *env,
1272 struct mdd_device *mdd,
1273 enum changelog_rec_type type,
1275 struct mdd_object *mdd_obj,
1276 struct thandle *handle)
1278 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279 struct llog_changelog_rec *rec;
/* early exits: changelog disabled, or this record type masked off */
1285 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1287 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1290 LASSERT(handle != NULL);
1291 LASSERT(mdd_obj != NULL);
/* suppress duplicate time-change records since changelog start */
1293 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295 /* Don't need multiple updates in this log */
1296 /* Don't check under lock - no big deal if we get an extra
1301 reclen = llog_data_len(sizeof(*rec));
1302 buf = mdd_buf_alloc(env, reclen);
1303 if (buf->lb_buf == NULL)
1305 rec = (struct llog_changelog_rec *)buf->lb_buf;
1307 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308 rec->cr.cr_type = (__u32)type;
1309 rec->cr.cr_tfid = *tfid;
1310 rec->cr.cr_namelen = 0;
1311 mdd_obj->mod_cltime = cfs_time_current_64();
1313 rc = mdd_changelog_llog_write(mdd, rec, handle);
1315 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316 rc, type, PFID(tfid));
/* Public helper: emit a standalone changelog record for \a obj inside
 * its own transaction (start / store / stop). */
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324 int flags, struct md_object *obj)
1326 struct thandle *handle;
1327 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328 struct mdd_device *mdd = mdo2mdd(obj);
1332 handle = mdd_trans_start(env, mdd);
1335 return(PTR_ERR(handle));
1337 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1340 mdd_trans_stop(env, mdd, rc, handle);
 * Build and write the LMA (Lustre Metadata Attributes) extended attribute
 * from the HSM/SOM data in \a ma.
 *
 * Should be called with write lock held.
 *
 * \see mdd_lma_set_locked().
static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                         const struct md_attr *ma, struct thandle *handle)
        struct mdd_thread_info *info = mdd_env_info(env);
        /* Reuse the per-thread xattr buffer to hold the LMA EA. */
        struct lustre_mdt_attrs *lma =
                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
        int lmasize = sizeof(struct lustre_mdt_attrs);
        /* Either HSM or SOM part is not valid, we need to read it before */
        /* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" first computes
         * the logical-not (0 or 1) and then bitwise-ands it with the mask,
         * which only matches the comment when ma_valid is 0 and bit 0 of the
         * mask is set.  The intended test is likely
         * "(ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)" --
         * confirm against upstream before changing. */
        if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
                /* On-disk LMA is stored little-endian: swab to CPU order. */
                lustre_lma_swab(lma);
                /* No LMA EA on disk yet: start from a zeroed structure. */
                memset(lma, 0, lmasize);
        /* Merge HSM flags from \a ma into the LMA. */
        if (ma->ma_valid & MA_HSM) {
                lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
                lma->lma_compat |= LMAC_HSM;
        /* Merge Size-on-MDS data; an invalid ioepoch clears the SOM bit. */
        if (ma->ma_valid & MA_SOM) {
                LASSERT(ma->ma_som != NULL);
                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
                        lma->lma_compat &= ~LMAC_SOM;
                        lma->lma_compat |= LMAC_SOM;
                        lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
                        lma->lma_som_size = ma->ma_som->msd_size;
                        lma->lma_som_blocks = ma->ma_som->msd_blocks;
                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
        /* The LMA always carries the object's own FID. */
        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
        /* Swab back to on-disk (little-endian) order before writing. */
        lustre_lma_swab(lma);
        buf = mdd_buf_get(env, lma, lmasize);
        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
 * not, LMA EA will be first read from disk, modified and write back.
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
        /* Take the object write lock that __mdd_lma_set() requires. */
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
        struct mdd_device *mdd = mdo2mdd(obj);
        /* Map the changed LA_* bits onto candidate changelog record types. */
        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        /* Drop types that the changelog mask filters out. */
        bits = bits & mdd->mdd_cl.mc_mask;
        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
#ifdef HAVE_QUOTA_SUPPORT
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
        /* Work on a private copy so mdd_fix_attr() can drop/adjust bits
         * without touching the caller's md_attr. */
        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        /* setattr on "close" only change atime, or do nothing */
        if (ma->ma_valid == MA_INODE &&
            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
        /*TODO: add lock here*/
        /* start a log jounal handle if needed */
        /* chown/chgrp on a striped regular file needs the LOV EA so the
         * ownership change can be journalled for the OST objects. */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                        GOTO(no_trans, rc = -ENOMEM);
                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
        /* Reserve changelog credits: one record per stripe.
         * NOTE(review): if lmm_stripe_count is an unsigned field the
         * ">= 0" test is always true and the ld_tgt_count fallback is
         * dead -- confirm the field's type/encoding for "all OSTs". */
        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
                        lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
        handle = mdd_trans_start(env, mdd);
                GOTO(no_trans, rc = PTR_ERR(handle));
        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
#ifdef HAVE_QUOTA_SUPPORT
        /* Ownership change: pre-acquire quota for the new owner and
         * remember both owners for the post-op adjustment below. */
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                LQUOTA_FLAGS_BLK, data, 1);
        /* LA_FLAGS-only setattr updates the in-core flag translation too. */
        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size){
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);
                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
        /* Caller also supplied a new LOV EA: validate and store it. */
        if (rc == 0 && ma->ma_valid & MA_LOV) {
                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
        /* HSM/SOM updates are persisted in the LMA EA. */
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                mode = mdd_object_type(mdd_obj);
                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        rc = mdd_attr_set_changelog(env, obj, handle,
                                    ma->ma_attr.la_valid);
        mdd_trans_stop(env, mdd, rc, handle);
        /* Propagate the ownership change to the OST objects. */
        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                /*set obd attr, if needed*/
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
#ifdef HAVE_QUOTA_SUPPORT
        /* Release the inode/block quota pre-acquired for the new owner. */
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        /* Trigger dqrel/dqacq for original owner and new owner.
         * If failed, the next call for lquota_chkquota will
        lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set extended attribute \a name on \a obj under the object write lock,
 * within an already-started transaction \a handle. */
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);
/* Permission check for xattr modification: refuse immutable/append-only
 * objects, and require the caller to be the owner or hold CFS_CAP_FOWNER. */
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        /* Immutable or append-only objects may not have xattrs changed. */
        if (mdd_is_immutable(obj) || mdd_is_append(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* Only the owner (or a CFS_CAP_FOWNER-capable caller) may proceed. */
        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
 * Set extended attribute \a name on \a obj in its own transaction.
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
        /* security-related changes may require sync */
        /* ACL changes are committed synchronously when the admin enabled
         * mdd_sync_permission, so permission changes survive a crash. */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
            mdd->mdd_sync_permission == 1)
                txn_param_sync(&mdd_env_info(env)->mti_param);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
        /* Only record user xattr changes */
        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
 * Delete extended attribute \a name from \a obj in its own transaction.
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);
        /* Only record user xattr changes */
        /* NOTE(review): "!= 0" records every xattr EXCEPT "user.*", which
         * contradicts both the comment above and the "== 0" test used in
         * mdd_xattr_set(); likely an inverted condition -- confirm against
         * upstream before fixing. */
        if ((rc == 0) && (strncmp("user.", name, 5) != 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
/* partial unlink */
/* Drop one (or, for directories, both) link counts on \a obj and update
 * ctime; used when the name entry is removed elsewhere. */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
        if (!mdd_object_exists(mdd_obj))
        LASSERT(mdd_object_exists(mdd_obj) > 0);
        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        __mdd_ref_del(env, mdd_obj, handle, 0);
        /* Directories carry an extra link for "." -- drop it too. */
        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                __mdd_ref_del(env, mdd_obj, handle, 1);
        /* Propagate the caller-supplied ctime onto the object. */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Last link and no open handles: release the child's quota. */
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial operation */
/* Validate the file type requested for a partial object create. */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
        /* Dispatch on the S_IFMT bits of the requested mode. */
        switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: allocate the object body (no name entry), used for
 * cross-MDT operations and slave stripe objects. */
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#ifdef HAVE_QUOTA_SUPPORT
        /* Pre-acquire inode (and, per file type, block) quota for the
         * child before starting the transaction. */
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                switch (ma->ma_attr.la_mode & S_IFMT) {
                /* get block quota for child */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
        handle = mdd_trans_start(env, mdd);
                GOTO(out_pending, rc = PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;
                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);
                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
#ifdef CONFIG_FS_POSIX_ACL
        /* Remote ACL: initialize the child's ACL from the EA carried in
         * the creation spec (may also adjust la_mode). */
        if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
                buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                buf->lb_len = spec->u.sp_ea.eadatalen;
                if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                        rc = __mdd_acl_init(env, mdd_obj, buf,
                                            &ma->ma_attr.la_mode,
                        ma->ma_attr.la_valid |= LA_MODE;
                pfid = spec->u.sp_ea.fid;
        rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
        /* Return the freshly-created object's attributes to the caller. */
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Release the quota pre-acquired above. */
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        /* Trigger dqacq on the owner of child. If failed,
         * the next call for lquota_chkquota will process it. */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: bump the link count on \a obj and update its ctime;
 * the name entry is inserted elsewhere. */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        /* Propagate the caller-supplied ctime onto the object. */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
        /* NOTE(review): the transaction is stopped with result 0 rather
         * than rc, so a ctime-set failure is not reported to the trans
         * layer -- confirm this is intentional. */
        mdd_trans_stop(env, mdd, 0, handle);
 * Translate open flags into a MAY_* access mode bitmask;
 * do NOT or the MAY_*'s, you'll get the weakest
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);
                /* Owner (or unset/initial credentials) bypasses the check. */
                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
        if (flags & FMODE_READ)
        /* Truncate and append imply write access. */
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
        if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's type, mode, flags and
 * the caller's credentials. */
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        /* Dead (being-destroyed) objects cannot be opened. */
        if (mdd_is_dead_obj(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* Symlinks are opened via their target, never directly. */
        if (S_ISLNK(tmp_la->la_mode))
        mode = accmode(env, tmp_la, flag);
        /* Directories may not be opened for write. */
        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
        /* Permission was already checked at create time for new files. */
        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
        /* Truncate is meaningless on FIFOs, sockets and device nodes. */
        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;
        /* For writing append-only file must open it with append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                if (flag & MDS_OPEN_TRUNC)
         * Now, flag -- O_NOATIME does not be packed by client.
        /* O_NOATIME is restricted to the owner or CFS_CAP_FOWNER holders. */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);
                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open \a obj: run the sanity checks and, on success, bump the open
 * count under the object write lock. */
static int mdd_open(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_open_sanity_check(env, mdd_obj, flags);
                mdd_obj->mod_count++;
        mdd_write_unlock(env, mdd_obj);
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie*/
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
        /* Only regular files have LOV stripes / OST objects to clean up. */
        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean evth up.
                 * Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
 * Close \a obj: drop the open count and, on the last close of an
 * orphaned/deleted object, remove it from the orphan index and prepare
 * OST object destruction.
 *
 * No permission check is needed.
static int mdd_close(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle = NULL;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /* MDS_KEEP_ORPHAN: just drop the open count, keep the object on
         * the orphan list for later processing. */
        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
                mdd_obj->mod_count--;
                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
                               "list\n", PFID(mdd_object_fid(mdd_obj)));
        /* check without any lock */
        if (mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
                handle = mdd_trans_start(env, mdo2mdd(obj));
                        RETURN(PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        /* Lost the race: someone else became the last closer while we
         * were starting the transaction without one. */
        if (handle == NULL && mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
                mdd_write_unlock(env, mdd_obj);
        /* release open count */
        mdd_obj->mod_count --;
        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                        CERROR("Object "DFID" can not be deleted from orphan "
                               "list, maybe cause OST objects can not be "
                               "destroyed (err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If object was not deleted from orphan list, do not
                         * destroy OSS objects, which will be done when next
        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* Object maybe not in orphan list originally, it is rare case for
         * mdd_finish_unlink() failure. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        CERROR("Error when prepare to delete Object "DFID" , "
                               "which will cause OST objects can not be "
                               "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
        /* LOV EA / cookies are only valid when object kill succeeded. */
        ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
 * Check that \a obj is a directory with index operations available.
 *
 * Permission check is done when open,
 * no need check again.
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
        struct dt_object *next = mdd_object_child(obj);
        /* Only directories whose dt object supports index ops can be read. */
        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries pulled from iterator \a it,
 * packing as many lu_dirents as fit in \a nob bytes. */
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              struct lu_dirpage *dp, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
        struct lu_dirent *ent;
        struct lu_dirent *last = NULL;
        /* Reserve the page header before packing entries. */
        memset(area, 0, sizeof (*dp));
        area += sizeof (*dp);
        nob  -= sizeof (*dp);
                len = iops->key_size(env, it);
                /* IAM iterator can return record with zero len. */
                hash = iops->store(env, it);
                /* First entry fixes the page's starting hash. */
                if (unlikely(first)) {
                        dp->ldp_hash_start = cpu_to_le64(hash);
                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);
                if (nob >= recsize) {
                        result = iops->rec(env, it, ent, attr);
                        if (result == -ESTALE)
                        /* osd might not able to pack all attributes,
                         * so recheck rec length */
                        recsize = le16_to_cpu(ent->lde_reclen);
                        /* Page full: fine if at least one entry was packed. */
                        result = (last != NULL) ? 0 :-EINVAL;
                ent = (void *)ent + recsize;
                result = iops->next(env, it);
                if (result == -ESTALE)
        } while (result == 0);
        dp->ldp_hash_end = cpu_to_le64(hash);
        /* A final entry hashing to the end value signals a hash collision
         * spanning pages; mark it so the client re-reads carefully. */
        if (last->lde_hash == dp->ldp_hash_end)
                dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
        last->lde_reclen = 0; /* end mark */
/* Iterate the directory index and fill the pages described by \a rdpg
 * with lu_dirpages of directory entries. */
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops  *iops;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);
        if (rdpg->rp_count <= 0)
         * iterate through directory and fill pages from @rdpg
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
        /* Position the iterator at the requested starting hash. */
        rc = iops->load(env, it, rdpg->rp_hash);
                 * Iterator didn't find record with exactly the key requested.
                 * It is currently either
                 * - positioned above record with key less than
                 * requested---skip it.
                 * - or not positioned at all (is in IAM_IT_SKEWED
                 * state)---position it on the next item.
                rc = iops->next(env, it);
         * At this point and across for-loop:
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                struct lu_dirpage *dp;
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                        /* A CFS page may hold several LU pages: build each
                         * LU_PAGE_SIZE sub-page in turn. */
                        rc = mdd_dir_page_build(env, mdd, dp,
                                                min_t(int, nob, LU_PAGE_SIZE),
                                                iops, it, rdpg->rp_attrs);
                                /* End of directory: stamp the end-marker. */
                                dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                        } else if (rc < 0) {
                                CWARN("build page failed: %d!\n", rc);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
                struct lu_dirpage *dp;
                /* Empty result: emit a single empty page covering the
                 * requested hash range. */
                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                 * No pages were processed, mark this for first page
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                cfs_kunmap(rdpg->rp_pages[0]);
        /* Report how many bytes of LU pages were actually produced. */
        rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
        iops->fini(env, it);
/* Read a page of directory entries from \a obj into \a rdpg; a dead
 * directory yields one empty page (POSIX: no entries, not even "."). */
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
                GOTO(out_unlock, rc);
        if (mdd_is_dead_obj(mdd_obj)) {
                struct lu_dirpage *dp;
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));
                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);
                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage*)cfs_kmap(pg);
                memset(dp, 0 , sizeof(struct lu_dirpage));
                /* Mark the single page empty over the whole hash range. */
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                GOTO(out_unlock, rc = LU_PAGE_SIZE);
        rc = __mdd_readpage(env, mdd_obj, rdpg);
        mdd_read_unlock(env, mdd_obj);
/* Flush \a obj to stable storage by delegating to the underlying
 * dt object's sync method. */
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
/* Return the underlying dt object's version (used for VBR recovery). */
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
/* Set the underlying dt object's version (used for VBR recovery). */
static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        do_version_set(env, mdd_object_child(mdd_obj), version);
2494 const struct md_object_operations mdd_obj_ops = {
2495 .moo_permission = mdd_permission,
2496 .moo_attr_get = mdd_attr_get,
2497 .moo_attr_set = mdd_attr_set,
2498 .moo_xattr_get = mdd_xattr_get,
2499 .moo_xattr_set = mdd_xattr_set,
2500 .moo_xattr_list = mdd_xattr_list,
2501 .moo_xattr_del = mdd_xattr_del,
2502 .moo_object_create = mdd_object_create,
2503 .moo_ref_add = mdd_ref_add,
2504 .moo_ref_del = mdd_ref_del,
2505 .moo_open = mdd_open,
2506 .moo_close = mdd_close,
2507 .moo_readpage = mdd_readpage,
2508 .moo_readlink = mdd_readlink,
2509 .moo_changelog = mdd_changelog,
2510 .moo_capa_get = mdd_capa_get,
2511 .moo_object_sync = mdd_object_sync,
2512 .moo_version_get = mdd_version_get,
2513 .moo_version_set = mdd_version_set,
2514 .moo_path = mdd_path,
2515 .moo_file_lock = mdd_file_lock,
2516 .moo_file_unlock = mdd_file_unlock,