1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch the backing data for \a obj into \a data via mdo_data_get().
 * Asserts the object exists before touching it.
 * NOTE(review): source lines are elided in this view (embedded numbering
 * skips); code text preserved exactly. */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
/* Read the lu_attr of \a obj (asserts it exists) under capability \a capa.
 * \retval result of mdo_attr_get() — 0 on success, negative errno otherwise. */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-core
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ); any previous append/immutable
 * state is cleared first so the result mirrors \a flags exactly. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info attached to \a env's lu_context
 * via mdd_thread_key; LASSERTs that the key has been populated. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108         struct mdd_thread_info *info;
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
/* Wrap caller-owned memory (\a area, \a len) in the per-thread mti_buf
 * lu_buf descriptor; no allocation is performed here. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119         buf = &mdd_env_info(env)->mti_buf;
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
130 OBD_VFREE(buf->lb_buf, buf->lb_len);
132 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap read-only memory in the per-thread
 * mti_buf. The const qualifier is cast away because lu_buf has no const
 * variant; callers must not write through the result. */
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Ensure the per-thread mti_big_buf is at least \a len bytes.
 * An existing smaller buffer is freed first (vmalloc'd or kmalloc'd as
 * appropriate), then a new one is allocated: OBD_ALLOC for sizes up to
 * BUF_VMALLOC_SIZE, OBD_VMALLOC as the fallback. Old contents are NOT
 * preserved — use mdd_buf_grow() for that. */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
159         if (buf->lb_buf == NULL) {
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
/* Fall back to vmalloc when the small kmalloc path failed or was skipped. */
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
169                 if (buf->lb_buf == NULL)
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Growing only: caller must never request a smaller buffer. */
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
190                 OBD_ALLOC(buf.lb_buf, len);
193         if (buf.lb_buf == NULL)
/* Copy old contents before releasing the old buffer. */
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
/* Publish the new descriptor (pointer, length, vmalloc flag) in one shot. */
204         memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie scratch buffer, (re)allocating it when
 * the currently cached one is smaller than the device's maximum cookie
 * size. The buffer is zeroed before return. May return NULL on ENOMEM. */
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
212         struct mdd_thread_info *mti = mdd_env_info(env);
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
/* Return the per-thread LOV EA scratch buffer, (re)allocating it when the
 * cached one is smaller than the device's maximum LOV MD size. Unlike
 * mdd_max_cookie_get(), the buffer is NOT zeroed. May return NULL. */
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
235         struct mdd_thread_info *mti = mdd_env_info(env);
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
/* NOTE(review): 'unlikely' on an allocation-success check contradicts the
 * 'likely' used in the identical spot of mdd_max_cookie_get() above —
 * branch-hint only (no behavior change), but looks inverted; confirm. */
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
250         return mti->mti_max_lmm;
/* lu_device ldo_object_alloc hook: allocate an mdd_object, initialize its
 * embedded lu_object and wire up the mdd md/dir/lu operation vectors.
 * Returns the embedded lu_object (NULL path elided from this view). */
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
257         struct mdd_object *mdd_obj;
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: allocate the slice of the next (child dt) device below
 * this object and stack it under \a o; also resets mod_cltime and inits
 * the pdir lock. The error path for a NULL 'below' is elided here. */
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object *below;
281         struct lu_device *under;
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
291         lu_object_add(o, below);
/* loo_object_start: for an existing object, pull its flags from disk into
 * the in-core mod_flags via mdd_get_flags(). */
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: tear down and release the mdd slice of \a o
 * (fini/free calls elided from this view). */
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306         struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: dump open count, valid mask, changelog time and flags
 * of \a o through printer \a p (for lu_object debugging output). */
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
/* lu_object operations vector for mdd objects; forward-declared at the top
 * of this file so mdd_object_alloc() can reference it. */
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
/* Look up (or create) the mdd_object for fid \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); may return an ERR_PTR. */
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve an absolute \a path (relative to mdd_root_fid) to a fid by
 * walking the path one component at a time with mdd_lookup().
 * Several loop-control lines are elided from this view. */
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
/* Scan one path component (terminated by '/' or NUL). */
361                 while (*path != '/' && *path != '\0') {
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
372                         GOTO(out, rc = -EREMOTE);
/* NOTE(review): '-PTR_ERR(obj)' produces a POSITIVE rc when PTR_ERR()
 * is already a negative errno (cf. 'rc = PTR_ERR(buf)' later in this
 * file) — confirm the intended sign convention. */
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
382                 lname->ln_namelen = 0;
391 /** The maximum depth that fid2path() will search.
392 * This is limited only because we want to store the fids for
393 * historical path lookup purposes.
395 #define MAX_PATH_DEPTH 100
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;    /**< history point */
400         __u64                pli_currec;   /**< current record */
401         struct lu_fid        pli_fid;      /**< fid re-resolved from the path, for verification */
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;  /**< object whose path is being built */
404         char                *pli_path;     /**< full path */
406         int                  pli_linkno;   /**< which hardlink to follow */
407         int                  pli_fidcount; /**< number of \a pli_fids */
/* Build the current pathname of pli->pli_mdd_obj by walking link EAs from
 * the object up to the filesystem root, packing names right-to-left into
 * pli->pli_path, then verifying the result still resolves to the same fid
 * (returns -EAGAIN if the path changed mid-walk). Parent fids are saved
 * in pli->pli_fids for later historic lookup. Lines are elided here. */
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start writing at the end of the caller's buffer and work backwards. */
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
435                         GOTO(out, rc = -EREMOTE);
/* NOTE(review): '-PTR_ERR(mdd_obj)' yields a positive value if PTR_ERR()
 * is already negative — same sign concern as in mdd_path2fid(); confirm. */
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                         mdd_object_put(env, mdd_obj);
444                         /* Do I need to error out here? */
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
455                         GOTO(out, rc = PTR_ERR(buf));
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
/* strncpy copies exactly ln_namelen bytes (no NUL added here); the
 * separator/terminator handling is on lines elided from this view. */
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
/* Shift the assembled path to the front of the caller's buffer. */
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
/* Historical path lookup (as of pli->pli_recno); body elided from this
 * view — see the caller's comment in mdd_path() for the intended design. */
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
528         struct path_lookup_info *pli;
/* Root has no pathname; handling (elided) short-circuits here. */
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
/* Report back the record index the path was valid at, and the next link. */
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
/* Refresh in-core mod_flags from the on-disk attributes of \a obj:
 * translates la_flags via mdd_flags_xlate() and marks single-link
 * directories with MNLINK_OBJ. */
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the disk read if MA_INODE is already valid in \a ma. */
599         if (ma->ma_valid & MA_INODE)
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                         mdd_object_capa(env, mdd_obj));
605                 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default LOV striping (V1 magic, default
 * pattern/stripe size/count from the MDS lov_desc) and set \a size.
 * \retval sizeof(struct lov_mds_md) on the path shown here. */
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
629         RETURN(sizeof(struct lov_mds_md));
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
/* Already cached in \a ma — nothing to do. */
639         if (ma->ma_valid & MA_LOV)
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* No striping EA on disk but the caller wants defaults: synthesize one. */
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
651                 ma->ma_valid |= MA_LOV;
/* __mdd_lmm_get() under the object's read lock (MOR_TGT_CHILD). */
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-dir) EA into ma->ma_lmv unless already valid. */
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
676         if (ma->ma_valid & MA_LMV)
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
682                 ma->ma_valid |= MA_LMV;
/* Gather everything requested in ma->ma_need: inode attrs, LOV EA (regular
 * files and directories), LMV EA (directories) and, when POSIX ACLs are
 * compiled in, the default ACL of directories. Stops at the first error. */
