1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
45 #include "mdd_internal.h"
48 static struct thandle* mdd_trans_start(const struct lu_env *env,
50 static void mdd_trans_stop(const struct lu_env *env,
51 struct mdd_device *mdd, int rc,
52 struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
55 struct thandle *handle);
56 static void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
57 struct thandle *handle);
58 static int __mdd_lookup(const struct lu_env *env,
59 struct md_object *pobj,
60 const char *name, const struct lu_fid* fid,
61 int mask, struct md_ucred *uc);
62 static int __mdd_lookup_locked(const struct lu_env *env,
63 struct md_object *pobj,
64 const char *name, const struct lu_fid* fid,
65 int mask, struct md_ucred *uc);
66 static int mdd_exec_permission_lite(const struct lu_env *env,
67 struct mdd_object *obj,
69 static int __mdd_permission_internal(const struct lu_env *env,
70 struct mdd_object *obj,
71 int mask, int getattr,
74 static struct md_object_operations mdd_obj_ops;
75 static struct md_dir_operations mdd_dir_ops;
76 static struct lu_object_operations mdd_lu_obj_ops;
78 static struct lu_context_key mdd_thread_key;
80 static const char *mdd_root_dir_name = "root";
81 static const char dot[] = ".";
82 static const char dotdot[] = "..";
85 MDD_TXN_OBJECT_DESTROY_OP,
86 MDD_TXN_OBJECT_CREATE_OP,
89 MDD_TXN_INDEX_INSERT_OP,
90 MDD_TXN_INDEX_DELETE_OP,
94 MDD_TXN_CREATE_DATA_OP,
98 struct mdd_txn_op_descr {
99 enum mdd_txn_op mod_op;
100 unsigned int mod_credits;
103 /* Calculate the credits of each transaction here */
104 /* Note: we did not count into QUOTA here, If we mount with --data_journal
108 * EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) + EXT3_SINGLEDATA_TRANS_BLOCKS 8
109 * XXX Note: maybe iam need more,since iam have more level than Ext3 htree
111 INSERT_IAM_CREDITS = 16,
114 * same as IAM insert/delete 16
116 INSERT_OI_CREDITS = 16,
119 * Same as create object in Ext3 filesystem, but did not count QUOTA i
120 * EXT3_DATA_TRANS_BLOCKS(12) + INDEX_EXTRA_BLOCKS(8) +
121 * 3(inode bits,groups, GDT)*/
122 CREATE_OBJECT_CREDITS = 23,
125 * SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS
126 * XXX Note: in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
127 * also counted in. Do not know why? */
128 XATTR_SET_CREDITS = 12,
130 /* A log rec need EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
131 * EXT3_SINGLEDATA_TRANS_BLOCKS(8))
136 /* XXX we should know the ost count to calculate the llog */
137 #define DEFAULT_LSM_COUNT 4 /* FIXME later */
139 MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
140 /* OBJECT CREATE :OI_INSERT + CREATE */
141 MDD_TXN_OBJECT_CREATE_CREDITS = (INSERT_OI_CREDITS + \
142 CREATE_OBJECT_CREDITS),
143 /* ATTR SET: XATTR_SET + ATTR set(3)*/
144 MDD_TXN_ATTR_SET_CREDITS = (XATTR_SET_CREDITS + 3),
146 MDD_TXN_XATTR_SET_CREDITS = XATTR_SET_CREDITS,
148 MDD_TXN_INDEX_INSERT_CREDITS = INSERT_IAM_CREDITS,
149 MDD_TXN_INDEX_DELETE_CREDITS = INSERT_IAM_CREDITS,
150 MDD_TXN_LINK_CREDITS = INSERT_IAM_CREDITS,
154 * IAM_INSERT_CREDITS + UNLINK log
155 * Unlink log = ((EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
156 * EXT3_SINGLEDATA_TRANS_BLOCKS(8)) * lsm stripe count
157 * XXX we should know the ost count to calculate the llog
159 MDD_TXN_UNLINK_CREDITS = (INSERT_IAM_CREDITS +
160 LOG_REC_CREDIT*DEFAULT_LSM_COUNT),
163 * 2 IAM_INSERT + 1 IAM_DELETE + UNLINK LOG
165 MDD_TXN_RENAME_CREDITS = (3 * INSERT_IAM_CREDITS + \
166 LOG_REC_CREDIT * DEFAULT_LSM_COUNT),
167 /* CREATE_DATA CREDITS
170 MDD_TXN_CREATE_DATA_CREDITS = XATTR_SET_CREDITS,
172 * IAM_INSERT + OI_INSERT + CREATE_OBJECT_CREDITS
173 * SET_MD CREDITS is already counted in CREATE_OBJECT CREDITS */
174 MDD_TXN_MKDIR_CREDITS = (INSERT_IAM_CREDITS + INSERT_OI_CREDITS \
175 + CREATE_OBJECT_CREDITS)
178 #define DEFINE_MDD_TXN_OP_DESC(opname) \
179 static const struct mdd_txn_op_descr opname = { \
180 .mod_op = opname ## _OP, \
181 .mod_credits = opname ## _CREDITS, \
185 * number of blocks to reserve for particular operations. Should be function
186 * of ... something. Stub for now.
188 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
189 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
190 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
191 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
192 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
193 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
194 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
195 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
196 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
197 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
198 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
200 static void mdd_txn_param_build(const struct lu_env *env,
201 const struct mdd_txn_op_descr *opd)
203 mdd_env_info(env)->mti_param.tp_credits = opd->mod_credits;
206 #define mdd_get_group_info(group_info) do { \
207 atomic_inc(&(group_info)->usage); \
210 #define mdd_put_group_info(group_info) do { \
211 if (atomic_dec_and_test(&(group_info)->usage)) \
212 groups_free(group_info); \
215 #define MDD_NGROUPS_PER_BLOCK ((int)(CFS_PAGE_SIZE / sizeof(gid_t)))
217 #define MDD_GROUP_AT(gi, i) \
218 ((gi)->blocks[(i) / MDD_NGROUPS_PER_BLOCK][(i) % MDD_NGROUPS_PER_BLOCK])
220 /* groups_search() is copied from linux kernel! */
221 /* a simple bsearch */
222 static int mdd_groups_search(struct group_info *group_info, gid_t grp)
230 right = group_info->ngroups;
231 while (left < right) {
232 int mid = (left + right) / 2;
233 int cmp = grp - MDD_GROUP_AT(group_info, mid);
/* Return non-zero if @grp is the caller's fs gid, one of the two
 * supplementary gids carried in the ucred, or found in the attached
 * group_info (searched via mdd_groups_search()).
 * NOTE(review): lines are missing from this excerpt (embedded original
 * numbering is non-contiguous); code left byte-identical. */
245 static int mdd_in_group_p(struct md_ucred *uc, gid_t grp)
249         if (grp != uc->mu_fsgid) {
250                 struct group_info *group_info = NULL;
252                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
253                         if ((grp == uc->mu_suppgids[0]) ||
254                             (grp == uc->mu_suppgids[1]))
                /* pick explicit ginfo if present, else the identity's */
258                         group_info = uc->mu_ginfo;
259                 else if (uc->mu_identity)
260                         group_info = uc->mu_identity->mi_ginfo;
                /* hold a reference across the search */
265                 mdd_get_group_info(group_info);
266                 rc = mdd_groups_search(group_info, grp);
267                 mdd_put_group_info(group_info);
/*
 * Permission check helper: delegates to __mdd_permission_internal() with
 * getattr == 1 (attributes are fetched from the object).
 */
static inline int mdd_permission_internal(const struct lu_env *env,
                                          struct mdd_object *obj, int mask,
                                          struct md_ucred *uc)
{
        return __mdd_permission_internal(env, obj, mask, 1, uc);
}
279 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
281 struct mdd_thread_info *info;
283 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
284 LASSERT(info != NULL);
288 static struct lu_object *mdd_object_alloc(const struct lu_env *env,
289 const struct lu_object_header *hdr,
292 struct mdd_object *mdd_obj;
294 OBD_ALLOC_PTR(mdd_obj);
295 if (mdd_obj != NULL) {
298 o = mdd2lu_obj(mdd_obj);
299 lu_object_init(o, NULL, d);
300 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
301 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
302 mdd_obj->mod_count = 0;
303 o->lo_ops = &mdd_lu_obj_ops;
310 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
312 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
313 struct lu_object *below;
314 struct lu_device *under;
317 under = &d->mdd_child->dd_lu_dev;
318 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
323 lu_object_add(o, below);
327 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj);
/*
 * lu_object start: for an existing object, load its persistent flags
 * (immutable/append) into mod_flags; new objects need no work.
 */
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}
/* lu_object free: tear down the generic part and release the slab. */
static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_FREE_PTR(mdd);
}
345 static int mdd_object_print(const struct lu_env *env, void *cookie,
346 lu_printer_t p, const struct lu_object *o)
348 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
/* lu_object delete callback: when the object carries the ORPHAN flag,
 * open a transaction and remove it from the persistent orphan index.
 * NOTE(review): error-path and brace lines are elided in this excerpt;
 * code left byte-identical. */
351 /* orphan handling is here */
352 static void mdd_object_delete(const struct lu_env *env,
355         struct mdd_object *mdd_obj = lu2mdd_obj(o);
356         struct thandle *handle = NULL;
        /* no orphan index configured -> nothing to clean up */
359         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
362         if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
363                 mdd_txn_param_build(env, &MDD_TXN_MKDIR);
364                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
366                         CERROR("Cannot get thandle\n");
368                         mdd_write_lock(env, mdd_obj);
369                         /* let's remove obj from the orphan list */
370                         __mdd_orphan_del(env, mdd_obj, handle);
371                         mdd_write_unlock(env, mdd_obj);
372                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
378 static struct lu_object_operations mdd_lu_obj_ops = {
379 .loo_object_init = mdd_object_init,
380 .loo_object_start = mdd_object_start,
381 .loo_object_free = mdd_object_free,
382 .loo_object_print = mdd_object_print,
383 .loo_object_delete = mdd_object_delete
/* Look up (or allocate) the object with fid @f on this site and locate
 * the mdd slice in the resulting object stack. A remote object has no
 * local slice and is released again.
 * NOTE(review): error-handling lines are elided in this excerpt; code
 * left byte-identical. */
386 struct mdd_object *mdd_object_find(const struct lu_env *env,
387 struct mdd_device *d,
388 const struct lu_fid *f)
390 struct lu_object *o, *lo;
391 struct mdd_object *m;
394 o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA);
396 m = (struct mdd_object *)o;
398 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
399 /* remote object can't be located and should be put then */
401 lu_object_put(env, o);
407 static inline int mdd_is_immutable(struct mdd_object *obj)
409 return obj->mod_flags & IMMUTE_OBJ;
412 static inline int mdd_is_append(struct mdd_object *obj)
414 return obj->mod_flags & APPEND_OBJ;
417 static inline void mdd_set_dead_obj(struct mdd_object *obj)
420 obj->mod_flags |= DEAD_OBJ;
423 static inline int mdd_is_dead_obj(struct mdd_object *obj)
425 return obj && obj->mod_flags & DEAD_OBJ;
/* Check whether @cobj may be created under @pobj: the target must not
 * already exist, the parent must not be dead, and (when need_check is
 * set) the caller needs write+exec on the parent.
 * NOTE(review): return statements/braces are elided in this excerpt;
 * code left byte-identical. */
428 /*Check whether it may create the cobj under the pobj*/
429 static int mdd_may_create(const struct lu_env *env,
430 struct mdd_object *pobj, struct mdd_object *cobj,
431 int need_check, struct md_ucred *uc)
436 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
439 if (mdd_is_dead_obj(pobj))
442 /*check pobj may create or not*/
444 rc = mdd_permission_internal(env, pobj,
445 MAY_WRITE | MAY_EXEC, uc)
450 static inline int __mdd_la_get(const struct lu_env *env,
451 struct mdd_object *obj, struct lu_attr *la)
453 struct dt_object *next = mdd_object_child(obj);
454 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
455 return next->do_ops->do_attr_get(env, next, la);
458 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
460 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
462 if (flags & LUSTRE_APPEND_FL)
463 obj->mod_flags |= APPEND_OBJ;
465 if (flags & LUSTRE_IMMUTABLE_FL)
466 obj->mod_flags |= IMMUTE_OBJ;
469 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
471 struct lu_attr *la = &mdd_env_info(env)->mti_la;
475 mdd_read_lock(env, obj);
476 rc = __mdd_la_get(env, obj, la);
477 mdd_read_unlock(env, obj);
479 mdd_flags_xlate(obj, la->la_flags);
483 #define mdd_cap_t(x) (x)
485 #define MDD_CAP_TO_MASK(x) (1 << (x))
487 #define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
489 /* capable() is copied from linux kernel! */
490 static inline int mdd_capable(struct md_ucred *uc, int cap)
492 if (mdd_cap_raised(uc->mu_cap, cap))
/* Sticky-directory deletion check (cf. VFS check_sticky()): deleting
 * @cobj from sticky @pobj is allowed for the file owner, the directory
 * owner, or a CAP_FOWNER holder.
 * NOTE(review): braces/early returns are elided in this excerpt; code
 * left byte-identical. */
498 * It's inline, so penalty for filesystems that don't use sticky bit is
501 static inline int mdd_is_sticky(const struct lu_env *env,
502 struct mdd_object *pobj,
503 struct mdd_object *cobj,
506 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
509 rc = __mdd_la_get(env, cobj, tmp_la);
512 } else if (tmp_la->la_uid == uc->mu_fsuid) {
515 rc = __mdd_la_get(env, pobj, tmp_la);
518 else if (!(tmp_la->la_mode & S_ISVTX))
520 else if (tmp_la->la_uid == uc->mu_fsuid)
523 return !mdd_capable(uc, CAP_FOWNER);
/* Check whether @cobj may be deleted from @pobj: object must exist,
 * not be immutable/append-only, match the expected dir/non-dir type,
 * not be the filesystem root, parent must not be dead, sticky rules
 * must pass, and (if need_check) caller needs write+exec on the parent.
 * NOTE(review): return values/braces are elided in this excerpt; code
 * left byte-identical. */
527 /*Check whether it may delete the cobj under the pobj*/
528 static int mdd_may_delete(const struct lu_env *env,
529 struct mdd_object *pobj, struct mdd_object *cobj,
530 int is_dir, int need_check, struct md_ucred *uc)
532 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
538 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
541 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
545 if (!S_ISDIR(mdd_object_type(cobj)))
548 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
551 } else if (S_ISDIR(mdd_object_type(cobj))) {
556 if (mdd_is_dead_obj(pobj))
559 if (mdd_is_sticky(env, pobj, cobj, uc))
563 rc = mdd_permission_internal(env, pobj,
564 MAY_WRITE | MAY_EXEC, uc)
569 /* get only inode attributes */
570 static int __mdd_iattr_get(const struct lu_env *env,
571 struct mdd_object *mdd_obj, struct md_attr *ma)
576 rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr);
578 ma->ma_valid = MA_INODE;
/* Read the LOV (striping) EA into ma->ma_lmm and set MA_LOV on success.
 * NOTE(review): the EA name argument and rc handling are elided in this
 * excerpt; code left byte-identical. */
582 /* get lov EA only */
583 static int __mdd_lmm_get(const struct lu_env *env,
584 struct mdd_object *mdd_obj, struct md_attr *ma)
589 LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
590 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
593 ma->ma_valid |= MA_LOV;
/* Read the LMV (dir-striping) EA into ma->ma_lmv and set MA_LMV on
 * success. NOTE(review): EA name argument and rc handling are elided in
 * this excerpt; code left byte-identical. */
600 static int __mdd_lmv_get(const struct lu_env *env,
601 struct mdd_object *mdd_obj, struct md_attr *ma)
605 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
608 ma->ma_valid |= MA_LMV;
/* Gather the attribute groups requested in ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), LMV EA (dirs only). Each step runs
 * only if the previous one succeeded.
 * NOTE(review): braces and the trailing return are elided in this
 * excerpt; code left byte-identical. */
614 static int mdd_attr_get_internal(const struct lu_env *env,
615 struct mdd_object *mdd_obj,
621 if (ma->ma_need & MA_INODE)
622 rc = __mdd_iattr_get(env, mdd_obj, ma);
624 if (rc == 0 && ma->ma_need & MA_LOV) {
625 if (S_ISREG(mdd_object_type(mdd_obj)) ||
626 S_ISDIR(mdd_object_type(mdd_obj)))
627 rc = __mdd_lmm_get(env, mdd_obj, ma);
629 if (rc == 0 && ma->ma_need & MA_LMV) {
630 if (S_ISDIR(mdd_object_type(mdd_obj)))
631 rc = __mdd_lmv_get(env, mdd_obj, ma);
633 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* Locked wrapper: take the read lock around mdd_attr_get_internal(). */
static inline int mdd_attr_get_internal_locked(const struct lu_env *env,
                                               struct mdd_object *mdd_obj,
                                               struct md_attr *ma)
{
        int rc;

        mdd_read_lock(env, mdd_obj);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
650 * No permission check is needed.
652 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
653 struct md_attr *ma, struct md_ucred *uc)
655 struct mdd_object *mdd_obj = md2mdd_obj(obj);
659 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
664 * No permission check is needed.
666 static int mdd_xattr_get(const struct lu_env *env,
667 struct md_object *obj, void *buf, int buf_len,
668 const char *name, struct md_ucred *uc)
670 struct mdd_object *mdd_obj = md2mdd_obj(obj);
671 struct dt_object *next;
676 LASSERT(lu_object_exists(&obj->mo_lu));
678 next = mdd_object_child(mdd_obj);
679 mdd_read_lock(env, mdd_obj);
680 rc = next->do_ops->do_xattr_get(env, next, buf, buf_len, name);
681 mdd_read_unlock(env, mdd_obj);
687 * Permission check is done when open,
688 * no need check again.
690 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
691 void *buf, int buf_len, struct md_ucred *uc)
693 struct mdd_object *mdd_obj = md2mdd_obj(obj);
694 struct dt_object *next;
699 LASSERT(lu_object_exists(&obj->mo_lu));
701 next = mdd_object_child(mdd_obj);
702 mdd_read_lock(env, mdd_obj);
703 rc = next->do_body_ops->dbo_read(env, next, buf, buf_len, &pos);
704 mdd_read_unlock(env, mdd_obj);
708 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
709 void *buf, int buf_len, struct md_ucred *uc)
711 struct mdd_object *mdd_obj = md2mdd_obj(obj);
712 struct dt_object *next;
717 LASSERT(lu_object_exists(&obj->mo_lu));
719 next = mdd_object_child(mdd_obj);
720 mdd_read_lock(env, mdd_obj);
721 rc = next->do_ops->do_xattr_list(env, next, buf, buf_len);
722 mdd_read_unlock(env, mdd_obj);
/* dt transaction-start callback; body elided in this excerpt
 * (presumably a no-op returning 0 — TODO confirm against full source). */
727 static int mdd_txn_start_cb(const struct lu_env *env,
728 struct txn_param *param, void *cookie)
/*
 * dt transaction-stop callback: flush the LOV object-id table of the
 * owning obd device to disk.
 */
static int mdd_txn_stop_cb(const struct lu_env *env,
                           struct thandle *txn, void *cookie)
{
        struct mdd_device *mdd = cookie;
        struct obd_device *obd = mdd2obd_dev(mdd);

        LASSERT(obd);
        return mds_lov_write_objids(obd);
}
/* dt transaction-commit callback; body elided in this excerpt
 * (presumably a no-op returning 0 — TODO confirm against full source). */
743 static int mdd_txn_commit_cb(const struct lu_env *env,
744 struct thandle *txn, void *cookie)
/* Device init: remember the child dt device and register the three
 * transaction callbacks above, with this mdd as their cookie.
 * NOTE(review): declarations/return lines are elided in this excerpt;
 * code left byte-identical. */
749 static int mdd_device_init(const struct lu_env *env,
750 struct lu_device *d, struct lu_device *next)
752 struct mdd_device *mdd = lu2mdd_dev(d);
753 struct dt_device *dt;
757 mdd->mdd_child = lu2dt_dev(next);
760 /* prepare transactions callbacks */
761 mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
762 mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
763 mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
764 mdd->mdd_txn_cb.dtc_cookie = mdd;
/* Device fini: returns the next (child) lu_device in the stack.
 * NOTE(review): body tail elided in this excerpt; code left
 * byte-identical. */
769 static struct lu_device *mdd_device_fini(const struct lu_env *env,
772 struct mdd_device *mdd = lu2mdd_dev(d);
773 struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
/* Mount-time setup: hook the txn callbacks into the child dt device,
 * open (and immediately release) the "root" store object, and set up
 * the orphan index.
 * NOTE(review): fid argument and error paths are elided in this
 * excerpt; code left byte-identical. */
778 static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
781 struct dt_object *root;
784 dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
785 root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
788 LASSERT(root != NULL);
789 lu_object_put(env, &root->do_lu);
790 rc = orph_index_init(env, mdd);
/* Shutdown: unhook the txn callbacks, tear down the embedded obd and
 * the orphan index. NOTE(review): one line between the callback removal
 * and mdd_fini_obd() is elided; code left byte-identical. */
797 static void mdd_device_shutdown(const struct lu_env *env,
798 struct mdd_device *m)
800 dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
802 mdd_fini_obd(env, m);
803 orph_index_fini(env, m);
/* Config dispatcher: on setup, forward to the child device, fetch its
 * dt configuration, initialize the LOV obd and mount; on cleanup, shut
 * the device down; other commands are passed through to the child.
 * NOTE(review): the case labels/braces of the switch are elided in this
 * excerpt; code left byte-identical. */
806 static int mdd_process_config(const struct lu_env *env,
807 struct lu_device *d, struct lustre_cfg *cfg)
809 struct mdd_device *m = lu2mdd_dev(d);
810 struct dt_device *dt = m->mdd_child;
811 struct lu_device *next = &dt->dd_lu_dev;
815 switch (cfg->lcfg_command) {
817 rc = next->ld_ops->ldo_process_config(env, next, cfg);
820 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
822 rc = mdd_init_obd(env, m, cfg);
824 CERROR("lov init error %d \n", rc);
827 rc = mdd_mount(env, m);
832 mdd_device_shutdown(env, m);
834 rc = next->ld_ops->ldo_process_config(env, next, cfg);
/* Post-recovery hook: push the next object ids to the LOV, replay the
 * unlink llog, notify the OSC stack, run post-recovery obd processing,
 * clean up orphans, and propagate to the child device.
 * NOTE(review): error paths/braces are elided in this excerpt; code
 * left byte-identical. */
841 static int mdd_recovery_complete(const struct lu_env *env,
844 struct mdd_device *mdd = lu2mdd_dev(d);
845 struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
846 struct obd_device *obd = mdd2obd_dev(mdd);
850 rc = mdd_lov_set_nextid(env, mdd);
852 CERROR("%s: mdd_lov_set_nextid failed %d\n",
856 rc = mdd_cleanup_unlink_llog(env, mdd);
858 obd_notify(obd->u.mds.mds_osc_obd, NULL,
859 obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
860 OBD_NOTIFY_SYNC, NULL);
865 obd->obd_recovering = 0;
866 obd->obd_type->typ_dt_ops->o_postrecov(obd);
867 /* TODO: orphans handling */
868 __mdd_orphan_cleanup(env, mdd);
869 rc = next->ld_ops->ldo_recovery_complete(env, next);
874 struct lu_device_operations mdd_lu_ops = {
875 .ldo_object_alloc = mdd_object_alloc,
876 .ldo_process_config = mdd_process_config,
877 .ldo_recovery_complete = mdd_recovery_complete
880 void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
882 struct dt_object *next = mdd_object_child(obj);
884 next->do_ops->do_write_lock(env, next);
887 void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
889 struct dt_object *next = mdd_object_child(obj);
891 next->do_ops->do_read_lock(env, next);
894 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
896 struct dt_object *next = mdd_object_child(obj);
898 next->do_ops->do_write_unlock(env, next);
901 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
903 struct dt_object *next = mdd_object_child(obj);
905 next->do_ops->do_read_unlock(env, next);
/* Write-lock two objects, @o0 first then @o1 (callers must order
 * consistently to avoid deadlock). */
static void mdd_lock2(const struct lu_env *env,
                      struct mdd_object *o0, struct mdd_object *o1)
{
        mdd_write_lock(env, o0);
        mdd_write_lock(env, o1);
}
/* Unlock the pair taken by mdd_lock2(), in reverse order. */
static void mdd_unlock2(const struct lu_env *env,
                        struct mdd_object *o0, struct mdd_object *o1)
{
        mdd_write_unlock(env, o1);
        mdd_write_unlock(env, o0);
}
922 static struct thandle* mdd_trans_start(const struct lu_env *env,
923 struct mdd_device *mdd)
925 struct txn_param *p = &mdd_env_info(env)->mti_param;
927 return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
930 static void mdd_trans_stop(const struct lu_env *env,
931 struct mdd_device *mdd, int result,
932 struct thandle *handle)
934 handle->th_result = result;
935 mdd_child_ops(mdd)->dt_trans_stop(env, handle);
/* Create the on-disk object for @obj with attributes from @ma, unless
 * it already exists. On success the object must exist (asserted).
 * NOTE(review): rc initialization/else branch are elided in this
 * excerpt; code left byte-identical. */
938 static int __mdd_object_create(const struct lu_env *env,
939 struct mdd_object *obj, struct md_attr *ma,
940 struct thandle *handle)
942 struct dt_object *next;
943 struct lu_attr *attr = &ma->ma_attr;
947 if (!lu_object_exists(mdd2lu_obj(obj))) {
948 next = mdd_object_child(obj);
949 rc = next->do_ops->do_create(env, next, attr, handle);
953 LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
958 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
959 const struct lu_attr *attr, struct thandle *handle)
961 struct dt_object *next;
963 LASSERT(lu_object_exists(mdd2lu_obj(o)));
964 next = mdd_object_child(o);
965 return next->do_ops->do_attr_set(env, next, attr, handle);
/* Locked wrapper: take the write lock around mdd_attr_set_internal(). */
int mdd_attr_set_internal_locked(const struct lu_env *env,
                                 struct mdd_object *o,
                                 const struct lu_attr *attr,
                                 struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, o);
        rc = mdd_attr_set_internal(env, o, attr, handle);
        mdd_write_unlock(env, o);
        return rc;
}
/* Set (buf != NULL, buf_len > 0) or delete (buf == NULL, buf_len == 0)
 * the extended attribute @name on @o inside transaction @handle.
 * NOTE(review): the do_xattr_set flag argument, rc init and trailing
 * return are elided in this excerpt; code left byte-identical. */
980 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
981 const void *buf, int buf_len, const char *name,
982 int fl, struct thandle *handle)
984 struct dt_object *next;
988 LASSERT(lu_object_exists(mdd2lu_obj(o)));
989 next = mdd_object_child(o);
990 if (buf && buf_len > 0) {
991 rc = next->do_ops->do_xattr_set(env, next, buf, buf_len, name,
993 }else if (buf == NULL && buf_len == 0) {
994 rc = next->do_ops->do_xattr_del(env, next, name, handle);
/* Normalize and permission-check an incoming setattr request against
 * the object's current attributes (VFS inode_change_ok/inode_setattr
 * semantics): rejects type changes, enforces immutable/append rules,
 * validates chmod/chown/chgrp capability requirements, clears SUID/SGID
 * on ownership change of non-directories, checks MAY_WRITE for size
 * changes, and fixes up a/m/ctime for Size-on-MDS updates.
 * NOTE(review): many lines (returns, braces, else-branches) are elided
 * in this excerpt; code left byte-identical. */
999 /* this gives the same functionality as the code between
1000 * sys_chmod and inode_setattr
1001 * chown_common and inode_setattr
1002 * utimes and inode_setattr
1003 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1006 int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1007 struct lu_attr *la, struct md_ucred *uc)
1009 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1010 time_t now = CURRENT_SECONDS;
1017 /* Do not permit change file type */
1018 if (la->la_valid & LA_TYPE)
1021 /* They should not be processed by setattr */
1022 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1025 rc = __mdd_la_get(env, obj, tmp_la);
1029 if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
1032 * If only change flags of the object, we should
1033 * let it pass, but also need capability check
1034 * here if (!capable(CAP_LINUX_IMMUTABLE)),
1035 * fix it, when implement capable in mds
1037 if (la->la_valid & ~LA_FLAGS)
1040 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
1043 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1044 !mdd_capable(uc, CAP_FOWNER))
1048 * According to Ext3 implementation on this, the
1049 * Ctime will be changed, but not clear why?
1052 la->la_valid |= LA_CTIME;
1056 /* Check for setting the obj time. */
1057 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1058 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1059 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1060 !mdd_capable(uc, CAP_FOWNER))
1064 /* Make sure a caller can chmod. */
1065 if (la->la_valid & LA_MODE) {
1067 * Bypass la_vaild == LA_MODE,
1068 * this is for changing file with SUID or SGID.
1070 if ((la->la_valid & ~LA_MODE) &&
1071 (uc->mu_fsuid != tmp_la->la_uid) &&
1072 !mdd_capable(uc, CAP_FOWNER))
1075 if (la->la_mode == (umode_t) -1)
1076 la->la_mode = tmp_la->la_mode;
1078 la->la_mode = (la->la_mode & S_IALLUGO) |
1079 (tmp_la->la_mode & ~S_IALLUGO);
1081 /* Also check the setgid bit! */
1082 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
1083 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
1084 la->la_mode &= ~S_ISGID;
1086 la->la_mode = tmp_la->la_mode;
1089 /* Make sure a caller can chown. */
1090 if (la->la_valid & LA_UID) {
1091 if (la->la_uid == (uid_t) -1)
1092 la->la_uid = tmp_la->la_uid;
1093 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1094 (la->la_uid != tmp_la->la_uid)) &&
1095 !mdd_capable(uc, CAP_CHOWN))
1099 * If the user or group of a non-directory has been
1100 * changed by a non-root user, remove the setuid bit.
1101 * 19981026 David C Niemi <niemi@tux.org>
1103 * Changed this to apply to all users, including root,
1104 * to avoid some races. This is the behavior we had in
1105 * 2.0. The check for non-root was definitely wrong
1106 * for 2.2 anyway, as it should have been using
1107 * CAP_FSETID rather than fsuid -- 19990830 SD.
1109 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1110 !S_ISDIR(tmp_la->la_mode)) {
1111 la->la_mode &= ~S_ISUID;
1112 la->la_valid |= LA_MODE;
1116 /* Make sure caller can chgrp. */
1117 if (la->la_valid & LA_GID) {
1118 if (la->la_gid == (gid_t) -1)
1119 la->la_gid = tmp_la->la_gid;
1120 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1121 ((la->la_gid != tmp_la->la_gid) &&
1122 !mdd_in_group_p(uc, la->la_gid))) &&
1123 !mdd_capable(uc, CAP_CHOWN))
1127 * Likewise, if the user or group of a non-directory
1128 * has been changed by a non-root user, remove the
1129 * setgid bit UNLESS there is no group execute bit
1130 * (this would be a file marked for mandatory
1131 * locking). 19981026 David C Niemi <niemi@tux.org>
1133 * Removed the fsuid check (see the comment above) --
1136 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1137 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1138 la->la_mode &= ~S_ISGID;
1139 la->la_valid |= LA_MODE;
1143 /* For tuncate (or setsize), we should have MAY_WRITE perm */
1144 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1145 rc = mdd_permission_internal(env, obj, MAY_WRITE, uc);
1150 * For the "Size-on-MDS" setattr update, merge coming
1151 * attributes with the set in the inode. BUG 10641
1153 if ((la->la_valid & LA_ATIME) &&
1154 (la->la_atime < tmp_la->la_atime))
1155 la->la_valid &= ~LA_ATIME;
1157 if ((la->la_valid & LA_CTIME) &&
1158 (la->la_ctime < tmp_la->la_ctime))
1159 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1161 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
1163 la->la_valid |= LA_MTIME;
1167 /* For last, ctime must be fixed */
1168 if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
1170 la->la_valid |= LA_CTIME;
/* md_object setattr entry point: start a transaction, capture the old
 * LOV EA when ownership changes on a regular file (for llog/async OST
 * update), run mdd_fix_attr() under write lock, apply the fixed attrs,
 * optionally store a new LOV EA, then stop the transaction and push the
 * attribute change to the OSTs.
 * NOTE(review): error-path lines and braces are elided in this excerpt;
 * code left byte-identical. */
1176 /* set attr and LOV EA at once, return updated attr */
1177 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1178 const struct md_attr *ma, struct md_ucred *uc)
1180 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1181 struct mdd_device *mdd = mdo2mdd(obj);
1182 struct thandle *handle;
1183 struct lov_mds_md *lmm = NULL;
1184 int rc = 0, lmm_size = 0, max_size = 0;
1185 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1188 mdd_txn_param_build(env, &MDD_TXN_ATTR_SET);
1189 handle = mdd_trans_start(env, mdd);
1191 RETURN(PTR_ERR(handle));
1192 /*TODO: add lock here*/
1193 /* start a log jounal handle if needed */
1194 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1195 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1196 max_size = mdd_lov_mdsize(env, mdd);
1197 OBD_ALLOC(lmm, max_size);
1199 GOTO(cleanup, rc = -ENOMEM);
1201 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1208 if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
1209 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1210 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1212 *la_copy = ma->ma_attr;
1213 mdd_write_lock(env, mdd_obj);
1214 rc = mdd_fix_attr(env, mdd_obj, la_copy, uc);
1215 mdd_write_unlock(env, mdd_obj);
1219 if (la_copy->la_valid & LA_FLAGS) {
1220 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1223 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1224 } else if (la_copy->la_valid) { /* setattr */
1225 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1227 /* journal chown/chgrp in llog, just like unlink */
1228 if (rc == 0 && lmm_size){
1229 /*TODO set_attr llog */
1233 if (rc == 0 && ma->ma_valid & MA_LOV) {
1236 mode = mdd_object_type(mdd_obj);
1237 if (S_ISREG(mode) || S_ISDIR(mode)) {
1238 /*TODO check permission*/
1239 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1240 ma->ma_lmm_size, handle, 1);
1245 mdd_trans_stop(env, mdd, rc, handle);
1246 if (rc == 0 && lmm_size) {
1247 /*set obd attr, if needed*/
1248 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
1251 OBD_FREE(lmm, max_size);
1257 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1258 const void *buf, int buf_len, const char *name, int fl,
1259 struct thandle *handle)
1264 mdd_write_lock(env, obj);
1265 rc = __mdd_xattr_set(env, obj, buf, buf_len, name, fl, handle);
1266 mdd_write_unlock(env, obj);
/* Pre-checks for xattr modification: reject immutable/append objects,
 * then require the caller to be the owner or hold CAP_FOWNER.
 * NOTE(review): return values/braces are elided in this excerpt; code
 * left byte-identical. */
1271 static int mdd_xattr_sanity_check(const struct lu_env *env,
1272 struct mdd_object *obj,
1273 struct md_ucred *uc)
1275 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1279 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1282 mdd_read_lock(env, obj);
1283 rc = __mdd_la_get(env, obj, tmp_la);
1284 mdd_read_unlock(env, obj);
1288 if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
/* md_object xattr-set entry point: sanity-check permissions, run the
 * set inside a transaction, and (with split support) map a successful
 * LMV set to -ERESTART so the client replays with transno 0.
 * NOTE(review): error paths and braces are elided in this excerpt;
 * code left byte-identical. */
1294 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1295 const void *buf, int buf_len, const char *name, int fl,
1296 struct md_ucred *uc)
1298 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1299 struct mdd_device *mdd = mdo2mdd(obj);
1300 struct thandle *handle;
1304 rc = mdd_xattr_sanity_check(env, mdd_obj, uc);
1308 mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1309 handle = mdd_trans_start(env, mdd);
1311 RETURN(PTR_ERR(handle));
1313 rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, buf_len, name,
1315 #ifdef HAVE_SPLIT_SUPPORT
1317 /* very ugly hack, if setting lmv, it means splitting
1318 * sucess, we should return -ERESTART to notify the
1319 * client, so transno for this splitting should be
1320 * zero according to the replay rules. so return -ERESTART
1321 * here let mdt trans stop callback know this.
1323 if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
1327 mdd_trans_stop(env, mdd, rc, handle);
1332 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
1333 struct mdd_object *obj,
1334 const char *name, struct thandle *handle)
1336 struct dt_object *next;
1338 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1339 next = mdd_object_child(obj);
1340 return next->do_ops->do_xattr_del(env, next, name, handle);
/* md_object xattr-del entry point: sanity-check permissions, then
 * delete the attribute under write lock inside a transaction.
 * NOTE(review): error-path lines are elided in this excerpt; code left
 * byte-identical. */
1343 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1344 const char *name, struct md_ucred *uc)
1346 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1347 struct mdd_device *mdd = mdo2mdd(obj);
1348 struct thandle *handle;
1352 rc = mdd_xattr_sanity_check(env, mdd_obj, uc);
1356 mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1357 handle = mdd_trans_start(env, mdd);
1359 RETURN(PTR_ERR(handle));
1361 mdd_write_lock(env, mdd_obj);
1362 rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
1363 mdd_write_unlock(env, mdd_obj);
1365 mdd_trans_stop(env, mdd, rc, handle);
/* Insert (@name -> @lf) into the directory index of @pobj without any
 * nlink/ctime side effects.
 * NOTE(review): the else branch (non-dir error) and return are elided
 * in this excerpt; code left byte-identical. */
1370 static int __mdd_index_insert_only(const struct lu_env *env,
1371 struct mdd_object *pobj,
1372 const struct lu_fid *lf,
1373 const char *name, struct thandle *th)
1376 struct dt_object *next = mdd_object_child(pobj);
1379 if (dt_try_as_dir(env, next))
1380 rc = next->do_index_ops->dio_insert(env, next,
1381 (struct dt_rec *)lf,
1382 (struct dt_key *)name, th);
1388 /* insert new index, add reference if isdir, update times */
/*
 * Insert (@name -> @lf) into parent directory @pobj; when the new
 * entry is a directory, bump the parent's nlink; then refresh the
 * parent's mtime/ctime.  (Interior lines are elided in this view;
 * rc/ma/mdd_obj/handle are declared in the elided parts.)
 */
1389 static int __mdd_index_insert(const struct lu_env *env,
1390 struct mdd_object *pobj, const struct lu_fid *lf,
1391 const char *name, int isdir, struct thandle *th)
1394 struct dt_object *next = mdd_object_child(pobj);
1398 struct lu_attr *la = &mdd_env_info(env)->mti_la;
1401 if (dt_try_as_dir(env, next))
1402 rc = next->do_index_ops->dio_insert(env, next,
1403 (struct dt_rec *)lf,
1404 (struct dt_key *)name, th);
1410 __mdd_ref_add(env, pobj, th);
1412 la->la_valid = LA_MTIME|LA_CTIME;
/* BUGFIX: la_valid advertises LA_MTIME|LA_CTIME, but the original code
 * copied la_atime (not in the mask) and never assigned la_mtime, so the
 * parent's mtime update was lost.  Copy mtime to match the valid mask. */
1413 la->la_mtime = ma->ma_attr.la_mtime;
1414 la->la_ctime = ma->ma_attr.la_ctime;
1415 rc = mdd_attr_set_internal(env, mdd_obj, la, handle);
/*
 * Remove directory entry @name from parent @pobj's index via
 * dio_delete() on the underlying dt object.
 */
1421 static int __mdd_index_delete(const struct lu_env *env,
1422 struct mdd_object *pobj, const char *name,
1423 struct thandle *handle)
1426 struct dt_object *next = mdd_object_child(pobj);
1429 if (dt_try_as_dir(env, next))
1430 rc = next->do_index_ops->dio_delete(env, next,
1431 (struct dt_key *)name, handle);
/*
 * Pre-checks for hard link creation: target dir must allow creation,
 * source must not be a directory, and must not be immutable/append-only.
 */
1437 static int mdd_link_sanity_check(const struct lu_env *env,
1438 struct mdd_object *tgt_obj,
1439 struct mdd_object *src_obj,
1440 struct md_ucred *uc)
1445 rc = mdd_may_create(env, tgt_obj, NULL, 1, uc);
/* hard links to directories are not allowed */
1449 if (S_ISDIR(mdd_object_type(src_obj)))
1452 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
/*
 * md_dir operation: create hard link @name in @tgt_obj pointing at
 * @src_obj.  Inside one transaction: insert the name, bump the source
 * nlink, update source ctime and target dir ctime/mtime.
 */
1458 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1459 struct md_object *src_obj, const char *name,
1460 struct md_attr *ma, struct md_ucred *uc)
1462 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1463 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1464 struct mdd_device *mdd = mdo2mdd(src_obj);
1465 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1466 struct thandle *handle;
1470 mdd_txn_param_build(env, &MDD_TXN_LINK);
1471 handle = mdd_trans_start(env, mdd);
1473 RETURN(PTR_ERR(handle));
/* lock both objects in canonical order */
1475 mdd_lock2(env, mdd_tobj, mdd_sobj);
1477 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj, uc);
1481 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1484 __mdd_ref_add(env, mdd_sobj, handle);
/* source gets a ctime update; target dir gets ctime+mtime */
1486 *la_copy = ma->ma_attr;
1487 la_copy->la_valid = LA_CTIME;
1488 rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle);
1492 la_copy->la_valid = LA_CTIME | LA_MTIME;
1493 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
1496 mdd_unlock2(env, mdd_tobj, mdd_sobj);
1497 mdd_trans_stop(env, mdd, rc, handle);
1502  * Check that @dir contains no entries except (possibly) dot and dotdot.
/*
 * Returns 0 when empty, -ENOTEMPTY when a third entry exists.
 */
1507  *  -ENOTEMPTY not empty
1511 static int mdd_dir_is_empty(const struct lu_env *env,
1512 struct mdd_object *dir)
1515 struct dt_object *obj;
1516 struct dt_it_ops *iops;
1519 obj = mdd_object_child(dir);
1520 iops = &obj->do_index_ops->dio_it;
1521 it = iops->init(env, obj, 0);
/* position at the smallest key, then step at most 3 times:
 * ".", "..", and any third entry means the dir is not empty */
1523 result = iops->get(env, it, (const void *)"");
1526 for (result = 0, i = 0; result == 0 && i < 3; ++i)
1527 result = iops->next(env, it);
1529 result = -ENOTEMPTY;
1530 else if (result == +1)
1532 } else if (result == 0)
1534  * Huh? Index contains no zero key?
1539 iops->fini(env, it);
1545 /* return md_attr back,
1546  * if it is last unlink then return lov ea + llog cookie*/
/*
 * Mark @obj dead; for regular files also fetch the LOV EA back into
 * @ma and record the unlink in the llog (cookies for OST cleanup).
 */
1547 int __mdd_object_kill(const struct lu_env *env,
1548 struct mdd_object *obj,
1554 mdd_set_dead_obj(obj);
1555 if (S_ISREG(mdd_object_type(obj))) {
1556 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1557  * Caller must be ready for that. */
1558 rc = __mdd_lmm_get(env, obj, ma);
1559 if ((ma->ma_valid & MA_LOV))
1560 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1566 /* caller should take a lock before calling */
/*
 * Post-unlink bookkeeping: when nlink reaches 0, park the object on
 * the orphan list (deleted at final object_put), and if nobody holds
 * it open (mod_count == 0) kill it immediately.
 */
1567 static int __mdd_finish_unlink(const struct lu_env *env,
1568 struct mdd_object *obj, struct md_attr *ma,
1574 rc = __mdd_iattr_get(env, obj, ma);
1575 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1576 /* add new orphan and the object
1577  * will be deleted during the object_put() */
1578 if (__mdd_orphan_add(env, obj, th) == 0)
1579 set_bit(LU_OBJECT_ORPHAN,
1580 &mdd2lu_obj(obj)->lo_header->loh_flags);
1582 if (obj->mod_count == 0)
1583 rc = __mdd_object_kill(env, obj, ma);
/*
 * Pre-checks for unlink: caller may delete @cobj from @pobj, and a
 * directory child must be empty.
 */
1588 static int mdd_unlink_sanity_check(const struct lu_env *env,
1589 struct mdd_object *pobj,
1590 struct mdd_object *cobj,
1592 struct md_ucred *uc)
1594 struct dt_object *dt_cobj = mdd_object_child(cobj);
1598 rc = mdd_may_delete(env, pobj, cobj,
1599 S_ISDIR(ma->ma_attr.la_mode), 1, uc);
1603 if (S_ISDIR(mdd_object_type(cobj))) {
1604 if (dt_try_as_dir(env, dt_cobj))
1605 rc = mdd_dir_is_empty(env, cobj);
/*
 * md_dir operation: unlink @name/@cobj from @pobj.  One transaction:
 * delete the name, drop child nlink (twice for dirs: entry + "."),
 * drop parent nlink for dirs (".."), update ctimes, finish the unlink
 * (orphan/kill), and nudge the OSC with a "unlinked" notification.
 */
1613 static int mdd_unlink(const struct lu_env *env,
1614 struct md_object *pobj, struct md_object *cobj,
1615 const char *name, struct md_attr *ma, struct md_ucred *uc)
1617 struct mdd_device *mdd = mdo2mdd(pobj);
1618 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1619 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1620 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1621 struct thandle *handle;
1625 mdd_txn_param_build(env, &MDD_TXN_UNLINK);
1626 handle = mdd_trans_start(env, mdd);
1628 RETURN(PTR_ERR(handle));
1630 mdd_lock2(env, mdd_pobj, mdd_cobj);
1632 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma, uc);
1636 rc = __mdd_index_delete(env, mdd_pobj, name, handle);
1640 __mdd_ref_del(env, mdd_cobj, handle);
1641 *la_copy = ma->ma_attr;
1642 if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
/* extra ref for the child's "." and the parent's ".." */
1644 __mdd_ref_del(env, mdd_cobj, handle);
1646 __mdd_ref_del(env, mdd_pobj, handle);
1648 la_copy->la_valid = LA_CTIME;
1649 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle);
1654 la_copy->la_valid = LA_CTIME | LA_MTIME;
1655 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
1659 rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle);
/* best-effort hint to the OSC export that an unlink happened */
1662 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
1663 strlen("unlinked"), "unlinked", 0,
1667 mdd_unlock2(env, mdd_pobj, mdd_cobj);
1668 mdd_trans_stop(env, mdd, rc, handle);
1672 /* partial unlink */
/*
 * Drop one reference from @obj without touching any parent index
 * (used for cross-MDS operations where the name lives elsewhere).
 * Directories lose a second ref for their "." entry.
 */
1673 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1674 struct md_attr *ma, struct md_ucred *uc)
1676 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1677 struct mdd_device *mdd = mdo2mdd(obj);
1678 struct thandle *handle;
1682 mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1683 handle = mdd_trans_start(env, mdd);
1687 mdd_write_lock(env, mdd_obj);
/* sanity check with NULL parent: name removal happens on another MDS */
1689 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma, uc);
1693 __mdd_ref_del(env, mdd_obj, handle);
1695 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1697 __mdd_ref_del(env, mdd_obj, handle);
1700 rc = __mdd_finish_unlink(env, mdd_obj, ma, handle);
1704 mdd_write_unlock(env, mdd_obj);
1705 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Resolve @obj's parent fid by looking up ".." (dotdot) in @obj.
 */
1709 static int mdd_parent_fid(const struct lu_env *env,
1710 struct mdd_object *obj,
1713 return __mdd_lookup_locked(env, &obj->mod_obj,
1714 dotdot, fid, 0, NULL);
1718  * return 1: if lf is the fid of the ancestor of p1;
/* (note: the code below returns positive EREMOTE, matching the
 * mdd_is_subdir() contract, not a negative errno) */
1721  * return EREMOTE: if remote object is found, in this
1722  * case fid of remote object is saved to @pf;
1724  * otherwise: values < 0, errors.
1726 static int mdd_is_parent(const struct lu_env *env,
1727 struct mdd_device *mdd,
1728 struct mdd_object *p1,
1729 const struct lu_fid *lf,
1732 struct mdd_object *parent = NULL;
1733 struct lu_fid *pfid;
1737 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1738 pfid = &mdd_env_info(env)->mti_fid;
1740 /* Do not lookup ".." in root, they do not exist there. */
1741 if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
/* walk up via ".." until root, @lf, or a remote object is hit */
1745 rc = mdd_parent_fid(env, p1, pfid);
1748 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1750 if (lu_fid_eq(pfid, lf))
1753 mdd_object_put(env, parent);
1754 parent = mdd_object_find(env, mdd, pfid);
1756 /* cross-ref parent */
1757 if (parent == NULL) {
1760 GOTO(out, rc = EREMOTE);
1761 } else if (IS_ERR(parent))
1762 GOTO(out, rc = PTR_ERR(parent));
1767 if (parent && !IS_ERR(parent))
1768 mdd_object_put(env, parent);
/*
 * Take write locks on the two rename parents in a deadlock-free order:
 * ancestor first (root wins outright; otherwise decided by walking the
 * parent chain via mdd_is_parent()).  Identical parents get one lock.
 */
1772 static int mdd_rename_lock(const struct lu_env *env,
1773 struct mdd_device *mdd,
1774 struct mdd_object *src_pobj,
1775 struct mdd_object *tgt_pobj)
1780 if (src_pobj == tgt_pobj) {
1781 mdd_write_lock(env, src_pobj);
1785 /* compared the parent child relationship of src_p&tgt_p */
1786 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1787 mdd_lock2(env, src_pobj, tgt_pobj);
1789 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1790 mdd_lock2(env, tgt_pobj, src_pobj);
1794 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1799 mdd_lock2(env, tgt_pobj, src_pobj);
1803 mdd_lock2(env, src_pobj, tgt_pobj);
/*
 * Release the locks taken by mdd_rename_lock(); unlock the target
 * parent only when it differs from the source parent.
 */
1808 static void mdd_rename_unlock(const struct lu_env *env,
1809 struct mdd_object *src_pobj,
1810 struct mdd_object *tgt_pobj)
1812 mdd_write_unlock(env, src_pobj);
1813 if (src_pobj != tgt_pobj)
1814 mdd_write_unlock(env, tgt_pobj);
/*
 * Pre-checks for rename: may delete source from src dir, may create or
 * replace the target name in tgt dir (an existing dir target must be
 * empty), and the source must not be an ancestor of the target dir.
 */
1817 static int mdd_rename_sanity_check(const struct lu_env *env,
1818 struct mdd_object *src_pobj,
1819 struct mdd_object *tgt_pobj,
1820 const struct lu_fid *sfid,
1822 struct mdd_object *sobj,
1823 struct mdd_object *tobj,
1824 struct md_ucred *uc)
1826 struct mdd_device *mdd = mdo2mdd(&src_pobj->mod_obj);
1827 int rc = 0, need_check = 1;
1830 mdd_read_lock(env, src_pobj);
1831 rc = mdd_may_delete(env, src_pobj, sobj, src_is_dir, need_check, uc);
1832 mdd_read_unlock(env, src_pobj);
/* permission on the parent only needs checking once when shared */
1836 if (src_pobj == tgt_pobj)
1840 mdd_read_lock(env, tgt_pobj);
1841 rc = mdd_may_create(env, tgt_pobj, NULL, need_check, uc);
1842 mdd_read_unlock(env, tgt_pobj);
1844 mdd_read_lock(env, tgt_pobj);
1845 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1847 mdd_read_unlock(env, tgt_pobj);
1848 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
1849 mdd_dir_is_empty(env, tobj))
1853 /* source should not be ancestor of target dir */
1854 if (!rc && src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
1859 /* src object can be remote that is why we use only fid and type of object */
/*
 * md_dir operation: rename @sname in @src_pobj to @tname in @tgt_pobj.
 * Under one transaction and the ordered parent locks: delete the old
 * name, adjust nlinks, remove any existing target name, insert the new
 * one, update ctimes/mtimes on source, target and both parents, and
 * finish-unlink a replaced target object.
 */
1860 static int mdd_rename(const struct lu_env *env,
1861 struct md_object *src_pobj, struct md_object *tgt_pobj,
1862 const struct lu_fid *lf, const char *sname,
1863 struct md_object *tobj, const char *tname,
1864 struct md_attr *ma, struct md_ucred *uc)
1866 struct mdd_device *mdd = mdo2mdd(src_pobj);
1867 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1868 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1869 struct mdd_object *mdd_sobj = mdd_object_find(env, mdd, lf);
1870 struct mdd_object *mdd_tobj = NULL;
1871 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1872 struct thandle *handle;
1877 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1878 is_dir = S_ISDIR(ma->ma_attr.la_mode);
/* immutable/append-only sources cannot be renamed */
1879 if (ma->ma_attr.la_valid & LA_FLAGS &&
1880 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1881 GOTO(out, rc = -EPERM);
1884 mdd_tobj = md2mdd_obj(tobj);
1886 /*XXX: shouldn't this check be done under lock below? */
1887 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1888 lf, is_dir, mdd_sobj, mdd_tobj, uc);
1892 mdd_txn_param_build(env, &MDD_TXN_RENAME);
1893 handle = mdd_trans_start(env, mdd);
1895 GOTO(out, rc = PTR_ERR(handle));
1897 /*FIXME: Should consider tobj and sobj too in rename_lock*/
1898 rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
1900 GOTO(cleanup_unlocked, rc);
1902 rc = __mdd_index_delete(env, mdd_spobj, sname, handle);
1906 /*if sobj is dir, its parent object nlink should be dec too*/
1908 __mdd_ref_del(env, mdd_spobj, handle);
1910 rc = __mdd_index_delete(env, mdd_tpobj, tname, handle);
1911 /* tobj can be remote one,
1912  * so we do index_delete unconditionally and -ENOENT is allowed */
1913 if (rc != 0 && rc != -ENOENT)
1916 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle);
1920 *la_copy = ma->ma_attr;
1921 la_copy->la_valid = LA_CTIME;
1923 /*XXX: how to update ctime for remote sobj? */
1924 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
/* a replaced local target loses its name (and "." for dirs) */
1928 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1929 mdd_write_lock(env, mdd_tobj);
1930 __mdd_ref_del(env, mdd_tobj, handle);
1931 /* remove dot reference */
1933 __mdd_ref_del(env, mdd_tobj, handle);
1935 la_copy->la_valid = LA_CTIME;
1936 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
1940 rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle);
1941 mdd_write_unlock(env, mdd_tobj);
1946 la_copy->la_valid = LA_CTIME | LA_MTIME;
1947 rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle);
1951 if (mdd_spobj != mdd_tpobj) {
1952 la_copy->la_valid = LA_CTIME | LA_MTIME;
1953 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle);
1957 mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
1959 mdd_trans_stop(env, mdd, rc, handle);
1962 mdd_object_put(env, mdd_sobj);
/*
 * Lockless name lookup: check @mask permission on the parent, then
 * resolve @name to a fid via dio_lookup() on the directory index.
 * Caller handles locking (see __mdd_lookup_locked).
 */
1967 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1968 const char *name, const struct lu_fid* fid, int mask,
1969 struct md_ucred *uc)
1971 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1972 struct dt_object *dir = mdd_object_child(mdd_obj);
1973 struct dt_rec *rec = (struct dt_rec *)fid;
1974 const struct dt_key *key = (const struct dt_key *)name;
1978 if (mdd_is_dead_obj(mdd_obj))
/* execute-only checks can use the cheaper "lite" path */
1981 if (mask == MAY_EXEC)
1982 rc = mdd_exec_permission_lite(env, mdd_obj, uc);
1984 rc = mdd_permission_internal(env, mdd_obj, mask, uc);
1988 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
1989 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key);
/*
 * __mdd_lookup() wrapped in the parent's read lock.
 */
1997 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
1998 const char *name, const struct lu_fid* fid, int mask,
1999 struct md_ucred *uc)
2001 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2004 mdd_read_lock(env, mdd_obj);
2005 rc = __mdd_lookup(env, pobj, name, fid, mask, uc);
2006 mdd_read_unlock(env, mdd_obj);
/*
 * md_dir operation: resolve @name in @pobj, requiring MAY_EXEC on the
 * parent directory.
 */
2011 static int mdd_lookup(const struct lu_env *env,
2012 struct md_object *pobj, const char *name,
2013 struct lu_fid* fid, struct md_ucred *uc)
2017 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC, uc);
2022  * returns 1: if fid is ancestor of @mo;
2023  * returns 0: if fid is not a ancestor of @mo;
2025  * returns EREMOTE if remote object is found, fid of remote object is saved to
2028  * returns < 0: if error
/*
 * md_object operation: ancestry test, delegating to mdd_is_parent().
 * Non-directories trivially have no subdirectories.
 */
2030 static int mdd_is_subdir(const struct lu_env *env,
2031 struct md_object *mo, const struct lu_fid *fid,
2032 struct lu_fid *sfid, struct md_ucred *uc)
2034 struct mdd_device *mdd = mdo2mdd(mo);
2038 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
2041 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
/*
 * Initialize a freshly created @child: apply its attributes and, for
 * directories, create the "." and ".." entries (with the matching
 * nlink bump).  Partial failure on ".." is rolled back by removing
 * "." and dropping the extra reference.
 */
2046 static int __mdd_object_initialize(const struct lu_env *env,
2047 const struct lu_fid *pfid,
2048 struct mdd_object *child,
2049 struct md_attr *ma, struct thandle *handle)
2054 /* update attributes for child.
2056  * (1) the valid bits should be converted between Lustre and Linux;
2057  * (2) maybe, the child attributes should be set in OSD when creation.
2060 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle);
2064 if (S_ISDIR(ma->ma_attr.la_mode)) {
2065 /* add . and .. for newly created dir */
2066 __mdd_ref_add(env, child, handle);
2067 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2070 rc = __mdd_index_insert_only(env, child, pfid,
/* ".." insertion failed: undo "." and the extra nlink */
2075 rc2 = __mdd_index_delete(env,
2076 child, dot, handle);
2078 CERROR("Failure to cleanup after dotdot"
2079 " creation: %d (%d)\n", rc2, rc);
2081 __mdd_ref_del(env, child, handle);
2089  * XXX: Need MAY_WRITE to be checked?
/*
 * Pre-check for create_data: object must exist and be writable by the
 * caller.
 */
2091 static int mdd_cd_sanity_check(const struct lu_env *env,
2092 struct mdd_object *obj, struct md_ucred *uc)
2098 if (!obj || mdd_is_dead_obj(obj))
2102 mdd_read_lock(env, obj);
2103 rc = mdd_permission_internal(env, obj, MAY_WRITE, uc);
2104 mdd_read_unlock(env, obj);
/*
 * md_dir operation: attach LOV striping data to an already created
 * object @cobj (delayed OST object creation path).  Skipped for
 * DELAY_CREATE or read-only opens.  Replay supplies the LOV EA
 * directly; otherwise mdd_lov_create() builds it.
 */
2111 static int mdd_create_data(const struct lu_env *env,
2112 struct md_object *pobj, struct md_object *cobj,
2113 const struct md_create_spec *spec,
2114 struct md_attr *ma, struct md_ucred *uc)
2116 struct mdd_device *mdd = mdo2mdd(cobj);
2117 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
2118 struct mdd_object *son = md2mdd_obj(cobj);
2119 struct lu_attr *attr = &ma->ma_attr;
2120 struct lov_mds_md *lmm = NULL;
2122 struct thandle *handle;
2126 rc = mdd_cd_sanity_check(env, son, uc);
2130 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
2131 !(spec->sp_cr_flags & FMODE_WRITE))
/* RPC to OSTs must happen before the local transaction starts */
2133 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
2138 mdd_txn_param_build(env, &MDD_TXN_CREATE_DATA);
2139 handle = mdd_trans_start(env, mdd);
2141 RETURN(rc = PTR_ERR(handle));
2143 /*XXX: setting the lov ea is not locked
2144  * but setting the attr is locked? */
2146 /* replay creates has objects already */
2147 if (spec->u.sp_ea.no_lov_create) {
2148 CDEBUG(D_INFO, "we already have lov ea\n");
2149 rc = mdd_lov_set_md(env, mdd_pobj, son,
2150 (struct lov_mds_md *)spec->u.sp_ea.eadata,
2151 spec->u.sp_ea.eadatalen, handle, 0);
2153 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2154 lmm_size, handle, 0);
2157 rc = mdd_attr_get_internal_locked(env, son, ma);
2159 /* finish mdd_lov_create() stuff */
2160 mdd_lov_create_finish(env, mdd, rc);
2161 mdd_trans_stop(env, mdd, rc, handle);
2163 OBD_FREE(lmm, lmm_size);
/*
 * Pre-checks for create: parent alive, @name must not already exist
 * (lookup with MAY_WRITE|MAY_EXEC doubles as the permission check),
 * honour the parent's setgid bit (inherit gid; dirs also inherit
 * S_ISGID), and validate the file type.
 */
2167 static int mdd_create_sanity_check(const struct lu_env *env,
2168 struct md_object *pobj,
2169 const char *name, struct md_attr *ma,
2170 struct md_ucred *uc)
2172 struct mdd_thread_info *info = mdd_env_info(env);
2173 struct lu_attr *la = &info->mti_la;
2174 struct lu_fid *fid = &info->mti_fid;
2175 struct mdd_object *obj = md2mdd_obj(pobj);
2180 if (mdd_is_dead_obj(obj))
2183 rc = __mdd_lookup_locked(env, pobj, name, fid,
2184 MAY_WRITE | MAY_EXEC, uc);
/* a successful lookup means the name already exists */
2186 RETURN(rc ? : -EEXIST);
2189 mdd_read_lock(env, obj);
2190 rc = __mdd_la_get(env, obj, la);
2191 mdd_read_unlock(env, obj);
2195 if (la->la_mode & S_ISGID) {
2196 ma->ma_attr.la_gid = la->la_gid;
2197 if (S_ISDIR(ma->ma_attr.la_mode)) {
2198 ma->ma_attr.la_mode |= S_ISGID;
2199 ma->ma_attr.la_valid |= LA_MODE;
2203 switch (ma->ma_attr.la_mode & S_IFMT) {
2221  * Create object and insert it into namespace.
/*
 * md_dir operation: full create of @child under @pobj as @name.
 * Regular files get OST objects first (no RPC inside the txn), then
 * within one transaction: create the object, initialize it, insert
 * the name, store the LOV EA, write symlink targets, update parent
 * times, and roll back name/refs on failure.
 */
2223 static int mdd_create(const struct lu_env *env,
2224 struct md_object *pobj, const char *name,
2225 struct md_object *child,
2226 const struct md_create_spec *spec,
2227 struct md_attr* ma, struct md_ucred *uc)
2229 struct mdd_device *mdd = mdo2mdd(pobj);
2230 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2231 struct mdd_object *son = md2mdd_obj(child);
2232 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2233 struct lu_attr *attr = &ma->ma_attr;
2234 struct lov_mds_md *lmm = NULL;
2235 struct thandle *handle;
2236 int rc, created = 0, inserted = 0, lmm_size = 0;
2239 /* sanity checks before big job */
2240 rc = mdd_create_sanity_check(env, pobj, name, ma, uc);
2244 /* no RPC inside the transaction, so OST objects should be created at
2246 if (S_ISREG(attr->la_mode)) {
2247 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2253 mdd_txn_param_build(env, &MDD_TXN_MKDIR);
2254 handle = mdd_trans_start(env, mdd);
2256 RETURN(PTR_ERR(handle));
2258 mdd_write_lock(env, mdd_pobj);
2261  * XXX check that link can be added to the parent in mkdir case.
2265  * Two operations have to be performed:
2267  *  - allocation of new object (->do_create()), and
2269  *  - insertion into parent index (->dio_insert()).
2271  * Due to locking, operation order is not important, when both are
2272  * successful, *but* error handling cases are quite different:
2274  *  - if insertion is done first, and following object creation fails,
2275  *  insertion has to be rolled back, but this operation might fail
2276  *  also leaving us with dangling index entry.
2278  *  - if creation is done first, is has to be undone if insertion
2279  *  fails, leaving us with leaked space, which is neither good, nor
2282  * It seems that creation-first is simplest solution, but it is
2283  * sub-optimal in the frequent
2288  * case, because second mkdir is bound to create object, only to
2289  * destroy it immediately.
2291  * Note that local file systems do
2293  *     0. lookup -> -EEXIST
2299  * Maybe we should do the same. For now: creation-first.
2302 mdd_write_lock(env, son);
2303 rc = __mdd_object_create(env, son, ma, handle);
2305 mdd_write_unlock(env, son);
2311 rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj),
2313 mdd_write_unlock(env, son);
2316  * Object has no links, so it will be destroyed when last
2317  * reference is released. (XXX not now.)
2321 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2322 name, S_ISDIR(attr->la_mode), handle);
2328 /* replay creates has objects already */
2329 if (spec->u.sp_ea.no_lov_create) {
2330 CDEBUG(D_INFO, "we already have lov ea\n");
2331 rc = mdd_lov_set_md(env, mdd_pobj, son,
2332 (struct lov_mds_md *)spec->u.sp_ea.eadata,
2333 spec->u.sp_ea.eadatalen, handle, 0);
2335 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2336 lmm_size, handle, 0);
2338 CERROR("error on stripe info copy %d \n", rc);
/* symlink target is written as file body */
2342 if (S_ISLNK(attr->la_mode)) {
2343 struct dt_object *dt = mdd_object_child(son);
2344 const char *target_name = spec->u.sp_symname;
2345 int sym_len = strlen(target_name);
2348 rc = dt->do_body_ops->dbo_write(env, dt, target_name,
2349 sym_len, &pos, handle);
2356 *la_copy = ma->ma_attr;
2357 la_copy->la_valid = LA_CTIME | LA_MTIME;
2358 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
2362 /* return attr back */
2363 rc = mdd_attr_get_internal_locked(env, son, ma);
/* error path: best-effort undo of the name insert and the object ref */
2365 if (rc && created) {
2369 rc2 = __mdd_index_delete(env, mdd_pobj, name, handle);
2371 CERROR("error can not cleanup destroy %d\n",
2375 __mdd_ref_del(env, son, handle);
2377 /* finish mdd_lov_create() stuff */
2378 mdd_lov_create_finish(env, mdd, rc);
2380 OBD_FREE(lmm, lmm_size);
2381 mdd_write_unlock(env, mdd_pobj);
2382 mdd_trans_stop(env, mdd, rc, handle);
2386 /* partial operation */
/*
 * Pre-check for partial object create: object must not already exist
 * and the requested file type must be valid.
 */
2387 static int mdd_oc_sanity_check(const struct lu_env *env,
2388 struct mdd_object *obj,
2390 struct md_ucred *uc)
2396 if (lu_object_exists(&obj->mod_obj.mo_lu))
2399 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * md_object operation: create an object without a namespace entry
 * (cross-MDS create).  Slave objects additionally get their LMV EA
 * set and take the parent fid from the spec.
 */
2416 static int mdd_object_create(const struct lu_env *env,
2417 struct md_object *obj,
2418 const struct md_create_spec *spec,
2420 struct md_ucred *uc)
2423 struct mdd_device *mdd = mdo2mdd(obj);
2424 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2425 struct thandle *handle;
2426 const struct lu_fid *pfid = spec->u.sp_pfid;
2430 rc = mdd_oc_sanity_check(env, mdd_obj, ma, uc);
2434 mdd_txn_param_build(env, &MDD_TXN_OBJECT_CREATE);
2435 handle = mdd_trans_start(env, mdd);
2437 RETURN(PTR_ERR(handle));
2439 mdd_write_lock(env, mdd_obj);
2440 rc = __mdd_object_create(env, mdd_obj, ma, handle);
2441 if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2442 /* if creating the slave object, set slave EA here */
2443 rc = __mdd_xattr_set(env, mdd_obj, spec->u.sp_ea.eadata,
2444 spec->u.sp_ea.eadatalen, MDS_LMV_MD_NAME,
2446 pfid = spec->u.sp_ea.fid;
2447 CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
2448 PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
2452 rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
2453 mdd_write_unlock(env, mdd_obj);
2456 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
2458 mdd_trans_stop(env, mdd, rc, handle);
2463  * Partial operation. Be aware, this is called with write lock taken, so we use
2464  * lockless version of __mdd_lookup() here.
2466 static int mdd_ni_sanity_check(const struct lu_env *env,
2467 struct md_object *pobj,
2469 const struct lu_fid *fid,
2470 struct md_ucred *uc)
2472 struct mdd_object *obj = md2mdd_obj(pobj);
2477 if (mdd_is_dead_obj(obj))
2480 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
/* a successful lookup means the name already exists */
2482 RETURN(rc ? : -EEXIST);
/*
 * md_dir operation: insert name entry only (partial op for cross-MDS
 * create; the object itself lives on another server).
 */
2487 static int mdd_name_insert(const struct lu_env *env,
2488 struct md_object *pobj,
2489 const char *name, const struct lu_fid *fid,
2490 int isdir, struct md_ucred *uc)
2492 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2493 struct thandle *handle;
2497 mdd_txn_param_build(env, &MDD_TXN_INDEX_INSERT);
2498 handle = mdd_trans_start(env, mdo2mdd(pobj));
2500 RETURN(PTR_ERR(handle));
2502 mdd_write_lock(env, mdd_obj);
2503 rc = mdd_ni_sanity_check(env, pobj, name, fid, uc);
2505 GOTO(out_unlock, rc);
2507 rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle);
2510 mdd_write_unlock(env, mdd_obj);
2512 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
2517  * Be aware, this is called with write lock taken, so we use lockless version
2518  * of __mdd_lookup() here.
/*
 * Pre-check for name removal: parent alive and the name resolvable
 * with MAY_WRITE|MAY_EXEC on the parent.
 */
2520 static int mdd_nr_sanity_check(const struct lu_env *env,
2521 struct md_object *pobj,
2523 struct md_ucred *uc)
2525 struct mdd_thread_info *info = mdd_env_info(env);
2526 struct lu_fid *fid = &info->mti_fid;
2527 struct mdd_object *obj = md2mdd_obj(pobj);
2532 if (mdd_is_dead_obj(obj))
2535 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
/*
 * md_dir operation: remove name entry only (partial op, the object is
 * handled on another server via mdd_ref_del()).
 */
2539 static int mdd_name_remove(const struct lu_env *env,
2540 struct md_object *pobj,
2542 struct md_ucred *uc)
2544 struct mdd_device *mdd = mdo2mdd(pobj);
2545 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2546 struct thandle *handle;
2550 mdd_txn_param_build(env, &MDD_TXN_INDEX_DELETE);
2551 handle = mdd_trans_start(env, mdd);
2553 RETURN(PTR_ERR(handle));
2555 mdd_write_lock(env, mdd_obj);
2556 rc = mdd_nr_sanity_check(env, pobj, name, uc);
2558 GOTO(out_unlock, rc);
2560 rc = __mdd_index_delete(env, mdd_obj, name, handle);
2563 mdd_write_unlock(env, mdd_obj);
2565 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-checks for rename_tgt (target side of a cross-MDS rename):
 * target parent alive, target (if any) deletable — a directory target
 * must be empty — or target name creatable, and the source must not
 * be an ancestor of the target dir.
 */
2569 static int mdd_rt_sanity_check(const struct lu_env *env,
2570 struct mdd_object *tgt_pobj,
2571 struct mdd_object *tobj,
2572 const struct lu_fid *sfid,
2573 const char *name, struct md_attr *ma,
2574 struct md_ucred *uc)
2576 struct mdd_device *mdd = mdo2mdd(&tgt_pobj->mod_obj);
2581 if (mdd_is_dead_obj(tgt_pobj))
2584 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
2586 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1, uc);
2587 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
2588 mdd_dir_is_empty(env, tobj))
2591 rc = mdd_may_create(env, tgt_pobj, NULL, 1, uc);
2594 /* source should not be ancestor of target dir */
/* BUGFIX: original read "!rc &&& src_is_dir" — "&&&" is not valid C
 * (it parses as "&& &src_is_dir", which is always true and bypasses
 * the src_is_dir test).  Use the intended logical AND. */
2595 if (!rc && src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
/*
 * md_dir operation: target half of a cross-MDS rename — replace
 * @name in @pobj with @lf, dropping the old target object's reference
 * if it existed locally.
 */
2601 static int mdd_rename_tgt(const struct lu_env *env,
2602 struct md_object *pobj, struct md_object *tobj,
2603 const struct lu_fid *lf, const char *name,
2604 struct md_attr *ma, struct md_ucred *uc)
2606 struct mdd_device *mdd = mdo2mdd(pobj);
2607 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
2608 struct mdd_object *mdd_tobj = NULL;
2609 struct thandle *handle;
2613 mdd_txn_param_build(env, &MDD_TXN_RENAME);
2614 handle = mdd_trans_start(env, mdd);
2616 RETURN(PTR_ERR(handle));
/* lock both when a local target object exists, else just the parent */
2619 mdd_tobj = md2mdd_obj(tobj);
2620 mdd_lock2(env, mdd_tpobj, mdd_tobj);
2622 mdd_write_lock(env, mdd_tpobj);
2625 /*TODO rename sanity checking*/
2626 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma, uc);
2631 rc = __mdd_index_delete(env, mdd_tpobj, name, handle);
2636 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle);
2640 if (tobj && lu_object_exists(&tobj->mo_lu))
2641 __mdd_ref_del(env, mdd_tobj, handle);
2644 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
2646 mdd_write_unlock(env, mdd_tpobj);
2647 mdd_trans_stop(env, mdd, rc, handle);
2652  * No permission check is needed.
/*
 * md_device operation: return the filesystem root fid.
 */
2654 static int mdd_root_get(const struct lu_env *env,
2655 struct md_device *m, struct lu_fid *f,
2656 struct md_ucred *uc)
2658 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2661 *f = mdd->mdd_root_fid;
2666  * No permission check is needed.
/*
 * md_device operation: pass statfs through to the child dt device.
 */
2668 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
2669 struct kstatfs *sfs, struct md_ucred *uc)
2671 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2676 rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
2682  * No permission check is needed.
/*
 * md_device operation: report maximum LOV MD and llog cookie sizes.
 */
2684 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
2685 int *md_size, int *cookie_size, struct md_ucred *uc)
2687 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2690 *md_size = mdd_lov_mdsize(env, mdd);
2691 *cookie_size = mdd_lov_cookiesize(env, mdd);
/*
 * md_device operation: stash the capability key array on the MDS obd.
 */
2696 static int mdd_init_capa_keys(struct md_device *m,
2697 struct lustre_capa_key *keys)
2699 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2700 struct mds_obd *mds = &mdd2obd_dev(mdd)->u.mds;
2703 mds->mds_capa_keys = keys;
/*
 * md_device operation: push a new capability key to the LOV/OSC export.
 */
2707 static int mdd_update_capa_key(const struct lu_env *env,
2708 struct md_device *m,
2709 struct lustre_capa_key *key)
2711 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2712 struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
2716 rc = obd_set_info_async(lov_exp, strlen(KEY_CAPA_KEY), KEY_CAPA_KEY,
2717 sizeof(*key), key, NULL);
/*
 * Increment @obj's nlink via the underlying dt_object.  Caller holds
 * the write lock and an open transaction.
 */
2721 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
2722 struct thandle *handle)
2724 struct dt_object *next;
2726 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2727 next = mdd_object_child(obj);
2728 next->do_ops->do_ref_add(env, next, handle);
2732  * XXX: if permission check is needed here?
/*
 * md_object operation: bump @obj's nlink inside its own transaction
 * (partial op, the matching name insert happens on another server).
 */
2734 static int mdd_ref_add(const struct lu_env *env,
2735 struct md_object *obj, struct md_ucred *uc)
2737 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2738 struct mdd_device *mdd = mdo2mdd(obj);
2739 struct thandle *handle;
2742 mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
2743 handle = mdd_trans_start(env, mdd);
2747 mdd_write_lock(env, mdd_obj);
2748 __mdd_ref_add(env, mdd_obj, handle);
2749 mdd_write_unlock(env, mdd_obj);
2751 mdd_trans_stop(env, mdd, 0, handle);
/*
 * Decrement @obj's nlink via the underlying dt_object.  Caller holds
 * the write lock and an open transaction.
 */
2757 __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
2758 struct thandle *handle)
2760 struct dt_object *next = mdd_object_child(obj);
2762 LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2764 next->do_ops->do_ref_del(env, next, handle);
2767 /* do NOT or the MAY_*'s, you'll get the weakest */
/*
 * Map open flags (FMODE_*/MDS_OPEN_*) to a MAY_* access mask.
 * NOTE(review): this body references `inode` and `current`, which are
 * not in scope here (the parameter is mdd_obj) — looks like a leftover
 * from the VFS version of this check; confirm against callers/build.
 */
