1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
45 #include "mdd_internal.h"
/*
 * Forward declarations for helpers, operation tables and the context key
 * that are referenced in this file before their definitions appear.
 */
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51 struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54 struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56 struct mdd_object *obj, enum dt_lock_mode mode);
57 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58 struct thandle *handle);
59 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60 struct thandle *handle);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62 const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
67 static struct lu_context_key mdd_thread_key;
/*
 * Name constants: mdd_root_dir_name is what mdd_mount() passes to
 * dt_store_open(); dot and dotdot are the "." and ".." entry names used by
 * __mdd_object_initialize() and mdd_parent_fid().
 */
69 static const char *mdd_root_dir_name = "root";
70 static const char dot[] = ".";
71 static const char dotdot[] = "..";
/*
 * Fetch the mdd_thread_info stored in context @ctx under mdd_thread_key.
 * The lookup result is asserted to be non-NULL.
 */
74 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
76 struct mdd_thread_info *info;
78 info = lu_context_key_get(ctx, &mdd_thread_key);
79 LASSERT(info != NULL);
/*
 * Object allocation hook (installed as mdd_lu_ops.ldo_object_alloc).
 * Initializes the lu_object with lu_object_init() and points the object at
 * this layer's operation tables: mdd_obj_ops, mdd_dir_ops and
 * mdd_lu_obj_ops.
 */
83 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
84 const struct lu_object_header *hdr,
87 struct mdd_object *mdo;
94 lu_object_init(o, NULL, d);
95 mdo->mod_obj.mo_ops = &mdd_obj_ops;
96 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
97 o->lo_ops = &mdd_lu_obj_ops;
/*
 * loo_object_init hook. Asks the child device (d->mdd_child) to allocate its
 * own object for the same header through ldo_object_alloc(), then attaches
 * that object to @o with lu_object_add().
 */
104 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
106 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
107 struct lu_object *below;
108 struct lu_device *under;
111 under = &d->mdd_child->dd_lu_dev;
112 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
117 lu_object_add(o, below);
/*
 * loo_object_free hook (see mdd_lu_obj_ops). Converts the generic lu_object
 * back to the enclosing mdd_object with lu2mdd_obj().
 */
121 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
123 struct mdd_object *mdd = lu2mdd_obj(o);
/*
 * moo_attr_get hook. The object must exist (asserted).
 *
 * Reads the attributes from the child dt object into ma->ma_attr, asserts
 * that the file type bits agree with those cached in the object header, and
 * flags MA_INODE in ma->ma_valid. For regular files and directories, when
 * the caller supplied a LOV buffer (ma_lmm non-zero and ma_lmm_size > 0),
 * the LOV EA is also fetched through mdd_get_md() so that MA_LOV can be
 * flagged. The outcome is logged with CDEBUG(D_INODE, ...).
 */
129 static int mdd_attr_get(const struct lu_context *ctxt,
130 struct md_object *obj, struct md_attr *ma)
132 struct mdd_object *mdd_obj = md2mdd_obj(obj);
133 struct dt_object *next;
138 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
140 next = mdd_object_child(mdd_obj);
141 rc = next->do_ops->do_attr_get(ctxt, next, &ma->ma_attr);
143 LASSERT((ma->ma_attr.la_mode & S_IFMT) ==
144 (obj->mo_lu.lo_header->loh_attr & S_IFMT));
145 ma->ma_valid |= MA_INODE;
146 /* get LOV EA also */
147 if ((S_ISREG(ma->ma_attr.la_mode)
148 || S_ISDIR(ma->ma_attr.la_mode))
149 && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) {
150 rc = mdd_get_md(ctxt, obj, ma->ma_lmm,&ma->ma_lmm_size);
152 ma->ma_valid |= MA_LOV;
157 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/*
 * moo_xattr_get hook. The object must exist (asserted). Forwards the request
 * for extended attribute @name to the child dt object's do_xattr_get(),
 * passing @buf and @buf_len through unchanged.
 */
162 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
163 void *buf, int buf_len, const char *name)
165 struct mdd_object *mdd_obj = md2mdd_obj(obj);
166 struct dt_object *next;
171 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
173 next = mdd_object_child(mdd_obj);
174 rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
/*
 * moo_readlink hook. The object must exist (asserted). Reads the object body
 * through the child dt object's dbo_read(), passing @buf and @buf_len
 * through together with a local position variable.
 */
179 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
180 void *buf, int buf_len)
182 struct mdd_object *mdd_obj = md2mdd_obj(obj);
183 struct dt_object *next;
188 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
190 next = mdd_object_child(mdd_obj);
191 rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
/*
 * moo_xattr_list hook. The object must exist (asserted). Forwards to the
 * child dt object's do_xattr_list(), passing @buf and @buf_len through
 * unchanged.
 */
194 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
195 void *buf, int buf_len)
197 struct mdd_object *mdd_obj = md2mdd_obj(obj);
198 struct dt_object *next;
203 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
205 next = mdd_object_child(mdd_obj);
206 rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
/*
 * Identifiers of the transaction kinds issued by this layer. The
 * DEFINE_MDD_TXN_OP_DESC() macro pastes the _OP suffix onto a base name to
 * reference these values.
 */
212 MDD_TXN_OBJECT_DESTROY_OP,
213 MDD_TXN_OBJECT_CREATE_OP,
215 MDD_TXN_XATTR_SET_OP,
216 MDD_TXN_INDEX_INSERT_OP,
217 MDD_TXN_INDEX_DELETE_OP,
221 MDD_TXN_CREATE_DATA_OP,
/*
 * Describes one transaction kind: its operation identifier and the credit
 * count that mdd_txn_param_build() copies into txn_param.tp_credits.
 */