688 static int mdd_attr_get_internal(const struct lu_env *env,
689                                  struct mdd_object *mdd_obj,
695         if (ma->ma_need & MA_INODE)
696                 rc = mdd_iattr_get(env, mdd_obj, ma);
698         if (rc == 0 && ma->ma_need & MA_LOV) {
699                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
700                     S_ISDIR(mdd_object_type(mdd_obj)))
701                         rc = __mdd_lmm_get(env, mdd_obj, ma);
703         if (rc == 0 && ma->ma_need & MA_LMV) {
704                 if (S_ISDIR(mdd_object_type(mdd_obj)))
705                         rc = __mdd_lmv_get(env, mdd_obj, ma);
707 #ifdef CONFIG_FS_POSIX_ACL
708         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
709                 if (S_ISDIR(mdd_object_type(mdd_obj)))
710                         rc = mdd_def_acl_get(env, mdd_obj, ma);
713         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* mdd_attr_get_internal() taking the read lock only when EA/ACL data
 * (MA_LOV | MA_LMV | MA_ACL_DEF) is requested; plain inode attrs need no
 * lock here. */
718 int mdd_attr_get_internal_locked(const struct lu_env *env,
719                                  struct mdd_object *mdd_obj, struct md_attr *ma)
722         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
725                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726         rc = mdd_attr_get_internal(env, mdd_obj, ma);
728                 mdd_read_unlock(env, mdd_obj);
733  * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
735 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
738         struct mdd_object *mdd_obj = md2mdd_obj(obj);
742         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
747  * No permission check is needed.
/* moo_xattr_get entry point: read xattr \a name into \a buf under the
 * object read lock. */
749 static int mdd_xattr_get(const struct lu_env *env,
750                          struct md_object *obj, struct lu_buf *buf,
753         struct mdd_object *mdd_obj = md2mdd_obj(obj);
758         LASSERT(mdd_object_exists(mdd_obj));
760         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
761         rc = mdo_xattr_get(env, mdd_obj, buf, name,
762                            mdd_object_capa(env, mdd_obj));
763         mdd_read_unlock(env, mdd_obj);
769  * Permission check is done when open,
770  * no need check again.
/* moo_readlink entry point: read the symlink body from the underlying dt
 * object into \a buf under the object read lock. */
772 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
775         struct mdd_object *mdd_obj = md2mdd_obj(obj);
776         struct dt_object  *next;
781         LASSERT(mdd_object_exists(mdd_obj));
783         next = mdd_object_child(mdd_obj);
784         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
785         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
786                                          mdd_object_capa(env, mdd_obj));
787         mdd_read_unlock(env, mdd_obj);
792  * No permission check is needed.
/* moo_xattr_list entry point: list all xattr names under the read lock. */
794 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
797         struct mdd_object *mdd_obj = md2mdd_obj(obj);
802         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
803         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
804         mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle:
 * pick the dt_object_format from the creation spec (index feature or file
 * mode), let the underlying device fill the allocation hint (parent may be
 * NULL), then call mdo_create_obj(). Asserts the object exists on success. */
809 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
810                                struct mdd_object *c, struct md_attr *ma,
811                                struct thandle *handle,
812                                const struct md_op_spec *spec)
814         struct lu_attr *attr = &ma->ma_attr;
815         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
816         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
817         const struct dt_index_features *feat = spec->sp_feat;
821         if (!mdd_object_exists(c)) {
822                 struct dt_object *next = mdd_object_child(c);
/* Non-directory index feature requested => create as an index object;
 * otherwise derive the format from the file mode bits. */
825                 if (feat != &dt_directory_features && feat != NULL)
826                         dof->dof_type = DFT_INDEX;
828                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
830                 dof->u.dof_idx.di_feat = feat;
832                 /* @hint will be initialized by underlying device. */
833                 next->do_ops->do_ah_init(env, hint,
834                                          p ? mdd_object_child(p) : NULL,
835                                          attr->la_mode & S_IFMT);
837                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
838                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
846  * Make sure the ctime is increased only.
/* Drop time updates from \a attr that would move ctime backwards (or
 * redundantly re-set an identical ctime), based on the current on-disk
 * attributes of \a obj. */
848 static inline int mdd_attr_check(const struct lu_env *env,
849                                  struct mdd_object *obj,
850                                  struct lu_attr *attr)
852         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
856         if (attr->la_valid & LA_CTIME) {
857                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
861                 if (attr->la_ctime < tmp_la->la_ctime)
862                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
863                 else if (attr->la_valid == LA_CTIME &&
864                          attr->la_ctime == tmp_la->la_ctime)
865                         attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to \a obj inside transaction \a handle; on a successful
 * mode change with \a needacl set, propagate the new mode into the
 * object's POSIX ACL as well. */
870 int mdd_attr_set_internal(const struct lu_env *env,
871                           struct mdd_object *obj,
872                           struct lu_attr *attr,
873                           struct thandle *handle,
879         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
880 #ifdef CONFIG_FS_POSIX_ACL
881         if (!rc && (attr->la_valid & LA_MODE) && needacl)
882                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity filter) followed by
 * mdd_attr_set_internal(); the early-return between them is elided here. */
887 int mdd_attr_check_set_internal(const struct lu_env *env,
888                                 struct mdd_object *obj,
889                                 struct lu_attr *attr,
890                                 struct thandle *handle,
896         rc = mdd_attr_check(env, obj, attr);
901                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object write lock; ACL propagation is
 * only relevant when the mode actually changes. */
905 static int mdd_attr_set_internal_locked(const struct lu_env *env,
906                                         struct mdd_object *obj,
907                                         struct lu_attr *attr,
908                                         struct thandle *handle,
914         needacl = needacl && (attr->la_valid & LA_MODE);
916                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
917         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
919                 mdd_write_unlock(env, obj);
/* mdd_attr_check_set_internal() under the object write lock; mirrors
 * mdd_attr_set_internal_locked() above. */
923 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
924                                        struct mdd_object *obj,
925                                        struct lu_attr *attr,
926                                        struct thandle *handle,
932         needacl = needacl && (attr->la_valid & LA_MODE);
934                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
935         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
937                 mdd_write_unlock(env, obj);
/* Set or delete an xattr inside transaction \a handle: a non-empty \a buf
 * sets \a name, a NULL/zero-length buf deletes it. Caller holds the
 * required lock (see mdd_xattr_set_txn()). */
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942                     const struct lu_buf *buf, const char *name,
943                     int fl, struct thandle *handle)
945         struct lustre_capa *capa = mdd_object_capa(env, obj);
949         if (buf->lb_buf && buf->lb_len > 0)
950                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951         else if (buf->lb_buf == NULL && buf->lb_len == 0)
952                 rc = mdo_xattr_del(env, obj, name, handle, capa);
958  * This gives the same functionality as the code between
959  * sys_chmod and inode_setattr
960  * chown_common and inode_setattr
961  * utimes and inode_setattr
962  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize the attribute-change request in \a la against the
 * current attributes of \a obj and the caller's credentials (md_ucred):
 * permission checks for time/mode/owner/group changes, dropping of
 * redundant updates, and SUID/SGID stripping on chown/chgrp, mirroring
 * the VFS setattr rules. Several lines are elided from this view. */
964 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
965                         struct lu_attr *la, const struct md_attr *ma)
967         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
968         struct md_ucred  *uc         = md_ucred(env);
975         /* Do not permit change file type */
976         if (la->la_valid & LA_TYPE)
979         /* They should not be processed by setattr */
980         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
983         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
987         if (la->la_valid == LA_CTIME) {
988                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
989                         /* This is only for set ctime when rename's source is
991                         rc = mdd_may_delete(env, NULL, obj,
992                                             (struct md_attr *)ma, 1, 0);
993                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
994                         la->la_valid &= ~LA_CTIME;
998         if (la->la_valid == LA_ATIME) {
999                 /* This is atime only set for read atime update on close. */
1000                 if (la->la_atime <= tmp_la->la_atime +
1001                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1002                         la->la_valid &= ~LA_ATIME;
1006         /* Check if flags change. */
1007         if (la->la_valid & LA_FLAGS) {
1008                 unsigned int oldflags = 0;
1009                 unsigned int newflags = la->la_flags &
1010                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1012                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1013                     !mdd_capable(uc, CFS_CAP_FOWNER))
1016                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1017                  * only be changed by the relevant capability. */
1018                 if (mdd_is_immutable(obj))
1019                         oldflags |= LUSTRE_IMMUTABLE_FL;
1020                 if (mdd_is_append(obj))
1021                         oldflags |= LUSTRE_APPEND_FL;
1022                 if ((oldflags ^ newflags) &&
1023                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1026                 if (!S_ISDIR(tmp_la->la_mode))
1027                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept nothing but flag changes
 * (unless permission checks are bypassed). */
1030         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1031             (la->la_valid & ~LA_FLAGS) &&
1032             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1035         /* Check for setting the obj time. */
1036         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1037             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1038                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1039                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1040                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1048         /* Make sure a caller can chmod. */
1049         if (la->la_valid & LA_MODE) {
1050                 /* Bypass la_vaild == LA_MODE,
1051                  * this is for changing file with SUID or SGID. */
1052                 if ((la->la_valid & ~LA_MODE) &&
1053                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1054                     (uc->mu_fsuid != tmp_la->la_uid) &&
1055                     !mdd_capable(uc, CFS_CAP_FOWNER))
1058                 if (la->la_mode == (umode_t) -1)
1059                         la->la_mode = tmp_la->la_mode;
1061                         la->la_mode = (la->la_mode & S_IALLUGO) |
1062                                       (tmp_la->la_mode & ~S_IALLUGO);
1064                 /* Also check the setgid bit! */
1065                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1066                                        la->la_gid : tmp_la->la_gid) &&
1067                     !mdd_capable(uc, CFS_CAP_FSETID))
1068                         la->la_mode &= ~S_ISGID;
1070                 la->la_mode = tmp_la->la_mode;
1073         /* Make sure a caller can chown. */
1074         if (la->la_valid & LA_UID) {
1075                 if (la->la_uid == (uid_t) -1)
1076                         la->la_uid = tmp_la->la_uid;
1077                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1078                     (la->la_uid != tmp_la->la_uid)) &&
1079                     !mdd_capable(uc, CFS_CAP_CHOWN))
1082                 /* If the user or group of a non-directory has been
1083                  * changed by a non-root user, remove the setuid bit.
1084                  * 19981026 David C Niemi <niemi@tux.org>
1086                  * Changed this to apply to all users, including root,
1087                  * to avoid some races. This is the behavior we had in
1088                  * 2.0. The check for non-root was definitely wrong
1089                  * for 2.2 anyway, as it should have been using
1090                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1091                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1092                     !S_ISDIR(tmp_la->la_mode)) {
1093                         la->la_mode &= ~S_ISUID;
1094                         la->la_valid |= LA_MODE;
1098         /* Make sure caller can chgrp. */
1099         if (la->la_valid & LA_GID) {
1100                 if (la->la_gid == (gid_t) -1)
1101                         la->la_gid = tmp_la->la_gid;
1102                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1103                     ((la->la_gid != tmp_la->la_gid) &&
1104                     !lustre_in_group_p(uc, la->la_gid))) &&
1105                     !mdd_capable(uc, CFS_CAP_CHOWN))
1108                 /* Likewise, if the user or group of a non-directory
1109                  * has been changed by a non-root user, remove the
1110                  * setgid bit UNLESS there is no group execute bit
1111                  * (this would be a file marked for mandatory
1112                  * locking).  19981026 David C Niemi <niemi@tux.org>
1114                  * Removed the fsuid check (see the comment above) --
1116                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1117                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1118                         la->la_mode &= ~S_ISGID;
1119                         la->la_valid |= LA_MODE;
1123         /* For both Size-on-MDS case and truncate case,
1124          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1125          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1126          * For SOM case, it is true, the MAY_WRITE perm has been checked
1127          * when open, no need check again. For truncate case, it is false,
1128          * the MAY_WRITE perm should be checked here. */
1129         if (ma->ma_attr_flags & MDS_SOM) {
1130                 /* For the "Size-on-MDS" setattr update, merge coming
1131                  * attributes with the set in the inode. BUG 10641 */
1132                 if ((la->la_valid & LA_ATIME) &&
1133                     (la->la_atime <= tmp_la->la_atime))
1134                         la->la_valid &= ~LA_ATIME;
1136                 /* OST attributes do not have a priority over MDS attributes,
1137                  * so drop times if ctime is equal. */
1138                 if ((la->la_valid & LA_CTIME) &&
1139                     (la->la_ctime <= tmp_la->la_ctime))
1140                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1142                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1143                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1144                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1145                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1146                                 rc = mdd_permission_internal_locked(env, obj,
1153                 if (la->la_valid & LA_CTIME) {
1154                         /* The pure setattr, it has the priority over what is
1155                          * already set, do not drop it if ctime is equal. */
1156                         if (la->la_ctime < tmp_la->la_ctime)
1157                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1165 /** Store a data change changelog record
1166  * If this fails, we must fail the whole transaction; we don't
1167  * want the change to commit without the log entry.
1168  * \param mdd_obj - mdd_object of change
1169  * \param handle - transacion handle
1171 static int mdd_changelog_data_store(const struct lu_env     *env,
1172                                     struct mdd_device       *mdd,
1173                                     enum changelog_rec_type type,
1174                                     struct mdd_object       *mdd_obj,
1175                                     struct thandle          *handle)
1177         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1178         struct llog_changelog_rec *rec;
/* Changelogs disabled: nothing to record. */
1183         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1186         LASSERT(handle != NULL);
1187         LASSERT(mdd_obj != NULL);
/* Suppress duplicate SETATTR records for an object already logged since
 * the changelog was (re)started. */
1189         if ((type == CL_SETATTR) &&
1190             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1191                 /* Don't need multiple updates in this log */
1192                 /* Don't check under lock - no big deal if we get an extra
1197         reclen = llog_data_len(sizeof(*rec));
1198         buf = mdd_buf_alloc(env, reclen);
1199         if (buf->lb_buf == NULL)
1201         rec = (struct llog_changelog_rec *)buf->lb_buf;
1203         rec->cr.cr_flags = CLF_VERSION;
1204         rec->cr.cr_type = (__u32)type;
1205         rec->cr.cr_tfid = *tfid;
1206         rec->cr.cr_namelen = 0;
1207         mdd_obj->mod_cltime = cfs_time_current_64();
1209         rc = mdd_changelog_llog_write(mdd, rec, handle);
1211                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1212                        rc, type, PFID(tfid));
1219 /* set attr and LOV EA at once, return updated attr */
/* moo_attr_set entry point: start a transaction, normalize the request via
 * mdd_fix_attr(), apply attrs (with llog cookies for chown/chgrp of
 * striped files), optionally replace the LOV EA, emit a CL_SETATTR
 * changelog record for time changes, stop the transaction and finally
 * push UID/GID changes to the OSTs asynchronously. Quota accounting is
 * interleaved under HAVE_QUOTA_SUPPORT. Lines are elided in this view. */
1220 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1221                         const struct md_attr *ma)
1223         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1224         struct mdd_device *mdd = mdo2mdd(obj);
1225         struct thandle *handle;
1226         struct lov_mds_md *lmm = NULL;
1227         struct llog_cookie *logcookies = NULL;
1228         int  rc, lmm_size = 0, cookie_size = 0;
1229         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1230 #ifdef HAVE_QUOTA_SUPPORT
1231         struct obd_device *obd = mdd->mdd_obd_dev;
1232         struct mds_obd *mds = &obd->u.mds;
1233         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1234         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1235         int quota_opc = 0, block_count = 0;
1236         int inode_pending[MAXQUOTAS] = { 0, 0 };
1237         int block_pending[MAXQUOTAS] = { 0, 0 };
1241         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1242                                     MDD_TXN_ATTR_SET_OP);
1243         handle = mdd_trans_start(env, mdd);
1245                 RETURN(PTR_ERR(handle));
1246         /*TODO: add lock here*/
1247         /* start a log jounal handle if needed */
/* Snapshot the LOV EA so the UID/GID change can later be mirrored to the
 * OSTs (and journaled like unlink). */
1248         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1249             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1250                 lmm_size = mdd_lov_mdsize(env, mdd);
1251                 lmm = mdd_max_lmm_get(env, mdd);
1253                         GOTO(cleanup, rc = -ENOMEM);
1255                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1262         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1263                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1264                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Work on a copy so the caller's md_attr stays const. */
1266         *la_copy = ma->ma_attr;
1267         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1271 #ifdef HAVE_QUOTA_SUPPORT
1272         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1273                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1275                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1277                         quota_opc = FSFILT_OP_SETATTR;
1278                         mdd_quota_wrapper(la_copy, qnids);
1279                         mdd_quota_wrapper(la_tmp, qoids);
1280                         /* get file quota for new owner */
1281                         lquota_chkquota(mds_quota_interface_ref, obd, qnids,
1282                                         inode_pending, 1, NULL, 0, NULL, 0);
1283                         block_count = (la_tmp->la_blocks + 7) >> 3;
1286                                 mdd_data_get(env, mdd_obj, &data);
1287                                 /* get block quota for new owner */
1288                                 lquota_chkquota(mds_quota_interface_ref, obd,
1289                                                 qnids, block_pending,
1291                                                 LQUOTA_FLAGS_BLK, data, 1);
1297         if (la_copy->la_valid & LA_FLAGS) {
1298                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
/* Keep the in-core flag bits in sync with what just hit disk. */
1301                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1302         } else if (la_copy->la_valid) {            /* setattr */
1303                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1305                 /* journal chown/chgrp in llog, just like unlink */
1306                 if (rc == 0 && lmm_size){
1307                         cookie_size = mdd_lov_cookiesize(env, mdd);
1308                         logcookies = mdd_max_cookie_get(env, mdd);
1309                         if (logcookies == NULL)
1310                                 GOTO(cleanup, rc = -ENOMEM);
1312                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1313                                             logcookies, cookie_size) <= 0)
1318         if (rc == 0 && ma->ma_valid & MA_LOV) {
1321                 mode = mdd_object_type(mdd_obj);
1322                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1323                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1327                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1328                                             ma->ma_lmm_size, handle, 1);
1333         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1334                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1336         mdd_trans_stop(env, mdd, rc, handle);
1337         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1338                 /*set obd attr, if needed*/
1339                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1342 #ifdef HAVE_QUOTA_SUPPORT
1344                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1346                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1348                 /* Trigger dqrel/dqacq for original owner and new owner.
1349                  * If failed, the next call for lquota_chkquota will
1351                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/*
 * Set extended attribute @name on @obj from @buf inside the caller's
 * already-started transaction @handle.  Thin wrapper that takes the
 * object write lock around the low-level __mdd_xattr_set() call;
 * @fl carries the xattr create/replace flags.
 */
1358 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1359 const struct lu_buf *buf, const char *name, int fl,
1360 struct thandle *handle)
1365 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1366 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1367 mdd_write_unlock(env, obj);
/*
 * Permission check shared by xattr set/del paths: reject immutable or
 * append-only objects, then require the caller to be the object owner
 * (fsuid == la_uid) or to hold the CFS_CAP_FOWNER capability.
 */
1372 static int mdd_xattr_sanity_check(const struct lu_env *env,
1373 struct mdd_object *obj)
1375 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1376 struct md_ucred *uc = md_ucred(env);
1380 if (mdd_is_immutable(obj) || mdd_is_append(obj))
/* fetch current attributes to learn the owner uid */
1383 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* only the owner (or CFS_CAP_FOWNER) may modify xattrs */
1387 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1388 !mdd_capable(uc, CFS_CAP_FOWNER))
1395 * The caller should guarantee to update the object ctime
1396 * after xattr_set if needed.
1398 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1399 const struct lu_buf *buf, const char *name,
1402 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1403 struct mdd_device *mdd = mdo2mdd(obj);
1404 struct thandle *handle;
/* ownership/immutability checks happen before the transaction starts */
1408 rc = mdd_xattr_sanity_check(env, mdd_obj);
1412 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1413 handle = mdd_trans_start(env, mdd);
1415 RETURN(PTR_ERR(handle));
/* locked set of the xattr within the transaction */
1417 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1419 /* Only record user xattr changes */
1420 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1421 (strncmp("user.", name, 5) == 0))
/* emit a CL_XATTR changelog record for "user.*" attributes */
1422 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1424 mdd_trans_stop(env, mdd, rc, handle);
1430 * The caller should guarantee to update the object ctime
1431 * after xattr_set if needed.
1433 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1436 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1437 struct mdd_device *mdd = mdo2mdd(obj);
1438 struct thandle *handle;
/* same ownership/immutability checks as mdd_xattr_set() */
1442 rc = mdd_xattr_sanity_check(env, mdd_obj);
1446 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1447 handle = mdd_trans_start(env, mdd);
1449 RETURN(PTR_ERR(handle));
1451 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1452 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1453 mdd_object_capa(env, mdd_obj));
1454 mdd_write_unlock(env, mdd_obj);
1456 /* Only record user xattr changes */
/* FIX: was "!= 0", which logged every xattr delete EXCEPT "user.*" --
 * inverted relative to the comment above and inconsistent with the
 * identical changelog filter in mdd_xattr_set(). */
1457 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1458 (strncmp("user.", name, 5) == 0))
1459 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1462 mdd_trans_stop(env, mdd, rc, handle);
1467 /* partial unlink */
/*
 * Drop one link on @obj (the "child half" of an unlink): decrement
 * nlink, update ctime, finish orphan processing, and release quota
 * for the owner when the last link and last open go away.
 */
