1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
45 #include "mdd_internal.h"
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51 struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54 struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56 struct mdd_object *obj, enum dt_lock_mode mode);
57 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58 struct thandle *handle);
59 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60 struct thandle *handle, struct md_attr *);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62 const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
67 static struct lu_context_key mdd_thread_key;
69 static const char *mdd_root_dir_name = "root";
70 static const char dot[] = ".";
71 static const char dotdot[] = "..";
74 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
76 struct mdd_thread_info *info;
78 info = lu_context_key_get(ctx, &mdd_thread_key);
79 LASSERT(info != NULL);
83 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
84 const struct lu_object_header *hdr,
87 struct mdd_object *mdo;
94 lu_object_init(o, NULL, d);
95 mdo->mod_obj.mo_ops = &mdd_obj_ops;
96 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
97 o->lo_ops = &mdd_lu_obj_ops;
104 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
106 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
107 struct lu_object *below;
108 struct lu_device *under;
111 under = &d->mdd_child->dd_lu_dev;
112 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
117 lu_object_add(o, below);
121 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
123 struct mdd_object *mdd = lu2mdd_obj(o);
129 static int mdd_attr_get(const struct lu_context *ctxt,
130 struct md_object *obj, struct lu_attr *attr)
132 struct mdd_object *mdd_obj = md2mdd_obj(obj);
133 struct dt_object *next;
138 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
140 next = mdd_object_child(mdd_obj);
141 rc = next->do_ops->do_attr_get(ctxt, next, attr);
146 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
147 void *buf, int buf_len, const char *name)
149 struct mdd_object *mdd_obj = md2mdd_obj(obj);
150 struct dt_object *next;
155 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
157 next = mdd_object_child(mdd_obj);
158 rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
163 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
164 void *buf, int buf_len)
166 struct mdd_object *mdd_obj = md2mdd_obj(obj);
167 struct dt_object *next;
172 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
174 next = mdd_object_child(mdd_obj);
175 rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
181 MDD_TXN_OBJECT_DESTROY_OP,
182 MDD_TXN_OBJECT_CREATE_OP,
184 MDD_TXN_XATTR_SET_OP,
185 MDD_TXN_INDEX_INSERT_OP,
186 MDD_TXN_INDEX_DELETE_OP,
193 struct mdd_txn_op_descr {
194 enum mdd_txn_op mod_op;
195 unsigned int mod_credits;
199 MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
200 MDD_TXN_OBJECT_CREATE_CREDITS = 20,
201 MDD_TXN_ATTR_SET_CREDITS = 20,
202 MDD_TXN_XATTR_SET_CREDITS = 20,
203 MDD_TXN_INDEX_INSERT_CREDITS = 20,
204 MDD_TXN_INDEX_DELETE_CREDITS = 20,
205 MDD_TXN_LINK_CREDITS = 20,
206 MDD_TXN_UNLINK_CREDITS = 20,
207 MDD_TXN_RENAME_CREDITS = 20,
208 MDD_TXN_MKDIR_CREDITS = 20
211 #define DEFINE_MDD_TXN_OP_DESC(opname) \
212 static const struct mdd_txn_op_descr opname = { \
213 .mod_op = opname ## _OP, \
214 .mod_credits = opname ## _CREDITS, \
218 * number of blocks to reserve for particular operations. Should be function
219 * of ... something. Stub for now.
221 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
222 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
223 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
224 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
225 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
226 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
227 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
228 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
229 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
230 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
232 static void mdd_txn_param_build(const struct lu_context *ctx,
233 const struct mdd_txn_op_descr *opd)
235 mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
238 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
239 lu_printer_t p, const struct lu_object *o)
241 return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
/*
 * ->loo_object_exists() method of mdd objects.
 *
 * The mdd layer does not decide existence itself: the question is passed to
 * the object returned by lu_object_next(@o) and that answer is returned
 * unchanged.
 */
static int mdd_object_exists(const struct lu_context *ctx,
                             const struct lu_object *o)
{
        return lu_object_exists(ctx, lu_object_next(o));
}
250 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
253 struct dt_object *root;
256 root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
259 LASSERT(root != NULL);
260 lu_object_put(ctx, &root->do_lu);
267 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
269 /*create PENDING and OBJECTS dir for open and llog*/
273 static int mdd_fs_cleanup(struct mdd_device *mdd)
275 /*create PENDING and OBJECTS dir for open and llog*/
279 static int mdd_device_init(const struct lu_context *ctx,
280 struct lu_device *d, struct lu_device *next)
282 struct mdd_device *mdd = lu2mdd_dev(d);
286 mdd->mdd_child = lu2dt_dev(next);
288 rc = mdd_fs_setup(ctx, mdd);
294 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
297 struct mdd_device *m = lu2mdd_dev(d);
298 struct lu_device *next = &m->mdd_child->dd_lu_dev;
300 dt_device_fini(&m->mdd_lov_dev);
305 static int mdd_process_config(const struct lu_context *ctxt,
306 struct lu_device *d, struct lustre_cfg *cfg)
308 struct mdd_device *m = lu2mdd_dev(d);
309 struct dt_device *dt = m->mdd_child;
310 struct lu_device *next = &dt->dd_lu_dev;
313 switch(cfg->lcfg_command) {
315 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
318 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
319 rc = mdd_mount(ctxt, m);
322 rc = mdd_init_obd(ctxt, m);
324 CERROR("lov init error %d \n", rc);
329 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
336 struct lu_device_operations mdd_lu_ops = {
337 .ldo_object_alloc = mdd_object_alloc,
338 .ldo_process_config = mdd_process_config,
341 static struct lu_object_operations mdd_lu_obj_ops = {
342 .loo_object_init = mdd_object_init,
343 .loo_object_free = mdd_object_free,
344 .loo_object_print = mdd_object_print,
345 .loo_object_exists = mdd_object_exists,
348 static void mdd_lock(const struct lu_context *ctxt,
349 struct mdd_object *obj, enum dt_lock_mode mode)
351 struct dt_object *next = mdd_object_child(obj);
353 next->do_ops->do_lock(ctxt, next, mode);
356 static void mdd_unlock(const struct lu_context *ctxt,
357 struct mdd_object *obj, enum dt_lock_mode mode)
359 struct dt_object *next = mdd_object_child(obj);
361 next->do_ops->do_unlock(ctxt, next, mode);
364 static void mdd_lock2(const struct lu_context *ctxt,
365 struct mdd_object *o0, struct mdd_object *o1)
367 mdd_lock(ctxt, o0, DT_WRITE_LOCK);
368 mdd_lock(ctxt, o1, DT_WRITE_LOCK);
371 static void mdd_unlock2(const struct lu_context *ctxt,
372 struct mdd_object *o0, struct mdd_object *o1)
374 mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
375 mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
378 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
379 struct mdd_device *mdd)
381 struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
383 return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
386 static void mdd_trans_stop(const struct lu_context *ctxt,
387 struct mdd_device *mdd, struct thandle *handle)
389 mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
392 static int __mdd_object_create(const struct lu_context *ctxt,
393 struct mdd_object *obj, struct md_attr *ma,
394 struct thandle *handle)
396 struct dt_object *next;
397 struct lu_attr *attr = &ma->ma_attr;
401 if (!lu_object_exists(ctxt, mdd2lu_obj(obj))) {
402 next = mdd_object_child(obj);
403 rc = next->do_ops->do_create(ctxt, next, attr, handle);
405 rc = mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
407 ma->ma_valid |= MA_INODE;
412 LASSERT(ergo(rc == 0, lu_object_exists(ctxt, mdd2lu_obj(obj))));
417 static int mdd_object_create(const struct lu_context *ctxt,
418 struct md_object *obj, struct md_attr *attr)
421 struct mdd_device *mdd = mdo2mdd(obj);
422 struct thandle *handle;
426 mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
427 handle = mdd_trans_start(ctxt, mdd);
429 RETURN(PTR_ERR(handle));
431 rc = __mdd_object_create(ctxt, md2mdd_obj(obj), attr, handle);
433 mdd_trans_stop(ctxt, mdd, handle);
438 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
439 const struct lu_attr *attr, struct thandle *handle)
441 struct dt_object *next;
443 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
444 next = mdd_object_child(md2mdd_obj(obj));
445 return next->do_ops->do_attr_set(ctxt, next, attr, handle);
448 static int mdd_attr_set(const struct lu_context *ctxt,
449 struct md_object *obj, const struct lu_attr *attr)
451 struct mdd_device *mdd = mdo2mdd(obj);
452 struct thandle *handle;
456 mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
457 handle = mdd_trans_start(ctxt, mdd);
459 RETURN(PTR_ERR(handle));
461 rc = __mdd_attr_set(ctxt, obj, attr, handle);
463 mdd_trans_stop(ctxt, mdd, handle);
468 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
469 struct mdd_object *obj, const void *buf,
470 int buf_len, const char *name, int fl,
471 struct thandle *handle)
473 struct dt_object *next;
475 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
476 next = mdd_object_child(obj);
477 return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name, fl,
481 int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
482 const void *buf, int buf_len, const char *name, int fl)
484 struct mdd_device *mdd = mdo2mdd(obj);
485 struct thandle *handle;
489 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
490 handle = mdd_trans_start(ctxt, mdd);
492 RETURN(PTR_ERR(handle));
494 rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
497 mdd_trans_stop(ctxt, mdd, handle);
502 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
503 struct mdd_object *obj,
504 const char *name, struct thandle *handle)
506 struct dt_object *next;
508 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
509 next = mdd_object_child(obj);
510 return next->do_ops->do_xattr_del(ctxt, next, name, handle);
513 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
516 struct mdd_device *mdd = mdo2mdd(obj);
517 struct thandle *handle;
521 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
522 handle = mdd_trans_start(ctxt, mdd);
524 RETURN(PTR_ERR(handle));
526 rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
528 mdd_trans_stop(ctxt, mdd, handle);
533 static int __mdd_index_insert(const struct lu_context *ctxt,
534 struct mdd_object *pobj, const struct lu_fid *lf,
535 const char *name, struct thandle *handle)
538 struct dt_object *next = mdd_object_child(pobj);
541 if (dt_try_as_dir(ctxt, next))
542 rc = next->do_index_ops->dio_insert(ctxt, next,
544 (struct dt_key *)name, handle);
550 static int __mdd_index_delete(const struct lu_context *ctxt,
551 struct mdd_object *pobj, const char *name,
552 struct thandle *handle)
555 struct dt_object *next = mdd_object_child(pobj);
558 if (dt_try_as_dir(ctxt, next))
559 rc = next->do_index_ops->dio_delete(ctxt, next,
560 (struct dt_key *)name, handle);
566 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
567 struct md_object *src_obj, const char *name)
569 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
570 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
571 struct mdd_device *mdd = mdo2mdd(src_obj);
572 struct thandle *handle;
576 mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
577 handle = mdd_trans_start(ctxt, mdd);
579 RETURN(PTR_ERR(handle));
581 mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
584 * XXX Check that link can be added to the child.
587 rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
590 __mdd_ref_add(ctxt, mdd_sobj, handle);
592 mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
593 mdd_trans_stop(ctxt, mdd, handle);
598 * Check that @dir contains no entries except (possibly) dot and dotdot.
603 * -ENOTEMPTY not empty
607 static int mdd_dir_is_empty(const struct lu_context *ctx,
608 struct mdd_object *dir)
611 struct dt_object *obj;
612 struct dt_it_ops *iops;
615 obj = mdd_object_child(dir);
616 iops = &obj->do_index_ops->dio_it;
617 it = iops->init(ctx, obj);
619 result = iops->get(ctx, it, (const void *)"");
622 for (result = 0, i = 0; result == 0 && i < 3; ++i) {
623 result = iops->next(ctx, it);
630 fid = (void *)iops->rec(ctx, it);
631 name = (void *)iops->key(ctx, it);
632 len = iops->key_size(ctx, it);
633 CERROR("entry: "DFID3": \"%*.*s\"\n",
634 PFID3(fid), len, len, name);
641 else if (result == +1)
643 } else if (result == 0)
645 * Huh? Index contains no zero key?
654 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
655 struct md_object *cobj, const char *name,
658 struct mdd_device *mdd = mdo2mdd(pobj);
659 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
660 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
661 struct dt_object *dt_cobj = mdd_object_child(mdd_cobj);
662 struct thandle *handle;
667 if (dt_try_as_dir(ctxt, dt_cobj)) {
668 if (!S_ISDIR(ma->ma_attr.la_mode))
669 RETURN(rc = -EISDIR);
671 if (S_ISDIR(ma->ma_attr.la_mode))
672 RETURN(rc = -ENOTDIR);
675 mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
676 handle = mdd_trans_start(ctxt, mdd);
678 RETURN(PTR_ERR(handle));
680 mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
683 if (S_ISDIR(ma->ma_attr.la_mode)) {
684 rc = mdd_dir_is_empty(ctxt, mdd_cobj);
689 rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
693 __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
695 if (S_ISDIR(ma->ma_attr.la_mode)) {
697 __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
699 __mdd_ref_del(ctxt, mdd_pobj, handle, NULL);
703 mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
704 mdd_trans_stop(ctxt, mdd, handle);
708 static int mdd_parent_fid(const struct lu_context *ctxt,
709 struct mdd_object *obj,
714 rc = mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
719 static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj)
721 return lu_object_fid(&obj->mod_obj.mo_lu);
724 static int mdd_is_parent(const struct lu_context *ctxt,
725 struct mdd_device *mdd,
726 struct mdd_object *p1,
727 struct mdd_object *p2)
729 struct lu_fid * pfid;
732 pfid = &mdd_ctx_info(ctxt)->mti_fid;
734 rc = mdd_parent_fid(ctxt, p1, pfid);
737 if (lu_fid_eq(pfid, mdo2fid(p2))) {
740 } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
745 static int mdd_rename_lock(const struct lu_context *ctxt,
746 struct mdd_device *mdd,
747 struct mdd_object *src_pobj,
748 struct mdd_object *tgt_pobj)
752 if (src_pobj == tgt_pobj) {
753 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
756 /* compare the parent-child relationship of src_p and tgt_p */
757 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
758 mdd_lock2(ctxt, src_pobj, tgt_pobj);
760 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
761 mdd_lock2(ctxt, tgt_pobj, src_pobj);
764 if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
765 mdd_lock2(ctxt, tgt_pobj, src_pobj);
768 if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
769 mdd_lock2(ctxt, src_pobj, tgt_pobj);
773 mdd_lock2(ctxt, src_pobj, tgt_pobj);
777 static void mdd_rename_unlock(const struct lu_context *ctxt,
778 struct mdd_object *src_pobj,
779 struct mdd_object *tgt_pobj)
781 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
782 if (src_pobj != tgt_pobj)
783 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
786 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
787 struct md_object *tgt_pobj, const struct lu_fid *lf,
788 const char *sname, struct md_object *tobj,
791 struct mdd_device *mdd = mdo2mdd(src_pobj);
792 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
793 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
794 struct mdd_object *mdd_tobj = NULL;
795 struct thandle *handle;
799 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
800 handle = mdd_trans_start(ctxt, mdd);
802 RETURN(PTR_ERR(handle));
804 /*FIXME: Should consider tobj and sobj too in rename_lock*/
805 rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
810 mdd_tobj = md2mdd_obj(tobj);
812 rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
815 /*FIXME: no sobj now, we should check sobj type, if it is dir,
816 * the nlink of its parent should be dec
819 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
824 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
829 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
830 struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
832 __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
833 if (dt_try_as_dir(ctxt, dt_tobj))
834 __mdd_ref_del(ctxt, mdd_tpobj, handle, NULL);
837 /*FIXME: should we do error handling here?*/
839 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
840 mdd_trans_stop(ctxt, mdd, handle);
844 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
845 const char *name, struct lu_fid* fid)
847 struct dt_object *dir = mdd_object_child(md2mdd_obj(pobj));
848 struct dt_rec *rec = (struct dt_rec *)fid;
849 const struct dt_key *key = (const struct dt_key *)name;
853 if (dt_try_as_dir(ctxt, dir))
854 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
860 static int __mdd_object_initialize(const struct lu_context *ctxt,
861 struct mdd_object *parent,
862 struct mdd_object *child,
863 struct md_attr *ma, struct thandle *handle)
868 if (S_ISDIR(ma->ma_attr.la_mode)) {
869 __mdd_ref_add(ctxt, child, handle);
870 rc = __mdd_index_insert(ctxt, child,
871 mdo2fid(child), dot, handle);
873 rc = __mdd_index_insert(ctxt, child, mdo2fid(parent),
876 __mdd_ref_add(ctxt, parent, handle);
880 rc2 = __mdd_index_delete(ctxt,
883 CERROR("Failure to cleanup after dotdot"
884 " creation: %d (%d)\n", rc2, rc);
886 __mdd_ref_del(ctxt, child, handle, 0);
894 * Create object and insert it into namespace.
896 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
897 const char *name, struct md_object *child,
898 const char *target_name, struct md_attr* ma)
900 struct mdd_device *mdd = mdo2mdd(pobj);
901 struct mdd_object *mdo = md2mdd_obj(pobj);
902 struct mdd_object *son = md2mdd_obj(child);
903 struct lu_attr *attr = &ma->ma_attr;
905 struct lov_mds_md *lmm = NULL;
906 struct thandle *handle;
907 int rc, created = 0, inserted = 0, lmm_size;
910 /* sanity checks before big job */
911 fid = &mdd_ctx_info(ctxt)->mti_fid;
912 rc = mdd_lookup(ctxt, pobj, name, fid);
914 rc = rc ? rc : -EEXIST;
917 /* no RPC inside the transaction, so OST objects should be created at
920 if (S_ISREG(attr->la_mode)) {
921 rc = mdd_lov_create(ctxt, mdd, son, &lmm, &lmm_size);
924 if (lmm_size < ma->ma_lmm_size)
925 ma->ma_lmm_size = lmm_size;
926 if (ma->ma_lmm_size > 0) {
927 memcpy(ma->ma_lmm, lmm, ma->ma_lmm_size);
928 ma->ma_valid |= MA_LOV;
932 mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
933 handle = mdd_trans_start(ctxt, mdd);
935 RETURN(PTR_ERR(handle));
937 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
940 * XXX check that link can be added to the parent in mkdir case.
944 * Two operations have to be performed:
946 * - allocation of new object (->do_create()), and
948 * - insertion into parent index (->dio_insert()).
950 * Due to locking, operation order is not important, when both are
951 * successful, *but* error handling cases are quite different:
953 * - if insertion is done first, and following object creation fails,
954 * insertion has to be rolled back, but this operation might fail
955 * also leaving us with dangling index entry.
957 * - if creation is done first, it has to be undone if insertion
958 * fails, leaving us with leaked space, which is neither good, nor
961 * It seems that creation-first is simplest solution, but it is
962 * sub-optimal in the frequent
967 * case, because second mkdir is bound to create object, only to
968 * destroy it immediately.
970 * Note that local file systems do
972 * 0. lookup -> -EEXIST
978 * Maybe we should do the same. For now: creation-first.
981 rc = __mdd_object_create(ctxt, son, ma, handle);
987 rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
990 * Object has no links, so it will be destroyed when last
991 * reference is released. (XXX not now.)
995 rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
1003 rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size);
1005 CERROR("error on stripe info copy %d \n", rc);
1008 if (rc && created) {
1012 rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
1014 CERROR("error can not cleanup destroy %d\n",
1018 __mdd_ref_del(ctxt, son, handle, NULL);
1021 OBD_FREE(lmm, lmm_size);
1022 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1023 mdd_trans_stop(ctxt, mdd, handle);
1027 static int mdd_mkname(const struct lu_context *ctxt, struct md_object *pobj,
1028 const char *name, const struct lu_fid *fid)
1030 struct mdd_device *mdd = mdo2mdd(pobj);
1031 struct mdd_object *mdo = md2mdd_obj(pobj);
1032 struct thandle *handle;
1036 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1037 handle = mdd_trans_start(ctxt, mdd);
1039 RETURN(PTR_ERR(handle));
1041 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1043 rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
1045 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1046 mdd_trans_stop(ctxt, mdd, handle);
1050 static int mdd_name_remove(const struct lu_context *ctxt,
1051 struct md_object *pobj,
1054 struct mdd_device *mdd = mdo2mdd(pobj);
1055 struct mdd_object *mdo = md2mdd_obj(pobj);
1056 struct thandle *handle;
1060 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1061 handle = mdd_trans_start(ctxt, mdd);
1063 RETURN(PTR_ERR(handle));
1065 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1067 rc = __mdd_index_delete(ctxt, mdo, name, handle);
1069 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1071 mdd_trans_stop(ctxt, mdd, handle);
1075 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1076 struct md_object *tobj, const struct lu_fid *lf,
1079 struct mdd_device *mdd = mdo2mdd(pobj);
1080 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1081 struct mdd_object *mdd_tobj = NULL;
1082 struct thandle *handle;
1086 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1087 handle = mdd_trans_start(ctxt, mdd);
1089 RETURN(PTR_ERR(handle));
1092 mdd_tobj = md2mdd_obj(tobj);
1094 mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1097 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1102 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
1106 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu))
1107 __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
1109 /*FIXME: should we do error handling here?*/
1110 mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1111 mdd_trans_stop(ctxt, mdd, handle);
1115 static int mdd_root_get(const struct lu_context *ctx,
1116 struct md_device *m, struct lu_fid *f)
1118 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1121 *f = mdd->mdd_root_fid;
1125 static int mdd_statfs(const struct lu_context *ctx,
1126 struct md_device *m, struct kstatfs *sfs) {
1127 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1132 rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1137 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1138 struct thandle *handle)
1140 struct dt_object *next;
1142 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1143 next = mdd_object_child(obj);
1144 next->do_ops->do_ref_add(ctxt, next, handle);
1147 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1149 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1150 struct mdd_device *mdd = mdo2mdd(obj);
1151 struct thandle *handle;
1154 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1155 handle = mdd_trans_start(ctxt, mdd);
1158 __mdd_ref_add(ctxt, mdd_obj, handle);
1160 mdd_trans_stop(ctxt, mdd, handle);
1166 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1167 struct thandle *handle, struct md_attr *ma)
1169 struct dt_object *next = mdd_object_child(obj);
1171 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1173 next->do_ops->do_ref_del(ctxt, next, handle);
1175 int rc = mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
1177 ma->ma_valid |= MA_INODE;
1181 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1184 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1185 struct mdd_device *mdd = mdo2mdd(obj);
1186 struct thandle *handle;
1189 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1190 handle = mdd_trans_start(ctxt, mdd);
1193 __mdd_ref_del(ctxt, mdd_obj, handle, ma);
1195 mdd_trans_stop(ctxt, mdd, handle);
1200 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1205 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
1210 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
1211 struct lu_rdpg *rdpg)
1213 struct dt_object *next;
1216 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(md2mdd_obj(obj))));
1217 next = mdd_object_child(md2mdd_obj(obj));
1218 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
1222 struct md_device_operations mdd_ops = {
1223 .mdo_root_get = mdd_root_get,
1224 .mdo_statfs = mdd_statfs,
1227 static struct md_dir_operations mdd_dir_ops = {
1228 .mdo_lookup = mdd_lookup,
1229 .mdo_create = mdd_create,
1230 .mdo_rename = mdd_rename,
1231 .mdo_link = mdd_link,
1232 .mdo_unlink = mdd_unlink,
1233 .mdo_name_insert = mdd_mkname,
1234 .mdo_name_remove = mdd_name_remove,
1235 .mdo_rename_tgt = mdd_rename_tgt,
1239 static struct md_object_operations mdd_obj_ops = {
1240 .moo_attr_get = mdd_attr_get,
1241 .moo_attr_set = mdd_attr_set,
1242 .moo_xattr_get = mdd_xattr_get,
1243 .moo_xattr_set = mdd_xattr_set,
1244 .moo_xattr_list = mdd_xattr_list,
1245 .moo_xattr_del = mdd_xattr_del,
1246 .moo_object_create = mdd_object_create,
1247 .moo_ref_add = mdd_ref_add,
1248 .moo_ref_del = mdd_ref_del,
1249 .moo_open = mdd_open,
1250 .moo_close = mdd_close,
1251 .moo_readpage = mdd_readpage
1254 static struct obd_ops mdd_obd_device_ops = {
1255 .o_owner = THIS_MODULE
1258 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1259 struct lu_device_type *t,
1260 struct lustre_cfg *lcfg)
1262 struct lu_device *l;
1263 struct mdd_device *m;
1267 l = ERR_PTR(-ENOMEM);
1269 md_device_init(&m->mdd_md_dev, t);
1271 l->ld_ops = &mdd_lu_ops;
1272 m->mdd_md_dev.md_ops = &mdd_ops;
1278 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1280 struct mdd_device *m = lu2mdd_dev(lu);
1282 LASSERT(atomic_read(&lu->ld_ref) == 0);
1283 md_device_fini(&m->mdd_md_dev);
1287 static int mdd_type_init(struct lu_device_type *t)
1289 return lu_context_key_register(&mdd_thread_key);
1292 static void mdd_type_fini(struct lu_device_type *t)
1294 lu_context_key_degister(&mdd_thread_key);
1297 static struct lu_device_type_operations mdd_device_type_ops = {
1298 .ldto_init = mdd_type_init,
1299 .ldto_fini = mdd_type_fini,
1301 .ldto_device_alloc = mdd_device_alloc,
1302 .ldto_device_free = mdd_device_free,
1304 .ldto_device_init = mdd_device_init,
1305 .ldto_device_fini = mdd_device_fini
1308 static struct lu_device_type mdd_device_type = {
1309 .ldt_tags = LU_DEVICE_MD,
1310 .ldt_name = LUSTRE_MDD0_NAME,
1311 .ldt_ops = &mdd_device_type_ops,
1312 .ldt_ctx_tags = LCT_MD_THREAD
1315 static void *mdd_key_init(const struct lu_context *ctx,
1316 struct lu_context_key *key)
1318 struct mdd_thread_info *info;
1320 OBD_ALLOC_PTR(info);
1322 info = ERR_PTR(-ENOMEM);
1326 static void mdd_key_fini(const struct lu_context *ctx,
1327 struct lu_context_key *key, void *data)
1329 struct mdd_thread_info *info = data;
1333 static struct lu_context_key mdd_thread_key = {
1334 .lct_tags = LCT_MD_THREAD,
1335 .lct_init = mdd_key_init,
1336 .lct_fini = mdd_key_fini
1339 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1343 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1347 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
1349 static int __init mdd_mod_init(void)
1351 struct lprocfs_static_vars lvars;
1353 lprocfs_init_vars(mdd, &lvars);
1354 return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1355 LUSTRE_MDD0_NAME, &mdd_device_type);
1358 static void __exit mdd_mod_exit(void)
1360 class_unregister_type(LUSTRE_MDD0_NAME);
1363 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1364 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1365 MODULE_LICENSE("GPL");
1367 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);