225 struct mdd_txn_op_descr {
226 enum mdd_txn_op mod_op;
227 unsigned int mod_credits;
/*
 * Credit counts per transaction kind, referenced by
 * DEFINE_MDD_TXN_OP_DESC() through the _CREDITS suffix. Every kind currently
 * carries the same value, 20.
 */
231 MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
232 MDD_TXN_OBJECT_CREATE_CREDITS = 20,
233 MDD_TXN_ATTR_SET_CREDITS = 20,
234 MDD_TXN_XATTR_SET_CREDITS = 20,
235 MDD_TXN_INDEX_INSERT_CREDITS = 20,
236 MDD_TXN_INDEX_DELETE_CREDITS = 20,
237 MDD_TXN_LINK_CREDITS = 20,
238 MDD_TXN_UNLINK_CREDITS = 20,
239 MDD_TXN_RENAME_CREDITS = 20,
240 MDD_TXN_CREATE_DATA_CREDITS = 20,
241 MDD_TXN_MKDIR_CREDITS = 20
/*
 * Define a static const struct mdd_txn_op_descr named @opname whose fields
 * are initialized from the enumerators opname_OP and opname_CREDITS.
 */
244 #define DEFINE_MDD_TXN_OP_DESC(opname) \
245 static const struct mdd_txn_op_descr opname = { \
246 .mod_op = opname ## _OP, \
247 .mod_credits = opname ## _CREDITS, \
/*
 * One descriptor object per transaction kind. Callers pass the address of
 * the matching descriptor to mdd_txn_param_build() before starting a
 * transaction.
 */
251 * number of blocks to reserve for particular operations. Should be function
252 * of ... something. Stub for now.
254 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
255 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
256 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
257 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
258 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
259 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
260 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
261 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
262 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
263 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
264 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
/*
 * Copy the credit count of descriptor @opd into the txn_param kept in this
 * context's thread info (mti_param). mdd_trans_start() later passes that
 * same txn_param to the child device.
 */
266 static void mdd_txn_param_build(const struct lu_context *ctx,
267 const struct mdd_txn_op_descr *opd)
269 mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
/*
 * loo_object_print hook. Emits LUSTRE_MDD0_NAME followed by "-object@" and
 * the object's address through the supplied printer @p, and returns the
 * printer's result.
 */
272 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
273 lu_printer_t p, const struct lu_object *o)
275 return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
/*
 * loo_object_exists hook. Delegates the question to the next object in the
 * stack, obtained with lu_object_next().
 */
278 static int mdd_object_exists(const struct lu_context *ctx,
279 const struct lu_object *o)
281 return lu_object_exists(ctx, lu_object_next(o));
/*
 * Open the root directory (mdd_root_dir_name) on the child device with
 * dt_store_open() and release the returned object again with
 * lu_object_put(). Called from mdd_process_config().
 */
284 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
287 struct dt_object *root;
290 root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
293 LASSERT(root != NULL);
294 lu_object_put(ctx, &root->do_lu);
/*
 * File-system level setup for the device; invoked by mdd_device_init(),
 * which stores the result in its rc.
 */
301 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
303 /*create PENDING and OBJECTS dir for open and llog*/
/*
 * Counterpart of mdd_fs_setup().
 * NOTE(review): the inline comment below is identical to the one in
 * mdd_fs_setup() and talks about creating directories; it looks copy-pasted
 * and probably should describe removal instead - confirm.
 */
307 static int mdd_fs_cleanup(struct mdd_device *mdd)
309 /*create PENDING and OBJECTS dir for open and llog*/
/*
 * ldto_device_init hook. Records @next, converted with lu2dt_dev(), as this
 * device's child (mdd_child) and then runs mdd_fs_setup().
 */
313 static int mdd_device_init(const struct lu_context *ctx,
314 struct lu_device *d, struct lu_device *next)
316 struct mdd_device *mdd = lu2mdd_dev(d);
320 mdd->mdd_child = lu2dt_dev(next);
322 rc = mdd_fs_setup(ctx, mdd);
/*
 * ldto_device_fini hook. Finalizes the embedded LOV dt device (mdd_lov_dev)
 * with dt_device_fini(). The child's lu_device is computed as `next`,
 * presumably to be handed back to the caller - confirm.
 */
328 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
331 struct mdd_device *m = lu2mdd_dev(d);
332 struct lu_device *next = &m->mdd_child->dd_lu_dev;
334 dt_device_fini(&m->mdd_lov_dev);
/*
 * ldo_process_config hook. Dispatches on cfg->lcfg_command. The visible
 * steps are: forward the config record to the child device, query the
 * child's configuration into mdd_dt_conf with dt_conf_get(), mount with
 * mdd_mount(), and initialize the LOV obd with mdd_init_obd() using the
 * device name taken from config string 0 (an error is logged with CERROR).
 * A further branch only forwards the record to the child device.
 */
339 static int mdd_process_config(const struct lu_context *ctxt,
340 struct lu_device *d, struct lustre_cfg *cfg)
342 struct mdd_device *m = lu2mdd_dev(d);
343 struct dt_device *dt = m->mdd_child;
344 struct lu_device *next = &dt->dd_lu_dev;
345 char *dev = lustre_cfg_string(cfg, 0);
348 switch(cfg->lcfg_command) {
350 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
353 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
355 rc = mdd_mount(ctxt, m);
358 rc = mdd_init_obd(ctxt, m, dev);
360 CERROR("lov init error %d \n", rc);
365 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
/*
 * Device-level lu operations of the mdd layer; installed on each device by
 * mdd_device_alloc().
 */
372 struct lu_device_operations mdd_lu_ops = {
373 .ldo_object_alloc = mdd_object_alloc,
374 .ldo_process_config = mdd_process_config,
/*
 * Object-level lu operations of the mdd layer; installed on each object by
 * mdd_object_alloc().
 */
377 static struct lu_object_operations mdd_lu_obj_ops = {
378 .loo_object_init = mdd_object_init,
379 .loo_object_free = mdd_object_free,
380 .loo_object_print = mdd_object_print,
381 .loo_object_exists = mdd_object_exists,
/*
 * Lock @obj in the requested @mode by calling do_lock() on its child dt
 * object.
 */
384 static void mdd_lock(const struct lu_context *ctxt,
385 struct mdd_object *obj, enum dt_lock_mode mode)
387 struct dt_object *next = mdd_object_child(obj);
389 next->do_ops->do_lock(ctxt, next, mode);
/*
 * Release a lock of the given @mode on @obj by calling do_unlock() on its
 * child dt object.
 */
392 static void mdd_unlock(const struct lu_context *ctxt,
393 struct mdd_object *obj, enum dt_lock_mode mode)
395 struct dt_object *next = mdd_object_child(obj);
397 next->do_ops->do_unlock(ctxt, next, mode);
/*
 * Write-lock two objects, @o0 first and @o1 second. The acquisition order is
 * decided by the caller through the argument order (see mdd_rename_lock()).
 */
400 static void mdd_lock2(const struct lu_context *ctxt,
401 struct mdd_object *o0, struct mdd_object *o1)
403 mdd_lock(ctxt, o0, DT_WRITE_LOCK);
404 mdd_lock(ctxt, o1, DT_WRITE_LOCK);
/*
 * Release the write locks on two objects, @o0 first and @o1 second (the
 * same order in which mdd_lock2() acquires them).
 */
407 static void mdd_unlock2(const struct lu_context *ctxt,
408 struct mdd_object *o0, struct mdd_object *o1)
410 mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
411 mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
/*
 * Start a transaction on the child device, using the txn_param stored in
 * this context's thread info (filled in beforehand by
 * mdd_txn_param_build()). Returns whatever dt_trans_start() returns; callers
 * in this file turn a failed handle into an error code with PTR_ERR().
 */
414 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
415 struct mdd_device *mdd)
417 struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
419 return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
/*
 * Stop the transaction identified by @handle through the child device's
 * dt_trans_stop().
 */
422 static void mdd_trans_stop(const struct lu_context *ctxt,
423 struct mdd_device *mdd, struct thandle *handle)
425 mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
/*
 * Create the child dt object for @obj with the attributes in ma->ma_attr,
 * inside transaction @handle. Creation is only attempted when the object
 * does not exist yet. On return it is asserted that rc == 0 implies the
 * object exists.
 */
428 static int __mdd_object_create(const struct lu_context *ctxt,
429 struct mdd_object *obj, struct md_attr *ma,
430 struct thandle *handle)
432 struct dt_object *next;
433 struct lu_attr *attr = &ma->ma_attr;
437 if (!lu_object_exists(ctxt, mdd2lu_obj(obj))) {
438 next = mdd_object_child(obj);
439 rc = next->do_ops->do_create(ctxt, next, attr, handle);
443 LASSERT(ergo(rc == 0, lu_object_exists(ctxt, mdd2lu_obj(obj))));
/*
 * Set attributes @attr on an existing object (asserted) through the child
 * dt object's do_attr_set(), inside transaction @handle. Locking is left to
 * the caller.
 */
448 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
449 const struct lu_attr *attr, struct thandle *handle)
451 struct dt_object *next;
453 LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
454 next = mdd_object_child(md2mdd_obj(obj));
455 return next->do_ops->do_attr_set(ctxt, next, attr, handle);
/*
 * moo_attr_set hook. Opens a transaction sized with MDD_TXN_ATTR_SET
 * credits, write-locks the object around __mdd_attr_set(), and stops the
 * transaction. A failed transaction start is reported via PTR_ERR().
 */
458 static int mdd_attr_set(const struct lu_context *ctxt,
459 struct md_object *obj, const struct lu_attr *attr)
461 struct mdd_object *mdo = md2mdd_obj(obj);
462 struct mdd_device *mdd = mdo2mdd(obj);
463 struct thandle *handle;
467 mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
468 handle = mdd_trans_start(ctxt, mdd);
470 RETURN(PTR_ERR(handle));
472 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
473 rc = __mdd_attr_set(ctxt, obj, attr, handle);
474 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
476 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Set extended attribute @name on an existing object (asserted) through the
 * child dt object's do_xattr_set(), passing @buf, @buf_len and the flags
 * @fl through. Locking is left to the caller.
 */
481 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
482 struct mdd_object *obj, const void *buf,
483 int buf_len, const char *name, int fl,
484 struct thandle *handle)
486 struct dt_object *next;
488 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
489 next = mdd_object_child(obj);
490 return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name, fl,
/*
 * Set an extended attribute inside a transaction supplied by the caller
 * (@handle). Takes the object's write lock around __mdd_xattr_set(). Has
 * external linkage; mdd_xattr_set() in this file is one caller.
 */
494 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct md_object *obj,
495 const void *buf, int buf_len, const char *name, int fl,
496 struct thandle *handle)
498 struct mdd_object *mdo = md2mdd_obj(obj);
499 struct mdd_device *mdd = mdo2mdd(obj);
504 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
505 rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
507 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
/*
 * moo_xattr_set hook. Opens a transaction sized with MDD_TXN_XATTR_SET
 * credits, delegates to mdd_xattr_set_txn() (which does the locking), and
 * stops the transaction. A failed transaction start is reported via
 * PTR_ERR().
 */
512 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
513 const void *buf, int buf_len, const char *name, int fl)
515 struct mdd_device *mdd = mdo2mdd(obj);
516 struct thandle *handle;
520 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
521 handle = mdd_trans_start(ctxt, mdd);
523 RETURN(PTR_ERR(handle));
525 rc = mdd_xattr_set_txn(ctxt, obj, buf, buf_len, name, fl, handle);
527 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Delete extended attribute @name from an existing object (asserted)
 * through the child dt object's do_xattr_del(), inside transaction @handle.
 * Locking is left to the caller.
 */
