1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
55 #include <linux/jbd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
68 #include <linux/ldiskfs_fs.h>
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
73 #include "mdd_internal.h"
75 static const struct lu_object_operations mdd_lu_obj_ops;
77 static int mdd_xattr_get(const struct lu_env *env,
78 struct md_object *obj, struct lu_buf *buf,
/* Fetch the data (body) of an existing object via the underlying layer.
 * Asserts the object exists (FID printed on failure). */
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 mdo_data_get(env, obj, data);
/* Read the lu_attr of an existing object from the underlying layer,
 * subject to capability check \a capa. Asserts the object exists. */
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91 struct lu_attr *la, struct lustre_capa *capa)
93 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94 PFID(mdd_object_fid(obj)));
95 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into in-memory
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ); clears both bits first
 * so stale state never survives. */
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
100 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
102 if (flags & LUSTRE_APPEND_FL)
103 obj->mod_flags |= APPEND_OBJ;
105 if (flags & LUSTRE_IMMUTABLE_FL)
106 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info stashed in the lu_env context;
 * LASSERTs the key has been populated (never returns NULL). */
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
111 struct mdd_thread_info *info;
113 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114 LASSERT(info != NULL);
/* Wrap caller-provided memory (\a area, \a len) in the per-thread
 * mti_buf; no allocation is performed. */
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by \a buf (no-op for NULL buf/lb_buf). */
128 void mdd_buf_put(struct lu_buf *buf)
130 if (buf == NULL || buf->lb_buf == NULL)
132 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wraps \a area in mti_buf, casting
 * away const for the lu_buf interface. */
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf grown to at least \a len bytes;
 * an existing smaller buffer is freed and reallocated (contents are
 * NOT preserved — use mdd_buf_grow() for that). */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL) {
158 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL)
/** Increase the size of the \a mti_big_buf.
 * preserves old data in buffer
 * old buffer remains unchanged on error
 * \retval 0 or -ENOMEM
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
172 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175 LASSERT(len >= oldbuf->lb_len);
176 OBD_ALLOC_LARGE(buf.lb_buf, len);
178 if (buf.lb_buf == NULL)
/* Copy old contents into the new buffer, then swap the descriptors. */
182 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
184 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
186 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, (re)allocated to hold the
 * current maximum LOV cookie size; zeroed before return. NULL on
 * allocation failure. */
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192 struct mdd_device *mdd)
194 struct mdd_thread_info *mti = mdd_env_info(env);
197 max_cookie_size = mdd_lov_cookiesize(env, mdd);
/* Cached buffer too small — drop it so it is reallocated below. */
198 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199 if (mti->mti_max_cookie)
200 OBD_FREE_LARGE(mti->mti_max_cookie,
201 mti->mti_max_cookie_size);
202 mti->mti_max_cookie = NULL;
203 mti->mti_max_cookie_size = 0;
205 if (unlikely(mti->mti_max_cookie == NULL)) {
206 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207 if (likely(mti->mti_max_cookie != NULL))
208 mti->mti_max_cookie_size = max_cookie_size;
210 if (likely(mti->mti_max_cookie != NULL))
211 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212 return mti->mti_max_cookie;
/* Return the per-thread LOV MD buffer, (re)allocated to hold the
 * current maximum stripe MD size. Unlike mdd_max_cookie_get() the
 * buffer is NOT zeroed. NULL on allocation failure. */
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216 struct mdd_device *mdd)
218 struct mdd_thread_info *mti = mdd_env_info(env);
221 max_lmm_size = mdd_lov_mdsize(env, mdd);
222 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223 if (mti->mti_max_lmm)
224 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225 mti->mti_max_lmm = NULL;
226 mti->mti_max_lmm_size = 0;
228 if (unlikely(mti->mti_max_lmm == NULL)) {
229 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230 if (likely(mti->mti_max_lmm != NULL))
231 mti->mti_max_lmm_size = max_lmm_size;
233 return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for the MDD layer: allocate an
 * mdd_object, wire up its md/dir operation tables and lu_object ops.
 * Returns the embedded lu_object (NULL path on OOM is elided here). */
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237 const struct lu_object_header *hdr,
240 struct mdd_object *mdd_obj;
242 OBD_ALLOC_PTR(mdd_obj);
243 if (mdd_obj != NULL) {
246 o = mdd2lu_obj(mdd_obj);
247 lu_object_init(o, NULL, d);
248 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250 mdd_obj->mod_count = 0;
251 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: allocate the next-lower-layer (child device) object
 * and stack it below this one; also initializes the pdir lock and
 * clears the changelog timestamp. */
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259 const struct lu_object_conf *unused)
261 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262 struct mdd_object *mdd_obj = lu2mdd_obj(o);
263 struct lu_object *below;
264 struct lu_device *under;
267 mdd_obj->mod_cltime = 0;
268 under = &d->mdd_child->dd_lu_dev;
269 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270 mdd_pdlock_init(mdd_obj);
274 lu_object_add(o, below);
/* loo_object_start: for objects that already exist on disk, load the
 * attribute flags into mod_flags via mdd_get_flags(). */
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 if (lu_object_exists(o))
282 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd_object allocated in
 * mdd_object_alloc(). */
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: one-line debug dump of the object state
 * (open count, valid mask, changelog time, flags). */
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296 lu_printer_t p, const struct lu_object *o)
298 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300 "valid=%x, cltime="LPU64", flags=%lx)",
301 mdd, mdd->mod_count, mdd->mod_valid,
302 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for the MDD layer (forward-declared at
 * the top of this file). */
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306 .loo_object_init = mdd_object_init,
307 .loo_object_start = mdd_object_start,
308 .loo_object_free = mdd_object_free,
309 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device \a d.
 * May return an ERR_PTR — callers check with IS_ERR(). */
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313 struct mdd_device *d,
314 const struct lu_fid *f)
316 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated \a path (relative to the filesystem root)
 * to a FID by iterative component lookup starting at mdd_root_fid.
 * Used by mdd_path_current() to verify a reconstructed path.
 * \retval -EREMOTE if an intermediate object lives on another MDT. */
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320 const char *path, struct lu_fid *fid)
323 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324 struct mdd_object *obj;
325 struct lu_name *lname = &mdd_env_info(env)->mti_name;
330 /* temp buffer for path element */
331 buf = mdd_buf_alloc(env, PATH_MAX);
332 if (buf->lb_buf == NULL)
335 lname->ln_name = name = buf->lb_buf;
336 lname->ln_namelen = 0;
337 *f = mdd->mdd_root_fid;
/* Scan one path component (up to the next '/' or end of string). */
344 while (*path != '/' && *path != '\0') {
352 /* find obj corresponding to fid */
353 obj = mdd_object_find(env, mdd, f);
355 GOTO(out, rc = -EREMOTE);
357 GOTO(out, rc = PTR_ERR(obj));
358 /* get child fid from parent and name */
359 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360 mdd_object_put(env, obj);
365 lname->ln_namelen = 0;
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
378 #define MAX_PATH_DEPTH 100
/** mdd_path() lookup structure.
 * Holds the state shared between mdd_path(), mdd_path_current() and
 * mdd_path_historic(): the FID chain from the target up to the root,
 * the output path buffer, and the changelog position. */
381 struct path_lookup_info {
382 __u64 pli_recno; /**< history point */
383 __u64 pli_currec; /**< current record */
384 struct lu_fid pli_fid;
385 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386 struct mdd_object *pli_mdd_obj;
387 char *pli_path; /**< full path */
389 int pli_linkno; /**< which hardlink to follow */
390 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path of pli->pli_mdd_obj by walking the
 * link EA (parent FID + name) chain up to the filesystem root,
 * packing components right-to-left into pli->pli_path.  After the
 * walk, re-resolves the assembled path with mdd_path2fid() to detect
 * a concurrent rename (-EAGAIN tells the caller to retry). */
393 static int mdd_path_current(const struct lu_env *env,
394 struct path_lookup_info *pli)
396 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397 struct mdd_object *mdd_obj;
398 struct lu_buf *buf = NULL;
399 struct link_ea_header *leh;
400 struct link_ea_entry *lee;
401 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the end of the path buffer; names are packed
 * backwards from here. */
408 ptr = pli->pli_path + pli->pli_pathlen - 1;
411 pli->pli_fidcount = 0;
412 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415 mdd_obj = mdd_object_find(env, mdd,
416 &pli->pli_fids[pli->pli_fidcount]);
418 GOTO(out, rc = -EREMOTE);
420 GOTO(out, rc = PTR_ERR(obj));
421 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423 mdd_object_put(env, mdd_obj);
427 /* Do I need to error out here? */
432 /* Get parent fid and object name */
433 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434 buf = mdd_links_get(env, mdd_obj);
435 mdd_read_unlock(env, mdd_obj);
436 mdd_object_put(env, mdd_obj);
438 GOTO(out, rc = PTR_ERR(buf));
441 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 /* If set, use link #linkno for path lookup, otherwise use
445 link #0. Only do this for the final path element. */
446 if ((pli->pli_fidcount == 0) &&
447 (pli->pli_linkno < leh->leh_reccount)) {
449 for (count = 0; count < pli->pli_linkno; count++) {
450 lee = (struct link_ea_entry *)
451 ((char *)lee + reclen);
452 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454 if (pli->pli_linkno < leh->leh_reccount - 1)
455 /* indicate to user there are more links */
459 /* Pack the name in the end of the buffer */
460 ptr -= tmpname->ln_namelen;
461 if (ptr - 1 <= pli->pli_path)
462 GOTO(out, rc = -EOVERFLOW);
/* NOTE(review): strncpy here copies exactly ln_namelen bytes and does
 * not NUL-terminate; correctness presumably relies on the terminator /
 * separator already placed at ptr[ln_namelen] by the previous
 * iteration or initialization (elided from this view) — confirm. */
463 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466 /* Store the parent fid for historic lookup */
467 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468 GOTO(out, rc = -EOVERFLOW);
469 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472 /* Verify that our path hasn't changed since we started the lookup.
473 Record the current index, and verify the path resolves to the
474 same fid. If it does, then the path is correct as of this index. */
475 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476 pli->pli_currec = mdd->mdd_cl.mc_index;
477 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481 GOTO (out, rc = -EAGAIN);
483 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486 PFID(&pli->pli_fid));
487 GOTO(out, rc = -EAGAIN);
489 ptr++; /* skip leading / */
/* Shift the assembled path to the front of the caller's buffer. */
490 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
494 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path reconstruction — body elided in
 * this view; see the comment block in mdd_path() below for the
 * intended semantics. */
501 static int mdd_path_historic(const struct lu_env *env,
502 struct path_lookup_info *pli)
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509 char *path, int pathlen, __u64 *recno, int *linkno)
511 struct path_lookup_info *pli;
/* The root object has no parent link — its path is trivially "/". */
519 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
528 pli->pli_mdd_obj = md2mdd_obj(obj);
529 pli->pli_recno = *recno;
530 pli->pli_path = path;
531 pli->pli_pathlen = pathlen;
532 pli->pli_linkno = *linkno;
534 /* Retry multiple times in case file is being moved */
535 while (tries-- && rc == -EAGAIN)
536 rc = mdd_path_current(env, pli);
538 /* For historical path lookup, the current links may not have existed
539 * at "recno" time. We must switch over to earlier links/parents
540 * by using the changelog records. If the earlier parent doesn't
541 * exist, we must search back through the changelog to reconstruct
542 * its parents, then check if it exists, etc.
543 * We may ignore this problem for the initial implementation and
544 * state that an "original" hardlink must still exist for us to find
545 * historic path name. */
546 if (pli->pli_recno != -1) {
547 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path was valid at, and the
 * next hardlink index for iterative FID-to-path calls. */
549 *recno = pli->pli_currec;
550 /* Return next link index to caller */
551 *linkno = pli->pli_linkno;
/* Refresh obj->mod_flags from the on-disk attributes: translate
 * LUSTRE_*_FL bits and additionally tag directories with a single
 * link as MNLINK_OBJ. */
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 struct lu_attr *la = &mdd_env_info(env)->mti_la;
565 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567 mdd_flags_xlate(obj, la->la_flags);
568 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569 obj->mod_flags |= MNLINK_OBJ;
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Short-circuit if the caller already has valid inode attrs. */
581 if (ma->ma_valid & MA_INODE)
584 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585 mdd_object_capa(env, mdd_obj));
587 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default striping (from the MDS LOV
 * descriptor) as a lov_user_md; returns its size. */
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
593 struct lov_desc *ldesc;
594 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595 struct lov_user_md *lum = (struct lov_user_md*)lmm;
601 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602 LASSERT(ldesc != NULL);
604 lum->lmm_magic = LOV_MAGIC_V1;
605 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606 lum->lmm_pattern = ldesc->ld_pattern;
607 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
611 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root object. */
614 static int is_rootdir(struct mdd_object *mdd_obj)
616 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617 const struct lu_fid *fid = mdo2fid(mdd_obj);
619 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624 struct mdd_object *mdd_obj, struct md_attr *ma)
629 if (ma->ma_valid & MA_LOV)
632 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* Root directory with no LOV EA: report the filesystem default
 * striping when the caller asked for it (MA_LOV_DEF). */
634 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
637 ma->ma_lmm_size = rc;
638 ma->ma_valid |= MA_LOV;
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646 struct mdd_object *mdd_obj, struct md_attr *ma)
649 struct link_ea_header *leh;
650 struct link_ea_entry *lee;
651 struct lu_fid *pfid = &ma->ma_pfid;
654 if (ma->ma_valid & MA_PFID)
657 buf = mdd_links_get(env, mdd_obj);
659 RETURN(PTR_ERR(buf));
/* The parent FID is stored big-endian in the link EA entry. */
662 lee = (struct link_ea_entry *)(leh + 1);
663 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664 fid_be_to_cpu(pfid, pfid);
665 ma->ma_valid |= MA_PFID;
666 if (buf->lb_len > OBD_ALLOC_BIG)
667 /* if we vmalloced a large buffer drop it */
/* Read-locked wrapper around __mdd_lmm_get(). */
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
678 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679 rc = __mdd_lmm_get(env, mdd_obj, ma);
680 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv; no-op if
 * already valid. */