2768 static int accmode(struct mdd_object *mdd_obj, int flags)
2773 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2774  * "acc_mode = 0" allowance for newly-created files isn't honoured.
2775  * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2776  * owner can write to a file even if it is marked readonly to hide
2777  * its brokenness. (bug 5781) */
2778 if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2781 if (flags & FMODE_READ)
2783 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2785 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open request against @obj: no symlinks, no writable
 * opens of directories, permission (unless MDS_OPEN_CREATED),
 * special-file exemptions, append-only and O_NOATIME constraints.
 */
2790 static int mdd_open_sanity_check(const struct lu_env *env,
2791 struct mdd_object *obj, int flag,
2792 struct md_ucred *uc)
2794 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2795 int mode = accmode(obj, flag);
2800 if (mdd_is_dead_obj(obj))
2803 rc = __mdd_la_get(env, obj, tmp_la);
2807 if (S_ISLNK(tmp_la->la_mode))
2810 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2813 if (!(flag & MDS_OPEN_CREATED)) {
2814 rc = __mdd_permission_internal(env, obj, mode, 0, uc);
2820  * FIFO's, sockets and device files are special: they don't
2821  * actually live on the filesystem itself, and as such you
2822  * can write to them even if the filesystem is read-only.
2824 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2825 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2829  * An append-only file must be opened in append mode for writing.
2831 if (mdd_is_append(obj)) {
2832 if ((flag & FMODE_WRITE) && !(flag & O_APPEND))
2838 /* O_NOATIME can only be set by the owner or superuser */
2839 if (flag & O_NOATIME)
2840 if (uc->mu_fsuid != tmp_la->la_uid && !mdd_capable(uc, CAP_FOWNER))
/*
 * md_object operation: open @obj — sanity-check, then bump the open
 * count under the write lock.
 */
2846 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2847 int flags, struct md_ucred *uc)
2849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2852 mdd_write_lock(env, mdd_obj);
2854 rc = mdd_open_sanity_check(env, mdd_obj, flags, uc);
2856 mdd_obj->mod_count ++;
2858 mdd_write_unlock(env, mdd_obj);
2863  * No permission check is needed.
/*
 * md_object operation: close @obj — drop the open count; when it hits
 * zero on an already-unlinked object (nlink == 0), kill the object.
 */
2865 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2866 struct md_attr *ma, struct md_ucred *uc)
2869 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2872 mdd_write_lock(env, mdd_obj);
2873 /* release open count */
2874 mdd_obj->mod_count --;
2876 rc = __mdd_iattr_get(env, mdd_obj, ma);
2877 if (rc == 0 && mdd_obj->mod_count == 0) {
2878 if (ma->ma_attr.la_nlink == 0)
2879 rc = __mdd_object_kill(env, mdd_obj, ma);
2881 mdd_write_unlock(env, mdd_obj);
/*
 * Verify @obj may serve a readpage request for @uc: it must be a
 * directory, usable as one by the underlying dt layer, and grant
 * MAY_READ.  The failure path (non-directory) is on an elided line.
 */
2885 static int mdd_readpage_sanity_check(const struct lu_env *env,
2886                                      struct mdd_object *obj,
2887                                      struct md_ucred *uc)
2889         struct dt_object *next = mdd_object_child(obj);
2893         if (S_ISDIR(mdd_object_type(obj)) &&
2894             dt_try_as_dir(env, next))
2895                 rc = mdd_permission_internal(env, obj, MAY_READ, uc);
/*
 * ->moo_readpage() handler: after the directory/permission sanity check,
 * delegate the actual page read to the underlying dt object.  The read
 * lock is held across the dt call to keep the directory stable.
 */
2902 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2903                         const struct lu_rdpg *rdpg, struct md_ucred *uc)
2905         struct dt_object *next;
2906         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2910         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2911         next = mdd_object_child(mdd_obj);
2913         mdd_read_lock(env, mdd_obj);
2914         rc = mdd_readpage_sanity_check(env, mdd_obj, uc);
/* Bail out (still dropping the lock at out_unlock) on check failure. */
2916                 GOTO(out_unlock, rc);
2918         rc = next->do_ops->do_readpage(env, next, rdpg);
2921         mdd_read_unlock(env, mdd_obj);
2925 #ifdef CONFIG_FS_POSIX_ACL
2926 #include <linux/posix_acl_xattr.h>
2927 #include <linux/posix_acl.h>
/*
 * Decide whether credential @uc may access an object with attributes @la
 * with access mask @want, according to the POSIX ACL given by @entry /
 * count.  Mirrors the POSIX.1e evaluation order: ACL_USER_OBJ, ACL_USER,
 * ACL_GROUP_OBJ, ACL_GROUP, then ACL_MASK, then ACL_OTHER.
 *
 * NOTE(review): the e_tag switch and most return statements are on
 * elided lines; comments below annotate only the visible fragments.
 */