532 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
533 struct mdd_object *obj,
534 const char *name, struct thandle *handle)
536 struct dt_object *next;
538 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
539 next = mdd_object_child(obj);
540 return next->do_ops->do_xattr_del(ctxt, next, name, handle);
/*
 * moo_xattr_del hook. Opens a transaction, write-locks the object around
 * __mdd_xattr_del(), and stops the transaction. A failed transaction start
 * is reported via PTR_ERR().
 * NOTE(review): the transaction is sized with the MDD_TXN_XATTR_SET
 * descriptor; there is no dedicated descriptor for xattr deletion - confirm
 * this reuse is intended.
 */
543 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
546 struct mdd_object *mdo = md2mdd_obj(obj);
547 struct mdd_device *mdd = mdo2mdd(obj);
548 struct thandle *handle;
552 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
553 handle = mdd_trans_start(ctxt, mdd);
555 RETURN(PTR_ERR(handle));
557 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
558 rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
559 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
561 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Insert an entry keyed by @name into directory @pobj, inside transaction
 * @handle. The insertion goes through the child's index operations
 * (dio_insert) and is only attempted when dt_try_as_dir() accepts the child
 * object. Locking is left to the caller.
 */
566 static int __mdd_index_insert(const struct lu_context *ctxt,
567 struct mdd_object *pobj, const struct lu_fid *lf,
568 const char *name, struct thandle *handle)
571 struct dt_object *next = mdd_object_child(pobj);
574 if (dt_try_as_dir(ctxt, next))
575 rc = next->do_index_ops->dio_insert(ctxt, next,
577 (struct dt_key *)name, handle);
/*
 * Remove the entry keyed by @name from directory @pobj, inside transaction
 * @handle. The removal goes through the child's index operations
 * (dio_delete) and is only attempted when dt_try_as_dir() accepts the child
 * object. Locking is left to the caller.
 */
