1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
45 #include "mdd_internal.h"
/*
 * Forward declarations for helpers defined later in this file, followed by
 * the operation tables and a few file-scope constants.
 * NOTE(review): this listing has gaps (embedded line numbers skip), so some
 * prototype lines (e.g. trailing parameters of mdd_trans_start and the
 * __mdd_lookup* declarations) are not visible here.
 */
48 static struct thandle* mdd_trans_start(const struct lu_env *env,
50 static void mdd_trans_stop(const struct lu_env *env,
51 struct mdd_device *mdd, int rc,
52 struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
55 struct thandle *handle);
56 static void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
57 struct thandle *handle);
58 static int __mdd_lookup(const struct lu_env *env,
59 struct md_object *pobj,
60 const char *name, const struct lu_fid* fid,
62 static int __mdd_lookup_locked(const struct lu_env *env,
63 struct md_object *pobj,
64 const char *name, const struct lu_fid* fid,
67 static int mdd_exec_permission_lite(const struct lu_env *env,
68 struct mdd_object *obj);
70 static int __mdd_permission_internal(const struct lu_env *env,
71 struct mdd_object *obj,
72 int mask, int getattr);
/* Operation vectors installed on every mdd object (bodies defined below). */
74 static struct md_object_operations mdd_obj_ops;
75 static struct md_dir_operations mdd_dir_ops;
76 static struct lu_object_operations mdd_lu_obj_ops;
/* Per-thread context key used by mdd_env_info(). */
78 static struct lu_context_key mdd_thread_key;
/* Name of the filesystem root directory object and the usual dot entries. */
80 static const char *mdd_root_dir_name = "root";
81 static const char dot[] = ".";
82 static const char dotdot[] = "..";
/*
 * Transaction opcodes (enum mdd_txn_op — the enum header/footer lines are
 * not visible in this listing).  One opcode per metadata operation class.
 */
85 MDD_TXN_OBJECT_DESTROY_OP,
86 MDD_TXN_OBJECT_CREATE_OP,
89 MDD_TXN_INDEX_INSERT_OP,
90 MDD_TXN_INDEX_DELETE_OP,
94 MDD_TXN_RENAME_TGT_OP,
95 MDD_TXN_CREATE_DATA_OP,
/* Maps an opcode to the number of journal credits it must reserve. */
99 struct mdd_txn_op_descr {
100 enum mdd_txn_op mod_op;
101 unsigned int mod_credits;
/*
 * Compile-time placeholders: all credits start at 0 and are filled in at
 * runtime by mdd_txn_init_credits() from the OSD's basic credit counts.
 */
105 MDD_TXN_OBJECT_DESTROY_CREDITS = 0,
106 MDD_TXN_OBJECT_CREATE_CREDITS = 0,
107 MDD_TXN_ATTR_SET_CREDITS = 0,
108 MDD_TXN_XATTR_SET_CREDITS = 0,
109 MDD_TXN_INDEX_INSERT_CREDITS = 0,
110 MDD_TXN_INDEX_DELETE_CREDITS = 0,
111 MDD_TXN_LINK_CREDITS = 0,
112 MDD_TXN_UNLINK_CREDITS = 0,
113 MDD_TXN_RENAME_CREDITS = 0,
114 MDD_TXN_RENAME_TGT_CREDITS = 0,
115 MDD_TXN_CREATE_DATA_CREDITS = 0,
116 MDD_TXN_MKDIR_CREDITS = 0
/*
 * Builds one descriptor entry, indexed by opcode relative to the first
 * opcode so the array can be directly indexed by enum mdd_txn_op.
 */
119 #define DEFINE_MDD_TXN_OP_ARRAY(opname, base) \
120 [opname ## _OP - base ## _OP]= { \
121 .mod_op = opname ## _OP, \
122 .mod_credits = opname ## _CREDITS, \
126 * number of blocks to reserve for particular operations. Should be function
127 * of ... something. Stub for now.
130 #define DEFINE_MDD_TXN_OP_DESC(opname) \
131 DEFINE_MDD_TXN_OP_ARRAY(opname, MDD_TXN_OBJECT_DESTROY)
/* Descriptor table, one entry per opcode, in enum order. */
133 static struct mdd_txn_op_descr mdd_txn_descrs[] = {
134 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY),
135 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE),
136 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET),
137 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET),
138 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT),
139 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE),
140 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK),
141 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK),
142 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME),
143 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME_TGT),
144 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA),
145 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR)
/*
 * Copy the precomputed credit count for @op into the per-thread txn_param,
 * ready for the subsequent mdd_trans_start() on this env.
 */
148 static void mdd_txn_param_build(const struct lu_env *env, enum mdd_txn_op op)
150 LASSERT(0 <= op && op < ARRAY_SIZE(mdd_txn_descrs));
152 mdd_env_info(env)->mti_param.tp_credits =
153 mdd_txn_descrs[op].mod_credits;
/*
 * Query the underlying dt (OSD) device for the basic journal credit cost of
 * one low-level operation; the result must be positive.
 */
156 static int mdd_credit_get(const struct lu_env *env, struct mdd_device *mdd,
160 credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child,
162 LASSERT(credits > 0);
166 /* XXX: we should calculate it by lsm count, not ost count. */
/*
 * Fill mdd_txn_descrs[] with real credit counts: fetch the basic per-op
 * credits from the OSD, then combine them per MDD opcode (e.g. a rename is
 * two index deletes + one insert + per-OST unlink log records).
 * Called once at setup (from mdd_process_config).  Returns 0 on success —
 * the tail of the function is not visible in this listing.
 */
167 int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd)
169 struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
170 int ost_count = mds->mds_lov_desc.ld_tgt_count;
172 int index_create_credits;
173 int index_delete_credits;
183 /* Init credits for each ops. */
184 num_entries = ARRAY_SIZE(mdd_txn_descrs);
185 LASSERT(num_entries > 0);
187 /* Init the basic credits from osd layer. */
188 index_create_credits = mdd_credit_get(env, mdd, DTO_INDEX_INSERT);
189 index_delete_credits = mdd_credit_get(env, mdd, DTO_INDEX_DELETE);
190 log_credits = mdd_credit_get(env, mdd, DTO_LOG_REC);
191 attr_credits = mdd_credit_get(env, mdd, DTO_ATTR_SET);
192 xattr_credits = mdd_credit_get(env, mdd, DTO_XATTR_SET);
193 create_credits = mdd_credit_get(env, mdd, DTO_OBJECT_CREATE);
194 destroy_credits = mdd_credit_get(env, mdd, DTO_OBJECT_DELETE);
196 /* Calculate the mdd credits. */
197 for (i = 0; i < num_entries; i++) {
198 int opcode = mdd_txn_descrs[i].mod_op;
199 int *c = &mdd_txn_descrs[i].mod_credits;
/* NOTE(review): the switch header and break statements are not visible
 * in this listing; each case below sets *c for one opcode. */
201 case MDD_TXN_OBJECT_DESTROY_OP:
202 *c = destroy_credits;
204 case MDD_TXN_OBJECT_CREATE_OP:
205 /* OI_INSERT + CREATE OBJECT */
206 *c = index_create_credits + create_credits;
208 case MDD_TXN_ATTR_SET_OP:
209 /* ATTR set + XATTR(lsm, lmv) set */
210 *c = attr_credits + xattr_credits;
212 case MDD_TXN_XATTR_SET_OP:
215 case MDD_TXN_INDEX_INSERT_OP:
216 *c = index_create_credits;
218 case MDD_TXN_INDEX_DELETE_OP:
219 *c = index_delete_credits;
221 case MDD_TXN_LINK_OP:
222 *c = index_create_credits;
224 case MDD_TXN_UNLINK_OP:
225 /* delete index + Unlink log */
226 *c = index_delete_credits +
227 log_credits * ost_count;
229 case MDD_TXN_RENAME_OP:
230 /* 2 delete index + 1 insert + Unlink log */
231 *c = 2 * index_delete_credits +
232 index_create_credits +
233 log_credits * ost_count;
235 case MDD_TXN_RENAME_TGT_OP:
236 /* index insert + index delete */
237 *c = index_delete_credits +
238 index_create_credits;
240 case MDD_TXN_CREATE_DATA_OP:
241 /* same as set xattr(lsm) */
244 case MDD_TXN_MKDIR_OP:
245 /* INDEX INSERT + OI INSERT + CREATE_OBJECT_CREDITS
246 * SET_MD CREDITS is already counted in
247 * CREATE_OBJECT CREDITS
249 *c = 2 * index_create_credits + create_credits;
252 CERROR("Invalid op %d init its credit\n",
/*
 * Wrap (area, len) in the per-thread lu_buf so callers can pass arbitrary
 * memory to dt xattr/body operations without allocating.
 */
260 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
264 buf = &mdd_env_info(env)->mti_buf;
/* Const variant: same per-thread buffer, caller promises not to write. */
270 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
271 const void *area, ssize_t len)
275 buf = &mdd_env_info(env)->mti_buf;
276 buf->lb_buf = (void *)area;
/* Refcounting helpers for kernel group_info (mirrors get/put_group_info). */
281 #define mdd_get_group_info(group_info) do { \
282 atomic_inc(&(group_info)->usage); \
285 #define mdd_put_group_info(group_info) do { \
286 if (atomic_dec_and_test(&(group_info)->usage)) \
287 groups_free(group_info); \
/* group_info stores gids in page-sized blocks; index within/across blocks. */
290 #define MDD_NGROUPS_PER_BLOCK ((int)(CFS_PAGE_SIZE / sizeof(gid_t)))
292 #define MDD_GROUP_AT(gi, i) \
293 ((gi)->blocks[(i) / MDD_NGROUPS_PER_BLOCK][(i) % MDD_NGROUPS_PER_BLOCK])
295 /* groups_search() is copied from linux kernel! */
296 /* a simple bsearch */
/*
 * Binary-search the (sorted) supplementary group array for @grp.
 * Returns nonzero when found (return statements are outside this listing).
 */
297 static int mdd_groups_search(struct group_info *group_info, gid_t grp)
305 right = group_info->ngroups;
306 while (left < right) {
307 int mid = (left + right) / 2;
308 int cmp = grp - MDD_GROUP_AT(group_info, mid);
/*
 * Is @grp one of the caller's groups?  Checks the fsgid first, then the
 * two suppgids carried in the ucred, then the full group_info list (taken
 * from uc->mu_ginfo or the identity cache, under a temporary reference).
 */
320 static int mdd_in_group_p(struct md_ucred *uc, gid_t grp)
324 if (grp != uc->mu_fsgid) {
325 struct group_info *group_info = NULL;
327 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD) ||
328 (!uc->mu_ginfo && !uc->mu_identity))
329 if ((grp == uc->mu_suppgids[0]) ||
330 (grp == uc->mu_suppgids[1]))
334 group_info = uc->mu_ginfo;
335 else if (uc->mu_identity)
336 group_info = uc->mu_identity->mi_ginfo;
/* Hold a ref across the search so the table cannot be freed under us. */
341 mdd_get_group_info(group_info);
342 rc = mdd_groups_search(group_info, grp);
343 mdd_put_group_info(group_info);
/* Permission check with getattr=1: fetch attributes, then test @mask. */
348 static inline int mdd_permission_internal(const struct lu_env *env,
349 struct mdd_object *obj, int mask)
351 return __mdd_permission_internal(env, obj, mask, 1);
/* Fetch this thread's mdd_thread_info from the lu_env context. */
354 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
356 struct mdd_thread_info *info;
358 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
359 LASSERT(info != NULL);
/*
 * lu_object slice allocation for the MDD layer: allocate an mdd_object,
 * wire up the md object/dir operation vectors and the lu object ops.
 */
