Whamcloud - gitweb
mdd_lookup is added
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <linux/obd.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_ver.h>
38 #include <linux/obd_support.h>
39 #include <linux/lprocfs_status.h>
40
41
42 #include <linux/lu_object.h>
43 #include <linux/md_object.h>
44 #include <linux/dt_object.h>
45
46 #include "mdd_internal.h"
47
48
49 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
50                                        struct mdd_device *);
51 static void mdd_trans_stop(struct lu_context *ctxt,
52                            struct mdd_device *mdd, struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static struct lu_device_operations mdd_lu_ops;
55 static void mdd_lock(struct lu_context *ctx,
56                      struct mdd_object *obj, enum dt_lock_mode mode);
57 static void mdd_unlock(struct lu_context *ctx,
58                        struct mdd_object *obj, enum dt_lock_mode mode);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 struct mdd_thread_info {
67         struct txn_param mti_param;
68         struct lu_fid    mti_fid;
69 };
70
71 const char *mdd_root_dir_name = "ROOT";
72
73 static struct mdd_thread_info *mdd_ctx_info(struct lu_context *ctx)
74 {
75         struct mdd_thread_info *info;
76
77         info = lu_context_key_get(ctx, &mdd_thread_key);
78         LASSERT(info != NULL);
79         return info;
80 }
81
82 static int lu_device_is_mdd(struct lu_device *d)
83 {
84         /*
85          * XXX for now. Tags in lu_device_type->ldt_something are needed.
86          */
87         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdd_lu_ops);
88 }
89
90 static struct mdd_device* lu2mdd_dev(struct lu_device *d)
91 {
92         LASSERT(lu_device_is_mdd(d));
93         return container_of0(d, struct mdd_device, mdd_md_dev.md_lu_dev);
94 }
95
96 static inline struct lu_device *mdd2lu_dev(struct mdd_device *d)
97 {
98         return (&d->mdd_md_dev.md_lu_dev);
99 }
100
101 static struct mdd_object *mdd_obj(struct lu_object *o)
102 {
103         LASSERT(lu_device_is_mdd(o->lo_dev));
104         return container_of0(o, struct mdd_object, mod_obj.mo_lu);
105 }
106
107 static struct mdd_device* mdo2mdd(struct md_object *mdo)
108 {
109         return lu2mdd_dev(mdo->mo_lu.lo_dev);
110 }
111
112 static struct mdd_object* mdo2mddo(struct md_object *mdo)
113 {
114         return container_of0(mdo, struct mdd_object, mod_obj);
115 }
116
117 static inline struct dt_device_operations *mdd_child_ops(struct mdd_device *d)
118 {
119         return d->mdd_child->dd_ops;
120 }
121
122 static struct lu_object *mdd_object_alloc(struct lu_context *ctxt,
123                                           struct lu_device *d)
124 {
125         struct mdd_object *mdo;
126         ENTRY;
127
128         OBD_ALLOC_PTR(mdo);
129         if (mdo != NULL) {
130                 struct lu_object *o;
131                 
132                 o = &mdo->mod_obj.mo_lu;
133                 lu_object_init(o, NULL, d);
134                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
135                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
136                 o->lo_ops = &mdd_lu_obj_ops;
137                 return &mdo->mod_obj.mo_lu;
138         } else
139                 return NULL;
140 }
141
142 static int mdd_object_init(struct lu_context *ctxt, struct lu_object *o)
143 {
144         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
145         struct lu_object  *below;
146         struct lu_device  *under;
147         ENTRY;
148
149         under = &d->mdd_child->dd_lu_dev;
150         below = under->ld_ops->ldo_object_alloc(ctxt, under);
151
152         if (below == NULL)
153                 RETURN(-ENOMEM);
154
155         lu_object_add(o, below);
156         RETURN(0);
157 }
158
159 static void mdd_object_free(struct lu_context *ctxt, struct lu_object *o)
160 {
161         struct lu_object_header *h;
162         struct mdd_object *mdd = mdd_obj(o);
163
164         h = o->lo_header;
165         lu_object_fini(o);
166         OBD_FREE_PTR(mdd);
167 }
168
169 static int
170 mdd_attr_get(struct lu_context *ctxt,
171              struct md_object *obj, struct lu_attr *attr)
172 {
173         struct mdd_object *mdd_obj = mdo2mddo(obj);
174         struct dt_object  *next = mdd_object_child(mdd_obj);
175         int rc;
176
177         ENTRY;
178
179         rc = next->do_ops->do_attr_get(ctxt, next, attr);
180         RETURN(rc);
181 }
182
183 static int
184 mdd_xattr_get(struct lu_context *ctxt, struct md_object *obj, void *buf,
185               int buf_len, const char *name)
186 {
187         struct mdd_object *mdd_obj = mdo2mddo(obj);
188         struct dt_object  *next = mdd_object_child(mdd_obj);
189         int rc;
190
191         ENTRY;
192
193         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
194         RETURN(rc);
195 }
196
197 static int
198 __mdd_object_destroy(struct lu_context *ctxt, struct mdd_object *obj,
199                      struct thandle *handle)
200 {
201         struct dt_object  *next = mdd_object_child(obj);
202         int rc;
203         rc = next->do_ops->do_object_destroy(ctxt, next, handle);
204         RETURN(rc);
205 }
206
207 static int mdd_add_orphan(struct mdd_device *mdd, struct mdd_object *obj,
208                           struct thandle *handle)
209 {
210         int rc = 0;
211         ENTRY;
212
213         RETURN(rc);
214 }
215
216 static int
217 open_orphan(struct mdd_object *obj)
218 {
219         return 0;
220 }
221
222 static int
223 mdd_add_unlink_log(struct mdd_device *mdd, struct mdd_object *obj,
224                    struct thandle *handle)
225 {
226         return 0;
227 }
228
229 enum mdd_txn_op {
230         MDD_TXN_OBJECT_DESTROY_OP,
231         MDD_TXN_OBJECT_CREATE_OP,
232         MDD_TXN_ATTR_SET_OP,
233         MDD_TXN_XATTR_SET_OP,
234         MDD_TXN_INDEX_INSERT_OP,
235         MDD_TXN_INDEX_DELETE_OP,
236         MDD_TXN_LINK_OP,
237         MDD_TXN_RENAME_OP,
238         MDD_TXN_MKDIR_OP
239 };
240
241 struct mdd_txn_op_descr {
242         enum mdd_txn_op mod_op;
243         unsigned int    mod_credits;
244 };
245
246 enum {
247         MDD_TXN_OBJECT_DESTROY_CREDITS = 10,
248         MDD_TXN_OBJECT_CREATE_CREDITS  = 10,
249         MDD_TXN_ATTR_SET_CREDITS       = 10,
250         MDD_TXN_XATTR_SET_CREDITS      = 10,
251         MDD_TXN_INDEX_INSERT_CREDITS   = 10,
252         MDD_TXN_INDEX_DELETE_CREDITS   = 10,
253         MDD_TXN_LINK_CREDITS           = 10,
254         MDD_TXN_RENAME_CREDITS         = 10,
255         MDD_TXN_MKDIR_CREDITS          = 10
256 };
257
258 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
259 static const struct mdd_txn_op_descr opname = { \
260         .mod_op      = opname ## _OP,           \
261         .mod_credits = opname ## _CREDITS,      \
262 }
263
264 /*
265  * number of blocks to reserve for particular operations. Should be function
266  * of ... something. Stub for now.
267  */
268 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
269 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
270 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
271 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
272 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
273 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
274 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
275 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
276 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
277
278 static void mdd_txn_param_build(struct lu_context *ctx,
279                                 const struct mdd_txn_op_descr *opd)
280 {
281         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
282 }
283
284 static int
285 mdd_object_destroy(struct lu_context *ctxt, struct md_object *obj)
286 {
287         struct mdd_device *mdd = mdo2mdd(obj);
288         struct mdd_object *mdd_obj = mdo2mddo(obj);
289         struct thandle *handle;
290         int rc ;
291         ENTRY;
292
293         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_DESTROY);
294         handle = mdd_trans_start(ctxt, mdd);
295         if (IS_ERR(handle))
296                 RETURN(PTR_ERR(handle));
297
298         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
299         if (open_orphan(mdd_obj))
300                 rc = mdd_add_orphan(mdd, mdd_obj, handle);
301         else {
302                 rc = __mdd_object_destroy(ctxt, mdd_obj, handle);
303                 if (rc == 0)
304                         rc = mdd_add_unlink_log(mdd, mdd_obj, handle);
305         }
306
307         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
308         mdd_trans_stop(ctxt, mdd, handle);
309         RETURN(rc);
310 }
311
312 static void mdd_object_release(struct lu_context *ctxt, struct lu_object *o)
313 {
314 }
315
316 static int mdd_object_exists(struct lu_context *ctx, struct lu_object *o)
317 {
318         return lu_object_exists(ctx, lu_object_next(o));
319 }
320
321 static int mdd_object_print(struct lu_context *ctxt,
322                             struct seq_file *f, const struct lu_object *o)
323 {
324         return seq_printf(f, LUSTRE_MDD0_NAME"-object@%p", o);
325 }
326
327 static int mdd_dt_lookup(struct lu_context *ctx, struct mdd_device *mdd,
328                          struct mdd_object *obj, const char *name,
329                          struct lu_fid *fid)
330 {
331         struct dt_object *dir    = mdd_object_child(obj);
332         struct dt_rec    *rec    = (struct dt_rec *)fid;
333         const struct dt_key *key = (const struct dt_key *)name;
334         int result;
335
336         if (dir->do_index_ops != NULL)
337                 result = dir->do_index_ops->dio_lookup(ctx, dir, rec, key);
338         else
339                 result = -ENOTDIR;
340         return result;
341 }
342
343 static int mdd_mount(struct lu_context *ctx, struct mdd_device *mdd)
344 {
345         int result;
346         struct mdd_thread_info *info = lu_context_key_get(ctx,
347                                                           &mdd_thread_key);
348         struct lu_device *dev = &mdd->mdd_md_dev.md_lu_dev;
349
350         result = mdd_child_ops(mdd)->dt_root_get(ctx, mdd->mdd_child,
351                                                  &info->mti_fid);
352         if (result == 0) {
353                 struct lu_object *root;
354
355                 root = lu_object_find(ctx, dev->ld_site, &info->mti_fid);
356                 if (!IS_ERR(root)) {
357                         struct mdd_object *obj;
358
359                         obj = mdd_obj(lu_object_locate(root->lo_header,
360                                                        dev->ld_type));
361                         if (obj != NULL)
362                                 result = mdd_dt_lookup(ctx, mdd, obj,
363                                                        mdd_root_dir_name ,
364                                                        &mdd->mdd_root_fid);
365                         else {
366                                 CERROR("No slice\n");
367                                 result = -EINVAL;
368                         }
369                         lu_object_put(ctx, root);
370                 } else {
371                         CERROR("No root\n");
372                         result = PTR_ERR(root);
373                 }
374         }
375         return result;
376 }
377
378 static int mdd_fs_setup(struct lu_context *ctx, struct mdd_device *mdd)
379 {
380         return 0;
381 }
382
383 static int mdd_fs_cleanup(struct mdd_device *mdd)
384 {
385         return 0;
386 }
387
388 static int mdd_device_init(struct lu_context *ctx,
389                            struct lu_device *d, struct lu_device *next)
390 {
391         struct mdd_device *mdd = lu2mdd_dev(d);
392         int rc = -EFAULT;
393
394         ENTRY;
395
396         mdd->mdd_child = lu2dt_dev(next);
397
398         rc = mdd_fs_setup(ctx, mdd);
399         if (rc)
400                 GOTO(err, rc);
401
402         RETURN(rc);
403 err:
404         mdd_fs_cleanup(mdd);
405         RETURN(rc);
406 }
407
408 static struct lu_device *mdd_device_fini(struct lu_context *ctx,
409                                          struct lu_device *d)
410 {
411         struct mdd_device *m = lu2mdd_dev(d);
412         struct lu_device *next = &m->mdd_child->dd_lu_dev;
413
414         return next;
415 }
416
417 static int mdd_process_config(struct lu_context *ctx,
418                               struct lu_device *d, struct lustre_cfg *cfg)
419 {
420         struct mdd_device *m = lu2mdd_dev(d);
421         struct lu_device *next = &m->mdd_child->dd_lu_dev;
422         int err;
423
424         switch(cfg->lcfg_command) {
425
426         default:
427                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
428                 if (err == 0 && cfg->lcfg_command == LCFG_SETUP)
429                         err = mdd_mount(ctx, m);
430         }
431
432         RETURN(err);
433 }
434
435 static struct lu_device_operations mdd_lu_ops = {
436         .ldo_object_alloc   = mdd_object_alloc,
437         .ldo_object_free    = mdd_object_free,
438         .ldo_process_config = mdd_process_config
439 };
440
441 static struct lu_object_operations mdd_lu_obj_ops = {
442         .loo_object_init    = mdd_object_init,
443         .loo_object_release = mdd_object_release,
444         .loo_object_print   = mdd_object_print,
445         .loo_object_exists  = mdd_object_exists
446 };
447
448 static struct dt_object* mdd_object_child(struct mdd_object *o)
449 {
450         return container_of0(lu_object_next(&o->mod_obj.mo_lu),
451                              struct dt_object, do_lu);
452 }
453
454 static void mdd_lock(struct lu_context *ctxt,
455                      struct mdd_object *obj, enum dt_lock_mode mode)
456 {
457         struct dt_object  *next = mdd_object_child(obj);
458
459         next->do_ops->do_object_lock(ctxt, next, mode);
460 }
461
462 static void mdd_unlock(struct lu_context *ctxt,
463                        struct mdd_object *obj, enum dt_lock_mode mode)
464 {
465         struct dt_object  *next = mdd_object_child(obj);
466
467         next->do_ops->do_object_unlock(ctxt, next, mode);
468 }
469
470 static void mdd_lock2(struct lu_context *ctxt,
471                       struct mdd_object *o0, struct mdd_object *o1)
472 {
473         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
474         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
475 }
476
477 static void mdd_unlock2(struct lu_context *ctxt,
478                         struct mdd_object *o0, struct mdd_object *o1)
479 {
480         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
481         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
482 }
483
484 static struct thandle* mdd_trans_start(struct lu_context *ctxt,
485                                        struct mdd_device *mdd)
486 {
487         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
488
489         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
490 }
491
492 static void mdd_trans_stop(struct lu_context *ctxt,
493                            struct mdd_device *mdd, struct thandle *handle)
494 {
495         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
496 }
497
498 static int
499 __mdd_object_create(struct lu_context *ctxt, struct mdd_object *obj,
500                     struct lu_attr *attr, struct thandle *handle)
501 {
502         struct dt_object *next = mdd_object_child(obj);
503         int rc;
504         ENTRY;
505
506         rc = next->do_ops->do_object_create(ctxt, next, attr, handle);
507         /*XXX increase the refcount of the object or not?*/
508         RETURN(rc);
509 }
510
511 static int mdd_object_create(struct lu_context *ctxt, struct md_object *obj,
512                              struct lu_attr *attr)
513 {
514
515         struct mdd_device *mdd = mdo2mdd(obj);
516         struct thandle *handle;
517         int rc;
518         ENTRY;
519
520         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
521         handle = mdd_trans_start(ctxt, mdd);
522         if (IS_ERR(handle))
523                 RETURN(PTR_ERR(handle));
524
525         rc = __mdd_object_create(ctxt, mdo2mddo(obj), attr, handle);
526
527         mdd_trans_stop(ctxt, mdd, handle);
528
529         RETURN(rc);
530 }
531
532
533 static int
534 __mdd_attr_set(struct lu_context *ctxt, struct md_object *obj,
535                struct lu_attr *attr, struct thandle *handle)
536 {
537         struct dt_object *next = mdd_object_child(mdo2mddo(obj));
538         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
539 }
540
541 static int
542 mdd_attr_set(struct lu_context *ctxt,
543              struct md_object *obj, struct lu_attr *attr)
544 {
545         struct mdd_device *mdd = mdo2mdd(obj);
546         struct thandle *handle;
547         int  rc;
548         ENTRY;
549
550         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
551         handle = mdd_trans_start(ctxt, mdd);
552         if (!handle)
553                 RETURN(-ENOMEM);
554
555         rc = __mdd_attr_set(ctxt, obj, attr, handle);
556
557         mdd_trans_stop(ctxt, mdd, handle);
558
559         RETURN(rc);
560 }
561
562
563
564 static int
565 __mdd_xattr_set(struct lu_context *ctxt, struct mdd_device *mdd,
566                 struct mdd_object *obj, void *buf,
567                 int buf_len, const char *name, struct thandle *handle)
568 {
569         struct dt_object *next = mdd_object_child(obj);
570         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len,
571                                           name, handle);
572 }
573
574 static int
575 mdd_xattr_set(struct lu_context *ctxt, struct md_object *obj, void *buf,
576               int buf_len, const char *name)
577 {
578         struct mdd_device *mdd = mdo2mdd(obj);
579         struct thandle *handle;
580         int  rc;
581         ENTRY;
582
583         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
584         handle = mdd_trans_start(ctxt, mdd);
585         if (!handle)
586                 RETURN(-ENOMEM);
587
588         rc = __mdd_xattr_set(ctxt, mdd, mdo2mddo(obj), buf, buf_len, name,
589                              handle);
590
591         mdd_trans_stop(ctxt, mdd, handle);
592
593         RETURN(rc);
594 }
595
596 static const struct lu_fid *mdd_object_getfid(struct mdd_object *obj)
597 {
598         return lu_object_fid(&obj->mod_obj.mo_lu);
599 }
600
601 static int
602 __mdd_index_insert(struct lu_context *ctxt, struct mdd_object *pobj,
603                    const struct lu_fid *lf, const char *name,
604                    struct thandle *handle)
605 {
606         int rc;
607         struct dt_object *next = mdd_object_child(pobj);
608
609         rc = next->do_index_ops->dio_insert(ctxt, next, (struct dt_rec *)lf,
610                                             (struct dt_key *)name, handle);
611         return rc;
612 }
613
614 static int
615 __mdd_index_delete(struct lu_context *ctxt, struct mdd_device *mdd,
616                    struct mdd_object *pobj,
617                    struct mdd_object *obj, const char *name,
618                    struct thandle *handle)
619 {
620         int rc;
621         struct dt_object *next = mdd_object_child(pobj);
622         ENTRY;
623
624         mdd_lock2(ctxt, pobj, obj);
625
626         rc = next->do_index_ops->dio_delete(ctxt, next,
627                                             (const struct dt_rec *)mdd_object_getfid(obj),
628                                             (struct dt_key *)name, handle);
629         mdd_unlock2(ctxt, pobj, obj);
630
631         RETURN(rc);
632 }
633
634 static int
635 mdd_index_delete(struct lu_context *ctxt, struct md_object *pobj,
636                  struct md_object *obj, const char *name)
637 {
638         struct mdd_object *mdd_pobj = mdo2mddo(pobj);
639         struct mdd_object *mdd_obj = mdo2mddo(obj);
640         struct mdd_device *mdd = mdo2mdd(obj);
641         struct thandle *handle;
642         int rc;
643         ENTRY;
644
645         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
646         handle = mdd_trans_start(ctxt, mdd);
647         if (IS_ERR(handle))
648                 RETURN(PTR_ERR(handle));
649
650         rc = __mdd_index_delete(ctxt, mdd, mdd_pobj, mdd_obj, name, handle);
651
652         mdd_trans_stop(ctxt, mdd, handle);
653
654         RETURN(rc);
655 }
656
657 static int
658 mdd_link(struct lu_context *ctxt, struct md_object *tgt_obj,
659          struct md_object *src_obj, const char *name)
660 {
661         struct mdd_object *mdd_tobj = mdo2mddo(tgt_obj);
662         struct mdd_object *mdd_sobj = mdo2mddo(src_obj);
663         struct mdd_device *mdd = mdo2mdd(src_obj);
664         struct thandle *handle;
665         int rc, nlink;
666         ENTRY;
667
668         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
669         handle = mdd_trans_start(ctxt, mdd);
670         if (IS_ERR(handle))
671                 RETURN(PTR_ERR(handle));
672
673         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
674
675         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
676                                 name, handle);
677         if (rc)
678                 GOTO(exit, rc);
679
680         rc = mdd_xattr_get(ctxt, src_obj, &nlink, sizeof(nlink), "NLINK");
681         ++nlink;
682
683         rc = __mdd_xattr_set(ctxt, mdd, mdd_sobj,
684                              &nlink, sizeof(nlink), "NLINK", handle);
685 exit:
686         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
687
688         mdd_trans_stop(ctxt, mdd, handle);
689         RETURN(rc);
690 }
691
692 static void mdd_rename_lock(struct mdd_device *mdd,
693                             struct mdd_object *src_pobj,
694                             struct mdd_object *tgt_pobj,
695                             struct mdd_object *sobj,
696                             struct mdd_object *tobj)
697 {
698         return;
699 }
700
701 static void mdd_rename_unlock(struct mdd_device *mdd, struct mdd_object *src_pobj,
702                               struct mdd_object *tgt_pobj, struct mdd_object *sobj,
703                               struct mdd_object *tobj)
704 {
705         return;
706 }
707
708 static int
709 mdd_rename(struct lu_context *ctxt, struct md_object *src_pobj,
710            struct md_object *tgt_pobj, struct md_object *sobj,
711            const char *sname, struct md_object *tobj, const char *tname)
712 {
713         struct mdd_device *mdd = mdo2mdd(src_pobj);
714         struct mdd_object *mdd_spobj = mdo2mddo(src_pobj);
715         struct mdd_object *mdd_tpobj = mdo2mddo(tgt_pobj);
716         struct mdd_object *mdd_sobj = mdo2mddo(sobj);
717         struct mdd_object *mdd_tobj = mdo2mddo(tobj);
718         int rc;
719         struct thandle *handle;
720
721         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
722         handle = mdd_trans_start(ctxt, mdd);
723         if (IS_ERR(handle))
724                 RETURN(PTR_ERR(handle));
725
726         mdd_rename_lock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
727
728         rc = __mdd_index_delete(ctxt, mdd, mdd_spobj, mdd_sobj, sname, handle);
729         if (rc)
730                 GOTO(cleanup, rc);
731
732         rc = __mdd_index_delete(ctxt, mdd, mdd_tpobj, mdd_tobj, tname, handle);
733         if (rc)
734                 GOTO(cleanup, rc);
735
736         rc = __mdd_index_insert(ctxt, mdd_spobj, lu_object_fid(&tobj->mo_lu),
737                                 tname, handle);
738         if (rc)
739                 GOTO(cleanup, rc);
740
741         /*
742          * XXX nikita: huh? What is this?
743          */
744         rc = __mdd_object_destroy(ctxt, mdd_sobj, handle);
745         if (rc)
746                 GOTO(cleanup, rc);
747 cleanup:
748         mdd_rename_unlock(mdd, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj);
749         mdd_trans_stop(ctxt, mdd, handle);
750         RETURN(rc);
751 }
752
753 static int mdd_lookup(struct lu_context *ctxt, struct md_object *pobj,
754                       const char *name, struct lu_fid* fid)
755 {
756         struct dt_object *next = mdd_object_child(mdo2mddo(pobj));
757       
758         return next->do_index_ops->dio_lookup(ctxt, next, fid, name);
759 }
760
761 static int mdd_mkdir(struct lu_context *ctxt, struct lu_attr* attr,
762                      struct md_object *pobj, const char *name,
763                      struct md_object *child)
764 {
765         struct mdd_device *mdd = mdo2mdd(pobj);
766         struct mdd_object *mdo = mdo2mddo(pobj);
767         struct thandle *handle;
768         int rc = 0;
769         ENTRY;
770
771         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
772         handle = mdd_trans_start(ctxt, mdd);
773         if (IS_ERR(handle))
774                 RETURN(PTR_ERR(handle));
775
776         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
777
778         rc = __mdd_object_create(ctxt, mdo2mddo(child), attr, handle);
779         if (rc)
780                 GOTO(cleanup, rc);
781
782         rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
783                                 name, handle);
784         if (rc)
785                 GOTO(cleanup, rc);
786 cleanup:
787         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
788         mdd_trans_stop(ctxt, mdd, handle);
789         RETURN(rc);
790 }
791
792 static int mdd_mkname(struct lu_context *ctxt, struct md_object *pobj,
793           const char *name, const struct lu_fid *fid, struct lu_attr *attr)
794 {
795         struct mdd_device *mdd = mdo2mdd(pobj);
796         struct mdd_object *mdo = mdo2mddo(pobj);
797         struct thandle *handle;
798         int rc = 0;
799         ENTRY;
800
801         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
802         handle = mdd_trans_start(ctxt, mdd);
803         if (IS_ERR(handle))
804                 RETURN(PTR_ERR(handle));
805
806         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
807
808         rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
809
810         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
811         mdd_trans_stop(ctxt, mdd, handle);
812         RETURN(rc);
813 }
814
815 static int mdd_root_get(struct lu_context *ctx,
816                         struct md_device *m, struct lu_fid *f)
817 {
818         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
819
820         ENTRY;
821         *f = mdd->mdd_root_fid;
822         RETURN(0);
823 }
824
825 static int mdd_config(struct lu_context *ctx, struct md_device *m,
826                       const char *name, void *buf, int size, int mode)
827 {
828         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
829         int rc;
830         ENTRY;
831
832         rc = mdd_child_ops(mdd)->dt_config(ctx, mdd->mdd_child,
833                                            name, buf, size, mode);
834         RETURN(rc);
835 }
836
837 static int mdd_statfs(struct lu_context *ctx,
838                       struct md_device *m, struct kstatfs *sfs) {
839         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
840         int rc;
841
842         ENTRY;
843
844         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
845
846         RETURN(rc);
847 }
848
849 struct md_device_operations mdd_ops = {
850         .mdo_root_get       = mdd_root_get,
851         .mdo_config         = mdd_config,
852         .mdo_statfs         = mdd_statfs,
853 };
854
855 static struct md_dir_operations mdd_dir_ops = {
856         .mdo_lookup        = mdd_lookup,
857         .mdo_mkdir         = mdd_mkdir,
858         .mdo_rename        = mdd_rename,
859         .mdo_link          = mdd_link,
860         .mdo_name_insert   = mdd_mkname
861 };
862
863 static struct md_object_operations mdd_obj_ops = {
864         .moo_attr_get      = mdd_attr_get,
865         .moo_attr_set      = mdd_attr_set,
866         .moo_xattr_get     = mdd_xattr_get,
867         .moo_xattr_set     = mdd_xattr_set,
868         .moo_object_create  = mdd_object_create
869 };
870
871 static struct obd_ops mdd_obd_device_ops = {
872         .o_owner = THIS_MODULE
873 };
874
875 struct lu_device *mdd_device_alloc(struct lu_context *ctx,
876                                    struct lu_device_type *t,
877                                    struct lustre_cfg *lcfg)
878 {
879         struct lu_device  *l;
880         struct mdd_device *m;
881
882         OBD_ALLOC_PTR(m);
883         if (m == NULL) {
884                 l = ERR_PTR(-ENOMEM);
885         } else {
886                 md_device_init(&m->mdd_md_dev, t);
887                 l = mdd2lu_dev(m);
888                 l->ld_ops = &mdd_lu_ops;
889                 m->mdd_md_dev.md_ops = &mdd_ops;
890         }
891
892         return l;
893 }
894
895 static void mdd_device_free(struct lu_context *ctx, struct lu_device *lu)
896 {
897         struct mdd_device *m = lu2mdd_dev(lu);
898
899         LASSERT(atomic_read(&lu->ld_ref) == 0);
900         md_device_fini(&m->mdd_md_dev);
901
902         OBD_FREE_PTR(m);
903 }
904
905 static int mdd_type_init(struct lu_device_type *t)
906 {
907         return lu_context_key_register(&mdd_thread_key);
908 }
909
910 static void mdd_type_fini(struct lu_device_type *t)
911 {
912         lu_context_key_degister(&mdd_thread_key);
913 }
914
915 static struct lu_device_type_operations mdd_device_type_ops = {
916         .ldto_init = mdd_type_init,
917         .ldto_fini = mdd_type_fini,
918
919         .ldto_device_alloc = mdd_device_alloc,
920         .ldto_device_free  = mdd_device_free,
921
922         .ldto_device_init    = mdd_device_init,
923         .ldto_device_fini    = mdd_device_fini
924 };
925
926 static struct lu_device_type mdd_device_type = {
927         .ldt_tags = LU_DEVICE_MD,
928         .ldt_name = LUSTRE_MDD0_NAME,
929         .ldt_ops  = &mdd_device_type_ops
930 };
931
932 static void *mdd_key_init(struct lu_context *ctx)
933 {
934         struct mdd_thread_info *info;
935
936         OBD_ALLOC_PTR(info);
937         if (info == NULL)
938                 info = ERR_PTR(-ENOMEM);
939         return info;
940 }
941
942 static void mdd_key_fini(struct lu_context *ctx, void *data)
943 {
944         struct mdd_thread_info *info = data;
945         OBD_FREE_PTR(info);
946 }
947
948 static struct lu_context_key mdd_thread_key = {
949         .lct_init = mdd_key_init,
950         .lct_fini = mdd_key_fini
951 };
952
953 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
954         { 0 }
955 };
956
957 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
958         { 0 }
959 };
960
961 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
962
963 static int __init mdd_mod_init(void)
964 {
965         struct lprocfs_static_vars lvars;
966
967         lprocfs_init_vars(mdd, &lvars);
968         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
969                                    LUSTRE_MDD0_NAME, &mdd_device_type);
970 }
971
972 static void __exit mdd_mod_exit(void)
973 {
974         class_unregister_type(LUSTRE_MDD0_NAME);
975 }
976
977 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
978 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
979 MODULE_LICENSE("GPL");
980
981 cfs_module(mdd, "0.0.2", mdd_mod_init, mdd_mod_exit);