1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_to_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
63 #include "mdd_internal.h"
65 static const struct lu_object_operations mdd_lu_obj_ops;
67 static int mdd_xattr_get(const struct lu_env *env,
68 struct md_object *obj, struct lu_buf *buf,
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
74 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
75 PFID(mdd_object_fid(obj)));
76 mdo_data_get(env, obj, data);
80 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
81 struct lu_attr *la, struct lustre_capa *capa)
83 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
84 PFID(mdd_object_fid(obj)));
85 return mdo_attr_get(env, obj, la, capa);
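/* Translate on-disk LUSTRE_*_FL inode flags into the in-memory mod_flags
 * bits of the mdd_object.  APPEND_OBJ and IMMUTE_OBJ are cleared first so
 * that stale bits do not survive the update. */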
88 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92 if (flags & LUSTRE_APPEND_FL)
93 obj->mod_flags |= APPEND_OBJ;
95 if (flags & LUSTRE_IMMUTABLE_FL)
96 obj->mod_flags |= IMMUTE_OBJ;
99 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 struct mdd_thread_info *info;
103 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
104 LASSERT(info != NULL);
108 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
112 buf = &mdd_env_info(env)->mti_buf;
118 void mdd_buf_put(struct lu_buf *buf)
120 if (buf == NULL || buf->lb_buf == NULL)
122 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
127 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
128 const void *area, ssize_t len)
132 buf = &mdd_env_info(env)->mti_buf;
133 buf->lb_buf = (void *)area;
138 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
143 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146 if (buf->lb_buf == NULL) {
148 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
149 if (buf->lb_buf == NULL)
155 /** Increase the size of the \a mti_big_buf.
156 * preserves old data in buffer
157 * old buffer remains unchanged on error
158 * \retval 0 or -ENOMEM
160 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165 LASSERT(len >= oldbuf->lb_len);
166 OBD_ALLOC_LARGE(buf.lb_buf, len);
168 if (buf.lb_buf == NULL)
172 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176 memcpy(oldbuf, &buf, sizeof(buf));
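/* Return a per-environment scratch buffer large enough for the biggest
 * possible array of llog unlink cookies.  The buffer is reallocated only
 * when the required size grows and is zeroed before being handed back. */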
181 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
182 struct mdd_device *mdd)
184 struct mdd_thread_info *mti = mdd_env_info(env);
187 max_cookie_size = mdd_lov_cookiesize(env, mdd);
188 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
189 if (mti->mti_max_cookie)
190 OBD_FREE_LARGE(mti->mti_max_cookie,
191 mti->mti_max_cookie_size);
192 mti->mti_max_cookie = NULL;
193 mti->mti_max_cookie_size = 0;
195 if (unlikely(mti->mti_max_cookie == NULL)) {
196 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
197 if (likely(mti->mti_max_cookie != NULL))
198 mti->mti_max_cookie_size = max_cookie_size;
200 if (likely(mti->mti_max_cookie != NULL))
201 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
202 return mti->mti_max_cookie;
205 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
206 struct mdd_device *mdd)
208 struct mdd_thread_info *mti = mdd_env_info(env);
211 max_lmm_size = mdd_lov_mdsize(env, mdd);
212 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
213 if (mti->mti_max_lmm)
214 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
215 mti->mti_max_lmm = NULL;
216 mti->mti_max_lmm_size = 0;
218 if (unlikely(mti->mti_max_lmm == NULL)) {
219 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
220 if (likely(mti->mti_max_lmm != NULL))
221 mti->mti_max_lmm_size = max_lmm_size;
223 return mti->mti_max_lmm;
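/* Allocate the MDD slice of a compound lu_object.  mdd_object_init() below
 * then allocates the matching object from the underlying (child) device and
 * stacks it beneath this slice with lu_object_add(). */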
226 struct lu_object *mdd_object_alloc(const struct lu_env *env,
227 const struct lu_object_header *hdr,
230 struct mdd_object *mdd_obj;
232 OBD_ALLOC_PTR(mdd_obj);
233 if (mdd_obj != NULL) {
236 o = mdd2lu_obj(mdd_obj);
237 lu_object_init(o, NULL, d);
238 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
239 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
240 mdd_obj->mod_count = 0;
241 o->lo_ops = &mdd_lu_obj_ops;
248 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
249 const struct lu_object_conf *unused)
251 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
252 struct mdd_object *mdd_obj = lu2mdd_obj(o);
253 struct lu_object *below;
254 struct lu_device *under;
257 mdd_obj->mod_cltime = 0;
258 under = &d->mdd_child->dd_lu_dev;
259 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
260 mdd_pdlock_init(mdd_obj);
264 lu_object_add(o, below);
269 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 if (lu_object_exists(o))
272 return mdd_get_flags(env, lu2mdd_obj(o));
277 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 struct mdd_object *mdd = lu2mdd_obj(o);
285 static int mdd_object_print(const struct lu_env *env, void *cookie,
286 lu_printer_t p, const struct lu_object *o)
288 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
289 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
290 "valid=%x, cltime="LPU64", flags=%lx)",
291 mdd, mdd->mod_count, mdd->mod_valid,
292 mdd->mod_cltime, mdd->mod_flags);
295 static const struct lu_object_operations mdd_lu_obj_ops = {
296 .loo_object_init = mdd_object_init,
297 .loo_object_start = mdd_object_start,
298 .loo_object_free = mdd_object_free,
299 .loo_object_print = mdd_object_print,
302 struct mdd_object *mdd_object_find(const struct lu_env *env,
303 struct mdd_device *d,
304 const struct lu_fid *f)
306 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
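/* Resolve a '/'-separated path into a FID by looking it up one name
 * component at a time, starting from mdd_root_fid.  mdd_path_current()
 * uses this to verify that a freshly rebuilt path still resolves to the
 * original object. */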
309 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
310 const char *path, struct lu_fid *fid)
313 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
314 struct mdd_object *obj;
315 struct lu_name *lname = &mdd_env_info(env)->mti_name;
320 /* temp buffer for path element */
321 buf = mdd_buf_alloc(env, PATH_MAX);
322 if (buf->lb_buf == NULL)
325 lname->ln_name = name = buf->lb_buf;
326 lname->ln_namelen = 0;
327 *f = mdd->mdd_root_fid;
334 while (*path != '/' && *path != '\0') {
342 /* find obj corresponding to fid */
343 obj = mdd_object_find(env, mdd, f);
345 GOTO(out, rc = -EREMOTE);
347 GOTO(out, rc = PTR_ERR(obj));
348 /* get child fid from parent and name */
349 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
350 mdd_object_put(env, obj);
355 lname->ln_namelen = 0;
364 /** The maximum depth that fid2path() will search.
365 * This is limited only because we want to store the fids for
366 * historical path lookup purposes.
368 #define MAX_PATH_DEPTH 100
370 /** mdd_path() lookup structure. */
371 struct path_lookup_info {
372 __u64 pli_recno; /**< history point */
373 __u64 pli_currec; /**< current record */
374 struct lu_fid pli_fid;
375 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
376 struct mdd_object *pli_mdd_obj;
377 char *pli_path; /**< full path */
379 int pli_linkno; /**< which hardlink to follow */
380 int pli_fidcount; /**< number of \a pli_fids */
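/* Build the current pathname of pli->pli_mdd_obj by walking towards the
 * root: each iteration reads the parent FID and entry name from the link
 * EA and packs the name at the tail of the path buffer.  The result is
 * then re-resolved with mdd_path2fid(); -EAGAIN is returned if a
 * concurrent rename made the path stale. */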
383 static int mdd_path_current(const struct lu_env *env,
384 struct path_lookup_info *pli)
386 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
387 struct mdd_object *mdd_obj;
388 struct lu_buf *buf = NULL;
389 struct link_ea_header *leh;
390 struct link_ea_entry *lee;
391 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
392 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
398 ptr = pli->pli_path + pli->pli_pathlen - 1;
401 pli->pli_fidcount = 0;
402 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
405 mdd_obj = mdd_object_find(env, mdd,
406 &pli->pli_fids[pli->pli_fidcount]);
408 GOTO(out, rc = -EREMOTE);
410 GOTO(out, rc = PTR_ERR(mdd_obj));
411 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413 mdd_object_put(env, mdd_obj);
417 /* Do I need to error out here? */
422 /* Get parent fid and object name */
423 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
424 buf = mdd_links_get(env, mdd_obj);
425 mdd_read_unlock(env, mdd_obj);
426 mdd_object_put(env, mdd_obj);
428 GOTO(out, rc = PTR_ERR(buf));
431 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
432 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434 /* If set, use link #linkno for path lookup, otherwise use
435 link #0. Only do this for the final path element. */
436 if ((pli->pli_fidcount == 0) &&
437 (pli->pli_linkno < leh->leh_reccount)) {
439 for (count = 0; count < pli->pli_linkno; count++) {
440 lee = (struct link_ea_entry *)
441 ((char *)lee + reclen);
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 if (pli->pli_linkno < leh->leh_reccount - 1)
445 /* indicate to user there are more links */
449 /* Pack the name in the end of the buffer */
450 ptr -= tmpname->ln_namelen;
451 if (ptr - 1 <= pli->pli_path)
452 GOTO(out, rc = -EOVERFLOW);
453 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
456 /* Store the parent fid for historic lookup */
457 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
458 GOTO(out, rc = -EOVERFLOW);
459 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
462 /* Verify that our path hasn't changed since we started the lookup.
463 Record the current index, and verify the path resolves to the
464 same fid. If it does, then the path is correct as of this index. */
465 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
466 pli->pli_currec = mdd->mdd_cl.mc_index;
467 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
468 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
471 GOTO (out, rc = -EAGAIN);
473 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
474 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
475 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
476 PFID(&pli->pli_fid));
477 GOTO(out, rc = -EAGAIN);
479 ptr++; /* skip leading / */
480 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
484 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
485 /* if we vmalloced a large buffer drop it */
491 static int mdd_path_historic(const struct lu_env *env,
492 struct path_lookup_info *pli)
497 /* Returns the full path to this fid, as of changelog record recno. */
498 static int mdd_path(const struct lu_env *env, struct md_object *obj,
499 char *path, int pathlen, __u64 *recno, int *linkno)
501 struct path_lookup_info *pli;
509 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
518 pli->pli_mdd_obj = md2mdd_obj(obj);
519 pli->pli_recno = *recno;
520 pli->pli_path = path;
521 pli->pli_pathlen = pathlen;
522 pli->pli_linkno = *linkno;
524 /* Retry multiple times in case file is being moved */
525 while (tries-- && rc == -EAGAIN)
526 rc = mdd_path_current(env, pli);
528 /* For historical path lookup, the current links may not have existed
529 * at "recno" time. We must switch over to earlier links/parents
530 * by using the changelog records. If the earlier parent doesn't
531 * exist, we must search back through the changelog to reconstruct
532 * its parents, then check if it exists, etc.
533 * We may ignore this problem for the initial implementation and
534 * state that an "original" hardlink must still exist for us to find
535 * historic path name. */
536 if (pli->pli_recno != -1) {
537 rc = mdd_path_historic(env, pli);
539 *recno = pli->pli_currec;
540 /* Return next link index to caller */
541 *linkno = pli->pli_linkno;
549 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 struct lu_attr *la = &mdd_env_info(env)->mti_la;
555 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557 mdd_flags_xlate(obj, la->la_flags);
558 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
559 obj->mod_flags |= MNLINK_OBJ;
564 /* get only inode attributes */
565 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
571 if (ma->ma_valid & MA_INODE)
574 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
575 mdd_object_capa(env, mdd_obj));
577 ma->ma_valid |= MA_INODE;
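/* Fill @lmm with the filesystem-wide default striping taken from the LOV
 * descriptor (pattern, stripe size, stripe count, stripe offset) and return
 * the number of bytes written.  Used by __mdd_lmm_get() when the default
 * layout of the root directory is requested. */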
581 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
583 struct lov_desc *ldesc;
584 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
585 struct lov_user_md *lum = (struct lov_user_md*)lmm;
591 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
592 LASSERT(ldesc != NULL);
594 lum->lmm_magic = LOV_MAGIC_V1;
595 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
596 lum->lmm_pattern = ldesc->ld_pattern;
597 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
598 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
599 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
601 RETURN(sizeof(*lum));
604 static int is_rootdir(struct mdd_object *mdd_obj)
606 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
607 const struct lu_fid *fid = mdo2fid(mdd_obj);
609 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
612 /* get lov EA only */
613 static int __mdd_lmm_get(const struct lu_env *env,
614 struct mdd_object *mdd_obj, struct md_attr *ma)
619 if (ma->ma_valid & MA_LOV)
622 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
624 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
625 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
627 ma->ma_lmm_size = rc;
628 ma->ma_valid |= MA_LOV;
634 /* get the first parent fid from link EA */
635 static int mdd_pfid_get(const struct lu_env *env,
636 struct mdd_object *mdd_obj, struct md_attr *ma)
639 struct link_ea_header *leh;
640 struct link_ea_entry *lee;
641 struct lu_fid *pfid = &ma->ma_pfid;
644 if (ma->ma_valid & MA_PFID)
647 buf = mdd_links_get(env, mdd_obj);
649 RETURN(PTR_ERR(buf));
652 lee = (struct link_ea_entry *)(leh + 1);
653 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
654 fid_be_to_cpu(pfid, pfid);
655 ma->ma_valid |= MA_PFID;
656 if (buf->lb_len > OBD_ALLOC_BIG)
657 /* if we vmalloced a large buffer drop it */
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
668 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669 rc = __mdd_lmm_get(env, mdd_obj, ma);
670 mdd_read_unlock(env, mdd_obj);
675 static int __mdd_lmv_get(const struct lu_env *env,
676 struct mdd_object *mdd_obj, struct md_attr *ma)
681 if (ma->ma_valid & MA_LMV)
684 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
687 ma->ma_valid |= MA_LMV;
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
696 struct mdd_thread_info *info = mdd_env_info(env);
697 struct lustre_mdt_attrs *lma =
698 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
703 /* If all needed data are already valid, nothing to do */
704 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705 (ma->ma_need & (MA_HSM | MA_SOM)))
708 /* Read LMA from disk EA */
709 lma_size = sizeof(info->mti_xattr_buf);
710 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
714 /* Useless to check LMA incompatibility because this is already done in
715 * osd_ea_fid_get(), and this will fail long before this code is called.
717 * So, if we are here, LMA is compatible.
720 lustre_lma_swab(lma);
722 /* Swab and copy LMA */
723 if (ma->ma_need & MA_HSM) {
724 if (lma->lma_compat & LMAC_HSM)
725 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
727 ma->ma_hsm.mh_flags = 0;
728 ma->ma_valid |= MA_HSM;
732 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733 LASSERT(ma->ma_som != NULL);
734 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735 ma->ma_som->msd_size = lma->lma_som_size;
736 ma->ma_som->msd_blocks = lma->lma_som_blocks;
737 ma->ma_som->msd_mountid = lma->lma_som_mountid;
738 ma->ma_valid |= MA_SOM;
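/* Gather every attribute group requested in ma->ma_need: plain inode
 * attributes, the LOV or LMV EA, the first parent FID, HSM/SOM state from
 * the LMA EA, and the default ACL, depending on the object type. */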
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
750 if (ma->ma_need & MA_INODE)
751 rc = mdd_iattr_get(env, mdd_obj, ma);
753 if (rc == 0 && ma->ma_need & MA_LOV) {
754 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755 S_ISDIR(mdd_object_type(mdd_obj)))
756 rc = __mdd_lmm_get(env, mdd_obj, ma);
758 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759 if (S_ISREG(mdd_object_type(mdd_obj)))
760 rc = mdd_pfid_get(env, mdd_obj, ma);
762 if (rc == 0 && ma->ma_need & MA_LMV) {
763 if (S_ISDIR(mdd_object_type(mdd_obj)))
764 rc = __mdd_lmv_get(env, mdd_obj, ma);
766 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767 if (S_ISREG(mdd_object_type(mdd_obj)))
768 rc = __mdd_lma_get(env, mdd_obj, ma);
770 #ifdef CONFIG_FS_POSIX_ACL
771 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772 if (S_ISDIR(mdd_object_type(mdd_obj)))
773 rc = mdd_def_acl_get(env, mdd_obj, ma);
776 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777 rc, ma->ma_valid, ma->ma_lmm);
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782 struct mdd_object *mdd_obj, struct md_attr *ma)
785 int needlock = ma->ma_need &
786 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
789 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790 rc = mdd_attr_get_internal(env, mdd_obj, ma);
792 mdd_read_unlock(env, mdd_obj);
797 * No permission check is needed.
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
802 struct mdd_object *mdd_obj = md2mdd_obj(obj);
806 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
811 * No permission check is needed.
813 static int mdd_xattr_get(const struct lu_env *env,
814 struct md_object *obj, struct lu_buf *buf,
817 struct mdd_object *mdd_obj = md2mdd_obj(obj);
822 LASSERT(mdd_object_exists(mdd_obj));
824 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825 rc = mdo_xattr_get(env, mdd_obj, buf, name,
826 mdd_object_capa(env, mdd_obj));
827 mdd_read_unlock(env, mdd_obj);
833 * Permission check is done at open time,
834 * no need to check it again.
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
839 struct mdd_object *mdd_obj = md2mdd_obj(obj);
840 struct dt_object *next;
845 LASSERT(mdd_object_exists(mdd_obj));
847 next = mdd_object_child(mdd_obj);
848 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850 mdd_object_capa(env, mdd_obj));
851 mdd_read_unlock(env, mdd_obj);
856 * No permission check is needed.
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
861 struct mdd_object *mdd_obj = md2mdd_obj(obj);
866 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868 mdd_read_unlock(env, mdd_obj);
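/* Create the on-disk object backing @c: choose the dt object format from
 * the creation mode (or from the supplied index features), let the parent
 * provide an allocation hint, then ask the OSD to allocate the object. */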
873 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
874 struct mdd_object *c, struct md_attr *ma,
875 struct thandle *handle,
876 const struct md_op_spec *spec)
878 struct lu_attr *attr = &ma->ma_attr;
879 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
880 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881 const struct dt_index_features *feat = spec->sp_feat;
885 if (!mdd_object_exists(c)) {
886 struct dt_object *next = mdd_object_child(c);
889 if (feat != &dt_directory_features && feat != NULL)
890 dof->dof_type = DFT_INDEX;
892 dof->dof_type = dt_mode_to_dft(attr->la_mode);
894 dof->u.dof_idx.di_feat = feat;
896 /* @hint will be initialized by underlying device. */
897 next->do_ops->do_ah_init(env, hint,
898 p ? mdd_object_child(p) : NULL,
899 attr->la_mode & S_IFMT);
901 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
902 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
910 * Make sure the ctime only ever increases.
912 static inline int mdd_attr_check(const struct lu_env *env,
913 struct mdd_object *obj,
914 struct lu_attr *attr)
916 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
920 if (attr->la_valid & LA_CTIME) {
921 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
925 if (attr->la_ctime < tmp_la->la_ctime)
926 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
927 else if (attr->la_valid == LA_CTIME &&
928 attr->la_ctime == tmp_la->la_ctime)
929 attr->la_valid &= ~LA_CTIME;
934 int mdd_attr_set_internal(const struct lu_env *env,
935 struct mdd_object *obj,
936 struct lu_attr *attr,
937 struct thandle *handle,
943 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
944 #ifdef CONFIG_FS_POSIX_ACL
945 if (!rc && (attr->la_valid & LA_MODE) && needacl)
946 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
951 int mdd_attr_check_set_internal(const struct lu_env *env,
952 struct mdd_object *obj,
953 struct lu_attr *attr,
954 struct thandle *handle,
960 rc = mdd_attr_check(env, obj, attr);
965 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
969 static int mdd_attr_set_internal_locked(const struct lu_env *env,
970 struct mdd_object *obj,
971 struct lu_attr *attr,
972 struct thandle *handle,
978 needacl = needacl && (attr->la_valid & LA_MODE);
980 mdd_write_lock(env, obj, MOR_TGT_CHILD);
981 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
983 mdd_write_unlock(env, obj);
987 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
988 struct mdd_object *obj,
989 struct lu_attr *attr,
990 struct thandle *handle,
996 needacl = needacl && (attr->la_valid & LA_MODE);
998 mdd_write_lock(env, obj, MOR_TGT_CHILD);
999 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1001 mdd_write_unlock(env, obj);
1005 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1006 const struct lu_buf *buf, const char *name,
1007 int fl, struct thandle *handle)
1009 struct lustre_capa *capa = mdd_object_capa(env, obj);
1013 if (buf->lb_buf && buf->lb_len > 0)
1014 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1015 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1016 rc = mdo_xattr_del(env, obj, name, handle, capa);
1022 * This gives the same functionality as the code between
1023 * sys_chmod and inode_setattr
1024 * chown_common and inode_setattr
1025 * utimes and inode_setattr
1026 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
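/* Normalise the incoming attributes against the object's current ones:
 * drop time updates that would move times backwards, honour the
 * immutable/append-only flags, clear setuid/setgid bits where required,
 * and verify that the caller may chmod/chown/chgrp. */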
1028 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1029 struct lu_attr *la, const struct md_attr *ma)
1031 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1032 struct md_ucred *uc;
1039 /* Do not permit changing the file type */
1040 if (la->la_valid & LA_TYPE)
1043 /* They should not be processed by setattr */
1044 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1047 /* export destroy does not have ->le_ses, but we may want
1048 * to drop LUSTRE_SOM_FL. */
1054 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1058 if (la->la_valid == LA_CTIME) {
1059 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1060 /* This is only for set ctime when rename's source is
1062 rc = mdd_may_delete(env, NULL, obj,
1063 (struct md_attr *)ma, 1, 0);
1064 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1065 la->la_valid &= ~LA_CTIME;
1069 if (la->la_valid == LA_ATIME) {
1070 /* This is an atime-only update, used for the read-atime update on close. */
1071 if (la->la_atime >= tmp_la->la_atime &&
1072 la->la_atime < (tmp_la->la_atime +
1073 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1074 la->la_valid &= ~LA_ATIME;
1078 /* Check if flags change. */
1079 if (la->la_valid & LA_FLAGS) {
1080 unsigned int oldflags = 0;
1081 unsigned int newflags = la->la_flags &
1082 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1084 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1085 !mdd_capable(uc, CFS_CAP_FOWNER))
1088 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1089 * only be changed by the relevant capability. */
1090 if (mdd_is_immutable(obj))
1091 oldflags |= LUSTRE_IMMUTABLE_FL;
1092 if (mdd_is_append(obj))
1093 oldflags |= LUSTRE_APPEND_FL;
1094 if ((oldflags ^ newflags) &&
1095 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1098 if (!S_ISDIR(tmp_la->la_mode))
1099 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1102 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1103 (la->la_valid & ~LA_FLAGS) &&
1104 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1107 /* Check for setting the obj time. */
1108 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1109 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1110 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1112 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1120 if (la->la_valid & LA_KILL_SUID) {
1121 la->la_valid &= ~LA_KILL_SUID;
1122 if ((tmp_la->la_mode & S_ISUID) &&
1123 !(la->la_valid & LA_MODE)) {
1124 la->la_mode = tmp_la->la_mode;
1125 la->la_valid |= LA_MODE;
1127 la->la_mode &= ~S_ISUID;
1130 if (la->la_valid & LA_KILL_SGID) {
1131 la->la_valid &= ~LA_KILL_SGID;
1132 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1133 (S_ISGID | S_IXGRP)) &&
1134 !(la->la_valid & LA_MODE)) {
1135 la->la_mode = tmp_la->la_mode;
1136 la->la_valid |= LA_MODE;
1138 la->la_mode &= ~S_ISGID;
1141 /* Make sure a caller can chmod. */
1142 if (la->la_valid & LA_MODE) {
1143 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1144 (uc->mu_fsuid != tmp_la->la_uid) &&
1145 !mdd_capable(uc, CFS_CAP_FOWNER))
1148 if (la->la_mode == (cfs_umode_t) -1)
1149 la->la_mode = tmp_la->la_mode;
1151 la->la_mode = (la->la_mode & S_IALLUGO) |
1152 (tmp_la->la_mode & ~S_IALLUGO);
1154 /* Also check the setgid bit! */
1155 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1156 la->la_gid : tmp_la->la_gid) &&
1157 !mdd_capable(uc, CFS_CAP_FSETID))
1158 la->la_mode &= ~S_ISGID;
1160 la->la_mode = tmp_la->la_mode;
1163 /* Make sure a caller can chown. */
1164 if (la->la_valid & LA_UID) {
1165 if (la->la_uid == (uid_t) -1)
1166 la->la_uid = tmp_la->la_uid;
1167 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1168 (la->la_uid != tmp_la->la_uid)) &&
1169 !mdd_capable(uc, CFS_CAP_CHOWN))
1172 /* If the user or group of a non-directory has been
1173 * changed by a non-root user, remove the setuid bit.
1174 * 19981026 David C Niemi <niemi@tux.org>
1176 * Changed this to apply to all users, including root,
1177 * to avoid some races. This is the behavior we had in
1178 * 2.0. The check for non-root was definitely wrong
1179 * for 2.2 anyway, as it should have been using
1180 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1181 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1182 !S_ISDIR(tmp_la->la_mode)) {
1183 la->la_mode &= ~S_ISUID;
1184 la->la_valid |= LA_MODE;
1188 /* Make sure caller can chgrp. */
1189 if (la->la_valid & LA_GID) {
1190 if (la->la_gid == (gid_t) -1)
1191 la->la_gid = tmp_la->la_gid;
1192 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193 ((la->la_gid != tmp_la->la_gid) &&
1194 !lustre_in_group_p(uc, la->la_gid))) &&
1195 !mdd_capable(uc, CFS_CAP_CHOWN))
1198 /* Likewise, if the user or group of a non-directory
1199 * has been changed by a non-root user, remove the
1200 * setgid bit UNLESS there is no group execute bit
1201 * (this would be a file marked for mandatory
1202 * locking). 19981026 David C Niemi <niemi@tux.org>
1204 * Removed the fsuid check (see the comment above) --
1206 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1207 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1208 la->la_mode &= ~S_ISGID;
1209 la->la_valid |= LA_MODE;
1213 /* For both the Size-on-MDS case and the truncate case,
1214 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
1215 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1216 * In the SOM case it is set and the MAY_WRITE perm was already checked
1217 * at open time, so there is no need to check it again.  In the truncate
1218 * case it is not set, so the MAY_WRITE perm must be checked here. */
1219 if (ma->ma_attr_flags & MDS_SOM) {
1220 /* For the "Size-on-MDS" setattr update, merge coming
1221 * attributes with the set in the inode. BUG 10641 */
1222 if ((la->la_valid & LA_ATIME) &&
1223 (la->la_atime <= tmp_la->la_atime))
1224 la->la_valid &= ~LA_ATIME;
1226 /* OST attributes do not have a priority over MDS attributes,
1227 * so drop times if ctime is equal. */
1228 if ((la->la_valid & LA_CTIME) &&
1229 (la->la_ctime <= tmp_la->la_ctime))
1230 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1232 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1233 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1234 (uc->mu_fsuid == tmp_la->la_uid)) &&
1235 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1236 rc = mdd_permission_internal_locked(env, obj,
1243 if (la->la_valid & LA_CTIME) {
1244 /* A pure setattr has priority over what is
1245 * already set; do not drop it if the ctime is equal. */
1246 if (la->la_ctime < tmp_la->la_ctime)
1247 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1255 /** Store a data change changelog record
1256 * If this fails, we must fail the whole transaction; we don't
1257 * want the change to commit without the log entry.
1258 * \param mdd_obj - mdd_object of change
1259 * \param handle - transaction handle
1261 static int mdd_changelog_data_store(const struct lu_env *env,
1262 struct mdd_device *mdd,
1263 enum changelog_rec_type type,
1265 struct mdd_object *mdd_obj,
1266 struct thandle *handle)
1268 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1269 struct llog_changelog_rec *rec;
1275 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1277 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1280 LASSERT(handle != NULL);
1281 LASSERT(mdd_obj != NULL);
1283 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1284 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1285 /* Don't need multiple updates in this log */
1286 /* Don't check under lock - no big deal if we get an extra
1291 reclen = llog_data_len(sizeof(*rec));
1292 buf = mdd_buf_alloc(env, reclen);
1293 if (buf->lb_buf == NULL)
1295 rec = (struct llog_changelog_rec *)buf->lb_buf;
1297 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1298 rec->cr.cr_type = (__u32)type;
1299 rec->cr.cr_tfid = *tfid;
1300 rec->cr.cr_namelen = 0;
1301 mdd_obj->mod_cltime = cfs_time_current_64();
1303 rc = mdd_changelog_llog_write(mdd, rec, handle);
1305 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1306 rc, type, PFID(tfid));
1313 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1314 int flags, struct md_object *obj)
1316 struct thandle *handle;
1317 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1318 struct mdd_device *mdd = mdo2mdd(obj);
1322 handle = mdd_trans_start(env, mdd);
1325 return(PTR_ERR(handle));
1327 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1330 mdd_trans_stop(env, mdd, rc, handle);
1336 * Should be called with write lock held.
1338 * \see mdd_lma_set_locked().
1340 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1341 const struct md_attr *ma, struct thandle *handle)
1343 struct mdd_thread_info *info = mdd_env_info(env);
1345 struct lustre_mdt_attrs *lma =
1346 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1347 int lmasize = sizeof(struct lustre_mdt_attrs);
1352 /* Either the HSM or the SOM part is not valid; read the LMA EA from disk first */
1353 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1354 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1358 lustre_lma_swab(lma);
1360 memset(lma, 0, lmasize);
1364 if (ma->ma_valid & MA_HSM) {
1365 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1366 lma->lma_compat |= LMAC_HSM;
1370 if (ma->ma_valid & MA_SOM) {
1371 LASSERT(ma->ma_som != NULL);
1372 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1373 lma->lma_compat &= ~LMAC_SOM;
1375 lma->lma_compat |= LMAC_SOM;
1376 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1377 lma->lma_som_size = ma->ma_som->msd_size;
1378 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1379 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1384 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1386 lustre_lma_swab(lma);
1387 buf = mdd_buf_get(env, lma, lmasize);
1388 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1394 * Save LMA extended attributes with data from \a ma.
1396 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid;
1397 * if not, the LMA EA will first be read from disk, modified and written back.
1400 static int mdd_lma_set_locked(const struct lu_env *env,
1401 struct mdd_object *mdd_obj,
1402 const struct md_attr *ma, struct thandle *handle)
1406 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1407 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1408 mdd_write_unlock(env, mdd_obj);
1412 /* Precedence for choosing record type when multiple
1413 * attributes change: setattr > mtime > ctime > atime
1414 * (ctime changes when mtime does, plus chmod/chown.
1415 * atime and ctime are independent.) */
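/* Example: a chmod that also carries a ctime update sets both the
 * CL_SETATTR and CL_CTIME bits; the loop below then records CL_SETATTR,
 * the lowest-numbered (highest-precedence) unmasked bit. */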
1416 static int mdd_attr_set_changelog(const struct lu_env *env,
1417 struct md_object *obj, struct thandle *handle,
1420 struct mdd_device *mdd = mdo2mdd(obj);
1423 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1424 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1425 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1426 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1427 bits = bits & mdd->mdd_cl.mc_mask;
1431 /* The record type is the lowest non-masked set bit */
1432 while (bits && ((bits & 1) == 0)) {
1437 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1438 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1439 md2mdd_obj(obj), handle);
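/* mdd_attr_set() below proceeds in stages: fix up the attributes with
 * mdd_fix_attr(), start a transaction (reserving llog credits when a
 * chown/chgrp of a striped file must be journalled), apply the
 * attribute/LOV/LMA changes under the object write lock, record a
 * changelog entry, and finally push UID/GID changes to the OSTs via
 * mdd_lov_setattr_async(). */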
1442 /* set attr and LOV EA at once, return updated attr */
1443 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1444 const struct md_attr *ma)
1446 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1447 struct mdd_device *mdd = mdo2mdd(obj);
1448 struct thandle *handle;
1449 struct lov_mds_md *lmm = NULL;
1450 struct llog_cookie *logcookies = NULL;
1451 int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1452 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1453 struct obd_device *obd = mdd->mdd_obd_dev;
1454 struct mds_obd *mds = &obd->u.mds;
1455 #ifdef HAVE_QUOTA_SUPPORT
1456 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1457 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1458 int quota_opc = 0, block_count = 0;
1459 int inode_pending[MAXQUOTAS] = { 0, 0 };
1460 int block_pending[MAXQUOTAS] = { 0, 0 };
1464 *la_copy = ma->ma_attr;
1465 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1469 /* setattr on "close" only changes atime, or does nothing */
1470 if (ma->ma_valid == MA_INODE &&
1471 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1474 /* TODO: add lock here */
1475 /* start a log journal handle if needed */
1476 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1477 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1478 lmm_size = mdd_lov_mdsize(env, mdd);
1479 lmm = mdd_max_lmm_get(env, mdd);
1481 GOTO(no_trans, rc = -ENOMEM);
1483 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1491 if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1492 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1493 lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1496 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1497 MDD_TXN_ATTR_SET_OP, chlog_cnt);
1498 handle = mdd_trans_start(env, mdd);
1500 GOTO(no_trans, rc = PTR_ERR(handle));
1502 /* permission changes may require sync operation */
1503 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1504 handle->th_sync |= mdd->mdd_sync_permission;
1506 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1507 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1508 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1510 #ifdef HAVE_QUOTA_SUPPORT
1511 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1512 struct obd_export *exp = md_quota(env)->mq_exp;
1513 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1515 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1517 quota_opc = FSFILT_OP_SETATTR;
1518 mdd_quota_wrapper(la_copy, qnids);
1519 mdd_quota_wrapper(la_tmp, qoids);
1520 /* get file quota for new owner */
1521 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1522 qnids, inode_pending, 1, NULL, 0,
1524 block_count = (la_tmp->la_blocks + 7) >> 3;
1527 mdd_data_get(env, mdd_obj, &data);
1528 /* get block quota for new owner */
1529 lquota_chkquota(mds_quota_interface_ref, obd,
1530 exp, qnids, block_pending,
1532 LQUOTA_FLAGS_BLK, data, 1);
1538 if (la_copy->la_valid & LA_FLAGS) {
1539 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1542 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1543 } else if (la_copy->la_valid) { /* setattr */
1544 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1546 /* journal chown/chgrp in llog, just like unlink */
1547 if (rc == 0 && lmm_size){
1548 cookie_size = mdd_lov_cookiesize(env, mdd);
1549 logcookies = mdd_max_cookie_get(env, mdd);
1550 if (logcookies == NULL)
1551 GOTO(cleanup, rc = -ENOMEM);
1553 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1554 logcookies, cookie_size) <= 0)
1559 if (rc == 0 && ma->ma_valid & MA_LOV) {
1562 mode = mdd_object_type(mdd_obj);
1563 if (S_ISREG(mode) || S_ISDIR(mode)) {
1564 rc = mdd_lsm_sanity_check(env, mdd_obj);
1568 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1569 ma->ma_lmm_size, handle, 1);
1573 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1576 mode = mdd_object_type(mdd_obj);
1578 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1583 rc = mdd_attr_set_changelog(env, obj, handle,
1584 ma->ma_attr.la_valid);
1585 mdd_trans_stop(env, mdd, rc, handle);
1587 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1588 /* set obd attr, if needed */
1589 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1592 #ifdef HAVE_QUOTA_SUPPORT
1594 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1596 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1598 /* Trigger dqrel/dqacq for original owner and new owner.
1599 * If failed, the next call for lquota_chkquota will
1601 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1608 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1609 const struct lu_buf *buf, const char *name, int fl,
1610 struct thandle *handle)
1615 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1616 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1617 mdd_write_unlock(env, obj);
1622 static int mdd_xattr_sanity_check(const struct lu_env *env,
1623 struct mdd_object *obj)
1625 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1626 struct md_ucred *uc = md_ucred(env);
1630 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1633 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1637 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1638 !mdd_capable(uc, CFS_CAP_FOWNER))
1645 * The caller is responsible for updating the object ctime
1646 * after xattr_set if needed.
1648 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1649 const struct lu_buf *buf, const char *name,
1652 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1653 struct mdd_device *mdd = mdo2mdd(obj);
1654 struct thandle *handle;
1658 rc = mdd_xattr_sanity_check(env, mdd_obj);
1662 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1663 handle = mdd_trans_start(env, mdd);
1665 RETURN(PTR_ERR(handle));
1667 /* security-related changes may require sync */
1668 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1669 handle->th_sync |= mdd->mdd_sync_permission;
1671 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1673 /* Only record system & user xattr changes */
1674 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1675 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1676 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1677 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1678 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1679 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1680 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1682 mdd_trans_stop(env, mdd, rc, handle);
1688 * The caller is responsible for updating the object ctime
1689 * after xattr_set if needed.
1691 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1694 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1695 struct mdd_device *mdd = mdo2mdd(obj);
1696 struct thandle *handle;
1700 rc = mdd_xattr_sanity_check(env, mdd_obj);
1704 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1705 handle = mdd_trans_start(env, mdd);
1707 RETURN(PTR_ERR(handle));
1709 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1710 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1711 mdd_object_capa(env, mdd_obj));
1712 mdd_write_unlock(env, mdd_obj);
1714 /* Only record system & user xattr changes */
1715 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1716 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1717 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1718 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1719 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1720 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1721 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1724 mdd_trans_stop(env, mdd, rc, handle);
1729 /* partial unlink */
1730 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1733 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1734 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1735 struct mdd_device *mdd = mdo2mdd(obj);
1736 struct thandle *handle;
1737 #ifdef HAVE_QUOTA_SUPPORT
1738 struct obd_device *obd = mdd->mdd_obd_dev;
1739 struct mds_obd *mds = &obd->u.mds;
1740 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1747 * Check -ENOENT early here because we need to get object type
1748 * to calculate credits before transaction start
1750 if (!mdd_object_exists(mdd_obj))
1753 LASSERT(mdd_object_exists(mdd_obj) > 0);
1755 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1759 handle = mdd_trans_start(env, mdd);
1763 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1765 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1769 __mdd_ref_del(env, mdd_obj, handle, 0);
1771 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1773 __mdd_ref_del(env, mdd_obj, handle, 1);
1776 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1777 la_copy->la_ctime = ma->ma_attr.la_ctime;
1779 la_copy->la_valid = LA_CTIME;
1780 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1784 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1785 #ifdef HAVE_QUOTA_SUPPORT
1786 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1787 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1788 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1789 mdd_quota_wrapper(&ma->ma_attr, qids);
1796 mdd_write_unlock(env, mdd_obj);
1797 mdd_trans_stop(env, mdd, rc, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
1800 /* Trigger dqrel on the owner of child. If failed,
1801 * the next call for lquota_chkquota will process it */
1802 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1808 /* partial operation */
1809 static int mdd_oc_sanity_check(const struct lu_env *env,
1810 struct mdd_object *obj,
1816 switch (ma->ma_attr.la_mode & S_IFMT) {
1833 static int mdd_object_create(const struct lu_env *env,
1834 struct md_object *obj,
1835 const struct md_op_spec *spec,
1839 struct mdd_device *mdd = mdo2mdd(obj);
1840 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1841 const struct lu_fid *pfid = spec->u.sp_pfid;
1842 struct thandle *handle;
1843 #ifdef HAVE_QUOTA_SUPPORT
1844 struct obd_device *obd = mdd->mdd_obd_dev;
1845 struct obd_export *exp = md_quota(env)->mq_exp;
1846 struct mds_obd *mds = &obd->u.mds;
1847 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1848 int quota_opc = 0, block_count = 0;
1849 int inode_pending[MAXQUOTAS] = { 0, 0 };
1850 int block_pending[MAXQUOTAS] = { 0, 0 };
1855 #ifdef HAVE_QUOTA_SUPPORT
1856 if (mds->mds_quota) {
1857 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1858 mdd_quota_wrapper(&ma->ma_attr, qids);
1859 /* get file quota for child */
1860 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861 qids, inode_pending, 1, NULL, 0,
1863 switch (ma->ma_attr.la_mode & S_IFMT) {
1872 /* get block quota for child */
1874 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1875 qids, block_pending, block_count,
1876 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1880 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1881 handle = mdd_trans_start(env, mdd);
1883 GOTO(out_pending, rc = PTR_ERR(handle));
1885 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1886 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1890 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1894 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1895 /* If creating the slave object, set slave EA here. */
1896 int lmv_size = spec->u.sp_ea.eadatalen;
1897 struct lmv_stripe_md *lmv;
1899 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1900 LASSERT(lmv != NULL && lmv_size > 0);
1902 rc = __mdd_xattr_set(env, mdd_obj,
1903 mdd_buf_get_const(env, lmv, lmv_size),
1904 XATTR_NAME_LMV, 0, handle);
1908 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1911 #ifdef CONFIG_FS_POSIX_ACL
1912 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1913 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1915 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1916 buf->lb_len = spec->u.sp_ea.eadatalen;
1917 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1918 rc = __mdd_acl_init(env, mdd_obj, buf,
1919 &ma->ma_attr.la_mode,
1924 ma->ma_attr.la_valid |= LA_MODE;
1927 pfid = spec->u.sp_ea.fid;
1930 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1936 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1937 mdd_write_unlock(env, mdd_obj);
1939 mdd_trans_stop(env, mdd, rc, handle);
1941 #ifdef HAVE_QUOTA_SUPPORT
1943 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1945 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1947 /* Trigger dqacq on the owner of child. If failed,
1948 * the next call for lquota_chkquota will process it. */
1949 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1957 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1958 const struct md_attr *ma)
1960 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1961 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1962 struct mdd_device *mdd = mdo2mdd(obj);
1963 struct thandle *handle;
1967 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1968 handle = mdd_trans_start(env, mdd);
1972 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1973 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1975 __mdd_ref_add(env, mdd_obj, handle);
1976 mdd_write_unlock(env, mdd_obj);
1978 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1979 la_copy->la_ctime = ma->ma_attr.la_ctime;
1981 la_copy->la_valid = LA_CTIME;
1982 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1985 mdd_trans_stop(env, mdd, 0, handle);
1991 * do NOT OR the MAY_*'s; you'll get the weakest
1993 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1997 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1998 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1999 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2000 * owner can write to a file even if it is marked readonly to hide
2001 * its brokenness. (bug 5781) */
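/* Map the remaining open flags onto an access mode: FMODE_READ grants
 * MAY_READ; write, truncate and append opens grant MAY_WRITE;
 * MDS_FMODE_EXEC grants MAY_EXEC. */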
2002 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2003 struct md_ucred *uc = md_ucred(env);
2005 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2006 (la->la_uid == uc->mu_fsuid))
2010 if (flags & FMODE_READ)
2012 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2014 if (flags & MDS_FMODE_EXEC)
2019 static int mdd_open_sanity_check(const struct lu_env *env,
2020 struct mdd_object *obj, int flag)
2022 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2027 if (mdd_is_dead_obj(obj))
2030 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2034 if (S_ISLNK(tmp_la->la_mode))
2037 mode = accmode(env, tmp_la, flag);
2039 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2042 if (!(flag & MDS_OPEN_CREATED)) {
2043 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2048 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2049 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2050 flag &= ~MDS_OPEN_TRUNC;
2052 /* To write an append-only file, it must be opened in append mode. */
2053 if (mdd_is_append(obj)) {
2054 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2056 if (flag & MDS_OPEN_TRUNC)
2062 * Currently the O_NOATIME flag is not packed by the client.
2064 if (flag & O_NOATIME) {
2065 struct md_ucred *uc = md_ucred(env);
2067 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2068 (uc->mu_valid == UCRED_NEW)) &&
2069 (uc->mu_fsuid != tmp_la->la_uid) &&
2070 !mdd_capable(uc, CFS_CAP_FOWNER))
2078 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2081 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2084 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2086 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2088 mdd_obj->mod_count++;
2090 mdd_write_unlock(env, mdd_obj);
2094 /* Return md_attr back;
2095 * if it is the last unlink, also return the LOV EA and llog cookies. */
2096 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2102 if (S_ISREG(mdd_object_type(obj))) {
2103 /* Return LOV & COOKIES unconditionally here. We clean everything up.
2104 * Caller must be ready for that. */
2106 rc = __mdd_lmm_get(env, obj, ma);
2107 if ((ma->ma_valid & MA_LOV))
2108 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2115 * No permission check is needed.
2117 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2120 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2121 struct mdd_device *mdd = mdo2mdd(obj);
2122 struct thandle *handle = NULL;
2126 #ifdef HAVE_QUOTA_SUPPORT
2127 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2128 struct mds_obd *mds = &obd->u.mds;
2129 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2134 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2135 mdd_obj->mod_count--;
2137 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2138 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2139 "list\n", PFID(mdd_object_fid(mdd_obj)));
2143 /* check without any lock */
2144 if (mdd_obj->mod_count == 1 &&
2145 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2147 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2150 handle = mdd_trans_start(env, mdo2mdd(obj));
2152 RETURN(PTR_ERR(handle));
2155 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2156 if (handle == NULL && mdd_obj->mod_count == 1 &&
2157 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2158 mdd_write_unlock(env, mdd_obj);
2162 /* release open count */
2163 mdd_obj->mod_count--;
2165 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2166 /* remove link to object from orphan index */
2167 rc = __mdd_orphan_del(env, mdd_obj, handle);
2169 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2170 "list, OSS objects to be destroyed.\n",
2171 PFID(mdd_object_fid(mdd_obj)));
2173 CERROR("Object "DFID" can not be deleted from orphan "
2174 "list, maybe cause OST objects can not be "
2175 "destroyed (err: %d).\n",
2176 PFID(mdd_object_fid(mdd_obj)), rc);
2177 /* If object was not deleted from orphan list, do not
2178 * destroy OSS objects, which will be done when next
2184 rc = mdd_iattr_get(env, mdd_obj, ma);
2185 /* The object may not have been in the orphan list originally; that is
2186 * the rare case of an mdd_finish_unlink() failure. */
2187 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2188 #ifdef HAVE_QUOTA_SUPPORT
2189 if (mds->mds_quota) {
2190 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2191 mdd_quota_wrapper(&ma->ma_attr, qids);
2194 /* MDS_CLOSE_CLEANUP means the MDS destroys the OSS objects itself. */
2195 if (ma->ma_valid & MA_FLAGS &&
2196 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2197 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2199 rc = mdd_object_kill(env, mdd_obj, ma);
2205 CERROR("Error when prepare to delete Object "DFID" , "
2206 "which will cause OST objects can not be "
2207 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2213 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2215 mdd_write_unlock(env, mdd_obj);
2217 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2218 #ifdef HAVE_QUOTA_SUPPORT
2220 /* Trigger dqrel on the owner of child. If failed,
2221 * the next call for lquota_chkquota will process it */
2222 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2229 * Permission check is done at open time,
2230 * no need to check it again.
2232 static int mdd_readpage_sanity_check(const struct lu_env *env,
2233 struct mdd_object *obj)
2235 struct dt_object *next = mdd_object_child(obj);
2239 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
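/* Fill one directory page: write the lu_dirpage header, then pack
 * lu_dirent records from the dt iterator until the page runs out of room.
 * ldp_hash_start/ldp_hash_end record the hash range covered so the client
 * can resume iteration from ldp_hash_end on the next readpage call. */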
2247 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2248 struct lu_dirpage *dp, int nob,
2249 const struct dt_it_ops *iops, struct dt_it *it,
2255 struct lu_dirent *ent;
2256 struct lu_dirent *last = NULL;
2259 memset(area, 0, sizeof (*dp));
2260 area += sizeof (*dp);
2261 nob -= sizeof (*dp);
2268 len = iops->key_size(env, it);
2270 /* IAM iterator can return record with zero len. */
2274 hash = iops->store(env, it);
2275 if (unlikely(first)) {
2277 dp->ldp_hash_start = cpu_to_le64(hash);
2280 /* calculate max space required for lu_dirent */
2281 recsize = lu_dirent_calc_size(len, attr);
2283 if (nob >= recsize) {
2284 result = iops->rec(env, it, ent, attr);
2285 if (result == -ESTALE)
2290 /* osd might not be able to pack all attributes,
2291 * so recheck the record length */
2292 recsize = le16_to_cpu(ent->lde_reclen);
2294 result = (last != NULL) ? 0 :-EINVAL;
2298 ent = (void *)ent + recsize;
2302 result = iops->next(env, it);
2303 if (result == -ESTALE)
2305 } while (result == 0);
2308 dp->ldp_hash_end = cpu_to_le64(hash);
2310 if (last->lde_hash == dp->ldp_hash_end)
2311 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2312 last->lde_reclen = 0; /* end mark */
2317 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2318 const struct lu_rdpg *rdpg)
2321 struct dt_object *next = mdd_object_child(obj);
2322 const struct dt_it_ops *iops;
2324 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2330 LASSERT(rdpg->rp_pages != NULL);
2331 LASSERT(next->do_index_ops != NULL);
2333 if (rdpg->rp_count <= 0)
2337 * iterate through directory and fill pages from @rdpg
2339 iops = &next->do_index_ops->dio_it;
2340 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2344 rc = iops->load(env, it, rdpg->rp_hash);
2348 * Iterator didn't find record with exactly the key requested.
2350 * It is currently either
2352 * - positioned above record with key less than
2353 * requested---skip it.
2355 * - or not positioned at all (is in IAM_IT_SKEWED
2356 * state)---position it on the next item.
2358 rc = iops->next(env, it);
2363 * At this point and across for-loop:
2365 * rc == 0 -> ok, proceed.
2366 * rc > 0 -> end of directory.
2369 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2370 i++, nob -= CFS_PAGE_SIZE) {
2371 struct lu_dirpage *dp;
2373 LASSERT(i < rdpg->rp_npages);
2374 pg = rdpg->rp_pages[i];
2376 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2379 rc = mdd_dir_page_build(env, mdd, dp,
2380 min_t(int, nob, LU_PAGE_SIZE),
2381 iops, it, rdpg->rp_attrs);
2386 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2388 } else if (rc < 0) {
2389 CWARN("build page failed: %d!\n", rc);
2392 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2393 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2394 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2401 struct lu_dirpage *dp;
2403 dp = cfs_kmap(rdpg->rp_pages[0]);
2404 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2407 * No pages were processed, mark this for first page
2410 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2413 cfs_kunmap(rdpg->rp_pages[0]);
2415 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2418 iops->fini(env, it);
2423 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2424 const struct lu_rdpg *rdpg)
2426 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2430 LASSERT(mdd_object_exists(mdd_obj));
2432 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2433 rc = mdd_readpage_sanity_check(env, mdd_obj);
2435 GOTO(out_unlock, rc);
2437 if (mdd_is_dead_obj(mdd_obj)) {
2439 struct lu_dirpage *dp;
2442 * According to POSIX, please do not return any entry to client:
2443 * even dot and dotdot should not be returned.
2445 CWARN("readdir from dead object: "DFID"\n",
2446 PFID(mdd_object_fid(mdd_obj)));
2448 if (rdpg->rp_count <= 0)
2449 GOTO(out_unlock, rc = -EFAULT);
2450 LASSERT(rdpg->rp_pages != NULL);
2452 pg = rdpg->rp_pages[0];
2453 dp = (struct lu_dirpage*)cfs_kmap(pg);
2454 memset(dp, 0, sizeof(struct lu_dirpage));
2455 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2456 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2457 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2459 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2462 rc = __mdd_readpage(env, mdd_obj, rdpg);
2466 mdd_read_unlock(env, mdd_obj);
2470 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2472 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2473 struct dt_object *next;
2475 LASSERT(mdd_object_exists(mdd_obj));
2476 next = mdd_object_child(mdd_obj);
2477 return next->do_ops->do_object_sync(env, next);
2480 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2481 struct md_object *obj)
2483 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2485 LASSERT(mdd_object_exists(mdd_obj));
2486 return do_version_get(env, mdd_object_child(mdd_obj));
2489 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2490 dt_obj_version_t version)
2492 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2494 LASSERT(mdd_object_exists(mdd_obj));
2495 do_version_set(env, mdd_object_child(mdd_obj), version);
2498 const struct md_object_operations mdd_obj_ops = {
2499 .moo_permission = mdd_permission,
2500 .moo_attr_get = mdd_attr_get,
2501 .moo_attr_set = mdd_attr_set,
2502 .moo_xattr_get = mdd_xattr_get,
2503 .moo_xattr_set = mdd_xattr_set,
2504 .moo_xattr_list = mdd_xattr_list,
2505 .moo_xattr_del = mdd_xattr_del,
2506 .moo_object_create = mdd_object_create,
2507 .moo_ref_add = mdd_ref_add,
2508 .moo_ref_del = mdd_ref_del,
2509 .moo_open = mdd_open,
2510 .moo_close = mdd_close,
2511 .moo_readpage = mdd_readpage,
2512 .moo_readlink = mdd_readlink,
2513 .moo_changelog = mdd_changelog,
2514 .moo_capa_get = mdd_capa_get,
2515 .moo_object_sync = mdd_object_sync,
2516 .moo_version_get = mdd_version_get,
2517 .moo_version_set = mdd_version_set,
2518 .moo_path = mdd_path,
2519 .moo_file_lock = mdd_file_lock,
2520 .moo_file_unlock = mdd_file_unlock,