363 static struct lu_object *mdd_object_alloc(const struct lu_env *env,
364 const struct lu_object_header *hdr,
367 struct mdd_object *mdd_obj;
369 OBD_ALLOC_PTR(mdd_obj);
370 if (mdd_obj != NULL) {
373 o = mdd2lu_obj(mdd_obj);
374 lu_object_init(o, NULL, d);
375 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
376 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
377 mdd_obj->mod_count = 0;
378 o->lo_ops = &mdd_lu_obj_ops;
/*
 * Ask the child (dt) device to allocate its slice and stack it below
 * this mdd object in the compound lu_object.
 */
385 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
387 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
388 struct lu_object *below;
389 struct lu_device *under;
392 under = &d->mdd_child->dd_lu_dev;
393 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
398 lu_object_add(o, below);
402 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj);
/* Post-init hook: cache the LUSTRE_*_FL flags for objects that exist. */
404 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
406 if (lu_object_exists(o))
407 return mdd_get_flags(env, lu2mdd_obj(o));
/* Free the MDD slice (counterpart of mdd_object_alloc). */
412 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
414 struct mdd_object *mdd = lu2mdd_obj(o);
/* Debug printer for this slice. */
420 static int mdd_object_print(const struct lu_env *env, void *cookie,
421 lu_printer_t p, const struct lu_object *o)
423 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
426 /* orphan handling is here */
/*
 * Called when the last reference to the object goes away.  If the object
 * was flagged as an orphan, remove it from the orphan index inside a small
 * INDEX_DELETE transaction.  Skipped entirely when the orphan index has
 * not been set up yet (mdd_orphans == NULL).
 * NOTE(review): the error path after "Cannot get thandle" is not fully
 * visible in this listing — presumably it returns early; confirm in the
 * full source.
 */
427 static void mdd_object_delete(const struct lu_env *env,
430 struct mdd_object *mdd_obj = lu2mdd_obj(o);
431 struct thandle *handle = NULL;
434 if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
437 if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
438 mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP);
439 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
441 CERROR("Cannot get thandle\n");
443 mdd_write_lock(env, mdd_obj);
444 /* let's remove obj from the orphan list */
445 __mdd_orphan_del(env, mdd_obj, handle);
446 mdd_write_unlock(env, mdd_obj);
447 mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
/* lu_object operations vector for the MDD slice. */
453 static struct lu_object_operations mdd_lu_obj_ops = {
454 .loo_object_init = mdd_object_init,
455 .loo_object_start = mdd_object_start,
456 .loo_object_free = mdd_object_free,
457 .loo_object_print = mdd_object_print,
458 .loo_object_delete = mdd_object_delete
/*
 * Look up (or instantiate) the object with fid @f in the site cache and
 * return its MDD slice.  If the MDD slice cannot be located the object is
 * remote: the reference is dropped and the caller gets no local object.
 */
461 struct mdd_object *mdd_object_find(const struct lu_env *env,
462 struct mdd_device *d,
463 const struct lu_fid *f)
465 struct lu_object *o, *lo;
466 struct mdd_object *m;
469 o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
471 m = (struct mdd_object *)o;
473 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
474 /* remote object can't be located and should be put then */
476 lu_object_put(env, o);
/* Flag predicates/setters over mod_flags (cached from LUSTRE_*_FL). */
482 static inline int mdd_is_immutable(struct mdd_object *obj)
484 return obj->mod_flags & IMMUTE_OBJ;
487 static inline int mdd_is_append(struct mdd_object *obj)
489 return obj->mod_flags & APPEND_OBJ;
492 static inline void mdd_set_dead_obj(struct mdd_object *obj)
495 obj->mod_flags |= DEAD_OBJ;
/* NULL-safe: a NULL object is not considered dead. */
498 static inline int mdd_is_dead_obj(struct mdd_object *obj)
500 return obj && obj->mod_flags & DEAD_OBJ;
503 /*Check whether it may create the cobj under the pobj*/
/*
 * Fails if the child already exists or the parent is dead; otherwise the
 * caller needs WRITE|EXEC permission on the parent.
 */
504 static int mdd_may_create(const struct lu_env *env,
505 struct mdd_object *pobj, struct mdd_object *cobj,
511 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
514 if (mdd_is_dead_obj(pobj))
517 /*check pobj may create or not*/
519 rc = mdd_permission_internal(env, pobj,
520 MAY_WRITE | MAY_EXEC);
/* Read the object's attributes from the underlying dt object. */
525 static inline int __mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
526 struct lu_attr *la, struct lustre_capa *capa)
528 struct dt_object *next = mdd_object_child(obj);
529 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
530 return next->do_ops->do_attr_get(env, next, la, capa);
/* Translate on-disk LUSTRE_*_FL flags into the cached mod_flags bits. */
533 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
535 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
537 if (flags & LUSTRE_APPEND_FL)
538 obj->mod_flags |= APPEND_OBJ;
540 if (flags & LUSTRE_IMMUTABLE_FL)
541 obj->mod_flags |= IMMUTE_OBJ;
/* Refresh the cached flags by reading la_flags under the read lock. */
544 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
546 struct lu_attr *la = &mdd_env_info(env)->mti_la;
550 mdd_read_lock(env, obj);
551 rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
552 mdd_read_unlock(env, obj);
554 mdd_flags_xlate(obj, la->la_flags);
/* Capability bitmask helpers (mirrors the kernel's cap_raised()). */
558 #define mdd_cap_t(x) (x)
560 #define MDD_CAP_TO_MASK(x) (1 << (x))
562 #define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
564 /* capable() is copied from linux kernel! */
/* Nonzero iff the ucred carries capability @cap. */
565 static inline int mdd_capable(struct md_ucred *uc, int cap)
567 if (mdd_cap_raised(uc->mu_cap, cap))
573 * It's inline, so penalty for filesystems that don't use sticky bit is
/*
 * Sticky-directory check: deletion in a sticky directory is allowed only
 * for the file owner, the directory owner, or a CAP_FOWNER holder.
 * Returns nonzero when deletion must be refused for sticky reasons.
 */
576 static inline int mdd_is_sticky(const struct lu_env *env,
577 struct mdd_object *pobj,
578 struct mdd_object *cobj)
580 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
581 struct md_ucred *uc = md_ucred(env);
584 rc = __mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
587 } else if (tmp_la->la_uid == uc->mu_fsuid) {
590 rc = __mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
593 else if (!(tmp_la->la_mode & S_ISVTX))
595 else if (tmp_la->la_uid == uc->mu_fsuid)
598 return !mdd_capable(uc, CAP_FOWNER);
602 /* Check whether it may delete the cobj under the pobj. */
/*
 * Deletion sanity checks: child must exist, must not be immutable/append,
 * its dir-ness must match @is_dir, the root may not be removed, the parent
 * must be alive and not sticky-protected, and (when need_check) the caller
 * needs WRITE|EXEC on the parent.
 */
603 static int mdd_may_delete(const struct lu_env *env,
604 struct mdd_object *pobj,
605 struct mdd_object *cobj,
606 int is_dir, int need_check)
608 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
614 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
617 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
621 if (!S_ISDIR(mdd_object_type(cobj)))
624 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
627 } else if (S_ISDIR(mdd_object_type(cobj))) {
632 if (mdd_is_dead_obj(pobj))
635 if (mdd_is_sticky(env, pobj, cobj))
639 rc = mdd_permission_internal(env, pobj,
640 MAY_WRITE | MAY_EXEC);
645 /* get only inode attributes */
/* Fill ma->ma_attr from disk and mark MA_INODE valid on success. */
646 static int __mdd_iattr_get(const struct lu_env *env,
647 struct mdd_object *mdd_obj, struct md_attr *ma)
652 rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr,
653 mdd_object_capa(env, mdd_obj));
655 ma->ma_valid = MA_INODE;
659 /* get lov EA only */
/* Read the LOV striping EA into ma->ma_lmm; marks MA_LOV on success. */
660 static int __mdd_lmm_get(const struct lu_env *env,
661 struct mdd_object *mdd_obj, struct md_attr *ma)
666 LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
667 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
670 ma->ma_valid |= MA_LOV;
/* Read the LMV (striped dir) EA into ma->ma_lmv; marks MA_LMV. */
677 static int __mdd_lmv_get(const struct lu_env *env,
678 struct mdd_object *mdd_obj, struct md_attr *ma)
682 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
685 ma->ma_valid |= MA_LMV;
691 #ifdef CONFIG_FS_POSIX_ACL
692 /* get default acl EA only */
/*
 * Read the default POSIX ACL xattr into the ma_lmv buffer (reused for
 * ACLs here).  ENODATA/EOPNOTSUPP are treated as "no ACL", not an error.
 */
693 static int __mdd_acl_def_get(const struct lu_env *env,
694 struct mdd_object *mdd_obj, struct md_attr *ma)
696 struct dt_object *next = mdd_object_child(mdd_obj);
699 rc = next->do_ops->do_xattr_get(env, next,
700 mdd_buf_get(env, ma->ma_lmv,
702 XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
704 ma->ma_lmv_size = rc;
705 ma->ma_valid |= MA_ACL_DEF;
707 } else if ((rc == -EOPNOTSUPP) || (rc == -ENODATA)) {
/*
 * Gather whatever ma->ma_need asks for: inode attrs, LOV EA (regular files
 * and directories), LMV EA (directories), and default ACL (directories,
 * POSIX-ACL builds only).  Stops at the first error.
 */
714 static int mdd_attr_get_internal(const struct lu_env *env,
715 struct mdd_object *mdd_obj,
721 if (ma->ma_need & MA_INODE)
722 rc = __mdd_iattr_get(env, mdd_obj, ma);
724 if (rc == 0 && ma->ma_need & MA_LOV) {
725 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726 S_ISDIR(mdd_object_type(mdd_obj)))
727 rc = __mdd_lmm_get(env, mdd_obj, ma);
729 if (rc == 0 && ma->ma_need & MA_LMV) {
730 if (S_ISDIR(mdd_object_type(mdd_obj)))
731 rc = __mdd_lmv_get(env, mdd_obj, ma);
733 #ifdef CONFIG_FS_POSIX_ACL
734 else if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
735 if (S_ISDIR(mdd_object_type(mdd_obj)))
736 rc = __mdd_acl_def_get(env, mdd_obj, ma);
739 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* Same as above, under the object read lock. */
744 static inline int mdd_attr_get_internal_locked(const struct lu_env *env,
745 struct mdd_object *mdd_obj,
749 mdd_read_lock(env, mdd_obj);
750 rc = mdd_attr_get_internal(env, mdd_obj, ma);
751 mdd_read_unlock(env, mdd_obj);
756 * No permission check is needed.
/* md_object_operations entry point for getattr. */
758 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
761 struct mdd_object *mdd_obj = md2mdd_obj(obj);
765 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
770 * No permission check is needed.
/* Fetch a named xattr from the underlying dt object, under read lock. */
772 static int mdd_xattr_get(const struct lu_env *env,
773 struct md_object *obj, struct lu_buf *buf,
776 struct mdd_object *mdd_obj = md2mdd_obj(obj);
777 struct dt_object *next;
782 LASSERT(lu_object_exists(&obj->mo_lu));
784 next = mdd_object_child(mdd_obj);
785 mdd_read_lock(env, mdd_obj);
786 rc = next->do_ops->do_xattr_get(env, next, buf, name,
787 mdd_object_capa(env, mdd_obj));
788 mdd_read_unlock(env, mdd_obj);
794 * Permission check is done when open,
795 * no need check again.
/* Read symlink target: body read from offset 0 (pos), under read lock. */
797 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
800 struct mdd_object *mdd_obj = md2mdd_obj(obj);
801 struct dt_object *next;
806 LASSERT(lu_object_exists(&obj->mo_lu));
808 next = mdd_object_child(mdd_obj);
809 mdd_read_lock(env, mdd_obj);
810 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
811 mdd_object_capa(env, mdd_obj));
812 mdd_read_unlock(env, mdd_obj);
/* List all xattr names on the object, under read lock. */
816 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
819 struct mdd_object *mdd_obj = md2mdd_obj(obj);
820 struct dt_object *next;
825 LASSERT(lu_object_exists(&obj->mo_lu));
827 next = mdd_object_child(mdd_obj);
828 mdd_read_lock(env, mdd_obj);
829 rc = next->do_ops->do_xattr_list(env, next, buf,
830 mdd_object_capa(env, mdd_obj));
831 mdd_read_unlock(env, mdd_obj);
/* Transaction-start callback registered with the dt layer (no-op here). */
836 static int mdd_txn_start_cb(const struct lu_env *env,
837 struct txn_param *param, void *cookie)
/* Transaction-stop callback: persist the per-OST object id table. */
842 static int mdd_txn_stop_cb(const struct lu_env *env,
843 struct thandle *txn, void *cookie)
845 struct mdd_device *mdd = cookie;
846 struct obd_device *obd = mdd2obd_dev(mdd);
849 return mds_lov_write_objids(obd);
/* Transaction-commit callback (no-op here). */
852 static int mdd_txn_commit_cb(const struct lu_env *env,
853 struct thandle *txn, void *cookie)
/*
 * lu_device init: remember the child dt device and register the three
 * transaction callbacks above with cookie = mdd.
 */
858 static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
859 const char *name, struct lu_device *next)
861 struct mdd_device *mdd = lu2mdd_dev(d);
862 struct dt_device *dt;
866 mdd->mdd_child = lu2dt_dev(next);
869 /* prepare transactions callbacks */
870 mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
871 mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
872 mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
873 mdd->mdd_txn_cb.dtc_cookie = mdd;
/* lu_device fini: hand back the child device for the caller to unwind. */
877 static struct lu_device *mdd_device_fini(const struct lu_env *env,
880 struct mdd_device *mdd = lu2mdd_dev(d);
881 struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
/*
 * Mount-time setup: register txn callbacks, sanity-open the "root" store
 * object (reference dropped immediately), then initialize the orphan index.
 */
886 static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
889 struct dt_object *root;
892 dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
893 root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
896 LASSERT(root != NULL);
897 lu_object_put(env, &root->do_lu);
898 rc = orph_index_init(env, mdd);
/* Tear down in reverse: txn callbacks, obd state, orphan index. */
905 static void mdd_device_shutdown(const struct lu_env *env,
906 struct mdd_device *m)
908 dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
910 mdd_fini_obd(env, m);
911 orph_index_fini(env, m);
/*
 * Handle lustre_cfg commands.  On setup: pass the command down the stack,
 * pull the dt configuration, init the obd/LOV connection, mount, and
 * compute transaction credits; other commands are forwarded to the child.
 * NOTE(review): the case labels themselves are not visible in this listing.
 */
914 static int mdd_process_config(const struct lu_env *env,
915 struct lu_device *d, struct lustre_cfg *cfg)
917 struct mdd_device *m = lu2mdd_dev(d);
918 struct dt_device *dt = m->mdd_child;
919 struct lu_device *next = &dt->dd_lu_dev;
923 switch (cfg->lcfg_command) {
925 rc = next->ld_ops->ldo_process_config(env, next, cfg);
928 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
930 rc = mdd_init_obd(env, m, cfg);
932 CERROR("lov init error %d \n", rc);
935 rc = mdd_mount(env, m);
938 rc = mdd_txn_init_credits(env, m);
941 mdd_device_shutdown(env, m);
943 rc = next->ld_ops->ldo_process_config(env, next, cfg);
/* Push the next-to-use object ids to the OSCs after recovery. */
950 static int mdd_lov_set_nextid(const struct lu_env *env,
951 struct mdd_device *mdd)
953 struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
957 LASSERT(mds->mds_lov_objids != NULL);
958 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
959 KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
960 mds->mds_lov_objids, NULL);
/* Stub: replay/cleanup of the unlink llog is not implemented yet. */
965 static int mdd_cleanup_unlink_llog(const struct lu_env *env,
966 struct mdd_device *mdd)
968 /* XXX: to be implemented! */
/*
 * Recovery-complete hook: push next object ids, (stub) clean unlink llog,
 * notify OSCs to sync, clear the recovering flag, run post-recovery,
 * clean up orphans, then propagate to the child device.
 */
973 static int mdd_recovery_complete(const struct lu_env *env,
976 struct mdd_device *mdd = lu2mdd_dev(d);
977 struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
978 struct obd_device *obd = mdd2obd_dev(mdd);
982 LASSERT(mdd != NULL);
983 LASSERT(obd != NULL);
985 /* XXX: Do we need this in new stack? */
986 rc = mdd_lov_set_nextid(env, mdd);
988 CERROR("mdd_lov_set_nextid() failed %d\n",
993 /* XXX: cleanup unlink. */
994 rc = mdd_cleanup_unlink_llog(env, mdd);
996 CERROR("mdd_cleanup_unlink_llog() failed %d\n",
1001 obd_notify(obd->u.mds.mds_osc_obd, NULL,
1002 (obd->obd_async_recov ?
1003 OBD_NOTIFY_SYNC_NONBLOCK :
1004 OBD_NOTIFY_SYNC), NULL);
1006 obd->obd_recovering = 0;
1007 obd->obd_type->typ_dt_ops->o_postrecov(obd);
1009 /* XXX: orphans handling. */
1010 __mdd_orphan_cleanup(env, mdd);
1011 rc = next->ld_ops->ldo_recovery_complete(env, next);
/* Device-level operations vector for the MDD layer. */
1016 struct lu_device_operations mdd_lu_ops = {
1017 .ldo_object_alloc = mdd_object_alloc,
1018 .ldo_process_config = mdd_process_config,
1019 .ldo_recovery_complete = mdd_recovery_complete
/* Lock helpers: delegate to the underlying dt object's lock ops. */
1022 void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
1024 struct dt_object *next = mdd_object_child(obj);
1026 next->do_ops->do_write_lock(env, next);
1029 void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
1031 struct dt_object *next = mdd_object_child(obj);
1033 next->do_ops->do_read_lock(env, next);
1036 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
1038 struct dt_object *next = mdd_object_child(obj);
1040 next->do_ops->do_write_unlock(env, next);
1043 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
1045 struct dt_object *next = mdd_object_child(obj);
1047 next->do_ops->do_read_unlock(env, next);
/* Write-lock two objects; unlock is strictly the reverse order. */
1050 static void mdd_lock2(const struct lu_env *env,
1051 struct mdd_object *o0, struct mdd_object *o1)
1053 mdd_write_lock(env, o0);
1054 mdd_write_lock(env, o1);
1057 static void mdd_unlock2(const struct lu_env *env,
1058 struct mdd_object *o0, struct mdd_object *o1)
1060 mdd_write_unlock(env, o1);
1061 mdd_write_unlock(env, o0);
/* Start a transaction using the per-thread txn_param prepared by
 * mdd_txn_param_build(). */
1064 static struct thandle* mdd_trans_start(const struct lu_env *env,
1065 struct mdd_device *mdd)
1067 struct txn_param *p = &mdd_env_info(env)->mti_param;
1069 return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
/* Record the operation result in the handle and stop the transaction. */
1072 static void mdd_trans_stop(const struct lu_env *env,
1073 struct mdd_device *mdd, int result,
1074 struct thandle *handle)
1076 handle->th_result = result;
1077 mdd_child_ops(mdd)->dt_trans_stop(env, handle);
/*
 * Create the underlying dt object if it does not exist yet, using the
 * attributes from @ma, within transaction @handle.  Idempotent for
 * already-existing objects.
 */
1080 static int __mdd_object_create(const struct lu_env *env,
1081 struct mdd_object *obj, struct md_attr *ma,
1082 struct thandle *handle)
1084 struct dt_object *next;
1085 struct lu_attr *attr = &ma->ma_attr;
1089 if (!lu_object_exists(mdd2lu_obj(obj))) {
1090 next = mdd_object_child(obj);
1091 rc = next->do_ops->do_create(env, next, attr, handle);
1095 LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
1100 #ifdef CONFIG_FS_POSIX_ACL
1101 #include <linux/posix_acl_xattr.h>
1102 #include <linux/posix_acl.h>
1105 * Modify the ACL for the chmod.
/*
 * Rewrite the access-ACL entries to reflect a new file mode, mirroring the
 * kernel's posix_acl_chmod_masq(): USER_OBJ takes the owner bits, OTHER the
 * other bits, and the group bits go to MASK if present, else GROUP_OBJ.
 * NOTE(review): the per-tag dispatch lines are not visible in this listing.
 */
1107 static int mdd_posix_acl_chmod_masq(posix_acl_xattr_entry *entry,
1108 __u32 mode, int count)
1110 posix_acl_xattr_entry *group_obj = NULL, *mask_obj = NULL, *pa, *pe;
1113 pe = &entry[count - 1];
1114 for (; pa <= pe; pa++) {
1117 pa->e_perm = (mode & S_IRWXU) >> 6;
1133 pa->e_perm = (mode & S_IRWXO);
1142 mask_obj->e_perm = (mode & S_IRWXG) >> 3;
1146 group_obj->e_perm = (mode & S_IRWXG) >> 3;
/*
 * After a chmod, read the access ACL xattr, remap its entries to the new
 * mode, and write it back inside the same transaction.  Missing ACLs
 * (ENODATA) or unsupported xattrs (EOPNOTSUPP) are not an error.
 */
1152 static int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o,
1153 __u32 mode, struct thandle *handle)
1155 struct dt_object *next;
1157 posix_acl_xattr_entry *entry;
1163 next = mdd_object_child(o);
1164 buf = &mdd_env_info(env)->mti_buf;
1165 buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
1166 buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
1167 rc = next->do_ops->do_xattr_get(env, next, buf,
1168 XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1169 if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
/* rc is the xattr size; subtract the 4-byte header to count entries. */
1175 entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
1176 entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
1177 if (entry_count <= 0)
1180 rc = mdd_posix_acl_chmod_masq(entry, mode, entry_count);
1184 rc = next->do_ops->do_xattr_set(env, next, buf, XATTR_NAME_ACL_ACCESS,
1185 0, handle, BYPASS_CAPA);
/*
 * Apply @attr to the underlying dt object; when the mode changed and
 * @needacl is set, propagate the new mode into the access ACL as well.
 */
1190 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
1191 const struct lu_attr *attr, struct thandle *handle,
1194 struct dt_object *next;
1197 LASSERT(lu_object_exists(mdd2lu_obj(o)));
1198 next = mdd_object_child(o);
1199 rc = next->do_ops->do_attr_set(env, next, attr, handle,
1200 mdd_object_capa(env, o));
1201 #ifdef CONFIG_FS_POSIX_ACL
1202 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1203 rc = mdd_acl_chmod(env, o, attr->la_mode, handle);
/* Same as above (needacl=1), under the object write lock. */
1208 int mdd_attr_set_internal_locked(const struct lu_env *env,
1209 struct mdd_object *o,
1210 const struct lu_attr *attr,
1211 struct thandle *handle)
1214 mdd_write_lock(env, o);
1215 rc = mdd_attr_set_internal(env, o, attr, handle, 1);
1216 mdd_write_unlock(env, o);
/*
 * Set (non-empty buf) or delete (NULL buf, len 0) a named xattr on the
 * underlying dt object within @handle.
 */
1220 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
1221 const struct lu_buf *buf, const char *name,
1222 int fl, struct thandle *handle)
1224 struct dt_object *next;
1225 struct lustre_capa *capa = mdd_object_capa(env, o);
1229 LASSERT(lu_object_exists(mdd2lu_obj(o)));
1230 next = mdd_object_child(o);
1231 if (buf->lb_buf && buf->lb_len > 0) {
1232 rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle,
1234 } else if (buf->lb_buf == NULL && buf->lb_len == 0) {
1235 rc = next->do_ops->do_xattr_del(env, next, name, handle, capa);
1240 /* this gives the same functionality as the code between
1241 * sys_chmod and inode_setattr
1242 * chown_common and inode_setattr
1243 * utimes and inode_setattr
1244 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/*
 * Validate and normalize an incoming setattr against the object's current
 * attributes and the caller's credentials: rejects type changes and fields
 * setattr must not touch, enforces immutable/append and ownership rules,
 * masks SUID/SGID on chown/chgrp, checks chmod/chown/chgrp capability,
 * requires MAY_WRITE for size changes, and fixes up a/m/ctime.  Modifies
 * @la in place (clearing or adding la_valid bits).
 */
1247 int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1250 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1251 struct md_ucred *uc = md_ucred(env);
1252 time_t now = CURRENT_SECONDS;
1259 /* Do not permit change file type */
1260 if (la->la_valid & LA_TYPE)
1263 /* They should not be processed by setattr */
1264 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
/* Read the object's current attributes for all comparisons below. */
1267 rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1271 if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
1274 * If only change flags of the object, we should
1275 * let it pass, but also need capability check
1276 * here if (!capable(CAP_LINUX_IMMUTABLE)),
1277 * fix it, when implement capable in mds
1279 if (la->la_valid & ~LA_FLAGS)
1282 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
1285 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1286 !mdd_capable(uc, CAP_FOWNER))
1290 * According to Ext3 implementation on this, the
1291 * Ctime will be changed, but not clear why?
1294 la->la_valid |= LA_CTIME;
1298 /* Check for setting the obj time. */
1299 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1300 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
/* Pure utimes-style update: only needs write permission. */
1301 rc = __mdd_permission_internal(env, obj, MAY_WRITE, 0);
1306 /* Make sure a caller can chmod. */
1307 if (la->la_valid & LA_MODE) {
1309 * Bypass la_vaild == LA_MODE,
1310 * this is for changing file with SUID or SGID.
1312 if ((la->la_valid & ~LA_MODE) &&
1313 (uc->mu_fsuid != tmp_la->la_uid) &&
1314 !mdd_capable(uc, CAP_FOWNER))
1317 if (la->la_mode == (umode_t) -1)
1318 la->la_mode = tmp_la->la_mode;
/* Only the permission bits may change; preserve the rest. */
1320 la->la_mode = (la->la_mode & S_IALLUGO) |
1321 (tmp_la->la_mode & ~S_IALLUGO);
1323 /* Also check the setgid bit! */
1324 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
1325 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
1326 la->la_mode &= ~S_ISGID;
1328 la->la_mode = tmp_la->la_mode;
1331 /* Make sure a caller can chown. */
1332 if (la->la_valid & LA_UID) {
1333 if (la->la_uid == (uid_t) -1)
1334 la->la_uid = tmp_la->la_uid;
1335 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1336 (la->la_uid != tmp_la->la_uid)) &&
1337 !mdd_capable(uc, CAP_CHOWN))
1341 * If the user or group of a non-directory has been
1342 * changed by a non-root user, remove the setuid bit.
1343 * 19981026 David C Niemi <niemi@tux.org>
1345 * Changed this to apply to all users, including root,
1346 * to avoid some races. This is the behavior we had in
1347 * 2.0. The check for non-root was definitely wrong
1348 * for 2.2 anyway, as it should have been using
1349 * CAP_FSETID rather than fsuid -- 19990830 SD.
1351 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1352 !S_ISDIR(tmp_la->la_mode)) {
1353 la->la_mode &= ~S_ISUID;
1354 la->la_valid |= LA_MODE;
1358 /* Make sure caller can chgrp. */
1359 if (la->la_valid & LA_GID) {
1360 if (la->la_gid == (gid_t) -1)
1361 la->la_gid = tmp_la->la_gid;
1362 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1363 ((la->la_gid != tmp_la->la_gid) &&
1364 !mdd_in_group_p(uc, la->la_gid))) &&
1365 !mdd_capable(uc, CAP_CHOWN))
1369 * Likewise, if the user or group of a non-directory
1370 * has been changed by a non-root user, remove the
1371 * setgid bit UNLESS there is no group execute bit
1372 * (this would be a file marked for mandatory
1373 * locking). 19981026 David C Niemi <niemi@tux.org>
1375 * Removed the fsuid check (see the comment above) --
1378 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1379 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1380 la->la_mode &= ~S_ISGID;
1381 la->la_valid |= LA_MODE;
1385 /* For tuncate (or setsize), we should have MAY_WRITE perm */
1386 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1387 rc = mdd_permission_internal(env, obj, MAY_WRITE);
1392 * For the "Size-on-MDS" setattr update, merge coming
1393 * attributes with the set in the inode. BUG 10641
1395 if ((la->la_valid & LA_ATIME) &&
1396 (la->la_atime < tmp_la->la_atime))
1397 la->la_valid &= ~LA_ATIME;
1399 if ((la->la_valid & LA_CTIME) &&
1400 (la->la_ctime < tmp_la->la_ctime))
1401 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1403 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
1405 la->la_valid |= LA_MTIME;
1409 /* For last, ctime must be fixed */
1410 if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
1412 la->la_valid |= LA_CTIME;
/*
 * Apply attribute changes (and optionally a LOV EA) to an object within a
 * single transaction: fix up the caller-supplied attrs via mdd_fix_attr(),
 * write them, and for chown/chgrp on a regular file propagate the change
 * to the OST objects asynchronously after the transaction stops.
 */
1418 /* set attr and LOV EA at once, return updated attr */
1419 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1420 const struct md_attr *ma)
1422 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1423 struct mdd_device *mdd = mdo2mdd(obj);
1424 struct thandle *handle;
1425 struct lov_mds_md *lmm = NULL;
1426 int rc = 0, lmm_size = 0, max_size = 0;
/* scratch attr copy from env info: ma->ma_attr is const, fix-ups go here */
1427 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1430 mdd_txn_param_build(env, MDD_TXN_ATTR_SET_OP);
1431 handle = mdd_trans_start(env, mdd);
1433 RETURN(PTR_ERR(handle));
1434 /*TODO: add lock here*/
/* chown/chgrp of a regular file: fetch the LOV EA now so the ownership
 * change can later be pushed to (and logged for) the OST objects */
1435 /* start a log journal handle if needed */
1436 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1437 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1438 max_size = mdd_lov_mdsize(env, mdd);
1439 OBD_ALLOC(lmm, max_size);
1441 GOTO(cleanup, rc = -ENOMEM);
1443 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
/* NOTE(review): la_valid is tested against ATTR_* bits here while the
 * rest of this function uses LA_* bits — confirm the intended mask */
1450 if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
1451 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1452 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* normalize the requested attrs (permission bits, times) under write lock */
1454 *la_copy = ma->ma_attr;
1455 mdd_write_lock(env, mdd_obj);
1456 rc = mdd_fix_attr(env, mdd_obj, la_copy);
1457 mdd_write_unlock(env, mdd_obj);
1461 if (la_copy->la_valid & LA_FLAGS) {
1462 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
/* mirror the new on-disk flags into the in-core mdd object state */
1465 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1466 } else if (la_copy->la_valid) { /* setattr */
1467 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1469 /* journal chown/chgrp in llog, just like unlink */
1470 if (rc == 0 && lmm_size){
1471 /*TODO set_attr llog */
1475 if (rc == 0 && ma->ma_valid & MA_LOV) {
1478 mode = mdd_object_type(mdd_obj);
1479 if (S_ISREG(mode) || S_ISDIR(mode)) {
1480 /*TODO check permission*/
1481 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1482 ma->ma_lmm_size, handle, 1);
/* transaction must be stopped before the async OST setattr RPC below */
1487 mdd_trans_stop(env, mdd, rc, handle);
1488 if (rc == 0 && lmm_size) {
1489 /*set obd attr, if needed*/
1490 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
1493 OBD_FREE(lmm, max_size);
/*
 * Set an extended attribute on @obj inside an already-started transaction
 * @handle, taking the object write lock around the raw __mdd_xattr_set().
 */
1499 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1500 const struct lu_buf *buf, const char *name, int fl,
1501 struct thandle *handle)
1506 mdd_write_lock(env, obj);
1507 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1508 mdd_write_unlock(env, obj);
/*
 * Permission checks for modifying xattrs: reject immutable/append-only
 * objects, then require that the caller is the owner (fsuid matches the
 * object's uid) or holds CAP_FOWNER.
 */
1513 static int mdd_xattr_sanity_check(const struct lu_env *env,
1514 struct mdd_object *obj)
1516 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1517 struct md_ucred *uc = md_ucred(env);
1521 if (mdd_is_immutable(obj) || mdd_is_append(obj))
/* read current attrs under the read lock to get the owner uid */
1524 mdd_read_lock(env, obj);
1525 rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1526 mdd_read_unlock(env, obj);
1530 if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
/*
 * md_object operation: set an extended attribute. Runs the sanity check,
 * starts a transaction, performs the set, and (with split support) turns a
 * successful LMV EA set into -ERESTART so the client replays correctly.
 */
1536 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1537 const struct lu_buf *buf, const char *name, int fl)
1539 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1540 struct mdd_device *mdd = mdo2mdd(obj);
1541 struct thandle *handle;
1545 rc = mdd_xattr_sanity_check(env, mdd_obj);
1549 mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1550 handle = mdd_trans_start(env, mdd);
1552 RETURN(PTR_ERR(handle));
1554 rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, name,
1556 #ifdef HAVE_SPLIT_SUPPORT
1559 * XXX: Very ugly hack, if setting lmv, it means splitting
1560 * success, we should return -ERESTART to notify the client, so
1561 * transno for this splitting should be zero according to the
1562 * replay rules. so return -ERESTART here let mdt trans stop
1563 * callback know this.
/* NOTE(review): strncmp with strlen(name) matches any prefix of
 * MDS_LMV_MD_NAME, not only the exact name — confirm intent */
1565 if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
1569 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Raw xattr delete: forward to the underlying dt object's do_xattr_del.
 * Caller must hold the appropriate lock and supply an open transaction.
 */
1574 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
1575 struct mdd_object *obj,
1576 const char *name, struct thandle *handle)
1578 struct dt_object *next;
/* deleting an xattr from a non-existent object is a caller bug */
1580 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1581 next = mdd_object_child(obj);
1582 return next->do_ops->do_xattr_del(env, next, name, handle,
1583 mdd_object_capa(env, obj));
/*
 * md_object operation: delete an extended attribute. Sanity-checks the
 * caller, then deletes under the object write lock inside a transaction.
 */
1586 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1589 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590 struct mdd_device *mdd = mdo2mdd(obj);
1591 struct thandle *handle;
1595 rc = mdd_xattr_sanity_check(env, mdd_obj);
/* reuses the XATTR_SET transaction credits for the delete */
1599 mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1600 handle = mdd_trans_start(env, mdd);
1602 RETURN(PTR_ERR(handle));
1604 mdd_write_lock(env, mdd_obj);
1605 rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
1606 mdd_write_unlock(env, mdd_obj);
1608 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Insert (@name -> @lf) into directory @pobj's index without any of the
 * bookkeeping (nlink, timestamps) done by __mdd_index_insert().
 */
1613 static int __mdd_index_insert_only(const struct lu_env *env,
1614 struct mdd_object *pobj,
1615 const struct lu_fid *lf,
1616 const char *name, struct thandle *th,
1617 struct lustre_capa *capa)
1620 struct dt_object *next = mdd_object_child(pobj);
/* dt_try_as_dir() verifies @next supports index ops before dio_insert */
1623 if (dt_try_as_dir(env, next))
1624 rc = next->do_index_ops->dio_insert(env, next,
1625 (struct dt_rec *)lf,
1626 (struct dt_key *)name, th, capa);
/*
 * Insert (@name -> @lf) into directory @pobj's index; if the new entry is
 * a directory, bump the parent's link count, and refresh the parent's
 * mtime/ctime from the caller-supplied attrs.
 */
1632 /* insert new index, add reference if isdir, update times */
1633 static int __mdd_index_insert(const struct lu_env *env,
1634 struct mdd_object *pobj, const struct lu_fid *lf,
1635 const char *name, int isdir, struct thandle *th,
1636 struct lustre_capa *capa)
1639 struct dt_object *next = mdd_object_child(pobj);
1643 struct lu_attr *la = &mdd_env_info(env)->mti_la;
1646 if (dt_try_as_dir(env, next))
1647 rc = next->do_index_ops->dio_insert(env, next,
1648 (struct dt_rec *)lf,
1649 (struct dt_key *)name,
/* new subdirectory adds a ".." back-reference to the parent */
1656 __mdd_ref_add(env, pobj, th);
/* NOTE(review): la_valid requests MTIME|CTIME but la_atime (not
 * la_mtime) is copied below — looks like a typo; confirm upstream */
1658 la->la_valid = LA_MTIME|LA_CTIME;
1659 la->la_atime = ma->ma_attr.la_atime;
1660 la->la_ctime = ma->ma_attr.la_ctime;
1661 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
/*
 * Remove @name from directory @pobj's index; if the removed entry was a
 * directory, drop the parent's extra link (the child's ".." reference).
 */
1667 static int __mdd_index_delete(const struct lu_env *env,
1668 struct mdd_object *pobj, const char *name,
1669 int is_dir, struct thandle *handle,
1670 struct lustre_capa *capa)
1673 struct dt_object *next = mdd_object_child(pobj);
1676 if (dt_try_as_dir(env, next)) {
1677 rc = next->do_index_ops->dio_delete(env, next,
1678 (struct dt_key *)name,
1680 if (rc == 0 && is_dir)
1681 __mdd_ref_del(env, pobj, handle);
/*
 * Check that a hard link may be created: the target directory must accept
 * a new entry, the source must not be a directory (POSIX forbids hard
 * links to directories), and must not be immutable or append-only.
 */
1687 static int mdd_link_sanity_check(const struct lu_env *env,
1688 struct mdd_object *tgt_obj,
1689 struct mdd_object *src_obj)
1695 rc = mdd_may_create(env, tgt_obj, NULL, 1);
1700 if (S_ISDIR(mdd_object_type(src_obj)))
1703 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
/*
 * md_object operation: create hard link @name in directory @tgt_obj
 * pointing at @src_obj. Inserts the name, bumps the source's nlink, and
 * updates ctime on the source plus ctime/mtime on the target directory,
 * all within one transaction with both objects locked.
 */
1709 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1710 struct md_object *src_obj, const char *name,
1713 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1714 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1715 struct mdd_device *mdd = mdo2mdd(src_obj);
1716 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1717 struct thandle *handle;
1721 mdd_txn_param_build(env, MDD_TXN_LINK_OP);
1722 handle = mdd_trans_start(env, mdd);
1724 RETURN(PTR_ERR(handle));
/* mdd_lock2() orders the two locks consistently to avoid deadlock */
1726 mdd_lock2(env, mdd_tobj, mdd_sobj);
1728 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
/* insert-only: parent nlink must not change for a file hard link */
1732 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1734 mdd_object_capa(env, mdd_tobj));
1736 __mdd_ref_add(env, mdd_sobj, handle);
/* link changes ctime on the source ... */
1738 *la_copy = ma->ma_attr;
1739 la_copy->la_valid = LA_CTIME;
1740 rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
/* ... and ctime+mtime on the directory gaining the entry */
1744 la_copy->la_valid = LA_CTIME | LA_MTIME;
1745 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1748 mdd_unlock2(env, mdd_tobj, mdd_sobj);
1749 mdd_trans_stop(env, mdd, rc, handle);
1754 * Check that @dir contains no entries except (possibly) dot and dotdot.
1759 * -ENOTEMPTY not empty
1763 static int mdd_dir_is_empty(const struct lu_env *env,
1764 struct mdd_object *dir)
1767 struct dt_object *obj;
1768 struct dt_it_ops *iops;
1772 obj = mdd_object_child(dir);
1773 iops = &obj->do_index_ops->dio_it;
1774 it = iops->init(env, obj, 0);
/* position the iterator at the smallest key ("" sorts first) */
1776 result = iops->get(env, it, (const void *)"");
/* walk up to 3 entries: "." and ".." are allowed, a 3rd means non-empty */
1779 for (result = 0, i = 0; result == 0 && i < 3; ++i)
1780 result = iops->next(env, it);
1782 result = -ENOTEMPTY;
/* +1 from ->next() means iteration ran off the end: directory is empty */
1783 else if (result == +1)
1785 } else if (result == 0)
1787 * Huh? Index contains no zero key?
1792 iops->fini(env, it);
/*
 * Mark @obj dead and, for regular files, hand back the LOV EA (and log
 * unlink cookies) so the caller can destroy the OST objects.
 */
1798 /* return md_attr back,
1799 * if it is last unlink then return lov ea + llog cookie*/
1800 int __mdd_object_kill(const struct lu_env *env,
1801 struct mdd_object *obj,
1807 mdd_set_dead_obj(obj);
1808 if (S_ISREG(mdd_object_type(obj))) {
1809 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1810 * Caller must be ready for that. */
1811 rc = __mdd_lmm_get(env, obj, ma);
1812 if ((ma->ma_valid & MA_LOV))
1813 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/*
 * Post-unlink bookkeeping: refresh attrs into @ma and, when the link
 * count has dropped to zero, either park the object on the orphan list
 * (still open somewhere) or kill it immediately (no remaining opens).
 */
1819 /* caller should take a lock before calling */
1820 static int __mdd_finish_unlink(const struct lu_env *env,
1821 struct mdd_object *obj, struct md_attr *ma,
1827 rc = __mdd_iattr_get(env, obj, ma);
1828 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1829 /* add new orphan and the object
1830 * will be deleted during the object_put() */
1831 if (__mdd_orphan_add(env, obj, th) == 0)
1832 set_bit(LU_OBJECT_ORPHAN,
1833 &mdd2lu_obj(obj)->lo_header->loh_flags);
/* no opens outstanding: destroy right away instead of orphaning */
1835 if (obj->mod_count == 0)
1836 rc = __mdd_object_kill(env, obj, ma);
/*
 * Validate an unlink: general delete permission via mdd_may_delete(),
 * plus the requirement that a directory being removed is empty.
 */
1841 static int mdd_unlink_sanity_check(const struct lu_env *env,
1842 struct mdd_object *pobj,
1843 struct mdd_object *cobj,
1846 struct dt_object *dt_cobj = mdd_object_child(cobj);
1850 rc = mdd_may_delete(env, pobj, cobj,
1851 S_ISDIR(ma->ma_attr.la_mode), 1);
1855 if (S_ISDIR(mdd_object_type(cobj))) {
1856 if (dt_try_as_dir(env, dt_cobj))
1857 rc = mdd_dir_is_empty(env, cobj);
/*
 * md_object operation: unlink @name/@cobj from directory @pobj. Removes
 * the index entry, drops link counts (twice for directories, covering the
 * "." self-reference), updates times, finishes orphan/kill handling, and
 * notifies the OSC layer of the unlink.
 */
1865 static int mdd_unlink(const struct lu_env *env,
1866 struct md_object *pobj, struct md_object *cobj,
1867 const char *name, struct md_attr *ma)
1869 struct mdd_device *mdd = mdo2mdd(pobj);
1870 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1871 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1872 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1873 struct thandle *handle;
1877 mdd_txn_param_build(env, MDD_TXN_UNLINK_OP);
1878 handle = mdd_trans_start(env, mdd);
1880 RETURN(PTR_ERR(handle));
1882 mdd_lock2(env, mdd_pobj, mdd_cobj);
1884 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
1888 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
1889 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1890 mdd_object_capa(env, mdd_pobj));
1894 __mdd_ref_del(env, mdd_cobj, handle);
1895 *la_copy = ma->ma_attr;
/* directories carry an extra link for "." — drop it as well */
1898 __mdd_ref_del(env, mdd_cobj, handle);
1900 la_copy->la_valid = LA_CTIME;
1901 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
/* parent directory loses an entry: update its ctime/mtime */
1906 la_copy->la_valid = LA_CTIME | LA_MTIME;
1907 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
1911 rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle);
/* best-effort hint to the OSC export that an unlink happened */
1914 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
1915 strlen("unlinked"), "unlinked", 0,
1919 mdd_unlock2(env, mdd_pobj, mdd_cobj);
1920 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Partial (cross-MDS) unlink: drop @obj's link count(s) and finish the
 * unlink locally without touching any parent directory entry — the name
 * removal happens on another server.
 */
1924 /* partial unlink */
1925 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1928 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1929 struct mdd_device *mdd = mdo2mdd(obj);
1930 struct thandle *handle;
1934 mdd_txn_param_build(env, MDD_TXN_UNLINK_OP);
1935 handle = mdd_trans_start(env, mdd);
1939 mdd_write_lock(env, mdd_obj);
/* pobj is NULL: only the child-side checks apply here */
1941 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1945 __mdd_ref_del(env, mdd_obj, handle);
1947 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
/* second drop for the directory's "." self-reference */
1949 __mdd_ref_del(env, mdd_obj, handle);
1952 rc = __mdd_finish_unlink(env, mdd_obj, ma, handle);
1956 mdd_write_unlock(env, mdd_obj);
1957 mdd_trans_stop(env, mdd, rc, handle);
/* Fetch the FID of @obj's parent by looking up ".." in @obj. */
1961 static int mdd_parent_fid(const struct lu_env *env,
1962 struct mdd_object *obj,
1965 return __mdd_lookup_locked(env, &obj->mod_obj,
/*
 * Walk up the ".." chain from @p1 to decide whether @lf is an ancestor.
 */
1970 * return 1: if lf is the fid of the ancestor of p1;
1973 * return -EREMOTE: if remote object is found, in this
1974 * case fid of remote object is saved to @pf;
1976 * otherwise: values < 0, errors.
1978 static int mdd_is_parent(const struct lu_env *env,
1979 struct mdd_device *mdd,
1980 struct mdd_object *p1,
1981 const struct lu_fid *lf,
1984 struct mdd_object *parent = NULL;
1985 struct lu_fid *pfid;
1989 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1990 pfid = &mdd_env_info(env)->mti_fid;
1992 /* Do not lookup ".." in root, they do not exist there. */
1993 if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1997 rc = mdd_parent_fid(env, p1, pfid);
/* reaching the filesystem root ends the walk */
2000 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
2002 if (lu_fid_eq(pfid, lf))
2005 mdd_object_put(env, parent);
2006 parent = mdd_object_find(env, mdd, pfid);
2008 /* cross-ref parent */
2009 if (parent == NULL) {
/* NOTE(review): positive EREMOTE here, though the header comment
 * above says -EREMOTE — callers appear to expect the positive value */
2012 GOTO(out, rc = EREMOTE);
2013 } else if (IS_ERR(parent))
2014 GOTO(out, rc = PTR_ERR(parent));
2019 if (parent && !IS_ERR(parent))
2020 mdd_object_put(env, parent);
/*
 * Lock the two rename parents in a deadlock-free order: ancestors before
 * descendants (root first); for unrelated directories a default order is
 * used. Identical parents take a single write lock.
 */
2024 static int mdd_rename_lock(const struct lu_env *env,
2025 struct mdd_device *mdd,
2026 struct mdd_object *src_pobj,
2027 struct mdd_object *tgt_pobj)
2032 if (src_pobj == tgt_pobj) {
2033 mdd_write_lock(env, src_pobj);
2037 /* compared the parent child relationship of src_p&tgt_p */
2038 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2039 mdd_lock2(env, src_pobj, tgt_pobj);
2041 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2042 mdd_lock2(env, tgt_pobj, src_pobj);
/* neither is root: check whether tgt_pobj is an ancestor of src_pobj */
2046 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2051 mdd_lock2(env, tgt_pobj, src_pobj);
2055 mdd_lock2(env, src_pobj, tgt_pobj);
/* Counterpart of mdd_rename_lock(): drop one lock, or both if distinct. */
2059 static void mdd_rename_unlock(const struct lu_env *env,
2060 struct mdd_object *src_pobj,
2061 struct mdd_object *tgt_pobj)
2063 mdd_write_unlock(env, src_pobj);
2064 if (src_pobj != tgt_pobj)
2065 mdd_write_unlock(env, tgt_pobj);
/*
 * Validate a rename: source parent must be alive and writable/searchable;
 * the target slot must be creatable (no @tobj) or deletable (@tobj set),
 * and an existing target directory must be empty.
 */
2068 static int mdd_rename_sanity_check(const struct lu_env *env,
2069 struct mdd_object *src_pobj,
2070 struct mdd_object *tgt_pobj,
2071 const struct lu_fid *sfid,
2073 struct mdd_object *tobj)
2078 if (mdd_is_dead_obj(src_pobj))
2081 /* The sobj maybe on the remote, check parent permission only here */
2082 rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
2087 rc = mdd_may_create(env, tgt_pobj, NULL,
2088 (src_pobj != tgt_pobj));
2090 mdd_read_lock(env, tobj);
2091 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
2092 (src_pobj != tgt_pobj));
/* renaming over a non-empty directory must fail */
2094 if (S_ISDIR(mdd_object_type(tobj))
2095 && mdd_dir_is_empty(env, tobj))
2097 mdd_read_unlock(env, tobj);
/*
 * md_object operation: rename @sname in @src_pobj to @tname in @tgt_pobj.
 * The source object may live on another MDS, so it is referenced only via
 * its FID @lf and the type bits in @ma. Deletes the old entries, inserts
 * the new one, drops a replaced target's links, and fixes all times,
 * within one transaction under the rename-ordered parent locks.
 */
2102 /* src object can be remote that is why we use only fid and type of object */
2103 static int mdd_rename(const struct lu_env *env,
2104 struct md_object *src_pobj, struct md_object *tgt_pobj,
2105 const struct lu_fid *lf, const char *sname,
2106 struct md_object *tobj, const char *tname,
2109 struct mdd_device *mdd = mdo2mdd(src_pobj);
2110 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
2111 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2112 struct mdd_object *mdd_sobj = NULL;
2113 struct mdd_object *mdd_tobj = NULL;
2114 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2115 struct thandle *handle;
/* the caller must supply the source's file type in ma->ma_attr */
2120 LASSERT(ma->ma_attr.la_mode & S_IFMT);
2121 is_dir = S_ISDIR(ma->ma_attr.la_mode);
/* immutable/append-only sources may not be renamed */
2122 if (ma->ma_attr.la_valid & LA_FLAGS &&
2123 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
2127 mdd_tobj = md2mdd_obj(tobj);
2129 mdd_txn_param_build(env, MDD_TXN_RENAME_OP);
2130 handle = mdd_trans_start(env, mdd);
2132 RETURN(PTR_ERR(handle));
2134 /* FIXME: Should consider tobj and sobj too in rename_lock. */
2135 rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
2137 GOTO(cleanup_unlocked, rc);
2139 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2140 lf, is_dir, mdd_tobj);
2144 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2145 mdd_object_capa(env, mdd_spobj));
2150 * Here tobj can be remote one, so we do index_delete unconditionally
2151 * and -ENOENT is allowed.
2153 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2154 mdd_object_capa(env, mdd_tpobj));
2155 if (rc != 0 && rc != -ENOENT)
2158 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2159 mdd_object_capa(env, mdd_tpobj));
2163 mdd_sobj = mdd_object_find(env, mdd, lf);
2164 *la_copy = ma->ma_attr;
2165 la_copy->la_valid = LA_CTIME;
2167 /*XXX: how to update ctime for remote sobj? */
2168 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
/* an existing (replaced) target loses a link — two for directories */
2172 if (tobj && lu_object_exists(&tobj->mo_lu)) {
2173 mdd_write_lock(env, mdd_tobj);
2174 __mdd_ref_del(env, mdd_tobj, handle);
2175 /* remove dot reference */
2177 __mdd_ref_del(env, mdd_tobj, handle);
2179 la_copy->la_valid = LA_CTIME;
2180 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
2184 rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle);
2185 mdd_write_unlock(env, mdd_tobj);
/* both parents' ctime/mtime change (source parent always ...) */
2190 la_copy->la_valid = LA_CTIME | LA_MTIME;
2191 rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
/* ... and the target parent too, when it is a different directory */
2195 if (mdd_spobj != mdd_tpobj) {
2196 la_copy->la_valid = LA_CTIME | LA_MTIME;
2197 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
2201 mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
2203 mdd_trans_stop(env, mdd, rc, handle);
2205 mdd_object_put(env, mdd_sobj);
/*
 * Lockless name lookup in directory @pobj: verify the caller's @mask
 * permission on the directory, then resolve @name into @fid via the
 * underlying index. Caller is responsible for locking.
 */
2210 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
2211 const char *name, const struct lu_fid* fid, int mask)
2213 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2214 struct dt_object *dir = mdd_object_child(mdd_obj);
2215 struct dt_rec *rec = (struct dt_rec *)fid;
2216 const struct dt_key *key = (const struct dt_key *)name;
2220 if (mdd_is_dead_obj(mdd_obj))
/* lookup on a remote (cross-ref) directory is not supported here */
2223 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
2227 CERROR("Object "DFID" locates on remote server\n",
2228 PFID(mdo2fid(mdd_obj)));
/* plain path traversal can use the cheaper execute-only check */
2233 if (mask == MAY_EXEC)
2234 rc = mdd_exec_permission_lite(env, mdd_obj);
2237 rc = mdd_permission_internal(env, mdd_obj, mask);
2241 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
2242 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
2243 mdd_object_capa(env, mdd_obj));
/* __mdd_lookup() wrapped in the directory read lock. */
2251 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
2252 const char *name, const struct lu_fid* fid, int mask)
2254 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2257 mdd_read_lock(env, mdd_obj);
2258 rc = __mdd_lookup(env, pobj, name, fid, mask);
2259 mdd_read_unlock(env, mdd_obj);
/* md_object lookup entry point: locked lookup with MAY_EXEC on the dir. */
2264 static int mdd_lookup(const struct lu_env *env,
2265 struct md_object *pobj, const char *name,
2270 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
2275 * No permission check is needed.
2277 * returns 1: if fid is ancestor of @mo;
2278 * returns 0: if fid is not a ancestor of @mo;
2280 * returns EREMOTE if remote object is found, fid of remote object is saved to
2283 * returns < 0: if error
2285 static int mdd_is_subdir(const struct lu_env *env,
2286 struct md_object *mo, const struct lu_fid *fid,
2287 struct lu_fid *sfid)
2289 struct mdd_device *mdd = mdo2mdd(mo);
/* only directories have ancestors worth checking */
2293 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
2296 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
/*
 * Initialize a freshly created object: set its attributes and, for a
 * directory, insert the "." and ".." entries (with cleanup of "." plus
 * the extra nlink if the ".." insert fails).
 */
2301 static int __mdd_object_initialize(const struct lu_env *env,
2302 const struct lu_fid *pfid,
2303 struct mdd_object *child,
2304 struct md_attr *ma, struct thandle *handle)
2309 /* update attributes for child.
2311 * (1) the valid bits should be converted between Lustre and Linux;
2312 * (2) maybe, the child attributes should be set in OSD when creation.
2315 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
2319 if (S_ISDIR(ma->ma_attr.la_mode)) {
2320 /* add . and .. for newly created dir */
2321 __mdd_ref_add(env, child, handle);
2322 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2323 dot, handle, BYPASS_CAPA);
2325 rc = __mdd_index_insert_only(env, child, pfid,
/* ".." insert failed: undo the "." entry and the extra link */
2331 rc2 = __mdd_index_delete(env, child, dot, 0,
2332 handle, BYPASS_CAPA);
2334 CERROR("Failure to cleanup after dotdot"
2335 " creation: %d (%d)\n", rc2, rc);
2337 __mdd_ref_del(env, child, handle);
2345 * The permission has been checked when obj created,
2346 * no need check again.
2348 static int mdd_cd_sanity_check(const struct lu_env *env,
2349 struct mdd_object *obj)
2355 if (!obj || mdd_is_dead_obj(obj))
/* still verify the object is writable before attaching data objects */
2359 mdd_read_lock(env, obj);
2360 rc = mdd_permission_internal(env, obj, MAY_WRITE);
2361 mdd_read_unlock(env, obj);
/*
 * Create the OST data objects (LOV EA) for an already-created MDS inode,
 * used for delayed object creation on first open-for-write. Replayed
 * creates reuse the EA carried in @spec instead of allocating new objects.
 */
2368 static int mdd_create_data(const struct lu_env *env,
2369 struct md_object *pobj, struct md_object *cobj,
2370 const struct md_create_spec *spec,
2373 struct mdd_device *mdd = mdo2mdd(cobj);
2374 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
2375 struct mdd_object *son = md2mdd_obj(cobj);
2376 struct lu_attr *attr = &ma->ma_attr;
2377 struct lov_mds_md *lmm = NULL;
2379 struct thandle *handle;
2383 rc = mdd_cd_sanity_check(env, son);
/* nothing to do when creation is delayed or the open is read-only */
2387 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
2388 !(spec->sp_cr_flags & FMODE_WRITE))
/* OST object creation is an RPC, so it happens before the local txn */
2390 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
2395 mdd_txn_param_build(env, MDD_TXN_CREATE_DATA_OP);
2396 handle = mdd_trans_start(env, mdd);
2398 RETURN(rc = PTR_ERR(handle));
2401 * XXX: Setting the lov ea is not locked but setting the attr is locked?
2404 /* Replay creates has objects already */
2405 if (spec->u.sp_ea.no_lov_create) {
2406 CDEBUG(D_INFO, "we already have lov ea\n");
2407 rc = mdd_lov_set_md(env, mdd_pobj, son,
2408 (struct lov_mds_md *)spec->u.sp_ea.eadata,
2409 spec->u.sp_ea.eadatalen, handle, 0);
2411 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2412 lmm_size, handle, 0);
/* return the refreshed attributes to the caller */
2415 rc = mdd_attr_get_internal_locked(env, son, ma);
2417 /* Finish mdd_lov_create() stuff. */
2418 mdd_lov_create_finish(env, mdd, rc);
2419 mdd_trans_stop(env, mdd, rc, handle);
2421 OBD_FREE(lmm, lmm_size);
2425 #ifdef CONFIG_FS_POSIX_ACL
/*
 * Port of the kernel's posix_acl_create_masq(): reconcile a new object's
 * ACL entries with the requested create mode, masking each ACL class by
 * the corresponding mode bits and writing the effective mode back.
 */
2427 * Modify acl when creating a new obj.
2429 * mode_p initially must contain the mode parameter to the open() / creat()
2430 * system calls. All permissions that are not granted by the acl are removed.
2431 * The permissions in the acl are changed to reflect the mode_p parameter.
2433 static int mdd_posix_acl_create_masq(posix_acl_xattr_entry *entry,
2434 __u32 *mode_p, int count)
2436 posix_acl_xattr_entry *group_obj = NULL, *mask_obj = NULL, *pa, *pe;
2437 __u32 mode = *mode_p;
2441 pe = &entry[count - 1];
2442 for (; pa <= pe; pa++) {
/* USER_OBJ: intersect with the owner class bits of the mode */
2445 pa->e_perm &= (mode >> 6) | ~S_IRWXO;
2446 mode &= (pa->e_perm << 6) | ~S_IRWXU;
/* OTHER: intersect with the "other" class bits of the mode */
2459 pa->e_perm &= mode | ~S_IRWXO;
2460 mode &= pa->e_perm | ~S_IRWXO;
/* a MASK entry (if present) absorbs the group-class bits ... */
2474 mask_obj->e_perm &= (mode >> 3) | ~S_IRWXO;
2475 mode &= (mask_obj->e_perm << 3) | ~S_IRWXG;
/* ... otherwise GROUP_OBJ does */
2479 group_obj->e_perm &= (mode >> 3) | ~S_IRWXO;
2480 mode &= (group_obj->e_perm << 3) | ~S_IRWXG;
/* hand the effective (masked) permission bits back to the caller */
2483 *mode_p = (*mode_p & ~S_IRWXUGO) | mode;
/*
 * Apply an inherited default ACL (in @buf) to new object @obj: store it
 * as the default ACL for directories, then masq it against @mode and
 * store the result as the access ACL.
 */
2487 static int __mdd_acl_init(const struct lu_env *env, struct mdd_object *obj,
2488 struct lu_buf *buf, __u32 *mode,
2489 struct thandle *handle)
2491 struct dt_object *next;
2492 posix_acl_xattr_entry *entry;
2498 entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
/* the 4 subtracted here is presumably sizeof(posix_acl_xattr_header)
 * (the version field) — TODO confirm against the header layout */
2499 entry_count = (buf->lb_len - 4) / sizeof(posix_acl_xattr_entry);
2500 if (entry_count <= 0)
2503 next = mdd_object_child(obj);
/* directories inherit the default ACL verbatim as their own default */
2504 if (S_ISDIR(*mode)) {
2505 rc = next->do_ops->do_xattr_set(env, next, buf,
2506 XATTR_NAME_ACL_DEFAULT,
2507 0, handle, BYPASS_CAPA);
/* mask the entries against the create mode, then set the access ACL */
2512 rc = mdd_posix_acl_create_masq(entry, mode, entry_count);
2516 rc = next->do_ops->do_xattr_set(env, next, buf,
2517 XATTR_NAME_ACL_ACCESS,
2518 0, handle, BYPASS_CAPA);
/*
 * ACL inheritance at create time: read the parent's default ACL and, if
 * one exists, initialize the child's ACLs from it (missing/unsupported
 * default ACL is not an error).
 */
2522 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2523 struct mdd_object *cobj, __u32 *mode,
2524 struct thandle *handle)
2526 struct dt_object *next = mdd_object_child(pobj);
2527 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
/* per-thread xattr scratch buffer from env info — no allocation here */
2535 buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
2536 buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
2537 rc = next->do_ops->do_xattr_get(env, next, buf,
2538 XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
/* no default ACL on the parent: nothing to inherit */
2539 if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
2545 rc = __mdd_acl_init(env, cobj, buf, mode, handle);
/*
 * Pre-create validation: parent alive, name not already present (with
 * write/exec permission on the parent), and setgid-directory inheritance
 * applied to the new object's gid/mode. Ends with a type switch on the
 * requested mode.
 */
2550 static int mdd_create_sanity_check(const struct lu_env *env,
2551 struct md_object *pobj,
2552 const char *name, struct md_attr *ma)
2554 struct mdd_thread_info *info = mdd_env_info(env);
2555 struct lu_attr *la = &info->mti_la;
2556 struct lu_fid *fid = &info->mti_fid;
2557 struct mdd_object *obj = md2mdd_obj(pobj);
2562 if (mdd_is_dead_obj(obj))
2566 * Check if the name already exist, though it will be checked
2567 * in _index_insert also, for avoiding rolling back if exists
2570 rc = __mdd_lookup_locked(env, pobj, name, fid,
2571 MAY_WRITE | MAY_EXEC);
/* lookup success means the name exists: report -EEXIST */
2573 RETURN(rc ? : -EEXIST);
2576 mdd_read_lock(env, obj);
2577 rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
2578 mdd_read_unlock(env, obj);
/* setgid directory: child inherits gid; subdirs inherit setgid too */
2582 if (la->la_mode & S_ISGID) {
2583 ma->ma_attr.la_gid = la->la_gid;
2584 if (S_ISDIR(ma->ma_attr.la_mode)) {
2585 ma->ma_attr.la_mode |= S_ISGID;
2586 ma->ma_attr.la_valid |= LA_MODE;
2590 switch (ma->ma_attr.la_mode & S_IFMT) {
2608 * Create object and insert it into namespace.
2610 static int mdd_create(const struct lu_env *env,
2611 struct md_object *pobj, const char *name,
2612 struct md_object *child,
2613 struct md_create_spec *spec,
2616 struct mdd_device *mdd = mdo2mdd(pobj);
2617 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2618 struct mdd_object *son = md2mdd_obj(child);
2619 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2620 struct lu_attr *attr = &ma->ma_attr;
2621 struct lov_mds_md *lmm = NULL;
2622 struct thandle *handle;
/* created/inserted track how far we got, for rollback on failure */
2623 int rc, created = 0, inserted = 0, lmm_size = 0;
2627 * Two operations have to be performed:
2629 * - allocation of new object (->do_create()), and
2631 * - insertion into parent index (->dio_insert()).
2633 * Due to locking, operation order is not important, when both are
2634 * successful, *but* error handling cases are quite different:
2636 * - if insertion is done first, and following object creation fails,
2637 * insertion has to be rolled back, but this operation might fail
2638 * also leaving us with dangling index entry.
2640 * - if creation is done first, is has to be undone if insertion
2641 * fails, leaving us with leaked space, which is neither good, nor
2644 * It seems that creation-first is simplest solution, but it is
2645 * sub-optimal in the frequent
2650 * case, because second mkdir is bound to create object, only to
2651 * destroy it immediately.
2653 * To avoid this follow local file systems that do double lookup:
2655 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
2657 * 1. create (__mdd_object_create())
2659 * 2. insert (__mdd_index_insert(), lookup again)
2662 /* sanity checks before big job */
2663 rc = mdd_create_sanity_check(env, pobj, name, ma);
2667 /* no RPC inside the transaction, so OST objects should be created at
/* regular file: allocate OST objects before the local transaction */
2669 if (S_ISREG(attr->la_mode)) {
2670 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2676 mdd_txn_param_build(env, MDD_TXN_MKDIR_OP);
2677 handle = mdd_trans_start(env, mdd);
2679 RETURN(PTR_ERR(handle));
2681 mdd_write_lock(env, mdd_pobj);
2684 * XXX check that link can be added to the parent in mkdir case.
2687 mdd_write_lock(env, son);
2688 rc = __mdd_object_create(env, son, ma, handle);
2690 mdd_write_unlock(env, son);
2696 #ifdef CONFIG_FS_POSIX_ACL
/* inherit parent's default ACL; may adjust la_mode via the masq */
2697 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
2699 mdd_write_unlock(env, son);
2702 ma->ma_attr.la_valid |= LA_MODE;
2706 rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj),
2708 mdd_write_unlock(env, son);
2711 * Object has no links, so it will be destroyed when last
2712 * reference is released. (XXX not now.)
2716 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2717 name, S_ISDIR(attr->la_mode), handle,
2718 mdd_object_capa(env, mdd_pobj));
2724 /* replay creates has objects already */
2725 if (spec->u.sp_ea.no_lov_create) {
2726 CDEBUG(D_INFO, "we already have lov ea\n");
2727 rc = mdd_lov_set_md(env, mdd_pobj, son,
2728 (struct lov_mds_md *)spec->u.sp_ea.eadata,
2729 spec->u.sp_ea.eadatalen, handle, 0);
2731 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2732 lmm_size, handle, 0);
2734 CERROR("error on stripe info copy %d \n", rc);
/* symlink: write the target path as the object's body */
2738 if (S_ISLNK(attr->la_mode)) {
2739 struct dt_object *dt = mdd_object_child(son);
2740 const char *target_name = spec->u.sp_symname;
2741 int sym_len = strlen(target_name);
2742 const struct lu_buf *buf;
2745 buf = mdd_buf_get_const(env, target_name, sym_len);
2746 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2747 mdd_object_capa(env, son));
/* parent directory gained an entry: update its ctime/mtime */
2754 *la_copy = ma->ma_attr;
2755 la_copy->la_valid = LA_CTIME | LA_MTIME;
2756 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
2760 /* return attr back */
2761 rc = mdd_attr_get_internal_locked(env, son, ma);
/* error after creation: undo index insert (if done) and drop the ref */
2763 if (rc && created) {
2767 rc2 = __mdd_index_delete(env, mdd_pobj, name,
2768 S_ISDIR(attr->la_mode),
2769 handle, BYPASS_CAPA);
2771 CERROR("error can not cleanup destroy %d\n",
2775 mdd_write_lock(env, son);
2776 __mdd_ref_del(env, son, handle);
2777 mdd_write_unlock(env, son);
2780 /* finish mdd_lov_create() stuff */
2781 mdd_lov_create_finish(env, mdd, rc);
2783 OBD_FREE(lmm, lmm_size);
2784 mdd_write_unlock(env, mdd_pobj);
2785 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Sanity check for the partial object-create path: validate the requested
 * file type bits before creating an object without a namespace entry.
 */
2789 /* partial operation */
2790 static int mdd_oc_sanity_check(const struct lu_env *env,
2791 struct mdd_object *obj,
2797 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Partial (cross-MDS) create: allocate and initialize the object on this
 * MDS without inserting a name — the directory entry lives on another
 * server. Handles slave LMV EA and remote ACL initialization from @spec.
 */
2814 static int mdd_object_create(const struct lu_env *env,
2815 struct md_object *obj,
2816 const struct md_create_spec *spec,
2820 struct mdd_device *mdd = mdo2mdd(obj);
2821 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2822 const struct lu_fid *pfid = spec->u.sp_pfid;
2823 struct thandle *handle;
2827 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2831 mdd_txn_param_build(env, MDD_TXN_OBJECT_CREATE_OP);
2832 handle = mdd_trans_start(env, mdd);
2834 RETURN(PTR_ERR(handle));
2836 mdd_write_lock(env, mdd_obj);
2837 rc = __mdd_object_create(env, mdd_obj, ma, handle);
2841 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2842 /* If creating the slave object, set slave EA here. */
2843 int lmv_size = spec->u.sp_ea.eadatalen;
2844 struct lmv_stripe_md *lmv;
2846 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2847 LASSERT(lmv != NULL && lmv_size > 0);
2849 rc = __mdd_xattr_set(env, mdd_obj,
2850 mdd_buf_get_const(env, lmv, lmv_size),
2851 MDS_LMV_MD_NAME, 0, handle);
/* slave objects take their parent fid from the EA, not sp_pfid */
2854 pfid = spec->u.sp_ea.fid;
2856 CDEBUG(D_INFO, "Set slave ea "DFID", eadatalen %d, rc %d\n",
2857 PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
2858 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0);
2860 #ifdef CONFIG_FS_POSIX_ACL
/* remote ACL: the client sent the parent's default ACL in the EA buf */
2861 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2862 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2864 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2865 buf->lb_len = spec->u.sp_ea.eadatalen;
2866 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2867 rc = __mdd_acl_init(env, mdd_obj, buf,
2868 &ma->ma_attr.la_mode,
2873 ma->ma_attr.la_valid |= LA_MODE;
2877 rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
2881 mdd_write_unlock(env, mdd_obj);
/* hand the resulting attributes back to the caller */
2883 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
2885 mdd_trans_stop(env, mdd, rc, handle);
2890 * Partial operation. Be aware, this is called with write lock taken, so we use
2891 * locksless version of __mdd_lookup() here.
2893 static int mdd_ni_sanity_check(const struct lu_env *env,
2894 struct md_object *pobj,
2896 const struct lu_fid *fid)
2898 struct mdd_object *obj = md2mdd_obj(pobj);
2905 if (mdd_is_dead_obj(obj))
2908 /* The exist of the name will be checked in _index_insert. */
/* lockless lookup — the caller already holds the write lock */
2910 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2912 RETURN(rc ? : -EEXIST);
2916 RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
/*
 * Partial (cross-MDS) operation: insert (@name -> @fid) into directory
 * @pobj, where the object itself lives on another server.
 */
2919 static int mdd_name_insert(const struct lu_env *env,
2920 struct md_object *pobj,
2921 const char *name, const struct lu_fid *fid,
2924 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2925 struct thandle *handle;
2929 mdd_txn_param_build(env, MDD_TXN_INDEX_INSERT_OP);
2930 handle = mdd_trans_start(env, mdo2mdd(pobj));
2932 RETURN(PTR_ERR(handle));
2934 mdd_write_lock(env, mdd_obj);
2935 rc = mdd_ni_sanity_check(env, pobj, name, fid);
2937 GOTO(out_unlock, rc);
2939 rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
2943 mdd_write_unlock(env, mdd_obj);
2945 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
2950 * Be aware, this is called with write lock taken, so we use locksless version
2951 * of __mdd_lookup() here.
2953 static int mdd_nr_sanity_check(const struct lu_env *env,
2954 struct md_object *pobj,
2957 struct mdd_object *obj = md2mdd_obj(pobj);
2959 struct mdd_thread_info *info = mdd_env_info(env);
2960 struct lu_fid *fid = &info->mti_fid;
2966 if (mdd_is_dead_obj(obj))
2969 /* The exist of the name will be checked in _index_delete. */
/* lockless lookup — the caller already holds the write lock */
2971 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2974 RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
/*
 * Partial (MDS-distributed) operation: remove directory entry @name from
 * parent @pobj inside its own transaction. @is_dir tells the index layer
 * whether the entry refers to a directory.
 * Returns 0 on success, negative errno on failure.
 */
2977 static int mdd_name_remove(const struct lu_env *env,
2978 struct md_object *pobj,
2979 const char *name, int is_dir)
2981 struct mdd_device *mdd = mdo2mdd(pobj);
2982 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2983 struct thandle *handle;
/* Reserve credits for an index delete, then open the transaction. */
2987 mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP);
2988 handle = mdd_trans_start(env, mdd);
2990 RETURN(PTR_ERR(handle));
2992 mdd_write_lock(env, mdd_obj);
2993 rc = mdd_nr_sanity_check(env, pobj, name);
2995 GOTO(out_unlock, rc);
2997 rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
/* Unlock and stop the transaction on all paths. */
3001 mdd_write_unlock(env, mdd_obj);
3003 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Sanity checks for rename-target: may source @sfid replace (or be created
 * as) @name in target parent @tgt_pobj? @tobj is the existing target object,
 * or a non-existent placeholder. Returns 0 if allowed, negative errno if not.
 */
3007 static int mdd_rt_sanity_check(const struct lu_env *env,
3008 struct mdd_object *tgt_pobj,
3009 struct mdd_object *tobj,
3010 const struct lu_fid *sfid,
3011 const char *name, struct md_attr *ma)
3017 if (mdd_is_dead_obj(tgt_pobj))
/* Source type decides whether the target may be replaced. */
3020 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
3022 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
/* An existing target directory may only be replaced if it is empty. */
3023 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
3024 mdd_dir_is_empty(env, tobj))
/* No existing target: check that creation in the parent is allowed. */
3027 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/*
 * Partial rename on the target MDS: (re-)point entry @name in parent @pobj
 * at FID @lf, dropping a reference on the replaced target @tobj if it
 * existed. Runs inside its own transaction.
 * Returns 0 on success, negative errno on failure.
 */
3033 static int mdd_rename_tgt(const struct lu_env *env,
3034 struct md_object *pobj, struct md_object *tobj,
3035 const struct lu_fid *lf, const char *name,
3038 struct mdd_device *mdd = mdo2mdd(pobj);
3039 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
3040 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
3041 struct thandle *handle;
3045 mdd_txn_param_build(env, MDD_TXN_RENAME_TGT_OP);
3046 handle = mdd_trans_start(env, mdd);
3048 RETURN(PTR_ERR(handle));
/* NOTE(review): both mdd_lock2() and mdd_write_lock() appear below —
 * presumably alternative branches depending on whether a target object
 * exists (lock both in order vs. lock parent only); confirm in full tree. */
3051 mdd_lock2(env, mdd_tpobj, mdd_tobj);
3053 mdd_write_lock(env, mdd_tpobj);
3055 /*TODO rename sanity checking*/
3056 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
3060 /* if rename_tgt is called then we should just re-insert name with
3061 * correct fid, no need to dec/inc parent nlink if obj is dir */
3062 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
3066 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
/* The replaced target loses the reference that the old entry held. */
3071 if (tobj && lu_object_exists(&tobj->mo_lu))
3072 __mdd_ref_del(env, mdd_tobj, handle);
3075 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
3077 mdd_write_unlock(env, mdd_tpobj);
3078 mdd_trans_stop(env, mdd, rc, handle);
3083 * No permission check is needed.
/* Return the FID of the filesystem root object in @f. Never fails. */
3085 static int mdd_root_get(const struct lu_env *env,
3086 struct md_device *m, struct lu_fid *f)
3088 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3091 *f = mdd->mdd_root_fid;
3096 * No permission check is needed.
/* Fill @sfs with filesystem statistics by delegating to the child
 * dt (disk target) device. Returns the child's dt_statfs() result. */
3098 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
3099 struct kstatfs *sfs)
3101 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3106 rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
3112 * No permission check is needed.
/* Report the maximum LOV EA metadata size and the llog cookie size
 * for this device via the two out-parameters. */
3114 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
3115 int *md_size, int *cookie_size)
3117 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3120 *md_size = mdd_lov_mdsize(env, mdd);
3121 *cookie_size = mdd_lov_cookiesize(env, mdd);
/*
 * Initialize the capability context: remember the capability @keys on the
 * MDS obd and pass mode/timeout/algorithm down to the child dt device.
 */
3126 static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m,
3127 int mode, unsigned long timeout, __u32 alg,
3128 struct lustre_capa_key *keys)
3130 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3131 struct mds_obd *mds = &mdd2obd_dev(mdd)->u.mds;
3135 mds->mds_capa_keys = keys;
3136 rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, mode,
3137 timeout, alg, keys);
/*
 * Propagate an updated capability key to the OSTs through the LOV/OSC
 * export using an asynchronous set_info RPC.
 */
3141 static int mdd_update_capa_key(const struct lu_env *env,
3142 struct md_device *m,
3143 struct lustre_capa_key *key)
3145 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3146 struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
3150 rc = obd_set_info_async(lov_exp, strlen(KEY_CAPA_KEY), KEY_CAPA_KEY,
3151 sizeof(*key), key, NULL);
/*
 * Increment the link count (nlink) of @obj inside transaction @handle by
 * delegating to the underlying dt object. The object must exist on disk.
 */
3155 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
3156 struct thandle *handle)
3158 struct dt_object *next;
3160 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
3161 next = mdd_object_child(obj);
3162 next->do_ops->do_ref_add(env, next, handle);
3166 * XXX: if permission check is needed here?
/*
 * md_object_operations handler: add a link (nlink++) to @obj inside its
 * own transaction, after a link sanity check.
 */
3168 static int mdd_ref_add(const struct lu_env *env,
3169 struct md_object *obj)
3171 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3172 struct mdd_device *mdd = mdo2mdd(obj);
3173 struct thandle *handle;
3177 mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
3178 handle = mdd_trans_start(env, mdd);
3182 mdd_write_lock(env, mdd_obj);
/* NULL parent: only object-local constraints are checked here. */
3183 rc = mdd_link_sanity_check(env, NULL, mdd_obj);
3185 __mdd_ref_add(env, mdd_obj, handle);
3186 mdd_write_unlock(env, mdd_obj);
/* NOTE(review): passes 0 instead of rc to mdd_trans_stop(), so the
 * sanity-check result is not fed to the transaction stop — confirm this
 * is intentional and that rc is still returned to the caller. */
3188 mdd_trans_stop(env, mdd, 0, handle);
/*
 * Decrement the link count (nlink) of @obj inside transaction @handle by
 * delegating to the underlying dt object. The object must exist on disk.
 */
3194 __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
3195 struct thandle *handle)
3197 struct dt_object *next = mdd_object_child(obj);
3200 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
3202 next->do_ops->do_ref_del(env, next, handle);
3206 /* do NOT or the MAY_*'s, you'll get the weakest */
/*
 * Translate open @flags into an accessibility mask (MAY_READ/WRITE/EXEC).
 * The MAY_* bits are accumulated — do NOT or them blindly elsewhere, or
 * you'll get the weakest mode.
 */
3207 static int accmode(struct mdd_object *mdd_obj, int flags)
3212 /* Sadly, NFSD reopens a file repeatedly during operation, so the
3213 * "acc_mode = 0" allowance for newly-created files isn't honoured.
3214 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
3215 * owner can write to a file even if it is marked readonly to hide
3216 * its brokenness. (bug 5781) */
/* NOTE(review): 'inode' and 'current' are not parameters of this
 * function — this line looks carried over from the inode-based MDS
 * code; verify it compiles/behaves as intended in this tree. */
3217 if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
3220 if (flags & FMODE_READ)
/* Truncate and append both imply write access. */
3222 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
3224 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open of @obj with open flags @flag: object liveness, type
 * restrictions, permission, append-only and O_NOATIME rules.
 * Returns 0 if the open is allowed, negative errno otherwise.
 */
3229 static int mdd_open_sanity_check(const struct lu_env *env,
3230 struct mdd_object *obj, int flag)
3232 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
3233 int mode = accmode(obj, flag);
3238 if (mdd_is_dead_obj(obj))
3241 rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are never opened directly; directories refuse writes. */
3245 if (S_ISLNK(tmp_la->la_mode))
3248 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* A freshly created object was already permission-checked at create. */
3251 if (!(flag & MDS_OPEN_CREATED)) {
3252 rc = __mdd_permission_internal(env, obj, mode, 0);
3258 * FIFO's, sockets and device files are special: they don't
3259 * actually live on the filesystem itself, and as such you
3260 * can write to them even if the filesystem is read-only.
3262 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
3263 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
3267 * An append-only file must be opened in append mode for writing.
3269 if (mdd_is_append(obj)) {
3270 if ((flag & FMODE_WRITE) && !(flag & O_APPEND))
3276 /* O_NOATIME can only be set by the owner or superuser */
3277 if (flag & O_NOATIME) {
3278 struct md_ucred *uc = md_ucred(env);
3280 if (uc->mu_fsuid != tmp_la->la_uid &&
3281 !mdd_capable(uc, CAP_FOWNER))
/*
 * md_object_operations open handler: run the open sanity checks and, on
 * success, bump the object's open count under the write lock.
 */
3288 static int mdd_open(const struct lu_env *env, struct md_object *obj,
3291 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3294 mdd_write_lock(env, mdd_obj);
3296 rc = mdd_open_sanity_check(env, mdd_obj, flags);
3298 mdd_obj->mod_count++;
3300 mdd_write_unlock(env, mdd_obj);
3305 * No permission check is needed.
/*
 * md_object_operations close handler: drop the open count and, when this
 * was the last opener of an unlinked object (nlink == 0), destroy it.
 */
3307 static int mdd_close(const struct lu_env *env, struct md_object *obj,
3311 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3314 mdd_write_lock(env, mdd_obj);
3315 /* release open count */
3316 mdd_obj->mod_count --;
3318 rc = __mdd_iattr_get(env, mdd_obj, ma);
/* Last close of an orphan: reclaim the object now. */
3319 if (rc == 0 && mdd_obj->mod_count == 0) {
3320 if (ma->ma_attr.la_nlink == 0)
3321 rc = __mdd_object_kill(env, mdd_obj, ma);
3323 mdd_write_unlock(env, mdd_obj);
3328 * Permission check is done when open,
3329 * no need check again.
/*
 * Check that @obj is a directory usable as an index (dt_try_as_dir) and
 * readable by the caller. Returns 0 if readpage may proceed.
 */
3331 static int mdd_readpage_sanity_check(const struct lu_env *env,
3332 struct mdd_object *obj)
3334 struct dt_object *next = mdd_object_child(obj);
3338 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
3340 rc = mdd_permission_internal(env, obj, MAY_READ);
/*
 * md_object_operations readpage handler: read directory pages described
 * by @rdpg from the underlying dt object, under the object's read lock.
 */
3350 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
3351 const struct lu_rdpg *rdpg)
3353 struct dt_object *next;
3354 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3358 LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3359 next = mdd_object_child(mdd_obj);
3361 mdd_read_lock(env, mdd_obj);
3362 rc = mdd_readpage_sanity_check(env, mdd_obj);
3364 GOTO(out_unlock, rc);
/* Capability of the object authorizes the low-level page read. */
3366 rc = next->do_ops->do_readpage(env, next, rdpg,
3367 mdd_object_capa(env, mdd_obj));
3370 mdd_read_unlock(env, mdd_obj);
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Walk a POSIX ACL entry array (@entry, @count entries) and decide whether
 * credentials @uc are granted access @want to an object with attributes
 * @la. Mirrors the kernel's posix_acl_permission() entry-walk: owner,
 * named users, owning/named groups, then the MASK entry is applied.
 */
3375 static int mdd_posix_acl_permission(struct md_ucred *uc, struct lu_attr *la,
3376 int want, posix_acl_xattr_entry *entry,
3379 posix_acl_xattr_entry *pa, *pe, *mask_obj;
3387 pe = &entry[count - 1];
3388 for (; pa <= pe; pa++) {
3391 /* (May have been checked already) */
3392 if (la->la_uid == uc->mu_fsuid)
3396 if (pa->e_id == uc->mu_fsuid)
3400 if (mdd_in_group_p(uc, la->la_gid)) {
3402 if ((pa->e_perm & want) == want)
3407 if (mdd_in_group_p(uc, pa->e_id)) {
3409 if ((pa->e_perm & want) == want)
/* Group/named-user matches are filtered through the MASK entry. */
3427 for (mask_obj = pa + 1; mask_obj <= pe; mask_obj++) {
3428 if (mask_obj->e_tag == ACL_MASK) {
3429 if ((pa->e_perm & mask_obj->e_perm & want) == want)
3437 if ((pa->e_perm & want) == want)
/*
 * Check access @mask against the object's POSIX access ACL (the
 * system.posix_acl_access xattr). Compiled out (no-op) without
 * CONFIG_FS_POSIX_ACL. Returns 0 if granted, negative errno otherwise.
 */
3444 static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
3445 struct lu_attr* la, int mask)
3447 #ifdef CONFIG_FS_POSIX_ACL
3448 struct dt_object *next;
3449 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
3450 struct md_ucred *uc = md_ucred(env);
3451 posix_acl_xattr_entry *entry;
3456 next = mdd_object_child(obj);
/* Reuse the per-thread xattr buffer to fetch the ACL blob. */
3458 buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
3459 buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
3460 rc = next->do_ops->do_xattr_get(env, next, buf,
3461 XATTR_NAME_ACL_ACCESS,
3462 mdd_object_capa(env, obj));
3464 RETURN(rc ? : -EACCES);
/* rc is the xattr size; subtract the header (presumably the 4-byte
 * a_version field — confirm against posix_acl_xattr_header) before
 * dividing into entries. */
3466 entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
3467 entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
3469 rc = mdd_posix_acl_permission(uc, la, mask, entry, entry_count);
/*
 * Lightweight exec-permission check (mode bits only, no ACLs): pick the
 * owner/group/other permission triplet for the caller and test MAY_EXEC,
 * then fall back to capabilities. Returns 0 if exec is allowed.
 */
3478 static int mdd_exec_permission_lite(const struct lu_env *env,
3479 struct mdd_object *obj)
3481 struct lu_attr *la = &mdd_env_info(env)->mti_la;
3482 struct md_ucred *uc = md_ucred(env);
3487 /* These means unnecessary for permission check */
3488 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3491 /* Invalid user credit */
3492 if (uc->mu_valid == UCRED_INVALID)
3495 rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
/* Select which rwx triplet applies: owner, group, or other. */
3500 if (uc->mu_fsuid == la->la_uid)
3502 else if (mdd_in_group_p(uc, la->la_gid))
3505 if (mode & MAY_EXEC)
/* CAP_DAC_OVERRIDE: exec allowed if any x bit is set or it's a dir. */
3508 if (((la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) &&
3509 mdd_capable(uc, CAP_DAC_OVERRIDE))
3512 if (S_ISDIR(la->la_mode) && mdd_capable(uc, CAP_DAC_READ_SEARCH))
/*
 * Full permission check for access @mask on @obj, modelled on the kernel's
 * generic_permission(): credential validation, immutable-write refusal,
 * mode bits, POSIX ACL fallback for the group class, then capability
 * overrides. @getattr is unused in the visible lines — presumably it
 * controls whether attributes are (re)fetched; confirm in full source.
 * Returns 0 if access is granted, negative errno otherwise.
 */
3519 static int __mdd_permission_internal(const struct lu_env *env,
3520 struct mdd_object *obj,
3521 int mask, int getattr)
3523 struct lu_attr *la = &mdd_env_info(env)->mti_la;
3524 struct md_ucred *uc = md_ucred(env);
3533 /* These means unnecessary for permission check */
3534 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3537 /* Invalid user credit */
3538 if (uc->mu_valid == UCRED_INVALID)
3542 * Nobody gets write access to an immutable file.
3544 if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
3548 rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
/* Owner uses the user triplet directly. */
3554 if (uc->mu_fsuid == la->la_uid) {
/* Non-owner with group bits set: consult the ACL; -EAGAIN and
 * -EOPNOTSUPP mean "no ACL, use the mode bits" and fall through. */
3557 if (mode & S_IRWXG) {
3558 rc = mdd_check_acl(env, obj, la, mask);
3560 goto check_capabilities;
3561 else if ((rc != -EAGAIN) && (rc != -EOPNOTSUPP) &&
3565 if (mdd_in_group_p(uc, la->la_gid))
3570 * If the DACs are ok we don't need any capability check.
3572 if (((mode & mask & S_IRWXO) == mask))
3578 * Read/write DACs are always overridable.
3579 * Executable DACs are overridable if at least one exec bit is set.
3580 * Dir's DACs are always overridable.
3582 if (!(mask & MAY_EXEC) ||
3583 (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
3584 if (mdd_capable(uc, CAP_DAC_OVERRIDE))
3588 * Searching includes executable on directories, else just read.
3590 if ((mask == MAY_READ) ||
3591 (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
3592 if (mdd_capable(uc, CAP_DAC_READ_SEARCH))
/* Locked wrapper: take the object's read lock around
 * mdd_permission_internal(). */
3598 static inline int mdd_permission_internal_locked(const struct lu_env *env,
3599 struct mdd_object *obj,
3604 mdd_read_lock(env, obj);
3605 rc = mdd_permission_internal(env, obj, mask);
3606 mdd_read_unlock(env, obj);
/* md_object_operations permission handler: delegate to the locked
 * internal check. */
3611 static int mdd_permission(const struct lu_env *env, struct md_object *obj,
3614 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3618 rc = mdd_permission_internal_locked(env, mdd_obj, mask);
/*
 * md_object_operations capa_get handler: obtain (or, when @renewal is set,
 * renew the supplied) capability for @obj from the underlying dt object.
 */
3623 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
3624 struct lustre_capa *capa, int renewal)
3626 struct dt_object *next;
3627 struct mdd_object *mdd_obj = md2mdd_obj(obj);
3628 struct obd_capa *oc;
3632 LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3633 next = mdd_object_child(mdd_obj);
/* Pass the old capa only for renewal; NULL requests a fresh one. */
3635 oc = next->do_ops->do_capa_get(env, next, renewal ? capa : NULL,
/* Device-level operation vector exported to the MD layer. */
3647 struct md_device_operations mdd_ops = {
3648 .mdo_statfs = mdd_statfs,
3649 .mdo_root_get = mdd_root_get,
3650 .mdo_maxsize_get = mdd_maxsize_get,
3651 .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
3652 .mdo_update_capa_key= mdd_update_capa_key,
/* Directory operation vector (namespace-changing operations). */
3655 static struct md_dir_operations mdd_dir_ops = {
3656 .mdo_is_subdir = mdd_is_subdir,
3657 .mdo_lookup = mdd_lookup,
3658 .mdo_create = mdd_create,
3659 .mdo_rename = mdd_rename,
3660 .mdo_link = mdd_link,
3661 .mdo_unlink = mdd_unlink,
3662 .mdo_name_insert = mdd_name_insert,
3663 .mdo_name_remove = mdd_name_remove,
3664 .mdo_rename_tgt = mdd_rename_tgt,
3665 .mdo_create_data = mdd_create_data
/* Per-object operation vector (attributes, xattrs, open/close, paging). */
3668 static struct md_object_operations mdd_obj_ops = {
3669 .moo_permission = mdd_permission,
3670 .moo_attr_get = mdd_attr_get,
3671 .moo_attr_set = mdd_attr_set,
3672 .moo_xattr_get = mdd_xattr_get,
3673 .moo_xattr_set = mdd_xattr_set,
3674 .moo_xattr_list = mdd_xattr_list,
3675 .moo_xattr_del = mdd_xattr_del,
3676 .moo_object_create = mdd_object_create,
3677 .moo_ref_add = mdd_ref_add,
3678 .moo_ref_del = mdd_ref_del,
3679 .moo_open = mdd_open,
3680 .moo_close = mdd_close,
3681 .moo_readpage = mdd_readpage,
3682 .moo_readlink = mdd_readlink,
3683 .moo_capa_get = mdd_capa_get
/* Minimal obd_ops: only module ownership, used for type registration. */
3686 static struct obd_ops mdd_obd_device_ops = {
3687 .o_owner = THIS_MODULE
/*
 * lu_device_type_operations: allocate and minimally initialize an mdd
 * device instance. Returns the embedded lu_device, or ERR_PTR(-ENOMEM).
 */
3690 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
3691 struct lu_device_type *t,
3692 struct lustre_cfg *lcfg)
3694 struct lu_device *l;
3695 struct mdd_device *m;
3699 l = ERR_PTR(-ENOMEM);
3701 md_device_init(&m->mdd_md_dev, t);
/* Wire up the lu- and md-level operation vectors. */
3703 l->ld_ops = &mdd_lu_ops;
3704 m->mdd_md_dev.md_ops = &mdd_ops;
/* Free an mdd device; must only be called once the last reference is gone. */
3710 static void mdd_device_free(const struct lu_env *env,
3711 struct lu_device *lu)
3713 struct mdd_device *m = lu2mdd_dev(lu);
3715 LASSERT(atomic_read(&lu->ld_ref) == 0);
3716 md_device_fini(&m->mdd_md_dev);
/* lu_context_key init: allocate per-session md_ucred storage
 * (ERR_PTR(-ENOMEM) on allocation failure). */
3720 static void *mdd_ucred_key_init(const struct lu_context *ctx,
3721 struct lu_context_key *key)
3723 struct md_ucred *uc;
3727 uc = ERR_PTR(-ENOMEM);
/* lu_context_key fini: release per-session md_ucred storage. */
3731 static void mdd_ucred_key_fini(const struct lu_context *ctx,
3732 struct lu_context_key *key, void *data)
3734 struct md_ucred *uc = data;
/* Session-scoped context key carrying the caller's credentials. */
3738 static struct lu_context_key mdd_ucred_key = {
3739 .lct_tags = LCT_SESSION,
3740 .lct_init = mdd_ucred_key_init,
3741 .lct_fini = mdd_ucred_key_fini
/* Fetch the caller's credentials from the session context of @env.
 * The session context must exist. Exported for use by other MD modules. */
3744 struct md_ucred *md_ucred(const struct lu_env *env)
3746 LASSERT(env->le_ses != NULL);
3747 return lu_context_key_get(env->le_ses, &mdd_ucred_key);
3749 EXPORT_SYMBOL(md_ucred);
/* lu_context_key init: allocate per-session md_capainfo storage
 * (ERR_PTR(-ENOMEM) on allocation failure). */
3751 static void *mdd_capainfo_key_init(const struct lu_context *ctx,
3752 struct lu_context_key *key)
3754 struct md_capainfo *ci;
3758 ci = ERR_PTR(-ENOMEM);
/* lu_context_key fini: release per-session md_capainfo storage. */
3762 static void mdd_capainfo_key_fini(const struct lu_context *ctx,
3763 struct lu_context_key *key, void *data)
3765 struct md_capainfo *ci = data;
/* Session-scoped context key carrying per-request capability info. */
3769 struct lu_context_key mdd_capainfo_key = {
3770 .lct_tags = LCT_SESSION,
3771 .lct_init = mdd_capainfo_key_init,
3772 .lct_fini = mdd_capainfo_key_fini
/* Fetch per-request capability info from the session context.
 * Unlike md_ucred(), tolerates a missing session (early mdt_init0). */
3775 struct md_capainfo *md_capainfo(const struct lu_env *env)
3777 /* NB, in mdt_init0 */
3778 if (env->le_ses == NULL)
3780 return lu_context_key_get(env->le_ses, &mdd_capainfo_key);
3782 EXPORT_SYMBOL(md_capainfo);
/* Register all three context keys (thread, ucred, capainfo) when the
 * device type is initialized; each registration is checked in turn. */
3784 static int mdd_type_init(struct lu_device_type *t)
3788 result = lu_context_key_register(&mdd_thread_key);
3790 result = lu_context_key_register(&mdd_ucred_key);
3792 result = lu_context_key_register(&mdd_capainfo_key);
/* Deregister the context keys in reverse order of registration. */
3798 static void mdd_type_fini(struct lu_device_type *t)
3798 lu_context_key_degister(&mdd_capainfo_key);
3799 lu_context_key_degister(&mdd_ucred_key);
3800 lu_context_key_degister(&mdd_thread_key);
/* Device type lifecycle hooks used by the lu infrastructure. */
3803 static struct lu_device_type_operations mdd_device_type_ops = {
3804 .ldto_init = mdd_type_init,
3805 .ldto_fini = mdd_type_fini,
3807 .ldto_device_alloc = mdd_device_alloc,
3808 .ldto_device_free = mdd_device_free,
3810 .ldto_device_init = mdd_device_init,
3811 .ldto_device_fini = mdd_device_fini
/* Device type descriptor registered with the class subsystem. */
3814 static struct lu_device_type mdd_device_type = {
3815 .ldt_tags = LU_DEVICE_MD,
3816 .ldt_name = LUSTRE_MDD_NAME,
3817 .ldt_ops = &mdd_device_type_ops,
3818 .ldt_ctx_tags = LCT_MD_THREAD
/* lu_context_key init: allocate per-thread mdd_thread_info scratch
 * storage (ERR_PTR(-ENOMEM) on allocation failure). */
3821 static void *mdd_key_init(const struct lu_context *ctx,
3822 struct lu_context_key *key)
3824 struct mdd_thread_info *info;
3826 OBD_ALLOC_PTR(info);
3828 info = ERR_PTR(-ENOMEM);
/* lu_context_key fini: release per-thread mdd_thread_info storage. */
3832 static void mdd_key_fini(const struct lu_context *ctx,
3833 struct lu_context_key *key, void *data)
3835 struct mdd_thread_info *info = data;
/* Thread-scoped context key providing per-thread scratch buffers
 * (mti_la, mti_fid, mti_buf, ...) used throughout this file. */
3839 static struct lu_context_key mdd_thread_key = {
3840 .lct_tags = LCT_MD_THREAD,
3841 .lct_init = mdd_key_init,
3842 .lct_fini = mdd_key_fini
/* Empty /proc variable tables — mdd exports no tunables of its own yet. */
3845 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
3849 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
3853 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
/* Module entry point: set up lprocfs variables and register the mdd
 * device type with the obd class subsystem. */
3855 static int __init mdd_mod_init(void)
3857 struct lprocfs_static_vars lvars;
3858 printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
3859 lprocfs_init_vars(mdd, &lvars);
3860 return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
3861 LUSTRE_MDD_NAME, &mdd_device_type);
/* Module exit point: unregister the mdd device type. */
3864 static void __exit mdd_mod_exit(void)
3866 class_unregister_type(LUSTRE_MDD_NAME);
/* Standard module metadata and registration of init/exit hooks. */
3869 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3870 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
3871 MODULE_LICENSE("GPL");
3873 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);