Whamcloud - gitweb
fe9a4d7e70ad189170311bd11180b8c7c2f2f417
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51                            struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54                      struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56                        struct mdd_object *obj, enum dt_lock_mode mode);
57 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58                           struct thandle *handle);
59 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60                           struct thandle *handle);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62                       const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations    mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
66
67 static struct lu_context_key       mdd_thread_key;
68
69 static const char *mdd_root_dir_name = "root";
70 static const char dot[] = ".";
71 static const char dotdot[] = "..";
72
73
74 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
75 {
76         struct mdd_thread_info *info;
77
78         info = lu_context_key_get(ctx, &mdd_thread_key);
79         LASSERT(info != NULL);
80         return info;
81 }
82
83 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
84                                           const struct lu_object_header *hdr,
85                                           struct lu_device *d)
86 {
87         struct mdd_object *mdo;
88
89         OBD_ALLOC_PTR(mdo);
90         if (mdo != NULL) {
91                 struct lu_object *o;
92                 
93                 o = mdd2lu_obj(mdo);
94                 lu_object_init(o, NULL, d);
95                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
96                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
97                 o->lo_ops = &mdd_lu_obj_ops;
98                 return o;
99         } else {
100                 return NULL;
101         }
102 }
103
104 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
105 {
106         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
107         struct lu_object  *below;
108         struct lu_device  *under;
109         ENTRY;
110
111         under = &d->mdd_child->dd_lu_dev;
112         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
113
114         if (below == NULL)
115                 RETURN(-ENOMEM);
116
117         lu_object_add(o, below);
118         RETURN(0);
119 }
120
121 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         struct mdd_object *mdd = lu2mdd_obj(o);
124         
125         lu_object_fini(o);
126         OBD_FREE_PTR(mdd);
127 }
128
129 static int mdd_attr_get(const struct lu_context *ctxt,
130                         struct md_object *obj, struct md_attr *ma)
131 {
132         struct mdd_object *mdd_obj = md2mdd_obj(obj);
133         struct dt_object  *next;
134         int                rc;
135
136         ENTRY;
137
138         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
139
140         next = mdd_object_child(mdd_obj);
141         rc = next->do_ops->do_attr_get(ctxt, next, &ma->ma_attr);
142         if (rc == 0) {
143                 LASSERT((ma->ma_attr.la_mode & S_IFMT) ==
144                         (obj->mo_lu.lo_header->loh_attr & S_IFMT));
145                 ma->ma_valid |= MA_INODE;
146                 /* get LOV EA also */
147                 if ((S_ISREG(ma->ma_attr.la_mode)
148                      || S_ISDIR(ma->ma_attr.la_mode))
149                      && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) {
150                         rc = mdd_get_md(ctxt, obj, ma->ma_lmm,&ma->ma_lmm_size);
151                         if (rc > 0) {
152                                 ma->ma_valid |= MA_LOV;
153                                 rc = 0;
154                         }
155                 }
156         }
157         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
158                         rc, ma->ma_valid);
159         RETURN(rc);
160 }
161
162 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
163                          void *buf, int buf_len, const char *name)
164 {
165         struct mdd_object *mdd_obj = md2mdd_obj(obj);
166         struct dt_object  *next;
167         int rc;
168
169         ENTRY;
170
171         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
172
173         next = mdd_object_child(mdd_obj);
174         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
175
176         RETURN(rc);
177 }
178
179 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
180                         void *buf, int buf_len)
181 {
182         struct mdd_object *mdd_obj = md2mdd_obj(obj);
183         struct dt_object  *next;
184         loff_t             pos = 0;
185         int                rc;
186         ENTRY;
187
188         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
189
190         next = mdd_object_child(mdd_obj);
191         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
192         RETURN(rc);
193 }
194 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
195                           void *buf, int buf_len)
196 {
197         struct mdd_object *mdd_obj = md2mdd_obj(obj);
198         struct dt_object  *next;
199         int rc;
200
201         ENTRY;
202
203         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
204
205         next = mdd_object_child(mdd_obj);
206         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
207
208         RETURN(rc);
209 }
210
211 enum mdd_txn_op {
212         MDD_TXN_OBJECT_DESTROY_OP,
213         MDD_TXN_OBJECT_CREATE_OP,
214         MDD_TXN_ATTR_SET_OP,
215         MDD_TXN_XATTR_SET_OP,
216         MDD_TXN_INDEX_INSERT_OP,
217         MDD_TXN_INDEX_DELETE_OP,
218         MDD_TXN_LINK_OP,
219         MDD_TXN_UNLINK_OP,
220         MDD_TXN_RENAME_OP,
221         MDD_TXN_CREATE_DATA_OP,
222         MDD_TXN_MKDIR_OP
223 };
224
225 struct mdd_txn_op_descr {
226         enum mdd_txn_op mod_op;
227         unsigned int    mod_credits;
228 };
229
230 enum {
231         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
232         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
233         MDD_TXN_ATTR_SET_CREDITS       = 20,
234         MDD_TXN_XATTR_SET_CREDITS      = 20,
235         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
236         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
237         MDD_TXN_LINK_CREDITS           = 20,
238         MDD_TXN_UNLINK_CREDITS         = 20,
239         MDD_TXN_RENAME_CREDITS         = 20,
240         MDD_TXN_CREATE_DATA_CREDITS    = 20,
241         MDD_TXN_MKDIR_CREDITS          = 20
242 };
243
244 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
245 static const struct mdd_txn_op_descr opname = { \
246         .mod_op      = opname ## _OP,           \
247         .mod_credits = opname ## _CREDITS,      \
248 }
249
250 /*
251  * number of blocks to reserve for particular operations. Should be function
252  * of ... something. Stub for now.
253  */
254 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
255 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
256 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
257 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
258 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
259 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
260 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
261 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
262 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
263 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
264 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
265
266 static void mdd_txn_param_build(const struct lu_context *ctx,
267                                 const struct mdd_txn_op_descr *opd)
268 {
269         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
270 }
271
272 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
273                             lu_printer_t p, const struct lu_object *o)
274 {
275         return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
276 }
277
278 static int mdd_object_exists(const struct lu_context *ctx,
279                              const struct lu_object *o)
280 {
281         return lu_object_exists(ctx, lu_object_next(o));
282 }
283
284 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
285 {
286         int rc;
287         struct dt_object *root;
288         ENTRY;
289
290         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
291                              &mdd->mdd_root_fid);
292         if (!IS_ERR(root)) {
293                 LASSERT(root != NULL);
294                 lu_object_put(ctx, &root->do_lu);
295                 rc = 0;
296         } else
297                 rc = PTR_ERR(root);
298         RETURN(rc);
299 }
300
301 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
302 {
303         /*create PENDING and OBJECTS dir for open and llog*/
304         return 0;
305 }
306
307 static int mdd_fs_cleanup(struct mdd_device *mdd)
308 {
309         /*create PENDING and OBJECTS dir for open and llog*/
310         return 0;
311 }
312
313 static int mdd_device_init(const struct lu_context *ctx,
314                            struct lu_device *d, struct lu_device *next)
315 {
316         struct mdd_device *mdd = lu2mdd_dev(d);
317         int rc;
318         ENTRY;
319
320         mdd->mdd_child = lu2dt_dev(next);
321
322         rc = mdd_fs_setup(ctx, mdd);
323         if (rc)
324                 mdd_fs_cleanup(mdd);
325         RETURN(rc);
326 }
327
328 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
329                                          struct lu_device *d)
330 {
331         struct mdd_device *m = lu2mdd_dev(d);
332         struct lu_device *next = &m->mdd_child->dd_lu_dev;
333
334         dt_device_fini(&m->mdd_lov_dev);
335
336         return next;
337 }
338
339 static int mdd_process_config(const struct lu_context *ctxt,
340                               struct lu_device *d, struct lustre_cfg *cfg)
341 {
342         struct mdd_device *m    = lu2mdd_dev(d);
343         struct dt_device  *dt   = m->mdd_child;
344         struct lu_device  *next = &dt->dd_lu_dev;
345         char              *dev = lustre_cfg_string(cfg, 0);
346         int rc;
347
348         switch(cfg->lcfg_command) {
349         case LCFG_SETUP:
350                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
351                 if (rc)
352                         GOTO(out, rc);
353                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
354
355                 rc = mdd_mount(ctxt, m);
356                 if (rc)
357                         GOTO(out, rc);
358                 rc = mdd_init_obd(ctxt, m, dev);
359                 if (rc) {
360                         CERROR("lov init error %d \n", rc);
361                         GOTO(out, rc);
362                 }
363                 break;
364         default:
365                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
366                 break;
367         }
368 out:
369         RETURN(rc);
370 }
371
372 struct lu_device_operations mdd_lu_ops = {
373         .ldo_object_alloc   = mdd_object_alloc,
374         .ldo_process_config = mdd_process_config,
375 };
376
377 static struct lu_object_operations mdd_lu_obj_ops = {
378         .loo_object_init    = mdd_object_init,
379         .loo_object_free    = mdd_object_free,
380         .loo_object_print   = mdd_object_print,
381         .loo_object_exists  = mdd_object_exists,
382 };
383
384 static void mdd_lock(const struct lu_context *ctxt,
385                      struct mdd_object *obj, enum dt_lock_mode mode)
386 {
387         struct dt_object  *next = mdd_object_child(obj);
388
389         next->do_ops->do_lock(ctxt, next, mode);
390 }
391
392 static void mdd_unlock(const struct lu_context *ctxt,
393                        struct mdd_object *obj, enum dt_lock_mode mode)
394 {
395         struct dt_object  *next = mdd_object_child(obj);
396
397         next->do_ops->do_unlock(ctxt, next, mode);
398 }
399
400 static void mdd_lock2(const struct lu_context *ctxt,
401                       struct mdd_object *o0, struct mdd_object *o1)
402 {
403         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
404         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
405 }
406
407 static void mdd_unlock2(const struct lu_context *ctxt,
408                         struct mdd_object *o0, struct mdd_object *o1)
409 {
410         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
411         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
412 }
413
414 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
415                                        struct mdd_device *mdd)
416 {
417         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
418
419         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
420 }
421
422 static void mdd_trans_stop(const struct lu_context *ctxt,
423                            struct mdd_device *mdd, struct thandle *handle)
424 {
425         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
426 }
427
428 static int __mdd_object_create(const struct lu_context *ctxt,
429                                struct mdd_object *obj, struct md_attr *ma,
430                                struct thandle *handle)
431 {
432         struct dt_object *next;
433         struct lu_attr *attr = &ma->ma_attr;
434         int rc;
435         ENTRY;
436
437         if (!lu_object_exists(ctxt, mdd2lu_obj(obj))) {
438                 next = mdd_object_child(obj);
439                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
440         } else
441                 rc = -EEXIST;
442
443         LASSERT(ergo(rc == 0, lu_object_exists(ctxt, mdd2lu_obj(obj))));
444
445         RETURN(rc);
446 }
447
448 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
449                           const struct lu_attr *attr, struct thandle *handle)
450 {
451         struct dt_object *next;
452
453         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
454         next = mdd_object_child(md2mdd_obj(obj));
455         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
456 }
457
458 static int mdd_attr_set(const struct lu_context *ctxt,
459                         struct md_object *obj, const struct lu_attr *attr)
460 {
461         struct mdd_object *mdo = md2mdd_obj(obj);
462         struct mdd_device *mdd = mdo2mdd(obj);
463         struct thandle *handle;
464         int  rc;
465         ENTRY;
466
467         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
468         handle = mdd_trans_start(ctxt, mdd);
469         if (IS_ERR(handle))
470                 RETURN(PTR_ERR(handle));
471
472         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
473         rc = __mdd_attr_set(ctxt, obj, attr, handle);
474         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
475
476         mdd_trans_stop(ctxt, mdd, handle);
477
478         RETURN(rc);
479 }
480
481 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
482                            struct mdd_object *obj, const void *buf,
483                            int buf_len, const char *name, int fl,
484                            struct thandle *handle)
485 {
486         struct dt_object *next;
487
488         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
489         next = mdd_object_child(obj);
490         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name, fl,
491                                           handle);
492 }
493
494 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct md_object *obj,
495                       const void *buf, int buf_len, const char *name, int fl,
496                       struct thandle *handle)
497 {
498         struct mdd_object *mdo = md2mdd_obj(obj);
499         struct mdd_device *mdd = mdo2mdd(obj);
500         int  rc;
501         ENTRY;
502
503
504         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
505         rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
506                              fl, handle);
507         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
508
509         RETURN(rc);
510 }
511
512 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
513                          const void *buf, int buf_len, const char *name, int fl)
514 {
515         struct mdd_device *mdd = mdo2mdd(obj);
516         struct thandle *handle;
517         int  rc;
518         ENTRY;
519
520         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
521         handle = mdd_trans_start(ctxt, mdd);
522         if (IS_ERR(handle))
523                 RETURN(PTR_ERR(handle));
524
525         rc = mdd_xattr_set_txn(ctxt, obj, buf, buf_len, name, fl, handle);
526
527         mdd_trans_stop(ctxt, mdd, handle);
528
529         RETURN(rc);
530 }
531
532 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
533                            struct mdd_object *obj,
534                            const char *name, struct thandle *handle)
535 {
536         struct dt_object *next;
537
538         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
539         next = mdd_object_child(obj);
540         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
541 }
542
543 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
544                   const char *name)
545 {
546         struct mdd_object *mdo = md2mdd_obj(obj);
547         struct mdd_device *mdd = mdo2mdd(obj);
548         struct thandle *handle;
549         int  rc;
550         ENTRY;
551
552         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
553         handle = mdd_trans_start(ctxt, mdd);
554         if (IS_ERR(handle))
555                 RETURN(PTR_ERR(handle));
556
557         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
558         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
559         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
560
561         mdd_trans_stop(ctxt, mdd, handle);
562
563         RETURN(rc);
564 }
565
566 static int __mdd_index_insert(const struct lu_context *ctxt,
567                               struct mdd_object *pobj, const struct lu_fid *lf,
568                               const char *name, struct thandle *handle)
569 {
570         int rc;
571         struct dt_object *next = mdd_object_child(pobj);
572         ENTRY;
573
574         if (dt_try_as_dir(ctxt, next))
575                 rc = next->do_index_ops->dio_insert(ctxt, next,
576                                          (struct dt_rec *)lf,
577                                          (struct dt_key *)name, handle);
578         else
579                 rc = -ENOTDIR;
580         RETURN(rc);
581 }
582
583 static int __mdd_index_delete(const struct lu_context *ctxt,
584                               struct mdd_object *pobj, const char *name,
585                               struct thandle *handle)
586 {
587         int rc;
588         struct dt_object *next = mdd_object_child(pobj);
589         ENTRY;
590
591         if (dt_try_as_dir(ctxt, next))
592                 rc = next->do_index_ops->dio_delete(ctxt, next,
593                                         (struct dt_key *)name, handle);
594         else
595                 rc = -ENOTDIR;
596         RETURN(rc);
597 }
598
599 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
600                     struct md_object *src_obj, const char *name)
601 {
602         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
603         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
604         struct mdd_device *mdd = mdo2mdd(src_obj);
605         struct thandle *handle;
606         int rc;
607         ENTRY;
608
609         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
610         handle = mdd_trans_start(ctxt, mdd);
611         if (IS_ERR(handle))
612                 RETURN(PTR_ERR(handle));
613
614         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
615
616         /*
617          * XXX Check that link can be added to the child.
618          */
619
620         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
621                                 name, handle);
622         if (rc == 0)
623                 __mdd_ref_add(ctxt, mdd_sobj, handle);
624
625         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
626         mdd_trans_stop(ctxt, mdd, handle);
627         RETURN(rc);
628 }
629
630 /*
631  * Check that @dir contains no entries except (possibly) dot and dotdot.
632  *
633  * Returns:
634  *
635  *             0        empty
636  *    -ENOTEMPTY        not empty
637  *           -ve        other error
638  *
639  */
640 static int mdd_dir_is_empty(const struct lu_context *ctx,
641                             struct mdd_object *dir)
642 {
643         struct dt_it     *it;
644         struct dt_object *obj;
645         struct dt_it_ops *iops;
646         int result;
647
648         obj = mdd_object_child(dir);
649         iops = &obj->do_index_ops->dio_it;
650         it = iops->init(ctx, obj);
651         if (it != NULL) {
652                 result = iops->get(ctx, it, (const void *)"");
653                 if (result > 0) {
654                         int i;
655                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
656                                 result = iops->next(ctx, it);
657                         iops->put(ctx, it);
658                         if (result == 0)
659                                 result = -ENOTEMPTY;
660                         else if (result == +1)
661                                 result = 0;
662                 } else if (result == 0)
663                         /*
664                          * Huh? Index contains no zero key?
665                          */
666                         result = -EIO;
667                 iops->fini(ctx, it);
668         } else
669                 result = -ENOMEM;
670         return result;
671 }
672
673 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
674                       struct md_object *cobj, const char *name,
675                       struct md_attr *ma)
676 {
677         struct mdd_device *mdd = mdo2mdd(pobj);
678         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
679         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
680         struct thandle    *handle;
681         int rc;
682         ENTRY;
683
684         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
685                 if (!S_ISDIR(ma->ma_attr.la_mode))
686                         RETURN(-EISDIR);
687         } else if (S_ISDIR(ma->ma_attr.la_mode))
688                 RETURN(-ENOTDIR);
689
690         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
691         handle = mdd_trans_start(ctxt, mdd);
692         if (IS_ERR(handle))
693                 RETURN(PTR_ERR(handle));
694
695         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
696
697         /* rmdir checks */
698         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
699                 rc = mdd_dir_is_empty(ctxt, mdd_cobj);
700                 if (rc != 0)
701                         GOTO(cleanup, rc);
702         }
703
704         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
705         if (rc)
706                 GOTO(cleanup, rc);
707
708         __mdd_ref_del(ctxt, mdd_cobj, handle);
709         if (S_ISDIR(ma->ma_attr.la_mode)) {
710                 /* unlink dot */
711                 __mdd_ref_del(ctxt, mdd_cobj, handle);
712                 /* unlink dotdot */
713                 __mdd_ref_del(ctxt, mdd_pobj, handle);
714         }
715         mdd_attr_get(ctxt, cobj, ma);
716
717 #if 0
718         /*This should be moved to handle last unlink. wait open
719          * orphan prototype finished*/
720         if (S_ISREG(ma->ma_attr.la_mode) && (ma->ma_valid & MA_LOV) &&
721             ma->ma_attr.la_nlink == 0 && cobj->mo_lu.lo_header->loh_ref == 1) {
722                 rc = mdd_unlink_log(ctxt, mdd, mdd_cobj, ma);
723         }
724 #endif
725 cleanup:
726         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
727         mdd_trans_stop(ctxt, mdd, handle);
728         RETURN(rc);
729 }
730
731 static int mdd_parent_fid(const struct lu_context *ctxt,
732                           struct mdd_object *obj,
733                           struct lu_fid *fid)
734 {
735         int rc;
736
737         rc = mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
738
739         return rc;
740 }
741
742 static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj)
743 {
744         return lu_object_fid(&obj->mod_obj.mo_lu);
745 }
746
747 static int mdd_is_parent(const struct lu_context *ctxt,
748                          struct mdd_device *mdd,
749                          struct mdd_object *p1,
750                          struct mdd_object *p2)
751 {
752         struct lu_fid * pfid;
753         int rc;
754
755         pfid = &mdd_ctx_info(ctxt)->mti_fid;
756         do {
757                 rc = mdd_parent_fid(ctxt, p1, pfid);
758                 if (rc)
759                         RETURN(rc);
760                 if (lu_fid_eq(pfid, mdo2fid(p2))) {
761                         RETURN(1);
762                 }
763         } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
764
765         RETURN(rc);
766 }
767
768 static int mdd_rename_lock(const struct lu_context *ctxt,
769                            struct mdd_device *mdd,
770                            struct mdd_object *src_pobj,
771                            struct mdd_object *tgt_pobj)
772 {
773         ENTRY;
774
775         if (src_pobj == tgt_pobj) {
776                 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
777                 RETURN(0);
778         }
779         /*compared the parent child relationship of src_p&tgt_p*/
780         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
781                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
782                 RETURN(0);
783         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
784                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
785                 RETURN(0);
786         }
787         if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
788                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
789                 RETURN(0);
790         }
791         if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
792                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
793                 RETURN(0);
794         }
795
796         mdd_lock2(ctxt, src_pobj, tgt_pobj);
797         RETURN(0);
798 }
799
800 static void mdd_rename_unlock(const struct lu_context *ctxt,
801                               struct mdd_object *src_pobj,
802                               struct mdd_object *tgt_pobj)
803 {
804         mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
805         if (src_pobj != tgt_pobj)
806                 mdd_unlock(ctxt, tgt_pobj, DT_WRITE_LOCK);
807 }
808
809 static umode_t mdd_object_type(const struct mdd_object *obj)
810 {
811         return obj->mod_obj.mo_lu.lo_header->loh_attr;
812 }
813
814 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
815                       struct md_object *tgt_pobj, const struct lu_fid *lf,
816                       const char *sname, struct md_object *tobj,
817                       const char *tname)
818 {
819         struct mdd_device *mdd = mdo2mdd(src_pobj);
820         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
821         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
822         struct mdd_object *mdd_tobj = NULL;
823         struct thandle *handle;
824         int rc, locked = 0;
825         ENTRY;
826
827         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
828         handle = mdd_trans_start(ctxt, mdd);
829         if (IS_ERR(handle))
830                 RETURN(PTR_ERR(handle));
831
832         /*FIXME: Should consider tobj and sobj too in rename_lock*/
833         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
834         if (rc)
835                 GOTO(cleanup, rc);
836         locked = 1;
837         if (tobj)
838                 mdd_tobj = md2mdd_obj(tobj);
839
840         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
841         if (rc)
842                 GOTO(cleanup, rc);
843         /*FIXME: no sobj now, we should check sobj type, if it is dir,
844          * the nlink of its parent should be dec
845          */
846         if (tobj) {
847                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
848                 if (rc)
849                         GOTO(cleanup, rc);
850         }
851
852         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
853         if (rc)
854                 GOTO(cleanup, rc);
855
856
857         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
858                 __mdd_ref_del(ctxt, mdd_tobj, handle);
859                 if (S_ISDIR(mdd_object_type(mdd_tobj)))
860                         __mdd_ref_del(ctxt, mdd_tpobj, handle);
861                 /* XXX: mdd_attr_get(ctxt, tobj, ma); needed here */
862         }
863
864 cleanup:
865        /*FIXME: should we do error handling here?*/
866         if (locked)
867                 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
868         mdd_trans_stop(ctxt, mdd, handle);
869         RETURN(rc);
870 }
871
872 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
873                       const char *name, struct lu_fid* fid)
874 {
875         struct mdd_object   *mdo = md2mdd_obj(pobj);
876         struct dt_object    *dir = mdd_object_child(mdo);
877         struct dt_rec       *rec    = (struct dt_rec *)fid;
878         const struct dt_key *key = (const struct dt_key *)name;
879         int rc;
880         ENTRY;
881
882         mdd_lock(ctxt, mdo, DT_READ_LOCK);
883         if (S_ISDIR(mdd_object_type(mdo)) && dt_try_as_dir(ctxt, dir))
884                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
885         else
886                 rc = -ENOTDIR;
887         mdd_unlock(ctxt, mdo, DT_READ_LOCK);
888         RETURN(rc);
889 }
890
891 static int __mdd_object_initialize(const struct lu_context *ctxt,
892                                    struct mdd_object *parent,
893                                    struct mdd_object *child,
894                                    struct md_attr *ma, struct thandle *handle)
895 {
896         int rc;
897
898         rc = 0;
899         if (S_ISDIR(ma->ma_attr.la_mode)) {
900                 __mdd_ref_add(ctxt, child, handle);
901                 rc = __mdd_index_insert(ctxt, child,
902                                         mdo2fid(child), dot, handle);
903                 if (rc == 0) {
904                         rc = __mdd_index_insert(ctxt, child, mdo2fid(parent),
905                                                 dotdot, handle);
906                         if (rc == 0)
907                                 __mdd_ref_add(ctxt, parent, handle);
908                         else {
909                                 int rc2;
910
911                                 rc2 = __mdd_index_delete(ctxt,
912                                                          child, dot, handle);
913                                 if (rc2 != 0)
914                                         CERROR("Failure to cleanup after dotdot"
915                                                " creation: %d (%d)\n", rc2, rc);
916                                 else
917                                         __mdd_ref_del(ctxt, child, handle);
918                         }
919                 }
920         }
921         return rc;
922 }
923
924 static int mdd_create_data(const struct lu_context *ctxt,
925                            struct md_object *pobj, struct md_object *cobj,
926                            const void *eadata, int eadatasize,
927                            struct md_attr *ma)
928 {
929         struct mdd_device *mdd = mdo2mdd(pobj);
930         struct mdd_object *mdo = md2mdd_obj(pobj);
931         struct mdd_object *son = md2mdd_obj(cobj);
932         struct lu_attr    *attr = &ma->ma_attr;
933         struct lov_mds_md *lmm = NULL;
934         struct thandle    *handle;
935         int lmm_size = 0;
936         int rc;
937         ENTRY;
938
939         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
940         handle = mdd_trans_start(ctxt, mdd);
941         if (IS_ERR(handle))
942                 RETURN(PTR_ERR(handle));
943
944         /*
945          * XXX: should take transaction handle.
946          */
947         rc = mdd_lov_create(ctxt, mdd, mdo, son, &lmm, &lmm_size, eadata,
948                             eadatasize, attr);
949         if (rc == 0) {
950                 rc = mdd_lov_set_md(ctxt, pobj, cobj, lmm,
951                                     lmm_size, attr->la_mode, handle);
952                 if (rc == 0)
953                         rc = mdd_attr_get(ctxt, cobj, ma);
954         }
955
956         mdd_trans_stop(ctxt, mdd, handle);
957         RETURN(rc);
958 }
959
960 static int mdd_create_sanity_check(const struct lu_context *ctxt,
961                                    struct mdd_device *mdd,
962                                    struct md_object *pobj,
963                                    const char *name, struct md_attr *ma)
964 {
965         struct lu_fid *fid;
966         int rc;
967
968         fid = &mdd_ctx_info(ctxt)->mti_fid;
969         rc = mdd_lookup(ctxt, pobj, name, fid);
970         if (rc != -ENOENT) {
971                 rc = rc ? rc : -EEXIST;
972                 RETURN(rc);
973         }
974
975         switch (ma->ma_attr.la_mode & S_IFMT) {
976         case S_IFREG:
977         case S_IFDIR:
978         case S_IFLNK:
979         case S_IFCHR:
980         case S_IFBLK:
981         case S_IFIFO:
982         case S_IFSOCK:
983                 rc = 0;
984                 break;
985         default:
986                 rc = -EINVAL;
987                 break;
988         }
989         RETURN(rc);
990 }
991
992 /*
993  * Create object and insert it into namespace.
994  */
995 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
996                       const char *name, struct md_object *child,
997                       const char *target_name, const void *eadata,
998                       int eadatasize, struct md_attr* ma)
999 {
1000         struct mdd_device *mdd = mdo2mdd(pobj);
1001         struct mdd_object *mdo = md2mdd_obj(pobj);
1002         struct mdd_object *son = md2mdd_obj(child);
1003         struct lu_attr *attr = &ma->ma_attr;
1004         struct lov_mds_md *lmm = NULL;
1005         struct thandle *handle;
1006         int rc, created = 0, inserted = 0, lmm_size = 0;
1007         ENTRY;
1008
1009         /* sanity checks before big job */
1010         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1011         if (rc)
1012                 RETURN(rc);
1013         /* no RPC inside the transaction, so OST objects should be created at
1014          * first */
1015         if (S_ISREG(attr->la_mode)) {
1016                 rc = mdd_lov_create(ctxt, mdd, mdo, son, &lmm, &lmm_size,
1017                                     eadata, eadatasize, attr);
1018                 if (rc)
1019                         RETURN(rc);
1020         }
1021         
1022         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1023         handle = mdd_trans_start(ctxt, mdd);
1024         if (IS_ERR(handle))
1025                 RETURN(PTR_ERR(handle));
1026
1027         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1028
1029         /*
1030          * XXX check that link can be added to the parent in mkdir case.
1031          */
1032
1033         /*
1034          * Two operations have to be performed:
1035          *
1036          *  - allocation of new object (->do_create()), and
1037          *
1038          *  - insertion into parent index (->dio_insert()).
1039          *
1040          * Due to locking, operation order is not important, when both are
1041          * successful, *but* error handling cases are quite different:
1042          *
1043          *  - if insertion is done first, and following object creation fails,
1044          *  insertion has to be rolled back, but this operation might fail
1045          *  also leaving us with dangling index entry.
1046          *
1047          *  - if creation is done first, is has to be undone if insertion
1048          *  fails, leaving us with leaked space, which is neither good, nor
1049          *  fatal.
1050          *
1051          * It seems that creation-first is simplest solution, but it is
1052          * sub-optimal in the frequent
1053          *
1054          *         $ mkdir foo
1055          *         $ mkdir foo
1056          *
1057          * case, because second mkdir is bound to create object, only to
1058          * destroy it immediately.
1059          *
1060          * Note that local file systems do
1061          *
1062          *     0. lookup -> -EEXIST
1063          *
1064          *     1. create
1065          *
1066          *     2. insert
1067          *
1068          * Maybe we should do the same. For now: creation-first.
1069          */
1070
1071         rc = __mdd_object_create(ctxt, son, ma, handle);
1072         if (rc)
1073                 GOTO(cleanup, rc);
1074
1075         created = 1;
1076
1077         rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
1078         if (rc)
1079                 /*
1080                  * Object has no links, so it will be destroyed when last
1081                  * reference is released. (XXX not now.)
1082                  */
1083                 GOTO(cleanup, rc);
1084
1085         rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
1086                                 name, handle);
1087
1088         if (rc)
1089                 GOTO(cleanup, rc);
1090
1091         inserted = 1;
1092         rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size, attr->la_mode, 
1093                             handle);
1094         if (rc) {
1095                 CERROR("error on stripe info copy %d \n", rc);
1096                 GOTO(cleanup, rc);
1097         }
1098         
1099         if (S_ISLNK(attr->la_mode)) {
1100                 struct dt_object *dt = mdd_object_child(son);
1101                 loff_t pos = 0;
1102                 int sym_len = strlen(target_name);
1103                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1104                                                 sym_len, &pos, handle);
1105                 if (rc == sym_len)
1106                         rc = 0;
1107                 else
1108                         rc = -EFAULT;
1109         }
1110         mdd_attr_get(ctxt, child, ma);
1111 cleanup:
1112         if (rc && created) {
1113                 int rc2 = 0;
1114
1115                 if (inserted) {
1116                         rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
1117                         if (rc2)
1118                                 CERROR("error can not cleanup destroy %d\n",
1119                                        rc2);
1120                 }
1121                 if (rc2 == 0)
1122                         __mdd_ref_del(ctxt, son, handle);
1123         }
1124         if (lmm)
1125                 OBD_FREE(lmm, lmm_size);
1126         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1127         mdd_trans_stop(ctxt, mdd, handle);
1128         RETURN(rc);
1129 }
1130 /* partial operation */
1131 static int mdd_object_create(const struct lu_context *ctxt,
1132                              struct md_object *obj, struct md_attr *ma)
1133 {
1134
1135         struct mdd_device *mdd = mdo2mdd(obj);
1136         struct thandle *handle;
1137         int rc;
1138         ENTRY;
1139
1140         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1141         handle = mdd_trans_start(ctxt, mdd);
1142         if (IS_ERR(handle))
1143                 RETURN(PTR_ERR(handle));
1144
1145         rc = __mdd_object_create(ctxt, md2mdd_obj(obj), ma, handle);
1146 /* XXX: parent fid is needed here
1147         rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
1148 */
1149         mdd_attr_get(ctxt, md2mdd_obj(obj), ma);
1150
1151         mdd_trans_stop(ctxt, mdd, handle);
1152
1153         RETURN(rc);
1154 }
1155 /* partial operation */
1156 static int mdd_name_insert(const struct lu_context *ctxt,
1157                            struct md_object *pobj,
1158                            const char *name, const struct lu_fid *fid)
1159 {
1160         struct mdd_device *mdd = mdo2mdd(pobj);
1161         struct mdd_object *mdo = md2mdd_obj(pobj);
1162         struct thandle *handle;
1163         int rc;
1164         ENTRY;
1165
1166         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1167         handle = mdd_trans_start(ctxt, mdd);
1168         if (IS_ERR(handle))
1169                 RETURN(PTR_ERR(handle));
1170
1171         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1172
1173         rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
1174
1175         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1176         mdd_trans_stop(ctxt, mdd, handle);
1177         RETURN(rc);
1178 }
1179
1180 static int mdd_name_remove(const struct lu_context *ctxt,
1181                            struct md_object *pobj,
1182                            const char *name)
1183 {
1184         struct mdd_device *mdd = mdo2mdd(pobj);
1185         struct mdd_object *mdo = md2mdd_obj(pobj);
1186         struct thandle *handle;
1187         int rc;
1188         ENTRY;
1189
1190         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1191         handle = mdd_trans_start(ctxt, mdd);
1192         if (IS_ERR(handle))
1193                 RETURN(PTR_ERR(handle));
1194
1195         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
1196
1197         rc = __mdd_index_delete(ctxt, mdo, name, handle);
1198
1199         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
1200
1201         mdd_trans_stop(ctxt, mdd, handle);
1202         RETURN(rc);
1203 }
1204
1205 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1206                           struct md_object *tobj, const struct lu_fid *lf,
1207                           const char *name)
1208 {
1209         struct mdd_device *mdd = mdo2mdd(pobj);
1210         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1211         struct mdd_object *mdd_tobj = NULL;
1212         struct thandle *handle;
1213         int rc;
1214         ENTRY;
1215
1216         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1217         handle = mdd_trans_start(ctxt, mdd);
1218         if (IS_ERR(handle))
1219                 RETURN(PTR_ERR(handle));
1220
1221         if (tobj)
1222                 mdd_tobj = md2mdd_obj(tobj);
1223
1224         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1225
1226         if (tobj) {
1227                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1228                 if (rc)
1229                         GOTO(cleanup, rc);
1230         }
1231
1232         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
1233         if (rc)
1234                 GOTO(cleanup, rc);
1235
1236         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu))
1237                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1238 cleanup:
1239        /*FIXME: should we do error handling here?*/
1240         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1241         mdd_trans_stop(ctxt, mdd, handle);
1242         RETURN(rc);
1243 }
1244
1245 static int mdd_root_get(const struct lu_context *ctx,
1246                         struct md_device *m, struct lu_fid *f)
1247 {
1248         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1249
1250         ENTRY;
1251         *f = mdd->mdd_root_fid;
1252         RETURN(0);
1253 }
1254
1255 static int mdd_statfs(const struct lu_context *ctx,
1256                       struct md_device *m, struct kstatfs *sfs)
1257 {
1258         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1259         int rc;
1260
1261         ENTRY;
1262
1263         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int mdd_get_maxsize(const struct lu_context *ctx,
1269                            struct md_device *m, int *md_size,
1270                            int *cookie_size)
1271 {
1272         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1273         int rc;
1274
1275         ENTRY;
1276
1277         rc = mdd_lov_mdsize(ctx, mdd, md_size);
1278         if (rc)
1279                 RETURN(rc);
1280         rc = mdd_lov_cookiesize(ctx, mdd, cookie_size);
1281
1282         RETURN(rc);
1283 }
1284
1285 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1286                          struct thandle *handle)
1287 {
1288         struct dt_object *next;
1289
1290         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1291         next = mdd_object_child(obj);
1292         next->do_ops->do_ref_add(ctxt, next, handle);
1293 }
1294
1295 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1296 {
1297         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1298         struct mdd_device *mdd = mdo2mdd(obj);
1299         struct thandle *handle;
1300         ENTRY;
1301
1302         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1303         handle = mdd_trans_start(ctxt, mdd);
1304         if (IS_ERR(handle))
1305                 RETURN(-ENOMEM);
1306
1307         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1308         __mdd_ref_add(ctxt, mdd_obj, handle);
1309         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1310
1311         mdd_trans_stop(ctxt, mdd, handle);
1312
1313         RETURN(0);
1314 }
1315
1316 static void
1317 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1318               struct thandle *handle)
1319 {
1320         struct dt_object *next = mdd_object_child(obj);
1321
1322         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1323
1324         next->do_ops->do_ref_del(ctxt, next, handle);
1325 }
1326
1327 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1328                        struct md_attr *ma)
1329 {
1330         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1331         struct mdd_device *mdd = mdo2mdd(obj);
1332         struct thandle *handle;
1333         ENTRY;
1334
1335         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1336         handle = mdd_trans_start(ctxt, mdd);
1337         if (IS_ERR(handle))
1338                 RETURN(-ENOMEM);
1339
1340         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1341         __mdd_ref_del(ctxt, mdd_obj, handle);
1342         mdd_attr_get(ctxt, obj, ma);
1343         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1344
1345         mdd_trans_stop(ctxt, mdd, handle);
1346
1347         RETURN(0);
1348 }
1349
1350 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1351 {
1352         return 0;
1353 }
1354
1355 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
1356 {
1357         return 0;
1358 }
1359
1360 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
1361                         const struct lu_rdpg *rdpg)
1362 {
1363         struct dt_object *next;
1364         struct mdd_object *mdd = md2mdd_obj(obj);
1365         int rc;
1366
1367         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(md2mdd_obj(obj))));
1368         next = mdd_object_child(md2mdd_obj(obj));
1369
1370         mdd_lock(ctxt, mdd, DT_READ_LOCK);
1371         if (S_ISDIR(mdd_object_type(mdd)) && dt_try_as_dir(ctxt, next))
1372                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
1373         else
1374                 rc = -ENOTDIR;
1375         mdd_unlock(ctxt, mdd, DT_READ_LOCK);
1376         return rc;
1377 }
1378
1379 struct md_device_operations mdd_ops = {
1380         .mdo_root_get       = mdd_root_get,
1381         .mdo_statfs         = mdd_statfs,
1382         .mdo_get_maxsize    = mdd_get_maxsize,
1383 };
1384
1385 static struct md_dir_operations mdd_dir_ops = {
1386         .mdo_lookup        = mdd_lookup,
1387         .mdo_create        = mdd_create,
1388         .mdo_rename        = mdd_rename,
1389         .mdo_link          = mdd_link,
1390         .mdo_unlink        = mdd_unlink,
1391         .mdo_name_insert   = mdd_name_insert,
1392         .mdo_name_remove   = mdd_name_remove,
1393         .mdo_rename_tgt    = mdd_rename_tgt,
1394         .mdo_create_data   = mdd_create_data
1395 };
1396
1397
1398 static struct md_object_operations mdd_obj_ops = {
1399         .moo_attr_get      = mdd_attr_get,
1400         .moo_attr_set      = mdd_attr_set,
1401         .moo_xattr_get     = mdd_xattr_get,
1402         .moo_xattr_set     = mdd_xattr_set,
1403         .moo_xattr_list    = mdd_xattr_list,
1404         .moo_xattr_del     = mdd_xattr_del,
1405         .moo_object_create = mdd_object_create,
1406         .moo_ref_add       = mdd_ref_add,
1407         .moo_ref_del       = mdd_ref_del,
1408         .moo_open          = mdd_open,
1409         .moo_close         = mdd_close,
1410         .moo_readpage      = mdd_readpage,
1411         .moo_readlink      = mdd_readlink
1412 };
1413
1414 static struct obd_ops mdd_obd_device_ops = {
1415         .o_owner = THIS_MODULE
1416 };
1417
1418 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1419                                    struct lu_device_type *t,
1420                                    struct lustre_cfg *lcfg)
1421 {
1422         struct lu_device  *l;
1423         struct mdd_device *m;
1424
1425         OBD_ALLOC_PTR(m);
1426         if (m == NULL) {
1427                 l = ERR_PTR(-ENOMEM);
1428         } else {
1429                 md_device_init(&m->mdd_md_dev, t);
1430                 l = mdd2lu_dev(m);
1431                 l->ld_ops = &mdd_lu_ops;
1432                 m->mdd_md_dev.md_ops = &mdd_ops;
1433         }
1434
1435         return l;
1436 }
1437
1438 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1439 {
1440         struct mdd_device *m = lu2mdd_dev(lu);
1441
1442         LASSERT(atomic_read(&lu->ld_ref) == 0);
1443         md_device_fini(&m->mdd_md_dev);
1444         OBD_FREE_PTR(m);
1445 }
1446
1447 static int mdd_type_init(struct lu_device_type *t)
1448 {
1449         return lu_context_key_register(&mdd_thread_key);
1450 }
1451
1452 static void mdd_type_fini(struct lu_device_type *t)
1453 {
1454         lu_context_key_degister(&mdd_thread_key);
1455 }
1456
1457 static struct lu_device_type_operations mdd_device_type_ops = {
1458         .ldto_init = mdd_type_init,
1459         .ldto_fini = mdd_type_fini,
1460
1461         .ldto_device_alloc = mdd_device_alloc,
1462         .ldto_device_free  = mdd_device_free,
1463
1464         .ldto_device_init    = mdd_device_init,
1465         .ldto_device_fini    = mdd_device_fini
1466 };
1467
1468 static struct lu_device_type mdd_device_type = {
1469         .ldt_tags     = LU_DEVICE_MD,
1470         .ldt_name     = LUSTRE_MDD0_NAME,
1471         .ldt_ops      = &mdd_device_type_ops,
1472         .ldt_ctx_tags = LCT_MD_THREAD
1473 };
1474
1475 static void *mdd_key_init(const struct lu_context *ctx,
1476                           struct lu_context_key *key)
1477 {
1478         struct mdd_thread_info *info;
1479
1480         OBD_ALLOC_PTR(info);
1481         if (info == NULL)
1482                 info = ERR_PTR(-ENOMEM);
1483         return info;
1484 }
1485
1486 static void mdd_key_fini(const struct lu_context *ctx,
1487                          struct lu_context_key *key, void *data)
1488 {
1489         struct mdd_thread_info *info = data;
1490         OBD_FREE_PTR(info);
1491 }
1492
1493 static struct lu_context_key mdd_thread_key = {
1494         .lct_tags = LCT_MD_THREAD,
1495         .lct_init = mdd_key_init,
1496         .lct_fini = mdd_key_fini
1497 };
1498
1499 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1500         { 0 }
1501 };
1502
1503 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1504         { 0 }
1505 };
1506
1507 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
1508
1509 static int __init mdd_mod_init(void)
1510 {
1511         struct lprocfs_static_vars lvars;
1512
1513         lprocfs_init_vars(mdd, &lvars);
1514         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1515                                    LUSTRE_MDD0_NAME, &mdd_device_type);
1516 }
1517
1518 static void __exit mdd_mod_exit(void)
1519 {
1520         class_unregister_type(LUSTRE_MDD0_NAME);
1521 }
1522
1523 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1524 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1525 MODULE_LICENSE("GPL");
1526
1527 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);