Whamcloud - gitweb
remove pleonastic "_object" from dt_object_operations methods
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51                            struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void mdd_lock(const struct lu_context *ctx,
54                      struct mdd_object *obj, enum dt_lock_mode mode);
55 static void mdd_unlock(const struct lu_context *ctx,
56                        struct mdd_object *obj, enum dt_lock_mode mode);
57 static int __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
58                          struct thandle *handle);
59 static int __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
60                          struct thandle *handle, struct md_attr *);
61 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
62                       const char *name, struct lu_fid* fid);
63 static struct md_object_operations mdd_obj_ops;
64 static struct md_dir_operations    mdd_dir_ops;
65 static struct lu_object_operations mdd_lu_obj_ops;
66
67 static struct lu_context_key       mdd_thread_key;
68
69 const char *mdd_root_dir_name = "root";
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdo;
85
86         OBD_ALLOC_PTR(mdo);
87         if (mdo != NULL) {
88                 struct lu_object *o;
89                 
90                 o = &mdo->mod_obj.mo_lu;
91                 lu_object_init(o, NULL, d);
92                 mdo->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdo->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 o->lo_ops = &mdd_lu_obj_ops;
95                 return &mdo->mod_obj.mo_lu;
96         } else {
97                 return NULL;
98         }
99 }
100
101 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
102 {
103         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
104         struct lu_object  *below;
105         struct lu_device  *under;
106         ENTRY;
107
108         under = &d->mdd_child->dd_lu_dev;
109         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
110
111         if (below == NULL)
112                 RETURN(-ENOMEM);
113
114         lu_object_add(o, below);
115         RETURN(0);
116 }
117
118 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
119 {
120         struct mdd_object *mdd = lu2mdd_obj(o);
121         
122         lu_object_fini(o);
123         OBD_FREE_PTR(mdd);
124 }
125
126 static int mdd_attr_get(const struct lu_context *ctxt,
127                         struct md_object *obj, struct lu_attr *attr)
128 {
129         struct mdd_object *mdd_obj = md2mdd_obj(obj);
130         struct dt_object  *next;
131         int rc;
132
133         ENTRY;
134
135         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
136
137         next = mdd_object_child(mdd_obj);
138         rc = next->do_ops->do_attr_get(ctxt, next, attr);
139
140         RETURN(rc);
141 }
142
143 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
144                          void *buf, int buf_len, const char *name)
145 {
146         struct mdd_object *mdd_obj = md2mdd_obj(obj);
147         struct dt_object  *next;
148         int rc;
149
150         ENTRY;
151
152         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
153
154         next = mdd_object_child(mdd_obj);
155         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
156
157         RETURN(rc);
158 }
159
160 enum mdd_txn_op {
161         MDD_TXN_OBJECT_DESTROY_OP,
162         MDD_TXN_OBJECT_CREATE_OP,
163         MDD_TXN_ATTR_SET_OP,
164         MDD_TXN_XATTR_SET_OP,
165         MDD_TXN_INDEX_INSERT_OP,
166         MDD_TXN_INDEX_DELETE_OP,
167         MDD_TXN_LINK_OP,
168         MDD_TXN_UNLINK_OP,
169         MDD_TXN_RENAME_OP,
170         MDD_TXN_MKDIR_OP
171 };
172
173 struct mdd_txn_op_descr {
174         enum mdd_txn_op mod_op;
175         unsigned int    mod_credits;
176 };
177
178 enum {
179         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
180         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
181         MDD_TXN_ATTR_SET_CREDITS       = 20,
182         MDD_TXN_XATTR_SET_CREDITS      = 20,
183         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
184         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
185         MDD_TXN_LINK_CREDITS           = 20,
186         MDD_TXN_UNLINK_CREDITS         = 20,
187         MDD_TXN_RENAME_CREDITS         = 20,
188         MDD_TXN_MKDIR_CREDITS          = 20
189 };
190
191 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
192 static const struct mdd_txn_op_descr opname = { \
193         .mod_op      = opname ## _OP,           \
194         .mod_credits = opname ## _CREDITS,      \
195 }
196
197 /*
198  * number of blocks to reserve for particular operations. Should be function
199  * of ... something. Stub for now.
200  */
201 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
202 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
203 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
204 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
205 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
206 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
207 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
208 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
209 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
210 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
211
212 static void mdd_txn_param_build(const struct lu_context *ctx,
213                                 const struct mdd_txn_op_descr *opd)
214 {
215         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
216 }
217
218 static int mdd_object_print(const struct lu_context *ctxt,
219                             struct seq_file *f, const struct lu_object *o)
220 {
221         return seq_printf(f, LUSTRE_MDD0_NAME"-object@%p", o);
222 }
223
224 static int mdd_object_exists(const struct lu_context *ctx,
225                              const struct lu_object *o)
226 {
227         return lu_object_exists(ctx, lu_object_next(o));
228 }
229
230 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
231 {
232         int rc;
233         struct dt_object *root;
234         ENTRY;
235
236         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
237                              &mdd->mdd_root_fid);
238         if (!IS_ERR(root)) {
239                 LASSERT(root != NULL);
240                 lu_object_put(ctx, &root->do_lu);
241                 rc = 0;
242         } else
243                 rc = PTR_ERR(root);
244         RETURN(rc);
245 }
246
247 static int mdd_fs_setup(const struct lu_context *ctx, struct mdd_device *mdd)
248 {
249         /*create PENDING and OBJECTS dir for open and llog*/
250         return 0;
251 }
252
253 static int mdd_fs_cleanup(struct mdd_device *mdd)
254 {
255         /*create PENDING and OBJECTS dir for open and llog*/
256         return 0;
257 }
258
259 static int mdd_device_init(const struct lu_context *ctx,
260                            struct lu_device *d, struct lu_device *next)
261 {
262         struct mdd_device *mdd = lu2mdd_dev(d);
263         int rc;
264         ENTRY;
265
266         mdd->mdd_child = lu2dt_dev(next);
267
268         rc = mdd_fs_setup(ctx, mdd);
269         if (rc)
270                 mdd_fs_cleanup(mdd);
271         RETURN(rc);
272 }
273
274 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
275                                          struct lu_device *d)
276 {
277         struct mdd_device *m = lu2mdd_dev(d);
278         struct lu_device *next = &m->mdd_child->dd_lu_dev;
279
280         dt_device_fini(&m->mdd_lov_dev);
281
282         return next;
283 }
284
285 static int mdd_process_config(const struct lu_context *ctxt,
286                               struct lu_device *d, struct lustre_cfg *cfg)
287 {
288         struct mdd_device *m = lu2mdd_dev(d);
289         struct lu_device *next = &m->mdd_child->dd_lu_dev;
290         int rc;
291
292         switch(cfg->lcfg_command) {
293         case LCFG_SETUP:
294                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
295                 if (rc)
296                         GOTO(out, rc);
297                 rc = mdd_mount(ctxt, m);
298                 if (rc)
299                         GOTO(out, rc);
300                 rc = mdd_lov_init(ctxt, m, cfg);
301                 if (rc) {
302                         CERROR("lov init error %d \n", rc);
303                         GOTO(out, rc);
304                 }
305                 break;
306         default:
307                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
308                 break;
309         }
310 out:
311         RETURN(rc);
312 }
313
314 struct lu_device_operations mdd_lu_ops = {
315         .ldo_object_alloc   = mdd_object_alloc,
316         .ldo_process_config = mdd_process_config,
317 };
318
319 static struct lu_object_operations mdd_lu_obj_ops = {
320         .loo_object_init    = mdd_object_init,
321         .loo_object_free    = mdd_object_free,
322         .loo_object_print   = mdd_object_print,
323         .loo_object_exists  = mdd_object_exists,
324 };
325
326
327 static void mdd_lock(const struct lu_context *ctxt,
328                      struct mdd_object *obj, enum dt_lock_mode mode)
329 {
330         struct dt_object  *next = mdd_object_child(obj);
331
332         next->do_ops->do_lock(ctxt, next, mode);
333 }
334
335 static void mdd_unlock(const struct lu_context *ctxt,
336                        struct mdd_object *obj, enum dt_lock_mode mode)
337 {
338         struct dt_object  *next = mdd_object_child(obj);
339
340         next->do_ops->do_unlock(ctxt, next, mode);
341 }
342
343 static void mdd_lock2(const struct lu_context *ctxt,
344                       struct mdd_object *o0, struct mdd_object *o1)
345 {
346         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
347         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
348 }
349
350 static void mdd_unlock2(const struct lu_context *ctxt,
351                         struct mdd_object *o0, struct mdd_object *o1)
352 {
353         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
354         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
355 }
356
357 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
358                                        struct mdd_device *mdd)
359 {
360         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
361
362         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
363 }
364
365 static void mdd_trans_stop(const struct lu_context *ctxt,
366                            struct mdd_device *mdd, struct thandle *handle)
367 {
368         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
369 }
370
371 static int __mdd_object_create(const struct lu_context *ctxt,
372                                struct mdd_object *obj, struct md_attr *ma,
373                                struct thandle *handle)
374 {
375         struct dt_object *next;
376         struct lu_attr *attr = &ma->ma_attr;
377         int rc;
378         ENTRY;
379
380         if (!lu_object_exists(ctxt, &obj->mod_obj.mo_lu)) {
381                 next = mdd_object_child(obj);
382                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
383         } else
384                 rc = -EEXIST;
385
386         LASSERT(ergo(rc == 0, lu_object_exists(ctxt, &obj->mod_obj.mo_lu)));
387         /* increase the nlink for directory */
388         if (rc == 0 && dt_try_as_dir(ctxt, mdd_object_child(obj)))
389                 rc = __mdd_ref_add(ctxt, obj, handle);
390
391         if (rc == 0)
392                 mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
393
394         RETURN(rc);
395 }
396
397 static int mdd_object_create(const struct lu_context *ctxt, struct md_object *obj,
398                              struct md_attr *attr)
399 {
400
401         struct mdd_device *mdd = mdo2mdd(obj);
402         struct thandle *handle;
403         int rc;
404         ENTRY;
405
406         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
407         handle = mdd_trans_start(ctxt, mdd);
408         if (IS_ERR(handle))
409                 RETURN(PTR_ERR(handle));
410
411         rc = __mdd_object_create(ctxt, md2mdd_obj(obj), attr, handle);
412
413         mdd_trans_stop(ctxt, mdd, handle);
414
415         RETURN(rc);
416 }
417
418 static int __mdd_attr_set(const struct lu_context *ctxt, struct md_object *obj,
419                           const struct lu_attr *attr, struct thandle *handle)
420 {
421         struct dt_object *next;
422
423         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
424         next = mdd_object_child(md2mdd_obj(obj));
425         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
426 }
427
428 static int mdd_attr_set(const struct lu_context *ctxt,
429                         struct md_object *obj, const struct lu_attr *attr)
430 {
431         struct mdd_device *mdd = mdo2mdd(obj);
432         struct thandle *handle;
433         int  rc;
434         ENTRY;
435
436         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
437         handle = mdd_trans_start(ctxt, mdd);
438         if (!handle)
439                 RETURN(PTR_ERR(handle));
440
441         rc = __mdd_attr_set(ctxt, obj, attr, handle);
442
443         mdd_trans_stop(ctxt, mdd, handle);
444
445         RETURN(rc);
446 }
447
448 static int __mdd_xattr_set(const struct lu_context *ctxt,struct mdd_device *mdd,
449                            struct mdd_object *obj, const void *buf,
450                            int buf_len, const char *name,struct thandle *handle)
451 {
452         struct dt_object *next;
453
454         LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
455         next = mdd_object_child(obj);
456         return next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
457                                           handle);
458 }
459
460 int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
461                   const void *buf, int buf_len, const char *name)
462 {
463         struct mdd_device *mdd = mdo2mdd(obj);
464         struct thandle *handle;
465         int  rc;
466         ENTRY;
467
468         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
469         handle = mdd_trans_start(ctxt, mdd);
470         if (!handle)
471                 RETURN(PTR_ERR(handle));
472
473         rc = __mdd_xattr_set(ctxt, mdd, md2mdd_obj(obj), buf, buf_len, name,
474                              handle);
475
476         mdd_trans_stop(ctxt, mdd, handle);
477
478         RETURN(rc);
479 }
480
481 static int __mdd_index_insert(const struct lu_context *ctxt,
482                               struct mdd_object *pobj, const struct lu_fid *lf,
483                               const char *name, struct thandle *handle)
484 {
485         int rc;
486         struct dt_object *next = mdd_object_child(pobj);
487         ENTRY;
488
489         if (dt_try_as_dir(ctxt, next))
490                 rc = next->do_index_ops->dio_insert(ctxt, next,
491                                          (struct dt_rec *)lf,
492                                          (struct dt_key *)name, handle);
493         else
494                 rc = -ENOTDIR;
495         RETURN(rc);
496 }
497
498 static int __mdd_index_delete(const struct lu_context *ctxt,
499                               struct mdd_object *pobj, const char *name,
500                               struct thandle *handle)
501 {
502         int rc;
503         struct dt_object *next = mdd_object_child(pobj);
504         ENTRY;
505
506         if (dt_try_as_dir(ctxt, next))
507                 rc = next->do_index_ops->dio_delete(ctxt, next,
508                                         (struct dt_key *)name, handle);
509         else
510                 rc = -ENOTDIR;
511         RETURN(rc);
512 }
513
514 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
515                     struct md_object *src_obj, const char *name)
516 {
517         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
518         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
519         struct mdd_device *mdd = mdo2mdd(src_obj);
520         struct thandle *handle;
521         int rc;
522         ENTRY;
523
524         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
525         handle = mdd_trans_start(ctxt, mdd);
526         if (IS_ERR(handle))
527                 RETURN(PTR_ERR(handle));
528
529         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
530
531         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
532                                 name, handle);
533         if (rc)
534                 GOTO(exit, rc);
535
536         rc = __mdd_ref_add(ctxt, mdd_sobj, handle);
537 exit:
538         if (rc)
539                 rc = __mdd_index_delete(ctxt, mdd_tobj, name, handle);
540         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
541         mdd_trans_stop(ctxt, mdd, handle);
542         RETURN(rc);
543 }
544
545 static int mdd_empty_dir(const struct lu_context *ctxt,
546                          struct md_object *dir)
547 {
548         /*TODO: iterate through the index until first entry
549          * other than dot or dotdot. For now - not empty always */
550         return 0;
551 }
552
553 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
554                       struct md_object *cobj, const char *name,
555                       struct md_attr *ma)
556 {
557         struct mdd_device *mdd = mdo2mdd(pobj);
558         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
559         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
560         struct dt_object  *dt_cobj = mdd_object_child(mdd_cobj);
561         struct thandle *handle;
562         int rc;
563         ENTRY;
564
565         /* sanity checks */
566         if (dt_try_as_dir(ctxt, dt_cobj)) {
567                 if (!S_ISDIR(ma->ma_attr.la_mode))
568                         RETURN(rc = -EISDIR);
569         } else {
570                 if (S_ISDIR(ma->ma_attr.la_mode))
571                         RETURN(rc = -ENOTDIR);
572         }
573
574         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
575         handle = mdd_trans_start(ctxt, mdd);
576         if (IS_ERR(handle))
577                 RETURN(PTR_ERR(handle));
578
579         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
580
581         /* rmdir checks */
582         if (S_ISDIR(ma->ma_attr.la_mode)) {
583                 if (!mdd_empty_dir(ctxt, cobj))
584                         GOTO(cleanup, rc = -ENOTEMPTY);
585         }
586
587         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
588         if (rc)
589                 GOTO(cleanup, rc);
590
591         rc = __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
592
593         if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) {
594                 /* unlink dot */
595                 rc = __mdd_ref_del(ctxt, mdd_cobj, handle, ma);
596                 if (rc == 0)
597                         /* unlink dotdot */
598                         rc = __mdd_ref_del(ctxt, mdd_pobj, handle, NULL);
599         }
600
601 cleanup:
602         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
603         mdd_trans_stop(ctxt, mdd, handle);
604         RETURN(rc);
605 }
606
607 static int mdd_parent_fid(const struct lu_context *ctxt,
608                           struct mdd_object *obj,
609                           struct lu_fid *fid)
610 {
611         const char *name = "..";
612         int rc;
613
614         rc = mdd_lookup(ctxt, &obj->mod_obj, name, fid);
615
616         return rc;
617 }
618
619 #define mdo2fid(obj) (&((obj)->mod_obj.mo_lu.lo_header->loh_fid))
620 static int mdd_is_parent(const struct lu_context *ctxt,
621                          struct mdd_device *mdd,
622                          struct mdd_object *p1,
623                          struct mdd_object *p2)
624 {
625         struct lu_fid * pfid;
626         int rc;
627
628         pfid = &mdd_ctx_info(ctxt)->mti_fid;
629         do {
630                 rc = mdd_parent_fid(ctxt, p1, pfid);
631                 if (rc)
632                         RETURN(rc);
633                 if (lu_fid_eq(pfid, mdo2fid(p2))) {
634                         RETURN(1);
635                 }
636         } while (!lu_fid_eq(pfid, &mdd->mdd_root_fid));
637
638         RETURN(rc);
639 }
640
641 static int mdd_rename_lock(const struct lu_context *ctxt,
642                            struct mdd_device *mdd,
643                            struct mdd_object *src_pobj,
644                            struct mdd_object *tgt_pobj)
645 {
646         ENTRY;
647
648         if (src_pobj == tgt_pobj) {
649                 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
650                 RETURN(0);
651         }
652         /*compared the parent child relationship of src_p&tgt_p*/
653         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
654                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
655                 RETURN(0);
656         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
657                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
658                 RETURN(0);
659         }
660         if (mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
661                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
662                 RETURN(0);
663         }
664         if (mdd_is_parent(ctxt, mdd, tgt_pobj, src_pobj)) {
665                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
666                 RETURN(0);
667         }
668
669         mdd_lock2(ctxt, src_pobj, tgt_pobj);
670         RETURN(0);
671 }
672
673 static void mdd_rename_unlock(const struct lu_context *ctxt,
674                               struct mdd_object *src_pobj,
675                               struct mdd_object *tgt_pobj)
676 {
677         mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
678         if (src_pobj != tgt_pobj)
679                 mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
680         return;
681 }
682
683 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
684                       struct md_object *tgt_pobj, const struct lu_fid *lf,
685                       const char *sname, struct md_object *tobj,
686                       const char *tname)
687 {
688         struct mdd_device *mdd = mdo2mdd(src_pobj);
689         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
690         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
691         struct mdd_object *mdd_tobj = NULL;
692         struct thandle *handle;
693         int rc, locked = 0;
694         ENTRY;
695
696         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
697         handle = mdd_trans_start(ctxt, mdd);
698         if (IS_ERR(handle))
699                 RETURN(PTR_ERR(handle));
700
701         /*FIXME: Should consider tobj and sobj too in rename_lock*/
702         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
703         if (rc)
704                 GOTO(cleanup, rc);
705         locked = 1;
706         if (tobj)
707                 mdd_tobj = md2mdd_obj(tobj);
708
709         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
710         if (rc)
711                 GOTO(cleanup, rc);
712         /*FIXME: no sobj now, we should check sobj type, if it is dir,
713          * the nlink of its parent should be dec
714          */
715         if (tobj) {
716                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
717                 if (rc)
718                         GOTO(cleanup, rc);
719         }
720
721         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
722         if (rc)
723                 GOTO(cleanup, rc);
724
725
726         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
727                 struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
728
729                 rc = __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
730                 if (rc)
731                         GOTO(cleanup, rc);
732                 if (dt_try_as_dir(ctxt, dt_tobj)) {
733                         rc = __mdd_ref_add(ctxt, mdd_tpobj, handle);
734                         if (rc)
735                                 GOTO(cleanup, rc);
736                 }
737         }
738 cleanup:
739        /*FIXME: should we do error handling here?*/
740         if (locked)
741                 mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
742         mdd_trans_stop(ctxt, mdd, handle);
743         RETURN(rc);
744 }
745
746 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
747                       const char *name, struct lu_fid* fid)
748 {
749         struct dt_object    *dir    = mdd_object_child(md2mdd_obj(pobj));
750         struct dt_rec       *rec    = (struct dt_rec *)fid;
751         const struct dt_key *key = (const struct dt_key *)name;
752         int rc;
753         ENTRY;
754
755         if (dt_try_as_dir(ctxt, dir))
756                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
757         else
758                 rc = -ENOTDIR;
759         RETURN(rc);
760 }
761
762 /*
763  * Create object and insert it into namespace.
764  */
765 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
766                       const char *name, struct md_object *child,
767                       const char *target_name, struct md_attr* ma)
768 {
769         struct mdd_device *mdd = mdo2mdd(pobj);
770         struct mdd_object *mdo = md2mdd_obj(pobj);
771         struct mdd_object *son = md2mdd_obj(child);
772         struct dt_object  *dt_son = mdd_object_child(son);
773         struct lu_attr *attr = &ma->ma_attr;
774         struct lu_fid *fid;
775         struct lov_mds_md *lmm = NULL;
776         struct thandle *handle;
777         int rc, created = 0, inserted = 0, ref_add = 0, lmm_size;
778         ENTRY;
779
780         /* sanity checks before big job */
781         fid = &mdd_ctx_info(ctxt)->mti_fid;
782         rc = mdd_lookup(ctxt, pobj, name, fid);
783         if (rc != -ENOENT) {
784                 rc = rc ? rc : -EEXIST;
785                 RETURN(rc);
786         }
787         /* no RPC inside the transaction, so OST objects should be created at
788          * first */
789
790         if (S_ISREG(attr->la_mode)) {
791                 rc = mdd_lov_create(ctxt, mdd, son, &lmm, &lmm_size);
792                 if (rc)
793                         RETURN(rc);
794         }
795
796         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
797         handle = mdd_trans_start(ctxt, mdd);
798         if (IS_ERR(handle))
799                 RETURN(PTR_ERR(handle));
800
801         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
802
803         /*
804          * Two operations have to be performed:
805          *
806          *  - allocation of new object (->do_create()), and
807          *
808          *  - insertion into parent index (->dio_insert()).
809          *
810          * Due to locking, operation order is not important, when both are
811          * successful, *but* error handling cases are quite different:
812          *
813          *  - if insertion is done first, and following object creation fails,
814          *  insertion has to be rolled back, but this operation might fail
815          *  also leaving us with dangling index entry.
816          *
817          *  - if creation is done first, is has to be undone if insertion
818          *  fails, leaving us with leaked space, which is neither good, nor
819          *  fatal.
820          *
821          * It seems that creation-first is simplest solution, but it is
822          * sub-optimal in the frequent
823          *
824          *         $ mkdir foo
825          *         $ mkdir foo
826          *
827          * case, because second mkdir is bound to create object, only to
828          * destroy it immediately.
829          *
830          * Note that local file systems do
831          *
832          *     0. lookup -> -EEXIST
833          *
834          *     1. create
835          *
836          *     2. insert
837          *
838          * Maybe we should do the same. For now: creation-first.
839          */
840
841         rc = __mdd_object_create(ctxt, son, ma, handle);
842         if (rc)
843                 GOTO(cleanup, rc);
844
845         created = 1;
846
847         rc = __mdd_index_insert(ctxt, mdo, lu_object_fid(&child->mo_lu),
848                                 name, handle);
849
850         if (rc)
851                 GOTO(cleanup, rc);
852
853         inserted = 1;
854
855         if (dt_try_as_dir(ctxt, dt_son)) {
856                 rc = __mdd_ref_add(ctxt, mdo, handle);
857                 if (rc)
858                         GOTO(cleanup, rc);
859                 ref_add = 1;
860         }
861         rc = mdd_lov_set_md(ctxt, pobj, child, lmm, lmm_size);
862         if (rc) {
863                 CERROR("error on stripe info copy %d \n", rc);
864         }
865 cleanup:
866         if (rc && created) {
867                 int rc1 = 0, rc2 = 0, rc3 = 0;
868
869                 rc1 = __mdd_ref_del(ctxt, son, handle, NULL);
870                 if (inserted)
871                         rc2 = __mdd_index_delete(ctxt, mdo, name, handle);
872                 if (ref_add)
873                         rc3 = __mdd_ref_add(ctxt, mdo, handle);
874                 if (rc1 || rc2 || rc3)
875                         CERROR("error can not cleanup destory %d insert %d \n",
876                                rc1, rc2);
877         }
878
879         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
880         mdd_trans_stop(ctxt, mdd, handle);
881         RETURN(rc);
882 }
883
884 static int mdd_mkname(const struct lu_context *ctxt, struct md_object *pobj,
885                       const char *name, const struct lu_fid *fid)
886 {
887         struct mdd_device *mdd = mdo2mdd(pobj);
888         struct mdd_object *mdo = md2mdd_obj(pobj);
889         struct thandle *handle;
890         int rc;
891         ENTRY;
892
893         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
894         handle = mdd_trans_start(ctxt, mdd);
895         if (IS_ERR(handle))
896                 RETURN(PTR_ERR(handle));
897
898         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
899
900         rc = __mdd_index_insert(ctxt, mdo, fid, name, handle);
901
902         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
903         mdd_trans_stop(ctxt, mdd, handle);
904         RETURN(rc);
905 }
906
907 static int mdd_name_remove(const struct lu_context *ctxt,
908                            struct md_object *pobj,
909                            const char *name)
910 {
911         struct mdd_device *mdd = mdo2mdd(pobj);
912         struct mdd_object *mdo = md2mdd_obj(pobj);
913         struct thandle *handle;
914         int rc;
915         ENTRY;
916
917         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
918         handle = mdd_trans_start(ctxt, mdd);
919         if (IS_ERR(handle))
920                 RETURN(PTR_ERR(handle));
921
922         mdd_lock(ctxt, mdo, DT_WRITE_LOCK);
923
924         rc = __mdd_index_delete(ctxt, mdo, name, handle);
925
926         mdd_unlock(ctxt, mdo, DT_WRITE_LOCK);
927
928         mdd_trans_stop(ctxt, mdd, handle);
929         RETURN(rc);
930 }
931
932 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
933                           struct md_object *tobj, const struct lu_fid *lf,
934                           const char *name)
935 {
936         struct mdd_device *mdd = mdo2mdd(pobj);
937         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
938         struct mdd_object *mdd_tobj = NULL;
939         struct thandle *handle;
940         int rc;
941         ENTRY;
942
943         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
944         handle = mdd_trans_start(ctxt, mdd);
945         if (IS_ERR(handle))
946                 RETURN(PTR_ERR(handle));
947
948         if (tobj)
949                 mdd_tobj = md2mdd_obj(tobj);
950
951         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
952
953         if (tobj) {
954                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
955                 if (rc)
956                         GOTO(cleanup, rc);
957         }
958
959         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
960         if (rc)
961                 GOTO(cleanup, rc);
962
963         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
964                 rc = __mdd_ref_del(ctxt, mdd_tobj, handle, NULL);
965                 if (rc)
966                         GOTO(cleanup, rc);
967         }
968 cleanup:
969        /*FIXME: should we do error handling here?*/
970         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
971         mdd_trans_stop(ctxt, mdd, handle);
972         RETURN(rc);
973 }
974
975 static int mdd_root_get(const struct lu_context *ctx,
976                         struct md_device *m, struct lu_fid *f)
977 {
978         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
979
980         ENTRY;
981         *f = mdd->mdd_root_fid;
982         RETURN(0);
983 }
984
985 static int mdd_statfs(const struct lu_context *ctx,
986                       struct md_device *m, struct kstatfs *sfs) {
987         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
988         int rc;
989
990         ENTRY;
991
992         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
993
994         RETURN(rc);
995 }
996
997 static int __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
998                          struct thandle *handle)
999 {
1000         struct dt_object *next;
1001
1002         LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
1003         next = mdd_object_child(obj);
1004         return next->do_ops->do_ref_add(ctxt, next, handle);
1005 }
1006
1007 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1008 {
1009         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1010         struct mdd_device *mdd = mdo2mdd(obj);
1011         struct thandle *handle;
1012         int  rc;
1013         ENTRY;
1014
1015         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1016         handle = mdd_trans_start(ctxt, mdd);
1017         if (!handle)
1018                 RETURN(-ENOMEM);
1019         rc = __mdd_ref_add(ctxt, mdd_obj, handle);
1020
1021         mdd_trans_stop(ctxt, mdd, handle);
1022
1023         RETURN(rc);
1024 }
1025
1026 static int
1027 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1028               struct thandle *handle, struct md_attr *ma)
1029 {
1030         struct dt_object *next = mdd_object_child(obj);
1031         int rc;
1032
1033         LASSERT(lu_object_exists(ctxt, &obj->mod_obj.mo_lu));
1034
1035         rc = next->do_ops->do_ref_del(ctxt, next, handle);
1036         if (rc == 0 && ma != NULL)
1037                 mdd_attr_get(ctxt, &obj->mod_obj, &ma->ma_attr);
1038
1039         return rc;
1040 }
1041
1042 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1043                        struct md_attr *ma)
1044 {
1045         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1046         struct mdd_device *mdd = mdo2mdd(obj);
1047         struct thandle *handle;
1048         int  rc;
1049         ENTRY;
1050
1051         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1052         handle = mdd_trans_start(ctxt, mdd);
1053         if (!handle)
1054                 RETURN(-ENOMEM);
1055         rc = __mdd_ref_del(ctxt, mdd_obj, handle, ma);
1056
1057         mdd_trans_stop(ctxt, mdd, handle);
1058
1059         RETURN(rc);
1060 }
1061
1062 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1063 {
1064         return 0;
1065 }
1066
1067 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
1068 {
1069         return 0;
1070 }
1071
1072 struct md_device_operations mdd_ops = {
1073         .mdo_root_get       = mdd_root_get,
1074         .mdo_statfs         = mdd_statfs,
1075         .mdo_notify         = mdd_notify
1076 };
1077
1078 static struct md_dir_operations mdd_dir_ops = {
1079         .mdo_lookup        = mdd_lookup,
1080         .mdo_create        = mdd_create,
1081         .mdo_rename        = mdd_rename,
1082         .mdo_link          = mdd_link,
1083         .mdo_unlink        = mdd_unlink,
1084         .mdo_name_insert   = mdd_mkname,
1085         .mdo_name_remove   = mdd_name_remove,
1086         .mdo_rename_tgt    = mdd_rename_tgt,
1087 };
1088
1089
1090 static struct md_object_operations mdd_obj_ops = {
1091         .moo_attr_get      = mdd_attr_get,
1092         .moo_attr_set      = mdd_attr_set,
1093         .moo_xattr_get     = mdd_xattr_get,
1094         .moo_xattr_set     = mdd_xattr_set,
1095         .moo_object_create  = mdd_object_create,
1096         .moo_ref_add       = mdd_ref_add,
1097         .moo_ref_del       = mdd_ref_del,
1098         .moo_open          = mdd_open,
1099         .moo_close         = mdd_close
1100 };
1101
1102 static struct obd_ops mdd_obd_device_ops = {
1103         .o_owner = THIS_MODULE
1104 };
1105
1106 struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1107                                    struct lu_device_type *t,
1108                                    struct lustre_cfg *lcfg)
1109 {
1110         struct lu_device  *l;
1111         struct mdd_device *m;
1112
1113         OBD_ALLOC_PTR(m);
1114         if (m == NULL) {
1115                 l = ERR_PTR(-ENOMEM);
1116         } else {
1117                 md_device_init(&m->mdd_md_dev, t);
1118                 l = mdd2lu_dev(m);
1119                 l->ld_ops = &mdd_lu_ops;
1120                 m->mdd_md_dev.md_ops = &mdd_ops;
1121         }
1122
1123         return l;
1124 }
1125
1126 static void mdd_device_free(const struct lu_context *ctx, struct lu_device *lu)
1127 {
1128         struct mdd_device *m = lu2mdd_dev(lu);
1129
1130         LASSERT(atomic_read(&lu->ld_ref) == 0);
1131         md_device_fini(&m->mdd_md_dev);
1132         OBD_FREE_PTR(m);
1133 }
1134
1135 static int mdd_type_init(struct lu_device_type *t)
1136 {
1137         return lu_context_key_register(&mdd_thread_key);
1138 }
1139
1140 static void mdd_type_fini(struct lu_device_type *t)
1141 {
1142         lu_context_key_degister(&mdd_thread_key);
1143 }
1144
1145 static struct lu_device_type_operations mdd_device_type_ops = {
1146         .ldto_init = mdd_type_init,
1147         .ldto_fini = mdd_type_fini,
1148
1149         .ldto_device_alloc = mdd_device_alloc,
1150         .ldto_device_free  = mdd_device_free,
1151
1152         .ldto_device_init    = mdd_device_init,
1153         .ldto_device_fini    = mdd_device_fini
1154 };
1155
1156 static struct lu_device_type mdd_device_type = {
1157         .ldt_tags     = LU_DEVICE_MD,
1158         .ldt_name     = LUSTRE_MDD0_NAME,
1159         .ldt_ops      = &mdd_device_type_ops,
1160         .ldt_ctx_tags = LCT_MD_THREAD
1161 };
1162
1163 static void *mdd_key_init(const struct lu_context *ctx,
1164                           struct lu_context_key *key)
1165 {
1166         struct mdd_thread_info *info;
1167
1168         OBD_ALLOC_PTR(info);
1169         if (info == NULL)
1170                 info = ERR_PTR(-ENOMEM);
1171         return info;
1172 }
1173
1174 static void mdd_key_fini(const struct lu_context *ctx,
1175                          struct lu_context_key *key, void *data)
1176 {
1177         struct mdd_thread_info *info = data;
1178         OBD_FREE_PTR(info);
1179 }
1180
1181 static struct lu_context_key mdd_thread_key = {
1182         .lct_tags = LCT_MD_THREAD,
1183         .lct_init = mdd_key_init,
1184         .lct_fini = mdd_key_fini
1185 };
1186
1187 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
1188         { 0 }
1189 };
1190
1191 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
1192         { 0 }
1193 };
1194
1195 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
1196
1197 static int __init mdd_mod_init(void)
1198 {
1199         struct lprocfs_static_vars lvars;
1200
1201         lprocfs_init_vars(mdd, &lvars);
1202         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1203                                    LUSTRE_MDD0_NAME, &mdd_device_type);
1204 }
1205
1206 static void __exit mdd_mod_exit(void)
1207 {
1208         class_unregister_type(LUSTRE_MDD0_NAME);
1209 }
1210
1211 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1212 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
1213 MODULE_LICENSE("GPL");
1214
1215 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);