1468 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1471 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1472 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1473 struct mdd_device *mdd = mdo2mdd(obj);
1474 struct thandle *handle;
1475 #ifdef HAVE_QUOTA_SUPPORT
1476 struct obd_device *obd = mdd->mdd_obd_dev;
1477 struct mds_obd *mds = &obd->u.mds;
1478 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1485 * Check -ENOENT early here because we need to get object type
1486 * to calculate credits before transaction start
1488 if (!mdd_object_exists(mdd_obj))
1491 LASSERT(mdd_object_exists(mdd_obj) > 0);
/* size the transaction for unlink + possible llog record */
1493 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1497 handle = mdd_trans_start(env, mdd);
1501 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1503 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1507 __mdd_ref_del(env, mdd_obj, handle, 0);
1509 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
/* a directory also loses the link held by its own "." entry */
1511 __mdd_ref_del(env, mdd_obj, handle, 1);
1514 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1515 la_copy->la_ctime = ma->ma_attr.la_ctime;
1517 la_copy->la_valid = LA_CTIME;
1518 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1522 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1523 #ifdef HAVE_QUOTA_SUPPORT
/* release inode quota only when the last link AND last open are gone */
1524 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1525 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1526 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1527 mdd_quota_wrapper(&ma->ma_attr, qids);
1534 mdd_write_unlock(env, mdd_obj);
1535 mdd_trans_stop(env, mdd, rc, handle);
1536 #ifdef HAVE_QUOTA_SUPPORT
1538 /* Trigger dqrel on the owner of child. If failed,
1539 * the next call for lquota_chkquota will process it */
1540 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1546 /* partial operation */
/*
 * Validate the file type requested for a partial object create.
 * Dispatches on the S_IFMT bits of the requested mode; the per-type
 * cases are not visible here -- presumably known types are accepted
 * and unknown ones rejected (TODO confirm against full source).
 */
