1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
45 #include "mdd_internal.h"
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51 struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54 struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56 struct mdd_object *obj, enum dt_lock_mode mode);
57 static int __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58 struct thandle *handle);
59 static int __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60 struct thandle *handle, struct md_attr *);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62 const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
67 static struct lu_context_key mdd_thread_key;
69 const char *mdd_root_dir_name = "root";
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
73 struct mdd_thread_info *info;
75 info = lu_context_key_get(ctx, &mdd_thread_key);
76 LASSERT(info != NULL);
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81 const struct lu_object_header *hdr,
84 struct mdd_object *mdo;
90 o = &mdo->mod_obj.mo_lu;
91 lu_object_init(o, NULL, d);
92 mdo->mod_obj.mo_ops = &mdd_obj_ops;
93 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
94 o->lo_ops = &mdd_lu_obj_ops;
95 return &mdo->mod_obj.mo_lu;
101 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
104 struct lu_object *below;
105 struct lu_device *under;
108 under = &d->mdd_child->dd_lu_dev;
109 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
114 lu_object_add(o, below);
118 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
120 struct mdd_object *mdd = lu2mdd_obj(o);
126 static int mdd_attr_get(const struct lu_context *ctxt,
127 struct md_object *obj, struct lu_attr *attr)
129 struct mdd_object *mdd_obj = md2mdd_obj(obj);
130 struct dt_object *next;
135 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
137 next = mdd_object_child(mdd_obj);
138 rc = next->do_ops->do_attr_get(ctxt, next, attr);
143 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
144 void *buf, int buf_len, const char *name)
146 struct mdd_object *mdd_obj = md2mdd_obj(obj);
147 struct dt_object *next;
152 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
154 next = mdd_object_child(mdd_obj);
155 rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
161 MDD_TXN_OBJECT_DESTROY_OP,
162 MDD_TXN_OBJECT_CREATE_OP,
164 MDD_TXN_XATTR_SET_OP,
165 MDD_TXN_INDEX_INSERT_OP,
166 MDD_TXN_INDEX_DELETE_OP,
173 struct mdd_txn_op_descr {
174 enum mdd_txn_op mod_op;
175 unsigned int mod_credits;
179 MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
180 MDD_TXN_OBJECT_CREATE_CREDITS = 20,
181 MDD_TXN_ATTR_SET_CREDITS = 20,
182 MDD_TXN_XATTR_SET_CREDITS = 20,
183 MDD_TXN_INDEX_INSERT_CREDITS = 20,
184 MDD_TXN_INDEX_DELETE_CREDITS = 20,
185 MDD_TXN_LINK_CREDITS = 20,
186 MDD_TXN_UNLINK_CREDITS = 20,
187 MDD_TXN_RENAME_CREDITS = 20,
188 MDD_TXN_MKDIR_CREDITS = 20
191 #define DEFINE_MDD_TXN_OP_DESC(opname) \
192 static const struct mdd_txn_op_descr opname = { \
193 .mod_op = opname ## _OP, \
194 .mod_credits = opname ## _CREDITS, \
198 * number of blocks to reserve for particular operations. Should be function
199 * of ... something. Stub for now.
201 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
202 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
203 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
204 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
205 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
206 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
207 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
208 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
209 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
210 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
212 static void mdd_txn_param_build(const struct lu_context *ctx,
213 const struct mdd_txn_op_descr *opd)
215 mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
218 static int mdd_object_print(const struct lu_context *ctxt,
219 struct seq_file *f, const struct lu_object *o)
221 return seq_printf(f, LUSTRE_MDD0_NAME"-object@%p", o);
/*
 * ->loo_object_exists() method: mdd keeps no existence state of its own, so
 * the question is forwarded to the next object in the compound object.
 */
static int mdd_object_exists(const struct lu_context *ctx,
                             const struct lu_object *o)
{
        int present;

        present = lu_object_exists(ctx, lu_object_next(o));
        return present;
}
230 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
233 struct dt_object *root;
236 root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
239 LASSERT(root != NULL);
240 lu_object_put(ctx, &root->do_lu);
247 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
249 /*create PENDING and OBJECTS dir for open and llog*/
253 static int mdd_fs_cleanup(struct mdd_device *mdd)
255 /* cleanup counterpart of mdd_fs_setup(): PENDING and OBJECTS dir for open and llog */
259 static int mdd_device_init(const struct lu_context *ctx,
260 struct lu_device *d, struct lu_device *next)
262 struct mdd_device *mdd = lu2mdd_dev(d);
266 mdd->mdd_child = lu2dt_dev(next);
268 rc = mdd_fs_setup(ctx, mdd);
274 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
277 struct mdd_device *m = lu2mdd_dev(d);
278 struct lu_device *next = &m->mdd_child->dd_lu_dev;
280 dt_device_fini(&m->mdd_lov_dev);
285 static int mdd_process_config(const struct lu_context *ctxt,
286 struct lu_device *d, struct lustre_cfg *cfg)
288 struct mdd_device *m = lu2mdd_dev(d);
289 struct lu_device *next = &m->mdd_child->dd_lu_dev;
292 switch(cfg->lcfg_command) {
294 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
297 rc = mdd_mount(ctxt, m);
300 rc = mdd_lov_init(ctxt, m, cfg);
302 CERROR("lov init error %d \n", rc);
307 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
314 struct lu_device_operations mdd_lu_ops = {
315 .ldo_object_alloc = mdd_object_alloc,
316 .ldo_process_config = mdd_process_config,
319 static struct lu_object_operations mdd_lu_obj_ops = {
320 .loo_object_init = mdd_object_init,
321 .loo_object_free = mdd_object_free,
322 .loo_object_print = mdd_object_print,
323 .loo_object_exists = mdd_object_exists,
327 static void mdd_lock(const struct lu_context *ctxt,
328 struct mdd_object *obj, enum dt_lock_mode mode)
330 struct dt_object *next = mdd_object_child(obj);
332 next->do_ops->do_lock(ctxt, next, mode);
335 static void mdd_unlock(const struct lu_context *ctxt,
336 struct mdd_object *obj, enum dt_lock_mode mode)
338 struct dt_object *next = mdd_object_child(obj);
340 next->do_ops->do_unlock(ctxt, next, mode);
343 static void mdd_lock2(const struct lu_context *ctxt,
344 struct mdd_object *o0, struct mdd_object *o1)
346 mdd_lock(ctxt, o0, DT_WRITE_LOCK);
347 mdd_lock(ctxt, o1, DT_WRITE_LOCK);
350 static void mdd_unlock2(const struct lu_context *ctxt,
351 struct mdd_object *o0, struct mdd_object *o1)
353 mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
354 mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
357 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
358 struct mdd_device *mdd)
360 struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
362 return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
365 static void mdd_trans_stop(const struct lu_context *ctxt,
366 struct mdd_device *mdd, struct thandle *handle)
368 mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
371 static int __mdd_object_create(const struct lu_context *ctxt,
372 struct mdd_object *obj, struct md_attr *ma,
373 struct thandle *handle)
375 struct dt_object *next;
376 struct lu_attr *attr = &ma->ma_attr;
380 if (!lu_object_exists(ctxt, &obj->mod_obj.mo_lu)) {
381 next = mdd_object_child(obj);
382 rc = next->do_ops->do_create(ctxt, next, attr, handle);
386 LASSERT(ergo(rc == 0, lu_object_exists(ctxt, &obj->mod_obj.mo_lu)));
387 /* increase the nlink for directory */
388 if (rc == 0 && dt_try_as_dir(ctxt, mdd_object_child(obj)))
389 rc = __mdd_ref_add(ctxt, obj, handle);
392 mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
397 static int mdd_object_create(const struct lu_context *ctxt, struct md_object *obj,
398 struct md_attr *attr)
401 struct mdd_device *mdd = mdo2mdd(obj);
402 struct thandle *handle;
406 mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
407 handle = mdd_trans_start(ctxt, mdd);
409 RETURN(PTR_ERR(handle));
411 rc = __mdd_object_create(ctxt, md2mdd_obj(obj), attr, handle);
413 mdd_trans_stop(ctxt, mdd, handle);
418 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
419 const struct lu_attr *attr, struct thandle *handle)
421 struct dt_object *next;
423 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
424 next = mdd_object_child(md2mdd_obj(obj));
425 return next->do_ops->do_attr_set(ctxt, next, attr, handle);
428 static int mdd_attr_set(const struct lu_context *ctxt,
429 struct md_object *obj, const struct lu_attr *attr)
431 struct mdd_device *mdd = mdo2mdd(obj);
432 struct thandle *handle;
436 mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
437 handle = mdd_trans_start(ctxt, mdd);
439 RETURN(PTR_ERR(handle));
441 rc = __mdd_attr_set(ctxt, obj, attr, handle);
443 mdd_trans_stop(ctxt, mdd, handle);
448 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
449 struct mdd_object *obj, const void *buf,
450 int buf_len, const char *name,struct thandle *handle)
452 struct dt_object *next;
454 LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
455 next = mdd_object_child(obj);
456 return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
460 int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
461 const void *buf, int buf_len, const char *name)
463 struct mdd_device *mdd = mdo2mdd(obj);
464 struct thandle *handle;
468 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
469 handle = mdd_trans_start(ctxt, mdd);
471 RETURN(PTR_ERR(handle));
473 rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
476 mdd_trans_stop(ctxt, mdd, handle);
481 static int __mdd_index_insert(const struct lu_context *ctxt,
482 struct mdd_object *pobj, const struct lu_fid *lf,
483 const char *name, struct thandle *handle)
486 struct dt_object *next = mdd_object_child(pobj);
489 if (dt_try_as_dir(ctxt, next))
490 rc = next->do_index_ops->dio_insert(ctxt, next,
492 (struct dt_key *)name, handle);
498 static int __mdd_index_delete(const struct lu_context *ctxt,
499 struct mdd_object *pobj, const char *name,
500 struct thandle *handle)
503 struct dt_object *next = mdd_object_child(pobj);
506 if (dt_try_as_dir(ctxt, next))
507 rc = next->do_index_ops->dio_delete(ctxt, next,
508 (struct dt_key *)name, handle);
514 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
515 struct md_object *src_obj, const char *name)
517 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
518 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
519 struct mdd_device *mdd = mdo2mdd(src_obj);
520 struct thandle *handle;
524 mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
525 handle = mdd_trans_start(ctxt, mdd);
527 RETURN(PTR_ERR(handle));
529 mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
531 rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
536 rc = __mdd_ref_add(ctxt, mdd_sobj, handle);
539 rc = __mdd_index_delete(ctxt, mdd_tobj, name, handle);
540 mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
541 mdd_trans_stop(ctxt, mdd, handle);
545 static int mdd_empty_dir(const struct lu_context *ctxt,
546 struct md_object *dir)
548 /*TODO: iterate through the index until first entry
549 * other than dot or dotdot. For now - not empty always */
553 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
554 struct md_object *cobj, const char *name,
557 struct mdd_device *mdd = mdo2mdd(pobj);
558 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
559 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
560 struct dt_object *dt_cobj = mdd_object_child(mdd_cobj);
561 struct thandle *handle;
566 if (dt_try_as_dir(ctxt, dt_cobj)) {
567 if (!S_ISDIR(ma->ma_attr.la_mode))
568 RETURN(rc = -EISDIR);
570 if (S_ISDIR(ma->ma_attr.la_mode))
571 RETURN(rc = -ENOTDIR);
574 mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
575 handle = mdd_trans_start(ctxt, mdd);
577 RETURN(PTR_ERR(handle));
579 mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
582 if (S_ISDIR(ma->ma_attr.la_mode)) {
583 if (!mdd_empty_dir(ctxt, cobj))
584 GOTO(cleanup, rc = -ENOTEMPTY);
587 rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
591 rc = __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
593 if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) {
595 rc = __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
598 rc = __mdd_ref_del(ctxt, mdd_pobj, handle, NULL);
602 mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
603 mdd_trans_stop(ctxt, mdd, handle);
607 static int mdd_parent_fid(const struct lu_context *ctxt,
608 struct mdd_object *obj,
611 const char *name = "..";
614 rc = mdd_lookup(ctxt, &obj->mod_obj, name, fid);
619 #define mdo2fid(obj) (&((obj)->mod_obj.mo_lu.lo_header->loh_fid))
620 static int mdd_is_parent(const struct lu_context *ctxt,
621 struct mdd_device *mdd,
622 struct mdd_object *p1,
623 struct mdd_object *p2)
625 struct lu_fid * pfid;
628 pfid = &mdd_ctx_info(ctxt)->mti_fid;
630 rc = mdd_parent_fid(ctxt, p1, pfid);
633 if (lu_fid_eq(pfid, mdo2fid(p2))) {
636 } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
641 static int mdd_rename_lock(const struct lu_context *ctxt,
642 struct mdd_device *mdd,
643 struct mdd_object *src_pobj,
644 struct mdd_object *tgt_pobj)
648 if (src_pobj == tgt_pobj) {
649 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
652 /* compare the parent-child relationship of src_pobj and tgt_pobj */
653 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
654 mdd_lock2(ctxt, src_pobj, tgt_pobj);
656 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
657 mdd_lock2(ctxt, tgt_pobj, src_pobj);
660 if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
661 mdd_lock2(ctxt, tgt_pobj, src_pobj);
664 if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
665 mdd_lock2(ctxt, src_pobj, tgt_pobj);
669 mdd_lock2(ctxt, src_pobj, tgt_pobj);
673 static void mdd_rename_unlock(const struct lu_context *ctxt,
674 struct mdd_object *src_pobj,
675 struct mdd_object *tgt_pobj)
677 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
678 if (src_pobj != tgt_pobj)
679 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
683 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
684 struct md_object *tgt_pobj, const struct lu_fid *lf,
685 const char *sname, struct md_object *tobj,
688 struct mdd_device *mdd = mdo2mdd(src_pobj);
689 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
690 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
691 struct mdd_object *mdd_tobj = NULL;
692 struct thandle *handle;
696 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
697 handle = mdd_trans_start(ctxt, mdd);
699 RETURN(PTR_ERR(handle));
701 /*FIXME: Should consider tobj and sobj too in rename_lock*/
702 rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
707 mdd_tobj = md2mdd_obj(tobj);
709 rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
712 /*FIXME: no sobj now, we should check sobj type, if it is dir,
713 * the nlink of its parent should be dec
716 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
721 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
726 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
727 struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
729 rc = __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
732 if (dt_try_as_dir(ctxt, dt_tobj)) {
733 rc = __mdd_ref_add(ctxt, mdd_tpobj, handle);
739 /*FIXME: should we do error handling here?*/
741 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
742 mdd_trans_stop(ctxt, mdd, handle);
746 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
747 const char *name, struct lu_fid* fid)
749 struct dt_object *dir = mdd_object_child(md2mdd_obj(pobj));
750 struct dt_rec *rec = (struct dt_rec *)fid;
751 const struct dt_key *key = (const struct dt_key *)name;
755 if (dt_try_as_dir(ctxt, dir))
756 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
763 * Create object and insert it into namespace.
765 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
766 const char *name, struct md_object *child,
767 const char *target_name, struct md_attr* ma)
769 struct mdd_device *mdd = mdo2mdd(pobj);
770 struct mdd_object *mdo = md2mdd_obj(pobj);
771 struct mdd_object *son = md2mdd_obj(child);
772 struct dt_object *dt_son = mdd_object_child(son);
773 struct lu_attr *attr = &ma->ma_attr;
775 struct lov_mds_md *lmm = NULL;
776 struct thandle *handle;
777 int rc, created = 0, inserted = 0, ref_add = 0, lmm_size;
780 /* sanity checks before big job */
781 fid = &mdd_ctx_info(ctxt)->mti_fid;
782 rc = mdd_lookup(ctxt, pobj, name, fid);
784 rc = rc ? rc : -EEXIST;
787 /* no RPC inside the transaction, so OST objects should be created at
790 if (S_ISREG(attr->la_mode)) {
791 rc = mdd_lov_create(ctxt, mdd, son, &lmm, &lmm_size);
796 mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
797 handle = mdd_trans_start(ctxt, mdd);
799 RETURN(PTR_ERR(handle));
801 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
804 * Two operations have to be performed:
806 * - allocation of new object (->do_create()), and
808 * - insertion into parent index (->dio_insert()).
810 * Due to locking, operation order is not important, when both are
811 * successful, *but* error handling cases are quite different:
813 * - if insertion is done first, and following object creation fails,
814 * insertion has to be rolled back, but this operation might fail
815 * also leaving us with dangling index entry.
817 * - if creation is done first, it has to be undone if insertion
818 * fails, leaving us with leaked space, which is neither good, nor
821 * It seems that creation-first is simplest solution, but it is
822 * sub-optimal in the frequent
827 * case, because second mkdir is bound to create object, only to
828 * destroy it immediately.
830 * Note that local file systems do
832 * 0. lookup -> -EEXIST
838 * Maybe we should do the same. For now: creation-first.
841 rc = __mdd_object_create(ctxt, son, ma, handle);
847 rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
855 if (dt_try_as_dir(ctxt, dt_son)) {
856 rc = __mdd_ref_add(ctxt, mdo, handle);
861 rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size);
863 CERROR("error on stripe info copy %d \n", rc);
867 int rc1 = 0, rc2 = 0, rc3 = 0;
869 rc1 = __mdd_ref_del(ctxt, son, handle, NULL);
871 rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
873 rc3 = __mdd_ref_add(ctxt, mdo, handle);
874 if (rc1 || rc2 || rc3)
875 CERROR("error can not cleanup destory %d insert %d \n",
879 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
880 mdd_trans_stop(ctxt, mdd, handle);
884 static int mdd_mkname(const struct lu_context *ctxt, struct md_object *pobj,
885 const char *name, const struct lu_fid *fid)
887 struct mdd_device *mdd = mdo2mdd(pobj);
888 struct mdd_object *mdo = md2mdd_obj(pobj);
889 struct thandle *handle;
893 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
894 handle = mdd_trans_start(ctxt, mdd);
896 RETURN(PTR_ERR(handle));
898 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
900 rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
902 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
903 mdd_trans_stop(ctxt, mdd, handle);
907 static int mdd_name_remove(const struct lu_context *ctxt,
908 struct md_object *pobj,
911 struct mdd_device *mdd = mdo2mdd(pobj);
912 struct mdd_object *mdo = md2mdd_obj(pobj);
913 struct thandle *handle;
917 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
918 handle = mdd_trans_start(ctxt, mdd);
920 RETURN(PTR_ERR(handle));
922 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
924 rc = __mdd_index_delete(ctxt, mdo, name, handle);
926 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
928 mdd_trans_stop(ctxt, mdd, handle);
932 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
933 struct md_object *tobj, const struct lu_fid *lf,
936 struct mdd_device *mdd = mdo2mdd(pobj);
937 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
938 struct mdd_object *mdd_tobj = NULL;
939 struct thandle *handle;
943 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
944 handle = mdd_trans_start(ctxt, mdd);
946 RETURN(PTR_ERR(handle));
949 mdd_tobj = md2mdd_obj(tobj);
951 mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
954 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
959 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
963 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
964 rc = __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
969 /*FIXME: should we do error handling here?*/
970 mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
971 mdd_trans_stop(ctxt, mdd, handle);
975 static int mdd_root_get(const struct lu_context *ctx,
976 struct md_device *m, struct lu_fid *f)
978 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
981 *f = mdd->mdd_root_fid;
985 static int mdd_statfs(const struct lu_context *ctx,
986 struct md_device *m, struct kstatfs *sfs) {
987 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
992 rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
997 static int __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
998 struct thandle *handle)
1000 struct dt_object *next;
1002 LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
1003 next = mdd_object_child(obj);
1004 return next->do_ops->do_ref_add(ctxt, next, handle);
1007 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1009 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1010 struct mdd_device *mdd = mdo2mdd(obj);
1011 struct thandle *handle;
1015 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1016 handle = mdd_trans_start(ctxt, mdd);
1019 rc = __mdd_ref_add(ctxt, mdd_obj, handle);
1021 mdd_trans_stop(ctxt, mdd, handle);
1027 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1028 struct thandle *handle, struct md_attr *ma)
1030 struct dt_object *next = mdd_object_child(obj);
1033 LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
1035 rc = next->do_ops->do_ref_del(ctxt, next, handle);
1036 if (rc == 0 && ma != NULL)
1037 mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
1042 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1045 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1046 struct mdd_device *mdd = mdo2mdd(obj);
1047 struct thandle *handle;
1051 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1052 handle = mdd_trans_start(ctxt, mdd);
1055 rc = __mdd_ref_del(ctxt, mdd_obj, handle, ma);
1057 mdd_trans_stop(ctxt, mdd, handle);
1062 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1067 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
1072 struct md_device_operations mdd_ops = {
1073 .mdo_root_get = mdd_root_get,
1074 .mdo_statfs = mdd_statfs,
1075 .mdo_notify = mdd_notify
1078 static struct md_dir_operations mdd_dir_ops = {
1079 .mdo_lookup = mdd_lookup,
1080 .mdo_create = mdd_create,
1081 .mdo_rename = mdd_rename,
1082 .mdo_link = mdd_link,
1083 .mdo_unlink = mdd_unlink,
1084 .mdo_name_insert = mdd_mkname,
1085 .mdo_name_remove = mdd_name_remove,
1086 .mdo_rename_tgt = mdd_rename_tgt,
1090 static struct md_object_operations mdd_obj_ops = {
1091 .moo_attr_get = mdd_attr_get,
1092 .moo_attr_set = mdd_attr_set,
1093 .moo_xattr_get = mdd_xattr_get,
1094 .moo_xattr_set = mdd_xattr_set,
1095 .moo_object_create = mdd_object_create,
1096 .moo_ref_add = mdd_ref_add,
1097 .moo_ref_del = mdd_ref_del,
1098 .moo_open = mdd_open,
1099 .moo_close = mdd_close
1102 static struct obd_ops mdd_obd_device_ops = {
1103 .o_owner = THIS_MODULE
1106 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1107 struct lu_device_type *t,
1108 struct lustre_cfg *lcfg)
1110 struct lu_device *l;
1111 struct mdd_device *m;
1115 l = ERR_PTR(-ENOMEM);
1117 md_device_init(&m->mdd_md_dev, t);
1119 l->ld_ops = &mdd_lu_ops;
1120 m->mdd_md_dev.md_ops = &mdd_ops;
1126 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1128 struct mdd_device *m = lu2mdd_dev(lu);
1130 LASSERT(atomic_read(&lu->ld_ref) == 0);
1131 md_device_fini(&m->mdd_md_dev);
1135 static int mdd_type_init(struct lu_device_type *t)
1137 return lu_context_key_register(&mdd_thread_key);
1140 static void mdd_type_fini(struct lu_device_type *t)
1142 lu_context_key_degister(&mdd_thread_key);
1145 static struct lu_device_type_operations mdd_device_type_ops = {
1146 .ldto_init = mdd_type_init,
1147 .ldto_fini = mdd_type_fini,
1149 .ldto_device_alloc = mdd_device_alloc,
1150 .ldto_device_free = mdd_device_free,
1152 .ldto_device_init = mdd_device_init,
1153 .ldto_device_fini = mdd_device_fini
1156 static struct lu_device_type mdd_device_type = {
1157 .ldt_tags = LU_DEVICE_MD,
1158 .ldt_name = LUSTRE_MDD0_NAME,
1159 .ldt_ops = &mdd_device_type_ops,
1160 .ldt_ctx_tags = LCT_MD_THREAD
1163 static void *mdd_key_init(const struct lu_context *ctx,
1164 struct lu_context_key *key)
1166 struct mdd_thread_info *info;
1168 OBD_ALLOC_PTR(info);
1170 info = ERR_PTR(-ENOMEM);
1174 static void mdd_key_fini(const struct lu_context *ctx,
1175 struct lu_context_key *key, void *data)
1177 struct mdd_thread_info *info = data;
1181 static struct lu_context_key mdd_thread_key = {
1182 .lct_tags = LCT_MD_THREAD,
1183 .lct_init = mdd_key_init,
1184 .lct_fini = mdd_key_fini
1187 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1191 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1195 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
1197 static int __init mdd_mod_init(void)
1199 struct lprocfs_static_vars lvars;
1201 lprocfs_init_vars(mdd, &lvars);
1202 return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1203 LUSTRE_MDD0_NAME, &mdd_device_type);
1206 static void __exit mdd_mod_exit(void)
1208 class_unregister_type(LUSTRE_MDD0_NAME);
1211 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1212 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1213 MODULE_LICENSE("GPL");
1215 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);