1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch per-object data through the underlying (OSD) layer.
 * Asserts the object exists on disk before descending.
 * NOTE(review): listing is elided here — the "data" parameter line and
 * the function braces are missing from this view. */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
/* Read the lu_attr of an existing object from the lower layer, under
 * capability \a capa. Returns the mdo_attr_get() result (0 or -errno).
 * Asserts existence first: callers must not pass stub objects. */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * obj->mod_flags representation. Only the append/immutable bits are
 * translated; any other mod_flags bits are left untouched. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to \a env via
 * mdd_thread_key. The key is registered at module init, so the lookup
 * must never fail (hence the LASSERT).
 * NOTE(review): the trailing "return info;" line is elided from this view. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
/* Wrap caller-owned memory (\a area, \a len) in the per-thread mti_buf;
 * no allocation happens here and the caller keeps ownership of \a area.
 * NOTE(review): the lb_buf/lb_len assignments and return are elided. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by \a buf; a NULL buf or empty buffer is a
 * no-op. Frees with OBD_VFREE or OBD_FREE to match how the buffer was
 * allocated (the discriminating branch is elided from this view). */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
130 OBD_VFREE(buf->lb_buf, buf->lb_len);
132 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap read-only caller memory in the
 * per-thread mti_buf. The const is cast away only for storage in
 * lu_buf; callers of the returned buffer must not write through it. */
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137 const void *area, ssize_t len)
141 buf = &mdd_env_info(env)->mti_buf;
142 buf->lb_buf = (void *)area;
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the per-thread mti_big_buf grown to at least \a len bytes.
 * An existing smaller buffer is freed first, then a new one allocated:
 * kmalloc (OBD_ALLOC) for sizes up to BUF_VMALLOC_SIZE, vmalloc
 * otherwise. Contents of any previous buffer are NOT preserved —
 * use mdd_buf_grow() for that. */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
/* too small: drop the old buffer (vfree/free per allocation kind;
 * the discriminating branch is elided from this view) */