583 static int __mdd_index_delete(const struct lu_context *ctxt,
584 struct mdd_object *pobj, const char *name,
585 struct thandle *handle)
588 struct dt_object *next = mdd_object_child(pobj);
591 if (dt_try_as_dir(ctxt, next))
592 rc = next->do_index_ops->dio_delete(ctxt, next,
593 (struct dt_key *)name, handle);
/*
 * mdo_link hook. Inside one transaction (MDD_TXN_LINK credits) and with
 * both objects write-locked (@tgt_obj first), inserts @name with the fid of
 * @src_obj into @tgt_obj and adds a reference to @src_obj through
 * __mdd_ref_add(). A failed transaction start is reported via PTR_ERR().
 */
599 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
600 struct md_object *src_obj, const char *name)
602 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
603 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
604 struct mdd_device *mdd = mdo2mdd(src_obj);
605 struct thandle *handle;
609 mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
610 handle = mdd_trans_start(ctxt, mdd);
612 RETURN(PTR_ERR(handle));
614 mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
617 * XXX Check that link can be added to the child.
620 rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
623 __mdd_ref_add(ctxt, mdd_sobj, handle);
625 mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
626 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Implementation note: the check uses the iterator of the child object's
 * index operations. The iterator is positioned with get() on the empty key
 * and then advanced with next() at most three times, stopping early on the
 * first non-zero result. The final iterator result decides the answer. A
 * zero result from get() is treated as unexpected (see the comment on the
 * last branch).
 */
631 * Check that @dir contains no entries except (possibly) dot and dotdot.
636 * -ENOTEMPTY not empty
640 static int mdd_dir_is_empty(const struct lu_context *ctx,
641 struct mdd_object *dir)
644 struct dt_object *obj;
645 struct dt_it_ops *iops;
648 obj = mdd_object_child(dir);
649 iops = &obj->do_index_ops->dio_it;
650 it = iops->init(ctx, obj);
652 result = iops->get(ctx, it, (const void *)"");
655 for (result = 0, i = 0; result == 0 && i < 3; ++i)
656 result = iops->next(ctx, it);
660 else if (result == +1)
662 } else if (result == 0)
664 * Huh? Index contains no zero key?
/*
 * mdo_unlink hook: remove entry @name for child @cobj from parent @pobj.
 *
 * Before the transaction starts, the child's type from its object header is
 * compared with the type in ma->ma_attr.la_mode (directory versus
 * non-directory). Then, inside a transaction (MDD_TXN_UNLINK credits) and
 * with parent and child write-locked:
 *  - a directory child is checked with mdd_dir_is_empty();
 *  - the name is removed from the parent index;
 *  - a reference is dropped from the child, and for directories one more
 *    from the child and one from the parent;
 *  - the child's attributes are re-read into @ma;
 *  - for a regular file with valid LOV data, zero nlink and a header
 *    reference count of 1, mdd_unlink_log() is called.
 * NOTE(review): the return value of mdd_attr_get() is not captured here.
 */
