1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
55 #include <linux/jbd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
68 #include <linux/ldiskfs_fs.h>
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
73 #include "mdd_internal.h"
75 static const struct lu_object_operations mdd_lu_obj_ops;
77 static int mdd_xattr_get(const struct lu_env *env,
78 struct md_object *obj, struct lu_buf *buf,
/* Fetch object body data through the underlying mdo layer.
 * Asserts the object exists on disk before delegating to mdo_data_get().
 * NOTE(review): listing gaps — braces/return of this function are not shown. */
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 mdo_data_get(env, obj, data);
/* Read lu_attr attributes of an existing object via mdo_attr_get().
 * \param capa  capability used for the underlying attr_get (may be BYPASS_CAPA).
 * \retval result of mdo_attr_get() */
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91 struct lu_attr *la, struct lustre_capa *capa)
93 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94 PFID(mdd_object_fid(obj)));
95 return mdo_attr_get(env, obj, la, capa);
/* Translate LUSTRE_*_FL attribute flags into the object's mod_flags bits.
 * Clears APPEND_OBJ/IMMUTE_OBJ first, then sets them from \a flags. */
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
100 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
102 if (flags & LUSTRE_APPEND_FL)
103 obj->mod_flags |= APPEND_OBJ;
105 if (flags & LUSTRE_IMMUTABLE_FL)
106 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread MDD info attached to \a env's lu_context.
 * The key must have been registered; a NULL result is a bug (LASSERT). */
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
111 struct mdd_thread_info *info;
113 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114 LASSERT(info != NULL);
/* Wrap the caller-supplied memory \a area/\a len in the per-thread mti_buf.
 * The buffer does not own the memory; caller keeps ownership of \a area. */
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 buf = &mdd_env_info(env)->mti_buf;
/* Free the memory held by \a buf (if any). Safe to call with NULL or an
 * empty buffer. NOTE(review): resetting of lb_buf/lb_len is not visible here. */
128 void mdd_buf_put(struct lu_buf *buf)
130 if (buf == NULL || buf->lb_buf == NULL)
132 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in the per-thread
 * mti_buf. The const qualifier is cast away but callers must not write. */
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138 const void *area, ssize_t len)
142 buf = &mdd_env_info(env)->mti_buf;
143 buf->lb_buf = (void *)area;
/* Return the per-thread big buffer, (re)allocated to hold at least \a len
 * bytes. An existing smaller allocation is freed first; contents are NOT
 * preserved (use mdd_buf_grow() for that). */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL) {
158 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL)
165 /** Increase the size of the \a mti_big_buf.
166  * Preserves the old data in the buffer by copying it into the new
167  * allocation; the old buffer remains unchanged on error.
168  * \retval 0 or -ENOMEM
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
172 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175 LASSERT(len >= oldbuf->lb_len);
176 OBD_ALLOC_LARGE(buf.lb_buf, len);
178 if (buf.lb_buf == NULL)
/* Copy existing contents, release the old allocation, then publish the
 * new buffer descriptor into the thread-info slot. */
