Whamcloud - gitweb
xattr support in md/dt stack
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51                            struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54                      struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56                        struct mdd_object *obj, enum dt_lock_mode mode);
57 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58                           struct thandle *handle);
59 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60                           struct thandle *handle, struct md_attr *);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62                       const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations    mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
66
67 static struct lu_context_key       mdd_thread_key;
68
69 static const char *mdd_root_dir_name = "root";
70 static const char dot[] = ".";
71 static const char dotdot[] = "..";
72
73
74 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
75 {
76         struct mdd_thread_info *info;
77
78         info = lu_context_key_get(ctx, &mdd_thread_key);
79         LASSERT(info != NULL);
80         return info;
81 }
82
83 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
84                                           const struct lu_object_header *hdr,
85                                           struct lu_device *d)
86 {
87         struct mdd_object *mdo;
88
89         OBD_ALLOC_PTR(mdo);
90         if (mdo != NULL) {
91                 struct lu_object *o;
92                 
93                 o = mdd2lu_obj(mdo);
94                 lu_object_init(o, NULL, d);
95                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
96                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
97                 o->lo_ops = &mdd_lu_obj_ops;
98                 return o;
99         } else {
100                 return NULL;
101         }
102 }
103
104 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
105 {
106         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
107         struct lu_object  *below;
108         struct lu_device  *under;
109         ENTRY;
110
111         under = &d->mdd_child->dd_lu_dev;
112         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
113
114         if (below == NULL)
115                 RETURN(-ENOMEM);
116
117         lu_object_add(o, below);
118         RETURN(0);
119 }
120
121 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         struct mdd_object *mdd = lu2mdd_obj(o);
124         
125         lu_object_fini(o);
126         OBD_FREE_PTR(mdd);
127 }
128
129 static int mdd_attr_get(const struct lu_context *ctxt,
130                         struct md_object *obj, struct lu_attr *attr)
131 {
132         struct mdd_object *mdd_obj = md2mdd_obj(obj);
133         struct dt_object  *next;
134         int rc;
135
136         ENTRY;
137
138         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
139
140         next = mdd_object_child(mdd_obj);
141         rc = next->do_ops->do_attr_get(ctxt, next, attr);
142
143         RETURN(rc);
144 }
145
146 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
147                          void *buf, int buf_len, const char *name)
148 {
149         struct mdd_object *mdd_obj = md2mdd_obj(obj);
150         struct dt_object  *next;
151         int rc;
152
153         ENTRY;
154
155         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
156
157         next = mdd_object_child(mdd_obj);
158         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
159
160         RETURN(rc);
161 }
162
163 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
164                           void *buf, int buf_len)
165 {
166         struct mdd_object *mdd_obj = md2mdd_obj(obj);
167         struct dt_object  *next;
168         int rc;
169
170         ENTRY;
171
172         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
173
174         next = mdd_object_child(mdd_obj);
175         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
176
177         RETURN(rc);
178 }
179
180 enum mdd_txn_op {
181         MDD_TXN_OBJECT_DESTROY_OP,
182         MDD_TXN_OBJECT_CREATE_OP,
183         MDD_TXN_ATTR_SET_OP,
184         MDD_TXN_XATTR_SET_OP,
185         MDD_TXN_INDEX_INSERT_OP,
186         MDD_TXN_INDEX_DELETE_OP,
187         MDD_TXN_LINK_OP,
188         MDD_TXN_UNLINK_OP,
189         MDD_TXN_RENAME_OP,
190         MDD_TXN_MKDIR_OP
191 };
192
193 struct mdd_txn_op_descr {
194         enum mdd_txn_op mod_op;
195         unsigned int    mod_credits;
196 };
197
198 enum {
199         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
200         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
201         MDD_TXN_ATTR_SET_CREDITS       = 20,
202         MDD_TXN_XATTR_SET_CREDITS      = 20,
203         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
204         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
205         MDD_TXN_LINK_CREDITS           = 20,
206         MDD_TXN_UNLINK_CREDITS         = 20,
207         MDD_TXN_RENAME_CREDITS         = 20,
208         MDD_TXN_MKDIR_CREDITS          = 20
209 };
210
211 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
212 static const struct mdd_txn_op_descr opname = { \
213         .mod_op      = opname ## _OP,           \
214         .mod_credits = opname ## _CREDITS,      \
215 }
216
217 /*
218  * number of blocks to reserve for particular operations. Should be function
219  * of ... something. Stub for now.
220  */
221 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
222 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
223 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
224 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
225 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
226 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
227 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
228 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
229 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
230 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
231
232 static void mdd_txn_param_build(const struct lu_context *ctx,
233                                 const struct mdd_txn_op_descr *opd)
234 {
235         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
236 }
237
238 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
239                             lu_printer_t p, const struct lu_object *o)
240 {
241         return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
242 }
243
244 static int mdd_object_exists(const struct lu_context *ctx,
245                              const struct lu_object *o)
246 {
247         return lu_object_exists(ctx, lu_object_next(o));
248 }
249
250 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
251 {
252         int rc;
253         struct dt_object *root;
254         ENTRY;
255
256         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
257                              &mdd->mdd_root_fid);
258         if (!IS_ERR(root)) {
259                 LASSERT(root != NULL);
260                 lu_object_put(ctx, &root->do_lu);
261                 rc = 0;
262         } else
263                 rc = PTR_ERR(root);
264         RETURN(rc);
265 }
266
267 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
268 {
269         /*create PENDING and OBJECTS dir for open and llog*/
270         return 0;
271 }
272
273 static int mdd_fs_cleanup(struct mdd_device *mdd)
274 {
275         /*create PENDING and OBJECTS dir for open and llog*/
276         return 0;
277 }
278
279 static int mdd_device_init(const struct lu_context *ctx,
280                            struct lu_device *d, struct lu_device *next)
281 {
282         struct mdd_device *mdd = lu2mdd_dev(d);
283         int rc;
284         ENTRY;
285
286         mdd->mdd_child = lu2dt_dev(next);
287
288         rc = mdd_fs_setup(ctx, mdd);
289         if (rc)
290                 mdd_fs_cleanup(mdd);
291         RETURN(rc);
292 }
293
294 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
295                                          struct lu_device *d)
296 {
297         struct mdd_device *m = lu2mdd_dev(d);
298         struct lu_device *next = &m->mdd_child->dd_lu_dev;
299
300         dt_device_fini(&m->mdd_lov_dev);
301
302         return next;
303 }
304
305 static int mdd_process_config(const struct lu_context *ctxt,
306                               struct lu_device *d, struct lustre_cfg *cfg)
307 {
308         struct mdd_device *m    = lu2mdd_dev(d);
309         struct dt_device  *dt   = m->mdd_child;
310         struct lu_device  *next = &dt->dd_lu_dev;
311         int rc;
312
313         switch(cfg->lcfg_command) {
314         case LCFG_SETUP:
315                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
316                 if (rc)
317                         GOTO(out, rc);
318                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
319                 rc = mdd_mount(ctxt, m);
320                 if (rc)
321                         GOTO(out, rc);
322                 rc = mdd_init_obd(ctxt, m);
323                 if (rc) {
324                         CERROR("lov init error %d \n", rc);
325                         GOTO(out, rc);
326                 }
327                 break;
328         default:
329                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
330                 break;
331         }
332 out:
333         RETURN(rc);
334 }
335
336 struct lu_device_operations mdd_lu_ops = {
337         .ldo_object_alloc   = mdd_object_alloc,
338         .ldo_process_config = mdd_process_config,
339 };
340
341 static struct lu_object_operations mdd_lu_obj_ops = {
342         .loo_object_init    = mdd_object_init,
343         .loo_object_free    = mdd_object_free,
344         .loo_object_print   = mdd_object_print,
345         .loo_object_exists  = mdd_object_exists,
346 };
347
348 static void mdd_lock(const struct lu_context *ctxt,
349                      struct mdd_object *obj, enum dt_lock_mode mode)
350 {
351         struct dt_object  *next = mdd_object_child(obj);
352
353         next->do_ops->do_lock(ctxt, next, mode);
354 }
355
356 static void mdd_unlock(const struct lu_context *ctxt,
357                        struct mdd_object *obj, enum dt_lock_mode mode)
358 {
359         struct dt_object  *next = mdd_object_child(obj);
360
361         next->do_ops->do_unlock(ctxt, next, mode);
362 }
363
364 static void mdd_lock2(const struct lu_context *ctxt,
365                       struct mdd_object *o0, struct mdd_object *o1)
366 {
367         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
368         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
369 }
370
371 static void mdd_unlock2(const struct lu_context *ctxt,
372                         struct mdd_object *o0, struct mdd_object *o1)
373 {
374         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
375         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
376 }
377
378 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
379                                        struct mdd_device *mdd)
380 {
381         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
382
383         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
384 }
385
386 static void mdd_trans_stop(const struct lu_context *ctxt,
387                            struct mdd_device *mdd, struct thandle *handle)
388 {
389         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
390 }
391
392 static int __mdd_object_create(const struct lu_context *ctxt,
393                                struct mdd_object *obj, struct md_attr *ma,
394                                struct thandle *handle)
395 {
396         struct dt_object *next;
397         struct lu_attr *attr = &ma->ma_attr;
398         int rc;
399         ENTRY;
400
401         if (!lu_object_exists(ctxt, mdd2lu_obj(obj))) {
402                 next = mdd_object_child(obj);
403                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
404                 if (rc == 0) {
405                         rc = mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
406                         if (rc == 0)
407                                 ma->ma_valid |= MA_INODE;
408                 }
409         } else
410                 rc = -EEXIST;
411
412         LASSERT(ergo(rc == 0, lu_object_exists(ctxt, mdd2lu_obj(obj))));
413
414         RETURN(rc);
415 }
416
417 static int mdd_object_create(const struct lu_context *ctxt,
418                              struct md_object *obj, struct md_attr *attr)
419 {
420
421         struct mdd_device *mdd = mdo2mdd(obj);
422         struct thandle *handle;
423         int rc;
424         ENTRY;
425
426         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
427         handle = mdd_trans_start(ctxt, mdd);
428         if (IS_ERR(handle))
429                 RETURN(PTR_ERR(handle));
430
431         rc = __mdd_object_create(ctxt, md2mdd_obj(obj), attr, handle);
432
433         mdd_trans_stop(ctxt, mdd, handle);
434
435         RETURN(rc);
436 }
437
438 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
439                           const struct lu_attr *attr, struct thandle *handle)
440 {
441         struct dt_object *next;
442
443         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
444         next = mdd_object_child(md2mdd_obj(obj));
445         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
446 }
447
448 static int mdd_attr_set(const struct lu_context *ctxt,
449                         struct md_object *obj, const struct lu_attr *attr)
450 {
451         struct mdd_device *mdd = mdo2mdd(obj);
452         struct thandle *handle;
453         int  rc;
454         ENTRY;
455
456         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
457         handle = mdd_trans_start(ctxt, mdd);
458         if (IS_ERR(handle))
459                 RETURN(PTR_ERR(handle));
460
461         rc = __mdd_attr_set(ctxt, obj, attr, handle);
462
463         mdd_trans_stop(ctxt, mdd, handle);
464
465         RETURN(rc);
466 }
467
468 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
469                            struct mdd_object *obj, const void *buf,
470                            int buf_len, const char *name, int fl,
471                            struct thandle *handle)
472 {
473         struct dt_object *next;
474
475         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
476         next = mdd_object_child(obj);
477         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name, fl,
478                                           handle);
479 }
480
481 int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
482                   const void *buf, int buf_len, const char *name, int fl)
483 {
484         struct mdd_device *mdd = mdo2mdd(obj);
485         struct thandle *handle;
486         int  rc;
487         ENTRY;
488
489         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
490         handle = mdd_trans_start(ctxt, mdd);
491         if (IS_ERR(handle))
492                 RETURN(PTR_ERR(handle));
493
494         rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
495                              fl, handle);
496
497         mdd_trans_stop(ctxt, mdd, handle);
498
499         RETURN(rc);
500 }
501
502 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
503                            struct mdd_object *obj,
504                            const char *name, struct thandle *handle)
505 {
506         struct dt_object *next;
507
508         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
509         next = mdd_object_child(obj);
510         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
511 }
512
513 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
514                   const char *name)
515 {
516         struct mdd_device *mdd = mdo2mdd(obj);
517         struct thandle *handle;
518         int  rc;
519         ENTRY;
520
521         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
522         handle = mdd_trans_start(ctxt, mdd);
523         if (IS_ERR(handle))
524                 RETURN(PTR_ERR(handle));
525
526         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
527
528         mdd_trans_stop(ctxt, mdd, handle);
529
530         RETURN(rc);
531 }
532
533 static int __mdd_index_insert(const struct lu_context *ctxt,
534                               struct mdd_object *pobj, const struct lu_fid *lf,
535                               const char *name, struct thandle *handle)
536 {
537         int rc;
538         struct dt_object *next = mdd_object_child(pobj);
539         ENTRY;
540
541         if (dt_try_as_dir(ctxt, next))
542                 rc = next->do_index_ops->dio_insert(ctxt, next,
543                                          (struct dt_rec *)lf,
544                                          (struct dt_key *)name, handle);
545         else
546                 rc = -ENOTDIR;
547         RETURN(rc);
548 }
549
550 static int __mdd_index_delete(const struct lu_context *ctxt,
551                               struct mdd_object *pobj, const char *name,
552                               struct thandle *handle)
553 {
554         int rc;
555         struct dt_object *next = mdd_object_child(pobj);
556         ENTRY;
557
558         if (dt_try_as_dir(ctxt, next))
559                 rc = next->do_index_ops->dio_delete(ctxt, next,
560                                         (struct dt_key *)name, handle);
561         else
562                 rc = -ENOTDIR;
563         RETURN(rc);
564 }
565
566 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
567                     struct md_object *src_obj, const char *name)
568 {
569         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
570         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
571         struct mdd_device *mdd = mdo2mdd(src_obj);
572         struct thandle *handle;
573         int rc;
574         ENTRY;
575
576         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
577         handle = mdd_trans_start(ctxt, mdd);
578         if (IS_ERR(handle))
579                 RETURN(PTR_ERR(handle));
580
581         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
582
583         /*
584          * XXX Check that link can be added to the child.
585          */
586
587         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
588                                 name, handle);
589         if (rc == 0)
590                 __mdd_ref_add(ctxt, mdd_sobj, handle);
591
592         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
593         mdd_trans_stop(ctxt, mdd, handle);
594         RETURN(rc);
595 }
596
597 /*
598  * Check that @dir contains no entries except (possibly) dot and dotdot.
599  *
600  * Returns:
601  *
602  *             0        empty
603  *    -ENOTEMPTY        not empty
604  *           -ve        other error
605  *
606  */
607 static int mdd_dir_is_empty(const struct lu_context *ctx,
608                             struct mdd_object *dir)
609 {
610         struct dt_it     *it;
611         struct dt_object *obj;
612         struct dt_it_ops *iops;
613         int result;
614
615         obj = mdd_object_child(dir);
616         iops = &obj->do_index_ops->dio_it;
617         it = iops->init(ctx, obj);
618         if (it != NULL) {
619                 result = iops->get(ctx, it, (const void *)"");
620                 if (result > 0) {
621                         int i;
622                         for (result = 0, i = 0; result == 0 && i < 3; ++i) {
623                                 result = iops->next(ctx, it);
624 #if 0
625                                 if (result == 0) {
626                                         struct lu_fid *fid;
627                                         char          *name;
628                                         int            len;
629
630                                         fid  = (void *)iops->rec(ctx, it);
631                                         name = (void *)iops->key(ctx, it);
632                                         len  = iops->key_size(ctx, it);
633                                         CERROR("entry: "DFID3": \"%*.*s\"\n",
634                                                PFID3(fid), len, len, name);
635                                 }
636 #endif
637                         }
638                         iops->put(ctx, it);
639                         if (result == 0)
640                                 result = -ENOTEMPTY;
641                         else if (result == +1)
642                                 result = 0;
643                 } else if (result == 0)
644                         /*
645                          * Huh? Index contains no zero key?
646                          */
647                         result = -EIO;
648                 iops->fini(ctx, it);
649         } else
650                 result = -ENOMEM;
651         return result;
652 }
653
654 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
655                       struct md_object *cobj, const char *name,
656                       struct md_attr *ma)
657 {
658         struct mdd_device *mdd = mdo2mdd(pobj);
659         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
660         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
661         struct dt_object  *dt_cobj = mdd_object_child(mdd_cobj);
662         struct thandle *handle;
663         int rc;
664         ENTRY;
665
666         /* sanity checks */
667         if (dt_try_as_dir(ctxt, dt_cobj)) {
668                 if (!S_ISDIR(ma->ma_attr.la_mode))
669                         RETURN(rc = -EISDIR);
670         } else {
671                 if (S_ISDIR(ma->ma_attr.la_mode))
672                         RETURN(rc = -ENOTDIR);
673         }
674
675         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
676         handle = mdd_trans_start(ctxt, mdd);
677         if (IS_ERR(handle))
678                 RETURN(PTR_ERR(handle));
679
680         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
681
682         /* rmdir checks */
683         if (S_ISDIR(ma->ma_attr.la_mode)) {
684                 rc = mdd_dir_is_empty(ctxt, mdd_cobj);
685                 if (rc != 0)
686                         GOTO(cleanup, rc);
687         }
688
689         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
690         if (rc)
691                 GOTO(cleanup, rc);
692
693         __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
694
695         if (S_ISDIR(ma->ma_attr.la_mode)) {
696                 /* unlink dot */
697                 __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
698                 /* unlink dotdot */
699                 __mdd_ref_del(ctxt, mdd_pobj, handle, NULL);
700         }
701
702 cleanup:
703         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
704         mdd_trans_stop(ctxt, mdd, handle);
705         RETURN(rc);
706 }
707
708 static int mdd_parent_fid(const struct lu_context *ctxt,
709                           struct mdd_object *obj,
710                           struct lu_fid *fid)
711 {
712         int rc;
713
714         rc = mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
715
716         return rc;
717 }
718
719 static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj)
720 {
721         return lu_object_fid(&obj->mod_obj.mo_lu);
722 }
723
724 static int mdd_is_parent(const struct lu_context *ctxt,
725                          struct mdd_device *mdd,
726                          struct mdd_object *p1,
727                          struct mdd_object *p2)
728 {
729         struct lu_fid * pfid;
730         int rc;
731
732         pfid = &mdd_ctx_info(ctxt)->mti_fid;
733         do {
734                 rc = mdd_parent_fid(ctxt, p1, pfid);
735                 if (rc)
736                         RETURN(rc);
737                 if (lu_fid_eq(pfid, mdo2fid(p2))) {
738                         RETURN(1);
739                 }
740         } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
741
742         RETURN(rc);
743 }
744
745 static int mdd_rename_lock(const struct lu_context *ctxt,
746                            struct mdd_device *mdd,
747                            struct mdd_object *src_pobj,
748                            struct mdd_object *tgt_pobj)
749 {
750         ENTRY;
751
752         if (src_pobj == tgt_pobj) {
753                 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
754                 RETURN(0);
755         }
756         /*compared the parent child relationship of src_p&tgt_p*/
757         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
758                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
759                 RETURN(0);
760         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
761                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
762                 RETURN(0);
763         }
764         if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
765                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
766                 RETURN(0);
767         }
768         if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
769                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
770                 RETURN(0);
771         }
772
773         mdd_lock2(ctxt, src_pobj, tgt_pobj);
774         RETURN(0);
775 }
776
777 static void mdd_rename_unlock(const struct lu_context *ctxt,
778                               struct mdd_object *src_pobj,
779                               struct mdd_object *tgt_pobj)
780 {
781         mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
782         if (src_pobj != tgt_pobj)
783                 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
784 }
785
786 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
787                       struct md_object *tgt_pobj, const struct lu_fid *lf,
788                       const char *sname, struct md_object *tobj,
789                       const char *tname)
790 {
791         struct mdd_device *mdd = mdo2mdd(src_pobj);
792         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
793         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
794         struct mdd_object *mdd_tobj = NULL;
795         struct thandle *handle;
796         int rc, locked = 0;
797         ENTRY;
798
799         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
800         handle = mdd_trans_start(ctxt, mdd);
801         if (IS_ERR(handle))
802                 RETURN(PTR_ERR(handle));
803
804         /*FIXME: Should consider tobj and sobj too in rename_lock*/
805         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
806         if (rc)
807                 GOTO(cleanup, rc);
808         locked = 1;
809         if (tobj)
810                 mdd_tobj = md2mdd_obj(tobj);
811
812         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
813         if (rc)
814                 GOTO(cleanup, rc);
815         /*FIXME: no sobj now, we should check sobj type, if it is dir,
816          * the nlink of its parent should be dec
817          */
818         if (tobj) {
819                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
820                 if (rc)
821                         GOTO(cleanup, rc);
822         }
823
824         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
825         if (rc)
826                 GOTO(cleanup, rc);
827
828
829         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
830                 struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
831
832                 __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
833                 if (dt_try_as_dir(ctxt, dt_tobj))
834                         __mdd_ref_del(ctxt, mdd_tpobj, handle, NULL);
835         }
836 cleanup:
837        /*FIXME: should we do error handling here?*/
838         if (locked)
839                 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
840         mdd_trans_stop(ctxt, mdd, handle);
841         RETURN(rc);
842 }
843
844 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
845                       const char *name, struct lu_fid* fid)
846 {
847         struct dt_object    *dir    = mdd_object_child(md2mdd_obj(pobj));
848         struct dt_rec       *rec    = (struct dt_rec *)fid;
849         const struct dt_key *key = (const struct dt_key *)name;
850         int rc;
851         ENTRY;
852
853         if (dt_try_as_dir(ctxt, dir))
854                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
855         else
856                 rc = -ENOTDIR;
857         RETURN(rc);
858 }
859
860 static int __mdd_object_initialize(const struct lu_context *ctxt,
861                                    struct mdd_object *parent,
862                                    struct mdd_object *child,
863                                    struct md_attr *ma, struct thandle *handle)
864 {
865         int rc;
866
867         rc = 0;
868         if (S_ISDIR(ma->ma_attr.la_mode)) {
869                 __mdd_ref_add(ctxt, child, handle);
870                 rc = __mdd_index_insert(ctxt, child,
871                                         mdo2fid(child), dot, handle);
872                 if (rc == 0) {
873                         rc = __mdd_index_insert(ctxt, child, mdo2fid(parent),
874                                                 dotdot, handle);
875                         if (rc == 0)
876                                 __mdd_ref_add(ctxt, parent, handle);
877                         else {
878                                 int rc2;
879
880                                 rc2 = __mdd_index_delete(ctxt,
881                                                          child, dot, handle);
882                                 if (rc2 != 0)
883                                         CERROR("Failure to cleanup after dotdot"
884                                                " creation: %d (%d)\n", rc2, rc);
885                                 else
886                                         __mdd_ref_del(ctxt, child, handle, 0);
887                         }
888                 }
889         }
890         return rc;
891 }
892
893 /*
894  * Create object and insert it into namespace.
895  */
896 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
897                       const char *name, struct md_object *child,
898                       const char *target_name, struct md_attr* ma)
899 {
900         struct mdd_device *mdd = mdo2mdd(pobj);
901         struct mdd_object *mdo = md2mdd_obj(pobj);
902         struct mdd_object *son = md2mdd_obj(child);
903         struct lu_attr *attr = &ma->ma_attr;
904         struct lu_fid *fid;
905         struct lov_mds_md *lmm = NULL;
906         struct thandle *handle;
907         int rc, created = 0, inserted = 0, lmm_size;
908         ENTRY;
909
910         /* sanity checks before big job */
911         fid = &mdd_ctx_info(ctxt)->mti_fid;
912         rc = mdd_lookup(ctxt, pobj, name, fid);
913         if (rc != -ENOENT) {
914                 rc = rc ? rc : -EEXIST;
915                 RETURN(rc);
916         }
917         /* no RPC inside the transaction, so OST objects should be created at
918          * first */
919
920         if (S_ISREG(attr->la_mode)) {
921                 rc = mdd_lov_create(ctxt, mdd, son, &lmm, &lmm_size);
922                 if (rc)
923                         RETURN(rc);
924                 if (lmm_size < ma->ma_lmm_size)
925                         ma->ma_lmm_size = lmm_size;
926                 if (ma->ma_lmm_size > 0) {
927                         memcpy(ma->ma_lmm, lmm, ma->ma_lmm_size);
928                         ma->ma_valid |= MA_LOV;
929                 }
930         }
931
932         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
933         handle = mdd_trans_start(ctxt, mdd);
934         if (IS_ERR(handle))
935                 RETURN(PTR_ERR(handle));
936
937         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
938
939         /*
940          * XXX check that link can be added to the parent in mkdir case.
941          */
942
943         /*
944          * Two operations have to be performed:
945          *
946          *  - allocation of new object (->do_create()), and
947          *
948          *  - insertion into parent index (->dio_insert()).
949          *
950          * Due to locking, operation order is not important, when both are
951          * successful, *but* error handling cases are quite different:
952          *
953          *  - if insertion is done first, and following object creation fails,
954          *  insertion has to be rolled back, but this operation might fail
955          *  also leaving us with dangling index entry.
956          *
957          *  - if creation is done first, is has to be undone if insertion
958          *  fails, leaving us with leaked space, which is neither good, nor
959          *  fatal.
960          *
961          * It seems that creation-first is simplest solution, but it is
962          * sub-optimal in the frequent
963          *
964          *         $ mkdir foo
965          *         $ mkdir foo
966          *
967          * case, because second mkdir is bound to create object, only to
968          * destroy it immediately.
969          *
970          * Note that local file systems do
971          *
972          *     0. lookup -> -EEXIST
973          *
974          *     1. create
975          *
976          *     2. insert
977          *
978          * Maybe we should do the same. For now: creation-first.
979          */
980
981         rc = __mdd_object_create(ctxt, son, ma, handle);
982         if (rc)
983                 GOTO(cleanup, rc);
984
985         created = 1;
986
987         rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
988         if (rc)
989                 /*
990                  * Object has no links, so it will be destroyed when last
991                  * reference is released. (XXX not now.)
992                  */
993                 GOTO(cleanup, rc);
994
995         rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
996                                 name, handle);
997
998         if (rc)
999                 GOTO(cleanup, rc);
1000
1001         inserted = 1;
1002
1003         rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size);
1004         if (rc) {
1005                 CERROR("error on stripe info copy %d \n", rc);
1006         }
1007 cleanup:
1008         if (rc && created) {
1009                 int rc2 = 0;
1010
1011                 if (inserted) {
1012                         rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
1013                         if (rc2)
1014                                 CERROR("error can not cleanup destroy %d\n",
1015                                        rc2);
1016                 }
1017                 if (rc2 == 0)
1018                         __mdd_ref_del(ctxt, son, handle, NULL);
1019         }
1020         if (lmm)
1021                 OBD_FREE(lmm, lmm_size);
1022         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1023         mdd_trans_stop(ctxt, mdd, handle);
1024         RETURN(rc);
1025 }
1026
1027 static int mdd_mkname(const struct lu_context *ctxt, struct md_object *pobj,
1028                       const char *name, const struct lu_fid *fid)
1029 {
1030         struct mdd_device *mdd = mdo2mdd(pobj);
1031         struct mdd_object *mdo = md2mdd_obj(pobj);
1032         struct thandle *handle;
1033         int rc;
1034         ENTRY;
1035
1036         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1037         handle = mdd_trans_start(ctxt, mdd);
1038         if (IS_ERR(handle))
1039                 RETURN(PTR_ERR(handle));
1040
1041         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1042
1043         rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
1044
1045         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1046         mdd_trans_stop(ctxt, mdd, handle);
1047         RETURN(rc);
1048 }
1049
1050 static int mdd_name_remove(const struct lu_context *ctxt,
1051                            struct md_object *pobj,
1052                            const char *name)
1053 {
1054         struct mdd_device *mdd = mdo2mdd(pobj);
1055         struct mdd_object *mdo = md2mdd_obj(pobj);
1056         struct thandle *handle;
1057         int rc;
1058         ENTRY;
1059
1060         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1061         handle = mdd_trans_start(ctxt, mdd);
1062         if (IS_ERR(handle))
1063                 RETURN(PTR_ERR(handle));
1064
1065         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1066
1067         rc = __mdd_index_delete(ctxt, mdo, name, handle);
1068
1069         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1070
1071         mdd_trans_stop(ctxt, mdd, handle);
1072         RETURN(rc);
1073 }
1074
1075 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1076                           struct md_object *tobj, const struct lu_fid *lf,
1077                           const char *name)
1078 {
1079         struct mdd_device *mdd = mdo2mdd(pobj);
1080         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1081         struct mdd_object *mdd_tobj = NULL;
1082         struct thandle *handle;
1083         int rc;
1084         ENTRY;
1085
1086         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1087         handle = mdd_trans_start(ctxt, mdd);
1088         if (IS_ERR(handle))
1089                 RETURN(PTR_ERR(handle));
1090
1091         if (tobj)
1092                 mdd_tobj = md2mdd_obj(tobj);
1093
1094         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1095
1096         if (tobj) {
1097                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1098                 if (rc)
1099                         GOTO(cleanup, rc);
1100         }
1101
1102         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
1103         if (rc)
1104                 GOTO(cleanup, rc);
1105
1106         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu))
1107                 __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
1108 cleanup:
1109        /*FIXME: should we do error handling here?*/
1110         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1111         mdd_trans_stop(ctxt, mdd, handle);
1112         RETURN(rc);
1113 }
1114
1115 static int mdd_root_get(const struct lu_context *ctx,
1116                         struct md_device *m, struct lu_fid *f)
1117 {
1118         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1119
1120         ENTRY;
1121         *f = mdd->mdd_root_fid;
1122         RETURN(0);
1123 }
1124
1125 static int mdd_statfs(const struct lu_context *ctx,
1126                       struct md_device *m, struct kstatfs *sfs) {
1127         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1128         int rc;
1129
1130         ENTRY;
1131
1132         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1133
1134         RETURN(rc);
1135 }
1136
1137 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1138                          struct thandle *handle)
1139 {
1140         struct dt_object *next;
1141
1142         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1143         next = mdd_object_child(obj);
1144         next->do_ops->do_ref_add(ctxt, next, handle);
1145 }
1146
1147 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1148 {
1149         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1150         struct mdd_device *mdd = mdo2mdd(obj);
1151         struct thandle *handle;
1152         ENTRY;
1153
1154         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1155         handle = mdd_trans_start(ctxt, mdd);
1156         if (IS_ERR(handle))
1157                 RETURN(-ENOMEM);
1158         __mdd_ref_add(ctxt, mdd_obj, handle);
1159
1160         mdd_trans_stop(ctxt, mdd, handle);
1161
1162         RETURN(0);
1163 }
1164
1165 static void
1166 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1167               struct thandle *handle, struct md_attr *ma)
1168 {
1169         struct dt_object *next = mdd_object_child(obj);
1170
1171         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1172
1173         next->do_ops->do_ref_del(ctxt, next, handle);
1174         if (ma != NULL) {
1175                 int rc = mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
1176                 if (rc == 0)
1177                         ma->ma_valid |= MA_INODE;
1178         }
1179 }
1180
1181 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1182                        struct md_attr *ma)
1183 {
1184         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1185         struct mdd_device *mdd = mdo2mdd(obj);
1186         struct thandle *handle;
1187         ENTRY;
1188
1189         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1190         handle = mdd_trans_start(ctxt, mdd);
1191         if (IS_ERR(handle))
1192                 RETURN(-ENOMEM);
1193         __mdd_ref_del(ctxt, mdd_obj, handle, ma);
1194
1195         mdd_trans_stop(ctxt, mdd, handle);
1196
1197         RETURN(0);
1198 }
1199
1200 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1201 {
1202         return 0;
1203 }
1204
1205 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
1206 {
1207         return 0;
1208 }
1209
1210 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
1211                         struct lu_rdpg *rdpg)
1212 {
1213         struct dt_object *next;
1214         int rc;
1215
1216         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(md2mdd_obj(obj))));
1217         next = mdd_object_child(md2mdd_obj(obj));
1218         rc = next->do_ops->do_readpage(ctxt, next, rdpg);
1219         return rc;
1220 }
1221
1222 struct md_device_operations mdd_ops = {
1223         .mdo_root_get       = mdd_root_get,
1224         .mdo_statfs         = mdd_statfs,
1225 };
1226
1227 static struct md_dir_operations mdd_dir_ops = {
1228         .mdo_lookup        = mdd_lookup,
1229         .mdo_create        = mdd_create,
1230         .mdo_rename        = mdd_rename,
1231         .mdo_link          = mdd_link,
1232         .mdo_unlink        = mdd_unlink,
1233         .mdo_name_insert   = mdd_mkname,
1234         .mdo_name_remove   = mdd_name_remove,
1235         .mdo_rename_tgt    = mdd_rename_tgt,
1236 };
1237
1238
1239 static struct md_object_operations mdd_obj_ops = {
1240         .moo_attr_get      = mdd_attr_get,
1241         .moo_attr_set      = mdd_attr_set,
1242         .moo_xattr_get     = mdd_xattr_get,
1243         .moo_xattr_set     = mdd_xattr_set,
1244         .moo_xattr_list    = mdd_xattr_list,
1245         .moo_xattr_del     = mdd_xattr_del,
1246         .moo_object_create = mdd_object_create,
1247         .moo_ref_add       = mdd_ref_add,
1248         .moo_ref_del       = mdd_ref_del,
1249         .moo_open          = mdd_open,
1250         .moo_close         = mdd_close,
1251         .moo_readpage      = mdd_readpage
1252 };
1253
1254 static struct obd_ops mdd_obd_device_ops = {
1255         .o_owner = THIS_MODULE
1256 };
1257
1258 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1259                                    struct lu_device_type *t,
1260                                    struct lustre_cfg *lcfg)
1261 {
1262         struct lu_device  *l;
1263         struct mdd_device *m;
1264
1265         OBD_ALLOC_PTR(m);
1266         if (m == NULL) {
1267                 l = ERR_PTR(-ENOMEM);
1268         } else {
1269                 md_device_init(&m->mdd_md_dev, t);
1270                 l = mdd2lu_dev(m);
1271                 l->ld_ops = &mdd_lu_ops;
1272                 m->mdd_md_dev.md_ops = &mdd_ops;
1273         }
1274
1275         return l;
1276 }
1277
1278 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1279 {
1280         struct mdd_device *m = lu2mdd_dev(lu);
1281
1282         LASSERT(atomic_read(&lu->ld_ref) == 0);
1283         md_device_fini(&m->mdd_md_dev);
1284         OBD_FREE_PTR(m);
1285 }
1286
1287 static int mdd_type_init(struct lu_device_type *t)
1288 {
1289         return lu_context_key_register(&mdd_thread_key);
1290 }
1291
1292 static void mdd_type_fini(struct lu_device_type *t)
1293 {
1294         lu_context_key_degister(&mdd_thread_key);
1295 }
1296
1297 static struct lu_device_type_operations mdd_device_type_ops = {
1298         .ldto_init = mdd_type_init,
1299         .ldto_fini = mdd_type_fini,
1300
1301         .ldto_device_alloc = mdd_device_alloc,
1302         .ldto_device_free  = mdd_device_free,
1303
1304         .ldto_device_init    = mdd_device_init,
1305         .ldto_device_fini    = mdd_device_fini
1306 };
1307
1308 static struct lu_device_type mdd_device_type = {
1309         .ldt_tags     = LU_DEVICE_MD,
1310         .ldt_name     = LUSTRE_MDD0_NAME,
1311         .ldt_ops      = &mdd_device_type_ops,
1312         .ldt_ctx_tags = LCT_MD_THREAD
1313 };
1314
1315 static void *mdd_key_init(const struct lu_context *ctx,
1316                           struct lu_context_key *key)
1317 {
1318         struct mdd_thread_info *info;
1319
1320         OBD_ALLOC_PTR(info);
1321         if (info == NULL)
1322                 info = ERR_PTR(-ENOMEM);
1323         return info;
1324 }
1325
1326 static void mdd_key_fini(const struct lu_context *ctx,
1327                          struct lu_context_key *key, void *data)
1328 {
1329         struct mdd_thread_info *info = data;
1330         OBD_FREE_PTR(info);
1331 }
1332
1333 static struct lu_context_key mdd_thread_key = {
1334         .lct_tags = LCT_MD_THREAD,
1335         .lct_init = mdd_key_init,
1336         .lct_fini = mdd_key_fini
1337 };
1338
1339 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1340         { 0 }
1341 };
1342
1343 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1344         { 0 }
1345 };
1346
1347 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
1348
1349 static int __init mdd_mod_init(void)
1350 {
1351         struct lprocfs_static_vars lvars;
1352
1353         lprocfs_init_vars(mdd, &lvars);
1354         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1355                                    LUSTRE_MDD0_NAME, &mdd_device_type);
1356 }
1357
1358 static void __exit mdd_mod_exit(void)
1359 {
1360         class_unregister_type(LUSTRE_MDD0_NAME);
1361 }
1362
1363 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1364 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1365 MODULE_LICENSE("GPL");
1366
1367 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);