1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
63 #include "mdd_internal.h"
65 static const struct lu_object_operations mdd_lu_obj_ops;
67 static int mdd_xattr_get(const struct lu_env *env,
68 struct md_object *obj, struct lu_buf *buf,
/* Read the object's body data into \a data via the underlying layer.
 * Asserts the object exists on disk first.
 * NOTE(review): chunk is truncated — the opening brace, \a data parameter
 * line and return statement are elided from this view. */
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
74 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
75 PFID(mdd_object_fid(obj)));
76 mdo_data_get(env, obj, data);
/* Fetch inode attributes of \a obj into \a la, under capability \a capa.
 * The object must exist on disk (asserted). Returns the underlying
 * mdo_attr_get() result (0 or negative errno). */
80 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
81 struct lu_attr *la, struct lustre_capa *capa)
83 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
84 PFID(mdd_object_fid(obj)));
85 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits. Both APPEND_OBJ and IMMUTE_OBJ are cleared first so
 * the result reflects exactly the flags passed in. */
88 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92 if (flags & LUSTRE_APPEND_FL)
93 obj->mod_flags |= APPEND_OBJ;
95 if (flags & LUSTRE_IMMUTABLE_FL)
96 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-environment MDD scratch state registered under
 * mdd_thread_key. Never NULL (asserted); the elided return line
 * presumably returns \a info. */
99 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 struct mdd_thread_info *info;
103 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
104 LASSERT(info != NULL);
/* Wrap caller-provided memory \a area of \a len bytes in the env's
 * shared mti_buf descriptor. No allocation is done here.
 * NOTE(review): the lines storing area/len into the buffer and the
 * return statement are elided from this truncated view. */
108 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
112 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by \a buf. Safe no-op when \a buf is NULL
 * or holds no allocation. */
118 void mdd_buf_put(struct lu_buf *buf)
120 if (buf == NULL || buf->lb_buf == NULL)
122 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in the env's
 * mti_buf. The const is cast away only for storage in the generic
 * lu_buf; callers receive a const pointer back. */
127 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
128 const void *area, ssize_t len)
132 buf = &mdd_env_info(env)->mti_buf;
133 buf->lb_buf = (void *)area;
/* Return the env's mti_big_buf grown to at least \a len bytes.
 * A too-small existing buffer is freed and a new one allocated; old
 * contents are NOT preserved (use mdd_buf_grow() for that). On
 * allocation failure the elided error path presumably leaves
 * lb_buf NULL for the caller to detect. */