182 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
184 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
186 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, sized to the current maximum
 * LOV cookie size. Reallocates (and zeroes) when the cached buffer is too
 * small. \retval buffer pointer, or NULL on allocation failure. */
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192 struct mdd_device *mdd)
194 struct mdd_thread_info *mti = mdd_env_info(env);
197 max_cookie_size = mdd_lov_cookiesize(env, mdd);
198 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199 if (mti->mti_max_cookie)
200 OBD_FREE_LARGE(mti->mti_max_cookie,
201 mti->mti_max_cookie_size);
202 mti->mti_max_cookie = NULL;
203 mti->mti_max_cookie_size = 0;
205 if (unlikely(mti->mti_max_cookie == NULL)) {
206 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207 if (likely(mti->mti_max_cookie != NULL))
208 mti->mti_max_cookie_size = max_cookie_size;
/* Zero the (possibly reused) buffer before handing it back. */
210 if (likely(mti->mti_max_cookie != NULL))
211 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212 return mti->mti_max_cookie;
/* Return the per-thread LOV MD buffer, sized to the current maximum MDS
 * LOV EA size. Same grow-on-demand pattern as mdd_max_cookie_get(), but
 * the buffer is NOT zeroed before return. */
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216 struct mdd_device *mdd)
218 struct mdd_thread_info *mti = mdd_env_info(env);
221 max_lmm_size = mdd_lov_mdsize(env, mdd);
222 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223 if (mti->mti_max_lmm)
224 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225 mti->mti_max_lmm = NULL;
226 mti->mti_max_lmm_size = 0;
228 if (unlikely(mti->mti_max_lmm == NULL)) {
229 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230 if (likely(mti->mti_max_lmm != NULL))
231 mti->mti_max_lmm_size = max_lmm_size;
233 return mti->mti_max_lmm;
/* Allocate and minimally initialize a new mdd_object; wire up its md/dir
 * operation vectors and lu_object ops. Returns the embedded lu_object
 * (NULL path on allocation failure is not visible in this listing). */
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237 const struct lu_object_header *hdr,
240 struct mdd_object *mdd_obj;
242 OBD_ALLOC_PTR(mdd_obj);
243 if (mdd_obj != NULL) {
246 o = mdd2lu_obj(mdd_obj);
247 lu_object_init(o, NULL, d);
248 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250 mdd_obj->mod_count = 0;
251 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init for MDD: allocate the next-lower
 * layer's object from the child device and stack it below this one. */
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259 const struct lu_object_conf *unused)
261 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262 struct mdd_object *mdd_obj = lu2mdd_obj(o);
263 struct lu_object *below;
264 struct lu_device *under;
267 mdd_obj->mod_cltime = 0;
268 under = &d->mdd_child->dd_lu_dev;
269 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270 mdd_pdlock_init(mdd_obj);
274 lu_object_add(o, below);
/* loo_object_start: for objects that exist on disk, load their attribute
 * flags into mod_flags via mdd_get_flags(). */
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 if (lu_object_exists(o))
282 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd_object (freeing logic not visible in
 * this listing). */
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: emit a one-line human-readable description of the
 * object (open count, valid mask, changelog time, flags) via printer \a p. */
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296 lu_printer_t p, const struct lu_object *o)
298 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300 "valid=%x, cltime="LPU64", flags=%lx)",
301 mdd, mdd->mod_count, mdd->mod_valid,
302 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for MDD objects. */
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306 .loo_object_init = mdd_object_init,
307 .loo_object_start = mdd_object_start,
308 .loo_object_free = mdd_object_free,
309 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); may return ERR_PTR. */
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313 struct mdd_device *d,
314 const struct lu_fid *f)
316 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve an absolute \a path (relative to the MDD root) to a FID by
 * walking path components with mdd_lookup(), starting from mdd_root_fid.
 * \retval 0 on success with *fid set; -EREMOTE/-ENOMEM/lookup errors
 * otherwise. NOTE(review): listing gaps — loop body is partially shown. */
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320 const char *path, struct lu_fid *fid)
323 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324 struct mdd_object *obj;
325 struct lu_name *lname = &mdd_env_info(env)->mti_name;
330 /* temp buffer for path element */
331 buf = mdd_buf_alloc(env, PATH_MAX);
332 if (buf->lb_buf == NULL)
335 lname->ln_name = name = buf->lb_buf;
336 lname->ln_namelen = 0;
337 *f = mdd->mdd_root_fid;
/* Scan one path component (up to '/' or end of string). */
344 while (*path != '/' && *path != '\0') {
352 /* find obj corresponding to fid */
353 obj = mdd_object_find(env, mdd, f);
355 GOTO(out, rc = -EREMOTE);
357 GOTO(out, rc = PTR_ERR(obj));
358 /* get child fid from parent and name */
359 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360 mdd_object_put(env, obj);
365 lname->ln_namelen = 0;
374 /** The maximum depth that fid2path() will search.
375  * This is limited only because we want to store the fids for
376  * historical path lookup purposes.
378 #define MAX_PATH_DEPTH 100
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382 __u64 pli_recno; /**< history point */
383 __u64 pli_currec; /**< current record */
384 struct lu_fid pli_fid;
385 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386 struct mdd_object *pli_mdd_obj;
387 char *pli_path; /**< full path */
389 int pli_linkno; /**< which hardlink to follow */
390 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current (as-of-now) pathname for pli->pli_mdd_obj by walking
 * parent FIDs upward via the link EA until the root is reached, packing
 * component names right-to-left into pli->pli_path. Afterwards verify the
 * built path still resolves to the starting FID (returns -EAGAIN if the
 * file was renamed concurrently). Records the changelog index at the time
 * of the check in pli->pli_currec. */
393 static int mdd_path_current(const struct lu_env *env,
394 struct path_lookup_info *pli)
396 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397 struct mdd_object *mdd_obj;
398 struct lu_buf *buf = NULL;
399 struct link_ea_header *leh;
400 struct link_ea_entry *lee;
401 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start writing at the end of the path buffer; names are packed
 * backwards as we ascend toward the root. */
408 ptr = pli->pli_path + pli->pli_pathlen - 1;
411 pli->pli_fidcount = 0;
412 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415 mdd_obj = mdd_object_find(env, mdd,
416 &pli->pli_fids[pli->pli_fidcount]);
418 GOTO(out, rc = -EREMOTE);
420 GOTO(out, rc = PTR_ERR(mdd_obj));
421 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423 mdd_object_put(env, mdd_obj);
427 /* Do I need to error out here? */
432 /* Get parent fid and object name from the link EA. */
433 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434 buf = mdd_links_get(env, mdd_obj);
435 mdd_read_unlock(env, mdd_obj);
436 mdd_object_put(env, mdd_obj);
438 GOTO(out, rc = PTR_ERR(buf));
441 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 /* If set, use link #linkno for path lookup, otherwise use
445 link #0. Only do this for the final path element. */
446 if ((pli->pli_fidcount == 0) &&
447 (pli->pli_linkno < leh->leh_reccount)) {
449 for (count = 0; count < pli->pli_linkno; count++) {
450 lee = (struct link_ea_entry *)
451 ((char *)lee + reclen);
452 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454 if (pli->pli_linkno < leh->leh_reccount - 1)
455 /* indicate to user there are more links */
459 /* Pack the name in the end of the buffer */
460 ptr -= tmpname->ln_namelen;
461 if (ptr - 1 <= pli->pli_path)
462 GOTO(out, rc = -EOVERFLOW);
463 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466 /* Store the parent fid for historic lookup */
467 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468 GOTO(out, rc = -EOVERFLOW);
469 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472 /* Verify that our path hasn't changed since we started the lookup.
473 Record the current index, and verify the path resolves to the
474 same fid. If it does, then the path is correct as of this index. */
475 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476 pli->pli_currec = mdd->mdd_cl.mc_index;
477 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481 GOTO (out, rc = -EAGAIN);
483 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486 PFID(&pli->pli_fid));
487 GOTO(out, rc = -EAGAIN);
489 ptr++; /* skip leading / */
/* Shift the packed path to the start of the caller's buffer. */
490 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
494 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup. NOTE(review): body not visible in
 * this listing — appears to be a stub, see comment in mdd_path(). */
501 static int mdd_path_historic(const struct lu_env *env,
502 struct path_lookup_info *pli)
507 /* Returns the full path to this fid, as of changelog record recno.
 * \param path/pathlen  caller-supplied output buffer
 * \param recno  in: history point; out: changelog index the path is valid at
 * \param linkno in: hardlink index to follow; out: next link index
 * Retries mdd_path_current() on -EAGAIN in case of concurrent renames. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509 char *path, int pathlen, __u64 *recno, int *linkno)
511 struct path_lookup_info *pli;
/* The root object has an empty path — handled specially. */
519 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
528 pli->pli_mdd_obj = md2mdd_obj(obj);
529 pli->pli_recno = *recno;
530 pli->pli_path = path;
531 pli->pli_pathlen = pathlen;
532 pli->pli_linkno = *linkno;
534 /* Retry multiple times in case file is being moved */
535 while (tries-- && rc == -EAGAIN)
536 rc = mdd_path_current(env, pli);
538 /* For historical path lookup, the current links may not have existed
539 * at "recno" time. We must switch over to earlier links/parents
540 * by using the changelog records. If the earlier parent doesn't
541 * exist, we must search back through the changelog to reconstruct
542 * its parents, then check if it exists, etc.
543 * We may ignore this problem for the initial implementation and
544 * state that an "original" hardlink must still exist for us to find
545 * historic path name. */
546 if (pli->pli_recno != -1) {
547 rc = mdd_path_historic(env, pli);
549 *recno = pli->pli_currec;
550 /* Return next link index to caller */
551 *linkno = pli->pli_linkno;
/* Load the object's on-disk attribute flags into mod_flags; also marks
 * directories with a single link as MNLINK_OBJ. */
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 struct lu_attr *la = &mdd_env_info(env)->mti_la;
565 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567 mdd_flags_xlate(obj, la->la_flags);
568 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569 obj->mod_flags |= MNLINK_OBJ;
574 /* Get only the inode attributes into ma->ma_attr; no-op if MA_INODE is
 * already valid. Sets MA_INODE in ma_valid on success. */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
581 if (ma->ma_valid & MA_INODE)
584 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585 mdd_object_capa(env, mdd_obj));
587 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default striping (from the MDS LOV
 * descriptor) as a LOV_MAGIC_V1 lov_user_md.
 * \retval size of the filled structure (sizeof(struct lov_user_md)). */
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
593 struct lov_desc *ldesc;
594 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595 struct lov_user_md *lum = (struct lov_user_md*)lmm;
601 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602 LASSERT(ldesc != NULL);
604 lum->lmm_magic = LOV_MAGIC_V1;
605 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606 lum->lmm_pattern = ldesc->ld_pattern;
607 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
611 RETURN(sizeof(*lum));
/* Return true iff \a mdd_obj is the filesystem root directory. */
614 static int is_rootdir(struct mdd_object *mdd_obj)
616 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617 const struct lu_fid *fid = mdo2fid(mdd_obj);
619 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
622 /* Get the LOV EA only, into ma->ma_lmm; no-op if MA_LOV already valid.
 * For the root directory with MA_LOV_DEF requested, falls back to the
 * filesystem default striping. Sets MA_LOV in ma_valid on success. */
623 static int __mdd_lmm_get(const struct lu_env *env,
624 struct mdd_object *mdd_obj, struct md_attr *ma)
629 if (ma->ma_valid & MA_LOV)
632 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
634 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* A positive rc is the EA size. */
637 ma->ma_lmm_size = rc;
638 ma->ma_valid |= MA_LOV;
644 /* Get the first parent fid from the link EA into ma->ma_pfid; no-op if
 * MA_PFID already valid. The on-disk fid is big-endian and converted to
 * CPU order. Sets MA_PFID in ma_valid on success. */
645 static int mdd_pfid_get(const struct lu_env *env,
646 struct mdd_object *mdd_obj, struct md_attr *ma)
649 struct link_ea_header *leh;
650 struct link_ea_entry *lee;
651 struct lu_fid *pfid = &ma->ma_pfid;
654 if (ma->ma_valid & MA_PFID)
657 buf = mdd_links_get(env, mdd_obj);
659 RETURN(PTR_ERR(buf));
662 lee = (struct link_ea_entry *)(leh + 1);
663 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664 fid_be_to_cpu(pfid, pfid);
665 ma->ma_valid |= MA_PFID;
666 if (buf->lb_len > OBD_ALLOC_BIG)
667 /* if we vmalloced a large buffer drop it */
/* Locked wrapper for __mdd_lmm_get(): takes the object read lock around
 * the LOV EA read. */
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
678 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679 rc = __mdd_lmm_get(env, mdd_obj, ma);
680 mdd_read_unlock(env, mdd_obj);
/* Get the LMV EA only, into ma->ma_lmv; no-op if MA_LMV already valid.
 * Sets MA_LMV in ma_valid on success. */
685 static int __mdd_lmv_get(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
691 if (ma->ma_valid & MA_LMV)
694 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
697 ma->ma_valid |= MA_LMV;
/* Read the LMA (lustre_mdt_attrs) EA from disk and extract HSM and/or
 * SOM data into \a ma as requested by ma_need. No-op if everything
 * requested is already valid. */
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
706 struct mdd_thread_info *info = mdd_env_info(env);
707 struct lustre_mdt_attrs *lma =
708 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
713 /* If all needed data are already valid, nothing to do */
714 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715 (ma->ma_need & (MA_HSM | MA_SOM)))
718 /* Read LMA from disk EA */
719 lma_size = sizeof(info->mti_xattr_buf);
720 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
724 /* Useless to check LMA incompatibility because this is already done in
725 * osd_ea_fid_get(), and this will fail long before this code is
727 * So, if we are here, LMA is compatible.
730 lustre_lma_swab(lma);
732 /* Swab and copy LMA */
733 if (ma->ma_need & MA_HSM) {
734 if (lma->lma_compat & LMAC_HSM)
735 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
737 ma->ma_hsm.mh_flags = 0;
738 ma->ma_valid |= MA_HSM;
/* SOM data is only copied when the LMA says it is present (LMAC_SOM). */
742 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743 LASSERT(ma->ma_som != NULL);
744 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745 ma->ma_som->msd_size = lma->lma_som_size;
746 ma->ma_som->msd_blocks = lma->lma_som_blocks;
747 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748 ma->ma_valid |= MA_SOM;
/* Gather all attribute groups requested in ma->ma_need (inode, LOV, PFID,
 * LMV, HSM/SOM, default ACL), dispatching by object type. Stops at the
 * first failure; ma_valid reflects what was actually loaded. */
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
760 if (ma->ma_need & MA_INODE)
761 rc = mdd_iattr_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_LOV) {
764 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765 S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = __mdd_lmm_get(env, mdd_obj, ma);
768 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769 if (S_ISREG(mdd_object_type(mdd_obj)))
770 rc = mdd_pfid_get(env, mdd_obj, ma);
772 if (rc == 0 && ma->ma_need & MA_LMV) {
773 if (S_ISDIR(mdd_object_type(mdd_obj)))
774 rc = __mdd_lmv_get(env, mdd_obj, ma);
776 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777 if (S_ISREG(mdd_object_type(mdd_obj)))
778 rc = __mdd_lma_get(env, mdd_obj, ma);
780 #ifdef CONFIG_FS_POSIX_ACL
781 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782 if (S_ISDIR(mdd_object_type(mdd_obj)))
783 rc = mdd_def_acl_get(env, mdd_obj, ma);
786 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787 rc, ma->ma_valid, ma->ma_lmm);
/* As mdd_attr_get_internal(), but takes the object read lock when EA-type
 * attributes (LOV/LMV/ACL/HSM/SOM/PFID) are requested. */
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792 struct mdd_object *mdd_obj, struct md_attr *ma)
795 int needlock = ma->ma_need &
796 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
799 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800 rc = mdd_attr_get_internal(env, mdd_obj, ma);
802 mdd_read_unlock(env, mdd_obj);
807 * md_object_operations::moo_attr_get entry point.
 * No permission check is needed.
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
812 struct mdd_object *mdd_obj = md2mdd_obj(obj);
816 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
821 * Read the named extended attribute into \a buf under the object read
 * lock. No permission check is needed.
823 static int mdd_xattr_get(const struct lu_env *env,
824 struct md_object *obj, struct lu_buf *buf,
827 struct mdd_object *mdd_obj = md2mdd_obj(obj);
832 LASSERT(mdd_object_exists(mdd_obj));
834 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835 rc = mdo_xattr_get(env, mdd_obj, buf, name,
836 mdd_object_capa(env, mdd_obj));
837 mdd_read_unlock(env, mdd_obj);
843 * Read a symlink target into \a buf via the dt body-read operation.
 * Permission check is done when open,
844 * no need check again.
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
850 struct dt_object *next;
855 LASSERT(mdd_object_exists(mdd_obj));
857 next = mdd_object_child(mdd_obj);
858 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860 mdd_object_capa(env, mdd_obj));
861 mdd_read_unlock(env, mdd_obj);
866 * List all extended attribute names into \a buf under the object read
 * lock. No permission check is needed.
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
871 struct mdd_object *mdd_obj = md2mdd_obj(obj);
876 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle:
 * choose the dt object format from the creation spec (index vs. regular),
 * let the underlying device initialize the allocation hint from the
 * parent \a p, then call mdo_create_obj(). No-op if \a c already exists. */
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884 struct mdd_object *c, struct md_attr *ma,
885 struct thandle *handle,
886 const struct md_op_spec *spec)
888 struct lu_attr *attr = &ma->ma_attr;
889 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891 const struct dt_index_features *feat = spec->sp_feat;
895 if (!mdd_object_exists(c)) {
896 struct dt_object *next = mdd_object_child(c);
899 if (feat != &dt_directory_features && feat != NULL)
900 dof->dof_type = DFT_INDEX;
902 dof->dof_type = dt_mode_to_dft(attr->la_mode);
904 dof->u.dof_idx.di_feat = feat;
906 /* @hint will be initialized by underlying device. */
907 next->do_ops->do_ah_init(env, hint,
908 p ? mdd_object_child(p) : NULL,
909 attr->la_mode & S_IFMT);
911 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
920 * Make sure the ctime is increased only: drop MTIME/CTIME updates that
 * would move ctime backwards, and drop a pure CTIME update that would
 * not change it.
922 static inline int mdd_attr_check(const struct lu_env *env,
923 struct mdd_object *obj,
924 struct lu_attr *attr)
926 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
930 if (attr->la_valid & LA_CTIME) {
931 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
935 if (attr->la_ctime < tmp_la->la_ctime)
936 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937 else if (attr->la_valid == LA_CTIME &&
938 attr->la_ctime == tmp_la->la_ctime)
939 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to the object inside transaction \a handle; when the mode
 * changes and \a needacl is set, also update the POSIX ACL accordingly. */
944 int mdd_attr_set_internal(const struct lu_env *env,
945 struct mdd_object *obj,
946 struct lu_attr *attr,
947 struct thandle *handle,
953 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955 if (!rc && (attr->la_valid & LA_MODE) && needacl)
956 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by mdd_attr_set_internal(). */
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962 struct mdd_object *obj,
963 struct lu_attr *attr,
964 struct thandle *handle,
970 rc = mdd_attr_check(env, obj, attr);
975 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(); only honors
 * \a needacl when the mode is actually being changed. */
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980 struct mdd_object *obj,
981 struct lu_attr *attr,
982 struct thandle *handle,
988 needacl = needacl && (attr->la_valid & LA_MODE);
990 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
993 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); only honors
 * \a needacl when the mode is actually being changed. */
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998 struct mdd_object *obj,
999 struct lu_attr *attr,
1000 struct thandle *handle,
1006 needacl = needacl && (attr->la_valid & LA_MODE);
1008 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1011 mdd_write_unlock(env, obj);
/* Set or delete an extended attribute inside transaction \a handle:
 * a non-empty buffer sets the EA, a NULL buffer with zero length deletes
 * it. NOTE(review): the \a fl flag is not passed to mdo_xattr_set here
 * (0 is passed) — confirm against the original file whether intentional. */
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016 const struct lu_buf *buf, const char *name,
1017 int fl, struct thandle *handle)
1019 struct lustre_capa *capa = mdd_object_capa(env, obj);
1023 if (buf->lb_buf && buf->lb_len > 0)
1024 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa)
;
1025 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026 rc = mdo_xattr_del(env, obj, name, handle, capa);
1032 * This gives the same functionality as the code between
1033 * sys_chmod and inode_setattr
1034 * chown_common and inode_setattr
1035 * utimes and inode_setattr
1036 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
 *
 * Validates and normalizes the incoming setattr request \a la against the
 * current attributes and the caller's credentials: permission checks for
 * chmod/chown/chgrp/utimes, immutable/append-only enforcement, setuid/
 * setgid stripping, and time-update filtering (including the SOM merge
 * rules).
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039 struct lu_attr *la, const struct md_attr *ma)
1041 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1042 struct md_ucred *uc;
1049 /* Do not permit change file type */
1050 if (la->la_valid & LA_TYPE)
1053 /* They should not be processed by setattr */
1054 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1057 /* export destroy does not have ->le_ses, but we may want
1058 * to drop LUSTRE_SOM_FL. */
1064 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1068 if (la->la_valid == LA_CTIME) {
1069 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070 /* This is only for set ctime when rename's source is
1072 rc = mdd_may_delete(env, NULL, obj,
1073 (struct md_attr *)ma, 1, 0);
1074 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075 la->la_valid &= ~LA_CTIME;
1079 if (la->la_valid == LA_ATIME) {
1080 /* This is atime only set for read atime update on close. */
1081 if (la->la_atime >= tmp_la->la_atime &&
1082 la->la_atime < (tmp_la->la_atime +
1083 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084 la->la_valid &= ~LA_ATIME;
1088 /* Check if flags change. */
1089 if (la->la_valid & LA_FLAGS) {
1090 unsigned int oldflags = 0;
1091 unsigned int newflags = la->la_flags &
1092 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1094 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095 !mdd_capable(uc, CFS_CAP_FOWNER))
1098 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099 * only be changed by the relevant capability. */
1100 if (mdd_is_immutable(obj))
1101 oldflags |= LUSTRE_IMMUTABLE_FL;
1102 if (mdd_is_append(obj))
1103 oldflags |= LUSTRE_APPEND_FL;
1104 if ((oldflags ^ newflags) &&
1105 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1108 if (!S_ISDIR(tmp_la->la_mode))
1109 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1112 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113 (la->la_valid & ~LA_FLAGS) &&
1114 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1117 /* Check for setting the obj time. */
1118 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1130 if (la->la_valid & LA_KILL_SUID) {
1131 la->la_valid &= ~LA_KILL_SUID;
1132 if ((tmp_la->la_mode & S_ISUID) &&
1133 !(la->la_valid & LA_MODE)) {
1134 la->la_mode = tmp_la->la_mode;
1135 la->la_valid |= LA_MODE;
1137 la->la_mode &= ~S_ISUID;
1140 if (la->la_valid & LA_KILL_SGID) {
1141 la->la_valid &= ~LA_KILL_SGID;
1142 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143 (S_ISGID | S_IXGRP)) &&
1144 !(la->la_valid & LA_MODE)) {
1145 la->la_mode = tmp_la->la_mode;
1146 la->la_valid |= LA_MODE;
1148 la->la_mode &= ~S_ISGID;
1151 /* Make sure a caller can chmod. */
1152 if (la->la_valid & LA_MODE) {
1153 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154 (uc->mu_fsuid != tmp_la->la_uid) &&
1155 !mdd_capable(uc, CFS_CAP_FOWNER))
1158 if (la->la_mode == (cfs_umode_t) -1)
1159 la->la_mode = tmp_la->la_mode;
1161 la->la_mode = (la->la_mode & S_IALLUGO) |
1162 (tmp_la->la_mode & ~S_IALLUGO);
1164 /* Also check the setgid bit! */
1165 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166 la->la_gid : tmp_la->la_gid) &&
1167 !mdd_capable(uc, CFS_CAP_FSETID))
1168 la->la_mode &= ~S_ISGID;
1170 la->la_mode = tmp_la->la_mode;
1173 /* Make sure a caller can chown. */
1174 if (la->la_valid & LA_UID) {
1175 if (la->la_uid == (uid_t) -1)
1176 la->la_uid = tmp_la->la_uid;
1177 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178 (la->la_uid != tmp_la->la_uid)) &&
1179 !mdd_capable(uc, CFS_CAP_CHOWN))
1182 /* If the user or group of a non-directory has been
1183 * changed by a non-root user, remove the setuid bit.
1184 * 19981026 David C Niemi <niemi@tux.org>
1186 * Changed this to apply to all users, including root,
1187 * to avoid some races. This is the behavior we had in
1188 * 2.0. The check for non-root was definitely wrong
1189 * for 2.2 anyway, as it should have been using
1190 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192 !S_ISDIR(tmp_la->la_mode)) {
1193 la->la_mode &= ~S_ISUID;
1194 la->la_valid |= LA_MODE;
1198 /* Make sure caller can chgrp. */
1199 if (la->la_valid & LA_GID) {
1200 if (la->la_gid == (gid_t) -1)
1201 la->la_gid = tmp_la->la_gid;
1202 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203 ((la->la_gid != tmp_la->la_gid) &&
1204 !lustre_in_group_p(uc, la->la_gid))) &&
1205 !mdd_capable(uc, CFS_CAP_CHOWN))
1208 /* Likewise, if the user or group of a non-directory
1209 * has been changed by a non-root user, remove the
1210 * setgid bit UNLESS there is no group execute bit
1211 * (this would be a file marked for mandatory
1212 * locking). 19981026 David C Niemi <niemi@tux.org>
1214 * Removed the fsuid check (see the comment above) --
1216 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218 la->la_mode &= ~S_ISGID;
1219 la->la_valid |= LA_MODE;
1223 /* For both Size-on-MDS case and truncate case,
1224 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1225 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226 * For SOM case, it is true, the MAY_WRITE perm has been checked
1227 * when open, no need check again. For truncate case, it is false,
1228 * the MAY_WRITE perm should be checked here. */
1229 if (ma->ma_attr_flags & MDS_SOM) {
1230 /* For the "Size-on-MDS" setattr update, merge coming
1231 * attributes with the set in the inode. BUG 10641 */
1232 if ((la->la_valid & LA_ATIME) &&
1233 (la->la_atime <= tmp_la->la_atime))
1234 la->la_valid &= ~LA_ATIME;
1236 /* OST attributes do not have a priority over MDS attributes,
1237 * so drop times if ctime is equal. */
1238 if ((la->la_valid & LA_CTIME) &&
1239 (la->la_ctime <= tmp_la->la_ctime))
1240 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1242 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244 (uc->mu_fsuid == tmp_la->la_uid)) &&
1245 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246 rc = mdd_permission_internal_locked(env, obj,
1253 if (la->la_valid & LA_CTIME) {
1254 /* The pure setattr, it has the priority over what is
1255 * already set, do not drop it if ctime is equal. */
1256 if (la->la_ctime < tmp_la->la_ctime)
1257 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1265 /** Store a data change changelog record.
1266  * If this fails, we must fail the whole transaction; we don't
1267  * want the change to commit without the log entry.
1268  * \param mdd_obj - mdd_object of change
1269  * \param handle - transaction handle
1271 static int mdd_changelog_data_store(const struct lu_env *env,
1272 struct mdd_device *mdd,
1273 enum changelog_rec_type type,
1275 struct mdd_object *mdd_obj,
1276 struct thandle *handle)
1278 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279 struct llog_changelog_rec *rec;
/* Skip when changelogs are disabled or this record type is masked out. */
1285 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1287 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1290 LASSERT(handle != NULL);
1291 LASSERT(mdd_obj != NULL);
/* Coalesce repeated time-change records for the same object within one
 * changelog session. */
1293 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295 /* Don't need multiple updates in this log */
1296 /* Don't check under lock - no big deal if we get an extra
1301 reclen = llog_data_len(sizeof(*rec));
1302 buf = mdd_buf_alloc(env, reclen);
1303 if (buf->lb_buf == NULL)
1305 rec = (struct llog_changelog_rec *)buf->lb_buf;
1307 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308 rec->cr.cr_type = (__u32)type;
1309 rec->cr.cr_tfid = *tfid;
1310 rec->cr.cr_namelen = 0;
1311 mdd_obj->mod_cltime = cfs_time_current_64();
1313 rc = mdd_changelog_llog_write(mdd, rec, handle);
1315 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316 rc, type, PFID(tfid));
/* Public entry: record a changelog event of \a type for \a obj inside a
 * fresh transaction (start/stop handled here). */
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324 int flags, struct md_object *obj)
1326 struct thandle *handle;
1327 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328 struct mdd_device *mdd = mdo2mdd(obj);
1332 handle = mdd_trans_start(env, mdd);
1335 return(PTR_ERR(handle));
1337 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1340 mdd_trans_stop(env, mdd, rc, handle);
1346 * Should be called with write lock held.
1348 * \see mdd_lma_set_locked().
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351 const struct md_attr *ma, struct thandle *handle)
1353 struct mdd_thread_info *info = mdd_env_info(env);
1355 struct lustre_mdt_attrs *lma =
1356 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357 int lmasize = sizeof(struct lustre_mdt_attrs);
1362 /* Either HSM or SOM part is not valid, we need to read it before */
1363 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1368 lustre_lma_swab(lma);
1370 memset(lma, 0, lmasize);
1374 if (ma->ma_valid & MA_HSM) {
1375 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376 lma->lma_compat |= LMAC_HSM;
1380 if (ma->ma_valid & MA_SOM) {
1381 LASSERT(ma->ma_som != NULL);
1382 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383 lma->lma_compat &= ~LMAC_SOM;
1385 lma->lma_compat |= LMAC_SOM;
1386 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1387 lma->lma_som_size = ma->ma_som->msd_size;
1388 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1389 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1394 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1396 lustre_lma_swab(lma);
1397 buf = mdd_buf_get(env, lma, lmasize);
1398 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid; if
 * not, LMA EA will be first read from disk, modified and written back.
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
        /* serialize against concurrent attr/xattr updates on this object */
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
        struct mdd_device *mdd = mdo2mdd(obj);
        /* map changed LA_* bits onto candidate changelog record types */
        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        /* drop record types masked out by configuration */
        bits = bits & mdd->mdd_cl.mc_mask;
        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
#ifdef HAVE_QUOTA_SUPPORT
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
        /* work on a copy so the caller's md_attr stays untouched */
        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        /* setattr on "close" only change atime, or do nothing */
        if (ma->ma_valid == MA_INODE &&
            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
        /*TODO: add lock here*/
        /* start a log journal handle if needed: chown/chgrp on a regular
         * file must be journalled per-stripe for the OSTs */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                        GOTO(no_trans, rc = -ENOMEM);
                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
        /* estimate how many changelog credits the transaction needs;
         * a negative stripe count means "all targets" */
        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
                        lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
        handle = mdd_trans_start(env, mdd);
                GOTO(no_trans, rc = PTR_ERR(handle));
        /* permission changes may require sync operation */
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                handle->th_sync |= mdd->mdd_sync_permission;
        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
#ifdef HAVE_QUOTA_SUPPORT
        /* ownership change: pre-acquire quota for the new owner */
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                LQUOTA_FLAGS_BLK, data, 1);
        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) { /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size){
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);
                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
        /* update striping EA if the caller supplied one */
        if (rc == 0 && ma->ma_valid & MA_LOV) {
                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
        /* update HSM / Size-On-MDS state stored in the LMA EA */
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                mode = mdd_object_type(mdd_obj);
                rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        rc = mdd_attr_set_changelog(env, obj, handle,
                                    ma->ma_attr.la_valid);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                /*set obd attr, if needed*/
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
#ifdef HAVE_QUOTA_SUPPORT
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        /* Trigger dqrel/dqacq for original owner and new owner.
         * If failed, the next call for lquota_chkquota will
        lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set an extended attribute under write lock, inside an already-started
 * transaction \a handle. */
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);
/* Permission check before changing xattrs: the object must not be
 * immutable or append-only, and the caller must be the owner or hold
 * the CFS_CAP_FOWNER capability. */
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        if (mdd_is_immutable(obj) || mdd_is_append(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* only the owner (or a capable caller) may modify xattrs */
        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
                handle->th_sync |= mdd->mdd_sync_permission;
        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        /* delete the named xattr under write lock */
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);
        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
/* partial unlink: drop nlink(s) on the object itself without touching any
 * parent directory entry */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
        LASSERT(mdd_object_exists(mdd_obj) > 0);
        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        /* drop one link on the object */
        __mdd_ref_del(env, mdd_obj, handle, 0);
        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* directories also drop the extra link held for "." */
                __mdd_ref_del(env, mdd_obj, handle, 1);
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* release quota only when the last link is gone and no one holds
         * the object open */
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial operation */
/* Validate that the requested object type is one we can create via
 * mdd_object_create(); dispatches on the S_IFMT bits of the new mode. */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
        switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: allocate and initialize the object itself, without
 * inserting a name into any parent directory. */
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#ifdef HAVE_QUOTA_SUPPORT
        /* pre-acquire inode (and, per type, block) quota for the child
         * before starting the transaction */
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                switch (ma->ma_attr.la_mode & S_IFMT) {
                /* get block quota for child */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
        handle = mdd_trans_start(env, mdd);
                GOTO(out_pending, rc = PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;
                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);
                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
        rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
#ifdef CONFIG_FS_POSIX_ACL
        if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
                buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                buf->lb_len = spec->u.sp_ea.eadatalen;
                if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                        /* initialize ACL from the supplied EA; may adjust
                         * the effective creation mode */
                        rc = __mdd_acl_init(env, mdd_obj, buf,
                                            &ma->ma_attr.la_mode,
                        ma->ma_attr.la_valid |= LA_MODE;
                pfid = spec->u.sp_ea.fid;
        rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
        /* return the freshly-created object's attributes to the caller */
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        /* Trigger dqacq on the owner of child. If failed,
         * the next call for lquota_chkquota will process it. */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial link: add one nlink to the object and refresh its ctime,
 * without inserting a name into any parent directory */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        /* ctime must be supplied by the caller */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
        mdd_trans_stop(env, mdd, 0, handle);
/* Translate MDS open flags into MAY_* access-mode bits.
 * do NOT or the MAY_*'s, you'll get the weakest
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);
                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
        /* map read/write/exec open intents onto MAY_* bits */
        if (flags & FMODE_READ)
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
        if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's current attributes:
 * dead objects, symlinks, writable directories, append-only files and
 * O_NOATIME by non-owners are rejected here. */
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        if (mdd_is_dead_obj(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* symlinks are never opened directly */
        if (S_ISLNK(tmp_la->la_mode))
        mode = accmode(env, tmp_la, flag);
        /* directories may not be opened for write */
        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
        /* freshly-created objects already passed the permission check */
        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
        /* truncate is meaningless for special files */
        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;
        /* For writing append-only file must open it with append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                if (flag & MDS_OPEN_TRUNC)
        /*
         * Now, flag -- O_NOATIME does not be packed by client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);
                /* only the owner (or a capable caller) may skip atime */
                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open the object: run the sanity checks and, on success, bump the
 * open count under write lock. */
static int mdd_open(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        mdd_obj->mod_count++;
        mdd_write_unlock(env, mdd_obj);
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie*/
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean evth up.
                 * Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                /* only log unlink if a striping EA was actually found */
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/*
 * Close the object: drop the open count and, when this is the last close
 * of an orphaned/dead object, destroy it (orphan-list removal plus OSS
 * object cleanup).  No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle = NULL;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /* caller asked us to keep the orphan (e.g. HSM restore in progress) */
        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
                mdd_obj->mod_count--;
                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
                               "list\n", PFID(mdd_object_fid(mdd_obj)));
        /* check without any lock */
        if (mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
                handle = mdd_trans_start(env, mdo2mdd(obj));
                        RETURN(PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        /* state may have changed between the unlocked check and taking the
         * lock; restart if a transaction is now required */
        if (handle == NULL && mdd_obj->mod_count == 1 &&
            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
                mdd_write_unlock(env, mdd_obj);
        /* release open count */
        mdd_obj->mod_count --;
        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                        CERROR("Object "DFID" can not be deleted from orphan "
                               "list, maybe cause OST objects can not be "
                               "destroyed (err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If object was not deleted from orphan list, do not
                         * destroy OSS objects, which will be done when next
        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* Object maybe not in orphan list originally, it is rare case for
         * mdd_finish_unlink() failure. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        CERROR("Error when prepare to delete Object "DFID" , "
                               "which will cause OST objects can not be "
                               "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
        ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Readpage is only valid on directories that support index operations.
 * Permission check is done when open,
 * no need check again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
        struct dt_object *next = mdd_object_child(obj);
        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage (\a dp, \a nob bytes available) with directory
 * entries produced by iterator \a it. */
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              struct lu_dirpage *dp, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
        struct lu_dirent *ent;
        struct lu_dirent *last = NULL;
        /* reserve the page header before packing entries */
        memset(area, 0, sizeof (*dp));
        area += sizeof (*dp);
        nob -= sizeof (*dp);
                len = iops->key_size(env, it);
                /* IAM iterator can return record with zero len. */
                hash = iops->store(env, it);
                if (unlikely(first)) {
                        dp->ldp_hash_start = cpu_to_le64(hash);
                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);
                if (nob >= recsize) {
                        result = iops->rec(env, it, ent, attr);
                        if (result == -ESTALE)
                        /* osd might not able to pack all attributes,
                         * so recheck rec length */
                        recsize = le16_to_cpu(ent->lde_reclen);
                        /* no room for this entry: ok if we packed at least one */
                        result = (last != NULL) ? 0 :-EINVAL;
                ent = (void *)ent + recsize;
                result = iops->next(env, it);
                if (result == -ESTALE)
        } while (result == 0);
        dp->ldp_hash_end = cpu_to_le64(hash);
        /* a last entry sharing the end hash may collide across pages */
        if (last->lde_hash == dp->ldp_hash_end)
                dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
        last->lde_reclen = 0; /* end mark */
/* Iterate the directory and fill the pages described by \a rdpg with
 * lu_dirpage structures, starting at hash rdpg->rp_hash. */
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);
        if (rdpg->rp_count <= 0)
        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
        rc = iops->load(env, it, rdpg->rp_hash);
        /*
         * Iterator didn't find record with exactly the key requested.
         *
         * It is currently either
         *
         *     - positioned above record with key less than
         *     requested---skip it.
         *
         *     - or not positioned at all (is in IAM_IT_SKEWED
         *     state)---position it on the next item.
         */
        rc = iops->next(env, it);
        /*
         * At this point and across for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                struct lu_dirpage *dp;
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                /* one CFS page holds several LU pages; build each in turn */
                rc = mdd_dir_page_build(env, mdd, dp,
                                        min_t(int, nob, LU_PAGE_SIZE),
                                        iops, it, rdpg->rp_attrs);
                        /* end of directory reached inside this page */
                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                } else if (rc < 0) {
                        CWARN("build page failed: %d!\n", rc);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
                if ((unsigned long)dp & ~CFS_PAGE_MASK)
                struct lu_dirpage *dp;
                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                /*
                 * No pages were processed, mark this for first page
                 */
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                cfs_kunmap(rdpg->rp_pages[0]);
        /* report how many bytes of directory pages were filled */
        rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
        iops->fini(env, it);
/* Read a page of directory entries; dead (unlinked-but-open) directories
 * return a single empty page. */
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
                GOTO(out_unlock, rc);
        if (mdd_is_dead_obj(mdd_obj)) {
                struct lu_dirpage *dp;
                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));
                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);
                /* hand back one page marked empty */
                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage*)cfs_kmap(pg);
                memset(dp, 0 , sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                GOTO(out_unlock, rc = LU_PAGE_SIZE);
        rc = __mdd_readpage(env, mdd_obj, rdpg);
        mdd_read_unlock(env, mdd_obj);
/* Sync the object by delegating to the underlying dt_object. */
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
/* Return the object's version from the underlying dt layer. */
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
/* Set the object's version in the underlying dt layer. */
static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));
        do_version_set(env, mdd_object_child(mdd_obj), version);
2508 const struct md_object_operations mdd_obj_ops = {
2509 .moo_permission = mdd_permission,
2510 .moo_attr_get = mdd_attr_get,
2511 .moo_attr_set = mdd_attr_set,
2512 .moo_xattr_get = mdd_xattr_get,
2513 .moo_xattr_set = mdd_xattr_set,
2514 .moo_xattr_list = mdd_xattr_list,
2515 .moo_xattr_del = mdd_xattr_del,
2516 .moo_object_create = mdd_object_create,
2517 .moo_ref_add = mdd_ref_add,
2518 .moo_ref_del = mdd_ref_del,
2519 .moo_open = mdd_open,
2520 .moo_close = mdd_close,
2521 .moo_readpage = mdd_readpage,
2522 .moo_readlink = mdd_readlink,
2523 .moo_changelog = mdd_changelog,
2524 .moo_capa_get = mdd_capa_get,
2525 .moo_object_sync = mdd_object_sync,
2526 .moo_version_get = mdd_version_get,
2527 .moo_version_set = mdd_version_set,
2528 .moo_path = mdd_path,
2529 .moo_file_lock = mdd_file_lock,
2530 .moo_file_unlock = mdd_file_unlock,