Whamcloud - gitweb
add lu_context to various operations
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <linux/obd.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_ver.h>
38 #include <linux/obd_support.h>
39 #include <linux/lprocfs_status.h>
40
41
42 #include <linux/lu_object.h>
43 #include <linux/md_object.h>
44 #include <linux/dt_object.h>
45
46 #include "mdd_internal.h"
47
48
49 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
50                                        struct mdd_device *,
51                                        struct txn_param *);
52 static void mdd_trans_stop(struct lu_context *ctxt,
53                            struct mdd_device *mdd, struct thandle *handle);
54 static struct dt_object* mdd_object_child(struct mdd_object *o);
55 static struct lu_device_operations mdd_lu_ops;
56 static void mdd_lock(struct lu_context *ctx,
57                      struct mdd_object *obj, enum dt_lock_mode mode);
58 static void mdd_unlock(struct lu_context *ctx,
59                        struct mdd_object *obj, enum dt_lock_mode mode);
60
61 static struct md_object_operations mdd_obj_ops;
62 static struct md_dir_operations    mdd_dir_ops;
63 static struct lu_object_operations mdd_lu_obj_ops;
64
65 static struct lu_context_key       mdd_thread_key;
66
67 struct mdd_thread_info {
68         struct txn_param mti_param;
69 };
70
71 static struct mdd_thread_info *mdd_ctx_info(struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static int lu_device_is_mdd(struct lu_device *d)
81 {
82         /*
83          * XXX for now. Tags in lu_device_type->ldt_something are needed.
84          */
85         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdd_lu_ops);
86 }
87
88 static struct mdd_device* lu2mdd_dev(struct lu_device *d)
89 {
90         LASSERT(lu_device_is_mdd(d));
91         return container_of0(d, struct mdd_device, mdd_md_dev.md_lu_dev);
92 }
93
94 static inline struct lu_device *mdd2lu_dev(struct mdd_device *d)
95 {
96         return (&d->mdd_md_dev.md_lu_dev);
97 }
98
99 static struct mdd_object *mdd_obj(struct lu_object *o)
100 {
101         LASSERT(lu_device_is_mdd(o->lo_dev));
102         return container_of0(o, struct mdd_object, mod_obj.mo_lu);
103 }
104
105 static struct mdd_device* mdo2mdd(struct md_object *mdo)
106 {
107         return lu2mdd_dev(mdo->mo_lu.lo_dev);
108 }
109
110 static struct mdd_object* mdo2mddo(struct md_object *mdo)
111 {
112         return container_of0(mdo, struct mdd_object, mod_obj);
113 }
114
115 static inline struct dt_device_operations *mdd_child_ops(struct mdd_device *d)
116 {
117         return d->mdd_child->dd_ops;
118 }
119
120 static struct lu_object *mdd_object_alloc(struct lu_context *ctxt,
121                                           struct lu_device *d)
122 {
123         struct mdd_object *mdo;
124         ENTRY;
125
126         OBD_ALLOC_PTR(mdo);
127         if (mdo != NULL) {
128                 struct lu_object *o;
129                 
130                 o = &mdo->mod_obj.mo_lu;
131                 lu_object_init(o, NULL, d);
132                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
133                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
134                 o->lo_ops = &mdd_lu_obj_ops;
135                 return &mdo->mod_obj.mo_lu;
136         } else
137                 return NULL;
138 }
139
140 static int mdd_object_init(struct lu_context *ctxt, struct lu_object *o)
141 {
142         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
143         struct lu_object  *below;
144         struct lu_device  *under;
145         ENTRY;
146
147         under = &d->mdd_child->dd_lu_dev;
148         below = under->ld_ops->ldo_object_alloc(ctxt, under);
149
150         if (below == NULL)
151                 RETURN(-ENOMEM);
152
153         lu_object_add(o, below);
154         RETURN(0);
155 }
156
157 static void mdd_object_free(struct lu_context *ctxt, struct lu_object *o)
158 {
159         struct lu_object_header *h;
160         struct mdd_object *mdd = mdd_obj(o);
161
162         h = o->lo_header;
163         lu_object_fini(o);
164         OBD_FREE_PTR(mdd);
165 }
166
167 static int
168 mdd_attr_get(struct lu_context *ctxt,
169              struct md_object *obj, struct lu_attr *attr)
170 {
171         struct mdd_object *mdd_obj = mdo2mddo(obj);
172         struct dt_object  *next = mdd_object_child(mdd_obj);
173         int rc;
174
175         ENTRY;
176
177         rc = next->do_ops->do_attr_get(ctxt, next, attr);
178         RETURN(rc);
179 }
180
181 static int
182 mdd_xattr_get(struct lu_context *ctxt, struct md_object *obj, void *buf,
183               int buf_len, const char *name)
184 {
185         struct mdd_object *mdd_obj = mdo2mddo(obj);
186         struct dt_object  *next = mdd_object_child(mdd_obj);
187         int rc;
188
189         ENTRY;
190
191         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
192         RETURN(rc);
193 }
194
195 static int
196 __mdd_object_destroy(struct lu_context *ctxt, struct mdd_object *obj,
197                      struct thandle *handle)
198 {
199         struct dt_object  *next = mdd_object_child(obj);
200         int rc;
201         rc = next->do_ops->do_object_destroy(ctxt, next, handle);
202         RETURN(rc);
203 }
204
205 static int mdd_add_orphan(struct mdd_device *mdd, struct mdd_object *obj,
206                           struct thandle *handle)
207 {
208         int rc = 0;
209         ENTRY;
210
211         RETURN(rc);
212 }
213
214 static int
215 open_orphan(struct mdd_object *obj)
216 {
217         return 0;
218 }
219
220 static int
221 mdd_add_unlink_log(struct mdd_device *mdd, struct mdd_object *obj,
222                    struct thandle *handle)
223 {
224         return 0;
225 }
226
227 /*
228  * number of blocks to reserve for particular operations. Should be function
229  * of ... something. Stub for now.
230  */
231 enum {
232         MDD_OBJECT_DESTROY_CREDITS = 10,
233         MDD_OBJECT_CREATE_CREDITS  = 10,
234         MDD_ATTR_SET_CREDITS       = 10,
235         MDD_XATTR_SET_CREDITS      = 10,
236         MDD_INDEX_INSERT_CREDITS   = 10,
237         MDD_INDEX_DELETE_CREDITS   = 10,
238         MDD_LINK_CREDITS           = 10,
239         MDD_RENAME_CREDITS         = 10,
240         MDD_MKDIR_CREDITS          = 10
241 };
242
243 static int
244 mdd_object_destroy(struct lu_context *ctxt, struct md_object *obj)
245 {
246         struct mdd_device *mdd = mdo2mdd(obj);
247         struct mdd_object *mdd_obj = mdo2mddo(obj);
248         struct thandle *handle;
249         int rc ;
250         ENTRY;
251
252         handle = mdd_trans_start(ctxt, mdd,
253                                  /*
254                                   * TXN_PARAM should probably go into
255                                   * lu_context_key to reduce stack
256                                   * consumption. Currently this is just one
257                                   * int, though.
258                                   */
259                                  &TXN_PARAM(MDD_OBJECT_DESTROY_CREDITS));
260         if (IS_ERR(handle))
261                 RETURN(PTR_ERR(handle));
262
263         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
264         if (open_orphan(mdd_obj))
265                 rc = mdd_add_orphan(mdd, mdd_obj, handle);
266         else {
267                 rc = __mdd_object_destroy(ctxt, mdd_obj, handle);
268                 if (rc == 0)
269                         rc = mdd_add_unlink_log(mdd, mdd_obj, handle);
270         }
271
272         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
273         mdd_trans_stop(ctxt, mdd, handle);
274         RETURN(rc);
275 }
276
277 static void mdd_object_release(struct lu_context *ctxt, struct lu_object *o)
278 {
279 }
280
281 static int mdd_object_exists(struct lu_context *ctx, struct lu_object *o)
282 {
283         return lu_object_exists(ctx, lu_object_next(o));
284 }
285
286 static int mdd_object_print(struct lu_context *ctxt,
287                             struct seq_file *f, const struct lu_object *o)
288 {
289         return seq_printf(f, LUSTRE_MDD0_NAME"-object@%p", o);
290 }
291
292 static int mdd_fs_setup(struct lu_context *ctx, struct mdd_device *mdd)
293 {
294         return 0;
295 }
296
297 static int mdd_fs_cleanup(struct mdd_device *mdd)
298 {
299         return 0;
300 }
301
302 static int mdd_device_init(struct lu_context *ctx,
303                            struct lu_device *d, struct lu_device *next)
304 {
305         struct mdd_device *mdd = lu2mdd_dev(d);
306         int rc = -EFAULT;
307
308         ENTRY;
309
310         mdd->mdd_child = lu2dt_dev(next);
311
312         rc = mdd_fs_setup(ctx, mdd);
313         if (rc)
314                 GOTO(err, rc);
315
316         RETURN(rc);
317 err:
318         mdd_fs_cleanup(mdd);
319         RETURN(rc);
320 }
321
322 static struct lu_device *mdd_device_fini(struct lu_context *ctx,
323                                          struct lu_device *d)
324 {
325         struct mdd_device *m = lu2mdd_dev(d);
326         struct lu_device *next = &m->mdd_child->dd_lu_dev;
327
328         return next;
329 }
330
331 static int mdd_process_config(struct lu_context *ctx,
332                               struct lu_device *d, struct lustre_cfg *cfg)
333 {
334         struct mdd_device *m = lu2mdd_dev(d);
335         struct lu_device *next = &m->mdd_child->dd_lu_dev;
336         int err;
337
338         switch(cfg->lcfg_command) {
339
340         default:
341                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
342         }
343
344         RETURN(err);
345 }
346
347 static struct lu_device_operations mdd_lu_ops = {
348         .ldo_object_alloc   = mdd_object_alloc,
349         .ldo_object_free    = mdd_object_free,
350         .ldo_process_config = mdd_process_config
351 };
352
353 static struct lu_object_operations mdd_lu_obj_ops = {
354         .loo_object_init    = mdd_object_init,
355         .loo_object_release = mdd_object_release,
356         .loo_object_print   = mdd_object_print,
357         .loo_object_exists  = mdd_object_exists
358 };
359
360 static struct dt_object* mdd_object_child(struct mdd_object *o)
361 {
362         return container_of0(lu_object_next(&o->mod_obj.mo_lu),
363                              struct dt_object, do_lu);
364 }
365
366 static void mdd_lock(struct lu_context *ctxt,
367                      struct mdd_object *obj, enum dt_lock_mode mode)
368 {
369         struct dt_object  *next = mdd_object_child(obj);
370
371         next->do_ops->do_object_lock(ctxt, next, mode);
372 }
373
374 static void mdd_unlock(struct lu_context *ctxt,
375                        struct mdd_object *obj, enum dt_lock_mode mode)
376 {
377         struct dt_object  *next = mdd_object_child(obj);
378
379         next->do_ops->do_object_unlock(ctxt, next, mode);
380 }
381
382 static void mdd_lock2(struct lu_context *ctxt,
383                       struct mdd_object *o0, struct mdd_object *o1)
384 {
385         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
386         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
387 }
388
389 static void mdd_unlock2(struct lu_context *ctxt,
390                         struct mdd_object *o0, struct mdd_object *o1)
391 {
392         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
393         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
394 }
395
396 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
397                                        struct mdd_device *mdd,
398                                        struct txn_param *p)
399 {
400         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
401 }
402
403 static void mdd_trans_stop(struct lu_context *ctxt,
404                            struct mdd_device *mdd, struct thandle *handle)
405 {
406         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
407 }
408
409 static int
410 __mdd_object_create(struct lu_context *ctxt, struct mdd_object *obj,
411                     struct lu_attr *attr, struct thandle *handle)
412 {
413         struct dt_object *next = mdd_object_child(obj);
414         int rc;
415         ENTRY;
416
417         rc = next->do_ops->do_object_create(ctxt, next, attr, handle);
418         /*XXX increase the refcount of the object or not?*/
419         RETURN(rc);
420 }
421
422 static int mdd_object_create(struct lu_context *ctxt, struct md_object *obj,
423                              struct lu_attr *attr)
424 {
425
426         struct mdd_device *mdd = mdo2mdd(obj);
427         struct thandle *handle;
428         int rc;
429         ENTRY;
430
431         handle = mdd_trans_start(ctxt, mdd,
432                                  &TXN_PARAM(MDD_OBJECT_CREATE_CREDITS));
433         if (IS_ERR(handle))
434                 RETURN(PTR_ERR(handle));
435
436         rc = __mdd_object_create(ctxt, mdo2mddo(obj), attr, handle);
437
438         mdd_trans_stop(ctxt, mdd, handle);
439
440         RETURN(rc);
441 }
442
443
444 static int
445 __mdd_attr_set(struct lu_context *ctxt, struct md_object *obj,
446                struct lu_attr *attr, struct thandle *handle)
447 {
448         struct dt_object *next = mdd_object_child(mdo2mddo(obj));
449         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
450 }
451
452 static int
453 mdd_attr_set(struct lu_context *ctxt,
454              struct md_object *obj, struct lu_attr *attr)
455 {
456         struct mdd_device *mdd = mdo2mdd(obj);
457         struct thandle *handle;
458         int  rc;
459         ENTRY;
460
461         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_ATTR_SET_CREDITS));
462         if (!handle)
463                 RETURN(-ENOMEM);
464
465         rc = __mdd_attr_set(ctxt, obj, attr, handle);
466
467         mdd_trans_stop(ctxt, mdd, handle);
468
469         RETURN(rc);
470 }
471
472
473
474 static int
475 __mdd_xattr_set(struct lu_context *ctxt, struct mdd_device *mdd,
476                 struct mdd_object *obj, void *buf,
477                 int buf_len, const char *name, struct thandle *handle)
478 {
479         struct dt_object *next = mdd_object_child(obj);
480         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len,
481                                           name, handle);
482 }
483
484 static int
485 mdd_xattr_set(struct lu_context *ctxt, struct md_object *obj, void *buf,
486               int buf_len, const char *name)
487 {
488         struct mdd_device *mdd = mdo2mdd(obj);
489         struct thandle *handle;
490         int  rc;
491         ENTRY;
492
493         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_XATTR_SET_CREDITS));
494         if (!handle)
495                 RETURN(-ENOMEM);
496
497         rc = __mdd_xattr_set(ctxt, mdd, mdo2mddo(obj), buf, buf_len, name,
498                              handle);
499
500         mdd_trans_stop(ctxt, mdd, handle);
501
502         RETURN(rc);
503 }
504
505 static const struct lu_fid *mdd_object_getfid(struct mdd_object *obj)
506 {
507         return lu_object_fid(&obj->mod_obj.mo_lu);
508 }
509
510 static int
511 __mdd_index_insert(struct lu_context *ctxt, struct mdd_object *pobj,
512                    const struct lu_fid *lf, const char *name,
513                    struct thandle *handle)
514 {
515         int rc;
516         struct dt_object *next = mdd_object_child(pobj);
517
518         rc = next->do_index_ops->dio_insert(ctxt, next, (struct dt_rec *)lf,
519                                             (struct dt_key *)name, handle);
520         return rc;
521 }
522
523 static int
524 __mdd_index_delete(struct lu_context *ctxt, struct mdd_device *mdd,
525                    struct mdd_object *pobj,
526                    struct mdd_object *obj, const char *name,
527                    struct thandle *handle)
528 {
529         int rc;
530         struct dt_object *next = mdd_object_child(pobj);
531         ENTRY;
532
533         mdd_lock2(ctxt, pobj, obj);
534
535         rc = next->do_index_ops->dio_delete(ctxt, next,
536                                             (const struct dt_rec *)mdd_object_getfid(obj),
537                                             (struct dt_key *)name, handle);
538         mdd_unlock2(ctxt, pobj, obj);
539
540         RETURN(rc);
541 }
542
543 static int
544 mdd_index_delete(struct lu_context *ctxt, struct md_object *pobj,
545                  struct md_object *obj, const char *name)
546 {
547         struct mdd_object *mdd_pobj = mdo2mddo(pobj);
548         struct mdd_object *mdd_obj = mdo2mddo(obj);
549         struct mdd_device *mdd = mdo2mdd(obj);
550         struct thandle *handle;
551         int rc;
552         ENTRY;
553
554         handle = mdd_trans_start(ctxt, mdd,
555                                  &TXN_PARAM(MDD_INDEX_DELETE_CREDITS));
556         if (IS_ERR(handle))
557                 RETURN(PTR_ERR(handle));
558
559         rc = __mdd_index_delete(ctxt, mdd, mdd_pobj, mdd_obj, name, handle);
560
561         mdd_trans_stop(ctxt, mdd, handle);
562
563         RETURN(rc);
564 }
565
566 static int
567 mdd_link(struct lu_context *ctxt, struct md_object *tgt_obj,
568          struct md_object *src_obj, const char *name)
569 {
570         struct mdd_object *mdd_tobj = mdo2mddo(tgt_obj);
571         struct mdd_object *mdd_sobj = mdo2mddo(src_obj);
572         struct mdd_device *mdd = mdo2mdd(src_obj);
573         struct thandle *handle;
574         int rc, nlink;
575         ENTRY;
576
577         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_LINK_CREDITS));
578         if (IS_ERR(handle))
579                 RETURN(PTR_ERR(handle));
580
581         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
582
583         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
584                                 name, handle);
585         if (rc)
586                 GOTO(exit, rc);
587
588         rc = mdd_xattr_get(ctxt, src_obj, &nlink, sizeof(nlink), "NLINK");
589         ++nlink;
590
591         rc = __mdd_xattr_set(ctxt, mdd, mdd_sobj,
592                              &nlink, sizeof(nlink), "NLINK", handle);
593 exit:
594         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
595
596         mdd_trans_stop(ctxt, mdd, handle);
597         RETURN(rc);
598 }
599
600 static void mdd_rename_lock(struct mdd_device *mdd, struct mdd_object *src_pobj,
601                             struct mdd_object *tgt_pobj, struct mdd_object *sobj,
602                             struct mdd_object *tobj)
603 {
604         return;
605 }
606
607 static void mdd_rename_unlock(struct mdd_device *mdd, struct mdd_object *src_pobj,
608                               struct mdd_object *tgt_pobj, struct mdd_object *sobj,
609                               struct mdd_object *tobj)
610 {
611         return;
612 }
613
614 static int
615 mdd_rename(struct lu_context *ctxt, struct md_object *src_pobj,
616            struct md_object *tgt_pobj, struct md_object *sobj,
617            const char *sname, struct md_object *tobj, const char *tname)
618 {
619         struct mdd_device *mdd = mdo2mdd(src_pobj);
620         struct mdd_object *mdd_spobj = mdo2mddo(src_pobj);
621         struct mdd_object *mdd_tpobj = mdo2mddo(tgt_pobj);
622         struct mdd_object *mdd_sobj = mdo2mddo(sobj);
623         struct mdd_object *mdd_tobj = mdo2mddo(tobj);
624         int rc;
625         struct thandle *handle;
626
627         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_RENAME_CREDITS));
628         if (IS_ERR(handle))
629                 RETURN(PTR_ERR(handle));
630
631         mdd_rename_lock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
632
633         rc = __mdd_index_delete(ctxt, mdd, mdd_spobj, mdd_sobj, sname, handle);
634         if (rc)
635                 GOTO(cleanup, rc);
636
637         rc = __mdd_index_delete(ctxt, mdd, mdd_tpobj, mdd_tobj, tname, handle);
638         if (rc)
639                 GOTO(cleanup, rc);
640
641         rc = __mdd_index_insert(ctxt, mdd_spobj, lu_object_fid(&tobj->mo_lu),
642                                 tname, handle);
643         if (rc)
644                 GOTO(cleanup, rc);
645
646         /*
647          * XXX nikita: huh? What is this?
648          */
649         rc = __mdd_object_destroy(ctxt, mdd_sobj, handle);
650         if (rc)
651                 GOTO(cleanup, rc);
652 cleanup:
653         mdd_rename_unlock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
654         mdd_trans_stop(ctxt, mdd, handle);
655         RETURN(rc);
656 }
657
658 static int mdd_mkdir(struct lu_context *ctxt, struct lu_attr* attr,
659                      struct md_object *pobj, const char *name,
660                      struct md_object *child)
661 {
662         struct mdd_device *mdd = mdo2mdd(pobj);
663         struct mdd_object *mdo = mdo2mddo(pobj);
664         struct thandle *handle;
665         int rc = 0;
666         ENTRY;
667
668         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_MKDIR_CREDITS));
669         if (IS_ERR(handle))
670                 RETURN(PTR_ERR(handle));
671
672         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
673
674         rc = __mdd_object_create(ctxt, mdo2mddo(child), attr, handle);
675         if (rc)
676                 GOTO(cleanup, rc);
677
678         rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
679                                 name, handle);
680         if (rc)
681                 GOTO(cleanup, rc);
682 cleanup:
683         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
684         mdd_trans_stop(ctxt, mdd, handle);
685         RETURN(rc);
686 }
687
688 static int mdd_mkname(struct lu_context *ctxt, struct md_object *pobj,
689           const char *name, const struct lu_fid *fid, struct lu_attr *attr)
690 {
691         struct mdd_device *mdd = mdo2mdd(pobj);
692         struct mdd_object *mdo = mdo2mddo(pobj);
693         struct thandle *handle;
694         int rc = 0;
695         ENTRY;
696
697         handle = mdd_trans_start(ctxt, mdd,
698                                  &TXN_PARAM(MDD_INDEX_INSERT_CREDITS));
699         if (IS_ERR(handle))
700                 RETURN(PTR_ERR(handle));
701
702         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
703
704         rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
705
706         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
707         mdd_trans_stop(ctxt, mdd, handle);
708         RETURN(rc);
709 }
710
711 static int mdd_root_get(struct lu_context *ctx,
712                         struct md_device *m, struct lu_fid *f)
713 {
714         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
715
716         ENTRY;
717         RETURN(mdd_child_ops(mdd)->dt_root_get(ctx, mdd->mdd_child, f));
718 }
719
720 static int mdd_config(struct lu_context *ctx, struct md_device *m,
721                       const char *name, void *buf, int size, int mode)
722 {
723         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
724         int rc;
725         ENTRY;
726
727         rc = mdd_child_ops(mdd)->dt_config(ctx, mdd->mdd_child,
728                                            name, buf, size, mode);
729         RETURN(rc);
730 }
731
732 static int mdd_statfs(struct lu_context *ctx,
733                       struct md_device *m, struct kstatfs *sfs) {
734         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
735         int rc;
736
737         ENTRY;
738
739         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
740
741         RETURN(rc);
742 }
743
744 struct md_device_operations mdd_ops = {
745         .mdo_root_get       = mdd_root_get,
746         .mdo_config         = mdd_config,
747         .mdo_statfs         = mdd_statfs,
748 };
749
750 static struct md_dir_operations mdd_dir_ops = {
751         .mdo_mkdir         = mdd_mkdir,
752         .mdo_rename        = mdd_rename,
753         .mdo_link          = mdd_link,
754         .mdo_name_insert   = mdd_mkname
755 };
756
757 static struct md_object_operations mdd_obj_ops = {
758         .moo_attr_get      = mdd_attr_get,
759         .moo_attr_set      = mdd_attr_set,
760         .moo_xattr_get     = mdd_xattr_get,
761         .moo_xattr_set     = mdd_xattr_set,
762         .moo_object_create  = mdd_object_create
763 };
764
765 static struct obd_ops mdd_obd_device_ops = {
766         .o_owner = THIS_MODULE
767 };
768
769 struct lu_device *mdd_device_alloc(struct lu_device_type *t,
770                                    struct lustre_cfg *lcfg)
771 {
772         struct lu_device  *l;
773         struct mdd_device *m;
774
775         OBD_ALLOC_PTR(m);
776         if (m == NULL) {
777                 l = ERR_PTR(-ENOMEM);
778         } else {
779                 md_device_init(&m->mdd_md_dev, t);
780                 l = mdd2lu_dev(m);
781                 l->ld_ops = &mdd_lu_ops;
782                 m->mdd_md_dev.md_ops = &mdd_ops;
783         }
784
785         return l;
786 }
787
788 static void mdd_device_free(struct lu_device *lu)
789 {
790         struct mdd_device *m = lu2mdd_dev(lu);
791
792         LASSERT(atomic_read(&lu->ld_ref) == 0);
793         md_device_fini(&m->mdd_md_dev);
794
795         OBD_FREE_PTR(m);
796 }
797
798 static int mdd_type_init(struct lu_device_type *t)
799 {
800         return lu_context_key_register(&mdd_thread_key);
801 }
802
803 static void mdd_type_fini(struct lu_device_type *t)
804 {
805         lu_context_key_degister(&mdd_thread_key);
806 }
807
808 static struct lu_device_type_operations mdd_device_type_ops = {
809         .ldto_init = mdd_type_init,
810         .ldto_fini = mdd_type_fini,
811
812         .ldto_device_alloc = mdd_device_alloc,
813         .ldto_device_free  = mdd_device_free,
814
815         .ldto_device_init    = mdd_device_init,
816         .ldto_device_fini    = mdd_device_fini
817 };
818
819 static struct lu_device_type mdd_device_type = {
820         .ldt_tags = LU_DEVICE_MD,
821         .ldt_name = LUSTRE_MDD0_NAME,
822         .ldt_ops  = &mdd_device_type_ops
823 };
824
825 static void *mdd_key_init(struct lu_context *ctx)
826 {
827         struct mdd_thread_info *info;
828
829         OBD_ALLOC_PTR(info);
830         if (info == NULL)
831                 info = ERR_PTR(-ENOMEM);
832         return info;
833 }
834
835 static void mdd_key_fini(struct lu_context *ctx, void *data)
836 {
837         struct mdd_thread_info *info = data;
838         OBD_FREE_PTR(info);
839 }
840
841 static struct lu_context_key mdd_thread_key = {
842         .lct_init = mdd_key_init,
843         .lct_fini = mdd_key_fini
844 };
845
846 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
847         { 0 }
848 };
849
850 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
851         { 0 }
852 };
853
854 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
855
856 static int __init mdd_mod_init(void)
857 {
858         struct lprocfs_static_vars lvars;
859
860         lprocfs_init_vars(mdd, &lvars);
861         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
862                                    LUSTRE_MDD0_NAME, &mdd_device_type);
863 }
864
865 static void __exit mdd_mod_exit(void)
866 {
867         class_unregister_type(LUSTRE_MDD0_NAME);
868 }
869
870 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
871 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
872 MODULE_LICENSE("GPL");
873
874 cfs_module(mdd, "0.0.2", mdd_mod_init, mdd_mod_exit);