Whamcloud - gitweb
fix wrong parameter in mdd_object_create
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <linux/obd.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_ver.h>
38 #include <linux/obd_support.h>
39 #include <linux/lprocfs_status.h>
40
41
42 #include <linux/lu_object.h>
43 #include <linux/md_object.h>
44 #include <linux/dt_object.h>
45
46 #include "mdd_internal.h"
47
48
49 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
50                                        struct mdd_device *, struct txn_param *);
51 static void mdd_trans_stop(struct lu_context *ctxt,
52                            struct mdd_device *mdd, struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static struct lu_device_operations mdd_lu_ops;
55 static void mdd_lock(struct lu_context *ctx,
56                      struct mdd_object *obj, enum dt_lock_mode mode);
57 static void mdd_unlock(struct lu_context *ctx,
58                        struct mdd_object *obj, enum dt_lock_mode mode);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static int lu_device_is_mdd(struct lu_device *d)
65 {
66         /*
67          * XXX for now. Tags in lu_device_type->ldt_something are needed.
68          */
69         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdd_lu_ops);
70 }
71
72 static struct mdd_device* lu2mdd_dev(struct lu_device *d)
73 {
74         LASSERT(lu_device_is_mdd(d));
75         return container_of0(d, struct mdd_device, mdd_md_dev.md_lu_dev);
76 }
77
78 static inline struct lu_device *mdd2lu_dev(struct mdd_device *d)
79 {
80         return (&d->mdd_md_dev.md_lu_dev);
81 }
82
83 static struct mdd_object *mdd_obj(struct lu_object *o)
84 {
85         LASSERT(lu_device_is_mdd(o->lo_dev));
86         return container_of0(o, struct mdd_object, mod_obj.mo_lu);
87 }
88
89 static struct mdd_device* mdo2mdd(struct md_object *mdo)
90 {
91         return lu2mdd_dev(mdo->mo_lu.lo_dev);
92 }
93
94 static struct mdd_object* mdo2mddo(struct md_object *mdo)
95 {
96         return container_of0(mdo, struct mdd_object, mod_obj);
97 }
98
99 static inline struct dt_device_operations *mdd_child_ops(struct mdd_device *d)
100 {
101         return d->mdd_child->dd_ops;
102 }
103
104 static struct lu_object *mdd_object_alloc(struct lu_context *ctxt,
105                                           struct lu_device *d)
106 {
107         struct mdd_object *mdo;
108         ENTRY;
109
110         OBD_ALLOC_PTR(mdo);
111         if (mdo != NULL) {
112                 struct lu_object *o;
113                 
114                 o = &mdo->mod_obj.mo_lu;
115                 lu_object_init(o, NULL, d);
116                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
117                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
118                 o->lo_ops = &mdd_lu_obj_ops;
119                 return &mdo->mod_obj.mo_lu;
120         } else
121                 return NULL;
122 }
123
124 static int mdd_object_init(struct lu_context *ctxt, struct lu_object *o)
125 {
126         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
127         struct lu_object  *below;
128         struct lu_device  *under;
129         ENTRY;
130
131         under = &d->mdd_child->dd_lu_dev;
132         below = under->ld_ops->ldo_object_alloc(ctxt, under);
133
134         if (below == NULL)
135                 RETURN(-ENOMEM);
136
137         lu_object_add(o, below);
138         RETURN(0);
139 }
140
141 static void mdd_object_free(struct lu_context *ctxt, struct lu_object *o)
142 {
143         struct lu_object_header *h;
144         struct mdd_object *mdd = mdd_obj(o);
145
146         h = o->lo_header;
147         lu_object_fini(o);
148         OBD_FREE_PTR(mdd);
149 }
150
151 static int
152 mdd_attr_get(struct lu_context *ctxt,
153              struct md_object *obj, struct lu_attr *attr)
154 {
155         struct mdd_object *mdd_obj = mdo2mddo(obj);
156         struct dt_object  *next = mdd_object_child(mdd_obj);
157         int rc;
158
159         ENTRY;
160
161         rc = next->do_ops->do_attr_get(ctxt, next, attr);
162         RETURN(rc);
163 }
164
165 static int
166 mdd_xattr_get(struct lu_context *ctxt, struct md_object *obj, void *buf,
167               int buf_len, const char *name)
168 {
169         struct mdd_object *mdd_obj = mdo2mddo(obj);
170         struct dt_object  *next = mdd_object_child(mdd_obj);
171         int rc;
172
173         ENTRY;
174
175         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
176         RETURN(rc);
177 }
178
179 static int
180 __mdd_object_destroy(struct lu_context *ctxt, struct mdd_object *obj,
181                      struct thandle *handle)
182 {
183         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
184         int rc;
185         rc = mdd_child_ops(mdd)->dt_object_destroy(ctxt, mdd_object_child(obj),
186                                                    handle);
187         RETURN(rc);
188 }
189
190 static int mdd_add_orphan(struct mdd_device *mdd, struct mdd_object *obj,
191                           struct thandle *handle)
192 {
193         int rc = 0;
194         ENTRY;
195
196         RETURN(rc);
197 }
198
199 static int
200 open_orphan(struct mdd_object *obj)
201 {
202         return 0;
203 }
204
205 static int
206 mdd_add_unlink_log(struct mdd_device *mdd, struct mdd_object *obj,
207                    struct thandle *handle)
208 {
209         return 0;
210 }
211
212 /*
213  * number of blocks to reserve for particular operations. Should be function
214  * of ... something. Stub for now.
215  */
216 enum {
217         MDD_OBJECT_DESTROY_CREDITS = 10,
218         MDD_OBJECT_CREATE_CREDITS  = 10,
219         MDD_ATTR_SET_CREDITS       = 10,
220         MDD_XATTR_SET_CREDITS      = 10,
221         MDD_INDEX_INSERT_CREDITS   = 10,
222         MDD_INDEX_DELETE_CREDITS   = 10,
223         MDD_LINK_CREDITS           = 10,
224         MDD_RENAME_CREDITS         = 10,
225         MDD_MKDIR_CREDITS          = 10
226 };
227
228 static int
229 mdd_object_destroy(struct lu_context *ctxt, struct md_object *obj)
230 {
231         struct mdd_device *mdd = mdo2mdd(obj);
232         struct mdd_object *mdd_obj = mdo2mddo(obj);
233         struct thandle *handle;
234         int rc ;
235         ENTRY;
236
237         handle = mdd_trans_start(ctxt, mdd,
238                                  &TXN_PARAM(MDD_OBJECT_DESTROY_CREDITS));
239         if (IS_ERR(handle))
240                 RETURN(PTR_ERR(handle));
241
242         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
243         if (open_orphan(mdd_obj))
244                 rc = mdd_add_orphan(mdd, mdd_obj, handle);
245         else {
246                 rc = __mdd_object_destroy(ctxt, mdd_obj, handle);
247                 if (rc == 0)
248                         rc = mdd_add_unlink_log(mdd, mdd_obj, handle);
249         }
250
251         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
252         mdd_trans_stop(ctxt, mdd, handle);
253         RETURN(rc);
254 }
255
256 static void mdd_object_release(struct lu_context *ctxt, struct lu_object *o)
257 {
258 }
259
260 static int mdd_object_exists(struct lu_context *ctx, struct lu_object *o)
261 {
262         struct lu_object *next = lu_object_next(o);
263
264         return next->lo_ops->loo_object_exists(ctx, next);
265 }
266
267 static int mdd_object_print(struct lu_context *ctxt,
268                             struct seq_file *f, const struct lu_object *o)
269 {
270         return seq_printf(f, LUSTRE_MDD0_NAME"-object@%p", o);
271 }
272
273 static int mdd_fs_setup(struct mdd_device *mdd)
274 {
275         return 0;
276 }
277
278 static int mdd_fs_cleanup(struct mdd_device *mdd)
279 {
280         return 0;
281 }
282
283 static int mdd_device_init(struct lu_device *d, struct lu_device *next)
284 {
285         struct mdd_device *mdd = lu2mdd_dev(d);
286         int rc = -EFAULT;
287
288         ENTRY;
289
290         mdd->mdd_child = lu2dt_dev(next);
291
292         rc = mdd_fs_setup(mdd);
293         if (rc)
294                 GOTO(err, rc);
295
296         RETURN(rc);
297 err:
298         mdd_fs_cleanup(mdd);
299         RETURN(rc);
300 }
301
302 static struct lu_device *mdd_device_fini(struct lu_device *d)
303 {
304         struct mdd_device *m = lu2mdd_dev(d);
305         struct lu_device *next = &m->mdd_child->dd_lu_dev;
306
307         return next;
308 }
309
310 static int mdd_process_config(struct lu_device *d, struct lustre_cfg *cfg)
311 {
312         struct mdd_device *m = lu2mdd_dev(d);
313         struct lu_device *next = &m->mdd_child->dd_lu_dev;
314         int err;
315
316         switch(cfg->lcfg_command) {
317
318         default:
319                 err = next->ld_ops->ldo_process_config(next, cfg);
320         }
321
322         RETURN(err);
323 }
324
325 static struct lu_device_operations mdd_lu_ops = {
326         .ldo_object_alloc   = mdd_object_alloc,
327         .ldo_object_free    = mdd_object_free,
328         .ldo_process_config = mdd_process_config
329 };
330
331 static struct lu_object_operations mdd_lu_obj_ops = {
332         .loo_object_init    = mdd_object_init,
333         .loo_object_release = mdd_object_release,
334         .loo_object_print   = mdd_object_print,
335         .loo_object_exists  = mdd_object_exists
336 };
337
338 static struct dt_object* mdd_object_child(struct mdd_object *o)
339 {
340         return container_of0(lu_object_next(&o->mod_obj.mo_lu),
341                              struct dt_object, do_lu);
342 }
343
344 static void mdd_lock(struct lu_context *ctxt,
345                      struct mdd_object *obj, enum dt_lock_mode mode)
346 {
347         struct dt_object  *next = mdd_object_child(obj);
348
349         next->do_ops->do_object_lock(ctxt, next, mode);
350 }
351
352 static void mdd_unlock(struct lu_context *ctxt,
353                        struct mdd_object *obj, enum dt_lock_mode mode)
354 {
355         struct dt_object  *next = mdd_object_child(obj);
356
357         next->do_ops->do_object_unlock(ctxt, next, mode);
358 }
359
360 static void mdd_lock2(struct lu_context *ctxt,
361                       struct mdd_object *o0, struct mdd_object *o1)
362 {
363         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
364         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
365 }
366
367 static void mdd_unlock2(struct lu_context *ctxt,
368                         struct mdd_object *o0, struct mdd_object *o1)
369 {
370         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
371         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
372 }
373
374 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
375                                        struct mdd_device *mdd,
376                                        struct txn_param *p)
377 {
378         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
379 }
380
381 static void mdd_trans_stop(struct lu_context *ctxt,
382                            struct mdd_device *mdd, struct thandle *handle)
383 {
384         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
385 }
386
387 static int
388 __mdd_object_create(struct lu_context *ctxt, struct mdd_object *obj,
389                     struct thandle *handle)
390 {
391         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
392         int rc;
393         ENTRY;
394
395         rc = mdd_child_ops(mdd)->dt_object_create(ctxt, mdd_object_child(obj),
396                                                   handle);
397         /*XXX increase the refcount of the object or not?*/
398         RETURN(rc);
399 }
400
401 static int mdd_object_create(struct lu_context *ctxt, struct md_object *obj)
402 {
403         
404         struct mdd_device *mdd = mdo2mdd(obj);
405         struct thandle *handle;
406         int rc;
407         ENTRY;
408
409         handle = mdd_trans_start(ctxt, mdd,
410                                  &TXN_PARAM(MDD_OBJECT_CREATE_CREDITS));
411         if (IS_ERR(handle))
412                 RETURN(PTR_ERR(handle));
413
414         rc = __mdd_object_create(ctxt, mdo2mddo(obj), handle);
415
416         mdd_trans_stop(ctxt, mdd, handle);
417
418         RETURN(rc);
419 }
420
421
422 static int
423 __mdd_attr_set(struct lu_context *ctxt, struct md_object *obj,
424                struct lu_attr *attr, struct thandle *handle)
425 {
426         struct dt_object *next = mdd_object_child(mdo2mddo(obj));
427         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
428 }
429
430 static int
431 mdd_attr_set(struct lu_context *ctxt,
432              struct md_object *obj, struct lu_attr *attr)
433 {
434         struct mdd_device *mdd = mdo2mdd(obj);
435         struct thandle *handle;
436         int  rc;
437         ENTRY;
438
439         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_ATTR_SET_CREDITS));
440         if (!handle)
441                 RETURN(-ENOMEM);
442
443         rc = __mdd_attr_set(ctxt, obj, attr, handle);
444
445         mdd_trans_stop(ctxt, mdd, handle);
446
447         RETURN(rc);
448 }
449
450
451
452 static int
453 __mdd_xattr_set(struct lu_context *ctxt, struct mdd_device *mdd,
454                 struct mdd_object *obj, void *buf,
455                 int buf_len, const char *name, struct thandle *handle)
456 {
457         struct dt_object *next = mdd_object_child(obj);
458         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len,
459                                           name, handle);
460 }
461
462 static int
463 mdd_xattr_set(struct lu_context *ctxt, struct md_object *obj, void *buf,
464               int buf_len, const char *name)
465 {
466         struct mdd_device *mdd = mdo2mdd(obj);
467         struct thandle *handle;
468         int  rc;
469         ENTRY;
470
471         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_XATTR_SET_CREDITS));
472         if (!handle)
473                 RETURN(-ENOMEM);
474
475         rc = __mdd_xattr_set(ctxt, mdd, mdo2mddo(obj), buf, buf_len, name,
476                              handle);
477
478         mdd_trans_stop(ctxt, mdd, handle);
479
480         RETURN(rc);
481 }
482
483 static struct lu_fid *mdd_object_getfid(struct mdd_object *obj)
484 {
485         return lu_object_fid(&obj->mod_obj.mo_lu);
486 }
487
488 static int
489 __mdd_index_insert(struct lu_context *ctxt, struct mdd_device *mdd,
490                    struct mdd_object *pobj,
491                    struct mdd_object *obj, const char *name,
492                    struct thandle *handle)
493 {
494         int rc;
495         struct dt_object *next = mdd_object_child(pobj);
496         ENTRY;
497
498         mdd_lock2(ctxt, pobj, obj);
499
500         rc = next->do_index_ops->dio_index_insert(ctxt, next,
501                                                   mdd_object_getfid(obj),
502                                                   name, handle);
503         mdd_unlock2(ctxt, pobj, obj);
504
505         RETURN(rc);
506 }
507
508 static int
509 mdd_index_insert(struct lu_context *ctxt, struct md_object *pobj,
510                  struct md_object *obj, const char *name)
511 {
512         struct mdd_device *mdd = mdo2mdd(pobj);
513         int rc;
514         struct thandle *handle;
515         ENTRY;
516
517         handle = mdd_trans_start(ctxt, mdd,
518                                  &TXN_PARAM(MDD_INDEX_INSERT_CREDITS));
519         if (IS_ERR(handle))
520                 RETURN(PTR_ERR(handle));
521
522         rc = __mdd_index_insert(ctxt, mdd, mdo2mddo(pobj), mdo2mddo(obj),
523                                 name, handle);
524
525         mdd_trans_stop(ctxt, mdd, handle);
526         RETURN(rc);
527 }
528
529 static int
530 __mdd_index_delete(struct lu_context *ctxt, struct mdd_device *mdd,
531                    struct mdd_object *pobj,
532                    struct mdd_object *obj, const char *name,
533                    struct thandle *handle)
534 {
535         int rc;
536         struct dt_object *next = mdd_object_child(pobj);
537         ENTRY;
538
539         mdd_lock2(ctxt, pobj, obj);
540
541         rc = next->do_index_ops->dio_index_delete(ctxt, next,
542                                                   mdd_object_getfid(obj),
543                                                   name, handle);
544         mdd_unlock2(ctxt, pobj, obj);
545
546         RETURN(rc);
547 }
548
549 static int
550 mdd_index_delete(struct lu_context *ctxt, struct md_object *pobj,
551                  struct md_object *obj, const char *name)
552 {
553         struct mdd_object *mdd_pobj = mdo2mddo(pobj);
554         struct mdd_object *mdd_obj = mdo2mddo(obj);
555         struct mdd_device *mdd = mdo2mdd(obj);
556         struct thandle *handle;
557         int rc;
558         ENTRY;
559
560         handle = mdd_trans_start(ctxt, mdd,
561                                  &TXN_PARAM(MDD_INDEX_DELETE_CREDITS));
562         if (IS_ERR(handle))
563                 RETURN(PTR_ERR(handle));
564
565         rc = __mdd_index_delete(ctxt, mdd, mdd_pobj, mdd_obj, name, handle);
566
567         mdd_trans_stop(ctxt, mdd, handle);
568
569         RETURN(rc);
570 }
571
572 static int
573 mdd_link(struct lu_context *ctxt, struct md_object *tgt_obj,
574          struct md_object *src_obj, const char *name)
575 {
576         struct mdd_object *mdd_tobj = mdo2mddo(tgt_obj);
577         struct mdd_object *mdd_sobj = mdo2mddo(src_obj);
578         struct mdd_device *mdd = mdo2mdd(src_obj);
579         struct thandle *handle;
580         int rc, nlink;
581         ENTRY;
582
583         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_LINK_CREDITS));
584         if (IS_ERR(handle))
585                 RETURN(PTR_ERR(handle));
586
587         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
588
589         rc = __mdd_index_insert(ctxt, mdd, mdd_tobj, mdd_sobj, name, handle);
590         if (rc)
591                 GOTO(exit, rc);
592
593         rc = mdd_xattr_get(ctxt, src_obj, &nlink, sizeof(nlink), "NLINK");
594         ++nlink;
595
596         rc = __mdd_xattr_set(ctxt, mdd, mdd_sobj,
597                              &nlink, sizeof(nlink), "NLINK", handle);
598 exit:
599         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
600
601         mdd_trans_stop(ctxt, mdd, handle);
602         RETURN(rc);
603 }
604
605 static void mdd_rename_lock(struct mdd_device *mdd, struct mdd_object *src_pobj,
606                             struct mdd_object *tgt_pobj, struct mdd_object *sobj,
607                             struct mdd_object *tobj)
608 {
609         return;
610 }
611
612 static void mdd_rename_unlock(struct mdd_device *mdd, struct mdd_object *src_pobj,
613                               struct mdd_object *tgt_pobj, struct mdd_object *sobj,
614                               struct mdd_object *tobj)
615 {
616         return;
617 }
618
619 static int
620 mdd_rename(struct lu_context *ctxt, struct md_object *src_pobj,
621            struct md_object *tgt_pobj,
622            struct md_object *sobj, const char *sname, struct md_object *tobj,
623            const char *tname)
624 {
625         struct mdd_device *mdd = mdo2mdd(src_pobj);
626         struct mdd_object *mdd_spobj = mdo2mddo(src_pobj);
627         struct mdd_object *mdd_tpobj = mdo2mddo(tgt_pobj);
628         struct mdd_object *mdd_sobj = mdo2mddo(sobj);
629         struct mdd_object *mdd_tobj = mdo2mddo(tobj);
630         int rc;
631         struct thandle *handle;
632
633         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_RENAME_CREDITS));
634         if (IS_ERR(handle))
635                 RETURN(PTR_ERR(handle));
636
637         mdd_rename_lock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
638
639         rc = __mdd_index_delete(ctxt, mdd, mdd_spobj, mdd_sobj, sname, handle);
640         if (rc)
641                 GOTO(cleanup, rc);
642
643         rc = __mdd_index_delete(ctxt, mdd, mdd_tpobj, mdd_tobj, tname, handle);
644         if (rc)
645                 GOTO(cleanup, rc);
646
647         rc = __mdd_index_insert(ctxt, mdd, mdd_spobj, mdd_tobj, tname, handle);
648         if (rc)
649                 GOTO(cleanup, rc);
650
651         /*
652          * XXX nikita: huh? What is this?
653          */
654         rc = __mdd_object_destroy(ctxt, mdd_sobj, handle);
655         if (rc)
656                 GOTO(cleanup, rc);
657 cleanup:
658         mdd_rename_unlock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
659         mdd_trans_stop(ctxt, mdd, handle);
660         RETURN(rc);
661 }
662
663 static int
664 mdd_mkdir(struct lu_context *ctxt, struct lu_attr* attr, struct md_object *pobj,
665           const char *name, struct md_object *child)
666 {
667         struct mdd_device *mdd = mdo2mdd(pobj);
668         struct thandle *handle;
669         int rc = 0;
670         ENTRY;
671
672         handle = mdd_trans_start(ctxt, mdd, &TXN_PARAM(MDD_MKDIR_CREDITS));
673         if (IS_ERR(handle))
674                 RETURN(PTR_ERR(handle));
675
676         mdd_lock(ctxt, mdo2mddo(pobj), DT_WRITE_LOCK);
677
678         rc = __mdd_object_create(ctxt, mdo2mddo(child), handle);
679         if (rc)
680                 GOTO(cleanup, rc);
681
682         rc = __mdd_index_insert(ctxt, mdd, mdo2mddo(pobj), mdo2mddo(child),
683                                 name, handle);
684         if (rc)
685                 GOTO(cleanup, rc);
686 cleanup:
687         mdd_unlock(ctxt, mdo2mddo(pobj), DT_WRITE_LOCK);
688         mdd_trans_stop(ctxt, mdd, handle);
689         RETURN(rc);
690 }
691
692 static int mdd_mkname(struct lu_context *ctxt, struct md_object *pobj,
693           const char *name, struct lu_fid *fid, struct lu_attr *attr)
694 {
695         struct mdd_device *mdd = mdo2mdd(pobj);
696         struct thandle *handle;
697         int rc = 0;
698         ENTRY;
699
700         handle = mdd_trans_start(ctxt, mdd,
701                                  &TXN_PARAM(MDD_INDEX_INSERT_CREDITS));
702         if (IS_ERR(handle))
703                 RETURN(PTR_ERR(handle));
704
705         mdd_lock(ctxt, mdo2mddo(pobj), DT_WRITE_LOCK);
706
707 #if 0
708         rc = __mdd_index_insert(ctxt, mdd, mdo2mddo(pobj), mdo2mddo(child),
709                                 name, handle);
710 #endif 
711         rc = -EOPNOTSUPP;
712
713         mdd_unlock(ctxt, mdo2mddo(pobj), DT_WRITE_LOCK);
714         mdd_trans_stop(ctxt, mdd, handle);
715         RETURN(rc);
716 }
717
718 static int mdd_root_get(struct lu_context *ctx,
719                         struct md_device *m, struct lu_fid *f)
720 {
721         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
722
723         ENTRY;
724         RETURN(mdd_child_ops(mdd)->dt_root_get(ctx, mdd->mdd_child, f));
725 }
726
727 static int mdd_config(struct lu_context *ctx, struct md_device *m,
728                       const char *name, void *buf, int size, int mode)
729 {
730         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
731         int rc;
732         ENTRY;
733
734         rc = mdd_child_ops(mdd)->dt_config(ctx, mdd->mdd_child,
735                                            name, buf, size, mode);
736         RETURN(rc);
737 }
738
739 static int mdd_statfs(struct lu_context *ctx,
740                       struct md_device *m, struct kstatfs *sfs) {
741         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
742         int rc;
743
744         ENTRY;
745
746         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
747
748         RETURN(rc);
749 }
750
751 struct md_device_operations mdd_ops = {
752         .mdo_root_get       = mdd_root_get,
753         .mdo_config         = mdd_config,
754         .mdo_statfs         = mdd_statfs,
755         .mdo_object_create  = mdd_object_create
756 };
757
758 static struct md_dir_operations mdd_dir_ops = {
759         .mdo_mkdir         = mdd_mkdir,
760         .mdo_rename        = mdd_rename,
761         .mdo_link          = mdd_link,
762         .mdo_name_insert   = mdd_mkname
763 };
764
765 static struct md_object_operations mdd_obj_ops = {
766         .moo_attr_get      = mdd_attr_get,
767         .moo_attr_set      = mdd_attr_set,
768         .moo_xattr_get     = mdd_xattr_get,
769         .moo_xattr_set     = mdd_xattr_set,
770 };
771
772 static struct obd_ops mdd_obd_device_ops = {
773         .o_owner = THIS_MODULE
774 };
775
776 struct lu_device *mdd_device_alloc(struct lu_device_type *t,
777                                    struct lustre_cfg *lcfg)
778 {
779         struct lu_device  *l;
780         struct mdd_device *m;
781
782         OBD_ALLOC_PTR(m);
783         if (m == NULL) {
784                 l = ERR_PTR(-ENOMEM);
785         } else {
786                 md_device_init(&m->mdd_md_dev, t);
787                 l = mdd2lu_dev(m);
788                 l->ld_ops = &mdd_lu_ops;
789                 m->mdd_md_dev.md_ops = &mdd_ops;
790         }
791
792         return l;
793 }
794
795 void mdd_device_free(struct lu_device *lu)
796 {
797         struct mdd_device *m = lu2mdd_dev(lu);
798
799         LASSERT(atomic_read(&lu->ld_ref) == 0);
800         md_device_fini(&m->mdd_md_dev);
801
802         OBD_FREE_PTR(m);
803 }
804
805 int mdd_type_init(struct lu_device_type *t)
806 {
807         return 0;
808 }
809
810 void mdd_type_fini(struct lu_device_type *t)
811 {
812 }
813
814 static struct lu_device_type_operations mdd_device_type_ops = {
815         .ldto_init = mdd_type_init,
816         .ldto_fini = mdd_type_fini,
817
818         .ldto_device_alloc = mdd_device_alloc,
819         .ldto_device_free  = mdd_device_free,
820
821         .ldto_device_init    = mdd_device_init,
822         .ldto_device_fini    = mdd_device_fini
823 };
824
825 static struct lu_device_type mdd_device_type = {
826         .ldt_tags = LU_DEVICE_MD,
827         .ldt_name = LUSTRE_MDD0_NAME,
828         .ldt_ops  = &mdd_device_type_ops
829 };
830
831 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
832         { 0 }
833 };
834
835 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
836         { 0 }
837 };
838
839 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
840
841 static int __init mdd_mod_init(void)
842 {
843         struct lprocfs_static_vars lvars;
844
845         lprocfs_init_vars(mdd, &lvars);
846         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
847                                    LUSTRE_MDD0_NAME, &mdd_device_type);
848 }
849
850 static void __exit mdd_mod_exit(void)
851 {
852         class_unregister_type(LUSTRE_MDD0_NAME);
853 }
854
855 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
856 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
857 MODULE_LICENSE("GPL");
858
859 cfs_module(mdd, "0.0.2", mdd_mod_init, mdd_mod_exit);