673 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
674 struct md_object *cobj, const char *name,
677 struct mdd_device *mdd = mdo2mdd(pobj);
678 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
679 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
680 struct thandle *handle;
684 if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
685 if (!S_ISDIR(ma->ma_attr.la_mode))
687 } else if (S_ISDIR(ma->ma_attr.la_mode))
690 mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
691 handle = mdd_trans_start(ctxt, mdd);
693 RETURN(PTR_ERR(handle));
695 mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
698 if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
699 rc = mdd_dir_is_empty(ctxt, mdd_cobj);
704 rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
708 __mdd_ref_del(ctxt, mdd_cobj, handle);
709 if (S_ISDIR(ma->ma_attr.la_mode)) {
711 __mdd_ref_del(ctxt, mdd_cobj, handle);
713 __mdd_ref_del(ctxt, mdd_pobj, handle);
715 mdd_attr_get(ctxt, cobj, ma);
718 /*This should be moved to handle last unlink. wait open
719 * orphan prototype finished*/
720 if (S_ISREG(ma->ma_attr.la_mode) && (ma->ma_valid & MA_LOV) &&
721 ma->ma_attr.la_nlink == 0 && cobj->mo_lu.lo_header->loh_ref == 1) {
722 rc = mdd_unlink_log(ctxt, mdd, mdd_cobj, ma);
726 mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
727 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Look up the ".." entry of @obj with mdd_lookup(), storing the resulting
 * fid in @fid. Note that mdd_lookup() takes a read lock on @obj.
 */
731 static int mdd_parent_fid(const struct lu_context *ctxt,
732 struct mdd_object *obj,
737 rc = mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
/* Return the fid of an mdd object, taken from its embedded lu_object. */
742 static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj)
744 return lu_object_fid(&obj->mod_obj.mo_lu);
/*
 * Ancestry test used by mdd_rename_lock(). Fetches a parent fid starting
 * from @p1 with mdd_parent_fid() and compares it with the fid of @p2; the
 * loop ends once the fetched fid equals the root fid (mdd_root_fid).
 * The fid is kept in the per-context scratch field mti_fid, which
 * mdd_create_sanity_check() also uses.
 */
747 static int mdd_is_parent(const struct lu_context *ctxt,
748 struct mdd_device *mdd,
749 struct mdd_object *p1,
750 struct mdd_object *p2)
752 struct lu_fid * pfid;
755 pfid = &mdd_ctx_info(ctxt)->mti_fid;
757 rc = mdd_parent_fid(ctxt, p1, pfid);
760 if (lu_fid_eq(pfid, mdo2fid(p2))) {
763 } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
/*
 * Write-lock the source and target parent directories of a rename.
 *
 *  - Same object on both sides: it is locked once.
 *  - One of them is the root (fid equals mdd_root_fid): the root is locked
 *    first.
 *  - Otherwise mdd_is_parent() is consulted in both directions to pick the
 *    order, with source-then-target as the remaining choice.
 *
 * mdd_rename_unlock() is the matching release.
 */
768 static int mdd_rename_lock(const struct lu_context *ctxt,
769 struct mdd_device *mdd,
770 struct mdd_object *src_pobj,
771 struct mdd_object *tgt_pobj)
775 if (src_pobj == tgt_pobj) {
776 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
779 /*compared the parent child relationship of src_p&tgt_p*/
780 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
781 mdd_lock2(ctxt, src_pobj, tgt_pobj);
783 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
784 mdd_lock2(ctxt, tgt_pobj, src_pobj);
787 if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
788 mdd_lock2(ctxt, tgt_pobj, src_pobj);
791 if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
792 mdd_lock2(ctxt, src_pobj, tgt_pobj);
796 mdd_lock2(ctxt, src_pobj, tgt_pobj);
/*
 * Release the write locks taken by mdd_rename_lock(). The target parent is
 * unlocked only when it is a different object from the source parent, so a
 * same-directory rename unlocks exactly once.
 */
800 static void mdd_rename_unlock(const struct lu_context *ctxt,
801 struct mdd_object *src_pobj,
802 struct mdd_object *tgt_pobj)
804 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
805 if (src_pobj != tgt_pobj)
806 mdd_unlock(ctxt, tgt_pobj, DT_WRITE_LOCK);
/*
 * Return the attribute word cached in the object header (loh_attr). Callers
 * in this file apply S_ISDIR() to the result.
 */
809 static umode_t mdd_object_type(const struct mdd_object *obj)
811 return obj->mod_obj.mo_lu.lo_header->loh_attr;
/*
 * mdo_rename hook: move the entry @sname in @src_pobj to the target name in
 * @tgt_pobj, pointing at fid @lf.
 *
 * Inside one transaction (MDD_TXN_RENAME credits), with both parents locked
 * by mdd_rename_lock():
 *  - the source name is removed from the source parent;
 *  - the target name is removed from the target parent;
 *  - the target name is inserted with fid @lf;
 *  - when @tobj is given and exists, a reference is dropped from it, and if
 *    it is a directory, one from the target parent too.
 * The FIXME comments in the body record known gaps: source and target
 * objects are not locked, the source object's type is not examined, and no
 * error recovery is done.
 */
814 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
815 struct md_object *tgt_pobj, const struct lu_fid *lf,
816 const char *sname, struct md_object *tobj,
819 struct mdd_device *mdd = mdo2mdd(src_pobj);
820 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
821 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
822 struct mdd_object *mdd_tobj = NULL;
823 struct thandle *handle;
827 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
828 handle = mdd_trans_start(ctxt, mdd);
830 RETURN(PTR_ERR(handle));
832 /*FIXME: Should consider tobj and sobj too in rename_lock*/
833 rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
838 mdd_tobj = md2mdd_obj(tobj);
840 rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
843 /*FIXME: no sobj now, we should check sobj type, if it is dir,
844 * the nlink of its parent should be dec
847 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
852 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
857 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
858 __mdd_ref_del(ctxt, mdd_tobj, handle);
859 if (S_ISDIR(mdd_object_type(mdd_tobj)))
860 __mdd_ref_del(ctxt, mdd_tpobj, handle);
861 /* XXX: mdd_attr_get(ctxt, tobj, ma); needed here */
865 /*FIXME: should we do error handling here?*/
867 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
868 mdd_trans_stop(ctxt, mdd, handle);
/*
 * mdo_lookup hook: resolve @name in directory @pobj and store the result in
 * @fid. The name is passed to the index as its key and @fid as its record.
 * The lookup runs under a read lock on @pobj and is only performed when the
 * object header marks it as a directory and dt_try_as_dir() accepts the
 * child object.
 */
872 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
873 const char *name, struct lu_fid* fid)
875 struct mdd_object *mdo = md2mdd_obj(pobj);
876 struct dt_object *dir = mdd_object_child(mdo);
877 struct dt_rec *rec = (struct dt_rec *)fid;
878 const struct dt_key *key = (const struct dt_key *)name;
882 mdd_lock(ctxt, mdo, DT_READ_LOCK);
883 if (S_ISDIR(mdd_object_type(mdo)) && dt_try_as_dir(ctxt, dir))
884 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
887 mdd_unlock(ctxt, mdo, DT_READ_LOCK);
/*
 * Post-creation setup of a new object inside transaction @handle. Only
 * directories (per ma->ma_attr.la_mode) need work: a reference is added to
 * the child, "." is inserted pointing at the child itself, an entry pointing
 * at the parent's fid is inserted, and a reference is added to the parent.
 * The tail of the function is a rollback path: it deletes an index entry
 * (logging with CERROR if that fails too) and drops the child reference
 * again.
 */
891 static int __mdd_object_initialize(const struct lu_context *ctxt,
892 struct mdd_object *parent,
893 struct mdd_object *child,
894 struct md_attr *ma, struct thandle *handle)
899 if (S_ISDIR(ma->ma_attr.la_mode)) {
900 __mdd_ref_add(ctxt, child, handle);
901 rc = __mdd_index_insert(ctxt, child,
902 mdo2fid(child), dot, handle);
904 rc = __mdd_index_insert(ctxt, child, mdo2fid(parent),
907 __mdd_ref_add(ctxt, parent, handle);
911 rc2 = __mdd_index_delete(ctxt,
914 CERROR("Failure to cleanup after dotdot"
915 " creation: %d (%d)\n", rc2, rc);
917 __mdd_ref_del(ctxt, child, handle);
/*
 * mdo_create_data hook. Inside a transaction (MDD_TXN_CREATE_DATA credits):
 * builds striping data with mdd_lov_create() from @eadata, stores it on the
 * child with mdd_lov_set_md(), and refreshes @ma with mdd_attr_get(). A
 * failed transaction start is reported via PTR_ERR().
 * As the XXX comment says, mdd_lov_create() does not yet receive the
 * transaction handle.
 */
924 static int mdd_create_data(const struct lu_context *ctxt,
925 struct md_object *pobj, struct md_object *cobj,
926 const void *eadata, int eadatasize,
929 struct mdd_device *mdd = mdo2mdd(pobj);
930 struct mdd_object *mdo = md2mdd_obj(pobj);
931 struct mdd_object *son = md2mdd_obj(cobj);
932 struct lu_attr *attr = &ma->ma_attr;
933 struct lov_mds_md *lmm = NULL;
934 struct thandle *handle;
939 mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
940 handle = mdd_trans_start(ctxt, mdd);
942 RETURN(PTR_ERR(handle));
945 * XXX: should take transaction handle.
947 rc = mdd_lov_create(ctxt, mdd, mdo, son, &lmm, &lmm_size, eadata,
950 rc = mdd_lov_set_md(ctxt, pobj, cobj, lmm,
951 lmm_size, attr->la_mode, handle);
953 rc = mdd_attr_get(ctxt, cobj, ma);
956 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Pre-flight checks for mdd_create(). Looks @name up in @pobj (using the
 * per-context scratch fid mti_fid); a lookup that returned 0 is mapped to
 * -EEXIST, while a non-zero rc is kept as is. Afterwards the requested file
 * type (ma->ma_attr.la_mode & S_IFMT) is examined in a switch.
 */
960 static int mdd_create_sanity_check(const struct lu_context *ctxt,
961 struct mdd_device *mdd,
962 struct md_object *pobj,
963 const char *name, struct md_attr *ma)
968 fid = &mdd_ctx_info(ctxt)->mti_fid;
969 rc = mdd_lookup(ctxt, pobj, name, fid);
971 rc = rc ? rc : -EEXIST;
975 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * mdo_create hook. Visible sequence:
 *  1. mdd_create_sanity_check() on the parent and name.
 *  2. For regular files, mdd_lov_create() runs before the transaction is
 *     opened (the body comment explains: no RPC inside the transaction).
 *  3. Transaction start and write lock on the parent.
 *  4. __mdd_object_create(), __mdd_object_initialize(), then insertion of
 *     @name into the parent index (creation-first order, as discussed in
 *     the long comment in the body).
 *  5. mdd_lov_set_md() stores the striping data; for symlinks the
 *     @target_name string is written to the object body with dbo_write().
 *  6. mdd_attr_get() refreshes @ma.
 *  7. On error after creation, the name is removed again and the new
 *     object's reference is dropped.
 *  8. lmm is freed, the parent is unlocked, the transaction is stopped.
 * NOTE(review): the transaction is sized with the MDD_TXN_MKDIR descriptor
 * for every file type - confirm this is intended.
 */
993 * Create object and insert it into namespace.
995 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
996 const char *name, struct md_object *child,
997 const char *target_name, const void *eadata,
998 int eadatasize, struct md_attr* ma)
1000 struct mdd_device *mdd = mdo2mdd(pobj);
1001 struct mdd_object *mdo = md2mdd_obj(pobj);
1002 struct mdd_object *son = md2mdd_obj(child);
1003 struct lu_attr *attr = &ma->ma_attr;
1004 struct lov_mds_md *lmm = NULL;
1005 struct thandle *handle;
1006 int rc, created = 0, inserted = 0, lmm_size = 0;
1009 /* sanity checks before big job */
1010 rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1013 /* no RPC inside the transaction, so OST objects should be created at
1015 if (S_ISREG(attr->la_mode)) {
1016 rc = mdd_lov_create(ctxt, mdd, mdo, son, &lmm, &lmm_size,
1017 eadata, eadatasize, attr);
1022 mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1023 handle = mdd_trans_start(ctxt, mdd);
1025 RETURN(PTR_ERR(handle));
1027 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1030 * XXX check that link can be added to the parent in mkdir case.
1034 * Two operations have to be performed:
1036 * - allocation of new object (->do_create()), and
1038 * - insertion into parent index (->dio_insert()).
1040 * Due to locking, operation order is not important, when both are
1041 * successful, *but* error handling cases are quite different:
1043 * - if insertion is done first, and following object creation fails,
1044 * insertion has to be rolled back, but this operation might fail
1045 * also leaving us with dangling index entry.
1047 * - if creation is done first, is has to be undone if insertion
1048 * fails, leaving us with leaked space, which is neither good, nor
1051 * It seems that creation-first is simplest solution, but it is
1052 * sub-optimal in the frequent
1057 * case, because second mkdir is bound to create object, only to
1058 * destroy it immediately.
1060 * Note that local file systems do
1062 * 0. lookup -> -EEXIST
1068 * Maybe we should do the same. For now: creation-first.
1071 rc = __mdd_object_create(ctxt, son, ma, handle);
1077 rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
1080 * Object has no links, so it will be destroyed when last
1081 * reference is released. (XXX not now.)
1085 rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
1092 rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size, attr->la_mode,
1095 CERROR("error on stripe info copy %d \n", rc);
1099 if (S_ISLNK(attr->la_mode)) {
1100 struct dt_object *dt = mdd_object_child(son);
1102 int sym_len = strlen(target_name);
1103 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1104 sym_len, &pos, handle);
1110 mdd_attr_get(ctxt, child, ma);
1112 if (rc && created) {
1116 rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
1118 CERROR("error can not cleanup destroy %d\n",
1122 __mdd_ref_del(ctxt, son, handle);
1125 OBD_FREE(lmm, lmm_size);
1126 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1127 mdd_trans_stop(ctxt, mdd, handle);
/*
 * moo_object_create hook: create the object alone, without inserting any
 * name. Runs __mdd_object_create() inside its own transaction
 * (MDD_TXN_OBJECT_CREATE credits) and then refreshes @ma with
 * mdd_attr_get(). A failed transaction start is reported via PTR_ERR().
 * NOTE(review): mdd_attr_get() is declared to take a struct md_object *,
 * but it is called here with md2mdd_obj(obj), a struct mdd_object *.
 * Passing obj directly looks like what was meant - confirm.
 */
1130 /* partial operation */
1131 static int mdd_object_create(const struct lu_context *ctxt,
1132 struct md_object *obj, struct md_attr *ma)
1135 struct mdd_device *mdd = mdo2mdd(obj);
1136 struct thandle *handle;
1140 mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1141 handle = mdd_trans_start(ctxt, mdd);
1143 RETURN(PTR_ERR(handle));
1145 rc = __mdd_object_create(ctxt, md2mdd_obj(obj), ma, handle);
1146 /* XXX: parent fid is needed here
1147 rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
1149 mdd_attr_get(ctxt, md2mdd_obj(obj), ma);
1151 mdd_trans_stop(ctxt, mdd, handle);
/*
 * mdo_name_insert hook: insert @name with @fid into directory @pobj without
 * touching the named object. Runs __mdd_index_insert() inside its own
 * transaction (MDD_TXN_INDEX_INSERT credits) with the directory
 * write-locked. A failed transaction start is reported via PTR_ERR().
 */
1155 /* partial operation */
1156 static int mdd_name_insert(const struct lu_context *ctxt,
1157 struct md_object *pobj,
1158 const char *name, const struct lu_fid *fid)
1160 struct mdd_device *mdd = mdo2mdd(pobj);
1161 struct mdd_object *mdo = md2mdd_obj(pobj);
1162 struct thandle *handle;
1166 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1167 handle = mdd_trans_start(ctxt, mdd);
1169 RETURN(PTR_ERR(handle));
1171 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1173 rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
1175 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1176 mdd_trans_stop(ctxt, mdd, handle);
/*
 * mdo_name_remove hook: remove @name from directory @pobj without touching
 * the named object. Runs __mdd_index_delete() inside its own transaction
 * (MDD_TXN_INDEX_DELETE credits) with the directory write-locked. A failed
 * transaction start is reported via PTR_ERR().
 */
1180 static int mdd_name_remove(const struct lu_context *ctxt,
1181 struct md_object *pobj,
1184 struct mdd_device *mdd = mdo2mdd(pobj);
1185 struct mdd_object *mdo = md2mdd_obj(pobj);
1186 struct thandle *handle;
1190 mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1191 handle = mdd_trans_start(ctxt, mdd);
1193 RETURN(PTR_ERR(handle));
1195 mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1197 rc = __mdd_index_delete(ctxt, mdo, name, handle);
1199 mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1201 mdd_trans_stop(ctxt, mdd, handle);
/*
 * mdo_rename_tgt hook: the target-side half of a rename. Inside a
 * transaction (MDD_TXN_RENAME credits), with the target parent and target
 * object locked through mdd_lock2(), the name is removed from the parent,
 * re-inserted with fid @lf, and a reference is dropped from @tobj when it is
 * given and exists.
 * NOTE(review): mdd_tobj is initialized to NULL and the reference drop is
 * guarded by a tobj check, yet mdd_lock2()/mdd_unlock2() receive mdd_tobj
 * directly - confirm they are never reached with a NULL target.
 */
1205 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1206 struct md_object *tobj, const struct lu_fid *lf,
1209 struct mdd_device *mdd = mdo2mdd(pobj);
1210 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1211 struct mdd_object *mdd_tobj = NULL;
1212 struct thandle *handle;
1216 mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1217 handle = mdd_trans_start(ctxt, mdd);
1219 RETURN(PTR_ERR(handle));
1222 mdd_tobj = md2mdd_obj(tobj);
1224 mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1227 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1232 rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
1236 if (tobj && lu_object_exists(ctxt, &tobj->mo_lu))
1237 __mdd_ref_del(ctxt, mdd_tobj, handle);
1239 /*FIXME: should we do error handling here?*/
1240 mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1241 mdd_trans_stop(ctxt, mdd, handle);
/*
 * mdo_root_get hook: copy the device's root fid (mdd_root_fid) into @f.
 */
1245 static int mdd_root_get(const struct lu_context *ctx,
1246 struct md_device *m, struct lu_fid *f)
1248 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1251 *f = mdd->mdd_root_fid;
/*
 * mdo_statfs hook: forward the request to the child device's dt_statfs(),
 * which fills @sfs.
 */
1255 static int mdd_statfs(const struct lu_context *ctx,
1256 struct md_device *m, struct kstatfs *sfs)
1258 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1263 rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
/*
 * mdo_get_maxsize hook: obtain the LOV metadata size via mdd_lov_mdsize()
 * into @md_size and the cookie size via mdd_lov_cookiesize() into
 * cookie_size.
 */
1268 static int mdd_get_maxsize(const struct lu_context *ctx,
1269 struct md_device *m, int *md_size,
1272 struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1277 rc = mdd_lov_mdsize(ctx, mdd, md_size);
1280 rc = mdd_lov_cookiesize(ctx, mdd, cookie_size);
/*
 * Add a reference to an existing object (asserted) through the child dt
 * object's do_ref_add(), inside transaction @handle. Locking is left to the
 * caller.
 */
1285 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1286 struct thandle *handle)
1288 struct dt_object *next;
1290 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1291 next = mdd_object_child(obj);
1292 next->do_ops->do_ref_add(ctxt, next, handle);
/*
 * moo_ref_add hook: runs __mdd_ref_add() inside its own transaction with
 * the object write-locked.
 * NOTE(review): the transaction is sized with the MDD_TXN_XATTR_SET
 * descriptor; no reference-specific descriptor exists - confirm this reuse
 * is intended.
 */
1295 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1297 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1298 struct mdd_device *mdd = mdo2mdd(obj);
1299 struct thandle *handle;
1302 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1303 handle = mdd_trans_start(ctxt, mdd);
1307 mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1308 __mdd_ref_add(ctxt, mdd_obj, handle);
1309 mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1311 mdd_trans_stop(ctxt, mdd, handle);
/*
 * Drop a reference from an existing object (asserted) through the child dt
 * object's do_ref_del(), inside transaction @handle. Locking is left to the
 * caller.
 */
1317 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1318 struct thandle *handle)
1320 struct dt_object *next = mdd_object_child(obj);
1322 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1324 next->do_ops->do_ref_del(ctxt, next, handle);
/*
 * moo_ref_del hook: inside its own transaction and with the object
 * write-locked, runs __mdd_ref_del() and then refreshes @ma with
 * mdd_attr_get() while the lock is still held.
 * NOTE(review): the transaction is sized with the MDD_TXN_XATTR_SET
 * descriptor, and the result of mdd_attr_get() is not captured - confirm
 * both are intended.
 */
1327 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1330 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1331 struct mdd_device *mdd = mdo2mdd(obj);
1332 struct thandle *handle;
1335 mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1336 handle = mdd_trans_start(ctxt, mdd);
1340 mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1341 __mdd_ref_del(ctxt, mdd_obj, handle);
1342 mdd_attr_get(ctxt, obj, ma);
1343 mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1345 mdd_trans_stop(ctxt, mdd, handle);
/* moo_open hook (installed in mdd_obj_ops). */
1350 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
/* moo_close hook (installed in mdd_obj_ops). */
1355 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
/*
 * moo_readpage hook. The object must exist (asserted). Under a read lock,
 * forwards the request described by @rdpg to the child dt object's
 * do_readpage(), but only when the object header marks it as a directory
 * and dt_try_as_dir() accepts the child object.
 */
1360 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
1361 const struct lu_rdpg *rdpg)
1363 struct dt_object *next;
1364 struct mdd_object *mdd = md2mdd_obj(obj);
1367 LASSERT(lu_object_exists(ctxt, mdd2lu_obj(md2mdd_obj(obj))));
1368 next = mdd_object_child(md2mdd_obj(obj));
1370 mdd_lock(ctxt, mdd, DT_READ_LOCK);
1371 if (S_ISDIR(mdd_object_type(mdd)) && dt_try_as_dir(ctxt, next))
1372 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
1375 mdd_unlock(ctxt, mdd, DT_READ_LOCK);
/*
 * Device-level md operations of the mdd layer; installed on each device by
 * mdd_device_alloc().
 */
1379 struct md_device_operations mdd_ops = {
1380 .mdo_root_get = mdd_root_get,
1381 .mdo_statfs = mdd_statfs,
1382 .mdo_get_maxsize = mdd_get_maxsize,
/*
 * Directory (namespace) operations of mdd objects; installed on each object
 * by mdd_object_alloc().
 */
1385 static struct md_dir_operations mdd_dir_ops = {
1386 .mdo_lookup = mdd_lookup,
1387 .mdo_create = mdd_create,
1388 .mdo_rename = mdd_rename,
1389 .mdo_link = mdd_link,
1390 .mdo_unlink = mdd_unlink,
1391 .mdo_name_insert = mdd_name_insert,
1392 .mdo_name_remove = mdd_name_remove,
1393 .mdo_rename_tgt = mdd_rename_tgt,
1394 .mdo_create_data = mdd_create_data
/*
 * Per-object md operations of mdd objects (attributes, xattrs, references,
 * open/close, readpage, readlink); installed on each object by
 * mdd_object_alloc().
 */
1398 static struct md_object_operations mdd_obj_ops = {
1399 .moo_attr_get = mdd_attr_get,
1400 .moo_attr_set = mdd_attr_set,
1401 .moo_xattr_get = mdd_xattr_get,
1402 .moo_xattr_set = mdd_xattr_set,
1403 .moo_xattr_list = mdd_xattr_list,
1404 .moo_xattr_del = mdd_xattr_del,
1405 .moo_object_create = mdd_object_create,
1406 .moo_ref_add = mdd_ref_add,
1407 .moo_ref_del = mdd_ref_del,
1408 .moo_open = mdd_open,
1409 .moo_close = mdd_close,
1410 .moo_readpage = mdd_readpage,
1411 .moo_readlink = mdd_readlink
/*
 * obd operations passed to class_register_type() in mdd_mod_init(); only
 * the owning module is set.
 */
1414 static struct obd_ops mdd_obd_device_ops = {
1415 .o_owner = THIS_MODULE
/*
 * ldto_device_alloc hook. Initializes the embedded md device with
 * md_device_init() and installs mdd_lu_ops and mdd_ops on it. The value
 * ERR_PTR(-ENOMEM) is used as the result for the allocation-failure case.
 */
1418 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1419 struct lu_device_type *t,
1420 struct lustre_cfg *lcfg)
1422 struct lu_device *l;
1423 struct mdd_device *m;
1427 l = ERR_PTR(-ENOMEM);
1429 md_device_init(&m->mdd_md_dev, t);
1431 l->ld_ops = &mdd_lu_ops;
1432 m->mdd_md_dev.md_ops = &mdd_ops;
/*
 * ldto_device_free hook. Asserts that the device reference count (ld_ref)
 * has dropped to zero and finalizes the embedded md device with
 * md_device_fini().
 */
1438 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1440 struct mdd_device *m = lu2mdd_dev(lu);
1442 LASSERT(atomic_read(&lu->ld_ref) == 0);
1443 md_device_fini(&m->mdd_md_dev);
/*
 * ldto_init hook: register mdd_thread_key with the lu_context machinery and
 * return the registration result.
 */
1447 static int mdd_type_init(struct lu_device_type *t)
1449 return lu_context_key_register(&mdd_thread_key);
/*
 * ldto_fini hook: undo mdd_type_init() by removing mdd_thread_key again.
 */
1452 static void mdd_type_fini(struct lu_device_type *t)
1454 lu_context_key_degister(&mdd_thread_key);
/*
 * Device-type operations of the mdd layer: type init/fini plus device
 * alloc/free and init/fini. Referenced from mdd_device_type.
 */
1457 static struct lu_device_type_operations mdd_device_type_ops = {
1458 .ldto_init = mdd_type_init,
1459 .ldto_fini = mdd_type_fini,
1461 .ldto_device_alloc = mdd_device_alloc,
1462 .ldto_device_free = mdd_device_free,
1464 .ldto_device_init = mdd_device_init,
1465 .ldto_device_fini = mdd_device_fini
/*
 * The mdd device type, registered under LUSTRE_MDD0_NAME by mdd_mod_init().
 * It is tagged LU_DEVICE_MD and uses contexts tagged LCT_MD_THREAD, the same
 * tag as mdd_thread_key.
 */
1468 static struct lu_device_type mdd_device_type = {
1469 .ldt_tags = LU_DEVICE_MD,
1470 .ldt_name = LUSTRE_MDD0_NAME,
1471 .ldt_ops = &mdd_device_type_ops,
1472 .ldt_ctx_tags = LCT_MD_THREAD
/*
 * lct_init hook of mdd_thread_key: allocate a struct mdd_thread_info with
 * OBD_ALLOC_PTR(). ERR_PTR(-ENOMEM) is the value used for the
 * allocation-failure case.
 */
1475 static void *mdd_key_init(const struct lu_context *ctx,
1476 struct lu_context_key *key)
1478 struct mdd_thread_info *info;
1480 OBD_ALLOC_PTR(info);
1482 info = ERR_PTR(-ENOMEM);
/*
 * lct_fini hook of mdd_thread_key: @data is the struct mdd_thread_info that
 * mdd_key_init() produced.
 */
1486 static void mdd_key_fini(const struct lu_context *ctx,
1487 struct lu_context_key *key, void *data)
1489 struct mdd_thread_info *info = data;
/*
 * Context key under which each LCT_MD_THREAD context keeps its
 * mdd_thread_info; retrieved through mdd_ctx_info().
 */
1493 static struct lu_context_key mdd_thread_key = {
1494 .lct_tags = LCT_MD_THREAD,
1495 .lct_init = mdd_key_init,
1496 .lct_fini = mdd_key_fini
/*
 * lprocfs variable tables for the mdd obd and module, bound together under
 * the name mdd by LPROCFS_INIT_VARS() for use by lprocfs_init_vars() in
 * mdd_mod_init().
 */
1499 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1503 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1507 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
/*
 * Module entry point: collect the lprocfs variables and register the mdd
 * obd type (operations, module variables, name and device type) with
 * class_register_type(), returning its result.
 */
1509 static int __init mdd_mod_init(void)
1511 struct lprocfs_static_vars lvars;
1513 lprocfs_init_vars(mdd, &lvars);
1514 return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1515 LUSTRE_MDD0_NAME, &mdd_device_type);
/* Module exit point: unregister the obd type registered by mdd_mod_init(). */
1518 static void __exit mdd_mod_exit(void)
1520 class_unregister_type(LUSTRE_MDD0_NAME);
/*
 * Module metadata, and the cfs_module() declaration that ties the module
 * name and version string to mdd_mod_init() / mdd_mod_exit().
 */
1523 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1524 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1525 MODULE_LICENSE("GPL");
1527 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);