1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
55 #include <linux/jbd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
68 #include <linux/ldiskfs_fs.h>
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
73 #include "mdd_internal.h"
75 static const struct lu_object_operations mdd_lu_obj_ops;
77 static int mdd_xattr_get(const struct lu_env *env,
78 struct md_object *obj, struct lu_buf *buf,
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 mdo_data_get(env, obj, data);
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91 struct lu_attr *la, struct lustre_capa *capa)
93 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94 PFID(mdd_object_fid(obj)));
95 return mdo_attr_get(env, obj, la, capa);
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
100 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
102 if (flags & LUSTRE_APPEND_FL)
103 obj->mod_flags |= APPEND_OBJ;
105 if (flags & LUSTRE_IMMUTABLE_FL)
106 obj->mod_flags |= IMMUTE_OBJ;
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
111 struct mdd_thread_info *info;
113 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114 LASSERT(info != NULL);
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 buf = &mdd_env_info(env)->mti_buf;
128 void mdd_buf_put(struct lu_buf *buf)
130 if (buf == NULL || buf->lb_buf == NULL)
132 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL) {
158 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL)
165 /** Increase the size of the \a mti_big_buf.
166 * preserves old data in buffer
167 * old buffer remains unchanged on error
168 * \retval 0 or -ENOMEM
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
172 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175 LASSERT(len >= oldbuf->lb_len);
176 OBD_ALLOC_LARGE(buf.lb_buf, len);
178 if (buf.lb_buf == NULL)
182 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
184 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
186 memcpy(oldbuf, &buf, sizeof(buf));
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192 struct mdd_device *mdd)
194 struct mdd_thread_info *mti = mdd_env_info(env);
197 max_cookie_size = mdd_lov_cookiesize(env, mdd);
198 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199 if (mti->mti_max_cookie)
200 OBD_FREE_LARGE(mti->mti_max_cookie,
201 mti->mti_max_cookie_size);
202 mti->mti_max_cookie = NULL;
203 mti->mti_max_cookie_size = 0;
205 if (unlikely(mti->mti_max_cookie == NULL)) {
206 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207 if (likely(mti->mti_max_cookie != NULL))
208 mti->mti_max_cookie_size = max_cookie_size;
210 if (likely(mti->mti_max_cookie != NULL))
211 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212 return mti->mti_max_cookie;
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216 struct mdd_device *mdd)
218 struct mdd_thread_info *mti = mdd_env_info(env);
221 max_lmm_size = mdd_lov_mdsize(env, mdd);
222 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223 if (mti->mti_max_lmm)
224 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225 mti->mti_max_lmm = NULL;
226 mti->mti_max_lmm_size = 0;
228 if (unlikely(mti->mti_max_lmm == NULL)) {
229 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230 if (likely(mti->mti_max_lmm != NULL))
231 mti->mti_max_lmm_size = max_lmm_size;
233 return mti->mti_max_lmm;
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237 const struct lu_object_header *hdr,
240 struct mdd_object *mdd_obj;
242 OBD_ALLOC_PTR(mdd_obj);
243 if (mdd_obj != NULL) {
246 o = mdd2lu_obj(mdd_obj);
247 lu_object_init(o, NULL, d);
248 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250 mdd_obj->mod_count = 0;
251 o->lo_ops = &mdd_lu_obj_ops;
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259 const struct lu_object_conf *unused)
261 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262 struct mdd_object *mdd_obj = lu2mdd_obj(o);
263 struct lu_object *below;
264 struct lu_device *under;
267 mdd_obj->mod_cltime = 0;
268 under = &d->mdd_child->dd_lu_dev;
269 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270 mdd_pdlock_init(mdd_obj);
274 lu_object_add(o, below);
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 if (lu_object_exists(o))
282 return mdd_get_flags(env, lu2mdd_obj(o));
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj(o);
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296 lu_printer_t p, const struct lu_object *o)
298 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300 "valid=%x, cltime="LPU64", flags=%lx)",
301 mdd, mdd->mod_count, mdd->mod_valid,
302 mdd->mod_cltime, mdd->mod_flags);
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306 .loo_object_init = mdd_object_init,
307 .loo_object_start = mdd_object_start,
308 .loo_object_free = mdd_object_free,
309 .loo_object_print = mdd_object_print,
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313 struct mdd_device *d,
314 const struct lu_fid *f)
316 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320 const char *path, struct lu_fid *fid)
323 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324 struct mdd_object *obj;
325 struct lu_name *lname = &mdd_env_info(env)->mti_name;
330 /* temp buffer for path element */
331 buf = mdd_buf_alloc(env, PATH_MAX);
332 if (buf->lb_buf == NULL)
335 lname->ln_name = name = buf->lb_buf;
336 lname->ln_namelen = 0;
337 *f = mdd->mdd_root_fid;
344 while (*path != '/' && *path != '\0') {
352 /* find obj corresponding to fid */
353 obj = mdd_object_find(env, mdd, f);
355 GOTO(out, rc = -EREMOTE);
357 GOTO(out, rc = PTR_ERR(obj));
358 /* get child fid from parent and name */
359 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360 mdd_object_put(env, obj);
365 lname->ln_namelen = 0;
374 /** The maximum depth that fid2path() will search.
375 * This is limited only because we want to store the fids for
376 * historical path lookup purposes.
378 #define MAX_PATH_DEPTH 100
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382 __u64 pli_recno; /**< history point */
383 __u64 pli_currec; /**< current record */
384 struct lu_fid pli_fid;
385 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386 struct mdd_object *pli_mdd_obj;
387 char *pli_path; /**< full path */
389 int pli_linkno; /**< which hardlink to follow */
390 int pli_fidcount; /**< number of \a pli_fids */
393 static int mdd_path_current(const struct lu_env *env,
394 struct path_lookup_info *pli)
396 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397 struct mdd_object *mdd_obj;
398 struct lu_buf *buf = NULL;
399 struct link_ea_header *leh;
400 struct link_ea_entry *lee;
401 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
408 ptr = pli->pli_path + pli->pli_pathlen - 1;
411 pli->pli_fidcount = 0;
412 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415 mdd_obj = mdd_object_find(env, mdd,
416 &pli->pli_fids[pli->pli_fidcount]);
418 GOTO(out, rc = -EREMOTE);
420 GOTO(out, rc = PTR_ERR(mdd_obj));
421 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423 mdd_object_put(env, mdd_obj);
427 /* Do I need to error out here? */
432 /* Get parent fid and object name */
433 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434 buf = mdd_links_get(env, mdd_obj);
435 mdd_read_unlock(env, mdd_obj);
436 mdd_object_put(env, mdd_obj);
438 GOTO(out, rc = PTR_ERR(buf));
441 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 /* If set, use link #linkno for path lookup, otherwise use
445 link #0. Only do this for the final path element. */
446 if ((pli->pli_fidcount == 0) &&
447 (pli->pli_linkno < leh->leh_reccount)) {
449 for (count = 0; count < pli->pli_linkno; count++) {
450 lee = (struct link_ea_entry *)
451 ((char *)lee + reclen);
452 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454 if (pli->pli_linkno < leh->leh_reccount - 1)
455 /* indicate to user there are more links */
459 /* Pack the name in the end of the buffer */
460 ptr -= tmpname->ln_namelen;
461 if (ptr - 1 <= pli->pli_path)
462 GOTO(out, rc = -EOVERFLOW);
463 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466 /* Store the parent fid for historic lookup */
467 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468 GOTO(out, rc = -EOVERFLOW);
469 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472 /* Verify that our path hasn't changed since we started the lookup.
473 Record the current index, and verify the path resolves to the
474 same fid. If it does, then the path is correct as of this index. */
475 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476 pli->pli_currec = mdd->mdd_cl.mc_index;
477 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481 GOTO (out, rc = -EAGAIN);
483 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486 PFID(&pli->pli_fid));
487 GOTO(out, rc = -EAGAIN);
489 ptr++; /* skip leading / */
490 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
494 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495 /* if we vmalloced a large buffer drop it */
501 static int mdd_path_historic(const struct lu_env *env,
502 struct path_lookup_info *pli)
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509 char *path, int pathlen, __u64 *recno, int *linkno)
511 struct path_lookup_info *pli;
519 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
528 pli->pli_mdd_obj = md2mdd_obj(obj);
529 pli->pli_recno = *recno;
530 pli->pli_path = path;
531 pli->pli_pathlen = pathlen;
532 pli->pli_linkno = *linkno;
534 /* Retry multiple times in case file is being moved */
535 while (tries-- && rc == -EAGAIN)
536 rc = mdd_path_current(env, pli);
538 /* For historical path lookup, the current links may not have existed
539 * at "recno" time. We must switch over to earlier links/parents
540 * by using the changelog records. If the earlier parent doesn't
541 * exist, we must search back through the changelog to reconstruct
542 * its parents, then check if it exists, etc.
543 * We may ignore this problem for the initial implementation and
544 * state that an "original" hardlink must still exist for us to find
545 * historic path name. */
546 if (pli->pli_recno != -1) {
547 rc = mdd_path_historic(env, pli);
549 *recno = pli->pli_currec;
550 /* Return next link index to caller */
551 *linkno = pli->pli_linkno;
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 struct lu_attr *la = &mdd_env_info(env)->mti_la;
565 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567 mdd_flags_xlate(obj, la->la_flags);
568 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569 obj->mod_flags |= MNLINK_OBJ;
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
581 if (ma->ma_valid & MA_INODE)
584 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585 mdd_object_capa(env, mdd_obj));
587 ma->ma_valid |= MA_INODE;
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
593 struct lov_desc *ldesc;
594 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595 struct lov_user_md *lum = (struct lov_user_md*)lmm;
601 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602 LASSERT(ldesc != NULL);
604 lum->lmm_magic = LOV_MAGIC_V1;
605 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606 lum->lmm_pattern = ldesc->ld_pattern;
607 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
611 RETURN(sizeof(*lum));
614 static int is_rootdir(struct mdd_object *mdd_obj)
616 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617 const struct lu_fid *fid = mdo2fid(mdd_obj);
619 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624 struct mdd_object *mdd_obj, struct md_attr *ma)
629 if (ma->ma_valid & MA_LOV)
632 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
634 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
637 ma->ma_lmm_size = rc;
638 ma->ma_valid |= MA_LOV;
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646 struct mdd_object *mdd_obj, struct md_attr *ma)
649 struct link_ea_header *leh;
650 struct link_ea_entry *lee;
651 struct lu_fid *pfid = &ma->ma_pfid;
654 if (ma->ma_valid & MA_PFID)
657 buf = mdd_links_get(env, mdd_obj);
659 RETURN(PTR_ERR(buf));
662 lee = (struct link_ea_entry *)(leh + 1);
663 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664 fid_be_to_cpu(pfid, pfid);
665 ma->ma_valid |= MA_PFID;
666 if (buf->lb_len > OBD_ALLOC_BIG)
667 /* if we vmalloced a large buffer drop it */
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
678 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679 rc = __mdd_lmm_get(env, mdd_obj, ma);
680 mdd_read_unlock(env, mdd_obj);
685 static int __mdd_lmv_get(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
691 if (ma->ma_valid & MA_LMV)
694 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
697 ma->ma_valid |= MA_LMV;
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
706 struct mdd_thread_info *info = mdd_env_info(env);
707 struct lustre_mdt_attrs *lma =
708 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
713 /* If all needed data are already valid, nothing to do */
714 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715 (ma->ma_need & (MA_HSM | MA_SOM)))
718 /* Read LMA from disk EA */
719 lma_size = sizeof(info->mti_xattr_buf);
720 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
724 /* Useless to check LMA incompatibility because this is already done in
725 * osd_ea_fid_get(), and this will fail long before this code is
727 * So, if we are here, LMA is compatible.
730 lustre_lma_swab(lma);
732 /* Swab and copy LMA */
733 if (ma->ma_need & MA_HSM) {
734 if (lma->lma_compat & LMAC_HSM)
735 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
737 ma->ma_hsm.mh_flags = 0;
738 ma->ma_valid |= MA_HSM;
742 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743 LASSERT(ma->ma_som != NULL);
744 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745 ma->ma_som->msd_size = lma->lma_som_size;
746 ma->ma_som->msd_blocks = lma->lma_som_blocks;
747 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748 ma->ma_valid |= MA_SOM;
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
760 if (ma->ma_need & MA_INODE)
761 rc = mdd_iattr_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_LOV) {
764 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765 S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = __mdd_lmm_get(env, mdd_obj, ma);
768 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769 if (S_ISREG(mdd_object_type(mdd_obj)))
770 rc = mdd_pfid_get(env, mdd_obj, ma);
772 if (rc == 0 && ma->ma_need & MA_LMV) {
773 if (S_ISDIR(mdd_object_type(mdd_obj)))
774 rc = __mdd_lmv_get(env, mdd_obj, ma);
776 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777 if (S_ISREG(mdd_object_type(mdd_obj)))
778 rc = __mdd_lma_get(env, mdd_obj, ma);
780 #ifdef CONFIG_FS_POSIX_ACL
781 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782 if (S_ISDIR(mdd_object_type(mdd_obj)))
783 rc = mdd_def_acl_get(env, mdd_obj, ma);
786 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787 rc, ma->ma_valid, ma->ma_lmm);
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792 struct mdd_object *mdd_obj, struct md_attr *ma)
795 int needlock = ma->ma_need &
796 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
799 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800 rc = mdd_attr_get_internal(env, mdd_obj, ma);
802 mdd_read_unlock(env, mdd_obj);
807 * No permission check is needed.
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
812 struct mdd_object *mdd_obj = md2mdd_obj(obj);
816 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
821 * No permission check is needed.
823 static int mdd_xattr_get(const struct lu_env *env,
824 struct md_object *obj, struct lu_buf *buf,
827 struct mdd_object *mdd_obj = md2mdd_obj(obj);
832 LASSERT(mdd_object_exists(mdd_obj));
834 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835 rc = mdo_xattr_get(env, mdd_obj, buf, name,
836 mdd_object_capa(env, mdd_obj));
837 mdd_read_unlock(env, mdd_obj);
843 * Permission check is done when open,
844 * no need check again.
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
850 struct dt_object *next;
855 LASSERT(mdd_object_exists(mdd_obj));
857 next = mdd_object_child(mdd_obj);
858 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860 mdd_object_capa(env, mdd_obj));
861 mdd_read_unlock(env, mdd_obj);
866 * No permission check is needed.
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
871 struct mdd_object *mdd_obj = md2mdd_obj(obj);
876 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878 mdd_read_unlock(env, mdd_obj);
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884 struct mdd_object *c, struct md_attr *ma,
885 struct thandle *handle,
886 const struct md_op_spec *spec)
888 struct lu_attr *attr = &ma->ma_attr;
889 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891 const struct dt_index_features *feat = spec->sp_feat;
895 if (!mdd_object_exists(c)) {
896 struct dt_object *next = mdd_object_child(c);
899 if (feat != &dt_directory_features && feat != NULL)
900 dof->dof_type = DFT_INDEX;
902 dof->dof_type = dt_mode_to_dft(attr->la_mode);
904 dof->u.dof_idx.di_feat = feat;
906 /* @hint will be initialized by underlying device. */
907 next->do_ops->do_ah_init(env, hint,
908 p ? mdd_object_child(p) : NULL,
909 attr->la_mode & S_IFMT);
911 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
920 * Make sure the ctime is increased only.
922 static inline int mdd_attr_check(const struct lu_env *env,
923 struct mdd_object *obj,
924 struct lu_attr *attr)
926 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
930 if (attr->la_valid & LA_CTIME) {
931 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
935 if (attr->la_ctime < tmp_la->la_ctime)
936 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937 else if (attr->la_valid == LA_CTIME &&
938 attr->la_ctime == tmp_la->la_ctime)
939 attr->la_valid &= ~LA_CTIME;
944 int mdd_attr_set_internal(const struct lu_env *env,
945 struct mdd_object *obj,
946 struct lu_attr *attr,
947 struct thandle *handle,
953 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955 if (!rc && (attr->la_valid & LA_MODE) && needacl)
956 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962 struct mdd_object *obj,
963 struct lu_attr *attr,
964 struct thandle *handle,
970 rc = mdd_attr_check(env, obj, attr);
975 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980 struct mdd_object *obj,
981 struct lu_attr *attr,
982 struct thandle *handle,
988 needacl = needacl && (attr->la_valid & LA_MODE);
990 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
993 mdd_write_unlock(env, obj);
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998 struct mdd_object *obj,
999 struct lu_attr *attr,
1000 struct thandle *handle,
1006 needacl = needacl && (attr->la_valid & LA_MODE);
1008 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1011 mdd_write_unlock(env, obj);
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016 const struct lu_buf *buf, const char *name,
1017 int fl, struct thandle *handle)
1019 struct lustre_capa *capa = mdd_object_capa(env, obj);
1023 if (buf->lb_buf && buf->lb_len > 0)
1024 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026 rc = mdo_xattr_del(env, obj, name, handle, capa);
1032 * This gives the same functionality as the code between
1033 * sys_chmod and inode_setattr
1034 * chown_common and inode_setattr
1035 * utimes and inode_setattr
1036 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039 struct lu_attr *la, const struct md_attr *ma)
1041 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1042 struct md_ucred *uc;
1049 /* Do not permit change file type */
1050 if (la->la_valid & LA_TYPE)
1053 /* They should not be processed by setattr */
1054 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1057 /* export destroy does not have ->le_ses, but we may want
1058 * to drop LUSTRE_SOM_FL. */
1064 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1068 if (la->la_valid == LA_CTIME) {
1069 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070 /* This is only for set ctime when rename's source is
1072 rc = mdd_may_delete(env, NULL, obj,
1073 (struct md_attr *)ma, 1, 0);
1074 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075 la->la_valid &= ~LA_CTIME;
1079 if (la->la_valid == LA_ATIME) {
1080 /* This is atime only set for read atime update on close. */
1081 if (la->la_atime >= tmp_la->la_atime &&
1082 la->la_atime < (tmp_la->la_atime +
1083 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084 la->la_valid &= ~LA_ATIME;
1088 /* Check if flags change. */
1089 if (la->la_valid & LA_FLAGS) {
1090 unsigned int oldflags = 0;
1091 unsigned int newflags = la->la_flags &
1092 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1094 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095 !mdd_capable(uc, CFS_CAP_FOWNER))
1098 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099 * only be changed by the relevant capability. */
1100 if (mdd_is_immutable(obj))
1101 oldflags |= LUSTRE_IMMUTABLE_FL;
1102 if (mdd_is_append(obj))
1103 oldflags |= LUSTRE_APPEND_FL;
1104 if ((oldflags ^ newflags) &&
1105 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1108 if (!S_ISDIR(tmp_la->la_mode))
1109 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1112 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113 (la->la_valid & ~LA_FLAGS) &&
1114 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1117 /* Check for setting the obj time. */
1118 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1130 if (la->la_valid & LA_KILL_SUID) {
1131 la->la_valid &= ~LA_KILL_SUID;
1132 if ((tmp_la->la_mode & S_ISUID) &&
1133 !(la->la_valid & LA_MODE)) {
1134 la->la_mode = tmp_la->la_mode;
1135 la->la_valid |= LA_MODE;
1137 la->la_mode &= ~S_ISUID;
1140 if (la->la_valid & LA_KILL_SGID) {
1141 la->la_valid &= ~LA_KILL_SGID;
1142 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143 (S_ISGID | S_IXGRP)) &&
1144 !(la->la_valid & LA_MODE)) {
1145 la->la_mode = tmp_la->la_mode;
1146 la->la_valid |= LA_MODE;
1148 la->la_mode &= ~S_ISGID;
1151 /* Make sure a caller can chmod. */
1152 if (la->la_valid & LA_MODE) {
1153 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154 (uc->mu_fsuid != tmp_la->la_uid) &&
1155 !mdd_capable(uc, CFS_CAP_FOWNER))
1158 if (la->la_mode == (cfs_umode_t) -1)
1159 la->la_mode = tmp_la->la_mode;
1161 la->la_mode = (la->la_mode & S_IALLUGO) |
1162 (tmp_la->la_mode & ~S_IALLUGO);
1164 /* Also check the setgid bit! */
1165 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166 la->la_gid : tmp_la->la_gid) &&
1167 !mdd_capable(uc, CFS_CAP_FSETID))
1168 la->la_mode &= ~S_ISGID;
1170 la->la_mode = tmp_la->la_mode;
1173 /* Make sure a caller can chown. */
1174 if (la->la_valid & LA_UID) {
1175 if (la->la_uid == (uid_t) -1)
1176 la->la_uid = tmp_la->la_uid;
1177 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178 (la->la_uid != tmp_la->la_uid)) &&
1179 !mdd_capable(uc, CFS_CAP_CHOWN))
1182 /* If the user or group of a non-directory has been
1183 * changed by a non-root user, remove the setuid bit.
1184 * 19981026 David C Niemi <niemi@tux.org>
1186 * Changed this to apply to all users, including root,
1187 * to avoid some races. This is the behavior we had in
1188 * 2.0. The check for non-root was definitely wrong
1189 * for 2.2 anyway, as it should have been using
1190 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192 !S_ISDIR(tmp_la->la_mode)) {
1193 la->la_mode &= ~S_ISUID;
1194 la->la_valid |= LA_MODE;
1198 /* Make sure caller can chgrp. */
1199 if (la->la_valid & LA_GID) {
1200 if (la->la_gid == (gid_t) -1)
1201 la->la_gid = tmp_la->la_gid;
1202 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203 ((la->la_gid != tmp_la->la_gid) &&
1204 !lustre_in_group_p(uc, la->la_gid))) &&
1205 !mdd_capable(uc, CFS_CAP_CHOWN))
1208 /* Likewise, if the user or group of a non-directory
1209 * has been changed by a non-root user, remove the
1210 * setgid bit UNLESS there is no group execute bit
1211 * (this would be a file marked for mandatory
1212 * locking). 19981026 David C Niemi <niemi@tux.org>
1214 * Removed the fsuid check (see the comment above) --
1216 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218 la->la_mode &= ~S_ISGID;
1219 la->la_valid |= LA_MODE;
1223 /* For both Size-on-MDS case and truncate case,
1224 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226 * For SOM case, it is true, the MAY_WRITE perm has been checked
1227 * when open, no need check again. For truncate case, it is false,
1228 * the MAY_WRITE perm should be checked here. */
1229 if (ma->ma_attr_flags & MDS_SOM) {
1230 /* For the "Size-on-MDS" setattr update, merge coming
1231 * attributes with the set in the inode. BUG 10641 */
1232 if ((la->la_valid & LA_ATIME) &&
1233 (la->la_atime <= tmp_la->la_atime))
1234 la->la_valid &= ~LA_ATIME;
1236 /* OST attributes do not have a priority over MDS attributes,
1237 * so drop times if ctime is equal. */
1238 if ((la->la_valid & LA_CTIME) &&
1239 (la->la_ctime <= tmp_la->la_ctime))
1240 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1242 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244 (uc->mu_fsuid == tmp_la->la_uid)) &&
1245 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246 rc = mdd_permission_internal_locked(env, obj,
1253 if (la->la_valid & LA_CTIME) {
1254 /* The pure setattr, it has the priority over what is
1255 * already set, do not drop it if ctime is equal. */
1256 if (la->la_ctime < tmp_la->la_ctime)
1257 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1265 /** Store a data change changelog record
1266 * If this fails, we must fail the whole transaction; we don't
1267 * want the change to commit without the log entry.
1268 * \param mdd_obj - mdd_object of change
1269 * \param handle - transacion handle
1271 static int mdd_changelog_data_store(const struct lu_env *env,
1272 struct mdd_device *mdd,
1273 enum changelog_rec_type type,
1275 struct mdd_object *mdd_obj,
1276 struct thandle *handle)
1278 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279 struct llog_changelog_rec *rec;
1285 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1287 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1290 LASSERT(handle != NULL);
1291 LASSERT(mdd_obj != NULL);
1293 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295 /* Don't need multiple updates in this log */
1296 /* Don't check under lock - no big deal if we get an extra
1301 reclen = llog_data_len(sizeof(*rec));
1302 buf = mdd_buf_alloc(env, reclen);
1303 if (buf->lb_buf == NULL)
1305 rec = (struct llog_changelog_rec *)buf->lb_buf;
1307 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308 rec->cr.cr_type = (__u32)type;
1309 rec->cr.cr_tfid = *tfid;
1310 rec->cr.cr_namelen = 0;
1311 mdd_obj->mod_cltime = cfs_time_current_64();
1313 rc = mdd_changelog_llog_write(mdd, rec, handle);
1315 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316 rc, type, PFID(tfid));
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324 int flags, struct md_object *obj)
1326 struct thandle *handle;
1327 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328 struct mdd_device *mdd = mdo2mdd(obj);
1332 handle = mdd_trans_start(env, mdd);
1335 return(PTR_ERR(handle));
1337 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1340 mdd_trans_stop(env, mdd, rc, handle);
1346 * Should be called with write lock held.
1348 * \see mdd_lma_set_locked().
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351 const struct md_attr *ma, struct thandle *handle)
1353 struct mdd_thread_info *info = mdd_env_info(env);
1355 struct lustre_mdt_attrs *lma =
1356 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357 int lmasize = sizeof(struct lustre_mdt_attrs);
1362 /* Either HSM or SOM part is not valid, we need to read it before */
1363 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1368 lustre_lma_swab(lma);
1370 memset(lma, 0, lmasize);
1374 if (ma->ma_valid & MA_HSM) {
1375 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376 lma->lma_compat |= LMAC_HSM;
1380 if (ma->ma_valid & MA_SOM) {
1381 LASSERT(ma->ma_som != NULL);
1382 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383 lma->lma_compat &= ~LMAC_SOM;
1385 lma->lma_compat |= LMAC_SOM;
1386 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1387 lma->lma_som_size = ma->ma_som->msd_size;
1388 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1389 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1394 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1396 lustre_lma_swab(lma);
1397 buf = mdd_buf_get(env, lma, lmasize);
1398 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1404 * Save LMA extended attributes with data from \a ma.
1406 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407 * not, LMA EA will be first read from disk, modified and write back.
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411 struct mdd_object *mdd_obj,
1412 const struct md_attr *ma, struct thandle *handle)
1416 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418 mdd_write_unlock(env, mdd_obj);
1422 /* Precedence for choosing record type when multiple
1423 * attributes change: setattr > mtime > ctime > atime
1424 * (ctime changes when mtime does, plus chmod/chown.
1425 * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427 struct md_object *obj, struct thandle *handle,
1430 struct mdd_device *mdd = mdo2mdd(obj);
1433 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437 bits = bits & mdd->mdd_cl.mc_mask;
1441 /* The record type is the lowest non-masked set bit */
1442 while (bits && ((bits & 1) == 0)) {
1447 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449 md2mdd_obj(obj), handle);
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454 const struct md_attr *ma)
1456 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457 struct mdd_device *mdd = mdo2mdd(obj);
1458 struct thandle *handle;
1459 struct lov_mds_md *lmm = NULL;
1460 struct llog_cookie *logcookies = NULL;
1461 int rc, lmm_size = 0, cookie_size = 0;
1462 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463 #ifdef HAVE_QUOTA_SUPPORT
1464 struct obd_device *obd = mdd->mdd_obd_dev;
1465 struct mds_obd *mds = &obd->u.mds;
1466 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468 int quota_opc = 0, block_count = 0;
1469 int inode_pending[MAXQUOTAS] = { 0, 0 };
1470 int block_pending[MAXQUOTAS] = { 0, 0 };
1474 *la_copy = ma->ma_attr;
1475 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1479 /* setattr on "close" only change atime, or do nothing */
1480 if (ma->ma_valid == MA_INODE &&
1481 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1484 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1485 MDD_TXN_ATTR_SET_OP);
1486 handle = mdd_trans_start(env, mdd);
1488 RETURN(PTR_ERR(handle));
1489 /*TODO: add lock here*/
1490 /* start a log jounal handle if needed */
1491 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1492 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1493 lmm_size = mdd_lov_mdsize(env, mdd);
1494 lmm = mdd_max_lmm_get(env, mdd);
1496 GOTO(cleanup, rc = -ENOMEM);
1498 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1505 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1506 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1507 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1509 #ifdef HAVE_QUOTA_SUPPORT
1510 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1511 struct obd_export *exp = md_quota(env)->mq_exp;
1512 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1514 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1516 quota_opc = FSFILT_OP_SETATTR;
1517 mdd_quota_wrapper(la_copy, qnids);
1518 mdd_quota_wrapper(la_tmp, qoids);
1519 /* get file quota for new owner */
1520 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1521 qnids, inode_pending, 1, NULL, 0,
1523 block_count = (la_tmp->la_blocks + 7) >> 3;
1526 mdd_data_get(env, mdd_obj, &data);
1527 /* get block quota for new owner */
1528 lquota_chkquota(mds_quota_interface_ref, obd,
1529 exp, qnids, block_pending,
1531 LQUOTA_FLAGS_BLK, data, 1);
1537 if (la_copy->la_valid & LA_FLAGS) {
1538 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1541 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1542 } else if (la_copy->la_valid) { /* setattr */
1543 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1545 /* journal chown/chgrp in llog, just like unlink */
1546 if (rc == 0 && lmm_size){
1547 cookie_size = mdd_lov_cookiesize(env, mdd);
1548 logcookies = mdd_max_cookie_get(env, mdd);
1549 if (logcookies == NULL)
1550 GOTO(cleanup, rc = -ENOMEM);
1552 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1553 logcookies, cookie_size) <= 0)
1558 if (rc == 0 && ma->ma_valid & MA_LOV) {
1561 mode = mdd_object_type(mdd_obj);
1562 if (S_ISREG(mode) || S_ISDIR(mode)) {
1563 rc = mdd_lsm_sanity_check(env, mdd_obj);
1567 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1568 ma->ma_lmm_size, handle, 1);
1572 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1575 mode = mdd_object_type(mdd_obj);
1577 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1582 rc = mdd_attr_set_changelog(env, obj, handle,
1583 ma->ma_attr.la_valid);
1584 mdd_trans_stop(env, mdd, rc, handle);
1585 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1586 /*set obd attr, if needed*/
1587 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1590 #ifdef HAVE_QUOTA_SUPPORT
1592 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1594 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1596 /* Trigger dqrel/dqacq for original owner and new owner.
1597 * If failed, the next call for lquota_chkquota will
1599 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1606 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1607 const struct lu_buf *buf, const char *name, int fl,
1608 struct thandle *handle)
1613 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1614 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1615 mdd_write_unlock(env, obj);
1620 static int mdd_xattr_sanity_check(const struct lu_env *env,
1621 struct mdd_object *obj)
1623 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1624 struct md_ucred *uc = md_ucred(env);
1628 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1631 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1635 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1636 !mdd_capable(uc, CFS_CAP_FOWNER))
1643 * The caller should guarantee to update the object ctime
1644 * after xattr_set if needed.
1646 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1647 const struct lu_buf *buf, const char *name,
1650 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1651 struct mdd_device *mdd = mdo2mdd(obj);
1652 struct thandle *handle;
1656 rc = mdd_xattr_sanity_check(env, mdd_obj);
1660 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1661 /* security-replated changes may require sync */
1662 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1663 mdd->mdd_sync_permission == 1)
1664 txn_param_sync(&mdd_env_info(env)->mti_param);
1666 handle = mdd_trans_start(env, mdd);
1668 RETURN(PTR_ERR(handle));
1670 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1672 /* Only record user xattr changes */
1673 if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1674 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1676 mdd_trans_stop(env, mdd, rc, handle);
1682 * The caller should guarantee to update the object ctime
1683 * after xattr_set if needed.
1685 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1688 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1689 struct mdd_device *mdd = mdo2mdd(obj);
1690 struct thandle *handle;
1694 rc = mdd_xattr_sanity_check(env, mdd_obj);
1698 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1699 handle = mdd_trans_start(env, mdd);
1701 RETURN(PTR_ERR(handle));
1703 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1704 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1705 mdd_object_capa(env, mdd_obj));
1706 mdd_write_unlock(env, mdd_obj);
1708 /* Only record user xattr changes */
1709 if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1710 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1713 mdd_trans_stop(env, mdd, rc, handle);
1718 /* partial unlink */
1719 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1722 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1723 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1724 struct mdd_device *mdd = mdo2mdd(obj);
1725 struct thandle *handle;
1726 #ifdef HAVE_QUOTA_SUPPORT
1727 struct obd_device *obd = mdd->mdd_obd_dev;
1728 struct mds_obd *mds = &obd->u.mds;
1729 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1736 * Check -ENOENT early here because we need to get object type
1737 * to calculate credits before transaction start
1739 if (!mdd_object_exists(mdd_obj))
1742 LASSERT(mdd_object_exists(mdd_obj) > 0);
1744 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1748 handle = mdd_trans_start(env, mdd);
1752 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1754 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1758 __mdd_ref_del(env, mdd_obj, handle, 0);
1760 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1762 __mdd_ref_del(env, mdd_obj, handle, 1);
1765 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1766 la_copy->la_ctime = ma->ma_attr.la_ctime;
1768 la_copy->la_valid = LA_CTIME;
1769 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1773 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1774 #ifdef HAVE_QUOTA_SUPPORT
1775 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1776 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1777 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1778 mdd_quota_wrapper(&ma->ma_attr, qids);
1785 mdd_write_unlock(env, mdd_obj);
1786 mdd_trans_stop(env, mdd, rc, handle);
1787 #ifdef HAVE_QUOTA_SUPPORT
1789 /* Trigger dqrel on the owner of child. If failed,
1790 * the next call for lquota_chkquota will process it */
1791 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1797 /* partial operation */
1798 static int mdd_oc_sanity_check(const struct lu_env *env,
1799 struct mdd_object *obj,
1805 switch (ma->ma_attr.la_mode & S_IFMT) {
1822 static int mdd_object_create(const struct lu_env *env,
1823 struct md_object *obj,
1824 const struct md_op_spec *spec,
1828 struct mdd_device *mdd = mdo2mdd(obj);
1829 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1830 const struct lu_fid *pfid = spec->u.sp_pfid;
1831 struct thandle *handle;
1832 #ifdef HAVE_QUOTA_SUPPORT
1833 struct obd_device *obd = mdd->mdd_obd_dev;
1834 struct obd_export *exp = md_quota(env)->mq_exp;
1835 struct mds_obd *mds = &obd->u.mds;
1836 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1837 int quota_opc = 0, block_count = 0;
1838 int inode_pending[MAXQUOTAS] = { 0, 0 };
1839 int block_pending[MAXQUOTAS] = { 0, 0 };
1844 #ifdef HAVE_QUOTA_SUPPORT
1845 if (mds->mds_quota) {
1846 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1847 mdd_quota_wrapper(&ma->ma_attr, qids);
1848 /* get file quota for child */
1849 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1850 qids, inode_pending, 1, NULL, 0,
1852 switch (ma->ma_attr.la_mode & S_IFMT) {
1861 /* get block quota for child */
1863 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1864 qids, block_pending, block_count,
1865 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1869 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1870 handle = mdd_trans_start(env, mdd);
1872 GOTO(out_pending, rc = PTR_ERR(handle));
1874 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1875 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1879 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1883 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1884 /* If creating the slave object, set slave EA here. */
1885 int lmv_size = spec->u.sp_ea.eadatalen;
1886 struct lmv_stripe_md *lmv;
1888 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1889 LASSERT(lmv != NULL && lmv_size > 0);
1891 rc = __mdd_xattr_set(env, mdd_obj,
1892 mdd_buf_get_const(env, lmv, lmv_size),
1893 XATTR_NAME_LMV, 0, handle);
1897 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1900 #ifdef CONFIG_FS_POSIX_ACL
1901 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1902 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1904 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1905 buf->lb_len = spec->u.sp_ea.eadatalen;
1906 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1907 rc = __mdd_acl_init(env, mdd_obj, buf,
1908 &ma->ma_attr.la_mode,
1913 ma->ma_attr.la_valid |= LA_MODE;
1916 pfid = spec->u.sp_ea.fid;
1919 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1925 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1926 mdd_write_unlock(env, mdd_obj);
1928 mdd_trans_stop(env, mdd, rc, handle);
1930 #ifdef HAVE_QUOTA_SUPPORT
1932 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1934 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1936 /* Trigger dqacq on the owner of child. If failed,
1937 * the next call for lquota_chkquota will process it. */
1938 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1946 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1947 const struct md_attr *ma)
1949 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1950 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1951 struct mdd_device *mdd = mdo2mdd(obj);
1952 struct thandle *handle;
1956 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1957 handle = mdd_trans_start(env, mdd);
1961 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1962 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1964 __mdd_ref_add(env, mdd_obj, handle);
1965 mdd_write_unlock(env, mdd_obj);
1967 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1968 la_copy->la_ctime = ma->ma_attr.la_ctime;
1970 la_copy->la_valid = LA_CTIME;
1971 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1974 mdd_trans_stop(env, mdd, 0, handle);
1980 * do NOT or the MAY_*'s, you'll get the weakest
1982 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1986 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1987 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1988 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1989 * owner can write to a file even if it is marked readonly to hide
1990 * its brokenness. (bug 5781) */
1991 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1992 struct md_ucred *uc = md_ucred(env);
1994 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1995 (la->la_uid == uc->mu_fsuid))
1999 if (flags & FMODE_READ)
2001 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2003 if (flags & MDS_FMODE_EXEC)
2008 static int mdd_open_sanity_check(const struct lu_env *env,
2009 struct mdd_object *obj, int flag)
2011 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2016 if (mdd_is_dead_obj(obj))
2019 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2023 if (S_ISLNK(tmp_la->la_mode))
2026 mode = accmode(env, tmp_la, flag);
2028 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2031 if (!(flag & MDS_OPEN_CREATED)) {
2032 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2037 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2038 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2039 flag &= ~MDS_OPEN_TRUNC;
2041 /* For writing append-only file must open it with append mode. */
2042 if (mdd_is_append(obj)) {
2043 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2045 if (flag & MDS_OPEN_TRUNC)
2051 * Now, flag -- O_NOATIME does not be packed by client.
2053 if (flag & O_NOATIME) {
2054 struct md_ucred *uc = md_ucred(env);
2056 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2057 (uc->mu_valid == UCRED_NEW)) &&
2058 (uc->mu_fsuid != tmp_la->la_uid) &&
2059 !mdd_capable(uc, CFS_CAP_FOWNER))
2067 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2070 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2073 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2075 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2077 mdd_obj->mod_count++;
2079 mdd_write_unlock(env, mdd_obj);
2083 /* return md_attr back,
2084 * if it is last unlink then return lov ea + llog cookie*/
2085 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2091 if (S_ISREG(mdd_object_type(obj))) {
2092 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2093 * Caller must be ready for that. */
2095 rc = __mdd_lmm_get(env, obj, ma);
2096 if ((ma->ma_valid & MA_LOV))
2097 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2104 * No permission check is needed.
2106 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2109 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2110 struct mdd_device *mdd = mdo2mdd(obj);
2111 struct thandle *handle = NULL;
2115 #ifdef HAVE_QUOTA_SUPPORT
2116 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2117 struct mds_obd *mds = &obd->u.mds;
2118 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2123 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2124 mdd_obj->mod_count--;
2126 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2127 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2128 "list\n", PFID(mdd_object_fid(mdd_obj)));
2132 /* check without any lock */
2133 if (mdd_obj->mod_count == 1 &&
2134 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2136 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2139 handle = mdd_trans_start(env, mdo2mdd(obj));
2141 RETURN(PTR_ERR(handle));
2144 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2145 if (handle == NULL && mdd_obj->mod_count == 1 &&
2146 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2147 mdd_write_unlock(env, mdd_obj);
2151 /* release open count */
2152 mdd_obj->mod_count --;
2154 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2155 /* remove link to object from orphan index */
2156 rc = __mdd_orphan_del(env, mdd_obj, handle);
2158 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2159 "list, OSS objects to be destroyed.\n",
2160 PFID(mdd_object_fid(mdd_obj)));
2162 CERROR("Object "DFID" can not be deleted from orphan "
2163 "list, maybe cause OST objects can not be "
2164 "destroyed (err: %d).\n",
2165 PFID(mdd_object_fid(mdd_obj)), rc);
2166 /* If object was not deleted from orphan list, do not
2167 * destroy OSS objects, which will be done when next
2173 rc = mdd_iattr_get(env, mdd_obj, ma);
2174 /* Object maybe not in orphan list originally, it is rare case for
2175 * mdd_finish_unlink() failure. */
2176 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2177 #ifdef HAVE_QUOTA_SUPPORT
2178 if (mds->mds_quota) {
2179 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2180 mdd_quota_wrapper(&ma->ma_attr, qids);
2183 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2184 if (ma->ma_valid & MA_FLAGS &&
2185 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2186 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2188 rc = mdd_object_kill(env, mdd_obj, ma);
2194 CERROR("Error when prepare to delete Object "DFID" , "
2195 "which will cause OST objects can not be "
2196 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2202 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2204 mdd_write_unlock(env, mdd_obj);
2206 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2207 #ifdef HAVE_QUOTA_SUPPORT
2209 /* Trigger dqrel on the owner of child. If failed,
2210 * the next call for lquota_chkquota will process it */
2211 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2218 * Permission check is done when open,
2219 * no need check again.
2221 static int mdd_readpage_sanity_check(const struct lu_env *env,
2222 struct mdd_object *obj)
2224 struct dt_object *next = mdd_object_child(obj);
2228 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2236 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2237 struct lu_dirpage *dp, int nob,
2238 const struct dt_it_ops *iops, struct dt_it *it,
2244 struct lu_dirent *ent;
2245 struct lu_dirent *last = NULL;
2248 memset(area, 0, sizeof (*dp));
2249 area += sizeof (*dp);
2250 nob -= sizeof (*dp);
2257 len = iops->key_size(env, it);
2259 /* IAM iterator can return record with zero len. */
2263 hash = iops->store(env, it);
2264 if (unlikely(first)) {
2266 dp->ldp_hash_start = cpu_to_le64(hash);
2269 /* calculate max space required for lu_dirent */
2270 recsize = lu_dirent_calc_size(len, attr);
2272 if (nob >= recsize) {
2273 result = iops->rec(env, it, ent, attr);
2274 if (result == -ESTALE)
2279 /* osd might not able to pack all attributes,
2280 * so recheck rec length */
2281 recsize = le16_to_cpu(ent->lde_reclen);
2283 result = (last != NULL) ? 0 :-EINVAL;
2287 ent = (void *)ent + recsize;
2291 result = iops->next(env, it);
2292 if (result == -ESTALE)
2294 } while (result == 0);
2297 dp->ldp_hash_end = cpu_to_le64(hash);
2299 if (last->lde_hash == dp->ldp_hash_end)
2300 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2301 last->lde_reclen = 0; /* end mark */
2306 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2307 const struct lu_rdpg *rdpg)
2310 struct dt_object *next = mdd_object_child(obj);
2311 const struct dt_it_ops *iops;
2313 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2319 LASSERT(rdpg->rp_pages != NULL);
2320 LASSERT(next->do_index_ops != NULL);
2322 if (rdpg->rp_count <= 0)
2326 * iterate through directory and fill pages from @rdpg
2328 iops = &next->do_index_ops->dio_it;
2329 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2333 rc = iops->load(env, it, rdpg->rp_hash);
2337 * Iterator didn't find record with exactly the key requested.
2339 * It is currently either
2341 * - positioned above record with key less than
2342 * requested---skip it.
2344 * - or not positioned at all (is in IAM_IT_SKEWED
2345 * state)---position it on the next item.
2347 rc = iops->next(env, it);
2352 * At this point and across for-loop:
2354 * rc == 0 -> ok, proceed.
2355 * rc > 0 -> end of directory.
2358 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2359 i++, nob -= CFS_PAGE_SIZE) {
2360 struct lu_dirpage *dp;
2362 LASSERT(i < rdpg->rp_npages);
2363 pg = rdpg->rp_pages[i];
2365 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2368 rc = mdd_dir_page_build(env, mdd, dp,
2369 min_t(int, nob, LU_PAGE_SIZE),
2370 iops, it, rdpg->rp_attrs);
2375 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2377 } else if (rc < 0) {
2378 CWARN("build page failed: %d!\n", rc);
2381 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2382 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2383 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2390 struct lu_dirpage *dp;
2392 dp = cfs_kmap(rdpg->rp_pages[0]);
2393 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2396 * No pages were processed, mark this for first page
2399 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2402 cfs_kunmap(rdpg->rp_pages[0]);
2404 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2407 iops->fini(env, it);
2412 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2413 const struct lu_rdpg *rdpg)
2415 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2419 LASSERT(mdd_object_exists(mdd_obj));
2421 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2422 rc = mdd_readpage_sanity_check(env, mdd_obj);
2424 GOTO(out_unlock, rc);
2426 if (mdd_is_dead_obj(mdd_obj)) {
2428 struct lu_dirpage *dp;
2431 * According to POSIX, please do not return any entry to client:
2432 * even dot and dotdot should not be returned.
2434 CWARN("readdir from dead object: "DFID"\n",
2435 PFID(mdd_object_fid(mdd_obj)));
2437 if (rdpg->rp_count <= 0)
2438 GOTO(out_unlock, rc = -EFAULT);
2439 LASSERT(rdpg->rp_pages != NULL);
2441 pg = rdpg->rp_pages[0];
2442 dp = (struct lu_dirpage*)cfs_kmap(pg);
2443 memset(dp, 0 , sizeof(struct lu_dirpage));
2444 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2445 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2446 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2448 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2451 rc = __mdd_readpage(env, mdd_obj, rdpg);
2455 mdd_read_unlock(env, mdd_obj);
2459 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2461 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2462 struct dt_object *next;
2464 LASSERT(mdd_object_exists(mdd_obj));
2465 next = mdd_object_child(mdd_obj);
2466 return next->do_ops->do_object_sync(env, next);
2469 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2470 struct md_object *obj)
2472 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2474 LASSERT(mdd_object_exists(mdd_obj));
2475 return do_version_get(env, mdd_object_child(mdd_obj));
2478 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2479 dt_obj_version_t version)
2481 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2483 LASSERT(mdd_object_exists(mdd_obj));
2484 do_version_set(env, mdd_object_child(mdd_obj), version);
2487 const struct md_object_operations mdd_obj_ops = {
2488 .moo_permission = mdd_permission,
2489 .moo_attr_get = mdd_attr_get,
2490 .moo_attr_set = mdd_attr_set,
2491 .moo_xattr_get = mdd_xattr_get,
2492 .moo_xattr_set = mdd_xattr_set,
2493 .moo_xattr_list = mdd_xattr_list,
2494 .moo_xattr_del = mdd_xattr_del,
2495 .moo_object_create = mdd_object_create,
2496 .moo_ref_add = mdd_ref_add,
2497 .moo_ref_del = mdd_ref_del,
2498 .moo_open = mdd_open,
2499 .moo_close = mdd_close,
2500 .moo_readpage = mdd_readpage,
2501 .moo_readlink = mdd_readlink,
2502 .moo_changelog = mdd_changelog,
2503 .moo_capa_get = mdd_capa_get,
2504 .moo_object_sync = mdd_object_sync,
2505 .moo_version_get = mdd_version_get,
2506 .moo_version_set = mdd_version_set,
2507 .moo_path = mdd_path,
2508 .moo_file_lock = mdd_file_lock,
2509 .moo_file_unlock = mdd_file_unlock,