138 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
143 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146 if (buf->lb_buf == NULL) {
148 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
149 if (buf->lb_buf == NULL)
155 /** Increase the size of the \a mti_big_buf.
156 * preserves old data in buffer
157 * old buffer remains unchanged on error
158 * \retval 0 or -ENOMEM
160 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Growing only: caller must not ask for less than the current size. */
165 LASSERT(len >= oldbuf->lb_len);
166 OBD_ALLOC_LARGE(buf.lb_buf, len);
168 if (buf.lb_buf == NULL)
/* Copy existing contents, free the old allocation, then install the
 * new descriptor over the env-held one in a single struct copy. */
172 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the env-cached llog cookie buffer, (re)allocated lazily so it
 * is at least the device's current LOV cookie size. The buffer is
 * zeroed on every successful call. Returns NULL on allocation failure. */
181 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
182 struct mdd_device *mdd)
184 struct mdd_thread_info *mti = mdd_env_info(env);
187 max_cookie_size = mdd_lov_cookiesize(env, mdd);
/* Drop a cached buffer that has become too small for the current size. */
188 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
189 if (mti->mti_max_cookie)
190 OBD_FREE_LARGE(mti->mti_max_cookie,
191 mti->mti_max_cookie_size);
192 mti->mti_max_cookie = NULL;
193 mti->mti_max_cookie_size = 0;
195 if (unlikely(mti->mti_max_cookie == NULL)) {
196 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
197 if (likely(mti->mti_max_cookie != NULL))
198 mti->mti_max_cookie_size = max_cookie_size;
201 if (likely(mti->mti_max_cookie != NULL))
201 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
202 return mti->mti_max_cookie;
/* Return the env-cached LOV MD buffer, (re)allocated lazily so it is at
 * least the device's current maximum LOV MD size. Unlike
 * mdd_max_cookie_get() the contents are NOT zeroed. NULL on ENOMEM. */
205 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
206 struct mdd_device *mdd)
208 struct mdd_thread_info *mti = mdd_env_info(env);
211 max_lmm_size = mdd_lov_mdsize(env, mdd);
212 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
213 if (mti->mti_max_lmm)
214 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
215 mti->mti_max_lmm = NULL;
216 mti->mti_max_lmm_size = 0;
218 if (unlikely(mti->mti_max_lmm == NULL)) {
219 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
220 if (likely(mti->mti_max_lmm != NULL))
221 mti->mti_max_lmm_size = max_lmm_size;
223 return mti->mti_max_lmm;
/* lu_device operation: allocate and initialize a new mdd_object,
 * wiring up its md_object/dir operation tables and mdd lu ops.
 * NOTE(review): the elided tail presumably returns &o or NULL on
 * allocation failure — confirm against full source. */
226 struct lu_object *mdd_object_alloc(const struct lu_env *env,
227 const struct lu_object_header *hdr,
230 struct mdd_object *mdd_obj;
232 OBD_ALLOC_PTR(mdd_obj);
233 if (mdd_obj != NULL) {
236 o = mdd2lu_obj(mdd_obj);
237 lu_object_init(o, NULL, d);
238 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
239 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
240 mdd_obj->mod_count = 0;
241 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object operation: stack the child (next lower layer) object
 * underneath this mdd object by asking the child device to allocate
 * it, then link it into the object chain. Also resets the changelog
 * timestamp and initializes the pdir lock. */
248 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
249 const struct lu_object_conf *unused)
251 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
252 struct mdd_object *mdd_obj = lu2mdd_obj(o);
253 struct lu_object *below;
254 struct lu_device *under;
257 mdd_obj->mod_cltime = 0;
258 under = &d->mdd_child->dd_lu_dev;
259 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
260 mdd_pdlock_init(mdd_obj);
/* NOTE(review): a NULL check on `below` is elided in this view. */
264 lu_object_add(o, below);
/* lu_object operation: once the object is fully stacked, load its
 * persistent flags into mod_flags if it exists on disk. */
269 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 if (lu_object_exists(o))
272 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object operation: tear down and free the mdd object.
 * NOTE(review): fini/free calls are elided from this truncated view. */
277 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object operation: emit a one-line human-readable description of
 * the object (open count, valid mask, changelog time, flags) through
 * the supplied printer. */
285 static int mdd_object_print(const struct lu_env *env, void *cookie,
286 lu_printer_t p, const struct lu_object *o)
288 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
289 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
290 "valid=%x, cltime="LPU64", flags=%lx)",
291 mdd, mdd->mod_count, mdd->mod_valid,
292 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; see the functions above. */
295 static const struct lu_object_operations mdd_lu_obj_ops = {
296 .loo_object_init = mdd_object_init,
297 .loo_object_start = mdd_object_start,
298 .loo_object_free = mdd_object_free,
299 .loo_object_print = mdd_object_print,
/* Look up (or create) the mdd object for FID \a f on device \a d.
 * May return an ERR_PTR; the reference must be dropped with
 * mdd_object_put(). */
302 struct mdd_object *mdd_object_find(const struct lu_env *env,
303 struct mdd_device *d,
304 const struct lu_fid *f)
306 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated \a path (relative to the filesystem root)
 * into \a fid by walking component-by-component from mdd_root_fid,
 * doing an mdd_lookup() on each parent.
 * \retval 0 on success, -EREMOTE for a remote object, or other
 *	negative errno.
 * NOTE(review): the component-copy loop body and termination handling
 * are elided from this truncated view. */
309 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
310 const char *path, struct lu_fid *fid)
313 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
314 struct mdd_object *obj;
315 struct lu_name *lname = &mdd_env_info(env)->mti_name;
320 /* temp buffer for path element */
321 buf = mdd_buf_alloc(env, PATH_MAX);
322 if (buf->lb_buf == NULL)
325 lname->ln_name = name = buf->lb_buf;
326 lname->ln_namelen = 0;
327 *f = mdd->mdd_root_fid;
334 while (*path != '/' && *path != '\0') {
342 /* find obj corresponding to fid */
343 obj = mdd_object_find(env, mdd, f);
345 GOTO(out, rc = -EREMOTE);
347 GOTO(out, rc = PTR_ERR(obj));
348 /* get child fid from parent and name */
349 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
350 mdd_object_put(env, obj);
/* Reset the component name before parsing the next path element. */
355 lname->ln_namelen = 0;
364 /** The maximum depth that fid2path() will search.
365 * This is limited only because we want to store the fids for
366 * historical path lookup purposes.
368 #define MAX_PATH_DEPTH 100
370 /** mdd_path() lookup structure. */
371 struct path_lookup_info {
372 __u64 pli_recno; /**< history point */
373 __u64 pli_currec; /**< current record */
374 struct lu_fid pli_fid;
375 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
376 struct mdd_object *pli_mdd_obj;
377 char *pli_path; /**< full path */
/* NOTE(review): a pli_pathlen member is referenced by the code below
 * but its declaration is elided from this truncated view. */
379 int pli_linkno; /**< which hardlink to follow */
380 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path of pli->pli_mdd_obj by walking parent
 * links (link EA) from the object up to the filesystem root, packing
 * each name backwards from the end of pli->pli_path. The result is
 * then re-resolved forward with mdd_path2fid() to detect concurrent
 * renames; a mismatch returns -EAGAIN so the caller can retry.
 * Parent FIDs are recorded in pli_fids[] for historic lookup. */
383 static int mdd_path_current(const struct lu_env *env,
384 struct path_lookup_info *pli)
386 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
387 struct mdd_object *mdd_obj;
388 struct lu_buf *buf = NULL;
389 struct link_ea_header *leh;
390 struct link_ea_entry *lee;
391 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
392 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start writing at the last byte of the path buffer and work backwards. */
398 ptr = pli->pli_path + pli->pli_pathlen - 1;
401 pli->pli_fidcount = 0;
402 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
405 mdd_obj = mdd_object_find(env, mdd,
406 &pli->pli_fids[pli->pli_fidcount]);
408 GOTO(out, rc = -EREMOTE);
410 GOTO(out, rc = PTR_ERR(mdd_obj));
411 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413 mdd_object_put(env, mdd_obj);
417 /* Do I need to error out here? */
422 /* Get parent fid and object name */
423 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
424 buf = mdd_links_get(env, mdd_obj);
425 mdd_read_unlock(env, mdd_obj);
426 mdd_object_put(env, mdd_obj);
428 GOTO(out, rc = PTR_ERR(buf));
431 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
432 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434 /* If set, use link #linkno for path lookup, otherwise use
435 link #0. Only do this for the final path element. */
436 if ((pli->pli_fidcount == 0) &&
437 (pli->pli_linkno < leh->leh_reccount)) {
439 for (count = 0; count < pli->pli_linkno; count++) {
440 lee = (struct link_ea_entry *)
441 ((char *)lee + reclen);
442 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444 if (pli->pli_linkno < leh->leh_reccount - 1)
445 /* indicate to user there are more links */
449 /* Pack the name in the end of the buffer */
450 ptr -= tmpname->ln_namelen;
/* Leave room for the '/' separator written before the name. */
451 if (ptr - 1 <= pli->pli_path)
452 GOTO(out, rc = -EOVERFLOW);
/* Exact-length copy into a pre-sized region; the separator byte
 * before each component terminates it — no NUL is needed here. */
453 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
456 /* Store the parent fid for historic lookup */
457 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
458 GOTO(out, rc = -EOVERFLOW);
459 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
462 /* Verify that our path hasn't changed since we started the lookup.
463 Record the current index, and verify the path resolves to the
464 same fid. If it does, then the path is correct as of this index. */
465 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
466 pli->pli_currec = mdd->mdd_cl.mc_index;
467 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
468 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
471 GOTO (out, rc = -EAGAIN);
473 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
474 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
475 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
476 PFID(&pli->pli_fid));
477 GOTO(out, rc = -EAGAIN);
479 ptr++; /* skip leading / */
/* Shift the completed path to the start of the caller's buffer. */
480 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
484 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
485 /* if we vmalloced a large buffer drop it */
/* Placeholder for historic (changelog-based) path reconstruction; the
 * body is elided/unimplemented in this view — see the comment in
 * mdd_path() about the initial implementation's limitations. */
491 static int mdd_path_historic(const struct lu_env *env,
492 struct path_lookup_info *pli)
497 /* Returns the full path to this fid, as of changelog record recno. */
/* md operation: resolve \a obj to its pathname in \a path (buffer of
 * \a pathlen bytes). *recno selects a history point (-1 = current);
 * *linkno selects which hardlink to follow. On return *recno holds the
 * changelog index the path was valid at and *linkno the next link
 * index. Retries on -EAGAIN while the file is being renamed. */
498 static int mdd_path(const struct lu_env *env, struct md_object *obj,
499 char *path, int pathlen, __u64 *recno, int *linkno)
501 struct path_lookup_info *pli;
/* The root has no name; the elided branch presumably short-circuits. */
509 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
518 pli->pli_mdd_obj = md2mdd_obj(obj);
519 pli->pli_recno = *recno;
520 pli->pli_path = path;
521 pli->pli_pathlen = pathlen;
522 pli->pli_linkno = *linkno;
524 /* Retry multiple times in case file is being moved */
525 while (tries-- && rc == -EAGAIN)
526 rc = mdd_path_current(env, pli);
528 /* For historical path lookup, the current links may not have existed
529 * at "recno" time. We must switch over to earlier links/parents
530 * by using the changelog records. If the earlier parent doesn't
531 * exist, we must search back through the changelog to reconstruct
532 * its parents, then check if it exists, etc.
533 * We may ignore this problem for the initial implementation and
534 * state that an "original" hardlink must still exist for us to find
535 * historic path name. */
536 if (pli->pli_recno != -1) {
537 rc = mdd_path_historic(env, pli);
539 *recno = pli->pli_currec;
540 /* Return next link index to caller */
541 *linkno = pli->pli_linkno;
/* Load the object's attributes and refresh the in-memory mod_flags:
 * translate persistent la_flags, and mark single-link directories with
 * MNLINK_OBJ. */
549 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 struct lu_attr *la = &mdd_env_info(env)->mti_la;
555 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557 mdd_flags_xlate(obj, la->la_flags);
558 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
559 obj->mod_flags |= MNLINK_OBJ;
564 /* get only inode attributes */
/* Fill ma->ma_attr from disk unless MA_INODE is already valid; sets
 * MA_INODE in ma_valid on success. */
565 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
571 if (ma->ma_valid & MA_INODE)
574 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
575 mdd_object_capa(env, mdd_obj));
577 ma->ma_valid |= MA_INODE;
/* Synthesize a default V1 striping descriptor in \a lmm from the
 * device's LOV descriptor (used when no explicit striping is stored).
 * Returns the number of bytes written (sizeof(struct lov_user_md)). */
581 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
583 struct lov_desc *ldesc;
584 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
585 struct lov_user_md *lum = (struct lov_user_md*)lmm;
591 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
592 LASSERT(ldesc != NULL);
594 lum->lmm_magic = LOV_MAGIC_V1;
595 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
596 lum->lmm_pattern = ldesc->ld_pattern;
597 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
598 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
599 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
601 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root directory. */
604 static int is_rootdir(struct mdd_object *mdd_obj)
606 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
607 const struct lu_fid *fid = mdo2fid(mdd_obj);
609 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
612 /* get lov EA only */
/* Read the LOV EA into ma->ma_lmm (unless MA_LOV already valid). For
 * the root directory with MA_LOV_DEF requested, fall back to the
 * device default striping. Sets MA_LOV and ma_lmm_size on success. */
613 static int __mdd_lmm_get(const struct lu_env *env,
614 struct mdd_object *mdd_obj, struct md_attr *ma)
619 if (ma->ma_valid & MA_LOV)
622 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
624 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
625 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* mdd_get_md() returns the EA size on success; record it. */
627 ma->ma_lmm_size = rc;
628 ma->ma_valid |= MA_LOV;
634 /* get the first parent fid from link EA */
/* Extract the FID of the first parent (link #0) from the link EA into
 * ma->ma_pfid, converting from big-endian disk order. Sets MA_PFID. */
635 static int mdd_pfid_get(const struct lu_env *env,
636 struct mdd_object *mdd_obj, struct md_attr *ma)
639 struct link_ea_header *leh;
640 struct link_ea_entry *lee;
641 struct lu_fid *pfid = &ma->ma_pfid;
644 if (ma->ma_valid & MA_PFID)
647 buf = mdd_links_get(env, mdd_obj);
649 RETURN(PTR_ERR(buf));
652 lee = (struct link_ea_entry *)(leh + 1);
653 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
654 fid_be_to_cpu(pfid, pfid);
655 ma->ma_valid |= MA_PFID;
656 if (buf->lb_len > OBD_ALLOC_BIG)
657 /* if we vmalloced a large buffer drop it */
/* Locked wrapper: read the LOV EA under the object's read lock. */
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
668 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669 rc = __mdd_lmm_get(env, mdd_obj, ma);
670 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv unless MA_LMV is
 * already valid; sets MA_LMV on success. */
675 static int __mdd_lmv_get(const struct lu_env *env,
676 struct mdd_object *mdd_obj, struct md_attr *ma)
681 if (ma->ma_valid & MA_LMV)
684 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
687 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr from disk and extract the HSM and/or SOM parts
 * requested in ma->ma_need into \a ma, setting MA_HSM/MA_SOM in
 * ma_valid. Skips the read entirely if everything needed is already
 * valid. */
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
696 struct mdd_thread_info *info = mdd_env_info(env);
697 struct lustre_mdt_attrs *lma =
698 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
703 /* If all needed data are already valid, nothing to do */
704 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705 (ma->ma_need & (MA_HSM | MA_SOM)))
708 /* Read LMA from disk EA */
709 lma_size = sizeof(info->mti_xattr_buf);
710 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
714 /* Useless to check LMA incompatibility because this is already done in
715 * osd_ea_fid_get(), and this will fail long before this code is
717 * So, if we are here, LMA is compatible.
720 lustre_lma_swab(lma);
722 /* Swab and copy LMA */
723 if (ma->ma_need & MA_HSM) {
724 if (lma->lma_compat & LMAC_HSM)
725 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
727 ma->ma_hsm.mh_flags = 0;
728 ma->ma_valid |= MA_HSM;
/* SOM data is only copied when the on-disk LMA actually carries it. */
732 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733 LASSERT(ma->ma_som != NULL);
734 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735 ma->ma_som->msd_size = lma->lma_som_size;
736 ma->ma_som->msd_blocks = lma->lma_som_blocks;
737 ma->ma_som->msd_mountid = lma->lma_som_mountid;
738 ma->ma_valid |= MA_SOM;
/* Dispatch attribute retrieval according to ma->ma_need: inode attrs,
 * LOV/LMV EAs, first-parent FID, HSM/SOM data and (if configured)
 * default ACLs, each gated on the object's file type. Stops at the
 * first error. */
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
750 if (ma->ma_need & MA_INODE)
751 rc = mdd_iattr_get(env, mdd_obj, ma);
753 if (rc == 0 && ma->ma_need & MA_LOV) {
754 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755 S_ISDIR(mdd_object_type(mdd_obj)))
756 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* PFID is only fetched when no LOV EA was found (MA_LOV unset). */
758 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759 if (S_ISREG(mdd_object_type(mdd_obj)))
760 rc = mdd_pfid_get(env, mdd_obj, ma);
762 if (rc == 0 && ma->ma_need & MA_LMV) {
763 if (S_ISDIR(mdd_object_type(mdd_obj)))
764 rc = __mdd_lmv_get(env, mdd_obj, ma);
766 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767 if (S_ISREG(mdd_object_type(mdd_obj)))
768 rc = __mdd_lma_get(env, mdd_obj, ma);
770 #ifdef CONFIG_FS_POSIX_ACL
771 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772 if (S_ISDIR(mdd_object_type(mdd_obj)))
773 rc = mdd_def_acl_get(env, mdd_obj, ma);
776 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777 rc, ma->ma_valid, ma->ma_lmm);
/* As mdd_attr_get_internal(), but takes the object read lock when any
 * EA-based attribute (LOV/LMV/ACL/HSM/SOM/PFID) is requested.
 * NOTE(review): the `if (needlock)` guards around lock/unlock are
 * elided from this truncated view — confirm against full source. */
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782 struct mdd_object *mdd_obj, struct md_attr *ma)
785 int needlock = ma->ma_need &
786 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
789 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790 rc = mdd_attr_get_internal(env, mdd_obj, ma);
792 mdd_read_unlock(env, mdd_obj);
797 * No permission check is needed.
/* md operation: getattr entry point — thin wrapper over the locked
 * internal helper. */
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
802 struct mdd_object *mdd_obj = md2mdd_obj(obj);
806 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
811 * No permission check is needed.
/* md operation: read extended attribute \a name into \a buf under the
 * object read lock. The object must exist (asserted). */
813 static int mdd_xattr_get(const struct lu_env *env,
814 struct md_object *obj, struct lu_buf *buf,
817 struct mdd_object *mdd_obj = md2mdd_obj(obj);
822 LASSERT(mdd_object_exists(mdd_obj));
824 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825 rc = mdo_xattr_get(env, mdd_obj, buf, name,
826 mdd_object_capa(env, mdd_obj));
827 mdd_read_unlock(env, mdd_obj);
833 * Permission check is done when open,
834 * no need check again.
/* md operation: read a symlink target into \a buf via the underlying
 * dt object's body-read method, starting at offset 0, under read lock. */
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
839 struct mdd_object *mdd_obj = md2mdd_obj(obj);
840 struct dt_object *next;
845 LASSERT(mdd_object_exists(mdd_obj));
847 next = mdd_object_child(mdd_obj);
848 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850 mdd_object_capa(env, mdd_obj));
851 mdd_read_unlock(env, mdd_obj);
856 * No permission check is needed.
/* md operation: list all extended attribute names into \a buf under
 * the object read lock. */
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
861 struct mdd_object *mdd_obj = md2mdd_obj(obj);
866 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for child \a c inside transaction \a handle:
 * choose the dt object format from the creation spec (index feature or
 * file mode), ask the lower layer to fill the allocation hint from
 * parent \a p, then create. Asserts the object exists afterwards. */
873 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
874 struct mdd_object *c, struct md_attr *ma,
875 struct thandle *handle,
876 const struct md_op_spec *spec)
878 struct lu_attr *attr = &ma->ma_attr;
879 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
880 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881 const struct dt_index_features *feat = spec->sp_feat;
885 if (!mdd_object_exists(c)) {
886 struct dt_object *next = mdd_object_child(c);
/* A non-directory index feature forces DFT_INDEX; otherwise derive
 * the format from the requested file mode. */
889 if (feat != &dt_directory_features && feat != NULL)
890 dof->dof_type = DFT_INDEX;
892 dof->dof_type = dt_mode_to_dft(attr->la_mode);
894 dof->u.dof_idx.di_feat = feat;
896 /* @hint will be initialized by underlying device. */
897 next->do_ops->do_ah_init(env, hint,
898 p ? mdd_object_child(p) : NULL,
899 attr->la_mode & S_IFMT);
901 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
902 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
910 * Make sure the ctime is increased only.
/* Compare the incoming ctime against the on-disk one and strip time
 * updates that would move ctime backwards (or redundant ctime-only
 * updates that equal the current value). */
912 static inline int mdd_attr_check(const struct lu_env *env,
913 struct mdd_object *obj,
914 struct lu_attr *attr)
916 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
920 if (attr->la_valid & LA_CTIME) {
921 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
925 if (attr->la_ctime < tmp_la->la_ctime)
926 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
927 else if (attr->la_valid == LA_CTIME &&
928 attr->la_ctime == tmp_la->la_ctime)
929 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to \a obj inside transaction \a handle; on a mode
 * change, also update the POSIX ACL mode bits when \a needacl (and ACL
 * support) is enabled. */
934 int mdd_attr_set_internal(const struct lu_env *env,
935 struct mdd_object *obj,
936 struct lu_attr *attr,
937 struct thandle *handle,
943 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
944 #ifdef CONFIG_FS_POSIX_ACL
945 if (!rc && (attr->la_valid & LA_MODE) && needacl)
946 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Validate the ctime monotonicity first (mdd_attr_check), then apply
 * the attributes. The elided lines presumably bail out early on check
 * failure or when no valid bits remain. */
951 int mdd_attr_check_set_internal(const struct lu_env *env,
952 struct mdd_object *obj,
953 struct lu_attr *attr,
954 struct thandle *handle,
960 rc = mdd_attr_check(env, obj, attr);
965 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Locked wrapper for mdd_attr_set_internal(); ACL rework is only
 * relevant (and the flag only kept) on a mode change. */
969 static int mdd_attr_set_internal_locked(const struct lu_env *env,
970 struct mdd_object *obj,
971 struct lu_attr *attr,
972 struct thandle *handle,
978 needacl = needacl && (attr->la_valid & LA_MODE);
980 mdd_write_lock(env, obj, MOR_TGT_CHILD);
981 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
983 mdd_write_unlock(env, obj);
/* Locked wrapper for mdd_attr_check_set_internal(); same ACL gating
 * as mdd_attr_set_internal_locked(). */
987 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
988 struct mdd_object *obj,
989 struct lu_attr *attr,
990 struct thandle *handle,
996 needacl = needacl && (attr->la_valid & LA_MODE);
998 mdd_write_lock(env, obj, MOR_TGT_CHILD);
999 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1001 mdd_write_unlock(env, obj);
/* Set or delete extended attribute \a name inside transaction
 * \a handle: a non-empty buffer sets the xattr, a NULL/zero-length
 * buffer deletes it.
 * NOTE(review): the `fl` parameter is not forwarded — mdo_xattr_set()
 * is called with 0 instead; verify against full source whether this
 * is intended. */
1005 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1006 const struct lu_buf *buf, const char *name,
1007 int fl, struct thandle *handle)
1009 struct lustre_capa *capa = mdd_object_capa(env, obj);
1013 if (buf->lb_buf && buf->lb_len > 0)
1014 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1015 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1016 rc = mdo_xattr_del(env, obj, name, handle, capa);
1022 * This gives the same functionality as the code between
1023 * sys_chmod and inode_setattr
1024 * chown_common and inode_setattr
1025 * utimes and inode_setattr
1026 * This API is ported from mds_fix_attr but remove some unnecessary stuff.
/* Sanitize and permission-check the incoming setattr request \a la
 * against the current on-disk attributes: rejects type changes,
 * validates flag/mode/owner/group changes against the caller's
 * credentials, drops redundant time updates, and clears setuid/setgid
 * bits per POSIX chown semantics. \a la is modified in place. */
1028 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1029 struct lu_attr *la, const struct md_attr *ma)
1031 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1032 struct md_ucred *uc;
1039 /* Do not permit change file type */
1040 if (la->la_valid & LA_TYPE)
1043 /* They should not be processed by setattr */
1044 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1047 /* export destroy does not have ->le_ses, but we may want
1048 * to drop LUSTRE_SOM_FL. */
/* Load the current attributes to validate the request against. */
1054 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1058 if (la->la_valid == LA_CTIME) {
1059 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1060 /* This is only for set ctime when rename's source is
1062 rc = mdd_may_delete(env, NULL, obj,
1063 (struct md_attr *)ma, 1, 0);
1064 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1065 la->la_valid &= ~LA_CTIME;
1069 if (la->la_valid == LA_ATIME) {
1070 /* This is atime only set for read atime update on close. */
1071 if (la->la_atime >= tmp_la->la_atime &&
1072 la->la_atime < (tmp_la->la_atime +
1073 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1074 la->la_valid &= ~LA_ATIME;
1078 /* Check if flags change. */
1079 if (la->la_valid & LA_FLAGS) {
1080 unsigned int oldflags = 0;
1081 unsigned int newflags = la->la_flags &
1082 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1084 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1085 !mdd_capable(uc, CFS_CAP_FOWNER))
1088 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1089 * only be changed by the relevant capability. */
1090 if (mdd_is_immutable(obj))
1091 oldflags |= LUSTRE_IMMUTABLE_FL;
1092 if (mdd_is_append(obj))
1093 oldflags |= LUSTRE_APPEND_FL;
1094 if ((oldflags ^ newflags) &&
1095 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC only makes sense on directories. */
1098 if (!S_ISDIR(tmp_la->la_mode))
1099 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* On immutable/append-only objects only flag changes are allowed. */
1102 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1103 (la->la_valid & ~LA_FLAGS) &&
1104 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1107 /* Check for setting the obj time. */
1108 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1109 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1110 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1112 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID are internal directives: strip the bit from the
 * request and fold the change into an explicit mode update. */
1120 if (la->la_valid & LA_KILL_SUID) {
1121 la->la_valid &= ~LA_KILL_SUID;
1122 if ((tmp_la->la_mode & S_ISUID) &&
1123 !(la->la_valid & LA_MODE)) {
1124 la->la_mode = tmp_la->la_mode;
1125 la->la_valid |= LA_MODE;
1127 la->la_mode &= ~S_ISUID;
1130 if (la->la_valid & LA_KILL_SGID) {
1131 la->la_valid &= ~LA_KILL_SGID;
1132 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1133 (S_ISGID | S_IXGRP)) &&
1134 !(la->la_valid & LA_MODE)) {
1135 la->la_mode = tmp_la->la_mode;
1136 la->la_valid |= LA_MODE;
1138 la->la_mode &= ~S_ISGID;
1141 /* Make sure a caller can chmod. */
1142 if (la->la_valid & LA_MODE) {
1143 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1144 (uc->mu_fsuid != tmp_la->la_uid) &&
1145 !mdd_capable(uc, CFS_CAP_FOWNER))
1148 if (la->la_mode == (cfs_umode_t) -1)
1149 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits; only permission bits may change. */
1151 la->la_mode = (la->la_mode & S_IALLUGO) |
1152 (tmp_la->la_mode & ~S_IALLUGO);
1154 /* Also check the setgid bit! */
1155 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1156 la->la_gid : tmp_la->la_gid) &&
1157 !mdd_capable(uc, CFS_CAP_FSETID))
1158 la->la_mode &= ~S_ISGID;
1160 la->la_mode = tmp_la->la_mode;
1163 /* Make sure a caller can chown. */
1164 if (la->la_valid & LA_UID) {
1165 if (la->la_uid == (uid_t) -1)
1166 la->la_uid = tmp_la->la_uid;
1167 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1168 (la->la_uid != tmp_la->la_uid)) &&
1169 !mdd_capable(uc, CFS_CAP_CHOWN))
1172 /* If the user or group of a non-directory has been
1173 * changed by a non-root user, remove the setuid bit.
1174 * 19981026 David C Niemi <niemi@tux.org>
1176 * Changed this to apply to all users, including root,
1177 * to avoid some races. This is the behavior we had in
1178 * 2.0. The check for non-root was definitely wrong
1179 * for 2.2 anyway, as it should have been using
1180 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1181 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1182 !S_ISDIR(tmp_la->la_mode)) {
1183 la->la_mode &= ~S_ISUID;
1184 la->la_valid |= LA_MODE;
1188 /* Make sure caller can chgrp. */
1189 if (la->la_valid & LA_GID) {
1190 if (la->la_gid == (gid_t) -1)
1191 la->la_gid = tmp_la->la_gid;
1192 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193 ((la->la_gid != tmp_la->la_gid) &&
1194 !lustre_in_group_p(uc, la->la_gid))) &&
1195 !mdd_capable(uc, CFS_CAP_CHOWN))
1198 /* Likewise, if the user or group of a non-directory
1199 * has been changed by a non-root user, remove the
1200 * setgid bit UNLESS there is no group execute bit
1201 * (this would be a file marked for mandatory
1202 * locking). 19981026 David C Niemi <niemi@tux.org>
1204 * Removed the fsuid check (see the comment above) --
1206 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1207 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1208 la->la_mode &= ~S_ISGID;
1209 la->la_valid |= LA_MODE;
1213 /* For both Size-on-MDS case and truncate case,
1214 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1215 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1216 * For SOM case, it is true, the MAY_WRITE perm has been checked
1217 * when open, no need check again. For truncate case, it is false,
1218 * the MAY_WRITE perm should be checked here. */
1219 if (ma->ma_attr_flags & MDS_SOM) {
1220 /* For the "Size-on-MDS" setattr update, merge coming
1221 * attributes with the set in the inode. BUG 10641 */
1222 if ((la->la_valid & LA_ATIME) &&
1223 (la->la_atime <= tmp_la->la_atime))
1224 la->la_valid &= ~LA_ATIME;
1226 /* OST attributes do not have a priority over MDS attributes,
1227 * so drop times if ctime is equal. */
1228 if ((la->la_valid & LA_CTIME) &&
1229 (la->la_ctime <= tmp_la->la_ctime))
1230 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1232 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1233 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1234 (uc->mu_fsuid == tmp_la->la_uid)) &&
1235 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1236 rc = mdd_permission_internal_locked(env, obj,
1243 if (la->la_valid & LA_CTIME) {
1244 /* The pure setattr, it has the priority over what is
1245 * already set, do not drop it if ctime is equal. */
1246 if (la->la_ctime < tmp_la->la_ctime)
1247 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1255 /** Store a data change changelog record
1256 * If this fails, we must fail the whole transaction; we don't
1257 * want the change to commit without the log entry.
1258 * \param mdd_obj - mdd_object of change
1259 * \param handle - transaction handle; NULL starts a local one
 *	(close events only)
1261 static int mdd_changelog_data_store(const struct lu_env *env,
1262 struct mdd_device *mdd,
1263 enum changelog_rec_type type,
1265 struct mdd_object *mdd_obj,
1266 struct thandle *handle)
1268 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1269 struct llog_changelog_rec *rec;
1270 struct thandle *th = NULL;
/* Skip entirely when changelogs are off or this record type is masked. */
1276 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1278 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1281 LASSERT(mdd_obj != NULL);
/* Coalesce repeated time-change records for the same object since the
 * changelog was (re)started. */
1283 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1284 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1285 /* Don't need multiple updates in this log */
1286 /* Don't check under lock - no big deal if we get an extra
1291 reclen = llog_data_len(sizeof(*rec));
1292 buf = mdd_buf_alloc(env, reclen);
1293 if (buf->lb_buf == NULL)
1295 rec = (struct llog_changelog_rec *)buf->lb_buf;
1297 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1298 rec->cr.cr_type = (__u32)type;
1299 rec->cr.cr_tfid = *tfid;
1300 rec->cr.cr_namelen = 0;
1301 mdd_obj->mod_cltime = cfs_time_current_64();
1303 if (handle == NULL) {
1304 /* Used for the close event only for now. */
1305 LASSERT(type == CL_CLOSE);
1306 LASSERT(mdd_env_info(env)->mti_param.tp_credits != 0);
1307 th = mdd_trans_start(env, mdd);
1309 GOTO(err, rc = PTR_ERR(th));
/* `handle ? : th` is the GNU conditional-with-omitted-operand:
 * use the caller's handle when given, else the local one. */
1312 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1315 mdd_trans_stop(env, mdd, rc, th);
1318 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1319 rc, type, PFID(tfid));
/* Public helper: record a changelog entry of \a type for \a obj inside
 * a freshly started transaction, then stop the transaction with the
 * store result. */
1326 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1327 int flags, struct md_object *obj)
1329 struct thandle *handle;
1330 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1331 struct mdd_device *mdd = mdo2mdd(obj);
1335 handle = mdd_trans_start(env, mdd);
1338 return(PTR_ERR(handle));
1340 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1343 mdd_trans_stop(env, mdd, rc, handle);
1349 * Should be called with write lock held.
1351 * \see mdd_lma_set_locked().
1353 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1354 const struct md_attr *ma, struct thandle *handle)
1356 struct mdd_thread_info *info = mdd_env_info(env);
1358 struct lustre_mdt_attrs *lma =
1359 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1360 int lmasize = sizeof(struct lustre_mdt_attrs);
1365 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" applies `!` first,
 * so the test is 0 whenever ma_valid != 0.  The comment above suggests the
 * intent is "either part missing", i.e. likely
 * "(~ma->ma_valid) & (MA_HSM | MA_SOM)" -- confirm against upstream. */
1366 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1367 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* Convert between disk and host byte order after reading... */
1371 lustre_lma_swab(lma);
/* ...or start from a zeroed LMA when none exists on disk. */
1373 memset(lma, 0, lmasize);
/* Merge the HSM flags handed in via \a ma. */
1377 if (ma->ma_valid & MA_HSM) {
1378 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1379 lma->lma_compat |= LMAC_HSM;
/* Merge (or invalidate, for IOEPOCH_INVAL) the Size-on-MDS state. */
1383 if (ma->ma_valid & MA_SOM) {
1384 LASSERT(ma->ma_som != NULL);
1385 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1386 lma->lma_compat &= ~LMAC_SOM;
1388 lma->lma_compat |= LMAC_SOM;
1389 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1390 lma->lma_som_size = ma->ma_som->msd_size;
1391 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1392 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always refresh the self FID, then swab back and store the LMA xattr. */
1397 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1399 lustre_lma_swab(lma);
1400 buf = mdd_buf_get(env, lma, lmasize);
1401 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1407 * Save LMA extended attributes with data from \a ma.
1409 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1410 * not, LMA EA will be first read from disk, modified and write back.
1413 static int mdd_lma_set_locked(const struct lu_env *env,
1414 struct mdd_object *mdd_obj,
1415 const struct md_attr *ma, struct thandle *handle)
/* Take the object write lock around __mdd_lma_set() (which requires it). */
1419 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1420 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1421 mdd_write_unlock(env, mdd_obj);
1425 /* Precedence for choosing record type when multiple
1426 * attributes change: setattr > mtime > ctime > atime
1427 * (ctime changes when mtime does, plus chmod/chown.
1428 * atime and ctime are independent.) */
1429 static int mdd_attr_set_changelog(const struct lu_env *env,
1430 struct md_object *obj, struct thandle *handle,
1433 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmask of candidate record types from the changed attrs. */
1436 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1437 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1438 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1439 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
/* Drop the types that the changelog mask filters out. */
1440 bits = bits & mdd->mdd_cl.mc_mask;
1444 /* The record type is the lowest non-masked set bit */
1445 while (bits && ((bits & 1) == 0)) {
1450 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1451 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1452 md2mdd_obj(obj), handle);
1455 /* set attr and LOV EA at once, return updated attr */
1456 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1457 const struct md_attr *ma)
1459 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1460 struct mdd_device *mdd = mdo2mdd(obj);
1461 struct thandle *handle;
1462 struct lov_mds_md *lmm = NULL;
1463 struct llog_cookie *logcookies = NULL;
1464 int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1465 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1466 struct obd_device *obd = mdd->mdd_obd_dev;
1467 struct mds_obd *mds = &obd->u.mds;
1468 #ifdef HAVE_QUOTA_SUPPORT
1469 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1470 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1471 int quota_opc = 0, block_count = 0;
1472 int inode_pending[MAXQUOTAS] = { 0, 0 };
1473 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a scratch copy of the attrs; mdd_fix_attr() normalizes them. */
1477 *la_copy = ma->ma_attr;
1478 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1482 /* setattr on "close" only change atime, or do nothing */
1483 if (ma->ma_valid == MA_INODE &&
1484 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1487 /*TODO: add lock here*/
1488 /* start a log journal handle if needed */
/* chown/chgrp of a striped regular file is also journalled to the LOV
 * llog, so fetch the striping EA up front to size the transaction. */
1489 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1490 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1491 lmm_size = mdd_lov_mdsize(env, mdd);
1492 lmm = mdd_max_lmm_get(env, mdd);
1494 GOTO(no_trans, rc = -ENOMEM);
1496 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
/* NOTE(review): chlog_cnt is incremented here but its initialization is
 * in elided lines -- confirm it is set before this point. */
1504 if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1505 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1506 lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1509 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1510 MDD_TXN_ATTR_SET_OP, chlog_cnt);
1511 handle = mdd_trans_start(env, mdd);
1513 GOTO(no_trans, rc = PTR_ERR(handle));
1515 /* permission changes may require sync operation */
1516 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1517 handle->th_sync |= mdd->mdd_sync_permission;
1519 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1520 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1521 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1523 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-acquire inode+block quota for the new owner and
 * remember old/new ids for the post-op lquota_adjust() below. */
1524 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1525 struct obd_export *exp = md_quota(env)->mq_exp;
1526 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1528 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1530 quota_opc = FSFILT_OP_SETATTR;
1531 mdd_quota_wrapper(la_copy, qnids);
1532 mdd_quota_wrapper(la_tmp, qoids);
1533 /* get file quota for new owner */
1534 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1535 qnids, inode_pending, 1, NULL, 0,
1537 block_count = (la_tmp->la_blocks + 7) >> 3;
1540 mdd_data_get(env, mdd_obj, &data);
1541 /* get block quota for new owner */
1542 lquota_chkquota(mds_quota_interface_ref, obd,
1543 exp, qnids, block_pending,
1545 LQUOTA_FLAGS_BLK, data, 1);
/* Flag-only change vs. general setattr take different paths. */
1551 if (la_copy->la_valid & LA_FLAGS) {
1552 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1555 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1556 } else if (la_copy->la_valid) { /* setattr */
1557 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1559 /* journal chown/chgrp in llog, just like unlink */
1560 if (rc == 0 && lmm_size){
1561 cookie_size = mdd_lov_cookiesize(env, mdd);
1562 logcookies = mdd_max_cookie_get(env, mdd);
1563 if (logcookies == NULL)
1564 GOTO(cleanup, rc = -ENOMEM);
1566 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1567 logcookies, cookie_size) <= 0)
/* Caller supplied a new striping EA: validate and store it. */
1572 if (rc == 0 && ma->ma_valid & MA_LOV) {
1575 mode = mdd_object_type(mdd_obj);
1576 if (S_ISREG(mode) || S_ISDIR(mode)) {
1577 rc = mdd_lsm_sanity_check(env, mdd_obj);
1581 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1582 ma->ma_lmm_size, handle, 1);
/* HSM / Size-on-MDS state is stored in the LMA xattr. */
1586 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1589 mode = mdd_object_type(mdd_obj);
1591 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
/* Record the change in the changelog before closing the transaction. */
1596 rc = mdd_attr_set_changelog(env, obj, handle,
1597 ma->ma_attr.la_valid);
1598 mdd_trans_stop(env, mdd, rc, handle);
/* Push the ownership change to the OST objects asynchronously. */
1600 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1601 /*set obd attr, if needed*/
1602 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1605 #ifdef HAVE_QUOTA_SUPPORT
1607 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1609 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1611 /* Trigger dqrel/dqacq for original owner and new owner.
1612 * If failed, the next call for lquota_chkquota will
1614 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Run __mdd_xattr_set() under the object write lock. */
1621 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1622 const struct lu_buf *buf, const char *name, int fl,
1623 struct thandle *handle)
1628 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1629 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1630 mdd_write_unlock(env, obj);
/* Common permission checks for xattr set/del: object must not be
 * immutable/append-only, and the caller must own it (or be CAP_FOWNER). */
1635 static int mdd_xattr_sanity_check(const struct lu_env *env,
1636 struct mdd_object *obj)
1638 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1639 struct md_ucred *uc = md_ucred(env);
1643 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1646 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Only the owner or a CFS_CAP_FOWNER-capable user may modify xattrs. */
1650 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1651 !mdd_capable(uc, CFS_CAP_FOWNER))
1658 * The caller should guarantee to update the object ctime
1659 * after xattr_set if needed.
/* Set xattr \a name on \a obj in its own transaction; records a CL_XATTR
 * changelog entry for user/ACL xattrs. */
1661 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1662 const struct lu_buf *buf, const char *name,
1665 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1666 struct mdd_device *mdd = mdo2mdd(obj);
1667 struct thandle *handle;
1671 rc = mdd_xattr_sanity_check(env, mdd_obj);
1675 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1676 handle = mdd_trans_start(env, mdd);
1678 RETURN(PTR_ERR(handle));
1680 /* security-related changes may require sync */
1681 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1682 handle->th_sync |= mdd->mdd_sync_permission;
1684 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1686 /* Only record system & user xattr changes */
1687 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1688 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1689 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1690 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1691 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1692 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1693 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1695 mdd_trans_stop(env, mdd, rc, handle);
1701 * The caller should guarantee to update the object ctime
1702 * after xattr_set if needed.
/* Delete xattr \a name from \a obj under the write lock; records a
 * CL_XATTR changelog entry for user/ACL xattrs, mirroring mdd_xattr_set. */
1704 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1707 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1708 struct mdd_device *mdd = mdo2mdd(obj);
1709 struct thandle *handle;
1713 rc = mdd_xattr_sanity_check(env, mdd_obj);
1717 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1718 handle = mdd_trans_start(env, mdd);
1720 RETURN(PTR_ERR(handle));
1722 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1723 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1724 mdd_object_capa(env, mdd_obj));
1725 mdd_write_unlock(env, mdd_obj);
1727 /* Only record system & user xattr changes */
1728 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1729 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1730 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1731 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1732 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1733 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1734 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1737 mdd_trans_stop(env, mdd, rc, handle);
1742 /* partial unlink */
/* Drop a link count on \a obj (no name removal visible here), update its
 * ctime, and finish the unlink (orphan/quota bookkeeping). */
1743 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1746 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1747 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1748 struct mdd_device *mdd = mdo2mdd(obj);
1749 struct thandle *handle;
1750 #ifdef HAVE_QUOTA_SUPPORT
1751 struct obd_device *obd = mdd->mdd_obd_dev;
1752 struct mds_obd *mds = &obd->u.mds;
1753 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1760 * Check -ENOENT early here because we need to get object type
1761 * to calculate credits before transaction start
1763 if (!mdd_object_exists(mdd_obj))
1766 LASSERT(mdd_object_exists(mdd_obj) > 0);
1768 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1772 handle = mdd_trans_start(env, mdd);
1776 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1778 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* Drop the regular link; directories also lose the "." self-link. */
1782 __mdd_ref_del(env, mdd_obj, handle, 0);
1784 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1786 __mdd_ref_del(env, mdd_obj, handle, 1);
/* Unlink updates ctime; the caller must supply it. */
1789 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1790 la_copy->la_ctime = ma->ma_attr.la_ctime;
1792 la_copy->la_valid = LA_CTIME;
1793 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1797 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and not open: release the owner's quota afterwards. */
1799 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1800 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1801 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1802 mdd_quota_wrapper(&ma->ma_attr, qids);
1809 mdd_write_unlock(env, mdd_obj);
1810 mdd_trans_stop(env, mdd, rc, handle);
1811 #ifdef HAVE_QUOTA_SUPPORT
1813 /* Trigger dqrel on the owner of child. If failed,
1814 * the next call for lquota_chkquota will process it */
1815 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1821 /* partial operation */
/* Validate the file type (la_mode & S_IFMT) for a partial object create;
 * the switch body is elided in this excerpt. */
1822 static int mdd_oc_sanity_check(const struct lu_env *env,
1823 struct mdd_object *obj,
1829 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: create the object body only (no name insertion visible
 * here); handles slave LMV EA, remote ACL init, and quota pre-acquisition. */
1846 static int mdd_object_create(const struct lu_env *env,
1847 struct md_object *obj,
1848 const struct md_op_spec *spec,
1852 struct mdd_device *mdd = mdo2mdd(obj);
1853 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1854 const struct lu_fid *pfid = spec->u.sp_pfid;
1855 struct thandle *handle;
1856 #ifdef HAVE_QUOTA_SUPPORT
1857 struct obd_device *obd = mdd->mdd_obd_dev;
1858 struct obd_export *exp = md_quota(env)->mq_exp;
1859 struct mds_obd *mds = &obd->u.mds;
1860 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1861 int quota_opc = 0, block_count = 0;
1862 int inode_pending[MAXQUOTAS] = { 0, 0 };
1863 int block_pending[MAXQUOTAS] = { 0, 0 };
1868 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire inode (and, per file type, block) quota for the child. */
1869 if (mds->mds_quota) {
1870 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1871 mdd_quota_wrapper(&ma->ma_attr, qids);
1872 /* get file quota for child */
1873 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1874 qids, inode_pending, 1, NULL, 0,
1876 switch (ma->ma_attr.la_mode & S_IFMT) {
1885 /* get block quota for child */
1887 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1888 qids, block_pending, block_count,
1889 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1893 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1894 handle = mdd_trans_start(env, mdd);
1896 GOTO(out_pending, rc = PTR_ERR(handle));
1898 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1899 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1903 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1907 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1908 /* If creating the slave object, set slave EA here. */
1909 int lmv_size = spec->u.sp_ea.eadatalen;
1910 struct lmv_stripe_md *lmv;
1912 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1913 LASSERT(lmv != NULL && lmv_size > 0);
1915 rc = __mdd_xattr_set(env, mdd_obj,
1916 mdd_buf_get_const(env, lmv, lmv_size),
1917 XATTR_NAME_LMV, 0, handle);
1921 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1924 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL: initialize the ACL from the EA payload; __mdd_acl_init()
 * may also rewrite la_mode, so LA_MODE is marked valid afterwards. */
1925 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1926 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1928 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1929 buf->lb_len = spec->u.sp_ea.eadatalen;
1930 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1931 rc = __mdd_acl_init(env, mdd_obj, buf,
1932 &ma->ma_attr.la_mode,
1937 ma->ma_attr.la_valid |= LA_MODE;
1940 pfid = spec->u.sp_ea.fid;
1943 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the resulting attributes to the caller through \a ma. */
1949 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1950 mdd_write_unlock(env, mdd_obj);
1952 mdd_trans_stop(env, mdd, rc, handle);
1954 #ifdef HAVE_QUOTA_SUPPORT
1956 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1958 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1960 /* Trigger dqacq on the owner of child. If failed,
1961 * the next call for lquota_chkquota will process it. */
1962 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: bump the link count of \a obj and update its ctime. */
1970 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1971 const struct md_attr *ma)
1973 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1974 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1975 struct mdd_device *mdd = mdo2mdd(obj);
1976 struct thandle *handle;
1980 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1981 handle = mdd_trans_start(env, mdd);
1985 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1986 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1988 __mdd_ref_add(env, mdd_obj, handle);
1989 mdd_write_unlock(env, mdd_obj);
/* Link updates ctime; the caller must supply it. */
1991 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1992 la_copy->la_ctime = ma->ma_attr.la_ctime;
1994 la_copy->la_valid = LA_CTIME;
1995 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with a hard-coded 0 instead of
 * rc, so a ctime-update failure is not reported to the trans-stop path --
 * confirm against upstream whether this is intentional. */
1998 mdd_trans_stop(env, mdd, 0, handle);
2004 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open flags into a MAY_* access mask for permission checks. */
2006 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2010 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2011 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2012 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2013 * owner can write to a file even if it is marked readonly to hide
2014 * its brokenness. (bug 5781) */
2015 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2016 struct md_ucred *uc = md_ucred(env);
/* Owner (or missing/uninitialized credentials) gets the override. */
2018 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2019 (la->la_uid == uc->mu_fsuid))
2023 if (flags & FMODE_READ)
/* Truncate and append imply write access. */
2025 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2027 if (flags & MDS_FMODE_EXEC)
/* Validate that the current user may open \a obj with the given flags. */
2032 static int mdd_open_sanity_check(const struct lu_env *env,
2033 struct mdd_object *obj, int flag)
2035 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2040 if (mdd_is_dead_obj(obj))
2043 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are never opened directly. */
2047 if (S_ISLNK(tmp_la->la_mode))
2050 mode = accmode(env, tmp_la, flag);
/* Directories cannot be opened for write. */
2052 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Skip the permission check when create already performed it. */
2055 if (!(flag & MDS_OPEN_CREATED)) {
2056 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncation is meaningless for FIFOs, sockets and device nodes. */
2061 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2062 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2063 flag &= ~MDS_OPEN_TRUNC;
2065 /* For writing append-only file must open it with append mode. */
2066 if (mdd_is_append(obj)) {
2067 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2069 if (flag & MDS_OPEN_TRUNC)
2075 * Now, flag -- O_NOATIME does not be packed by client.
/* O_NOATIME is restricted to the owner or CFS_CAP_FOWNER holders. */
2077 if (flag & O_NOATIME) {
2078 struct md_ucred *uc = md_ucred(env);
2080 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2081 (uc->mu_valid == UCRED_NEW)) &&
2082 (uc->mu_fsuid != tmp_la->la_uid) &&
2083 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open: run the sanity checks and, on success, bump the open count. */
2091 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2094 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2097 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2099 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2101 mdd_obj->mod_count++;
2103 mdd_write_unlock(env, mdd_obj);
2107 /* return md_attr back,
2108 * if it is last unlink then return lov ea + llog cookie*/
2109 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2115 if (S_ISREG(mdd_object_type(obj))) {
2116 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2117 * Caller must be ready for that. */
/* Fetch the striping EA; if present, log the unlink so the associated
 * OST objects can be destroyed later. */
2119 rc = __mdd_lmm_get(env, obj, ma);
2120 if ((ma->ma_valid & MA_LOV))
2121 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2128 * No permission check is needed.
/* Close: drop the open count; on the last close of an orphaned object,
 * remove it from the orphan index and arrange OST object destruction. */
2130 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2131 struct md_attr *ma, int mode)
2133 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2134 struct mdd_device *mdd = mdo2mdd(obj);
2135 struct thandle *handle = NULL;
2139 #ifdef HAVE_QUOTA_SUPPORT
2140 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2141 struct mds_obd *mds = &obd->u.mds;
2142 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* Caller asked to keep the orphan: just drop the count and return. */
2147 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2148 mdd_obj->mod_count--;
2150 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2151 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2152 "list\n", PFID(mdd_object_fid(mdd_obj)));
2156 /* check without any lock */
/* Last closer of a deleted object: build unlink transaction parameters
 * before taking the lock (credits must be computed first). */
2157 if (mdd_obj->mod_count == 1 &&
2158 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2160 rc = mdd_log_txn_param_build(env, obj, ma,
2161 MDD_TXN_UNLINK_OP, 1);
2164 handle = mdd_trans_start(env, mdo2mdd(obj));
2166 RETURN(PTR_ERR(handle));
2169 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Re-check under the lock: the unlocked test above may have raced. */
2170 if (handle == NULL && mdd_obj->mod_count == 1 &&
2171 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2172 mdd_write_unlock(env, mdd_obj);
2176 /* release open count */
2177 mdd_obj->mod_count --;
2179 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2180 /* remove link to object from orphan index */
2181 rc = __mdd_orphan_del(env, mdd_obj, handle);
2183 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2184 "list, OSS objects to be destroyed.\n",
2185 PFID(mdd_object_fid(mdd_obj)));
2187 CERROR("Object "DFID" can not be deleted from orphan "
2188 "list, maybe cause OST objects can not be "
2189 "destroyed (err: %d).\n",
2190 PFID(mdd_object_fid(mdd_obj)), rc);
2191 /* If object was not deleted from orphan list, do not
2192 * destroy OSS objects, which will be done when next
2198 rc = mdd_iattr_get(env, mdd_obj, ma);
2199 /* Object maybe not in orphan list originally, it is rare case for
2200 * mdd_finish_unlink() failure. */
2201 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2202 #ifdef HAVE_QUOTA_SUPPORT
/* Last link is gone: release the owner's quota afterwards. */
2203 if (mds->mds_quota) {
2204 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2205 mdd_quota_wrapper(&ma->ma_attr, qids);
2208 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2209 if (ma->ma_valid & MA_FLAGS &&
2210 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2211 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2213 rc = mdd_object_kill(env, mdd_obj, ma);
2219 CERROR("Error when prepare to delete Object "DFID" , "
2220 "which will cause OST objects can not be "
2221 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2227 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2229 mdd_write_unlock(env, mdd_obj);
/* Record a CL_CLOSE changelog entry for writable opens, skipping
 * recovery reopens (MDS_RECOV_OPEN); the store starts its own handle. */
2232 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2233 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2235 mdd_txn_param_build(env, mdd, MDD_TXN_CLOSE_OP, 1);
2236 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2241 mdd_trans_stop(env, mdd, rc, handle);
2242 #ifdef HAVE_QUOTA_SUPPORT
2244 /* Trigger dqrel on the owner of child. If failed,
2245 * the next call for lquota_chkquota will process it */
2246 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2253 * Permission check is done when open,
2254 * no need check again.
2256 static int mdd_readpage_sanity_check(const struct lu_env *env,
2257 struct mdd_object *obj)
2259 struct dt_object *next = mdd_object_child(obj);
/* Only directories whose dt object supports index ops can be read. */
2263 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries pulled from iterator \a it;
 * stops when the page is full or the iterator is exhausted. */
2271 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2272 struct lu_dirpage *dp, int nob,
2273 const struct dt_it_ops *iops, struct dt_it *it,
2279 struct lu_dirent *ent;
2280 struct lu_dirent *last = NULL;
/* Reserve the page header; entries start immediately after it. */
2283 memset(area, 0, sizeof (*dp));
2284 area += sizeof (*dp);
2285 nob -= sizeof (*dp);
2292 len = iops->key_size(env, it);
2294 /* IAM iterator can return record with zero len. */
/* The first stored entry fixes the page's starting hash. */
2298 hash = iops->store(env, it);
2299 if (unlikely(first)) {
2301 dp->ldp_hash_start = cpu_to_le64(hash);
2304 /* calculate max space required for lu_dirent */
2305 recsize = lu_dirent_calc_size(len, attr);
2307 if (nob >= recsize) {
2308 result = iops->rec(env, it, ent, attr);
2309 if (result == -ESTALE)
2314 /* osd might not able to pack all attributes,
2315 * so recheck rec length */
2316 recsize = le16_to_cpu(ent->lde_reclen);
/* Entry does not fit: succeed only if at least one was stored. */
2318 result = (last != NULL) ? 0 :-EINVAL;
2322 ent = (void *)ent + recsize;
2326 result = iops->next(env, it);
2327 if (result == -ESTALE)
2329 } while (result == 0);
2332 dp->ldp_hash_end = cpu_to_le64(hash);
/* Last entry's hash equals the page end hash: flag a possible hash
 * collision spanning pages so readers handle it. */
2334 if (last->lde_hash == dp->ldp_hash_end)
2335 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2336 last->lde_reclen = 0; /* end mark */
/* Iterate the directory index and fill the pages in \a rdpg, starting at
 * hash rdpg->rp_hash. */
2341 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2342 const struct lu_rdpg *rdpg)
2345 struct dt_object *next = mdd_object_child(obj);
2346 const struct dt_it_ops *iops;
2348 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2354 LASSERT(rdpg->rp_pages != NULL);
2355 LASSERT(next->do_index_ops != NULL);
2357 if (rdpg->rp_count <= 0)
2361 * iterate through directory and fill pages from @rdpg
2363 iops = &next->do_index_ops->dio_it;
2364 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2368 rc = iops->load(env, it, rdpg->rp_hash);
2372 * Iterator didn't find record with exactly the key requested.
2374 * It is currently either
2376 * - positioned above record with key less than
2377 * requested---skip it.
2379 * - or not positioned at all (is in IAM_IT_SKEWED
2380 * state)---position it on the next item.
2382 rc = iops->next(env, it);
2387 * At this point and across for-loop:
2389 * rc == 0 -> ok, proceed.
2390 * rc > 0 -> end of directory.
2393 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2394 i++, nob -= CFS_PAGE_SIZE) {
2395 struct lu_dirpage *dp;
2397 LASSERT(i < rdpg->rp_npages);
2398 pg = rdpg->rp_pages[i];
/* One CFS page may hold several LU_PAGE_SIZE dirpages. */
2400 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2403 rc = mdd_dir_page_build(env, mdd, dp,
2404 min_t(int, nob, LU_PAGE_SIZE),
2405 iops, it, rdpg->rp_attrs);
/* End of directory: close the last dirpage with the end marker. */
2410 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2412 } else if (rc < 0) {
2413 CWARN("build page failed: %d!\n", rc);
2416 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2417 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2418 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2425 struct lu_dirpage *dp;
/* No entries at all: emit a single empty dirpage for the range. */
2427 dp = cfs_kmap(rdpg->rp_pages[0]);
2428 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2431 * No pages were processed, mark this for first page
2434 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2437 cfs_kunmap(rdpg->rp_pages[0]);
/* Return the number of bytes actually filled, capped at the request. */
2439 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2442 iops->fini(env, it);
/* md_object readpage entry point: sanity-check, handle dead (unlinked but
 * still open) directories specially, then delegate to __mdd_readpage(). */
2447 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2448 const struct lu_rdpg *rdpg)
2450 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2454 LASSERT(mdd_object_exists(mdd_obj));
2456 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2457 rc = mdd_readpage_sanity_check(env, mdd_obj);
2459 GOTO(out_unlock, rc);
2461 if (mdd_is_dead_obj(mdd_obj)) {
2463 struct lu_dirpage *dp;
2466 * According to POSIX, please do not return any entry to client:
2467 * even dot and dotdot should not be returned.
2469 CWARN("readdir from dead object: "DFID"\n",
2470 PFID(mdd_object_fid(mdd_obj)));
2472 if (rdpg->rp_count <= 0)
2473 GOTO(out_unlock, rc = -EFAULT);
2474 LASSERT(rdpg->rp_pages != NULL);
/* Hand back one empty page spanning the whole hash range. */
2476 pg = rdpg->rp_pages[0];
2477 dp = (struct lu_dirpage*)cfs_kmap(pg);
2478 memset(dp, 0 , sizeof(struct lu_dirpage));
2479 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2480 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2481 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2483 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2486 rc = __mdd_readpage(env, mdd_obj, rdpg);
2490 mdd_read_unlock(env, mdd_obj);
/* Flush the underlying dt object to stable storage. */
2494 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2496 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2497 struct dt_object *next;
2499 LASSERT(mdd_object_exists(mdd_obj));
2500 next = mdd_object_child(mdd_obj);
2501 return next->do_ops->do_object_sync(env, next);
/* Read the version of the underlying dt object. */
2504 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2505 struct md_object *obj)
2507 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2509 LASSERT(mdd_object_exists(mdd_obj));
2510 return do_version_get(env, mdd_object_child(mdd_obj));
/* Set the version of the underlying dt object. */
2513 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2514 dt_obj_version_t version)
2516 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2518 LASSERT(mdd_object_exists(mdd_obj));
2519 do_version_set(env, mdd_object_child(mdd_obj), version);
2522 const struct md_object_operations mdd_obj_ops = {
2523 .moo_permission = mdd_permission,
2524 .moo_attr_get = mdd_attr_get,
2525 .moo_attr_set = mdd_attr_set,
2526 .moo_xattr_get = mdd_xattr_get,
2527 .moo_xattr_set = mdd_xattr_set,
2528 .moo_xattr_list = mdd_xattr_list,
2529 .moo_xattr_del = mdd_xattr_del,
2530 .moo_object_create = mdd_object_create,
2531 .moo_ref_add = mdd_ref_add,
2532 .moo_ref_del = mdd_ref_del,
2533 .moo_open = mdd_open,
2534 .moo_close = mdd_close,
2535 .moo_readpage = mdd_readpage,
2536 .moo_readlink = mdd_readlink,
2537 .moo_changelog = mdd_changelog,
2538 .moo_capa_get = mdd_capa_get,
2539 .moo_object_sync = mdd_object_sync,
2540 .moo_version_get = mdd_version_get,
2541 .moo_version_set = mdd_version_set,
2542 .moo_path = mdd_path,
2543 .moo_file_lock = mdd_file_lock,
2544 .moo_file_unlock = mdd_file_unlock,