685 static int __mdd_lmv_get(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
691 if (ma->ma_valid & MA_LMV)
694 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
697 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr and extract the HSM flags and/or Size-on-MDS
 * data requested in ma->ma_need into \a ma. */
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
706 struct mdd_thread_info *info = mdd_env_info(env);
707 struct lustre_mdt_attrs *lma =
708 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
713 /* If all needed data are already valid, nothing to do */
714 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715 (ma->ma_need & (MA_HSM | MA_SOM)))
718 /* Read LMA from disk EA */
719 lma_size = sizeof(info->mti_xattr_buf);
720 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
724 /* Useless to check LMA incompatibility because this is already done in
725 * osd_ea_fid_get(), and this will fail long before this code is
727 * So, if we are here, LMA is compatible.
730 lustre_lma_swab(lma);
732 /* Swab and copy LMA */
733 if (ma->ma_need & MA_HSM) {
734 if (lma->lma_compat & LMAC_HSM)
735 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
737 ma->ma_hsm.mh_flags = 0;
738 ma->ma_valid |= MA_HSM;
/* SOM data is only reported if the LMAC_SOM compat bit is set. */
742 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743 LASSERT(ma->ma_som != NULL);
744 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745 ma->ma_som->msd_size = lma->lma_som_size;
746 ma->ma_som->msd_blocks = lma->lma_som_blocks;
747 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748 ma->ma_valid |= MA_SOM;
/* Gather every attribute group requested in ma->ma_need (inode, LOV,
 * parent FID, LMV, HSM/SOM, default ACL), dispatching on file type;
 * stops at the first error. */
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
760 if (ma->ma_need & MA_INODE)
761 rc = mdd_iattr_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_LOV) {
764 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765 S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* Parent FID only makes sense for regular files without a LOV EA. */
768 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769 if (S_ISREG(mdd_object_type(mdd_obj)))
770 rc = mdd_pfid_get(env, mdd_obj, ma);
772 if (rc == 0 && ma->ma_need & MA_LMV) {
773 if (S_ISDIR(mdd_object_type(mdd_obj)))
774 rc = __mdd_lmv_get(env, mdd_obj, ma);
776 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777 if (S_ISREG(mdd_object_type(mdd_obj)))
778 rc = __mdd_lma_get(env, mdd_obj, ma);
780 #ifdef CONFIG_FS_POSIX_ACL
781 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782 if (S_ISDIR(mdd_object_type(mdd_obj)))
783 rc = mdd_def_acl_get(env, mdd_obj, ma);
786 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787 rc, ma->ma_valid, ma->ma_lmm);
/* Locked wrapper: take the read lock only when an EA-backed group
 * (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain inode attrs do not
 * need it. */
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792 struct mdd_object *mdd_obj, struct md_attr *ma)
795 int needlock = ma->ma_need &
796 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
799 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800 rc = mdd_attr_get_internal(env, mdd_obj, ma);
802 mdd_read_unlock(env, mdd_obj);
807 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
812 struct mdd_object *mdd_obj = md2mdd_obj(obj);
816 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
821 * No permission check is needed.
/* moo_xattr_get: read one named xattr under the object read lock. */
823 static int mdd_xattr_get(const struct lu_env *env,
824 struct md_object *obj, struct lu_buf *buf,
827 struct mdd_object *mdd_obj = md2mdd_obj(obj);
832 LASSERT(mdd_object_exists(mdd_obj));
834 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835 rc = mdo_xattr_get(env, mdd_obj, buf, name,
836 mdd_object_capa(env, mdd_obj));
837 mdd_read_unlock(env, mdd_obj);
843 * Permission check is done when open,
844 * no need check again.
/* moo_readlink: read the symlink body straight from the child
 * dt_object's body ops. */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
850 struct dt_object *next;
855 LASSERT(mdd_object_exists(mdd_obj));
857 next = mdd_object_child(mdd_obj);
858 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860 mdd_object_capa(env, mdd_obj));
861 mdd_read_unlock(env, mdd_obj);
866 * No permission check is needed.
/* moo_xattr_list: enumerate xattr names under the read lock. */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
871 struct mdd_object *mdd_obj = md2mdd_obj(obj);
876 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle:
 * select the dt_object_format from the creation spec (index feature
 * or file mode), let the underlying device fill the allocation hint
 * from the parent \a p, then create. Asserts existence on success. */
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884 struct mdd_object *c, struct md_attr *ma,
885 struct thandle *handle,
886 const struct md_op_spec *spec)
888 struct lu_attr *attr = &ma->ma_attr;
889 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891 const struct dt_index_features *feat = spec->sp_feat;
895 if (!mdd_object_exists(c)) {
896 struct dt_object *next = mdd_object_child(c);
/* A non-directory index feature forces DFT_INDEX; otherwise the
 * format follows the file type bits of la_mode. */
899 if (feat != &dt_directory_features && feat != NULL)
900 dof->dof_type = DFT_INDEX;
902 dof->dof_type = dt_mode_to_dft(attr->la_mode);
904 dof->u.dof_idx.di_feat = feat;
906 /* @hint will be initialized by underlying device. */
907 next->do_ops->do_ah_init(env, hint,
908 p ? mdd_object_child(p) : NULL,
909 attr->la_mode & S_IFMT);
911 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
920 * Make sure the ctime is increased only.
/* Drop time updates from \a attr that would move ctime backwards
 * (or redundantly set an identical ctime). */
922 static inline int mdd_attr_check(const struct lu_env *env,
923 struct mdd_object *obj,
924 struct lu_attr *attr)
926 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
930 if (attr->la_valid & LA_CTIME) {
931 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
935 if (attr->la_ctime < tmp_la->la_ctime)
936 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937 else if (attr->la_valid == LA_CTIME &&
938 attr->la_ctime == tmp_la->la_ctime)
939 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to \a obj inside transaction \a handle; on a mode
 * change with \a needacl set, propagate the new mode into the ACL. */
944 int mdd_attr_set_internal(const struct lu_env *env,
945 struct mdd_object *obj,
946 struct lu_attr *attr,
947 struct thandle *handle,
953 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955 if (!rc && (attr->la_valid & LA_MODE) && needacl)
956 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Like mdd_attr_set_internal() but first filters backwards ctime
 * updates via mdd_attr_check(). */
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962 struct mdd_object *obj,
963 struct lu_attr *attr,
964 struct thandle *handle,
970 rc = mdd_attr_check(env, obj, attr);
975 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper; the lock is only needed when ACL propagation
 * may happen (mode change + needacl). */
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980 struct mdd_object *obj,
981 struct lu_attr *attr,
982 struct thandle *handle,
988 needacl = needacl && (attr->la_valid & LA_MODE);
990 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
993 mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(); same
 * locking rule as above. */
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998 struct mdd_object *obj,
999 struct lu_attr *attr,
1000 struct thandle *handle,
1006 needacl = needacl && (attr->la_valid & LA_MODE);
1008 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1011 mdd_write_unlock(env, obj);
/* Set (non-empty buf) or delete (NULL buf with zero length) the xattr
 * \a name on \a obj inside transaction \a handle. */
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016 const struct lu_buf *buf, const char *name,
1017 int fl, struct thandle *handle)
1019 struct lustre_capa *capa = mdd_object_capa(env, obj);
1023 if (buf->lb_buf && buf->lb_len > 0)
1024 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026 rc = mdo_xattr_del(env, obj, name, handle, capa);
1032 * This gives the same functionality as the code between
1033 * sys_chmod and inode_setattr
1034 * chown_common and inode_setattr
1035 * utimes and inode_setattr
1036 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
/* Validate and normalize the requested attribute change in \a la
 * against the object's current attributes and the caller's
 * credentials (uc), mirroring the VFS setattr permission logic:
 * ownership/capability checks for chmod/chown/chgrp/utimes, flag
 * changes, killing setuid/setgid bits, and dropping redundant or
 * backwards time updates. */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039 struct lu_attr *la, const struct md_attr *ma)
1041 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1042 struct md_ucred *uc;
1049 /* Do not permit change file type */
1050 if (la->la_valid & LA_TYPE)
1053 /* They should not be processed by setattr */
1054 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1057 /* export destroy does not have ->le_ses, but we may want
1058 * to drop LUSTRE_SOM_FL. */
1064 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure-ctime update (rename source handling). */
1068 if (la->la_valid == LA_CTIME) {
1069 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070 /* This is only for set ctime when rename's source is
1072 rc = mdd_may_delete(env, NULL, obj,
1073 (struct md_attr *)ma, 1, 0);
1074 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075 la->la_valid &= ~LA_CTIME;
1079 if (la->la_valid == LA_ATIME) {
1080 /* This is atime only set for read atime update on close. */
1081 if (la->la_atime >= tmp_la->la_atime &&
1082 la->la_atime < (tmp_la->la_atime +
1083 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084 la->la_valid &= ~LA_ATIME;
1088 /* Check if flags change. */
1089 if (la->la_valid & LA_FLAGS) {
1090 unsigned int oldflags = 0;
1091 unsigned int newflags = la->la_flags &
1092 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1094 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095 !mdd_capable(uc, CFS_CAP_FOWNER))
1098 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099 * only be changed by the relevant capability. */
1100 if (mdd_is_immutable(obj))
1101 oldflags |= LUSTRE_IMMUTABLE_FL;
1102 if (mdd_is_append(obj))
1103 oldflags |= LUSTRE_APPEND_FL;
1104 if ((oldflags ^ newflags) &&
1105 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningful for directories only. */
1108 if (!S_ISDIR(tmp_la->la_mode))
1109 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept nothing but flag changes
 * unless the caller bypasses permission checks. */
1112 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113 (la->la_valid & ~LA_FLAGS) &&
1114 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1117 /* Check for setting the obj time. */
1118 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID: strip set-id bits, materializing an LA_MODE
 * update from the current mode if none was requested. */
1130 if (la->la_valid & LA_KILL_SUID) {
1131 la->la_valid &= ~LA_KILL_SUID;
1132 if ((tmp_la->la_mode & S_ISUID) &&
1133 !(la->la_valid & LA_MODE)) {
1134 la->la_mode = tmp_la->la_mode;
1135 la->la_valid |= LA_MODE;
1137 la->la_mode &= ~S_ISUID;
1140 if (la->la_valid & LA_KILL_SGID) {
1141 la->la_valid &= ~LA_KILL_SGID;
1142 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143 (S_ISGID | S_IXGRP)) &&
1144 !(la->la_valid & LA_MODE)) {
1145 la->la_mode = tmp_la->la_mode;
1146 la->la_valid |= LA_MODE;
1148 la->la_mode &= ~S_ISGID;
1151 /* Make sure a caller can chmod. */
1152 if (la->la_valid & LA_MODE) {
1153 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154 (uc->mu_fsuid != tmp_la->la_uid) &&
1155 !mdd_capable(uc, CFS_CAP_FOWNER))
1158 if (la->la_mode == (cfs_umode_t) -1)
1159 la->la_mode = tmp_la->la_mode;
/* Only the permission bits may be changed; preserve file type. */
1161 la->la_mode = (la->la_mode & S_IALLUGO) |
1162 (tmp_la->la_mode & ~S_IALLUGO);
1164 /* Also check the setgid bit! */
1165 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166 la->la_gid : tmp_la->la_gid) &&
1167 !mdd_capable(uc, CFS_CAP_FSETID))
1168 la->la_mode &= ~S_ISGID;
1170 la->la_mode = tmp_la->la_mode;
1173 /* Make sure a caller can chown. */
1174 if (la->la_valid & LA_UID) {
1175 if (la->la_uid == (uid_t) -1)
1176 la->la_uid = tmp_la->la_uid;
1177 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178 (la->la_uid != tmp_la->la_uid)) &&
1179 !mdd_capable(uc, CFS_CAP_CHOWN))
1182 /* If the user or group of a non-directory has been
1183 * changed by a non-root user, remove the setuid bit.
1184 * 19981026 David C Niemi <niemi@tux.org>
1186 * Changed this to apply to all users, including root,
1187 * to avoid some races. This is the behavior we had in
1188 * 2.0. The check for non-root was definitely wrong
1189 * for 2.2 anyway, as it should have been using
1190 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192 !S_ISDIR(tmp_la->la_mode)) {
1193 la->la_mode &= ~S_ISUID;
1194 la->la_valid |= LA_MODE;
1198 /* Make sure caller can chgrp. */
1199 if (la->la_valid & LA_GID) {
1200 if (la->la_gid == (gid_t) -1)
1201 la->la_gid = tmp_la->la_gid;
1202 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203 ((la->la_gid != tmp_la->la_gid) &&
1204 !lustre_in_group_p(uc, la->la_gid))) &&
1205 !mdd_capable(uc, CFS_CAP_CHOWN))
1208 /* Likewise, if the user or group of a non-directory
1209 * has been changed by a non-root user, remove the
1210 * setgid bit UNLESS there is no group execute bit
1211 * (this would be a file marked for mandatory
1212 * locking). 19981026 David C Niemi <niemi@tux.org>
1214 * Removed the fsuid check (see the comment above) --
1216 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218 la->la_mode &= ~S_ISGID;
1219 la->la_valid |= LA_MODE;
1223 /* For both Size-on-MDS case and truncate case,
1224 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1225 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226 * For SOM case, it is true, the MAY_WRITE perm has been checked
1227 * when open, no need check again. For truncate case, it is false,
1228 * the MAY_WRITE perm should be checked here. */
1229 if (ma->ma_attr_flags & MDS_SOM) {
1230 /* For the "Size-on-MDS" setattr update, merge coming
1231 * attributes with the set in the inode. BUG 10641 */
1232 if ((la->la_valid & LA_ATIME) &&
1233 (la->la_atime <= tmp_la->la_atime))
1234 la->la_valid &= ~LA_ATIME;
1236 /* OST attributes do not have a priority over MDS attributes,
1237 * so drop times if ctime is equal. */
1238 if ((la->la_valid & LA_CTIME) &&
1239 (la->la_ctime <= tmp_la->la_ctime))
1240 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1242 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244 (uc->mu_fsuid == tmp_la->la_uid)) &&
1245 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246 rc = mdd_permission_internal_locked(env, obj,
1253 if (la->la_valid & LA_CTIME) {
1254 /* The pure setattr, it has the priority over what is
1255 * already set, do not drop it if ctime is equal. */
1256 if (la->la_ctime < tmp_la->la_ctime)
1257 la->la_valid &= ~(LA_ATIME | LA_MTIME |
/** Store a data change changelog record
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
1271 static int mdd_changelog_data_store(const struct lu_env *env,
1272 struct mdd_device *mdd,
1273 enum changelog_rec_type type,
1275 struct mdd_object *mdd_obj,
1276 struct thandle *handle)
1278 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279 struct llog_changelog_rec *rec;
/* Skip when the changelog is off or this record type is masked out. */
1285 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1287 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1290 LASSERT(handle != NULL);
1291 LASSERT(mdd_obj != NULL);
/* Suppress duplicate time records for an object already logged since
 * the changelog was started. */
1293 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295 /* Don't need multiple updates in this log */
1296 /* Don't check under lock - no big deal if we get an extra
1301 reclen = llog_data_len(sizeof(*rec));
1302 buf = mdd_buf_alloc(env, reclen);
1303 if (buf->lb_buf == NULL)
1305 rec = (struct llog_changelog_rec *)buf->lb_buf;
1307 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308 rec->cr.cr_type = (__u32)type;
1309 rec->cr.cr_tfid = *tfid;
1310 rec->cr.cr_namelen = 0;
1311 mdd_obj->mod_cltime = cfs_time_current_64();
1313 rc = mdd_changelog_llog_write(mdd, rec, handle);
1315 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316 rc, type, PFID(tfid));
/* Public wrapper: open its own transaction, store one changelog
 * record of \a type for \a obj, and close the transaction. */
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324 int flags, struct md_object *obj)
1326 struct thandle *handle;
1327 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328 struct mdd_device *mdd = mdo2mdd(obj);
1332 handle = mdd_trans_start(env, mdd);
1335 return(PTR_ERR(handle));
1337 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1340 mdd_trans_stop(env, mdd, rc, handle);
1346 * Should be called with write lock held.
1348 * \see mdd_lma_set_locked().
/* Write the LMA (Lustre Metadata Attributes) EA for \a mdd_obj from the
 * HSM/SOM data in \a ma.  When either part is not supplied in \a ma, the
 * existing on-disk LMA is read first so the unsupplied part is preserved. */
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351 const struct md_attr *ma, struct thandle *handle)
1353 struct mdd_thread_info *info = mdd_env_info(env);
1355 struct lustre_mdt_attrs *lma =
1356 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357 int lmasize = sizeof(struct lustre_mdt_attrs);
1362 /* Either HSM or SOM part is not valid, we need to read it before */
/* FIX: the original "(!ma->ma_valid) & (MA_HSM | MA_SOM)" bitwise-ANDs a
 * logical-not result (0 or 1) with the mask, so the read branch could
 * never execute as the comment intends.  Test the actual valid bits. */
1363 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1364 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* On-disk LMA is little-endian; convert to CPU order before updating. */
1368 lustre_lma_swab(lma);
1370 memset(lma, 0, lmasize);
1374 if (ma->ma_valid & MA_HSM) {
1375 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376 lma->lma_compat |= LMAC_HSM;
1380 if (ma->ma_valid & MA_SOM) {
1381 LASSERT(ma->ma_som != NULL);
/* IOEPOCH_INVAL marks Size-on-MDS data as stale: clear the SOM flag
 * instead of recording the values. */
1382 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383 lma->lma_compat &= ~LMAC_SOM;
1385 lma->lma_compat |= LMAC_SOM;
1386 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1387 lma->lma_som_size = ma->ma_som->msd_size;
1388 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1389 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1394 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Convert back to on-disk (little-endian) order and write the EA. */
1396 lustre_lma_swab(lma);
1397 buf = mdd_buf_get(env, lma, lmasize);
1398 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1404 * Save LMA extended attributes with data from \a ma.
1406 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407 * not, LMA EA will be first read from disk, modified and write back.
/* Locking wrapper: takes the object write lock around __mdd_lma_set(). */
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411 struct mdd_object *mdd_obj,
1412 const struct md_attr *ma, struct thandle *handle)
1416 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418 mdd_write_unlock(env, mdd_obj);
1422 /* Precedence for choosing record type when multiple
1423 * attributes change: setattr > mtime > ctime > atime
1424 * (ctime changes when mtime does, plus chmod/chown.
1425 * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427 struct md_object *obj, struct thandle *handle,
1430 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmap of candidate record types from the changed attributes,
 * then mask out types the changelog is not recording. */
1433 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437 bits = bits & mdd->mdd_cl.mc_mask;
1441 /* The record type is the lowest non-masked set bit */
1442 while (bits && ((bits & 1) == 0)) {
1447 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449 md2mdd_obj(obj), handle);
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454 const struct md_attr *ma)
1456 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457 struct mdd_device *mdd = mdo2mdd(obj);
1458 struct thandle *handle;
1459 struct lov_mds_md *lmm = NULL;
1460 struct llog_cookie *logcookies = NULL;
1461 int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1462 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463 struct obd_device *obd = mdd->mdd_obd_dev;
1464 struct mds_obd *mds = &obd->u.mds;
1465 #ifdef HAVE_QUOTA_SUPPORT
1466 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468 int quota_opc = 0, block_count = 0;
1469 int inode_pending[MAXQUOTAS] = { 0, 0 };
1470 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a scratch copy so the caller's md_attr is untouched while
 * mdd_fix_attr() normalizes what actually needs to change. */
1474 *la_copy = ma->ma_attr;
1475 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1479 /* setattr on "close" only change atime, or do nothing */
1480 if (ma->ma_valid == MA_INODE &&
1481 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1484 /*TODO: add lock here*/
1485 /* start a log jounal handle if needed */
/* chown/chgrp of a regular file needs its LOV EA so the change can be
 * journalled to the OSTs. */
1486 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1487 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1488 lmm_size = mdd_lov_mdsize(env, mdd);
1489 lmm = mdd_max_lmm_get(env, mdd);
1491 GOTO(no_trans, rc = -ENOMEM);
1493 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
/* NOTE(review): chlog_cnt is "+="d below but no initialization survives
 * in this fragment -- presumably set earlier in the full source; verify. */
1501 if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1502 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1503 lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1506 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1507 MDD_TXN_ATTR_SET_OP, chlog_cnt);
1508 handle = mdd_trans_start(env, mdd);
1510 GOTO(no_trans, rc = PTR_ERR(handle));
1512 /* permission changes may require sync operation */
1513 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1514 handle->th_sync |= mdd->mdd_sync_permission;
1516 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1517 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1518 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1520 #ifdef HAVE_QUOTA_SUPPORT
/* On ownership change, pre-check inode and block quota for the new
 * owner; pending amounts are committed/released after the operation. */
1521 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1522 struct obd_export *exp = md_quota(env)->mq_exp;
1523 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1525 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1527 quota_opc = FSFILT_OP_SETATTR;
1528 mdd_quota_wrapper(la_copy, qnids);
1529 mdd_quota_wrapper(la_tmp, qoids);
1530 /* get file quota for new owner */
1531 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1532 qnids, inode_pending, 1, NULL, 0,
1534 block_count = (la_tmp->la_blocks + 7) >> 3;
1537 mdd_data_get(env, mdd_obj, &data);
1538 /* get block quota for new owner */
1539 lquota_chkquota(mds_quota_interface_ref, obd,
1540 exp, qnids, block_pending,
1542 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS-only change vs. general setattr take different paths. */
1548 if (la_copy->la_valid & LA_FLAGS) {
1549 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1552 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1553 } else if (la_copy->la_valid) { /* setattr */
1554 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1556 /* journal chown/chgrp in llog, just like unlink */
1557 if (rc == 0 && lmm_size){
1558 cookie_size = mdd_lov_cookiesize(env, mdd);
1559 logcookies = mdd_max_cookie_get(env, mdd);
1560 if (logcookies == NULL)
1561 GOTO(cleanup, rc = -ENOMEM);
1563 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1564 logcookies, cookie_size) <= 0)
1569 if (rc == 0 && ma->ma_valid & MA_LOV) {
1572 mode = mdd_object_type(mdd_obj);
1573 if (S_ISREG(mode) || S_ISDIR(mode)) {
1574 rc = mdd_lsm_sanity_check(env, mdd_obj);
1578 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1579 ma->ma_lmm_size, handle, 1);
1583 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1586 mode = mdd_object_type(mdd_obj);
1588 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
/* Record the changelog entry inside the same transaction so the log
 * and the change commit (or fail) together. */
1593 rc = mdd_attr_set_changelog(env, obj, handle,
1594 ma->ma_attr.la_valid);
1595 mdd_trans_stop(env, mdd, rc, handle);
1597 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1598 /*set obd attr, if needed*/
1599 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1602 #ifdef HAVE_QUOTA_SUPPORT
1604 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1606 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1608 /* Trigger dqrel/dqacq for original owner and new owner.
1609 * If failed, the next call for lquota_chkquota will
1611 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set xattr \a name on \a obj under the object write lock, within an
 * already-started transaction \a handle. */
1618 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1619 const struct lu_buf *buf, const char *name, int fl,
1620 struct thandle *handle)
1625 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1626 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1627 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: refuse on immutable/append-only
 * objects, and require ownership unless the caller has CFS_CAP_FOWNER. */
1632 static int mdd_xattr_sanity_check(const struct lu_env *env,
1633 struct mdd_object *obj)
1635 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1636 struct md_ucred *uc = md_ucred(env);
1640 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1643 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1647 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1648 !mdd_capable(uc, CFS_CAP_FOWNER))
1655 * The caller should guarantee to update the object ctime
1656 * after xattr_set if needed.
1658 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1659 const struct lu_buf *buf, const char *name,
1662 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1663 struct mdd_device *mdd = mdo2mdd(obj);
1664 struct thandle *handle;
1668 rc = mdd_xattr_sanity_check(env, mdd_obj);
1672 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1673 handle = mdd_trans_start(env, mdd);
1675 RETURN(PTR_ERR(handle));
1677 /* security-replated changes may require sync */
1678 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1679 handle->th_sync |= mdd->mdd_sync_permission;
1681 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1683 /* Only record user xattr changes */
/* i.e. names with the "user." prefix; compare mdd_xattr_del(). */
1684 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1685 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1687 mdd_trans_stop(env, mdd, rc, handle);
1693 * The caller should guarantee to update the object ctime
1694 * after xattr_set if needed.
/* Remove xattr \a name from \a obj inside its own transaction, recording
 * a CL_XATTR changelog entry for user xattrs, mirroring mdd_xattr_set(). */
1696 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1699 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1700 struct mdd_device *mdd = mdo2mdd(obj);
1701 struct thandle *handle;
1705 rc = mdd_xattr_sanity_check(env, mdd_obj);
1709 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1710 handle = mdd_trans_start(env, mdd);
1712 RETURN(PTR_ERR(handle));
1714 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1715 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1716 mdd_object_capa(env, mdd_obj));
1717 mdd_write_unlock(env, mdd_obj);
1719 /* Only record user xattr changes */
/* FIX: original tested strncmp(...) != 0, logging every xattr EXCEPT
 * "user." ones -- the opposite of the comment above and of the matching
 * test in mdd_xattr_set().  Use == 0 as there. */
1720 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1721 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1724 mdd_trans_stop(env, mdd, rc, handle);
1729 /* partial unlink */
1730 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1733 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1734 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1735 struct mdd_device *mdd = mdo2mdd(obj);
1736 struct thandle *handle;
1737 #ifdef HAVE_QUOTA_SUPPORT
1738 struct obd_device *obd = mdd->mdd_obd_dev;
1739 struct mds_obd *mds = &obd->u.mds;
1740 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1747 * Check -ENOENT early here because we need to get object type
1748 * to calculate credits before transaction start
1750 if (!mdd_object_exists(mdd_obj))
1753 LASSERT(mdd_object_exists(mdd_obj) > 0);
1755 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1759 handle = mdd_trans_start(env, mdd);
1763 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1765 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* Drop one link; a directory loses a second link for its "." entry. */
1769 __mdd_ref_del(env, mdd_obj, handle, 0);
1771 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1773 __mdd_ref_del(env, mdd_obj, handle, 1);
/* Propagate the caller-supplied ctime to the object. */
1776 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1777 la_copy->la_ctime = ma->ma_attr.la_ctime;
1779 la_copy->la_valid = LA_CTIME;
1780 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1784 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1785 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and not held open: release the owner's quota. */
1786 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1787 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1788 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1789 mdd_quota_wrapper(&ma->ma_attr, qids);
1796 mdd_write_unlock(env, mdd_obj);
1797 mdd_trans_stop(env, mdd, rc, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
1800 /* Trigger dqrel on the owner of child. If failed,
1801 * the next call for lquota_chkquota will process it */
1802 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1808 /* partial operation */
/* Sanity check before partial object create; the per-file-type switch
 * body is elided in this fragment. */
1809 static int mdd_oc_sanity_check(const struct lu_env *env,
1810 struct mdd_object *obj,
1816 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: allocate and initialize the MDT-side object for \a obj
 * (slave/remote cases included), with quota pre-check and release. */
1833 static int mdd_object_create(const struct lu_env *env,
1834 struct md_object *obj,
1835 const struct md_op_spec *spec,
1839 struct mdd_device *mdd = mdo2mdd(obj);
1840 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1841 const struct lu_fid *pfid = spec->u.sp_pfid;
1842 struct thandle *handle;
1843 #ifdef HAVE_QUOTA_SUPPORT
1844 struct obd_device *obd = mdd->mdd_obd_dev;
1845 struct obd_export *exp = md_quota(env)->mq_exp;
1846 struct mds_obd *mds = &obd->u.mds;
1847 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1848 int quota_opc = 0, block_count = 0;
1849 int inode_pending[MAXQUOTAS] = { 0, 0 };
1850 int block_pending[MAXQUOTAS] = { 0, 0 };
1855 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire quota for the child before starting the transaction. */
1856 if (mds->mds_quota) {
1857 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1858 mdd_quota_wrapper(&ma->ma_attr, qids);
1859 /* get file quota for child */
1860 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861 qids, inode_pending, 1, NULL, 0,
1863 switch (ma->ma_attr.la_mode & S_IFMT) {
1872 /* get block quota for child */
1874 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1875 qids, block_pending, block_count,
1876 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1880 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1881 handle = mdd_trans_start(env, mdd);
1883 GOTO(out_pending, rc = PTR_ERR(handle));
1885 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1886 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1890 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1894 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1895 /* If creating the slave object, set slave EA here. */
1896 int lmv_size = spec->u.sp_ea.eadatalen;
1897 struct lmv_stripe_md *lmv;
1899 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1900 LASSERT(lmv != NULL && lmv_size > 0);
1902 rc = __mdd_xattr_set(env, mdd_obj,
1903 mdd_buf_get_const(env, lmv, lmv_size),
1904 XATTR_NAME_LMV, 0, handle);
1908 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1911 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL init: the client supplies the ACL in the EA area and may
 * have it rewrite the create mode. */
1912 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1913 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1915 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1916 buf->lb_len = spec->u.sp_ea.eadatalen;
1917 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1918 rc = __mdd_acl_init(env, mdd_obj, buf,
1919 &ma->ma_attr.la_mode,
1924 ma->ma_attr.la_valid |= LA_MODE;
1927 pfid = spec->u.sp_ea.fid;
1930 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Re-read attributes so the caller gets the post-create state. */
1936 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1937 mdd_write_unlock(env, mdd_obj);
1939 mdd_trans_stop(env, mdd, rc, handle);
1941 #ifdef HAVE_QUOTA_SUPPORT
1943 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1945 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1947 /* Trigger dqacq on the owner of child. If failed,
1948 * the next call for lquota_chkquota will process it. */
1949 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: bump the link count on \a obj and refresh its ctime from
 * the caller-supplied attributes. */
1957 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1958 const struct md_attr *ma)
1960 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1961 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1962 struct mdd_device *mdd = mdo2mdd(obj);
1963 struct thandle *handle;
1967 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1968 handle = mdd_trans_start(env, mdd);
1972 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1973 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1975 __mdd_ref_add(env, mdd_obj, handle);
1976 mdd_write_unlock(env, mdd_obj);
1978 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1979 la_copy->la_ctime = ma->ma_attr.la_ctime;
1981 la_copy->la_valid = LA_CTIME;
1982 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): transaction is stopped with result 0 rather than rc --
 * looks intentional (best-effort ctime update) but verify upstream. */
1985 mdd_trans_stop(env, mdd, 0, handle);
1991 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open flags into a MAY_* access mode for permission checks. */
1993 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1997 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1998 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1999 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2000 * owner can write to a file even if it is marked readonly to hide
2001 * its brokenness. (bug 5781) */
2002 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2003 struct md_ucred *uc = md_ucred(env);
2005 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2006 (la->la_uid == uc->mu_fsuid))
/* The MAY_* accumulation statements are elided in this fragment. */
2010 if (flags & FMODE_READ)
2012 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2014 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against object type, permissions, append-only
 * and noatime semantics. */
2019 static int mdd_open_sanity_check(const struct lu_env *env,
2020 struct mdd_object *obj, int flag)
2022 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2027 if (mdd_is_dead_obj(obj))
2030 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2034 if (S_ISLNK(tmp_la->la_mode))
2037 mode = accmode(env, tmp_la, flag);
2039 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2042 if (!(flag & MDS_OPEN_CREATED)) {
2043 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncation is meaningless for special files; drop the flag. */
2048 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2049 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2050 flag &= ~MDS_OPEN_TRUNC;
2052 /* For writing append-only file must open it with append mode. */
2053 if (mdd_is_append(obj)) {
2054 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2056 if (flag & MDS_OPEN_TRUNC)
2062 * Now, flag -- O_NOATIME does not be packed by client.
2064 if (flag & O_NOATIME) {
2065 struct md_ucred *uc = md_ucred(env);
2067 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2068 (uc->mu_valid == UCRED_NEW)) &&
2069 (uc->mu_fsuid != tmp_la->la_uid) &&
2070 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open: run the sanity check and bump the open count under write lock. */
2078 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2081 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2084 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2086 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2088 mdd_obj->mod_count++;
2090 mdd_write_unlock(env, mdd_obj);
2094 /* return md_attr back,
2095 * if it is last unlink then return lov ea + llog cookie*/
2096 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2102 if (S_ISREG(mdd_object_type(obj))) {
2103 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2104 * Caller must be ready for that. */
2106 rc = __mdd_lmm_get(env, obj, ma);
2107 if ((ma->ma_valid & MA_LOV))
2108 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2115 * No permission check is needed.
/* Close: drop the open count; if this was the last opener of an orphan
 * object, remove it from the orphan index and arrange OSS destruction. */
2117 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2120 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2121 struct mdd_device *mdd = mdo2mdd(obj);
2122 struct thandle *handle = NULL;
2126 #ifdef HAVE_QUOTA_SUPPORT
2127 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2128 struct mds_obd *mds = &obd->u.mds;
2129 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: caller wants the orphan retained (e.g. HSM restore);
 * only drop the open count and return. */
2134 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2135 mdd_obj->mod_count--;
2137 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2138 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2139 "list\n", PFID(mdd_object_fid(mdd_obj)));
2143 /* check without any lock */
2144 if (mdd_obj->mod_count == 1 &&
2145 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2147 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2150 handle = mdd_trans_start(env, mdo2mdd(obj));
2152 RETURN(PTR_ERR(handle));
2155 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Re-check under lock: another closer may have raced; the retry path
 * that follows this test is elided in the fragment. */
2156 if (handle == NULL && mdd_obj->mod_count == 1 &&
2157 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2158 mdd_write_unlock(env, mdd_obj);
2162 /* release open count */
2163 mdd_obj->mod_count --;
2165 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2166 /* remove link to object from orphan index */
2167 rc = __mdd_orphan_del(env, mdd_obj, handle);
2169 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2170 "list, OSS objects to be destroyed.\n",
2171 PFID(mdd_object_fid(mdd_obj)));
2173 CERROR("Object "DFID" can not be deleted from orphan "
2174 "list, maybe cause OST objects can not be "
2175 "destroyed (err: %d).\n",
2176 PFID(mdd_object_fid(mdd_obj)), rc);
2177 /* If object was not deleted from orphan list, do not
2178 * destroy OSS objects, which will be done when next
2184 rc = mdd_iattr_get(env, mdd_obj, ma);
2185 /* Object maybe not in orphan list originally, it is rare case for
2186 * mdd_finish_unlink() failure. */
2187 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2188 #ifdef HAVE_QUOTA_SUPPORT
2189 if (mds->mds_quota) {
2190 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2191 mdd_quota_wrapper(&ma->ma_attr, qids);
2194 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2195 if (ma->ma_valid & MA_FLAGS &&
2196 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2197 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2199 rc = mdd_object_kill(env, mdd_obj, ma);
2205 CERROR("Error when prepare to delete Object "DFID" , "
2206 "which will cause OST objects can not be "
2207 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* Not the last unlink: make sure no stale LOV/cookie data is returned. */
2213 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2215 mdd_write_unlock(env, mdd_obj);
2217 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2218 #ifdef HAVE_QUOTA_SUPPORT
2220 /* Trigger dqrel on the owner of child. If failed,
2221 * the next call for lquota_chkquota will process it */
2222 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2229 * Permission check is done when open,
2230 * no need check again.
2232 static int mdd_readpage_sanity_check(const struct lu_env *env,
2233 struct mdd_object *obj)
2235 struct dt_object *next = mdd_object_child(obj);
/* Readable only if it is a directory and the dt layer can iterate it. */
2239 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries pulled from iterator \a it,
 * packing up to \a nob bytes; records hash range and end marker. */
2247 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2248 struct lu_dirpage *dp, int nob,
2249 const struct dt_it_ops *iops, struct dt_it *it,
2255 struct lu_dirent *ent;
2256 struct lu_dirent *last = NULL;
/* The page header precedes the entries; entries start right after it. */
2259 memset(area, 0, sizeof (*dp));
2260 area += sizeof (*dp);
2261 nob -= sizeof (*dp);
2268 len = iops->key_size(env, it);
2270 /* IAM iterator can return record with zero len. */
2274 hash = iops->store(env, it);
2275 if (unlikely(first)) {
2277 dp->ldp_hash_start = cpu_to_le64(hash);
2280 /* calculate max space required for lu_dirent */
2281 recsize = lu_dirent_calc_size(len, attr);
2283 if (nob >= recsize) {
2284 result = iops->rec(env, it, ent, attr);
2285 if (result == -ESTALE)
2290 /* osd might not able to pack all attributes,
2291 * so recheck rec length */
2292 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: fail only if not even one entry fit. */
2294 result = (last != NULL) ? 0 :-EINVAL;
2298 ent = (void *)ent + recsize;
2302 result = iops->next(env, it);
2303 if (result == -ESTALE)
2305 } while (result == 0);
2308 dp->ldp_hash_end = cpu_to_le64(hash);
/* Hash collision at the page boundary: flag it so the client rereads. */
2310 if (last->lde_hash == dp->ldp_hash_end)
2311 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2312 last->lde_reclen = 0; /* end mark */
/* Iterate the directory from rdpg->rp_hash and fill the caller's pages
 * with lu_dirpage-formatted entries.  Returns bytes filled or -errno. */
2317 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2318 const struct lu_rdpg *rdpg)
2321 struct dt_object *next = mdd_object_child(obj);
2322 const struct dt_it_ops *iops;
2324 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2330 LASSERT(rdpg->rp_pages != NULL);
2331 LASSERT(next->do_index_ops != NULL);
2333 if (rdpg->rp_count <= 0)
2337 * iterate through directory and fill pages from @rdpg
2339 iops = &next->do_index_ops->dio_it;
2340 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2344 rc = iops->load(env, it, rdpg->rp_hash);
2348 * Iterator didn't find record with exactly the key requested.
2350 * It is currently either
2352 * - positioned above record with key less than
2353 * requested---skip it.
2355 * - or not positioned at all (is in IAM_IT_SKEWED
2356 * state)---position it on the next item.
2358 rc = iops->next(env, it);
2363 * At this point and across for-loop:
2365 * rc == 0 -> ok, proceed.
2366 * rc > 0 -> end of directory.
2369 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2370 i++, nob -= CFS_PAGE_SIZE) {
2371 struct lu_dirpage *dp;
2373 LASSERT(i < rdpg->rp_npages);
2374 pg = rdpg->rp_pages[i];
2376 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* A CFS page holds several LU pages; build each sub-page in turn. */
2379 rc = mdd_dir_page_build(env, mdd, dp,
2380 min_t(int, nob, LU_PAGE_SIZE),
2381 iops, it, rdpg->rp_attrs);
2386 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2388 } else if (rc < 0) {
2389 CWARN("build page failed: %d!\n", rc);
2392 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2393 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2394 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2401 struct lu_dirpage *dp;
/* Empty result: still return one page marked LDF_EMPTY so the client
 * sees a well-formed (hash_start, EMPTY) answer. */
2403 dp = cfs_kmap(rdpg->rp_pages[0]);
2404 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2407 * No pages were processed, mark this for first page
2410 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2413 cfs_kunmap(rdpg->rp_pages[0]);
2415 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2418 iops->fini(env, it);
/* Read one page of directory entries; a dead (unlinked-but-open)
 * directory yields a single empty page per POSIX. */
2423 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2424 const struct lu_rdpg *rdpg)
2426 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2430 LASSERT(mdd_object_exists(mdd_obj));
2432 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2433 rc = mdd_readpage_sanity_check(env, mdd_obj);
2435 GOTO(out_unlock, rc);
2437 if (mdd_is_dead_obj(mdd_obj)) {
2439 struct lu_dirpage *dp;
2442 * According to POSIX, please do not return any entry to client:
2443 * even dot and dotdot should not be returned.
2445 CWARN("readdir from dead object: "DFID"\n",
2446 PFID(mdd_object_fid(mdd_obj)));
2448 if (rdpg->rp_count <= 0)
2449 GOTO(out_unlock, rc = -EFAULT);
2450 LASSERT(rdpg->rp_pages != NULL);
/* Hand back one page flagged LDF_EMPTY covering the whole hash range. */
2452 pg = rdpg->rp_pages[0];
2453 dp = (struct lu_dirpage*)cfs_kmap(pg);
2454 memset(dp, 0 , sizeof(struct lu_dirpage));
2455 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2456 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2457 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2459 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2462 rc = __mdd_readpage(env, mdd_obj, rdpg);
2466 mdd_read_unlock(env, mdd_obj);
/* Sync the object by delegating to the underlying dt object's sync op. */
2470 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2472 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2473 struct dt_object *next;
2475 LASSERT(mdd_object_exists(mdd_obj));
2476 next = mdd_object_child(mdd_obj);
2477 return next->do_ops->do_object_sync(env, next);
/* Read the dt-layer object version (used for version-based recovery). */
2480 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2481 struct md_object *obj)
2483 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2485 LASSERT(mdd_object_exists(mdd_obj));
2486 return do_version_get(env, mdd_object_child(mdd_obj));
/* Store the dt-layer object version (counterpart of mdd_version_get). */
2489 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2490 dt_obj_version_t version)
2492 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2494 LASSERT(mdd_object_exists(mdd_obj));
2495 do_version_set(env, mdd_object_child(mdd_obj), version);
2498 const struct md_object_operations mdd_obj_ops = {
2499 .moo_permission = mdd_permission,
2500 .moo_attr_get = mdd_attr_get,
2501 .moo_attr_set = mdd_attr_set,
2502 .moo_xattr_get = mdd_xattr_get,
2503 .moo_xattr_set = mdd_xattr_set,
2504 .moo_xattr_list = mdd_xattr_list,
2505 .moo_xattr_del = mdd_xattr_del,
2506 .moo_object_create = mdd_object_create,
2507 .moo_ref_add = mdd_ref_add,
2508 .moo_ref_del = mdd_ref_del,
2509 .moo_open = mdd_open,
2510 .moo_close = mdd_close,
2511 .moo_readpage = mdd_readpage,
2512 .moo_readlink = mdd_readlink,
2513 .moo_changelog = mdd_changelog,
2514 .moo_capa_get = mdd_capa_get,
2515 .moo_object_sync = mdd_object_sync,
2516 .moo_version_get = mdd_version_get,
2517 .moo_version_set = mdd_version_set,
2518 .moo_path = mdd_path,
2519 .moo_file_lock = mdd_file_lock,
2520 .moo_file_unlock = mdd_file_unlock,