154 OBD_VFREE(buf->lb_buf, buf->lb_len);
156 OBD_FREE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL) {
161 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162 OBD_ALLOC(buf->lb_buf, buf->lb_len);
/* small kmalloc failed (or size too big): fall back to vmalloc */
165 if (buf->lb_buf == NULL) {
166 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
169 if (buf->lb_buf == NULL)
175 /** Increase the size of the \a mti_big_buf.
176 * preserves old data in buffer
177 * old buffer remains unchanged on error
178 * \retval 0 or -ENOMEM
/* Grow mti_big_buf to \a len, copying the old contents into the new
 * allocation before freeing the old one (see the doc comment above).
 * On allocation failure the old buffer is left intact and -ENOMEM is
 * returned (the failure return line is elided from this view). */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
185 LASSERT(len >= oldbuf->lb_len);
186 if (len > BUF_VMALLOC_SIZE) {
187 OBD_VMALLOC(buf.lb_buf, len);
190 OBD_ALLOC(buf.lb_buf, len);
193 if (buf.lb_buf == NULL)
/* preserve existing data, then release the old allocation */
197 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199 if (oldbuf->lb_vmalloc)
200 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
/* publish the new buffer descriptor in place of the old one */
204 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a per-thread llog cookie buffer sized for the current maximum
 * LOV cookie size, (re)allocating if the cached one is too small.
 * The buffer is zeroed before return; NULL on allocation failure. */
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210 struct mdd_device *mdd)
212 struct mdd_thread_info *mti = mdd_env_info(env);
215 max_cookie_size = mdd_lov_cookiesize(env, mdd);
216 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
/* cached buffer too small: drop it and re-allocate below */
217 if (mti->mti_max_cookie)
218 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219 mti->mti_max_cookie = NULL;
220 mti->mti_max_cookie_size = 0;
222 if (unlikely(mti->mti_max_cookie == NULL)) {
223 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224 if (likely(mti->mti_max_cookie != NULL))
225 mti->mti_max_cookie_size = max_cookie_size;
227 if (likely(mti->mti_max_cookie != NULL))
228 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229 return mti->mti_max_cookie;
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233 struct mdd_device *mdd)
235 struct mdd_thread_info *mti = mdd_env_info(env);
238 max_lmm_size = mdd_lov_mdsize(env, mdd);
239 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240 if (mti->mti_max_lmm)
241 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242 mti->mti_max_lmm = NULL;
243 mti->mti_max_lmm_size = 0;
245 if (unlikely(mti->mti_max_lmm == NULL)) {
246 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247 if (unlikely(mti->mti_max_lmm != NULL))
248 mti->mti_max_lmm_size = max_lmm_size;
250 return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for the mdd layer: allocate an
 * mdd_object, initialise its embedded lu_object and wire up the md
 * object/dir operation tables. Returns the lu_object (the NULL-failure
 * branch and return are elided from this view). */
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254 const struct lu_object_header *hdr,
257 struct mdd_object *mdd_obj;
259 OBD_ALLOC_PTR(mdd_obj);
260 if (mdd_obj != NULL) {
263 o = mdd2lu_obj(mdd_obj);
264 lu_object_init(o, NULL, d);
265 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267 mdd_obj->mod_count = 0;
268 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: allocate the lower-layer (child OSD) object for the
 * same header and stack it below this mdd object in the lu_object
 * compound. Also resets the changelog timestamp and per-dir lock.
 * NOTE(review): the NULL-check on "below" and the return are elided. */
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276 const struct lu_object_conf *unused)
278 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279 struct mdd_object *mdd_obj = lu2mdd_obj(o);
280 struct lu_object *below;
281 struct lu_device *under;
284 mdd_obj->mod_cltime = 0;
285 under = &d->mdd_child->dd_lu_dev;
286 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287 mdd_pdlock_init(mdd_obj);
291 lu_object_add(o, below);
/* loo_object_start: once the full object stack is assembled, load the
 * cached flag bits (append/immutable/nlink) for objects that exist on
 * disk; nothing to do for not-yet-created objects. */
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 if (lu_object_exists(o))
299 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd slice of the object.
 * NOTE(review): lu_object_fini()/OBD_FREE_PTR lines are elided here. */
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: emit a one-line debug description of the object
 * (open count, valid mask, changelog time, flag bits) via printer \a p. */
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313 lu_printer_t p, const struct lu_object *o)
315 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317 "valid=%x, cltime=%llu, flags=%lx)",
318 mdd, mdd->mod_count, mdd->mod_valid,
319 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; forward-declared at the
 * top of the file so mdd_object_alloc() can reference it. */
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323 .loo_object_init = mdd_object_init,
324 .loo_object_start = mdd_object_start,
325 .loo_object_free = mdd_object_free,
326 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd object for fid \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); may return NULL (remote
 * object) or an ERR_PTR — callers must check both. */
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330 struct mdd_device *d,
331 const struct lu_fid *f)
333 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve \a path (relative to the fs root) to a fid by walking it one
 * component at a time with mdd_lookup(). Used by mdd_path_current() to
 * verify a reconstructed path still resolves to the starting fid. */
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337 const char *path, struct lu_fid *fid)
340 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341 struct mdd_object *obj;
342 struct lu_name *lname = &mdd_env_info(env)->mti_name;
347 /* temp buffer for path element */
348 buf = mdd_buf_alloc(env, PATH_MAX);
349 if (buf->lb_buf == NULL)
352 lname->ln_name = name = buf->lb_buf;
353 lname->ln_namelen = 0;
/* start the walk at the filesystem root fid */
354 *f = mdd->mdd_root_fid;
/* copy one path component into the temp buffer (loop body elided) */
361 while (*path != '/' && *path != '\0') {
369 /* find obj corresponding to fid */
370 obj = mdd_object_find(env, mdd, f);
/* NULL means the fid lives on another MDT */
372 GOTO(out, rc = -EREMOTE);
/* NOTE(review): PTR_ERR() is already negative; "-PTR_ERR" makes rc
 * positive, inconsistent with the negative-errno convention used
 * elsewhere (e.g. -EREMOTE above) — confirm intended sign. */
374 GOTO(out, rc = -PTR_ERR(obj));
375 /* get child fid from parent and name */
376 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377 mdd_object_put(env, obj);
382 lname->ln_namelen = 0;
391 /** The maximum depth that fid2path() will search.
392 * This is limited only because we want to store the fids for
393 * historical path lookup purposes.
395 #define MAX_PATH_DEPTH 100
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399 __u64 pli_recno; /**< history point */
400 __u64 pli_currec; /**< current record */
401 struct lu_fid pli_fid; /**< fid the reconstructed path resolves to */
402 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403 struct mdd_object *pli_mdd_obj; /**< object whose path is wanted */
404 char *pli_path; /**< full path */
406 int pli_linkno; /**< which hardlink to follow */
407 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path for pli->pli_mdd_obj by walking parent
 * fids via each object's link EA, packing names right-to-left into
 * pli->pli_path. The result is then re-resolved with mdd_path2fid();
 * -EAGAIN is returned if the path changed under us (caller retries). */
410 static int mdd_path_current(const struct lu_env *env,
411 struct path_lookup_info *pli)
413 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414 struct mdd_object *mdd_obj;
415 struct lu_buf *buf = NULL;
416 struct link_ea_header *leh;
417 struct link_ea_entry *lee;
418 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* names are packed from the end of the buffer backwards */
425 ptr = pli->pli_path + pli->pli_pathlen - 1;
428 pli->pli_fidcount = 0;
429 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
/* climb towards the root, one parent per iteration */
431 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432 mdd_obj = mdd_object_find(env, mdd,
433 &pli->pli_fids[pli->pli_fidcount]);
435 GOTO(out, rc = -EREMOTE);
/* NOTE(review): "-PTR_ERR" yields a positive rc here (PTR_ERR is
 * already negative) — confirm intended sign, cf. -EREMOTE above. */
437 GOTO(out, rc = -PTR_ERR(mdd_obj));
438 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440 mdd_object_put(env, mdd_obj);
444 /* Do I need to error out here? */
449 /* Get parent fid and object name */
450 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451 buf = mdd_links_get(env, mdd_obj);
452 mdd_read_unlock(env, mdd_obj);
453 mdd_object_put(env, mdd_obj);
455 GOTO(out, rc = PTR_ERR(buf));
458 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461 /* If set, use link #linkno for path lookup, otherwise use
462 link #0. Only do this for the final path element. */
463 if ((pli->pli_fidcount == 0) &&
464 (pli->pli_linkno < leh->leh_reccount)) {
466 for (count = 0; count < pli->pli_linkno; count++) {
467 lee = (struct link_ea_entry *)
468 ((char *)lee + reclen);
469 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471 if (pli->pli_linkno < leh->leh_reccount - 1)
472 /* indicate to user there are more links */
476 /* Pack the name in the end of the buffer */
477 ptr -= tmpname->ln_namelen;
478 if (ptr - 1 <= pli->pli_path)
479 GOTO(out, rc = -EOVERFLOW);
/* bounded copy; ln_name is not NUL-terminated, the byte after ptr
 * already holds the separator/terminator from the prior iteration */
480 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
483 /* Store the parent fid for historic lookup */
484 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485 GOTO(out, rc = -EOVERFLOW);
486 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
489 /* Verify that our path hasn't changed since we started the lookup.
490 Record the current index, and verify the path resolves to the
491 same fid. If it does, then the path is correct as of this index. */
492 spin_lock(&mdd->mdd_cl.mc_lock);
493 pli->pli_currec = mdd->mdd_cl.mc_index;
494 spin_unlock(&mdd->mdd_cl.mc_lock);
495 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498 GOTO (out, rc = -EAGAIN);
500 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503 PFID(&pli->pli_fid));
504 GOTO(out, rc = -EAGAIN);
/* shift the packed path to the start of the caller's buffer */
507 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
511 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup — not yet implemented; body is
 * elided/empty in this view. See the discussion in mdd_path(). */
518 static int mdd_path_historic(const struct lu_env *env,
519 struct path_lookup_info *pli)
524 /* Returns the full path to this fid, as of changelog record recno. */
/* md_object_operations entry: return the full path of \a obj as of
 * changelog record *recno into \a path. Retries mdd_path_current()
 * while the file is being renamed (-EAGAIN), then optionally applies
 * the historic lookup. *recno/*linkno are updated for the caller. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526 char *path, int pathlen, __u64 *recno, int *linkno)
528 struct path_lookup_info *pli;
/* root has the empty path; handled without a lookup structure */
536 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
546 pli->pli_mdd_obj = md2mdd_obj(obj);
547 pli->pli_recno = *recno;
548 pli->pli_path = path;
549 pli->pli_pathlen = pathlen;
550 pli->pli_linkno = *linkno;
552 /* Retry multiple times in case file is being moved */
553 while (tries-- && rc == -EAGAIN)
554 rc = mdd_path_current(env, pli);
556 /* For historical path lookup, the current links may not have existed
557 * at "recno" time. We must switch over to earlier links/parents
558 * by using the changelog records. If the earlier parent doesn't
559 * exist, we must search back through the changelog to reconstruct
560 * its parents, then check if it exists, etc.
561 * We may ignore this problem for the initial implementation and
562 * state that an "original" hardlink must still exist for us to find
563 * historic path name. */
564 if (pli->pli_recno != -1) {
565 rc = mdd_path_historic(env, pli);
567 *recno = pli->pli_currec;
568 /* Return next link index to caller */
569 *linkno = pli->pli_linkno;
/* Refresh the cached mod_flags bits from the on-disk attributes:
 * append/immutable via mdd_flags_xlate(), plus MNLINK_OBJ for a
 * directory whose link count has dropped to 1. */
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
579 struct lu_attr *la = &mdd_env_info(env)->mti_la;
583 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
585 mdd_flags_xlate(obj, la->la_flags);
586 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587 obj->mod_flags |= MNLINK_OBJ;
592 /* get only inode attributes */
/* Fill ma->ma_attr with the plain inode attributes, unless MA_INODE is
 * already valid (idempotent). Sets MA_INODE in ma_valid on success. */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
599 if (ma->ma_valid & MA_INODE)
602 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603 mdd_object_capa(env, mdd_obj));
605 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV_MAGIC_V1 striping EA in \a lmm from the
 * filesystem-wide lov_desc defaults, writing its size into *size.
 * NOTE(review): also RETURNs the size rather than 0 — confirm callers
 * treat any positive value as success. */
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
612 struct lov_desc *ldesc;
613 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
616 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617 LASSERT(ldesc != NULL);
622 lmm->lmm_magic = LOV_MAGIC_V1;
623 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624 lmm->lmm_pattern = ldesc->ld_pattern;
625 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627 *size = sizeof(struct lov_mds_md);
629 RETURN(sizeof(struct lov_mds_md));
632 /* get lov EA only */
/* Read the LOV striping EA into ma->ma_lmm (caller holds the read
 * lock). If the object has none and MA_LOV_DEF is requested, fall back
 * to the filesystem default striping. Sets MA_LOV on success. */
633 static int __mdd_lmm_get(const struct lu_env *env,
634 struct mdd_object *mdd_obj, struct md_attr *ma)
639 if (ma->ma_valid & MA_LOV)
642 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
645 if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
651 ma->ma_valid |= MA_LOV;
/* Locked wrapper for __mdd_lmm_get(): takes/releases the object read
 * lock around the EA fetch. */
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664 rc = __mdd_lmm_get(env, mdd_obj, ma);
665 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv; no-op if MA_LMV
 * is already valid. Caller holds the read lock. */
670 static int __mdd_lmv_get(const struct lu_env *env,
671 struct mdd_object *mdd_obj, struct md_attr *ma)
676 if (ma->ma_valid & MA_LMV)
679 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
682 ma->ma_valid |= MA_LMV;
/* Read the LMA EA (lustre_mdt_attrs) and extract the HSM and/or SOM
 * pieces requested in ma->ma_need into \a ma. Caller holds the read
 * lock; the per-thread xattr buffer is used as scratch. */
688 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
691 struct mdd_thread_info *info = mdd_env_info(env);
692 struct lustre_mdt_attrs *lma =
693 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
698 /* If all needed data are already valid, nothing to do */
699 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
700 (ma->ma_need & (MA_HSM | MA_SOM)))
703 /* Read LMA from disk EA */
704 lma_size = sizeof(info->mti_xattr_buf);
705 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
709 /* Useless to check LMA incompatibility because this is already done in
710 * osd_ea_fid_get(), and this will fail long before this code is
712 * So, if we are here, LMA is compatible.
715 lustre_lma_swab(lma);
717 /* Swab and copy LMA */
718 if (ma->ma_need & MA_HSM) {
719 if (lma->lma_compat & LMAC_HSM)
720 ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
722 ma->ma_hsm_flags = 0;
723 ma->ma_valid |= MA_HSM;
725 if (ma->ma_need & MA_SOM) {
727 /* XXX: Here, copy and swab SoM data, and then remove this
729 LASSERT(!(ma->ma_need & MA_SOM));
731 ma->ma_valid |= MA_SOM;
/* Dispatch attribute retrieval according to ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), LMV EA (dirs), HSM/SOM from LMA
 * (regular files), and default ACL (dirs, if POSIX ACLs are built in).
 * Stops at the first error; ma_valid records what was filled. */
737 static int mdd_attr_get_internal(const struct lu_env *env,
738 struct mdd_object *mdd_obj,
744 if (ma->ma_need & MA_INODE)
745 rc = mdd_iattr_get(env, mdd_obj, ma);
747 if (rc == 0 && ma->ma_need & MA_LOV) {
748 if (S_ISREG(mdd_object_type(mdd_obj)) ||
749 S_ISDIR(mdd_object_type(mdd_obj)))
750 rc = __mdd_lmm_get(env, mdd_obj, ma);
752 if (rc == 0 && ma->ma_need & MA_LMV) {
753 if (S_ISDIR(mdd_object_type(mdd_obj)))
754 rc = __mdd_lmv_get(env, mdd_obj, ma);
756 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
757 if (S_ISREG(mdd_object_type(mdd_obj)))
758 rc = __mdd_lma_get(env, mdd_obj, ma);
760 #ifdef CONFIG_FS_POSIX_ACL
761 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
762 if (S_ISDIR(mdd_object_type(mdd_obj)))
763 rc = mdd_def_acl_get(env, mdd_obj, ma);
766 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* Like mdd_attr_get_internal() but takes the object read lock when any
 * EA-backed attribute (LOV/LMV/ACL/HSM/SOM) is requested; plain inode
 * attrs need no lock. */
771 int mdd_attr_get_internal_locked(const struct lu_env *env,
772 struct mdd_object *mdd_obj, struct md_attr *ma)
775 int needlock = ma->ma_need &
776 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
779 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
780 rc = mdd_attr_get_internal(env, mdd_obj, ma);
782 mdd_read_unlock(env, mdd_obj);
787 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point; no permission check
 * is needed here (see comment above). */
789 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
792 struct mdd_object *mdd_obj = md2mdd_obj(obj);
796 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
801 * No permission check is needed.
/* moo_xattr_get: read extended attribute \a name into \a buf under the
 * object read lock. No permission check (see comment above). */
803 static int mdd_xattr_get(const struct lu_env *env,
804 struct md_object *obj, struct lu_buf *buf,
807 struct mdd_object *mdd_obj = md2mdd_obj(obj);
812 LASSERT(mdd_object_exists(mdd_obj));
814 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
815 rc = mdo_xattr_get(env, mdd_obj, buf, name,
816 mdd_object_capa(env, mdd_obj));
817 mdd_read_unlock(env, mdd_obj);
823 * Permission check is done when open,
824 * no need check again.
/* moo_readlink: read the symlink target by doing a body read on the
 * underlying dt object from offset 0, under the object read lock.
 * Permission was checked at open time (see comment above). */
826 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
829 struct mdd_object *mdd_obj = md2mdd_obj(obj);
830 struct dt_object *next;
835 LASSERT(mdd_object_exists(mdd_obj));
837 next = mdd_object_child(mdd_obj);
838 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
839 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
840 mdd_object_capa(env, mdd_obj));
841 mdd_read_unlock(env, mdd_obj);
846 * No permission check is needed.
/* moo_xattr_list: list extended attribute names into \a buf under the
 * object read lock. No permission check needed. */
848 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
851 struct mdd_object *mdd_obj = md2mdd_obj(obj);
856 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
857 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
858 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle:
 * choose the dt_object_format from the creation spec (index vs mode-
 * derived), let the lower layer fill the allocation hint from parent
 * \a p, then call mdo_create_obj(). No-op if \a c already exists. */
863 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
864 struct mdd_object *c, struct md_attr *ma,
865 struct thandle *handle,
866 const struct md_op_spec *spec)
868 struct lu_attr *attr = &ma->ma_attr;
869 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
870 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
871 const struct dt_index_features *feat = spec->sp_feat;
875 if (!mdd_object_exists(c)) {
876 struct dt_object *next = mdd_object_child(c);
/* non-directory index features force DFT_INDEX; otherwise derive
 * the format from the file mode bits */
879 if (feat != &dt_directory_features && feat != NULL)
880 dof->dof_type = DFT_INDEX;
882 dof->dof_type = dt_mode_to_dft(attr->la_mode);
884 dof->u.dof_idx.di_feat = feat;
886 /* @hint will be initialized by underlying device. */
887 next->do_ops->do_ah_init(env, hint,
888 p ? mdd_object_child(p) : NULL,
889 attr->la_mode & S_IFMT);
891 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
892 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
900 * Make sure the ctime is increased only.
/* Enforce monotonic ctime on an incoming setattr: drop MTIME/CTIME if
 * the new ctime is older than the stored one, and drop a bare equal
 * ctime update entirely (see "ctime is increased only" above). */
902 static inline int mdd_attr_check(const struct lu_env *env,
903 struct mdd_object *obj,
904 struct lu_attr *attr)
906 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
910 if (attr->la_valid & LA_CTIME) {
911 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
915 if (attr->la_ctime < tmp_la->la_ctime)
916 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
917 else if (attr->la_valid == LA_CTIME &&
918 attr->la_ctime == tmp_la->la_ctime)
919 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to \a obj inside transaction \a handle; when the mode
 * changes and \a needacl is set, also rewrite the access ACL to match
 * the new permission bits. */
924 int mdd_attr_set_internal(const struct lu_env *env,
925 struct mdd_object *obj,
926 struct lu_attr *attr,
927 struct thandle *handle,
933 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
934 #ifdef CONFIG_FS_POSIX_ACL
935 if (!rc && (attr->la_valid & LA_MODE) && needacl)
936 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by
 * mdd_attr_set_internal(); the set is skipped if the check empties or
 * rejects the attribute mask. */
941 int mdd_attr_check_set_internal(const struct lu_env *env,
942 struct mdd_object *obj,
943 struct lu_attr *attr,
944 struct thandle *handle,
950 rc = mdd_attr_check(env, obj, attr);
955 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object write lock. The lock is
 * only needed for the ACL rewrite, so needacl is narrowed to actual
 * mode changes first. */
959 static int mdd_attr_set_internal_locked(const struct lu_env *env,
960 struct mdd_object *obj,
961 struct lu_attr *attr,
962 struct thandle *handle,
968 needacl = needacl && (attr->la_valid & LA_MODE);
970 mdd_write_lock(env, obj, MOR_TGT_CHILD);
971 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
973 mdd_write_unlock(env, obj);
/* mdd_attr_check_set_internal() under the object write lock; mirrors
 * mdd_attr_set_internal_locked() above including the needacl
 * narrowing. */
977 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
978 struct mdd_object *obj,
979 struct lu_attr *attr,
980 struct thandle *handle,
986 needacl = needacl && (attr->la_valid & LA_MODE);
988 mdd_write_lock(env, obj, MOR_TGT_CHILD);
989 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
991 mdd_write_unlock(env, obj);
/* Low-level xattr update inside transaction \a handle: a non-empty
 * buffer sets the EA, an explicitly empty one (NULL buf + len 0)
 * deletes it. Other buf/len combinations fall through (rc from the
 * elided default branch). Caller holds the write lock. */
995 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
996 const struct lu_buf *buf, const char *name,
997 int fl, struct thandle *handle)
999 struct lustre_capa *capa = mdd_object_capa(env, obj);
1003 if (buf->lb_buf && buf->lb_len > 0)
1004 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1005 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1006 rc = mdo_xattr_del(env, obj, name, handle, capa);
1012 * This gives the same functionality as the code between
1013 * sys_chmod and inode_setattr
1014 * chown_common and inode_setattr
1015 * utimes and inode_setattr
1016 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize an incoming setattr request against the
 * object's current attributes and the caller's credentials, mirroring
 * the kernel's sys_chmod/chown_common/utimes + inode_setattr checks
 * (see header comment above). Clears la_valid bits for updates that
 * are redundant or not permitted; order of checks is significant. */
1018 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1019 struct lu_attr *la, const struct md_attr *ma)
1021 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1022 struct md_ucred *uc = md_ucred(env);
1029 /* Do not permit change file type */
1030 if (la->la_valid & LA_TYPE)
1033 /* They should not be processed by setattr */
1034 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
/* snapshot the current on-disk attributes for all later comparisons */
1037 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1041 if (la->la_valid == LA_CTIME) {
1042 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1043 /* This is only for set ctime when rename's source is
1045 rc = mdd_may_delete(env, NULL, obj,
1046 (struct md_attr *)ma, 1, 0);
1047 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1048 la->la_valid &= ~LA_CTIME;
1052 if (la->la_valid == LA_ATIME) {
1053 /* This is atime only set for read atime update on close. */
1054 if (la->la_atime <= tmp_la->la_atime +
1055 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1056 la->la_valid &= ~LA_ATIME;
1060 /* Check if flags change. */
1061 if (la->la_valid & LA_FLAGS) {
1062 unsigned int oldflags = 0;
1063 unsigned int newflags = la->la_flags &
1064 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
/* only the owner or CAP_FOWNER may change flags at all */
1066 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1067 !mdd_capable(uc, CFS_CAP_FOWNER))
1070 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1071 * only be changed by the relevant capability. */
1072 if (mdd_is_immutable(obj))
1073 oldflags |= LUSTRE_IMMUTABLE_FL;
1074 if (mdd_is_append(obj))
1075 oldflags |= LUSTRE_APPEND_FL;
1076 if ((oldflags ^ newflags) &&
1077 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningful only on directories */
1080 if (!S_ISDIR(tmp_la->la_mode))
1081 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* immutable/append objects accept only flag changes (unless the
 * server-side permission bypass is set) */
1084 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1085 (la->la_valid & ~LA_FLAGS) &&
1086 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1089 /* Check for setting the obj time. */
1090 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1091 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1092 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1093 !mdd_capable(uc, CFS_CAP_FOWNER)) {
/* non-owner needs write permission to touch times */
1094 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1102 /* Make sure a caller can chmod. */
1103 if (la->la_valid & LA_MODE) {
1104 /* Bypass la_vaild == LA_MODE,
1105 * this is for changing file with SUID or SGID. */
1106 if ((la->la_valid & ~LA_MODE) &&
1107 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1108 (uc->mu_fsuid != tmp_la->la_uid) &&
1109 !mdd_capable(uc, CFS_CAP_FOWNER))
1112 if (la->la_mode == (umode_t) -1)
1113 la->la_mode = tmp_la->la_mode;
/* keep the non-permission (file-type) bits from the inode */
1115 la->la_mode = (la->la_mode & S_IALLUGO) |
1116 (tmp_la->la_mode & ~S_IALLUGO);
1118 /* Also check the setgid bit! */
1119 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1120 la->la_gid : tmp_la->la_gid) &&
1121 !mdd_capable(uc, CFS_CAP_FSETID))
1122 la->la_mode &= ~S_ISGID;
1124 la->la_mode = tmp_la->la_mode;
1127 /* Make sure a caller can chown. */
1128 if (la->la_valid & LA_UID) {
1129 if (la->la_uid == (uid_t) -1)
1130 la->la_uid = tmp_la->la_uid;
1131 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1132 (la->la_uid != tmp_la->la_uid)) &&
1133 !mdd_capable(uc, CFS_CAP_CHOWN))
1136 /* If the user or group of a non-directory has been
1137 * changed by a non-root user, remove the setuid bit.
1138 * 19981026 David C Niemi <niemi@tux.org>
1140 * Changed this to apply to all users, including root,
1141 * to avoid some races. This is the behavior we had in
1142 * 2.0. The check for non-root was definitely wrong
1143 * for 2.2 anyway, as it should have been using
1144 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1145 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1146 !S_ISDIR(tmp_la->la_mode)) {
1147 la->la_mode &= ~S_ISUID;
1148 la->la_valid |= LA_MODE;
1152 /* Make sure caller can chgrp. */
1153 if (la->la_valid & LA_GID) {
1154 if (la->la_gid == (gid_t) -1)
1155 la->la_gid = tmp_la->la_gid;
1156 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1157 ((la->la_gid != tmp_la->la_gid) &&
1158 !lustre_in_group_p(uc, la->la_gid))) &&
1159 !mdd_capable(uc, CFS_CAP_CHOWN))
1162 /* Likewise, if the user or group of a non-directory
1163 * has been changed by a non-root user, remove the
1164 * setgid bit UNLESS there is no group execute bit
1165 * (this would be a file marked for mandatory
1166 * locking). 19981026 David C Niemi <niemi@tux.org>
1168 * Removed the fsuid check (see the comment above) --
1170 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1171 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1172 la->la_mode &= ~S_ISGID;
1173 la->la_valid |= LA_MODE;
1177 /* For both Size-on-MDS case and truncate case,
1178 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1179 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1180 * For SOM case, it is true, the MAY_WRITE perm has been checked
1181 * when open, no need check again. For truncate case, it is false,
1182 * the MAY_WRITE perm should be checked here. */
1183 if (ma->ma_attr_flags & MDS_SOM) {
1184 /* For the "Size-on-MDS" setattr update, merge coming
1185 * attributes with the set in the inode. BUG 10641 */
1186 if ((la->la_valid & LA_ATIME) &&
1187 (la->la_atime <= tmp_la->la_atime))
1188 la->la_valid &= ~LA_ATIME;
1190 /* OST attributes do not have a priority over MDS attributes,
1191 * so drop times if ctime is equal. */
1192 if ((la->la_valid & LA_CTIME) &&
1193 (la->la_ctime <= tmp_la->la_ctime))
1194 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1196 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1197 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1198 (uc->mu_fsuid == tmp_la->la_uid)) &&
1199 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
/* truncate path: MAY_WRITE must be checked here */
1200 rc = mdd_permission_internal_locked(env, obj,
1207 if (la->la_valid & LA_CTIME) {
1208 /* The pure setattr, it has the priority over what is
1209 * already set, do not drop it if ctime is equal. */
1210 if (la->la_ctime < tmp_la->la_ctime)
1211 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1219 /** Store a data change changelog record
1220 * If this fails, we must fail the whole transaction; we don't
1221 * want the change to commit without the log entry.
1222 * \param mdd_obj - mdd_object of change
1223 * \param handle - transacion handle
/* Append a data-change changelog record for \a mdd_obj inside the
 * current transaction (see header comment above). Skipped when the
 * changelog is off, or for repeated CL_SETATTR on an object already
 * logged since the changelog was started. */
1225 static int mdd_changelog_data_store(const struct lu_env *env,
1226 struct mdd_device *mdd,
1227 enum changelog_rec_type type,
1228 struct mdd_object *mdd_obj,
1229 struct thandle *handle)
1231 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1232 struct llog_changelog_rec *rec;
1237 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1240 LASSERT(handle != NULL);
1241 LASSERT(mdd_obj != NULL);
1243 if ((type == CL_SETATTR) &&
1244 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1245 /* Don't need multiple updates in this log */
1246 /* Don't check under lock - no big deal if we get an extra
1251 reclen = llog_data_len(sizeof(*rec));
1252 buf = mdd_buf_alloc(env, reclen);
1253 if (buf->lb_buf == NULL)
1255 rec = (struct llog_changelog_rec *)buf->lb_buf;
1257 rec->cr.cr_flags = CLF_VERSION;
1258 rec->cr.cr_type = (__u32)type;
1259 rec->cr.cr_tfid = *tfid;
1260 rec->cr.cr_namelen = 0;
/* remember when this object was last logged, for the dedup above */
1261 mdd_obj->mod_cltime = cfs_time_current_64();
1263 rc = mdd_changelog_llog_write(mdd, rec, handle);
1265 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1266 rc, type, PFID(tfid));
1274 * Should be called with write lock held.
1276 * \see mdd_lma_set_locked().
/* Rewrite the LMA EA with the HSM/SOM state from \a ma inside
 * transaction \a handle. Caller holds the write lock (see comment
 * above). Pre-existing LMA contents are re-read when \a ma does not
 * carry all parts. */
1278 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1279 const struct md_attr *ma, struct thandle *handle)
1281 struct mdd_thread_info *info = mdd_env_info(env);
1283 struct lustre_mdt_attrs *lma =
1284 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1285 int lmasize = sizeof(struct lustre_mdt_attrs);
1290 memset(lma, 0, lmasize);
1292 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "!" binds before "&", so this tests bit 0 of the
 * logical negation of ma_valid — almost certainly meant
 * !(ma->ma_valid & (MA_HSM | MA_SOM)) per the comment above; as
 * written it is true only when ma_valid == 0. Confirm and fix. */
1293 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1294 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1298 lustre_lma_swab(lma);
1302 if (ma->ma_valid & MA_HSM) {
1303 lma->lma_flags |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1304 lma->lma_compat |= LMAC_HSM;
1306 /* XXX: Copy SOM data */
1307 if (ma->ma_valid & MA_SOM) {
1309 lma->lma_compat |= LMAC_SOM;
1311 LASSERT(!(ma->ma_valid & MA_SOM));
/* always refresh the self-fid, then swab to disk order and store */
1315 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1317 lustre_lma_swab(lma);
1318 buf = mdd_buf_get(env, lma, lmasize);
1319 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1325 * Save LMA extended attributes with data from \a ma.
1327 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1328 * not, LMA EA will be first read from disk, modified and write back.
/* Locked wrapper for __mdd_lma_set(): takes/releases the object write
 * lock around the LMA update (see doc comment above). */
1331 static int mdd_lma_set_locked(const struct lu_env *env,
1332 struct mdd_object *mdd_obj,
1333 const struct md_attr *ma, struct thandle *handle)
1337 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1338 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1339 mdd_write_unlock(env, mdd_obj);
/**
 * Set attributes (and LOV EA when supplied) of \a obj in one transaction;
 * updated attributes are handed back through \a ma.
 *
 * For chown/chgrp on a regular file the striping (LOV EA) is read first so
 * the ownership change can be journalled in the llog and then pushed to the
 * OSTs asynchronously after the local transaction stops.
 */
1343 /* set attr and LOV EA at once, return updated attr */
1344 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1345 const struct md_attr *ma)
1347 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1348 struct mdd_device *mdd = mdo2mdd(obj);
1349 struct thandle *handle;
1350 struct lov_mds_md *lmm = NULL;
1351 struct llog_cookie *logcookies = NULL;
1352 int rc, lmm_size = 0, cookie_size = 0;
1353 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1354 #ifdef HAVE_QUOTA_SUPPORT
1355 struct obd_device *obd = mdd->mdd_obd_dev;
1356 struct obd_export *exp = md_quota(env)->mq_exp;
1357 struct mds_obd *mds = &obd->u.mds;
1358 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1359 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1360 int quota_opc = 0, block_count = 0;
1361 int inode_pending[MAXQUOTAS] = { 0, 0 };
1362 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Reserve transaction credits for the attribute update, then start it. */
1366 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1367 MDD_TXN_ATTR_SET_OP);
1368 handle = mdd_trans_start(env, mdd);
1370 RETURN(PTR_ERR(handle));
1371 /*TODO: add lock here*/
1372 /* start a log jounal handle if needed */
/* Ownership change on a regular file: fetch the striping EA so the
 * chown/chgrp can be journalled and forwarded to the OSTs below. */
1373 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1374 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1375 lmm_size = mdd_lov_mdsize(env, mdd);
1376 lmm = mdd_max_lmm_get(env, mdd);
1378 GOTO(cleanup, rc = -ENOMEM);
1380 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1387 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1388 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1389 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Work on a scratch copy of the requested attributes so mdd_fix_attr()
 * can normalize them without touching the caller's md_attr. */
1391 *la_copy = ma->ma_attr;
1392 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1396 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: check inode and block quota for the NEW owner
 * (qnids) before applying; qoids tracks the original owner. */
1397 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1398 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1400 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1402 quota_opc = FSFILT_OP_SETATTR;
1403 mdd_quota_wrapper(la_copy, qnids);
1404 mdd_quota_wrapper(la_tmp, qoids);
1405 /* get file quota for new owner */
1406 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1407 qnids, inode_pending, 1, NULL, 0,
1409 block_count = (la_tmp->la_blocks + 7) >> 3;
1412 mdd_data_get(env, mdd_obj, &data);
1413 /* get block quota for new owner */
1414 lquota_chkquota(mds_quota_interface_ref, obd,
1415 exp, qnids, block_pending,
1417 LQUOTA_FLAGS_BLK, data, 1);
/* Flags-only change: apply it and cache the translated flag bits
 * on the in-memory object via mdd_flags_xlate(). */
1423 if (la_copy->la_valid & LA_FLAGS) {
1424 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1427 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1428 } else if (la_copy->la_valid) { /* setattr */
1429 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1431 /* journal chown/chgrp in llog, just like unlink */
1432 if (rc == 0 && lmm_size){
1433 cookie_size = mdd_lov_cookiesize(env, mdd);
1434 logcookies = mdd_max_cookie_get(env, mdd);
1435 if (logcookies == NULL)
1436 GOTO(cleanup, rc = -ENOMEM);
1438 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1439 logcookies, cookie_size) <= 0)
/* Caller supplied a new striping EA: apply it to files/directories. */
1444 if (rc == 0 && ma->ma_valid & MA_LOV) {
1447 mode = mdd_object_type(mdd_obj);
1448 if (S_ISREG(mode) || S_ISDIR(mode)) {
1449 rc = mdd_lsm_sanity_check(env, mdd_obj);
1453 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1454 ma->ma_lmm_size, handle, 1);
/* Persist HSM/SOM state into the LMA EA under the object lock. */
1458 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1461 mode = mdd_object_type(mdd_obj);
1463 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
/* Record the setattr in the changelog when m/ctime changed. */
1467 if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1468 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1470 mdd_trans_stop(env, mdd, rc, handle);
/* After the MDT transaction: push the ownership change to the OSTs. */
1471 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1472 /*set obd attr, if needed*/
1473 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1476 #ifdef HAVE_QUOTA_SUPPORT
/* Commit the pending inode/block reservations for the new owner. */
1478 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1480 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1482 /* Trigger dqrel/dqacq for original owner and new owner.
1483 * If failed, the next call for lquota_chkquota will
1485 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/**
 * Set extended attribute \a name on \a obj under the object write lock,
 * inside the already-started transaction \a handle.
 */
1492 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1493 const struct lu_buf *buf, const char *name, int fl,
1494 struct thandle *handle)
1499 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1500 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1501 mdd_write_unlock(env, obj);
/**
 * Check whether the current user may modify xattrs of \a obj:
 * immutable/append-only objects are rejected, and a non-owner needs
 * the CFS_CAP_FOWNER capability.
 */
1506 static int mdd_xattr_sanity_check(const struct lu_env *env,
1507 struct mdd_object *obj)
1509 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1510 struct md_ucred *uc = md_ucred(env);
1514 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1517 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Only the owner, or a CFS_CAP_FOWNER-capable user, may proceed. */
1521 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1522 !mdd_capable(uc, CFS_CAP_FOWNER)
/**
 * Set extended attribute \a name on \a obj in its own transaction.
 * ACL changes may force a synchronous transaction; "user.*" changes are
 * recorded in the changelog when changelogs are enabled.
 */
1529 * The caller should guarantee to update the object ctime
1530 * after xattr_set if needed.
1532 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1533 const struct lu_buf *buf, const char *name,
1536 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1537 struct mdd_device *mdd = mdo2mdd(obj);
1538 struct thandle *handle;
1542 rc = mdd_xattr_sanity_check(env, mdd_obj);
1546 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1547 /* security-related changes may require sync */
1548 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1549 mdd->mdd_sync_permission == 1)
1550 txn_param_sync(&mdd_env_info(env)->mti_param);
1552 handle = mdd_trans_start(env, mdd);
1554 RETURN(PTR_ERR(handle));
1556 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1558 /* Only record user xattr changes */
1559 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1560 (strncmp("user.", name, 5) == 0))
1561 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1563 mdd_trans_stop(env, mdd, rc, handle);
1569 * The caller should guarantee to update the object ctime
1570 * after xattr_set if needed.
1572 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1575 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1576 struct mdd_device *mdd = mdo2mdd(obj);
1577 struct thandle *handle;
1581 rc = mdd_xattr_sanity_check(env, mdd_obj);
1585 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1586 handle = mdd_trans_start(env, mdd);
1588 RETURN(PTR_ERR(handle));
1590 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1591 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1592 mdd_object_capa(env, mdd_obj));
1593 mdd_write_unlock(env, mdd_obj);
1595 /* Only record user xattr changes */
1596 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1597 (strncmp("user.", name, 5) != 0))
1598 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1601 mdd_trans_stop(env, mdd, rc, handle);
/**
 * Partial unlink: drop one link on \a obj (two for a directory, to cover
 * its "." entry), update ctime, run the common unlink finalization and,
 * when the last link of an unopened object disappears, release the
 * owner's quota.
 */
1606 /* partial unlink */
1607 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1610 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1611 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1612 struct mdd_device *mdd = mdo2mdd(obj);
1613 struct thandle *handle;
1614 #ifdef HAVE_QUOTA_SUPPORT
1615 struct obd_device *obd = mdd->mdd_obd_dev;
1616 struct mds_obd *mds = &obd->u.mds;
1617 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1624 * Check -ENOENT early here because we need to get object type
1625 * to calculate credits before transaction start
1627 if (!mdd_object_exists(mdd_obj))
1630 LASSERT(mdd_object_exists(mdd_obj) > 0);
1632 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1636 handle = mdd_trans_start(env, mdd);
1640 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1642 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* Drop one link; directories lose a second link for their "." entry. */
1646 __mdd_ref_del(env, mdd_obj, handle, 0);
1648 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1650 __mdd_ref_del(env, mdd_obj, handle, 1);
/* Propagate the ctime supplied by the caller. */
1653 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1654 la_copy->la_ctime = ma->ma_attr.la_ctime;
1656 la_copy->la_valid = LA_CTIME;
1657 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1661 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1662 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and nobody holds the object open: release quota. */
1663 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1664 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1665 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1666 mdd_quota_wrapper(&ma->ma_attr, qids);
1673 mdd_write_unlock(env, mdd_obj);
1674 mdd_trans_stop(env, mdd, rc, handle);
1675 #ifdef HAVE_QUOTA_SUPPORT
1677 /* Trigger dqrel on the owner of child. If failed,
1678 * the next call for lquota_chkquota will process it */
1679 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/**
 * Sanity check for partial object create: dispatches on the file-type
 * bits of the requested mode (ma->ma_attr.la_mode & S_IFMT).
 */
1685 /* partial operation */
1686 static int mdd_oc_sanity_check(const struct lu_env *env,
1687 struct mdd_object *obj,
1693 switch (ma->ma_attr.la_mode & S_IFMT) {
/**
 * Partial create: allocate and initialize the MDD object itself (no
 * directory entry insertion).  Handles quota reservation for the new
 * child, slave LMV EA for striped directories, and remote ACL
 * initialization.
 */
1710 static int mdd_object_create(const struct lu_env *env,
1711 struct md_object *obj,
1712 const struct md_op_spec *spec,
1716 struct mdd_device *mdd = mdo2mdd(obj);
1717 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1718 const struct lu_fid *pfid = spec->u.sp_pfid;
1719 struct thandle *handle;
1720 #ifdef HAVE_QUOTA_SUPPORT
1721 struct obd_device *obd = mdd->mdd_obd_dev;
1722 struct obd_export *exp = md_quota(env)->mq_exp;
1723 struct mds_obd *mds = &obd->u.mds;
1724 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1725 int quota_opc = 0, block_count = 0;
1726 int inode_pending[MAXQUOTAS] = { 0, 0 };
1727 int block_pending[MAXQUOTAS] = { 0, 0 };
1732 #ifdef HAVE_QUOTA_SUPPORT
/* Reserve inode (and, per file type, block) quota for the child
 * BEFORE starting the transaction. */
1733 if (mds->mds_quota) {
1734 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1735 mdd_quota_wrapper(&ma->ma_attr, qids);
1736 /* get file quota for child */
1737 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1738 qids, inode_pending, 1, NULL, 0,
1740 switch (ma->ma_attr.la_mode & S_IFMT) {
1749 /* get block quota for child */
1751 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1752 qids, block_pending, block_count,
1753 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1757 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1758 handle = mdd_trans_start(env, mdd);
1760 GOTO(out_pending, rc = PTR_ERR(handle));
1762 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1763 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1767 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1771 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1772 /* If creating the slave object, set slave EA here. */
1773 int lmv_size = spec->u.sp_ea.eadatalen;
1774 struct lmv_stripe_md *lmv;
1776 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1777 LASSERT(lmv != NULL && lmv_size > 0);
1779 rc = __mdd_xattr_set(env, mdd_obj,
1780 mdd_buf_get_const(env, lmv, lmv_size),
1781 XATTR_NAME_LMV, 0, handle);
1785 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1788 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL from the caller: initialize the default/access ACLs and
 * fold the resulting mode back into ma->ma_attr. */
1789 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1790 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1792 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1793 buf->lb_len = spec->u.sp_ea.eadatalen;
1794 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1795 rc = __mdd_acl_init(env, mdd_obj, buf,
1796 &ma->ma_attr.la_mode,
1801 ma->ma_attr.la_valid |= LA_MODE;
1804 pfid = spec->u.sp_ea.fid;
1807 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Re-read the attributes so the caller sees the object as created. */
1813 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1814 mdd_write_unlock(env, mdd_obj);
1816 mdd_trans_stop(env, mdd, rc, handle);
1818 #ifdef HAVE_QUOTA_SUPPORT
/* Commit the quota reservations taken above. */
1820 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1822 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1824 /* Trigger dqacq on the owner of child. If failed,
1825 * the next call for lquota_chkquota will process it. */
1826 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/**
 * Partial link: add one link to \a obj and update its ctime from
 * \a ma (no directory entry insertion).
 */
1834 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1835 const struct md_attr *ma)
1837 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1838 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1839 struct mdd_device *mdd = mdo2mdd(obj);
1840 struct thandle *handle;
1844 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1845 handle = mdd_trans_start(env, mdd);
1849 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1850 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1852 __mdd_ref_add(env, mdd_obj, handle);
1853 mdd_write_unlock(env, mdd_obj);
/* Propagate the ctime supplied by the caller. */
1855 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1856 la_copy->la_ctime = ma->ma_attr.la_ctime;
1858 la_copy->la_valid = LA_CTIME;
1859 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): passes 0, not rc, as the result to mdd_trans_stop() --
 * verify this is intentional (other paths in this file pass rc). */
1862 mdd_trans_stop(env, mdd, 0, handle);
/**
 * Translate open flags into a MAY_* access mask for permission checks.
 */
1868 * do NOT or the MAY_*'s, you'll get the weakest
1870 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1874 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1875 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1876 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1877 * owner can write to a file even if it is marked readonly to hide
1878 * its brokenness. (bug 5781) */
1879 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1880 struct md_ucred *uc = md_ucred(env);
/* Owner (or missing/initial credentials) bypasses the access check. */
1882 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1883 (la->la_uid == uc->mu_fsuid))
/* Map open flags onto the corresponding MAY_* access bits. */
1887 if (flags & FMODE_READ)
1889 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1891 if (flags & MDS_FMODE_EXEC)
/**
 * Validate an open request against the object: rejects dead objects,
 * symlinks and writable directory opens, checks permissions for
 * pre-existing files, and enforces append-only and O_NOATIME rules.
 */
1896 static int mdd_open_sanity_check(const struct lu_env *env,
1897 struct mdd_object *obj, int flag)
1899 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1904 if (mdd_is_dead_obj(obj))
1907 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are never opened directly. */
1911 if (S_ISLNK(tmp_la->la_mode))
1914 mode = accmode(env, tmp_la, flag);
/* Directories cannot be opened for write. */
1916 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Newly-created files were already permission-checked at create. */
1919 if (!(flag & MDS_OPEN_CREATED)) {
1920 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless for special files -- silently drop it. */
1925 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1926 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1927 flag &= ~MDS_OPEN_TRUNC;
1929 /* For writing append-only file must open it with append mode. */
1930 if (mdd_is_append(obj)) {
1931 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1933 if (flag & MDS_OPEN_TRUNC)
1939 * Currently the O_NOATIME flag is not packed by the client.
1941 if (flag & O_NOATIME) {
1942 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME is restricted to the owner or CFS_CAP_FOWNER holders. */
1944 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1945 (uc->mu_valid == UCRED_NEW)) &&
1946 (uc->mu_fsuid != tmp_la->la_uid) &&
1947 !mdd_capable(uc, CFS_CAP_FOWNER))
/**
 * Open \a obj: run the open sanity checks and, on success, bump the
 * object's open count under the write lock.
 */
1955 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1958 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1961 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1963 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1965 mdd_obj->mod_count++;
1967 mdd_write_unlock(env, mdd_obj);
/**
 * Prepare destruction of an object that lost its last link: for regular
 * files, fetch the LOV EA into \a ma and write the unlink llog record so
 * the OST objects can be destroyed.
 */
1971 /* return md_attr back,
1972 * if it is last unlink then return lov ea + llog cookie*/
1973 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1979 if (S_ISREG(mdd_object_type(obj))) {
1980 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1981 * Caller must be ready for that. */
1983 rc = __mdd_lmm_get(env, obj, ma);
1984 if ((ma->ma_valid & MA_LOV))
1985 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/**
 * Close \a obj: drop the open count; on last close of an orphan, remove
 * it from the orphan index and arrange destruction of its OST objects
 * (either directly via MDS_CLOSE_CLEANUP or through unlink llog records),
 * then release the owner's quota.
 */
1992 * No permission check is needed.
1994 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1997 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1998 struct mdd_device *mdd = mdo2mdd(obj);
1999 struct thandle *handle;
2003 #ifdef HAVE_QUOTA_SUPPORT
2004 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2005 struct mds_obd *mds = &obd->u.mds;
2006 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2011 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2014 handle = mdd_trans_start(env, mdo2mdd(obj));
2016 RETURN(PTR_ERR(handle));
2018 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2019 /* release open count */
2020 mdd_obj->mod_count --;
/* Last closer of an orphaned (unlinked-while-open) object cleans up. */
2022 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2023 /* remove link to object from orphan index */
2024 rc = __mdd_orphan_del(env, mdd_obj, handle);
2026 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2027 "list, OSS objects to be destroyed.\n",
2028 PFID(mdd_object_fid(mdd_obj)));
2030 CERROR("Object "DFID" can not be deleted from orphan "
2031 "list, maybe cause OST objects can not be "
2032 "destroyed (err: %d).\n",
2033 PFID(mdd_object_fid(mdd_obj)), rc);
2034 /* If object was not deleted from orphan list, do not
2035 * destroy OSS objects, which will be done when next
2041 rc = mdd_iattr_get(env, mdd_obj, ma);
2042 /* Object maybe not in orphan list originally, it is rare case for
2043 * mdd_finish_unlink() failure. */
2044 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2045 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone: queue quota release for the owner. */
2046 if (mds->mds_quota) {
2047 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2048 mdd_quota_wrapper(&ma->ma_attr, qids);
2051 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2052 if (ma->ma_valid & MA_FLAGS &&
2053 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2054 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2056 rc = mdd_object_kill(env, mdd_obj, ma);
2062 CERROR("Error when prepare to delete Object "DFID" , "
2063 "which will cause OST objects can not be "
2064 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* Nothing to destroy: make sure stale LOV/cookie data is not returned. */
2070 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2072 mdd_write_unlock(env, mdd_obj);
2073 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2074 #ifdef HAVE_QUOTA_SUPPORT
2076 /* Trigger dqrel on the owner of child. If failed,
2077 * the next call for lquota_chkquota will process it */
2078 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/**
 * Readpage is only valid on a directory whose underlying dt object
 * supports the index (directory) interface.
 */
2085 * Permission check is done when open,
2086 * no need check again.
2088 static int mdd_readpage_sanity_check(const struct lu_env *env,
2089 struct mdd_object *obj)
2091 struct dt_object *next = mdd_object_child(obj);
2095 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/**
 * Fill one directory page (\a area of \a nob bytes, starting with a
 * struct lu_dirpage header) with lu_dirent records pulled from the
 * index iterator \a it.  \a start/\a end receive the hash range covered;
 * \a last tracks the most recent entry so it can absorb trailing space.
 */
2103 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2104 int first, void *area, int nob,
2105 const struct dt_it_ops *iops, struct dt_it *it,
2106 __u64 *start, __u64 *end,
2107 struct lu_dirent **last, __u32 attr)
2111 struct lu_dirent *ent;
/* Reserve the page header; entries start right after it. */
2114 memset(area, 0, sizeof (struct lu_dirpage));
2115 area += sizeof (struct lu_dirpage);
2116 nob -= sizeof (struct lu_dirpage);
2124 len = iops->key_size(env, it);
2126 /* IAM iterator can return record with zero len. */
2130 hash = iops->store(env, it);
2131 if (unlikely(first)) {
2136 /* calculate max space required for lu_dirent */
2137 recsize = lu_dirent_calc_size(len, attr);
2139 if (nob >= recsize) {
2140 result = iops->rec(env, it, ent, attr);
2141 if (result == -ESTALE)
2146 /* osd might not able to pack all attributes,
2147 * so recheck rec length */
2148 recsize = le16_to_cpu(ent->lde_reclen);
2151 * record doesn't fit into page, enlarge previous one.
2154 (*last)->lde_reclen =
2155 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2164 ent = (void *)ent + recsize;
2168 result = iops->next(env, it);
2169 if (result == -ESTALE)
2171 } while (result == 0);
/**
 * Iterate over the directory underlying \a obj and fill the pages of
 * \a rdpg with lu_dirent records, starting at hash rdpg->rp_hash.
 * On end-of-directory the last page is stamped with DIR_END_OFF; an
 * empty result marks page 0 with LDF_EMPTY.
 */
2178 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2179 const struct lu_rdpg *rdpg)
2182 struct dt_object *next = mdd_object_child(obj);
2183 const struct dt_it_ops *iops;
2185 struct lu_dirent *last = NULL;
2186 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2193 LASSERT(rdpg->rp_pages != NULL);
2194 LASSERT(next->do_index_ops != NULL);
2196 if (rdpg->rp_count <= 0)
2200 * iterate through directory and fill pages from @rdpg
2202 iops = &next->do_index_ops->dio_it;
2203 it = iops->init(env, next, mdd_object_capa(env, obj));
2207 rc = iops->load(env, it, rdpg->rp_hash);
2211 * Iterator didn't find record with exactly the key requested.
2213 * It is currently either
2215 * - positioned above record with key less than
2216 * requested---skip it.
2218 * - or not positioned at all (is in IAM_IT_SKEWED
2219 * state)---position it on the next item.
2221 rc = iops->next(env, it);
2226 * At this point and across for-loop:
2228 * rc == 0 -> ok, proceed.
2229 * rc > 0 -> end of directory.
/* Fill one CFS_PAGE_SIZE page per iteration until nob is exhausted
 * or the iterator reports end-of-directory / error. */
2232 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2233 i++, nob -= CFS_PAGE_SIZE) {
2234 LASSERT(i < rdpg->rp_npages);
2235 pg = rdpg->rp_pages[i];
2236 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2237 min_t(int, nob, CFS_PAGE_SIZE), iops,
2238 it, &hash_start, &hash_end, &last,
2240 if (rc != 0 || i == rdpg->rp_npages - 1) {
2242 last->lde_reclen = 0;
/* End of directory: extend the covered range to the sentinel hash. */
2250 hash_end = DIR_END_OFF;
2254 struct lu_dirpage *dp;
/* Stamp page 0 with the hash range; flag an empty result. */
2256 dp = cfs_kmap(rdpg->rp_pages[0]);
2257 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2258 dp->ldp_hash_end = cpu_to_le64(hash_end);
2261 * No pages were processed, mark this.
2263 dp->ldp_flags |= LDF_EMPTY;
2265 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2266 cfs_kunmap(rdpg->rp_pages[0]);
2269 iops->fini(env, it);
/**
 * Read a page of directory entries from \a obj into \a rdpg under the
 * object read lock.  A dead (unlinked) directory returns a single
 * LDF_EMPTY page covering the whole remaining hash range.
 */
2274 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2275 const struct lu_rdpg *rdpg)
2277 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2281 LASSERT(mdd_object_exists(mdd_obj));
2283 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2284 rc = mdd_readpage_sanity_check(env, mdd_obj);
2286 GOTO(out_unlock, rc);
2288 if (mdd_is_dead_obj(mdd_obj)) {
2290 struct lu_dirpage *dp;
2293 * According to POSIX, please do not return any entry to client:
2294 * even dot and dotdot should not be returned.
2296 CWARN("readdir from dead object: "DFID"\n",
2297 PFID(mdd_object_fid(mdd_obj)));
2299 if (rdpg->rp_count <= 0)
2300 GOTO(out_unlock, rc = -EFAULT);
2301 LASSERT(rdpg->rp_pages != NULL);
/* Fabricate one empty page spanning [rp_hash, DIR_END_OFF]. */
2303 pg = rdpg->rp_pages[0];
2304 dp = (struct lu_dirpage*)cfs_kmap(pg);
2305 memset(dp, 0 , sizeof(struct lu_dirpage));
2306 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2307 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2308 dp->ldp_flags |= LDF_EMPTY;
2309 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2311 GOTO(out_unlock, rc = 0);
2314 rc = __mdd_readpage(env, mdd_obj, rdpg);
2318 mdd_read_unlock(env, mdd_obj);
/**
 * Sync \a obj to stable storage by delegating to the underlying
 * dt object's do_object_sync() method.
 */
2322 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2324 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2325 struct dt_object *next;
2327 LASSERT(mdd_object_exists(mdd_obj));
2328 next = mdd_object_child(mdd_obj);
2329 return next->do_ops->do_object_sync(env, next);
/**
 * Return the version of \a obj from the underlying dt object.
 */
2332 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2333 struct md_object *obj)
2335 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2337 LASSERT(mdd_object_exists(mdd_obj));
2338 return do_version_get(env, mdd_object_child(mdd_obj));
2341 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2342 dt_obj_version_t version)
2344 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2346 LASSERT(mdd_object_exists(mdd_obj));
2347 return do_version_set(env, mdd_object_child(mdd_obj), version);
2350 const struct md_object_operations mdd_obj_ops = {
2351 .moo_permission = mdd_permission,
2352 .moo_attr_get = mdd_attr_get,
2353 .moo_attr_set = mdd_attr_set,
2354 .moo_xattr_get = mdd_xattr_get,
2355 .moo_xattr_set = mdd_xattr_set,
2356 .moo_xattr_list = mdd_xattr_list,
2357 .moo_xattr_del = mdd_xattr_del,
2358 .moo_object_create = mdd_object_create,
2359 .moo_ref_add = mdd_ref_add,
2360 .moo_ref_del = mdd_ref_del,
2361 .moo_open = mdd_open,
2362 .moo_close = mdd_close,
2363 .moo_readpage = mdd_readpage,
2364 .moo_readlink = mdd_readlink,
2365 .moo_capa_get = mdd_capa_get,
2366 .moo_object_sync = mdd_object_sync,
2367 .moo_version_get = mdd_version_get,
2368 .moo_version_set = mdd_version_set,
2369 .moo_path = mdd_path,