2929 static int mdd_posix_acl_permission(struct md_ucred *uc, struct lu_attr *la,
2930                                     int want, posix_acl_xattr_entry *entry,
2933         posix_acl_xattr_entry *pa, *pe, *mask_obj;
/* pe points at the last entry; scan entries in order. */
2941         pe = &entry[count - 1];
2942         for (; pa <= pe; pa++) {
2945                         /* (May have been checked already) */
2946                         if (la->la_uid == uc->mu_fsuid)
/* ACL_USER: match against the caller's fsuid. */
2950                         if (pa->e_id == uc->mu_fsuid)
/* ACL_GROUP_OBJ: caller in the owning group. */
2954                         if (mdd_in_group_p(uc, la->la_gid)) {
2956                                 if ((pa->e_perm & want) == want)
/* ACL_GROUP: caller in the named group. */
2961                         if (mdd_in_group_p(uc, pa->e_id)) {
2963                                 if ((pa->e_perm & want) == want)
/* Group-class matches are filtered through the ACL_MASK entry, if any,
 * which sits after the matched entry in canonical ACL order. */
2981         for (mask_obj = pa + 1; mask_obj <= pe; mask_obj++) {
2982                 if (mask_obj->e_tag == ACL_MASK) {
2983                         if ((pa->e_perm & mask_obj->e_perm & want) == want)
/* No mask entry found: grant on the matched entry's own bits. */
2991         if ((pa->e_perm & want) == want)
/*
 * Check @mask access against the object's POSIX ACL xattr: probe the
 * xattr size, allocate a buffer, read the ACL, then delegate the actual
 * decision to mdd_posix_acl_permission().  Without CONFIG_FS_POSIX_ACL
 * the (elided) fallback presumably returns -EAGAIN so the caller falls
 * through to mode-bit checking — confirm against the full source.
 */
2998 static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
2999                          struct lu_attr* la, int mask, struct md_ucred *uc)
3001 #ifdef CONFIG_FS_POSIX_ACL
3002         struct dt_object *next;
3005         posix_acl_xattr_entry *entry;
3010         next = mdd_object_child(obj);
/* Size probe: NULL buffer asks for the required length (or -errno).
 * NOTE(review): the "" xattr name looks like extraction damage — the
 * access-ACL name (XATTR_NAME_ACL_ACCESS) is expected here; verify. */
3011         buf_len = next->do_ops->do_xattr_get(env, next, NULL, 0, "");
3013                 RETURN(buf_len ? : -EACCES);
3015         OBD_ALLOC(buf, buf_len);
3019         rc = next->do_ops->do_xattr_get(env, next, buf, buf_len, "");
3021                 GOTO(out, rc = rc ? : -EACCES);
3023         entry = ((posix_acl_xattr_header *)buf)->a_entries;
/* NOTE(review): the literal 4 is presumably the xattr header size
 * (sizeof(posix_acl_xattr_header), the a_version field); prefer the
 * sizeof expression over the magic number — confirm and fix upstream. */
3024         entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
3026         rc = mdd_posix_acl_permission(uc, la, mask, entry, entry_count);
3029         OBD_FREE(buf, buf_len);
/*
 * Lightweight MAY_EXEC check used on the lookup path: select the
 * owner/group/other rwx triplet from the object mode, then fall back to
 * CAP_DAC_OVERRIDE / CAP_DAC_READ_SEARCH capabilities.  The mode
 * extraction and return statements sit on elided lines.
 */
3037 static int mdd_exec_permission_lite(const struct lu_env *env,
3038                                     struct mdd_object *obj,
3039                                     struct md_ucred *uc)
3041         struct lu_attr *la = &mdd_env_info(env)->mti_la;
3046         /* These means unnecessary for permission check */
3047         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3050         /* Invalid user credit */
3051         if (uc->mu_valid == UCRED_INVALID)
3054         rc = __mdd_la_get(env, obj, la);
/* Pick which permission triplet applies: owner, then group, else other. */
3059         if (uc->mu_fsuid == la->la_uid)
3061         else if (mdd_in_group_p(uc, la->la_gid))
3064         if (mode & MAY_EXEC)
/* CAP_DAC_OVERRIDE grants exec if any exec bit is set or for dirs. */
3067         if (((la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) &&
3068             mdd_capable(uc, CAP_DAC_OVERRIDE))
/* CAP_DAC_READ_SEARCH covers directory search. */
3071         if (S_ISDIR(la->la_mode) && mdd_capable(uc, CAP_DAC_READ_SEARCH))
/*
 * Core permission check, closely following the kernel's
 * generic_permission(): owner bits, then ACL (if group bits present),
 * then group bits, then other bits, then capability overrides.
 * @mask is the MAY_* access mask; @getattr distinguishes the caller
 * context (semantics on elided lines).  Returns 0 on success.
 */
3077 static int __mdd_permission_internal(const struct lu_env *env,
3078                                      struct mdd_object *obj,
3079                                      int mask, int getattr,
3080                                      struct md_ucred *uc)
3082         struct lu_attr *la = &mdd_env_info(env)->mti_la;
3091         /* These means unnecessary for permission check */
3092         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3095         /* Invalid user credit */
3096         if (uc->mu_valid == UCRED_INVALID)
3100          * Nobody gets write access to an immutable file.
3102         if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
3106         rc = __mdd_la_get(env, obj, la);
/* Owner: use the top (user) rwx triplet. */
3112         if (uc->mu_fsuid == la->la_uid) {
/* Non-owner with group bits set: consult the ACL first; -EAGAIN or
 * -EOPNOTSUPP mean "no ACL, fall through to mode bits". */
3115                 if (mode & S_IRWXG) {
3116                         if (((mode >> 3) & mask & S_IRWXO) != mask)
3119                                 rc = mdd_check_acl(env, obj, la, mask, uc);
3121                                         goto check_capabilities;
3122                                 else if ((rc != -EAGAIN) && (rc != -EOPNOTSUPP))
/* Group member: shift the group triplet into place (elided). */
3127                 if (mdd_in_group_p(uc, la->la_gid))
3132          * If the DACs are ok we don't need any capability check.
3134         if (((mode & mask & S_IRWXO) == mask))
3140          * Read/write DACs are always overridable.
3141          * Executable DACs are overridable if at least one exec bit is set.
3142          * Dir's DACs are always overridable.
3144         if (!(mask & MAY_EXEC) ||
3145             (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
3146                 if (mdd_capable(uc, CAP_DAC_OVERRIDE))
3150          * Searching includes executable on directories, else just read.
3152         if ((mask == MAY_READ) ||
3153             (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
3154                 if (mdd_capable(uc, CAP_DAC_READ_SEARCH))
/*
 * Wrapper taking the object read lock around mdd_permission_internal(),
 * so the attributes consulted by the check cannot change mid-flight.
 */
3160 static inline int mdd_permission_internal_locked(const struct lu_env *env,
3161                                                  struct mdd_object *obj,
3162                                                  int mask, struct md_ucred *uc)
3166         mdd_read_lock(env, obj);
3167         rc = mdd_permission_internal(env, obj, mask, uc);
3168         mdd_read_unlock(env, obj);
/*
 * ->moo_permission() handler: thin md-layer entry point delegating to
 * the locked internal permission check.
 */
3173 static int mdd_permission(const struct lu_env *env, struct md_object *obj,
3174                           int mask, struct md_ucred *uc)
3176         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3180         rc = mdd_permission_internal_locked(env, mdd_obj, mask, uc);
/*
 * ->moo_capa_get() handler: fill in and HMAC-sign a capability for @obj.
 * Sets the FID, flags (algorithm in the top byte, plus short-expiry and
 * root markers), reuses a cached unexpired capability when one exists,
 * otherwise signs a fresh one with the current capability key.
 *
 * BUG FIX: the original performed the plain assignment
 *     capa->lc_flags = ls->ls_capa_alg << 24;
 * AFTER the two conditional OR-ins, silently discarding the
 * CAPA_FL_SHORT_EXPIRY and CAPA_FL_ROOT bits just set.  The flags word
 * must be initialized with the algorithm first, then the flag bits
 * OR'ed in.  (Some lines — rc declaration, RETURN paths — are elided
 * in this excerpt.)
 */
3185 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
3186                         struct lustre_capa *capa)
3188         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3189         struct mdd_device *mdd = mdo2mdd(obj);
3190         struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site;
3191         struct lustre_capa_key *key = &ls->ls_capa_keys[1];
3192         struct obd_capa *ocapa;
3196         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3198         capa->lc_fid = *mdo2fid(mdd_obj);
/* Initialize flags with the HMAC algorithm FIRST so the OR-ins below
 * are not clobbered. */
3203         capa->lc_flags = ls->ls_capa_alg << 24;
3199         if (ls->ls_capa_timeout < CAPA_TIMEOUT)
3200                 capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
3201         if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid))
3202                 capa->lc_flags |= CAPA_FL_ROOT;
3205         /* TODO: get right permission here after remote uid landing */
/* Reuse a cached, still-valid capability when available. */
3206         ocapa = capa_lookup(capa);
3208                 LASSERT(!capa_is_expired(ocapa));
3209                 capa_cpy(capa, ocapa);
/* Otherwise sign a fresh capability with the current key. */
3214         capa->lc_keyid = key->lk_keyid;
3215         capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout;
3216         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
/* Device-level metadata operations exported by the mdd layer:
 * statfs, root FID lookup, size limits, and capability-key handling. */
3224 struct md_device_operations mdd_ops = {
3225         .mdo_statfs         = mdd_statfs,
3226         .mdo_root_get       = mdd_root_get,
3227         .mdo_maxsize_get    = mdd_maxsize_get,
3228         .mdo_init_capa_keys = mdd_init_capa_keys,
3229         .mdo_update_capa_key= mdd_update_capa_key,
/* Directory (namespace) operations: lookup, create/unlink, link,
 * rename, and the split name-insert/remove/rename-tgt primitives used
 * for cross-MDS operations. */
3232 static struct md_dir_operations mdd_dir_ops = {
3233         .mdo_is_subdir     = mdd_is_subdir,
3234         .mdo_lookup        = mdd_lookup,
3235         .mdo_create        = mdd_create,
3236         .mdo_rename        = mdd_rename,
3237         .mdo_link          = mdd_link,
3238         .mdo_unlink        = mdd_unlink,
3239         .mdo_name_insert   = mdd_name_insert,
3240         .mdo_name_remove   = mdd_name_remove,
3241         .mdo_rename_tgt    = mdd_rename_tgt,
3242         .mdo_create_data   = mdd_create_data
/* Per-object operations: permission, attribute and xattr access,
 * reference counting, open/close, directory readpage, readlink, and
 * capability issuance (implementations above in this file). */
3245 static struct md_object_operations mdd_obj_ops = {
3246         .moo_permission    = mdd_permission,
3247         .moo_attr_get      = mdd_attr_get,
3248         .moo_attr_set      = mdd_attr_set,
3249         .moo_xattr_get     = mdd_xattr_get,
3250         .moo_xattr_set     = mdd_xattr_set,
3251         .moo_xattr_list    = mdd_xattr_list,
3252         .moo_xattr_del     = mdd_xattr_del,
3253         .moo_object_create = mdd_object_create,
3254         .moo_ref_add       = mdd_ref_add,
3255         .moo_ref_del       = mdd_ref_del,
3256         .moo_open          = mdd_open,
3257         .moo_close         = mdd_close,
3258         .moo_readpage      = mdd_readpage,
3259         .moo_readlink      = mdd_readlink,
3260         .moo_capa_get      = mdd_capa_get
/* Minimal obd_ops: only the owning module, required for type registration
 * in mdd_mod_init(). */
3263 static struct obd_ops mdd_obd_device_ops = {
3264         .o_owner = THIS_MODULE
/*
 * ->ldto_device_alloc(): allocate an mdd_device and wire up its lu/md
 * operation tables.  Returns the embedded lu_device, or ERR_PTR(-ENOMEM)
 * on allocation failure.  NOTE(review): the allocation call and the
 * success-branch assignment of l are on elided lines.
 */
3267 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
3268                                           struct lu_device_type *t,
3269                                           struct lustre_cfg *lcfg)
3271         struct lu_device *l;
3272         struct mdd_device *m;
/* Allocation-failure result; overwritten on the (elided) success path. */
3276                 l = ERR_PTR(-ENOMEM);
3278                 md_device_init(&m->mdd_md_dev, t);
3280                 l->ld_ops = &mdd_lu_ops;
3281                 m->mdd_md_dev.md_ops = &mdd_ops;
/*
 * ->ldto_device_free(): release an mdd_device allocated by
 * mdd_device_alloc().  The device must no longer be referenced.
 */
3287 static void mdd_device_free(const struct lu_env *env,
3288                             struct lu_device *lu)
3290         struct mdd_device *m = lu2mdd_dev(lu);
3292         LASSERT(atomic_read(&lu->ld_ref) == 0);
3293         md_device_fini(&m->mdd_md_dev);
/* ->ldto_init(): register the per-thread context key for mdd. */
3297 static int mdd_type_init(struct lu_device_type *t)
3299         return lu_context_key_register(&mdd_thread_key);
/* ->ldto_fini(): unregister the per-thread context key. */
3302 static void mdd_type_fini(struct lu_device_type *t)
3304         lu_context_key_degister(&mdd_thread_key);
/* lu_device_type operations: type-level init/fini plus device
 * alloc/free/init/fini lifecycle hooks defined above. */
3307 static struct lu_device_type_operations mdd_device_type_ops = {
3308         .ldto_init = mdd_type_init,
3309         .ldto_fini = mdd_type_fini,
3311         .ldto_device_alloc = mdd_device_alloc,
3312         .ldto_device_free  = mdd_device_free,
3314         .ldto_device_init    = mdd_device_init,
3315         .ldto_device_fini    = mdd_device_fini
/* The mdd device type descriptor registered with the class framework:
 * an MD-layer device running in MD-thread context. */
3318 static struct lu_device_type mdd_device_type = {
3319         .ldt_tags     = LU_DEVICE_MD,
3320         .ldt_name     = LUSTRE_MDD_NAME,
3321         .ldt_ops      = &mdd_device_type_ops,
3322         .ldt_ctx_tags = LCT_MD_THREAD
/*
 * ->lct_init(): allocate the per-thread mdd_thread_info scratch area.
 * Returns the info pointer, or ERR_PTR(-ENOMEM) on allocation failure
 * (the success return is on an elided line).
 */
3325 static void *mdd_key_init(const struct lu_context *ctx,
3326                           struct lu_context_key *key)
3328         struct mdd_thread_info *info;
3330         OBD_ALLOC_PTR(info);
3332                 info = ERR_PTR(-ENOMEM);
/* ->lct_fini(): free the per-thread mdd_thread_info allocated by
 * mdd_key_init() (the OBD_FREE_PTR call is on an elided line). */
3336 static void mdd_key_fini(const struct lu_context *ctx,
3337                          struct lu_context_key *key, void *data)
3339         struct mdd_thread_info *info = data;
/* Context key giving each MD thread its own mdd_thread_info (used via
 * mdd_env_info() throughout this file). */
3343 static struct lu_context_key mdd_thread_key = {
3344         .lct_tags = LCT_MD_THREAD,
3345         .lct_init = mdd_key_init,
3346         .lct_fini = mdd_key_fini
/* /proc variable tables for the mdd obd and module (entries elided in
 * this excerpt); hooked up via the LPROCFS_INIT_VARS macro below. */
3349 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
3353 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
3357 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
/*
 * Module entry point: set up lprocfs variables and register the mdd
 * obd/device type with the class framework.
 */
3359 static int __init mdd_mod_init(void)
3361         struct lprocfs_static_vars lvars;
3362         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
3363         lprocfs_init_vars(mdd, &lvars);
3364         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
3365                                    LUSTRE_MDD_NAME, &mdd_device_type);
/* Module exit point: undo the type registration from mdd_mod_init(). */
3368 static void __exit mdd_mod_exit(void)
3370         class_unregister_type(LUSTRE_MDD_NAME);
/* Kernel module metadata and entry/exit registration. */
3373 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3374 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
3375 MODULE_LICENSE("GPL");
3377 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);