1547 static int mdd_oc_sanity_check(const struct lu_env *env,
1548 struct mdd_object *obj,
1554 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Partial object create: allocate the on-disk object for @obj, set its
 * initial attributes, optional LMV slave EA and remote ACL, then read
 * the resulting attributes back into @ma.  Quota for the child owner is
 * acquired up front and committed/adjusted after the transaction stops.
 */
1571 static int mdd_object_create(const struct lu_env *env,
1572 struct md_object *obj,
1573 const struct md_op_spec *spec,
1577 struct mdd_device *mdd = mdo2mdd(obj);
1578 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1579 const struct lu_fid *pfid = spec->u.sp_pfid;
1580 struct thandle *handle;
1581 #ifdef HAVE_QUOTA_SUPPORT
1582 struct obd_device *obd = mdd->mdd_obd_dev;
1583 struct mds_obd *mds = &obd->u.mds;
1584 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1585 int quota_opc = 0, block_count = 0;
1586 int inode_pending[MAXQUOTAS] = { 0, 0 };
1587 int block_pending[MAXQUOTAS] = { 0, 0 };
1592 #ifdef HAVE_QUOTA_SUPPORT
/* reserve quota for the new child before starting the transaction */
1593 if (mds->mds_quota) {
1594 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1595 mdd_quota_wrapper(&ma->ma_attr, qids);
1596 /* get file quota for child */
1597 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1598 inode_pending, 1, NULL, 0, NULL, 0);
1599 switch (ma->ma_attr.la_mode & S_IFMT) {
1608 /* get block quota for child */
1610 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1611 block_pending, block_count, NULL,
1612 LQUOTA_FLAGS_BLK, NULL, 0);
1616 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1617 handle = mdd_trans_start(env, mdd);
1619 GOTO(out_pending, rc = PTR_ERR(handle));
1621 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1622 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1626 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1630 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1631 /* If creating the slave object, set slave EA here. */
1632 int lmv_size = spec->u.sp_ea.eadatalen;
1633 struct lmv_stripe_md *lmv;
1635 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1636 LASSERT(lmv != NULL && lmv_size > 0);
1638 rc = __mdd_xattr_set(env, mdd_obj,
1639 mdd_buf_get_const(env, lmv, lmv_size),
1640 XATTR_NAME_LMV, 0, handle);
1644 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1647 #ifdef CONFIG_FS_POSIX_ACL
1648 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
/* caller supplied a remote ACL blob in the EA area; apply it and
 * let it adjust la_mode */
1649 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1651 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1652 buf->lb_len = spec->u.sp_ea.eadatalen;
1653 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1654 rc = __mdd_acl_init(env, mdd_obj, buf,
1655 &ma->ma_attr.la_mode,
1660 ma->ma_attr.la_valid |= LA_MODE;
/* in the remote-ACL case the parent fid comes from the EA, not sp_pfid */
1663 pfid = spec->u.sp_ea.fid;
1666 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* read the final attributes back to return to the caller */
1672 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1673 mdd_write_unlock(env, mdd_obj);
1675 mdd_trans_stop(env, mdd, rc, handle);
1677 #ifdef HAVE_QUOTA_SUPPORT
1679 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1681 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1683 /* Trigger dqacq on the owner of child. If failed,
1684 * the next call for lquota_chkquota will process it. */
1685 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Partial link: add one hard-link reference to @obj and refresh its
 * ctime from the caller-supplied attributes, all inside one transaction.
 */
1693 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1694 const struct md_attr *ma)
1696 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1697 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1698 struct mdd_device *mdd = mdo2mdd(obj);
1699 struct thandle *handle;
1703 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1704 handle = mdd_trans_start(env, mdd);
1708 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1709 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1711 __mdd_ref_add(env, mdd_obj, handle);
1712 mdd_write_unlock(env, mdd_obj);
1714 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1715 la_copy->la_ctime = ma->ma_attr.la_ctime;
1717 la_copy->la_valid = LA_CTIME;
1718 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): literal 0 is passed as the result to mdd_trans_stop()
 * here, whereas every sibling passes rc -- verify this is intentional */
1721 mdd_trans_stop(env, mdd, 0, handle);
1727 * do NOT or the MAY_*'s, you'll get the weakest
/*
 * Translate MDS open @flags into the MAY_* access-mode mask used by
 * permission checks.  Returns 0 (no access check needed) when the
 * owner-override rule below applies.
 */
1729 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1733 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1734 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1735 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1736 * owner can write to a file even if it is marked readonly to hide
1737 * its brokenness. (bug 5781) */
1738 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1739 struct md_ucred *uc = md_ucred(env);
1741 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1742 (la->la_uid == uc->mu_fsuid))
1746 if (flags & FMODE_READ)
/* truncate and append imply write access */
1748 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1750 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open request against the object's current attributes:
 * reject dead objects, symlinks, writable opens of directories, and
 * enforce append-only and O_NOATIME ownership rules.
 */
