Whamcloud - gitweb
mdd: use ->loo_object_start() to initialize object flags
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <lu_object.h>
42 #include <md_object.h>
43 #include <dt_object.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_context *ctxt,
51                            struct mdd_device *mdd, struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
54                           struct thandle *handle);
55 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
56                           struct thandle *handle);
57 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
58                       const char *name, struct lu_fid* fid);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 static const char *mdd_root_dir_name = "root";
67 static const char dot[] = ".";
68 static const char dotdot[] = "..";
69
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdd_obj;
85
86         OBD_ALLOC_PTR(mdd_obj);
87         if (mdd_obj != NULL) {
88                 struct lu_object *o;
89                 
90                 o = mdd2lu_obj(mdd_obj);
91                 lu_object_init(o, NULL, d);
92                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 atomic_set(&mdd_obj->mod_count, 0);
95                 o->lo_ops = &mdd_lu_obj_ops;
96                 return o;
97         } else {
98                 return NULL;
99         }
100 }
101
102 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 {
104         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
105         struct lu_object  *below;
106         struct lu_device  *under;
107         ENTRY;
108
109         under = &d->mdd_child->dd_lu_dev;
110         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
111
112         if (below == NULL)
113                 RETURN(-ENOMEM);
114
115         lu_object_add(o, below);
116         RETURN(0);
117 }
118
119 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj);
120
121 static int mdd_object_start(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         if (lu_object_exists(ctxt, o))
124                 return mdd_get_flags(ctxt, lu2mdd_obj(o));
125         else
126                 return 0;
127 }
128
129 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
130 {
131         struct mdd_object *mdd = lu2mdd_obj(o);
132         
133         lu_object_fini(o);
134         OBD_FREE_PTR(mdd);
135 }
136
137 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
138                                    struct mdd_device *d,
139                                    const struct lu_fid *f)
140 {
141         struct lu_object *o;
142         struct mdd_object *m;
143         ENTRY;
144
145         o = lu_object_find(ctxt, d->mdd_md_dev.md_lu_dev.ld_site, f);
146         if (IS_ERR(o))
147                 m = (struct mdd_object *)o;
148         else
149                 m = lu2mdd_obj(lu_object_locate(o->lo_header,
150                                  d->mdd_md_dev.md_lu_dev.ld_type));
151         RETURN(m);
152 }
153
154 static inline void mdd_object_put(const struct lu_context *ctxt,
155                                   struct mdd_object *o)
156 {
157         lu_object_put(ctxt, &o->mod_obj.mo_lu);
158 }
159
160 static inline int mdd_is_immutable(struct mdd_object *obj)
161 {
162         return obj->mod_flags & IMMUTE_OBJ;
163 }
164
165 static inline int mdd_is_append(struct mdd_object *obj)
166 {
167         return obj->mod_flags & APPEND_OBJ;
168 }
169
170 static void mdd_set_dead_obj(struct mdd_object *obj)
171 {
172         if (obj)
173                 obj->mod_flags |= DEAD_OBJ;
174 }
175
176 static int mdd_is_dead_obj(struct mdd_object *obj)
177 {
178         return obj && obj->mod_flags & DEAD_OBJ;
179 }
180
181 /*Check whether it may create the cobj under the pobj*/
182 static int mdd_may_create(const struct lu_context *ctxt,
183                           struct mdd_object *pobj, struct mdd_object *cobj)
184 {
185         ENTRY;
186         if (cobj && lu_object_exists(ctxt, &cobj->mod_obj.mo_lu))
187                 RETURN(-EEXIST);
188
189         if (mdd_is_dead_obj(pobj))
190                 RETURN(-ENOENT);
191
192         /*check pobj may create or not*/
193         RETURN(0);
194 }
195
196 static inline int __mdd_la_get(const struct lu_context *ctxt,
197                                struct mdd_object *obj, struct lu_attr *la)
198 {
199         struct dt_object  *next = mdd_object_child(obj);
200         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
201         return next->do_ops->do_attr_get(ctxt, next, la);
202
203 }
204
205 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
206 {
207         obj->mod_flags = 0;
208         if (flags & S_APPEND)
209                 obj->mod_flags |= APPEND_OBJ;
210
211         if (flags & S_IMMUTABLE)
212                 obj->mod_flags |= IMMUTE_OBJ;
213 }
214
215 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj)
216 {
217         struct lu_attr *la = &mdd_ctx_info(ctxt)->mti_la;
218         int rc;
219
220         ENTRY;
221         rc = __mdd_la_get(ctxt, obj, la);
222         if (rc == 0)
223                 mdd_flags_xlate(obj, la->la_flags);
224         RETURN(rc);
225 }
226
227 /*Check whether it may delete the cobj under the pobj*/
228 static int mdd_may_delete(const struct lu_context *ctxt,
229                           struct mdd_object *pobj, struct mdd_object *cobj,
230                           int is_dir)
231 {
232         struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
233         int rc = 0;
234         ENTRY;
235
236         LASSERT(cobj && pobj);
237
238         if (!lu_object_exists(ctxt, &cobj->mod_obj.mo_lu))
239                 RETURN(-ENOENT);
240
241         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
242                 RETURN(-EPERM);
243
244         if (is_dir) {
245                 if (!S_ISDIR(mdd_object_type(ctxt, cobj)))
246                         RETURN(-ENOTDIR);
247
248                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
249                         RETURN(-EBUSY);
250
251         } else if (S_ISDIR(mdd_object_type(ctxt, cobj)))
252                         RETURN(-EISDIR);
253
254         if (mdd_is_dead_obj(pobj))
255                 RETURN(-ENOENT);
256
257         RETURN(rc);
258 }
259 /* get only inode attributes */
260 static int __mdd_iattr_get(const struct lu_context *ctxt,
261                            struct mdd_object *mdd_obj, struct md_attr *ma)
262 {
263         int rc = 0;
264         ENTRY;
265
266         rc = __mdd_la_get(ctxt, mdd_obj, &ma->ma_attr);
267         if (rc == 0)
268                 ma->ma_valid = MA_INODE;
269         RETURN(rc);
270 }
271 /* get lov EA only */
272 static int __mdd_lmm_get(const struct lu_context *ctxt,
273                          struct mdd_object *mdd_obj, struct md_attr *ma)
274 {
275         int rc;
276
277         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
278         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, 0);
279         if (rc > 0) {
280                 ma->ma_valid |= MA_LOV;
281                 rc = 0;
282         }
283         RETURN(rc);
284 }
285
286 static int mdd_attr_get_internal(const struct lu_context *ctxt,
287                                  struct mdd_object *mdd_obj,
288                                  struct md_attr *ma)
289 {
290         int rc = 0;
291         ENTRY;
292
293         if (ma->ma_need & MA_INODE)
294                 rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
295
296         if (rc == 0 && ma->ma_need & MA_LOV) {
297                 if (S_ISREG(lu_object_attr(ctxt, mdd2lu_obj(mdd_obj))) ||
298                     S_ISDIR(lu_object_attr(ctxt, mdd2lu_obj(mdd_obj)))) {
299                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
300                  }
301         }
302
303         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
304                         rc, ma->ma_valid);
305         RETURN(rc);
306 }
307
308 static inline int mdd_attr_get_internal_locked (const struct lu_context *ctxt,
309                                                 struct mdd_object *mdd_obj,
310                                                 struct md_attr *ma)
311 {
312         int rc;
313         mdd_lock(ctxt, mdd_obj, DT_READ_LOCK);
314         rc = mdd_attr_get_internal(ctxt, mdd_obj, ma);
315         mdd_unlock(ctxt, mdd_obj, DT_READ_LOCK);
316         return rc;
317 }
318
319 static int mdd_attr_get(const struct lu_context *ctxt,
320                         struct md_object *obj, struct md_attr *ma)
321 {
322         struct mdd_object *mdd_obj = md2mdd_obj(obj);
323         int                rc;
324
325         ENTRY;
326         rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
327         RETURN(rc);
328 }
329
330 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
331                          void *buf, int buf_len, const char *name)
332 {
333         struct mdd_object *mdd_obj = md2mdd_obj(obj);
334         struct dt_object  *next;
335         int rc;
336
337         ENTRY;
338
339         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
340
341         next = mdd_object_child(mdd_obj);
342         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
343
344         RETURN(rc);
345 }
346
347 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
348                         void *buf, int buf_len)
349 {
350         struct mdd_object *mdd_obj = md2mdd_obj(obj);
351         struct dt_object  *next;
352         loff_t             pos = 0;
353         int                rc;
354         ENTRY;
355
356         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
357
358         next = mdd_object_child(mdd_obj);
359         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
360         RETURN(rc);
361 }
362 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
363                           void *buf, int buf_len)
364 {
365         struct mdd_object *mdd_obj = md2mdd_obj(obj);
366         struct dt_object  *next;
367         int rc;
368
369         ENTRY;
370
371         LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
372
373         next = mdd_object_child(mdd_obj);
374         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
375
376         RETURN(rc);
377 }
378
379 enum mdd_txn_op {
380         MDD_TXN_OBJECT_DESTROY_OP,
381         MDD_TXN_OBJECT_CREATE_OP,
382         MDD_TXN_ATTR_SET_OP,
383         MDD_TXN_XATTR_SET_OP,
384         MDD_TXN_INDEX_INSERT_OP,
385         MDD_TXN_INDEX_DELETE_OP,
386         MDD_TXN_LINK_OP,
387         MDD_TXN_UNLINK_OP,
388         MDD_TXN_RENAME_OP,
389         MDD_TXN_CREATE_DATA_OP,
390         MDD_TXN_MKDIR_OP
391 };
392
393 struct mdd_txn_op_descr {
394         enum mdd_txn_op mod_op;
395         unsigned int    mod_credits;
396 };
397
398 enum {
399         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
400         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
401         MDD_TXN_ATTR_SET_CREDITS       = 20,
402         MDD_TXN_XATTR_SET_CREDITS      = 20,
403         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
404         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
405         MDD_TXN_LINK_CREDITS           = 20,
406         MDD_TXN_UNLINK_CREDITS         = 20,
407         MDD_TXN_RENAME_CREDITS         = 20,
408         MDD_TXN_CREATE_DATA_CREDITS    = 20,
409         MDD_TXN_MKDIR_CREDITS          = 20
410 };
411
412 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
413 static const struct mdd_txn_op_descr opname = { \
414         .mod_op      = opname ## _OP,           \
415         .mod_credits = opname ## _CREDITS,      \
416 }
417
418 /*
419  * number of blocks to reserve for particular operations. Should be function
420  * of ... something. Stub for now.
421  */
422 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
423 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
424 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
425 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
426 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
427 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
428 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
429 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
430 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
431 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
432 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
433
434 static void mdd_txn_param_build(const struct lu_context *ctx,
435                                 const struct mdd_txn_op_descr *opd)
436 {
437         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
438 }
439
440 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
441                             lu_printer_t p, const struct lu_object *o)
442 {
443         return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
444 }
445
446 static int mdd_object_exists(const struct lu_context *ctx,
447                              const struct lu_object *o)
448 {
449         return lu_object_exists(ctx, lu_object_next(o));
450 }
451
452 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
453 {
454         int rc;
455         struct dt_object *root;
456         ENTRY;
457
458         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
459                              &mdd->mdd_root_fid);
460         if (!IS_ERR(root)) {
461                 LASSERT(root != NULL);
462                 lu_object_put(ctx, &root->do_lu);
463                 rc = 0;
464         } else
465                 rc = PTR_ERR(root);
466
467         RETURN(rc);
468 }
469
470 static int mdd_device_init(const struct lu_context *ctx,
471                            struct lu_device *d, struct lu_device *next)
472 {
473         struct mdd_device *mdd = lu2mdd_dev(d);
474         int rc = 0;
475         ENTRY;
476
477         mdd->mdd_child = lu2dt_dev(next);
478
479         RETURN(rc);
480 }
481
482 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
483                                          struct lu_device *d)
484 {
485         struct mdd_device *m = lu2mdd_dev(d);
486         struct lu_device *next = &m->mdd_child->dd_lu_dev;
487
488         return next;
489 }
490 static void mdd_device_shutdown(const struct lu_context *ctxt,
491                                 struct mdd_device *m)
492 {
493         mdd_fini_obd(ctxt, m);
494 }
495
496 static int mdd_process_config(const struct lu_context *ctxt,
497                               struct lu_device *d, struct lustre_cfg *cfg)
498 {
499         struct mdd_device *m    = lu2mdd_dev(d);
500         struct dt_device  *dt   = m->mdd_child;
501         struct lu_device  *next = &dt->dd_lu_dev;
502         char              *dev = lustre_cfg_string(cfg, 0);
503         int rc;
504
505         switch (cfg->lcfg_command) {
506         case LCFG_SETUP:
507                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
508                 if (rc)
509                         GOTO(out, rc);
510                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
511
512                 rc = mdd_mount(ctxt, m);
513                 if (rc)
514                         GOTO(out, rc);
515                 rc = mdd_init_obd(ctxt, m, dev);
516                 if (rc) {
517                         CERROR("lov init error %d \n", rc);
518                         GOTO(out, rc);
519                 }
520                 break;
521         case LCFG_CLEANUP:
522                 mdd_device_shutdown(ctxt, m);
523         default:
524                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
525                 break;
526         }
527 out:
528         RETURN(rc);
529 }
530
531 struct lu_device_operations mdd_lu_ops = {
532         .ldo_object_alloc   = mdd_object_alloc,
533         .ldo_process_config = mdd_process_config,
534 };
535
536 static struct lu_object_operations mdd_lu_obj_ops = {
537         .loo_object_init    = mdd_object_init,
538         .loo_object_start   = mdd_object_start,
539         .loo_object_free    = mdd_object_free,
540         .loo_object_print   = mdd_object_print,
541         .loo_object_exists  = mdd_object_exists,
542 };
543
544 void mdd_lock(const struct lu_context *ctxt, struct mdd_object *obj,
545               enum dt_lock_mode mode)
546 {
547         struct dt_object  *next = mdd_object_child(obj);
548
549         next->do_ops->do_lock(ctxt, next, mode);
550 }
551
552 void mdd_unlock(const struct lu_context *ctxt, struct mdd_object *obj,
553                 enum dt_lock_mode mode)
554 {
555         struct dt_object  *next = mdd_object_child(obj);
556
557         next->do_ops->do_unlock(ctxt, next, mode);
558 }
559
560 static void mdd_lock2(const struct lu_context *ctxt,
561                       struct mdd_object *o0, struct mdd_object *o1)
562 {
563         mdd_lock(ctxt, o0, DT_WRITE_LOCK);
564         mdd_lock(ctxt, o1, DT_WRITE_LOCK);
565 }
566
567 static void mdd_unlock2(const struct lu_context *ctxt,
568                         struct mdd_object *o0, struct mdd_object *o1)
569 {
570         mdd_unlock(ctxt, o0, DT_WRITE_LOCK);
571         mdd_unlock(ctxt, o1, DT_WRITE_LOCK);
572 }
573
574 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
575                                        struct mdd_device *mdd)
576 {
577         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
578
579         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
580 }
581
582 static void mdd_trans_stop(const struct lu_context *ctxt,
583                            struct mdd_device *mdd, struct thandle *handle)
584 {
585         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
586 }
587
588 static int __mdd_object_create(const struct lu_context *ctxt,
589                                struct mdd_object *obj, struct md_attr *ma,
590                                struct thandle *handle)
591 {
592         struct dt_object *next;
593         struct lu_attr *attr = &ma->ma_attr;
594         int rc;
595         ENTRY;
596
597         if (!lu_object_exists(ctxt, mdd2lu_obj(obj))) {
598                 next = mdd_object_child(obj);
599                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
600         } else
601                 rc = -EEXIST;
602
603         LASSERT(ergo(rc == 0, lu_object_exists(ctxt, mdd2lu_obj(obj))));
604
605         RETURN(rc);
606 }
607
608 int mdd_attr_set_internal(const struct lu_context *ctxt, struct mdd_object *o,
609                           const struct lu_attr *attr, struct thandle *handle)
610 {
611         struct dt_object *next;
612
613         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(o)));
614         next = mdd_object_child(o);
615         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
616 }
617
618 int mdd_attr_set_internal_locked(const struct lu_context *ctxt,
619                                  struct mdd_object *o,
620                                  const struct lu_attr *attr,
621                                  struct thandle *handle)
622 {
623         int rc;
624         mdd_lock(ctxt, o, DT_WRITE_LOCK);
625         rc = mdd_attr_set_internal(ctxt, o, attr, handle);
626         mdd_unlock(ctxt, o, DT_WRITE_LOCK);
627         return rc;
628 }
629
630 static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
631                            const void *buf, int buf_len, const char *name,
632                            int fl, struct thandle *handle)
633 {
634         struct dt_object *next;
635         int rc = 0;
636         ENTRY;
637
638         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(o)));
639         next = mdd_object_child(o);
640         if (buf && buf_len > 0) {
641                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
642                                                 0, handle);
643         }else if (buf == NULL && buf_len == 0) {
644                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
645         }
646         RETURN(rc);
647 }
648 /* this gives the same functionality as the code between
649  * sys_chmod and inode_setattr
650  * chown_common and inode_setattr
651  * utimes and inode_setattr
652  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
653  * and port to
654  */
655 int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj,
656                  const struct md_attr *ma, struct lu_attr *la)
657 {
658         struct lu_attr   *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
659         time_t            now = CURRENT_SECONDS;
660         int               rc;
661         ENTRY;
662
663         rc = __mdd_la_get(ctxt, obj, tmp_la);
664         if (rc)
665                 RETURN(rc);
666         /*XXX Check permission */
667         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
668
669                 /*If only change flags of the object, we should
670                  * let it pass, but also need capability check
671                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
672                  * fix it, when implement capable in mds*/
673                 if (la->la_valid & ~LA_FLAGS)
674                         RETURN(-EPERM);
675
676                 /*According to Ext3 implementation on this, the
677                  *Ctime will be changed, but not clear why?*/
678                 la->la_ctime = now;
679                 la->la_valid |= LA_CTIME;
680                 RETURN(0);
681         }
682         if (!(la->la_valid & LA_CTIME)) {
683                 la->la_ctime = now;
684                 la->la_valid |= LA_CTIME;
685         } else
686                 /*According to original MDS implementation, it
687                  * will clear ATTR_CTIME_SET flags, but seems
688                  * no sense, should clear ATTR_CTIME? XXX*/
689                 la->la_valid &= ~LA_CTIME;
690
691         if (!(la->la_valid & LA_ATIME)) {
692                 la->la_atime = now;
693                 la->la_valid |= LA_ATIME;
694         }
695         if (!(la->la_valid & LA_MTIME)) {
696                 la->la_mtime = now;
697                 la->la_valid |= LA_MTIME;
698         }
699
700
701 #if 0
702         /* times */
703         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
704                 if (current->fsuid != inode->i_uid &&
705                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
706                         RETURN(error);
707         }
708         if (ia_valid & ATTR_SIZE &&
709             /* NFSD hack for open(O_CREAT|O_TRUNC)=mknod+truncate (bug 5781) */
710             !(rec->ur_uc.luc_fsuid == inode->i_uid &&
711               ia_valid & MDS_OPEN_OWNEROVERRIDE)) {
712                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
713                         RETURN(error);
714         }
715 #endif
716
717         if (la->la_valid & (LA_UID | LA_GID)) {
718                 /* chown */
719
720                 if (mdd_is_immutable(obj) || mdd_is_append(obj))
721                         RETURN(-EPERM);
722                 if (la->la_uid == (uid_t) -1)
723                         la->la_uid = tmp_la->la_uid;
724                 if (la->la_gid == (gid_t) -1)
725                         la->la_gid = tmp_la->la_gid;
726                 if (!(la->la_valid & LA_MODE))
727                         la->la_mode = tmp_la->la_mode;
728                 /*
729                  * If the user or group of a non-directory has been
730                  * changed by a non-root user, remove the setuid bit.
731                  * 19981026 David C Niemi <niemi@tux.org>
732                  *
733                  * Changed this to apply to all users, including root,
734                  * to avoid some races. This is the behavior we had in
735                  * 2.0. The check for non-root was definitely wrong
736                  * for 2.2 anyway, as it should have been using
737                  * CAP_FSETID rather than fsuid -- 19990830 SD.
738                  */
739                 if ((tmp_la->la_mode & S_ISUID) == S_ISUID &&
740                     !S_ISDIR(tmp_la->la_mode)) {
741                         la->la_mode &= ~S_ISUID;
742                         la->la_valid |= LA_MODE;
743                 }
744                 /*
745                  * Likewise, if the user or group of a non-directory
746                  * has been changed by a non-root user, remove the
747                  * setgid bit UNLESS there is no group execute bit
748                  * (this would be a file marked for mandatory
749                  * locking).  19981026 David C Niemi <niemi@tux.org>
750                  *
751                  * Removed the fsuid check (see the comment above) --
752                  * 19990830 SD.
753                  */
754                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
755                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
756                         la->la_mode &= ~S_ISGID;
757                         la->la_valid |= LA_MODE;
758                 }
759         } else if (la->la_valid & LA_MODE) {
760                 int mode = la->la_mode;
761                 /* chmod */
762                 if (la->la_mode == (umode_t)-1)
763                         mode = tmp_la->la_mode;
764                 la->la_mode =
765                         (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO);
766         }
767         RETURN(rc);
768 }
769
770
771 /* set attr and LOV EA at once, return updated attr */
772 static int mdd_attr_set(const struct lu_context *ctxt,
773                         struct md_object *obj,
774                         const struct md_attr *ma)
775 {
776         struct mdd_object *mdd_obj = md2mdd_obj(obj);
777         struct mdd_device *mdd = mdo2mdd(obj);
778         struct thandle *handle;
779         struct lov_mds_md *lmm = NULL;
780         int  rc = 0, lmm_size = 0, max_size;
781         struct lu_attr *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
782         ENTRY;
783
784         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
785         handle = mdd_trans_start(ctxt, mdd);
786         if (IS_ERR(handle))
787                 RETURN(PTR_ERR(handle));
788         /*TODO: add lock here*/
789         /* start a log jounal handle if needed */
790         if (S_ISREG(mdd_object_type(ctxt, mdd_obj)) &&
791             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
792                 max_size = mdd_lov_mdsize(ctxt, mdd);
793                 OBD_ALLOC(lmm, max_size);
794                 if (lmm == NULL)
795                         GOTO(cleanup, rc = -ENOMEM);
796
797                 rc = mdd_get_md(ctxt, mdd_obj, lmm, &lmm_size, 1);
798
799                 if (rc < 0)
800                         GOTO(cleanup, rc);
801         }
802
803         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
804                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
805                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
806
807         *la_copy = ma->ma_attr;
808         rc = mdd_fix_attr(ctxt, mdd_obj, ma, la_copy);
809         if (rc)
810                 GOTO(cleanup, rc);
811
812         if (la_copy->la_valid & LA_FLAGS) {
813                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
814                                                   handle);
815                 if (rc == 0)
816                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
817         } else if (la_copy->la_valid) {            /* setattr */
818                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
819                                                   handle);
820                 /* journal chown/chgrp in llog, just like unlink */
821                 if (rc == 0 && lmm_size){
822                         /*TODO set_attr llog */
823                 }
824         }
825
826         if (rc == 0 && ma->ma_valid & MA_LOV) {
827                 umode_t mode;
828
829                 mode = mdd_object_type(ctxt, mdd_obj);
830                 if (S_ISREG(mode) || S_ISDIR(mode)) {
831                         /*TODO check permission*/
832                         rc = mdd_lov_set_md(ctxt, NULL, mdd_obj, ma->ma_lmm,
833                                             ma->ma_lmm_size, handle, 1);
834                 }
835
836         }
837 cleanup:
838         mdd_trans_stop(ctxt, mdd, handle);
839         if (rc == 0 && lmm_size) {
840                 /*set obd attr, if needed*/
841                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
842         }
843         if (lmm != NULL) {
844                 OBD_FREE(lmm, max_size);
845         }
846
847         RETURN(rc);
848 }
849
850 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct mdd_object *obj,
851                       const void *buf, int buf_len, const char *name, int fl,
852                       struct thandle *handle)
853 {
854         int  rc;
855         ENTRY;
856
857         mdd_lock(ctxt, obj, DT_WRITE_LOCK);
858         rc = __mdd_xattr_set(ctxt, obj, buf, buf_len, name,
859                              fl, handle);
860         mdd_unlock(ctxt, obj, DT_WRITE_LOCK);
861
862         RETURN(rc);
863 }
864
865 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
866                          const void *buf, int buf_len, const char *name, int fl)
867 {
868         struct mdd_device *mdd = mdo2mdd(obj);
869         struct thandle *handle;
870         int  rc;
871         ENTRY;
872
873         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
874         handle = mdd_trans_start(ctxt, mdd);
875         if (IS_ERR(handle))
876                 RETURN(PTR_ERR(handle));
877
878         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
879                                fl, handle);
880
881         mdd_trans_stop(ctxt, mdd, handle);
882
883         RETURN(rc);
884 }
885
886 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
887                            struct mdd_object *obj,
888                            const char *name, struct thandle *handle)
889 {
890         struct dt_object *next;
891
892         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
893         next = mdd_object_child(obj);
894         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
895 }
896
897 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
898                   const char *name)
899 {
900         struct mdd_object *mdd_obj = md2mdd_obj(obj);
901         struct mdd_device *mdd = mdo2mdd(obj);
902         struct thandle *handle;
903         int  rc;
904         ENTRY;
905
906         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
907         handle = mdd_trans_start(ctxt, mdd);
908         if (IS_ERR(handle))
909                 RETURN(PTR_ERR(handle));
910
911         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
912         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
913         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
914
915         mdd_trans_stop(ctxt, mdd, handle);
916
917         RETURN(rc);
918 }
919
920 static int __mdd_index_insert(const struct lu_context *ctxt,
921                               struct mdd_object *pobj, const struct lu_fid *lf,
922                               const char *name, struct thandle *handle)
923 {
924         int rc;
925         struct dt_object *next = mdd_object_child(pobj);
926         ENTRY;
927
928         if (dt_try_as_dir(ctxt, next))
929                 rc = next->do_index_ops->dio_insert(ctxt, next,
930                                          (struct dt_rec *)lf,
931                                          (struct dt_key *)name, handle);
932         else
933                 rc = -ENOTDIR;
934         RETURN(rc);
935 }
936
937 static int __mdd_index_delete(const struct lu_context *ctxt,
938                               struct mdd_object *pobj, const char *name,
939                               struct thandle *handle)
940 {
941         int rc;
942         struct dt_object *next = mdd_object_child(pobj);
943         ENTRY;
944
945         if (dt_try_as_dir(ctxt, next))
946                 rc = next->do_index_ops->dio_delete(ctxt, next,
947                                         (struct dt_key *)name, handle);
948         else
949                 rc = -ENOTDIR;
950         RETURN(rc);
951 }
952
953 static int mdd_link_sanity_check(const struct lu_context *ctxt,
954                                  struct mdd_object *tgt_obj,
955                                  struct mdd_object *src_obj)
956 {
957         int rc;
958
959         rc = mdd_may_create(ctxt, tgt_obj, NULL);
960         if (rc)
961                 RETURN(rc);
962         if (S_ISDIR(mdd_object_type(ctxt, src_obj)))
963                 RETURN(-EPERM);
964
965         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
966                 RETURN(-EPERM);
967
968         RETURN(rc);
969 }
970
971 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
972                     struct md_object *src_obj, const char *name)
973 {
974         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
975         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
976         struct mdd_device *mdd = mdo2mdd(src_obj);
977         struct thandle *handle;
978         int rc;
979         ENTRY;
980
981         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
982         handle = mdd_trans_start(ctxt, mdd);
983         if (IS_ERR(handle))
984                 RETURN(PTR_ERR(handle));
985
986         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
987
988         rc = mdd_link_sanity_check(ctxt, mdd_tobj, mdd_sobj);
989         if (rc)
990                 GOTO(out, rc);
991
992         rc = __mdd_index_insert(ctxt, mdd_tobj, lu_object_fid(&src_obj->mo_lu),
993                                 name, handle);
994         if (rc == 0)
995                 __mdd_ref_add(ctxt, mdd_sobj, handle);
996
997 out:
998         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
999         mdd_trans_stop(ctxt, mdd, handle);
1000         RETURN(rc);
1001 }
1002
1003 /*
1004  * Check that @dir contains no entries except (possibly) dot and dotdot.
1005  *
1006  * Returns:
1007  *
1008  *             0        empty
1009  *    -ENOTEMPTY        not empty
1010  *           -ve        other error
1011  *
1012  */
1013 static int mdd_dir_is_empty(const struct lu_context *ctx,
1014                             struct mdd_object *dir)
1015 {
1016         struct dt_it     *it;
1017         struct dt_object *obj;
1018         struct dt_it_ops *iops;
1019         int result;
1020
1021         obj = mdd_object_child(dir);
1022         iops = &obj->do_index_ops->dio_it;
1023         it = iops->init(ctx, obj);
1024         if (it != NULL) {
1025                 result = iops->get(ctx, it, (const void *)"");
1026                 if (result > 0) {
1027                         int i;
1028                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1029                                 result = iops->next(ctx, it);
1030                         iops->put(ctx, it);
1031                         if (result == 0)
1032                                 result = -ENOTEMPTY;
1033                         else if (result == +1)
1034                                 result = 0;
1035                 } else if (result == 0)
1036                         /*
1037                          * Huh? Index contains no zero key?
1038                          */
1039                         result = -EIO;
1040                 iops->fini(ctx, it);
1041         } else
1042                 result = -ENOMEM;
1043         return result;
1044 }
1045
1046 /* return md_attr back,
1047  * if it is last unlink then return lov ea + llog cookie*/
1048 static int __mdd_finish_unlink(const struct lu_context *ctxt,
1049                                struct mdd_object *obj, struct md_attr *ma)
1050 {
1051         int rc;
1052         ENTRY;
1053
1054         rc = __mdd_iattr_get(ctxt, obj, ma);
1055         if (rc == 0) {
1056                 if (atomic_read(&obj->mod_count) == 0 &&
1057                     ma->ma_attr.la_nlink == 0) {
1058                         mdd_set_dead_obj(obj);
1059                         if (S_ISREG(mdd_object_type(ctxt, obj))) {
1060                                 rc = __mdd_lmm_get(ctxt, obj, ma);
1061                                 if (ma->ma_valid & MA_LOV)
1062                                         rc = mdd_unlink_log(ctxt,
1063                                                     mdo2mdd(&obj->mod_obj),
1064                                                     obj, ma);
1065                         }
1066                 }
1067         }
1068         RETURN(rc);
1069 }
1070
1071 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
1072                                    struct mdd_object *pobj,
1073                                    struct mdd_object *cobj,
1074                                    struct md_attr *ma)
1075 {
1076         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1077         int rc = 0;
1078         ENTRY;
1079
1080         rc = mdd_may_delete(ctxt, pobj, cobj, S_ISDIR(ma->ma_attr.la_mode));
1081         if (rc)
1082                 RETURN(rc);
1083
1084         if (S_ISDIR(mdd_object_type(ctxt, cobj)) &&
1085             dt_try_as_dir(ctxt, dt_cobj)) {
1086                 rc = mdd_dir_is_empty(ctxt, cobj);
1087                 if (rc != 0)
1088                         RETURN(rc);
1089         }
1090
1091         RETURN(rc);
1092 }
1093
1094 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
1095                       struct md_object *cobj, const char *name,
1096                       struct md_attr *ma)
1097 {
1098         struct mdd_device *mdd = mdo2mdd(pobj);
1099         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1100         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1101         struct thandle    *handle;
1102         int rc;
1103         ENTRY;
1104
1105         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
1106         handle = mdd_trans_start(ctxt, mdd);
1107         if (IS_ERR(handle))
1108                 RETURN(PTR_ERR(handle));
1109
1110         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
1111
1112         rc = mdd_unlink_sanity_check(ctxt, mdd_pobj, mdd_cobj, ma);
1113         if (rc)
1114                 GOTO(cleanup, rc);
1115
1116
1117         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1118         if (rc)
1119                 GOTO(cleanup, rc);
1120
1121         __mdd_ref_del(ctxt, mdd_cobj, handle);
1122         if (S_ISDIR(lu_object_attr(ctxt, &cobj->mo_lu))) {
1123                 /* unlink dot */
1124                 __mdd_ref_del(ctxt, mdd_cobj, handle);
1125                 /* unlink dotdot */
1126                 __mdd_ref_del(ctxt, mdd_pobj, handle);
1127         }
1128
1129         rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma);
1130
1131 cleanup:
1132         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
1133         mdd_trans_stop(ctxt, mdd, handle);
1134         RETURN(rc);
1135 }
1136
1137 static int mdd_parent_fid(const struct lu_context *ctxt,
1138                           struct mdd_object *obj,
1139                           struct lu_fid *fid)
1140 {
1141         return mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
1142 }
1143
1144 /*
1145  * return 0: if p2 is the parent of p1
1146  * otherwise: other_value
1147  */
1148 static int mdd_is_parent(const struct lu_context *ctxt,
1149                          struct mdd_device *mdd,
1150                          struct mdd_object *p1,
1151                          struct mdd_object *p2)
1152 {
1153         struct lu_fid * pfid;
1154         struct mdd_object *parent = NULL;
1155         int rc;
1156         ENTRY;
1157
1158         pfid = &mdd_ctx_info(ctxt)->mti_fid;
1159         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1160                 RETURN(1);
1161         for(;;) {
1162                 rc = mdd_parent_fid(ctxt, p1, pfid);
1163                 if (rc)
1164                         GOTO(out, rc);
1165                 if (lu_fid_eq(pfid, mdo2fid(p2)))
1166                         GOTO(out, rc = 0);
1167                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1168                         GOTO(out, rc = 1);
1169                 if (parent)
1170                         mdd_object_put(ctxt, parent);
1171                 parent = mdd_object_find(ctxt, mdd, pfid);
1172                 if (IS_ERR(parent))
1173                         GOTO(out, rc = PTR_ERR(parent));
1174                 p1 = parent;
1175         }
1176 out:
1177         if (parent && !IS_ERR(parent))
1178                 mdd_object_put(ctxt, parent);
1179         RETURN(rc);
1180 }
1181
1182 static int mdd_rename_lock(const struct lu_context *ctxt,
1183                            struct mdd_device *mdd,
1184                            struct mdd_object *src_pobj,
1185                            struct mdd_object *tgt_pobj)
1186 {
1187         ENTRY;
1188
1189         if (src_pobj == tgt_pobj) {
1190                 mdd_lock(ctxt, src_pobj, DT_WRITE_LOCK);
1191                 RETURN(0);
1192         }
1193         /*compared the parent child relationship of src_p&tgt_p*/
1194         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1195                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
1196                 RETURN(0);
1197         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1198                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1199                 RETURN(0);
1200         }
1201
1202         if (!mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
1203                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1204                 RETURN(0);
1205         }
1206
1207         mdd_lock2(ctxt, src_pobj, tgt_pobj);
1208
1209         RETURN(0);
1210 }
1211
1212 static void mdd_rename_unlock(const struct lu_context *ctxt,
1213                               struct mdd_object *src_pobj,
1214                               struct mdd_object *tgt_pobj)
1215 {
1216         mdd_unlock(ctxt, src_pobj, DT_WRITE_LOCK);
1217         if (src_pobj != tgt_pobj)
1218                 mdd_unlock(ctxt, tgt_pobj, DT_WRITE_LOCK);
1219 }
1220
1221 static int mdd_rename_sanity_check(const struct lu_context *ctxt,
1222                                    struct mdd_object *src_pobj,
1223                                    struct mdd_object *tgt_pobj,
1224                                    struct mdd_object *sobj,
1225                                    struct mdd_object *tobj)
1226 {
1227         struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj);
1228         int rc = 0, src_is_dir, tgt_is_dir;
1229         ENTRY;
1230
1231         src_is_dir = S_ISDIR(mdd_object_type(ctxt, sobj));
1232         rc = mdd_may_delete(ctxt, src_pobj, sobj, src_is_dir);
1233         if (rc)
1234                 GOTO(out, rc);
1235
1236         if (!tobj) {
1237                 rc = mdd_may_create(ctxt, tgt_pobj, NULL);
1238                 if (rc)
1239                         GOTO(out, rc);
1240                 if (!mdd_is_parent(ctxt, mdd, tgt_pobj, sobj))
1241                         GOTO(out, rc = -EINVAL);
1242                 GOTO(out, rc);
1243         }
1244
1245         /* source should not be ancestor of target */
1246         if (!mdd_is_parent(ctxt, mdd, tobj, sobj))
1247                 GOTO(out, rc = -EINVAL);
1248
1249         rc = mdd_may_delete(ctxt, tgt_pobj, tobj, src_is_dir);
1250         if (rc)
1251                 GOTO(out, rc);
1252
1253         tgt_is_dir = S_ISDIR(mdd_object_type(ctxt, tobj));
1254         if (tgt_is_dir && mdd_dir_is_empty(ctxt, tobj))
1255                 GOTO(out, rc = -ENOTEMPTY);
1256 out:
1257         mdd_object_put(ctxt, sobj);
1258         RETURN(rc);
1259 }
1260
1261 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
1262                       struct md_object *tgt_pobj, const struct lu_fid *lf,
1263                       const char *sname, struct md_object *tobj,
1264                       const char *tname, struct md_attr *ma)
1265 {
1266         struct mdd_device *mdd = mdo2mdd(src_pobj);
1267         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1268         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1269         struct mdd_object *mdd_sobj = mdd_object_find(ctxt, mdd, lf);
1270         struct mdd_object *mdd_tobj = NULL;
1271         struct thandle *handle;
1272         int rc;
1273         ENTRY;
1274
1275         if (tobj)
1276                 mdd_tobj = md2mdd_obj(tobj);
1277
1278         /*XXX: shouldn't this check be done under lock below? */
1279         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
1280                                      mdd_sobj, mdd_tobj);
1281         if (rc)
1282                 RETURN(rc);
1283
1284         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1285         handle = mdd_trans_start(ctxt, mdd);
1286         if (IS_ERR(handle))
1287                 RETURN(PTR_ERR(handle));
1288
1289         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1290         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
1291         if (rc)
1292                 GOTO(cleanup_unlocked, rc);
1293
1294         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
1295         if (rc)
1296                 GOTO(cleanup, rc);
1297
1298         /*if sobj is dir, its parent object nlink should be dec too*/
1299         if (S_ISDIR(mdd_object_type(ctxt, mdd_sobj)))
1300                 __mdd_ref_del(ctxt, mdd_spobj, handle);
1301
1302         if (tobj) {
1303                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
1304                 if (rc)
1305                         GOTO(cleanup, rc);
1306         }
1307
1308         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, handle);
1309         if (rc)
1310                 GOTO(cleanup, rc);
1311
1312         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu)) {
1313                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1314                 /* remove dot reference */
1315                 if (S_ISDIR(mdd_object_type(ctxt, mdd_tobj)))
1316                         __mdd_ref_del(ctxt, mdd_tobj, handle);
1317
1318                 rc = __mdd_finish_unlink(ctxt, mdd_tobj, ma);
1319         }
1320 cleanup:
1321         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
1322 cleanup_unlocked:
1323         mdd_trans_stop(ctxt, mdd, handle);
1324         RETURN(rc);
1325 }
1326
1327 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
1328                       const char *name, struct lu_fid* fid)
1329 {
1330         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1331         struct dt_object    *dir = mdd_object_child(mdd_obj);
1332         struct dt_rec       *rec    = (struct dt_rec *)fid;
1333         const struct dt_key *key = (const struct dt_key *)name;
1334         int rc;
1335         ENTRY;
1336
1337         if (mdd_is_dead_obj(mdd_obj))
1338                 RETURN(-ESTALE);
1339         mdd_lock(ctxt, mdd_obj, DT_READ_LOCK);
1340         if (S_ISDIR(mdd_object_type(ctxt, mdd_obj)) && dt_try_as_dir(ctxt, dir))
1341                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
1342         else
1343                 rc = -ENOTDIR;
1344         mdd_unlock(ctxt, mdd_obj, DT_READ_LOCK);
1345         RETURN(rc);
1346 }
1347
1348 static int __mdd_object_initialize(const struct lu_context *ctxt,
1349                                    struct mdd_object *parent,
1350                                    struct mdd_object *child,
1351                                    struct md_attr *ma, struct thandle *handle)
1352 {
1353         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
1354         int               rc;
1355         ENTRY;
1356
1357         /* update attributes for child and parent.
1358          * FIXME:
1359          *  (1) the valid bits should be converted between Lustre and Linux;
1360          *  (2) maybe, the child attributes should be set in OSD when creation.
1361          */
1362
1363         rc = mdd_attr_set_internal(ctxt, child, &ma->ma_attr, handle);
1364         if (rc != 0)
1365                 RETURN(rc);
1366
1367         la->la_valid = LA_MTIME|LA_CTIME;
1368         la->la_atime = ma->ma_attr.la_atime;
1369         la->la_ctime = ma->ma_attr.la_ctime;
1370         rc = mdd_attr_set_internal(ctxt, parent, la, handle);
1371         if (rc != 0)
1372                 RETURN(rc);
1373
1374         if (S_ISDIR(ma->ma_attr.la_mode)) {
1375                 /* add . and .. for newly created dir */
1376                 __mdd_ref_add(ctxt, child, handle);
1377                 rc = __mdd_index_insert(ctxt, child,
1378                                         mdo2fid(child), dot, handle);
1379                 if (rc == 0) {
1380                         rc = __mdd_index_insert(ctxt, child, mdo2fid(parent),
1381                                                 dotdot, handle);
1382                         if (rc == 0)
1383                                 __mdd_ref_add(ctxt, parent, handle);
1384                         else {
1385                                 int rc2;
1386
1387                                 rc2 = __mdd_index_delete(ctxt,
1388                                                          child, dot, handle);
1389                                 if (rc2 != 0)
1390                                         CERROR("Failure to cleanup after dotdot"
1391                                                " creation: %d (%d)\n", rc2, rc);
1392                                 else
1393                                         __mdd_ref_del(ctxt, child, handle);
1394                         }
1395                 }
1396         }
1397         RETURN(rc);
1398 }
1399
1400 static int mdd_create_data(const struct lu_context *ctxt,
1401                            struct md_object *pobj, struct md_object *cobj,
1402                            const struct md_create_spec *spec,
1403                            struct md_attr *ma)
1404 {
1405         struct mdd_device *mdd = mdo2mdd(pobj);
1406         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1407         struct mdd_object *son = md2mdd_obj(cobj);
1408         struct lu_attr    *attr = &ma->ma_attr;
1409         struct lov_mds_md *lmm = NULL;
1410         int                lmm_size = 0;
1411         struct thandle    *handle;
1412         int                rc;
1413         ENTRY;
1414
1415         rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
1416                             attr);
1417         if (rc)
1418                 RETURN(rc);
1419
1420         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
1421         handle = mdd_trans_start(ctxt, mdd);
1422         if (IS_ERR(handle))
1423                 RETURN(PTR_ERR(handle));
1424
1425         /*XXX: setting the lov ea is not locked but setting the attr is locked? */
1426         if (rc == 0) {
1427                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size,
1428                                     handle, 0);
1429                 if (rc == 0)
1430                         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1431         }
1432
1433         mdd_trans_stop(ctxt, mdd, handle);
1434         RETURN(rc);
1435 }
1436
1437 static int mdd_create_sanity_check(const struct lu_context *ctxt,
1438                                    struct mdd_device *mdd,
1439                                    struct md_object *pobj,
1440                                    const char *name, struct md_attr *ma)
1441 {
1442         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
1443         struct lu_fid    *fid = &mdd_ctx_info(ctxt)->mti_fid;
1444         int rc;
1445
1446         ENTRY;
1447         /* EEXIST check */
1448         if (mdd_is_dead_obj(md2mdd_obj(pobj)))
1449                 RETURN(-ENOENT);
1450         rc = mdd_lookup(ctxt, pobj, name, fid);
1451         if (rc != -ENOENT) {
1452                 rc = rc ? rc : -EEXIST;
1453                 RETURN(rc);
1454         }
1455         /* sgid check */
1456         rc = __mdd_la_get(ctxt, md2mdd_obj(pobj), la);
1457         if (rc != 0)
1458                 RETURN(rc);
1459
1460         if (la->la_mode & S_ISGID) {
1461                 ma->ma_attr.la_gid = la->la_gid;
1462                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1463                         ma->ma_attr.la_mode |= S_ISGID;
1464                         ma->ma_attr.la_valid |= LA_MODE;
1465                 }
1466         }
1467
1468         switch (ma->ma_attr.la_mode & S_IFMT) {
1469         case S_IFREG:
1470         case S_IFDIR:
1471         case S_IFLNK:
1472         case S_IFCHR:
1473         case S_IFBLK:
1474         case S_IFIFO:
1475         case S_IFSOCK:
1476                 rc = 0;
1477                 break;
1478         default:
1479                 rc = -EINVAL;
1480                 break;
1481         }
1482         RETURN(rc);
1483 }
1484
1485 /*
1486  * Create object and insert it into namespace.
1487  */
1488 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
1489                       const char *name, struct md_object *child,
1490                       const struct md_create_spec *spec,
1491                       struct md_attr* ma)
1492 {
1493         struct mdd_device *mdd = mdo2mdd(pobj);
1494         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1495         struct mdd_object *son = md2mdd_obj(child);
1496         struct lu_attr *attr = &ma->ma_attr;
1497         struct lov_mds_md *lmm = NULL;
1498         struct thandle *handle;
1499         int rc, created = 0, inserted = 0, lmm_size = 0;
1500         ENTRY;
1501
1502         /* sanity checks before big job */
1503         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1504         if (rc)
1505                 RETURN(rc);
1506         /* no RPC inside the transaction, so OST objects should be created at
1507          * first */
1508         if (S_ISREG(attr->la_mode)) {
1509                 rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size,
1510                                     spec, attr);
1511                 if (rc)
1512                         RETURN(rc);
1513         }
1514
1515         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1516         handle = mdd_trans_start(ctxt, mdd);
1517         if (IS_ERR(handle))
1518                 RETURN(PTR_ERR(handle));
1519
1520         mdd_lock(ctxt, mdd_pobj, DT_WRITE_LOCK);
1521
1522         /*
1523          * XXX check that link can be added to the parent in mkdir case.
1524          */
1525
1526         /*
1527          * Two operations have to be performed:
1528          *
1529          *  - allocation of new object (->do_create()), and
1530          *
1531          *  - insertion into parent index (->dio_insert()).
1532          *
1533          * Due to locking, operation order is not important, when both are
1534          * successful, *but* error handling cases are quite different:
1535          *
1536          *  - if insertion is done first, and following object creation fails,
1537          *  insertion has to be rolled back, but this operation might fail
1538          *  also leaving us with dangling index entry.
1539          *
1540          *  - if creation is done first, is has to be undone if insertion
1541          *  fails, leaving us with leaked space, which is neither good, nor
1542          *  fatal.
1543          *
1544          * It seems that creation-first is simplest solution, but it is
1545          * sub-optimal in the frequent
1546          *
1547          *         $ mkdir foo
1548          *         $ mkdir foo
1549          *
1550          * case, because second mkdir is bound to create object, only to
1551          * destroy it immediately.
1552          *
1553          * Note that local file systems do
1554          *
1555          *     0. lookup -> -EEXIST
1556          *
1557          *     1. create
1558          *
1559          *     2. insert
1560          *
1561          * Maybe we should do the same. For now: creation-first.
1562          */
1563
1564         rc = __mdd_object_create(ctxt, son, ma, handle);
1565         if (rc)
1566                 GOTO(cleanup, rc);
1567
1568         created = 1;
1569
1570         rc = __mdd_object_initialize(ctxt, mdd_pobj, son, ma, handle);
1571         if (rc)
1572                 /*
1573                  * Object has no links, so it will be destroyed when last
1574                  * reference is released. (XXX not now.)
1575                  */
1576                 GOTO(cleanup, rc);
1577
1578         rc = __mdd_index_insert(ctxt, mdd_pobj, lu_object_fid(&child->mo_lu),
1579                                 name, handle);
1580
1581         if (rc)
1582                 GOTO(cleanup, rc);
1583
1584         inserted = 1;
1585         rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
1586         if (rc) {
1587                 CERROR("error on stripe info copy %d \n", rc);
1588                 GOTO(cleanup, rc);
1589         }
1590
1591         if (S_ISLNK(attr->la_mode)) {
1592                 struct dt_object *dt = mdd_object_child(son);
1593                 const char *target_name = spec->u.sp_symname;
1594                 int sym_len = strlen(target_name);
1595                 loff_t pos = 0;
1596
1597                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1598                                                 sym_len, &pos, handle);
1599                 if (rc == sym_len)
1600                         rc = 0;
1601                 else
1602                         rc = -EFAULT;
1603         }
1604         /* return attr back */
1605         rc = mdd_attr_get_internal(ctxt, son, ma);
1606 cleanup:
1607         if (rc && created) {
1608                 int rc2 = 0;
1609
1610                 if (inserted) {
1611                         rc2 = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1612                         if (rc2)
1613                                 CERROR("error can not cleanup destroy %d\n",
1614                                        rc2);
1615                 }
1616                 if (rc2 == 0)
1617                         __mdd_ref_del(ctxt, son, handle);
1618         }
1619         if (lmm)
1620                 OBD_FREE(lmm, lmm_size);
1621         mdd_unlock(ctxt, mdd_pobj, DT_WRITE_LOCK);
1622         mdd_trans_stop(ctxt, mdd, handle);
1623         RETURN(rc);
1624 }
1625 /* partial operation */
1626 static int mdd_object_create(const struct lu_context *ctxt,
1627                              struct md_object *obj,
1628                              const struct md_create_spec *spec,
1629                              struct md_attr *ma)
1630 {
1631
1632         struct mdd_device *mdd = mdo2mdd(obj);
1633         struct thandle *handle;
1634         int rc;
1635         ENTRY;
1636
1637         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1638         handle = mdd_trans_start(ctxt, mdd);
1639         if (IS_ERR(handle))
1640                 RETURN(PTR_ERR(handle));
1641
1642         rc = __mdd_object_create(ctxt, md2mdd_obj(obj), ma, handle);
1643 /* XXX: parent fid is needed here
1644         rc = __mdd_object_initialize(ctxt, mdo, son, ma, handle);
1645 */
1646         if (rc)
1647                 GOTO(out, rc);
1648
1649         rc = mdd_attr_get_internal(ctxt, md2mdd_obj(obj), ma);
1650
1651         mdd_trans_stop(ctxt, mdd, handle);
1652 out:
1653         RETURN(rc);
1654 }
1655 /* partial operation */
1656 static int mdd_name_insert(const struct lu_context *ctxt,
1657                            struct md_object *pobj,
1658                            const char *name, const struct lu_fid *fid)
1659 {
1660         struct mdd_device *mdd = mdo2mdd(pobj);
1661         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1662         struct thandle *handle;
1663         int rc;
1664         ENTRY;
1665
1666         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1667         handle = mdd_trans_start(ctxt, mdd);
1668         if (IS_ERR(handle))
1669                 RETURN(PTR_ERR(handle));
1670
1671         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1672
1673         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, handle);
1674
1675         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1676         mdd_trans_stop(ctxt, mdd, handle);
1677         RETURN(rc);
1678 }
1679
1680 static int mdd_name_remove(const struct lu_context *ctxt,
1681                            struct md_object *pobj,
1682                            const char *name)
1683 {
1684         struct mdd_device *mdd = mdo2mdd(pobj);
1685         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1686         struct thandle *handle;
1687         int rc;
1688         ENTRY;
1689
1690         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1691         handle = mdd_trans_start(ctxt, mdd);
1692         if (IS_ERR(handle))
1693                 RETURN(PTR_ERR(handle));
1694
1695         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1696
1697         rc = __mdd_index_delete(ctxt, mdd_obj, name, handle);
1698
1699         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1700
1701         mdd_trans_stop(ctxt, mdd, handle);
1702         RETURN(rc);
1703 }
1704
1705 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1706                           struct md_object *tobj, const struct lu_fid *lf,
1707                           const char *name)
1708 {
1709         struct mdd_device *mdd = mdo2mdd(pobj);
1710         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1711         struct mdd_object *mdd_tobj = NULL;
1712         struct thandle *handle;
1713         int rc;
1714         ENTRY;
1715
1716         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1717         handle = mdd_trans_start(ctxt, mdd);
1718         if (IS_ERR(handle))
1719                 RETURN(PTR_ERR(handle));
1720
1721         if (tobj)
1722                 mdd_tobj = md2mdd_obj(tobj);
1723
1724         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1725
1726         /*TODO rename sanity checking*/
1727         if (tobj) {
1728                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1729                 if (rc)
1730                         GOTO(cleanup, rc);
1731         }
1732
1733         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, name, handle);
1734         if (rc)
1735                 GOTO(cleanup, rc);
1736
1737         if (tobj && lu_object_exists(ctxt, &tobj->mo_lu))
1738                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1739 cleanup:
1740         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1741         mdd_trans_stop(ctxt, mdd, handle);
1742         RETURN(rc);
1743 }
1744
1745 static int mdd_root_get(const struct lu_context *ctx,
1746                         struct md_device *m, struct lu_fid *f)
1747 {
1748         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1749
1750         ENTRY;
1751         *f = mdd->mdd_root_fid;
1752         RETURN(0);
1753 }
1754
1755 static int mdd_statfs(const struct lu_context *ctx,
1756                       struct md_device *m, struct kstatfs *sfs)
1757 {
1758         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1759         int rc;
1760
1761         ENTRY;
1762
1763         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1764
1765         RETURN(rc);
1766 }
1767
1768 static int mdd_get_maxsize(const struct lu_context *ctx,
1769                            struct md_device *m, int *md_size,
1770                            int *cookie_size)
1771 {
1772         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1773         ENTRY;
1774
1775         *md_size =  mdd_lov_mdsize(ctx, mdd);
1776         *cookie_size = mdd_lov_cookiesize(ctx, mdd);
1777
1778         RETURN(0);
1779 }
1780
1781 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1782                          struct thandle *handle)
1783 {
1784         struct dt_object *next;
1785
1786         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1787         next = mdd_object_child(obj);
1788         next->do_ops->do_ref_add(ctxt, next, handle);
1789 }
1790
1791 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1792 {
1793         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1794         struct mdd_device *mdd = mdo2mdd(obj);
1795         struct thandle *handle;
1796         ENTRY;
1797
1798         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1799         handle = mdd_trans_start(ctxt, mdd);
1800         if (IS_ERR(handle))
1801                 RETURN(-ENOMEM);
1802
1803         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1804         __mdd_ref_add(ctxt, mdd_obj, handle);
1805         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1806
1807         mdd_trans_stop(ctxt, mdd, handle);
1808
1809         RETURN(0);
1810 }
1811
1812 static void
1813 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1814               struct thandle *handle)
1815 {
1816         struct dt_object *next = mdd_object_child(obj);
1817
1818         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
1819
1820         next->do_ops->do_ref_del(ctxt, next, handle);
1821 }
1822
1823 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1824                        struct md_attr *ma)
1825 {
1826         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1827         struct mdd_device *mdd = mdo2mdd(obj);
1828         struct thandle *handle;
1829         int rc;
1830         ENTRY;
1831
1832         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1833         handle = mdd_trans_start(ctxt, mdd);
1834         if (IS_ERR(handle))
1835                 RETURN(-ENOMEM);
1836
1837         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
1838
1839         /* rmdir checks */
1840         if (S_ISDIR(lu_object_attr(ctxt, &obj->mo_lu)) &&
1841             dt_try_as_dir(ctxt, mdd_object_child(mdd_obj))) {
1842                 rc = mdd_dir_is_empty(ctxt, mdd_obj);
1843                 if (rc != 0)
1844                         GOTO(cleanup, rc);
1845         }
1846
1847         __mdd_ref_del(ctxt, mdd_obj, handle);
1848
1849         if (S_ISDIR(lu_object_attr(ctxt, &obj->mo_lu))) {
1850                 /* unlink dot */
1851                 __mdd_ref_del(ctxt, mdd_obj, handle);
1852         }
1853
1854         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma);
1855
1856 cleanup:
1857         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
1858         mdd_trans_stop(ctxt, mdd, handle);
1859         RETURN(rc);
1860 }
1861
1862 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
1863 {
1864         /*XXX access mod checking*/
1865         atomic_inc(&md2mdd_obj(obj)->mod_count);
1866         return 0;
1867 }
1868
1869 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
1870                      struct md_attr *ma)
1871 {
1872         if (atomic_dec_and_test(&md2mdd_obj(obj)->mod_count)) {
1873                 /*TODO: Remove it from orphan list */
1874         }
1875
1876         return __mdd_finish_unlink(ctxt, md2mdd_obj(obj), ma);
1877 }
1878
1879 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
1880                         const struct lu_rdpg *rdpg)
1881 {
1882         struct dt_object *next;
1883         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1884         int rc;
1885
1886         LASSERT(lu_object_exists(ctxt, mdd2lu_obj(mdd_obj)));
1887         next = mdd_object_child(mdd_obj);
1888
1889         mdd_lock(ctxt, mdd_obj, DT_READ_LOCK);
1890         if (S_ISDIR(mdd_object_type(ctxt, mdd_obj)) &&
1891             dt_try_as_dir(ctxt, next))
1892                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
1893         else
1894                 rc = -ENOTDIR;
1895         mdd_unlock(ctxt, mdd_obj, DT_READ_LOCK);
1896         return rc;
1897 }
1898
1899 struct md_device_operations mdd_ops = {
1900         .mdo_root_get       = mdd_root_get,
1901         .mdo_statfs         = mdd_statfs,
1902         .mdo_get_maxsize    = mdd_get_maxsize,
1903 };
1904
1905 static struct md_dir_operations mdd_dir_ops = {
1906         .mdo_lookup        = mdd_lookup,
1907         .mdo_create        = mdd_create,
1908         .mdo_rename        = mdd_rename,
1909         .mdo_link          = mdd_link,
1910         .mdo_unlink        = mdd_unlink,
1911         .mdo_name_insert   = mdd_name_insert,
1912         .mdo_name_remove   = mdd_name_remove,
1913         .mdo_rename_tgt    = mdd_rename_tgt,
1914         .mdo_create_data   = mdd_create_data
1915 };
1916
1917
1918 static struct md_object_operations mdd_obj_ops = {
1919         .moo_attr_get      = mdd_attr_get,
1920         .moo_attr_set      = mdd_attr_set,
1921         .moo_xattr_get     = mdd_xattr_get,
1922         .moo_xattr_set     = mdd_xattr_set,
1923         .moo_xattr_list    = mdd_xattr_list,
1924         .moo_xattr_del     = mdd_xattr_del,
1925         .moo_object_create = mdd_object_create,
1926         .moo_ref_add       = mdd_ref_add,
1927         .moo_ref_del       = mdd_ref_del,
1928         .moo_open          = mdd_open,
1929         .moo_close         = mdd_close,
1930         .moo_readpage      = mdd_readpage,
1931         .moo_readlink      = mdd_readlink
1932 };
1933
1934 static struct obd_ops mdd_obd_device_ops = {
1935         .o_owner = THIS_MODULE
1936 };
1937
1938 static struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
1939                                           struct lu_device_type *t,
1940                                           struct lustre_cfg *lcfg)
1941 {
1942         struct lu_device  *l;
1943         struct mdd_device *m;
1944
1945         OBD_ALLOC_PTR(m);
1946         if (m == NULL) {
1947                 l = ERR_PTR(-ENOMEM);
1948         } else {
1949                 md_device_init(&m->mdd_md_dev, t);
1950                 l = mdd2lu_dev(m);
1951                 l->ld_ops = &mdd_lu_ops;
1952                 m->mdd_md_dev.md_ops = &mdd_ops;
1953         }
1954
1955         return l;
1956 }
1957
1958 static void mdd_device_free(const struct lu_context *ctx,
1959                             struct lu_device *lu)
1960 {
1961         struct mdd_device *m = lu2mdd_dev(lu);
1962
1963         LASSERT(atomic_read(&lu->ld_ref) == 0);
1964         md_device_fini(&m->mdd_md_dev);
1965         OBD_FREE_PTR(m);
1966 }
1967
1968 static int mdd_type_init(struct lu_device_type *t)
1969 {
1970         return lu_context_key_register(&mdd_thread_key);
1971 }
1972
1973 static void mdd_type_fini(struct lu_device_type *t)
1974 {
1975         lu_context_key_degister(&mdd_thread_key);
1976 }
1977
1978 static struct lu_device_type_operations mdd_device_type_ops = {
1979         .ldto_init = mdd_type_init,
1980         .ldto_fini = mdd_type_fini,
1981
1982         .ldto_device_alloc = mdd_device_alloc,
1983         .ldto_device_free  = mdd_device_free,
1984
1985         .ldto_device_init    = mdd_device_init,
1986         .ldto_device_fini    = mdd_device_fini
1987 };
1988
1989 static struct lu_device_type mdd_device_type = {
1990         .ldt_tags     = LU_DEVICE_MD,
1991         .ldt_name     = LUSTRE_MDD0_NAME,
1992         .ldt_ops      = &mdd_device_type_ops,
1993         .ldt_ctx_tags = LCT_MD_THREAD
1994 };
1995
1996 static void *mdd_key_init(const struct lu_context *ctx,
1997                           struct lu_context_key *key)
1998 {
1999         struct mdd_thread_info *info;
2000
2001         OBD_ALLOC_PTR(info);
2002         if (info == NULL)
2003                 info = ERR_PTR(-ENOMEM);
2004         return info;
2005 }
2006
2007 static void mdd_key_fini(const struct lu_context *ctx,
2008                          struct lu_context_key *key, void *data)
2009 {
2010         struct mdd_thread_info *info = data;
2011         OBD_FREE_PTR(info);
2012 }
2013
2014 static struct lu_context_key mdd_thread_key = {
2015         .lct_tags = LCT_MD_THREAD,
2016         .lct_init = mdd_key_init,
2017         .lct_fini = mdd_key_fini
2018 };
2019
2020 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
2021         { 0 }
2022 };
2023
2024 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
2025         { 0 }
2026 };
2027
2028 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
2029
2030 static int __init mdd_mod_init(void)
2031 {
2032         struct lprocfs_static_vars lvars;
2033
2034         lprocfs_init_vars(mdd, &lvars);
2035         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
2036                                    LUSTRE_MDD0_NAME, &mdd_device_type);
2037 }
2038
2039 static void __exit mdd_mod_exit(void)
2040 {
2041         class_unregister_type(LUSTRE_MDD0_NAME);
2042 }
2043
2044 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2045 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
2046 MODULE_LICENSE("GPL");
2047
2048 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);