1755 static int mdd_open_sanity_check(const struct lu_env *env,
1756 struct mdd_object *obj, int flag)
1758 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1763 if (mdd_is_dead_obj(obj))
1766 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* symlinks are never opened directly on the MDS */
1770 if (S_ISLNK(tmp_la->la_mode))
1773 mode = accmode(env, tmp_la, flag);
1775 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* a just-created file was already permission-checked at create time */
1778 if (!(flag & MDS_OPEN_CREATED)) {
1779 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncation is meaningless for special files */
1784 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1785 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1786 flag &= ~MDS_OPEN_TRUNC;
1788 /* For writing append-only file must open it with append mode. */
1789 if (mdd_is_append(obj)) {
1790 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1792 if (flag & MDS_OPEN_TRUNC)
1798 * Note: the O_NOATIME flag is not packed by the client.
1800 if (flag & O_NOATIME) {
1801 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME is restricted to the owner or CFS_CAP_FOWNER holders */
1803 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1804 (uc->mu_valid == UCRED_NEW)) &&
1805 (uc->mu_fsuid != tmp_la->la_uid) &&
1806 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Open @obj: run the sanity checks and, on success, bump the open
 * count under the object write lock.
 */
1814 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1817 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1820 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1822 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1824 mdd_obj->mod_count++;
1826 mdd_write_unlock(env, mdd_obj);
1830 /* return md_attr back,
1831 * if it is last unlink then return lov ea + llog cookie*/
/*
 * Prepare final destruction data for a regular file being removed:
 * fetch its LOV EA into @ma and, if present, write the unlink llog
 * record so the OST objects can be destroyed.
 */
1832 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1838 if (S_ISREG(mdd_object_type(obj))) {
1839 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1840 * Caller must be ready for that. */
1842 rc = __mdd_lmm_get(env, obj, ma);
1843 if ((ma->ma_valid & MA_LOV))
1844 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1851 * No permission check is needed.
/*
 * Close @obj: drop the open count; when this was the last open of an
 * already-unlinked (orphan) object, remove it from the orphan index,
 * release its quota, and either destroy the OSS objects directly
 * (MDS_CLOSE_CLEANUP) or hand back the LOV EA + llog cookies.
 */
1853 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1856 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1857 struct mdd_device *mdd = mdo2mdd(obj);
1858 struct thandle *handle;
1862 #ifdef HAVE_QUOTA_SUPPORT
1863 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1864 struct mds_obd *mds = &obd->u.mds;
1865 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1870 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1873 handle = mdd_trans_start(env, mdo2mdd(obj));
1875 RETURN(PTR_ERR(handle));
1877 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1878 /* release open count */
1879 mdd_obj->mod_count --;
1881 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1882 /* remove link to object from orphan index */
1883 rc = __mdd_orphan_del(env, mdd_obj, handle);
1885 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1886 "list, OSS objects to be destroyed.\n",
1887 PFID(mdd_object_fid(mdd_obj)));
1889 CERROR("Object "DFID" can not be deleted from orphan "
1890 "list, maybe cause OST objects can not be "
1891 "destroyed (err: %d).\n",
1892 PFID(mdd_object_fid(mdd_obj)), rc);
1893 /* If object was not deleted from orphan list, do not
1894 * destroy OSS objects, which will be done when next
1900 rc = mdd_iattr_get(env, mdd_obj, ma);
1901 /* Object maybe not in orphan list originally, it is rare case for
1902 * mdd_finish_unlink() failure. */
1903 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1904 #ifdef HAVE_QUOTA_SUPPORT
/* last link gone: release the owner's inode quota */
1905 if (mds->mds_quota) {
1906 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1907 mdd_quota_wrapper(&ma->ma_attr, qids);
1910 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1911 if (ma->ma_valid & MA_FLAGS &&
1912 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1913 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1915 rc = mdd_object_kill(env, mdd_obj, ma);
1921 CERROR("Error when prepare to delete Object "DFID" , "
1922 "which will cause OST objects can not be "
1923 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* do not hand stale LOV/cookie data back to the caller */
1929 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1931 mdd_write_unlock(env, mdd_obj);
1932 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1933 #ifdef HAVE_QUOTA_SUPPORT
1935 /* Trigger dqrel on the owner of child. If failed,
1936 * the next call for lquota_chkquota will process it */
1937 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1944 * Permission check is done when open,
1945 * no need check again.
/*
 * readpage is only valid on a directory whose underlying dt object
 * supports the index (dir iteration) API.
 */
1947 static int mdd_readpage_sanity_check(const struct lu_env *env,
1948 struct mdd_object *obj)
1950 struct dt_object *next = mdd_object_child(obj);
1954 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one directory page: iterate @it with @iops, packing lu_dirent
 * records into @area (size @nob) after a zeroed lu_dirpage header.
 * Tracks the first/last hash seen via @start/@end and the last entry
 * written via @last so the caller can patch up page boundaries.
 */
1962 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1963 int first, void *area, int nob,
1964 const struct dt_it_ops *iops, struct dt_it *it,
1965 __u64 *start, __u64 *end,
1966 struct lu_dirent **last, __u32 attr)
1970 struct lu_dirent *ent;
/* reserve the page header before packing entries */
1973 memset(area, 0, sizeof (struct lu_dirpage));
1974 area += sizeof (struct lu_dirpage);
1975 nob -= sizeof (struct lu_dirpage);
1983 len = iops->key_size(env, it);
1985 /* IAM iterator can return record with zero len. */
1989 hash = iops->store(env, it);
1990 if (unlikely(first)) {
1995 /* calculate max space required for lu_dirent */
1996 recsize = lu_dirent_calc_size(len, attr);
1998 if (nob >= recsize) {
1999 result = iops->rec(env, it, ent, attr);
2000 if (result == -ESTALE)
2005 /* osd might not able to pack all attributes,
2006 * so recheck rec length */
2007 recsize = le16_to_cpu(ent->lde_reclen);
2010 * record doesn't fit into page, enlarge previous one.
/* extend the previous entry's reclen to absorb the tail of the page */
2013 (*last)->lde_reclen =
2014 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2023 ent = (void *)ent + recsize;
2027 result = iops->next(env, it);
2028 if (result == -ESTALE)
2030 } while (result == 0);
/*
 * Read directory entries into the pages described by @rdpg: position a
 * dt iterator at rp_hash, then fill up to rp_npages pages (rp_count
 * bytes) with mdd_dir_page_build(), finally stamping the hash range
 * (and LDF_EMPTY when nothing was packed) into the first page header.
 */
2037 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2038 const struct lu_rdpg *rdpg)
2041 struct dt_object *next = mdd_object_child(obj);
2042 const struct dt_it_ops *iops;
2044 struct lu_dirent *last = NULL;
2045 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2052 LASSERT(rdpg->rp_pages != NULL);
2053 LASSERT(next->do_index_ops != NULL);
2055 if (rdpg->rp_count <= 0)
2059 * iterate through directory and fill pages from @rdpg
2061 iops = &next->do_index_ops->dio_it;
2062 it = iops->init(env, next, mdd_object_capa(env, obj));
2066 rc = iops->load(env, it, rdpg->rp_hash);
2070 * Iterator didn't find record with exactly the key requested.
2072 * It is currently either
2074 * - positioned above record with key less than
2075 * requested---skip it.
2077 * - or not positioned at all (is in IAM_IT_SKEWED
2078 * state)---position it on the next item.
2080 rc = iops->next(env, it);
2085 * At this point and across for-loop:
2087 * rc == 0 -> ok, proceed.
2088 * rc > 0 -> end of directory.
2091 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2092 i++, nob -= CFS_PAGE_SIZE) {
2093 LASSERT(i < rdpg->rp_npages);
2094 pg = rdpg->rp_pages[i];
2095 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2096 min_t(int, nob, CFS_PAGE_SIZE), iops,
2097 it, &hash_start, &hash_end, &last,
2099 if (rc != 0 || i == rdpg->rp_npages - 1) {
/* terminate the entry chain on the last page or on error */
2101 last->lde_reclen = 0;
/* rc > 0 here means end of directory was reached */
2109 hash_end = DIR_END_OFF;
2113 struct lu_dirpage *dp;
2115 dp = cfs_kmap(rdpg->rp_pages[0]);
2116 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2117 dp->ldp_hash_end = cpu_to_le64(hash_end);
2120 * No pages were processed, mark this.
2122 dp->ldp_flags |= LDF_EMPTY;
2124 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2125 cfs_kunmap(rdpg->rp_pages[0]);
2128 iops->fini(env, it);
/*
 * md_object readpage entry point: under the object read lock, verify
 * the object is a readable directory; a dead (unlinked) directory gets
 * a single empty page (POSIX: no entries, not even "." / ".."), an
 * ordinary one is filled by __mdd_readpage().
 */
2133 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2134 const struct lu_rdpg *rdpg)
2136 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2140 LASSERT(mdd_object_exists(mdd_obj));
2142 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2143 rc = mdd_readpage_sanity_check(env, mdd_obj);
2145 GOTO(out_unlock, rc);
2147 if (mdd_is_dead_obj(mdd_obj)) {
2149 struct lu_dirpage *dp;
2152 * According to POSIX, please do not return any entry to client:
2153 * even dot and dotdot should not be returned.
2155 CWARN("readdir from dead object: "DFID"\n",
2156 PFID(mdd_object_fid(mdd_obj)));
2158 if (rdpg->rp_count <= 0)
2159 GOTO(out_unlock, rc = -EFAULT);
2160 LASSERT(rdpg->rp_pages != NULL);
/* build one empty dirpage covering the whole remaining hash range */
2162 pg = rdpg->rp_pages[0];
2163 dp = (struct lu_dirpage*)cfs_kmap(pg);
2164 memset(dp, 0 , sizeof(struct lu_dirpage));
2165 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2166 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2167 dp->ldp_flags |= LDF_EMPTY;
2168 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2170 GOTO(out_unlock, rc = 0);
2173 rc = __mdd_readpage(env, mdd_obj, rdpg);
2177 mdd_read_unlock(env, mdd_obj);
/*
 * Flush @obj to stable storage by delegating to the underlying dt
 * object's do_object_sync method.
 */
2181 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2183 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2184 struct dt_object *next;
2186 LASSERT(mdd_object_exists(mdd_obj));
2187 next = mdd_object_child(mdd_obj);
2188 return next->do_ops->do_object_sync(env, next);
/*
 * Return the object version from the underlying dt object (used for
 * version-based recovery).
 */
2191 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2192 struct md_object *obj)
2194 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2196 LASSERT(mdd_object_exists(mdd_obj));
2197 return do_version_get(env, mdd_object_child(mdd_obj));
/*
 * Store @version on the underlying dt object.
 * NOTE(review): "return" of a call in a void function -- valid only if
 * do_version_set() itself returns void; confirm against its prototype.
 */
2200 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2201 dt_obj_version_t version)
2203 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2205 LASSERT(mdd_object_exists(mdd_obj));
2206 return do_version_set(env, mdd_object_child(mdd_obj), version);
2209 const struct md_object_operations mdd_obj_ops = {
2210 .moo_permission = mdd_permission,
2211 .moo_attr_get = mdd_attr_get,
2212 .moo_attr_set = mdd_attr_set,
2213 .moo_xattr_get = mdd_xattr_get,
2214 .moo_xattr_set = mdd_xattr_set,
2215 .moo_xattr_list = mdd_xattr_list,
2216 .moo_xattr_del = mdd_xattr_del,
2217 .moo_object_create = mdd_object_create,
2218 .moo_ref_add = mdd_ref_add,
2219 .moo_ref_del = mdd_ref_del,
2220 .moo_open = mdd_open,
2221 .moo_close = mdd_close,
2222 .moo_readpage = mdd_readpage,
2223 .moo_readlink = mdd_readlink,
2224 .moo_capa_get = mdd_capa_get,
2225 .moo_object_sync = mdd_object_sync,
2226 .moo_version_get = mdd_version_get,
2227 .moo_version_set = mdd_version_set,
2228 .moo_path = mdd_path,