Whamcloud - gitweb
Fix the file(parent/child) x(a/m/c)time change rule according to the POSIX.
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43
44 #include "mdd_internal.h"
45
46
47 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
48                                        struct mdd_device *);
49 static void mdd_trans_stop(const struct lu_context *ctxt,
50                            struct mdd_device *mdd, int rc,
51                            struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
54                           struct thandle *handle);
55 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
56                           struct thandle *handle);
57 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
58                       const char *name, struct lu_fid* fid);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 static const char *mdd_root_dir_name = "root";
67 static const char dot[] = ".";
68 static const char dotdot[] = "..";
69
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdd_obj;
85
86         OBD_ALLOC_PTR(mdd_obj);
87         if (mdd_obj != NULL) {
88                 struct lu_object *o;
89                 
90                 o = mdd2lu_obj(mdd_obj);
91                 lu_object_init(o, NULL, d);
92                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 atomic_set(&mdd_obj->mod_count, 0);
95                 o->lo_ops = &mdd_lu_obj_ops;
96                 return o;
97         } else {
98                 return NULL;
99         }
100 }
101
102 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 {
104         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
105         struct lu_object  *below;
106         struct lu_device  *under;
107         ENTRY;
108
109         under = &d->mdd_child->dd_lu_dev;
110         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
111
112         if (below == NULL)
113                 RETURN(-ENOMEM);
114
115         lu_object_add(o, below);
116         RETURN(0);
117 }
118
119 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj);
120
121 static int mdd_object_start(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         if (lu_object_exists(o))
124                 return mdd_get_flags(ctxt, lu2mdd_obj(o));
125         else
126                 return 0;
127 }
128
129 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
130 {
131         struct mdd_object *mdd = lu2mdd_obj(o);
132         
133         lu_object_fini(o);
134         OBD_FREE_PTR(mdd);
135 }
136
137 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
138                                    struct mdd_device *d,
139                                    const struct lu_fid *f)
140 {
141         struct lu_object *o;
142         struct mdd_object *m;
143         ENTRY;
144
145         o = lu_object_find(ctxt, d->mdd_md_dev.md_lu_dev.ld_site, f);
146         if (IS_ERR(o))
147                 m = (struct mdd_object *)o;
148         else
149                 m = lu2mdd_obj(lu_object_locate(o->lo_header,
150                                  d->mdd_md_dev.md_lu_dev.ld_type));
151         RETURN(m);
152 }
153
154 static inline void mdd_object_put(const struct lu_context *ctxt,
155                                   struct mdd_object *o)
156 {
157         lu_object_put(ctxt, &o->mod_obj.mo_lu);
158 }
159
160 static inline int mdd_is_immutable(struct mdd_object *obj)
161 {
162         return obj->mod_flags & IMMUTE_OBJ;
163 }
164
165 static inline int mdd_is_append(struct mdd_object *obj)
166 {
167         return obj->mod_flags & APPEND_OBJ;
168 }
169
170 static void mdd_set_dead_obj(struct mdd_object *obj)
171 {
172         if (obj)
173                 obj->mod_flags |= DEAD_OBJ;
174 }
175
176 static int mdd_is_dead_obj(struct mdd_object *obj)
177 {
178         return obj && obj->mod_flags & DEAD_OBJ;
179 }
180
181 /*Check whether it may create the cobj under the pobj*/
182 static int mdd_may_create(const struct lu_context *ctxt,
183                           struct mdd_object *pobj, struct mdd_object *cobj)
184 {
185         ENTRY;
186         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
187                 RETURN(-EEXIST);
188
189         if (mdd_is_dead_obj(pobj))
190                 RETURN(-ENOENT);
191
192         /*check pobj may create or not*/
193         RETURN(0);
194 }
195
196 static inline int __mdd_la_get(const struct lu_context *ctxt,
197                                struct mdd_object *obj, struct lu_attr *la)
198 {
199         struct dt_object  *next = mdd_object_child(obj);
200         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
201         return next->do_ops->do_attr_get(ctxt, next, la);
202 }
203
204 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
205 {
206         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
207
208         if (flags & LUSTRE_APPEND_FL)
209                 obj->mod_flags |= APPEND_OBJ;
210
211         if (flags & LUSTRE_IMMUTABLE_FL)
212                 obj->mod_flags |= IMMUTE_OBJ;
213 }
214
215 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj)
216 {
217         struct lu_attr *la = &mdd_ctx_info(ctxt)->mti_la;
218         int rc;
219
220         ENTRY;
221         mdd_read_lock(ctxt, obj);
222         rc = __mdd_la_get(ctxt, obj, la);
223         mdd_read_unlock(ctxt, obj);
224         if (rc == 0)
225                 mdd_flags_xlate(obj, la->la_flags);
226         RETURN(rc);
227 }
228
229 /*Check whether it may delete the cobj under the pobj*/
230 static int mdd_may_delete(const struct lu_context *ctxt,
231                           struct mdd_object *pobj, struct mdd_object *cobj,
232                           int is_dir)
233 {
234         struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
235         int rc = 0;
236         ENTRY;
237
238         LASSERT(cobj && pobj);
239
240         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
241                 RETURN(-ENOENT);
242
243         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
244                 RETURN(-EPERM);
245
246         if (is_dir) {
247                 if (!S_ISDIR(mdd_object_type(cobj)))
248                         RETURN(-ENOTDIR);
249
250                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
251                         RETURN(-EBUSY);
252
253         } else if (S_ISDIR(mdd_object_type(cobj)))
254                         RETURN(-EISDIR);
255
256         if (mdd_is_dead_obj(pobj))
257                 RETURN(-ENOENT);
258
259         RETURN(rc);
260 }
261 /* get only inode attributes */
262 static int __mdd_iattr_get(const struct lu_context *ctxt,
263                            struct mdd_object *mdd_obj, struct md_attr *ma)
264 {
265         int rc = 0;
266         ENTRY;
267
268         rc = __mdd_la_get(ctxt, mdd_obj, &ma->ma_attr);
269         if (rc == 0)
270                 ma->ma_valid = MA_INODE;
271         RETURN(rc);
272 }
273 /* get lov EA only */
274 static int __mdd_lmm_get(const struct lu_context *ctxt,
275                          struct mdd_object *mdd_obj, struct md_attr *ma)
276 {
277         int rc;
278
279         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
280         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, 0, 
281                         MDS_LOV_MD_NAME);
282         if (rc > 0) {
283                 ma->ma_valid |= MA_LOV;
284                 rc = 0;
285         }
286         RETURN(rc);
287 }
288
289 #ifdef HAVE_SPLIT_SUPPORT
290 /* get lmv EA only*/
291 static int __mdd_lmv_get(const struct lu_context *ctxt,
292                          struct mdd_object *mdd_obj, struct md_attr *ma)
293 {
294         int rc;
295
296         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size, 0,
297                         MDS_LMV_MD_NAME);
298         if (rc > 0) {
299                 ma->ma_valid |= MA_LMV;
300                 rc = 0;
301         }
302         RETURN(rc);
303 }
304 #endif
305
306 static int mdd_attr_get_internal(const struct lu_context *ctxt,
307                                  struct mdd_object *mdd_obj,
308                                  struct md_attr *ma)
309 {
310         int rc = 0;
311         ENTRY;
312
313         if (ma->ma_need & MA_INODE)
314                 rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
315
316         if (rc == 0 && ma->ma_need & MA_LOV) {
317                 if (S_ISREG(mdd_object_type(mdd_obj)) || 
318                     S_ISDIR(mdd_object_type(mdd_obj)))
319                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
320         }
321 #ifdef HAVE_SPLIT_SUPPORT 
322         if (rc == 0 && ma->ma_need & MA_LMV) {
323                 if (S_ISDIR(mdd_object_type(mdd_obj)))
324                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
325         }
326 #endif        
327         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
328                         rc, ma->ma_valid);
329         RETURN(rc);
330 }
331
332 static inline int mdd_attr_get_internal_locked(const struct lu_context *ctxt,
333                                                struct mdd_object *mdd_obj,
334                                                struct md_attr *ma)
335 {
336         int rc;
337         mdd_read_lock(ctxt, mdd_obj);
338         rc = mdd_attr_get_internal(ctxt, mdd_obj, ma);
339         mdd_read_unlock(ctxt, mdd_obj);
340         return rc;
341 }
342
343 static int mdd_attr_get(const struct lu_context *ctxt,
344                         struct md_object *obj, struct md_attr *ma)
345 {
346         struct mdd_object *mdd_obj = md2mdd_obj(obj);
347         int                rc;
348
349         ENTRY;
350         rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
351         RETURN(rc);
352 }
353
354 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
355                          void *buf, int buf_len, const char *name)
356 {
357         struct mdd_object *mdd_obj = md2mdd_obj(obj);
358         struct dt_object  *next;
359         int rc;
360
361         ENTRY;
362
363         LASSERT(lu_object_exists(&obj->mo_lu));
364
365         next = mdd_object_child(mdd_obj);
366         mdd_read_lock(ctxt, mdd_obj);
367         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
368         mdd_read_unlock(ctxt, mdd_obj);
369
370         RETURN(rc);
371 }
372
373 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
374                         void *buf, int buf_len)
375 {
376         struct mdd_object *mdd_obj = md2mdd_obj(obj);
377         struct dt_object  *next;
378         loff_t             pos = 0;
379         int                rc;
380         ENTRY;
381
382         LASSERT(lu_object_exists(&obj->mo_lu));
383
384         next = mdd_object_child(mdd_obj);
385         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
386         RETURN(rc);
387 }
388 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
389                           void *buf, int buf_len)
390 {
391         struct mdd_object *mdd_obj = md2mdd_obj(obj);
392         struct dt_object  *next;
393         int rc;
394
395         ENTRY;
396
397         LASSERT(lu_object_exists(&obj->mo_lu));
398
399         next = mdd_object_child(mdd_obj);
400         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
401
402         RETURN(rc);
403 }
404
405 enum mdd_txn_op {
406         MDD_TXN_OBJECT_DESTROY_OP,
407         MDD_TXN_OBJECT_CREATE_OP,
408         MDD_TXN_ATTR_SET_OP,
409         MDD_TXN_XATTR_SET_OP,
410         MDD_TXN_INDEX_INSERT_OP,
411         MDD_TXN_INDEX_DELETE_OP,
412         MDD_TXN_LINK_OP,
413         MDD_TXN_UNLINK_OP,
414         MDD_TXN_RENAME_OP,
415         MDD_TXN_CREATE_DATA_OP,
416         MDD_TXN_MKDIR_OP
417 };
418
419 struct mdd_txn_op_descr {
420         enum mdd_txn_op mod_op;
421         unsigned int    mod_credits;
422 };
423
424 enum {
425         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
426         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
427         MDD_TXN_ATTR_SET_CREDITS       = 20,
428         MDD_TXN_XATTR_SET_CREDITS      = 20,
429         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
430         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
431         MDD_TXN_LINK_CREDITS           = 20,
432         MDD_TXN_UNLINK_CREDITS         = 20,
433         MDD_TXN_RENAME_CREDITS         = 20,
434         MDD_TXN_CREATE_DATA_CREDITS    = 20,
435         MDD_TXN_MKDIR_CREDITS          = 20
436 };
437
438 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
439 static const struct mdd_txn_op_descr opname = { \
440         .mod_op      = opname ## _OP,           \
441         .mod_credits = opname ## _CREDITS,      \
442 }
443
444 /*
445  * number of blocks to reserve for particular operations. Should be function
446  * of ... something. Stub for now.
447  */
448 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
449 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
450 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
451 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
452 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
453 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
454 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
455 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
456 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
457 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
458 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
459
460 static void mdd_txn_param_build(const struct lu_context *ctx,
461                                 const struct mdd_txn_op_descr *opd)
462 {
463         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
464 }
465
466 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
467                             lu_printer_t p, const struct lu_object *o)
468 {
469         return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
470 }
471
472 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
473 {
474         int rc;
475         struct dt_object *root;
476         ENTRY;
477
478         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
479                              &mdd->mdd_root_fid);
480         if (!IS_ERR(root)) {
481                 LASSERT(root != NULL);
482                 lu_object_put(ctx, &root->do_lu);
483                 rc = orph_index_init(ctx, mdd);
484         } else
485                 rc = PTR_ERR(root);
486
487         RETURN(rc);
488 }
489
490 static int mdd_device_init(const struct lu_context *ctx,
491                            struct lu_device *d, struct lu_device *next)
492 {
493         struct mdd_device *mdd = lu2mdd_dev(d);
494         int rc = 0;
495         ENTRY;
496
497         mdd->mdd_child = lu2dt_dev(next);
498
499         RETURN(rc);
500 }
501
502 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
503                                          struct lu_device *d)
504 {
505         struct mdd_device *m = lu2mdd_dev(d);
506         struct lu_device *next = &m->mdd_child->dd_lu_dev;
507
508         return next;
509 }
510 static void mdd_device_shutdown(const struct lu_context *ctxt,
511                                 struct mdd_device *m)
512 {
513         if (m->mdd_obd_dev)
514                 mdd_fini_obd(ctxt, m);
515         orph_index_fini(ctxt, m);
516 }
517
518 static int mdd_process_config(const struct lu_context *ctxt,
519                               struct lu_device *d, struct lustre_cfg *cfg)
520 {
521         struct mdd_device *m    = lu2mdd_dev(d);
522         struct dt_device  *dt   = m->mdd_child;
523         struct lu_device  *next = &dt->dd_lu_dev;
524         char              *dev = lustre_cfg_string(cfg, 0);
525         int rc;
526
527         switch (cfg->lcfg_command) {
528         case LCFG_SETUP:
529                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
530                 if (rc)
531                         GOTO(out, rc);
532                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
533
534                 rc = mdd_mount(ctxt, m);
535                 if (rc)
536                         GOTO(out, rc);
537                 rc = mdd_init_obd(ctxt, m, dev);
538                 if (rc) {
539                         CERROR("lov init error %d \n", rc);
540                         GOTO(out, rc);
541                 }
542                 break;
543         case LCFG_CLEANUP:
544                 mdd_device_shutdown(ctxt, m);
545         default:
546                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
547                 break;
548         }
549 out:
550         RETURN(rc);
551 }
552
553 static int mdd_recovery_complete(const struct lu_context *ctxt,
554                                  struct lu_device *d)
555 {
556         struct mdd_device *mdd = lu2mdd_dev(d);
557         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
558         int rc;
559         ENTRY;
560 /* TODO:
561         rc = mdd_lov_set_nextid(ctx, mdd);
562         if (rc) {
563                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
564                        obd->obd_name, rc);
565                 GOTO(out, rc);
566         }
567         rc = mdd_cleanup_unlink_llog(ctx, mdd);
568
569         obd_notify(obd->u.mds.mds_osc_obd, NULL,
570                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
571                    OBD_NOTIFY_SYNC, NULL);
572 */
573         /* TODO: orphans handling */
574         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
575         RETURN(rc);
576 }
577
578 struct lu_device_operations mdd_lu_ops = {
579         .ldo_object_alloc      = mdd_object_alloc,
580         .ldo_process_config    = mdd_process_config,
581         .ldo_recovery_complete = mdd_recovery_complete
582 };
583
584 static struct lu_object_operations mdd_lu_obj_ops = {
585         .loo_object_init    = mdd_object_init,
586         .loo_object_start   = mdd_object_start,
587         .loo_object_free    = mdd_object_free,
588         .loo_object_print   = mdd_object_print
589 };
590
591 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
592 {
593         struct dt_object  *next = mdd_object_child(obj);
594
595         next->do_ops->do_write_lock(ctxt, next);
596 }
597
598 void mdd_read_lock(const struct lu_context *ctxt, struct mdd_object *obj)
599 {
600         struct dt_object  *next = mdd_object_child(obj);
601
602         next->do_ops->do_read_lock(ctxt, next);
603 }
604
605 void mdd_write_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
606 {
607         struct dt_object  *next = mdd_object_child(obj);
608
609         next->do_ops->do_write_unlock(ctxt, next);
610 }
611
612 void mdd_read_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
613 {
614         struct dt_object  *next = mdd_object_child(obj);
615
616         next->do_ops->do_read_unlock(ctxt, next);
617 }
618
619 static void mdd_lock2(const struct lu_context *ctxt,
620                       struct mdd_object *o0, struct mdd_object *o1)
621 {
622         mdd_write_lock(ctxt, o0);
623         mdd_write_lock(ctxt, o1);
624 }
625
626 static void mdd_unlock2(const struct lu_context *ctxt,
627                         struct mdd_object *o0, struct mdd_object *o1)
628 {
629         mdd_write_unlock(ctxt, o1);
630         mdd_write_unlock(ctxt, o0);
631 }
632
633 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
634                                        struct mdd_device *mdd)
635 {
636         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
637
638         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
639 }
640
641 static void mdd_trans_stop(const struct lu_context *ctxt,
642                            struct mdd_device *mdd, int result,
643                            struct thandle *handle)
644 {
645         handle->th_result = result;
646         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
647 }
648
649 static int __mdd_object_create(const struct lu_context *ctxt,
650                                struct mdd_object *obj, struct md_attr *ma,
651                                struct thandle *handle)
652 {
653         struct dt_object *next;
654         struct lu_attr *attr = &ma->ma_attr;
655         int rc;
656         ENTRY;
657
658         if (!lu_object_exists(mdd2lu_obj(obj))) {
659                 next = mdd_object_child(obj);
660                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
661         } else
662                 rc = -EEXIST;
663
664         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
665
666         RETURN(rc);
667 }
668
669 int mdd_attr_set_internal(const struct lu_context *ctxt, struct mdd_object *o,
670                           const struct lu_attr *attr, struct thandle *handle)
671 {
672         struct dt_object *next;
673
674         LASSERT(lu_object_exists(mdd2lu_obj(o)));
675         next = mdd_object_child(o);
676         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
677 }
678
679 int mdd_attr_set_internal_locked(const struct lu_context *ctxt,
680                                  struct mdd_object *o,
681                                  const struct lu_attr *attr,
682                                  struct thandle *handle)
683 {
684         int rc;
685         mdd_write_lock(ctxt, o);
686         rc = mdd_attr_set_internal(ctxt, o, attr, handle);
687         mdd_write_unlock(ctxt, o);
688         return rc;
689 }
690
691 static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
692                            const void *buf, int buf_len, const char *name,
693                            int fl, struct thandle *handle)
694 {
695         struct dt_object *next;
696         int rc = 0;
697         ENTRY;
698
699         LASSERT(lu_object_exists(mdd2lu_obj(o)));
700         next = mdd_object_child(o);
701         if (buf && buf_len > 0) {
702                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
703                                                 0, handle);
704         }else if (buf == NULL && buf_len == 0) {
705                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
706         }
707         RETURN(rc);
708 }
709 /* this gives the same functionality as the code between
710  * sys_chmod and inode_setattr
711  * chown_common and inode_setattr
712  * utimes and inode_setattr
713  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
714  * and port to
715  */
716 int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj,
717                  const struct md_attr *ma, struct lu_attr *la)
718 {
719         struct lu_attr   *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
720         time_t            now = CURRENT_SECONDS;
721         int               rc;
722         ENTRY;
723
724         rc = __mdd_la_get(ctxt, obj, tmp_la);
725         if (rc)
726                 RETURN(rc);
727         /*XXX Check permission */
728         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
729
730                 /*If only change flags of the object, we should
731                  * let it pass, but also need capability check
732                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
733                  * fix it, when implement capable in mds*/
734                 if (la->la_valid & ~LA_FLAGS)
735                         RETURN(-EPERM);
736
737                 /*According to Ext3 implementation on this, the
738                  *Ctime will be changed, but not clear why?*/
739                 la->la_ctime = now;
740                 la->la_valid |= LA_CTIME;
741                 RETURN(0);
742         }
743         if (!(la->la_valid & LA_CTIME)) {
744                 la->la_ctime = now;
745                 la->la_valid |= LA_CTIME;
746         }
747
748 #if 0
749         /* times */
750         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
751                 if (current->fsuid != inode->i_uid &&
752                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
753                         RETURN(error);
754         }
755         if (ia_valid & ATTR_SIZE &&
756             /* NFSD hack for open(O_CREAT|O_TRUNC)=mknod+truncate (bug 5781) */
757             !(rec->ur_uc.luc_fsuid == inode->i_uid &&
758               ia_valid & MDS_OPEN_OWNEROVERRIDE)) {
759                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
760                         RETURN(error);
761         }
762 #endif
763
764         if (la->la_valid & (LA_UID | LA_GID)) {
765                 /* chown */
766
767                 if (mdd_is_immutable(obj) || mdd_is_append(obj))
768                         RETURN(-EPERM);
769                 if (la->la_uid == (uid_t) -1)
770                         la->la_uid = tmp_la->la_uid;
771                 if (la->la_gid == (gid_t) -1)
772                         la->la_gid = tmp_la->la_gid;
773                 if (!(la->la_valid & LA_MODE))
774                         la->la_mode = tmp_la->la_mode;
775                 /*
776                  * If the user or group of a non-directory has been
777                  * changed by a non-root user, remove the setuid bit.
778                  * 19981026 David C Niemi <niemi@tux.org>
779                  *
780                  * Changed this to apply to all users, including root,
781                  * to avoid some races. This is the behavior we had in
782                  * 2.0. The check for non-root was definitely wrong
783                  * for 2.2 anyway, as it should have been using
784                  * CAP_FSETID rather than fsuid -- 19990830 SD.
785                  */
786                 if ((tmp_la->la_mode & S_ISUID) == S_ISUID &&
787                     !S_ISDIR(tmp_la->la_mode)) {
788                         la->la_mode &= ~S_ISUID;
789                         la->la_valid |= LA_MODE;
790                 }
791                 /*
792                  * Likewise, if the user or group of a non-directory
793                  * has been changed by a non-root user, remove the
794                  * setgid bit UNLESS there is no group execute bit
795                  * (this would be a file marked for mandatory
796                  * locking).  19981026 David C Niemi <niemi@tux.org>
797                  *
798                  * Removed the fsuid check (see the comment above) --
799                  * 19990830 SD.
800                  */
801                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
802                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
803                         la->la_mode &= ~S_ISGID;
804                         la->la_valid |= LA_MODE;
805                 }
806         } else if (la->la_valid & LA_MODE) {
807                 int mode = la->la_mode;
808                 /* chmod */
809                 if (la->la_mode == (umode_t)-1)
810                         mode = tmp_la->la_mode;
811                 la->la_mode =
812                         (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO);
813         }
814         RETURN(rc);
815 }
816
817
818 /* set attr and LOV EA at once, return updated attr */
819 static int mdd_attr_set(const struct lu_context *ctxt,
820                         struct md_object *obj,
821                         const struct md_attr *ma)
822 {
823         struct mdd_object *mdd_obj = md2mdd_obj(obj);
824         struct mdd_device *mdd = mdo2mdd(obj);
825         struct thandle *handle;
826         struct lov_mds_md *lmm = NULL;
827         int  rc = 0, lmm_size = 0, max_size;
828         struct lu_attr *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
829         ENTRY;
830
831         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
832         handle = mdd_trans_start(ctxt, mdd);
833         if (IS_ERR(handle))
834                 RETURN(PTR_ERR(handle));
835         /*TODO: add lock here*/
836         /* start a log jounal handle if needed */
837         if (S_ISREG(mdd_object_type(mdd_obj)) &&
838             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
839                 max_size = mdd_lov_mdsize(ctxt, mdd);
840                 OBD_ALLOC(lmm, max_size);
841                 if (lmm == NULL)
842                         GOTO(cleanup, rc = -ENOMEM);
843
844                 rc = mdd_get_md(ctxt, mdd_obj, lmm, &lmm_size, 1, 
845                                 MDS_LOV_MD_NAME);
846
847                 if (rc < 0)
848                         GOTO(cleanup, rc);
849         }
850
851         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
852                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
853                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
854
855         *la_copy = ma->ma_attr;
856         mdd_write_lock(ctxt, mdd_obj);
857         rc = mdd_fix_attr(ctxt, mdd_obj, ma, la_copy);
858         mdd_write_unlock(ctxt, mdd_obj);
859         if (rc)
860                 GOTO(cleanup, rc);
861
862         if (la_copy->la_valid & LA_FLAGS) {
863                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
864                                                   handle);
865                 if (rc == 0)
866                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
867         } else if (la_copy->la_valid) {            /* setattr */
868                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
869                                                   handle);
870                 /* journal chown/chgrp in llog, just like unlink */
871                 if (rc == 0 && lmm_size){
872                         /*TODO set_attr llog */
873                 }
874         }
875
876         if (rc == 0 && ma->ma_valid & MA_LOV) {
877                 umode_t mode;
878
879                 mode = mdd_object_type(mdd_obj);
880                 if (S_ISREG(mode) || S_ISDIR(mode)) {
881                         /*TODO check permission*/
882                         rc = mdd_lov_set_md(ctxt, NULL, mdd_obj, ma->ma_lmm,
883                                             ma->ma_lmm_size, handle, 1);
884                 }
885
886         }
887 cleanup:
888         mdd_trans_stop(ctxt, mdd, rc, handle);
889         if (rc == 0 && lmm_size) {
890                 /*set obd attr, if needed*/
891                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
892         }
893         if (lmm != NULL) {
894                 OBD_FREE(lmm, max_size);
895         }
896
897         RETURN(rc);
898 }
899
900 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct mdd_object *obj,
901                       const void *buf, int buf_len, const char *name, int fl,
902                       struct thandle *handle)
903 {
904         int  rc;
905         ENTRY;
906
907         mdd_write_lock(ctxt, obj);
908         rc = __mdd_xattr_set(ctxt, obj, buf, buf_len, name, fl, handle);
909         mdd_write_unlock(ctxt, obj);
910
911         RETURN(rc);
912 }
913
914 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
915                          const void *buf, int buf_len, const char *name,
916                          int fl)
917 {
918         struct mdd_device *mdd = mdo2mdd(obj);
919         struct thandle *handle;
920         int  rc;
921         ENTRY;
922
923         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
924         handle = mdd_trans_start(ctxt, mdd);
925         if (IS_ERR(handle))
926                 RETURN(PTR_ERR(handle));
927
928         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
929                                fl, handle);
930
931         mdd_trans_stop(ctxt, mdd, rc, handle);
932
933         RETURN(rc);
934 }
935
936 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
937                            struct mdd_object *obj,
938                            const char *name, struct thandle *handle)
939 {
940         struct dt_object *next;
941
942         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
943         next = mdd_object_child(obj);
944         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
945 }
946
947 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
948                   const char *name)
949 {
950         struct mdd_object *mdd_obj = md2mdd_obj(obj);
951         struct mdd_device *mdd = mdo2mdd(obj);
952         struct thandle *handle;
953         int  rc;
954         ENTRY;
955
956         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
957         handle = mdd_trans_start(ctxt, mdd);
958         if (IS_ERR(handle))
959                 RETURN(PTR_ERR(handle));
960
961         mdd_write_lock(ctxt, mdd_obj);
962         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
963         mdd_write_unlock(ctxt, mdd_obj);
964
965         mdd_trans_stop(ctxt, mdd, rc, handle);
966
967         RETURN(rc);
968 }
969
970 static int __mdd_index_insert_only(const struct lu_context *ctxt,
971                                    struct mdd_object *pobj,
972                                    const struct lu_fid *lf,
973                                    const char *name, struct thandle *th)
974 {
975         int rc;
976         struct dt_object *next = mdd_object_child(pobj);
977         ENTRY;
978
979         if (dt_try_as_dir(ctxt, next))
980                 rc = next->do_index_ops->dio_insert(ctxt, next,
981                                          (struct dt_rec *)lf,
982                                          (struct dt_key *)name, th);
983         else
984                 rc = -ENOTDIR;
985         RETURN(rc);
986 }
987
988 /* insert new index, add reference if isdir, update times */
989 static int __mdd_index_insert(const struct lu_context *ctxt,
990                              struct mdd_object *pobj, const struct lu_fid *lf,
991                              const char *name, int isdir, struct thandle *th)
992 {
993         int rc;
994         struct dt_object *next = mdd_object_child(pobj);
995         ENTRY;
996
997 #if 0
998         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
999 #endif
1000
1001         if (dt_try_as_dir(ctxt, next))
1002                 rc = next->do_index_ops->dio_insert(ctxt, next,
1003                                          (struct dt_rec *)lf,
1004                                          (struct dt_key *)name, th);
1005         else
1006                 rc = -ENOTDIR;
1007
1008         if (rc == 0) {
1009                 if (isdir)
1010                         __mdd_ref_add(ctxt, pobj, th);
1011 #if 0
1012                 la->la_valid = LA_MTIME|LA_CTIME;
1013                 la->la_atime = ma->ma_attr.la_atime;
1014                 la->la_ctime = ma->ma_attr.la_ctime;
1015                 rc = mdd_attr_set_internal(ctxt, mdd_obj, la, handle);
1016 #endif
1017         }
1018         return rc;
1019 }
1020
1021 static int __mdd_index_delete(const struct lu_context *ctxt,
1022                               struct mdd_object *pobj, const char *name,
1023                               struct thandle *handle)
1024 {
1025         int rc;
1026         struct dt_object *next = mdd_object_child(pobj);
1027         ENTRY;
1028
1029         if (dt_try_as_dir(ctxt, next))
1030                 rc = next->do_index_ops->dio_delete(ctxt, next,
1031                                         (struct dt_key *)name, handle);
1032         else
1033                 rc = -ENOTDIR;
1034         RETURN(rc);
1035 }
1036
1037 static int mdd_link_sanity_check(const struct lu_context *ctxt,
1038                                  struct mdd_object *tgt_obj,
1039                                  struct mdd_object *src_obj)
1040 {
1041         int rc;
1042
1043         rc = mdd_may_create(ctxt, tgt_obj, NULL);
1044         if (rc)
1045                 RETURN(rc);
1046         if (S_ISDIR(mdd_object_type(src_obj)))
1047                 RETURN(-EPERM);
1048
1049         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1050                 RETURN(-EPERM);
1051
1052         RETURN(rc);
1053 }
1054
1055 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
1056                     struct md_object *src_obj, const char *name,
1057                     struct md_attr *ma)
1058 {
1059         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1060         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1061         struct mdd_device *mdd = mdo2mdd(src_obj);
1062         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1063         struct thandle *handle;
1064         int rc;
1065         ENTRY;
1066
1067         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
1068         handle = mdd_trans_start(ctxt, mdd);
1069         if (IS_ERR(handle))
1070                 RETURN(PTR_ERR(handle));
1071
1072         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
1073
1074         rc = mdd_link_sanity_check(ctxt, mdd_tobj, mdd_sobj);
1075         if (rc)
1076                 GOTO(out, rc);
1077
1078         rc = __mdd_index_insert_only(ctxt, mdd_tobj, mdo2fid(mdd_sobj),
1079                                      name, handle);
1080         if (rc == 0)
1081                 __mdd_ref_add(ctxt, mdd_sobj, handle);
1082
1083         *la_copy = ma->ma_attr;
1084         la_copy->la_valid = LA_CTIME;
1085         rc = mdd_attr_set_internal(ctxt, mdd_sobj, la_copy, handle);
1086         if (rc)
1087                 GOTO(out, rc);
1088
1089         la_copy->la_valid = LA_CTIME | LA_MTIME;
1090         rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1091
1092 out:
1093         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
1094         mdd_trans_stop(ctxt, mdd, rc, handle);
1095         RETURN(rc);
1096 }
1097
1098 /*
1099  * Check that @dir contains no entries except (possibly) dot and dotdot.
1100  *
1101  * Returns:
1102  *
1103  *             0        empty
1104  *    -ENOTEMPTY        not empty
1105  *           -ve        other error
1106  *
1107  */
1108 static int mdd_dir_is_empty(const struct lu_context *ctx,
1109                             struct mdd_object *dir)
1110 {
1111         struct dt_it     *it;
1112         struct dt_object *obj;
1113         struct dt_it_ops *iops;
1114         int result;
1115
1116         obj = mdd_object_child(dir);
1117         iops = &obj->do_index_ops->dio_it;
1118         it = iops->init(ctx, obj);
1119         if (it != NULL) {
1120                 result = iops->get(ctx, it, (const void *)"");
1121                 if (result > 0) {
1122                         int i;
1123                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1124                                 result = iops->next(ctx, it);
1125                         iops->put(ctx, it);
1126                         if (result == 0)
1127                                 result = -ENOTEMPTY;
1128                         else if (result == +1)
1129                                 result = 0;
1130                 } else if (result == 0)
1131                         /*
1132                          * Huh? Index contains no zero key?
1133                          */
1134                         result = -EIO;
1135                 iops->fini(ctx, it);
1136         } else
1137                 result = -ENOMEM;
1138         return result;
1139 }
1140
1141 /* return md_attr back,
1142  * if it is last unlink then return lov ea + llog cookie*/
1143 static inline int __mdd_object_kill(const struct lu_context *ctxt,
1144                                     struct mdd_object *obj,
1145                                     struct md_attr *ma)
1146 {
1147         int rc = 0;
1148
1149         mdd_set_dead_obj(obj);
1150         if (S_ISREG(mdd_object_type(obj))) {
1151                 rc = __mdd_lmm_get(ctxt, obj, ma);
1152                 if (ma->ma_valid & MA_LOV)
1153                         rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
1154                                             obj, ma);
1155         }
1156         return rc;
1157 }
1158
1159 static int __mdd_finish_unlink(const struct lu_context *ctxt,
1160                                struct mdd_object *obj, struct md_attr *ma,
1161                                struct thandle *th)
1162 {
1163         int rc;
1164         ENTRY;
1165
1166         rc = __mdd_iattr_get(ctxt, obj, ma);
1167         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1168                 if (atomic_read(&obj->mod_count) == 0) {
1169                         rc = __mdd_object_kill(ctxt, obj, ma);
1170                 } else {
1171                         /* add new orphan */
1172                         rc = __mdd_orphan_add(ctxt, obj, th);
1173                 }
1174         }
1175         RETURN(rc);
1176 }
1177
1178 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
1179                                    struct mdd_object *pobj,
1180                                    struct mdd_object *cobj,
1181                                    struct md_attr *ma)
1182 {
1183         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1184         int rc = 0;
1185         ENTRY;
1186
1187         rc = mdd_may_delete(ctxt, pobj, cobj, S_ISDIR(ma->ma_attr.la_mode));
1188         if (rc)
1189                 RETURN(rc);
1190
1191         if (S_ISDIR(mdd_object_type(cobj)) &&
1192             dt_try_as_dir(ctxt, dt_cobj)) {
1193                 rc = mdd_dir_is_empty(ctxt, cobj);
1194                 if (rc != 0)
1195                         RETURN(rc);
1196         }
1197
1198         RETURN(rc);
1199 }
1200
1201 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
1202                       struct md_object *cobj, const char *name,
1203                       struct md_attr *ma)
1204 {
1205         struct mdd_device *mdd = mdo2mdd(pobj);
1206         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1207         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1208         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1209         struct thandle    *handle;
1210         int rc;
1211         ENTRY;
1212
1213         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
1214         handle = mdd_trans_start(ctxt, mdd);
1215         if (IS_ERR(handle))
1216                 RETURN(PTR_ERR(handle));
1217
1218         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
1219
1220         rc = mdd_unlink_sanity_check(ctxt, mdd_pobj, mdd_cobj, ma);
1221         if (rc)
1222                 GOTO(cleanup, rc);
1223
1224
1225         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1226         if (rc)
1227                 GOTO(cleanup, rc);
1228
1229         __mdd_ref_del(ctxt, mdd_cobj, handle);
1230         *la_copy = ma->ma_attr;
1231         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
1232                 /* unlink dot */
1233                 __mdd_ref_del(ctxt, mdd_cobj, handle);
1234                 /* unlink dotdot */
1235                 __mdd_ref_del(ctxt, mdd_pobj, handle);
1236         } else {
1237                 la_copy->la_valid = LA_CTIME;
1238                 rc = mdd_attr_set_internal(ctxt, mdd_cobj, la_copy, handle);
1239                 if (rc)
1240                         GOTO(cleanup, rc);
1241         }
1242
1243         la_copy->la_valid = LA_CTIME | LA_MTIME;
1244         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1245         if (rc)
1246                 GOTO(cleanup, rc);
1247
1248         rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma, handle);
1249
1250 cleanup:
1251         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
1252         mdd_trans_stop(ctxt, mdd, rc, handle);
1253         RETURN(rc);
1254 }
1255
1256 static int mdd_parent_fid(const struct lu_context *ctxt,
1257                           struct mdd_object *obj,
1258                           struct lu_fid *fid)
1259 {
1260         return mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
1261 }
1262
1263 /*
1264  * return 0: if p2 is the parent of p1
1265  * otherwise: other_value
1266  */
1267 static int mdd_is_parent(const struct lu_context *ctxt,
1268                          struct mdd_device *mdd,
1269                          struct mdd_object *p1,
1270                          struct mdd_object *p2)
1271 {
1272         struct lu_fid * pfid;
1273         struct mdd_object *parent = NULL;
1274         int rc;
1275         ENTRY;
1276
1277         pfid = &mdd_ctx_info(ctxt)->mti_fid;
1278         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1279                 RETURN(1);
1280         for(;;) {
1281                 rc = mdd_parent_fid(ctxt, p1, pfid);
1282                 if (rc)
1283                         GOTO(out, rc);
1284                 if (lu_fid_eq(pfid, mdo2fid(p2)))
1285                         GOTO(out, rc = 0);
1286                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1287                         GOTO(out, rc = 1);
1288                 if (parent)
1289                         mdd_object_put(ctxt, parent);
1290                 parent = mdd_object_find(ctxt, mdd, pfid);
1291                 if (IS_ERR(parent))
1292                         GOTO(out, rc = PTR_ERR(parent));
1293                 p1 = parent;
1294         }
1295 out:
1296         if (parent && !IS_ERR(parent))
1297                 mdd_object_put(ctxt, parent);
1298         RETURN(rc);
1299 }
1300
1301 static int mdd_rename_lock(const struct lu_context *ctxt,
1302                            struct mdd_device *mdd,
1303                            struct mdd_object *src_pobj,
1304                            struct mdd_object *tgt_pobj)
1305 {
1306         ENTRY;
1307
1308         if (src_pobj == tgt_pobj) {
1309                 mdd_write_lock(ctxt, src_pobj);
1310                 RETURN(0);
1311         }
1312         /*compared the parent child relationship of src_p&tgt_p*/
1313         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1314                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
1315                 RETURN(0);
1316         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1317                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1318                 RETURN(0);
1319         }
1320
1321         if (!mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
1322                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1323                 RETURN(0);
1324         }
1325
1326         mdd_lock2(ctxt, src_pobj, tgt_pobj);
1327
1328         RETURN(0);
1329 }
1330
1331 static void mdd_rename_unlock(const struct lu_context *ctxt,
1332                               struct mdd_object *src_pobj,
1333                               struct mdd_object *tgt_pobj)
1334 {
1335         mdd_write_unlock(ctxt, src_pobj);
1336         if (src_pobj != tgt_pobj)
1337                 mdd_write_unlock(ctxt, tgt_pobj);
1338 }
1339
1340 static int mdd_rename_sanity_check(const struct lu_context *ctxt,
1341                                    struct mdd_object *src_pobj,
1342                                    struct mdd_object *tgt_pobj,
1343                                    struct mdd_object *sobj,
1344                                    struct mdd_object *tobj)
1345 {
1346         struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj);
1347         int rc = 0, src_is_dir, tgt_is_dir;
1348         ENTRY;
1349
1350         src_is_dir = S_ISDIR(mdd_object_type(sobj));
1351         rc = mdd_may_delete(ctxt, src_pobj, sobj, src_is_dir);
1352         if (rc)
1353                 GOTO(out, rc);
1354
1355         if (!tobj) {
1356                 rc = mdd_may_create(ctxt, tgt_pobj, NULL);
1357                 if (rc)
1358                         GOTO(out, rc);
1359                 if (!mdd_is_parent(ctxt, mdd, tgt_pobj, sobj))
1360                         GOTO(out, rc = -EINVAL);
1361                 GOTO(out, rc);
1362         }
1363
1364         /* source should not be ancestor of target */
1365         if (!mdd_is_parent(ctxt, mdd, tobj, sobj))
1366                 GOTO(out, rc = -EINVAL);
1367
1368         rc = mdd_may_delete(ctxt, tgt_pobj, tobj, src_is_dir);
1369         if (rc)
1370                 GOTO(out, rc);
1371
1372         tgt_is_dir = S_ISDIR(mdd_object_type(tobj));
1373         if (tgt_is_dir && mdd_dir_is_empty(ctxt, tobj))
1374                 GOTO(out, rc = -ENOTEMPTY);
1375 out:
1376         mdd_object_put(ctxt, sobj);
1377         RETURN(rc);
1378 }
1379
1380 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
1381                       struct md_object *tgt_pobj, const struct lu_fid *lf,
1382                       const char *sname, struct md_object *tobj,
1383                       const char *tname, struct md_attr *ma)
1384 {
1385         struct mdd_device *mdd = mdo2mdd(src_pobj);
1386         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1387         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1388         struct mdd_object *mdd_sobj = mdd_object_find(ctxt, mdd, lf);
1389         struct mdd_object *mdd_tobj = NULL;
1390         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1391         struct thandle *handle;
1392         int is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
1393         int rc;
1394         ENTRY;
1395
1396         if (tobj)
1397                 mdd_tobj = md2mdd_obj(tobj);
1398
1399         /*XXX: shouldn't this check be done under lock below? */
1400         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
1401                                      mdd_sobj, mdd_tobj);
1402         if (rc)
1403                 RETURN(rc);
1404
1405         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1406         handle = mdd_trans_start(ctxt, mdd);
1407         if (IS_ERR(handle))
1408                 RETURN(PTR_ERR(handle));
1409
1410         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1411         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
1412         if (rc)
1413                 GOTO(cleanup_unlocked, rc);
1414
1415         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
1416         if (rc)
1417                 GOTO(cleanup, rc);
1418
1419         /*if sobj is dir, its parent object nlink should be dec too*/
1420         if (is_dir)
1421                 __mdd_ref_del(ctxt, mdd_spobj, handle);
1422
1423         if (tobj) {
1424                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
1425                 if (rc)
1426                         GOTO(cleanup, rc);
1427         }
1428
1429         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, is_dir, handle);
1430         if (rc)
1431                 GOTO(cleanup, rc);
1432
1433         *la_copy = ma->ma_attr;
1434         la_copy->la_valid = LA_CTIME;
1435         rc = mdd_attr_set_internal_locked(ctxt, mdd_sobj, la_copy, handle);
1436         if (rc)
1437                 GOTO(cleanup, rc);
1438
1439         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1440                 mdd_write_lock(ctxt, mdd_tobj);
1441                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1442                 /* remove dot reference */
1443                 if (is_dir)
1444                         __mdd_ref_del(ctxt, mdd_tobj, handle);
1445
1446                 la_copy->la_valid = LA_CTIME;
1447                 rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1448                 if (rc)
1449                         GOTO(cleanup, rc);
1450
1451                 rc = __mdd_finish_unlink(ctxt, mdd_tobj, ma, handle);
1452                 mdd_write_unlock(ctxt, mdd_tobj);
1453                 if (rc)
1454                         GOTO(cleanup, rc);
1455         }
1456
1457         la_copy->la_valid = LA_CTIME | LA_MTIME;
1458         rc = mdd_attr_set_internal(ctxt, mdd_spobj, la_copy, handle);
1459         if (rc)
1460                 GOTO(cleanup, rc);
1461
1462         if (mdd_spobj != mdd_tpobj) {
1463                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1464                 rc = mdd_attr_set_internal(ctxt, mdd_tpobj, la_copy, handle);
1465         }
1466
1467 cleanup:
1468         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
1469 cleanup_unlocked:
1470         mdd_trans_stop(ctxt, mdd, rc, handle);
1471         RETURN(rc);
1472 }
1473
1474 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
1475                       const char *name, struct lu_fid* fid)
1476 {
1477         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1478         struct dt_object    *dir = mdd_object_child(mdd_obj);
1479         struct dt_rec       *rec    = (struct dt_rec *)fid;
1480         const struct dt_key *key = (const struct dt_key *)name;
1481         int rc;
1482         ENTRY;
1483
1484         if (mdd_is_dead_obj(mdd_obj))
1485                 RETURN(-ESTALE);
1486         mdd_read_lock(ctxt, mdd_obj);
1487         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(ctxt, dir))
1488                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
1489         else
1490                 rc = -ENOTDIR;
1491         mdd_read_unlock(ctxt, mdd_obj);
1492         RETURN(rc);
1493 }
1494
1495 static int __mdd_object_initialize(const struct lu_context *ctxt,
1496                                    const struct lu_fid *pfid,
1497                                    struct mdd_object *child,
1498                                    struct md_attr *ma, struct thandle *handle)
1499 {
1500         int rc;
1501         ENTRY;
1502
1503         /* update attributes for child.
1504          * FIXME:
1505          *  (1) the valid bits should be converted between Lustre and Linux;
1506          *  (2) maybe, the child attributes should be set in OSD when creation.
1507          */
1508
1509         rc = mdd_attr_set_internal(ctxt, child, &ma->ma_attr, handle);
1510         if (rc != 0)
1511                 RETURN(rc);
1512
1513         if (S_ISDIR(ma->ma_attr.la_mode)) {
1514                 /* add . and .. for newly created dir */
1515                 __mdd_ref_add(ctxt, child, handle);
1516                 rc = __mdd_index_insert_only(ctxt, child, mdo2fid(child),
1517                                              dot, handle);
1518                 if (rc == 0) {
1519                         rc = __mdd_index_insert_only(ctxt, child, pfid,
1520                                                      dotdot, handle);
1521                         if (rc != 0) {
1522                                 int rc2;
1523
1524                                 rc2 = __mdd_index_delete(ctxt,
1525                                                          child, dot, handle);
1526                                 if (rc2 != 0)
1527                                         CERROR("Failure to cleanup after dotdot"
1528                                                " creation: %d (%d)\n", rc2, rc);
1529                                 else
1530                                         __mdd_ref_del(ctxt, child, handle);
1531                         }
1532                 }
1533         }
1534         RETURN(rc);
1535 }
1536
1537 static int mdd_create_data(const struct lu_context *ctxt,
1538                            struct md_object *pobj, struct md_object *cobj,
1539                            const struct md_create_spec *spec,
1540                            struct md_attr *ma)
1541 {
1542         struct mdd_device *mdd = mdo2mdd(pobj);
1543         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1544         struct mdd_object *son = md2mdd_obj(cobj);
1545         struct lu_attr    *attr = &ma->ma_attr;
1546         struct lov_mds_md *lmm = NULL;
1547         int                lmm_size = 0;
1548         struct thandle    *handle;
1549         int                rc;
1550         ENTRY;
1551
1552         rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
1553                             attr);
1554         if (rc)
1555                 RETURN(rc);
1556
1557         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
1558         handle = mdd_trans_start(ctxt, mdd);
1559         if (IS_ERR(handle))
1560                 RETURN(PTR_ERR(handle));
1561
1562         /*XXX: setting the lov ea is not locked but setting the attr is locked? */
1563         if (rc == 0) {
1564                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size,
1565                                     handle, 0);
1566                 if (rc == 0)
1567                         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1568         }
1569
1570         mdd_trans_stop(ctxt, mdd, rc, handle);
1571         RETURN(rc);
1572 }
1573
1574 static int mdd_create_sanity_check(const struct lu_context *ctxt,
1575                                    struct mdd_device *mdd,
1576                                    struct md_object *pobj,
1577                                    const char *name, struct md_attr *ma)
1578 {
1579         struct mdd_thread_info *info = mdd_ctx_info(ctxt);
1580         struct lu_attr    *la        = &info->mti_la;
1581         struct lu_fid     *fid       = &info->mti_fid;
1582         struct mdd_object *obj       = md2mdd_obj(pobj);
1583         int rc;
1584
1585         ENTRY;
1586         /* EEXIST check */
1587         if (mdd_is_dead_obj(obj))
1588                 RETURN(-ENOENT);
1589         rc = mdd_lookup(ctxt, pobj, name, fid);
1590         if (rc != -ENOENT)
1591                 RETURN(rc ? : -EEXIST);
1592
1593         /* sgid check */
1594         mdd_read_lock(ctxt, obj);
1595         rc = __mdd_la_get(ctxt, obj, la);
1596         mdd_read_unlock(ctxt, obj);
1597         if (rc != 0)
1598                 RETURN(rc);
1599
1600         if (la->la_mode & S_ISGID) {
1601                 ma->ma_attr.la_gid = la->la_gid;
1602                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1603                         ma->ma_attr.la_mode |= S_ISGID;
1604                         ma->ma_attr.la_valid |= LA_MODE;
1605                 }
1606         }
1607
1608         switch (ma->ma_attr.la_mode & S_IFMT) {
1609         case S_IFREG:
1610         case S_IFDIR:
1611         case S_IFLNK:
1612         case S_IFCHR:
1613         case S_IFBLK:
1614         case S_IFIFO:
1615         case S_IFSOCK:
1616                 rc = 0;
1617                 break;
1618         default:
1619                 rc = -EINVAL;
1620                 break;
1621         }
1622         RETURN(rc);
1623 }
1624
1625 /*
1626  * Create object and insert it into namespace.
1627  */
1628 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
1629                       const char *name, struct md_object *child,
1630                       const struct md_create_spec *spec,
1631                       struct md_attr* ma)
1632 {
1633         struct mdd_device *mdd = mdo2mdd(pobj);
1634         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1635         struct mdd_object *son = md2mdd_obj(child);
1636         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1637         struct lu_attr *attr = &ma->ma_attr;
1638         struct lov_mds_md *lmm = NULL;
1639         struct thandle *handle;
1640         int rc, created = 0, inserted = 0, lmm_size = 0;
1641         ENTRY;
1642
1643         /* sanity checks before big job */
1644         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1645         if (rc)
1646                 RETURN(rc);
1647         /* no RPC inside the transaction, so OST objects should be created at
1648          * first */
1649         if (S_ISREG(attr->la_mode)) {
1650                 rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size,
1651                                     spec, attr);
1652                 if (rc)
1653                         RETURN(rc);
1654         }
1655
1656         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1657         handle = mdd_trans_start(ctxt, mdd);
1658         if (IS_ERR(handle))
1659                 RETURN(PTR_ERR(handle));
1660
1661         mdd_write_lock(ctxt, mdd_pobj);
1662
1663         /*
1664          * XXX check that link can be added to the parent in mkdir case.
1665          */
1666
1667         /*
1668          * Two operations have to be performed:
1669          *
1670          *  - allocation of new object (->do_create()), and
1671          *
1672          *  - insertion into parent index (->dio_insert()).
1673          *
1674          * Due to locking, operation order is not important, when both are
1675          * successful, *but* error handling cases are quite different:
1676          *
1677          *  - if insertion is done first, and following object creation fails,
1678          *  insertion has to be rolled back, but this operation might fail
1679          *  also leaving us with dangling index entry.
1680          *
1681          *  - if creation is done first, is has to be undone if insertion
1682          *  fails, leaving us with leaked space, which is neither good, nor
1683          *  fatal.
1684          *
1685          * It seems that creation-first is simplest solution, but it is
1686          * sub-optimal in the frequent
1687          *
1688          *         $ mkdir foo
1689          *         $ mkdir foo
1690          *
1691          * case, because second mkdir is bound to create object, only to
1692          * destroy it immediately.
1693          *
1694          * Note that local file systems do
1695          *
1696          *     0. lookup -> -EEXIST
1697          *
1698          *     1. create
1699          *
1700          *     2. insert
1701          *
1702          * Maybe we should do the same. For now: creation-first.
1703          */
1704
1705         mdd_write_lock(ctxt, son);
1706         rc = __mdd_object_create(ctxt, son, ma, handle);
1707         if (rc) {
1708                 mdd_write_unlock(ctxt, son);
1709                 GOTO(cleanup, rc);
1710         }
1711
1712         created = 1;
1713
1714         rc = __mdd_object_initialize(ctxt, mdo2fid(mdd_pobj),
1715                                      son, ma, handle);
1716         mdd_write_unlock(ctxt, son);
1717         if (rc)
1718                 /*
1719                  * Object has no links, so it will be destroyed when last
1720                  * reference is released. (XXX not now.)
1721                  */
1722                 GOTO(cleanup, rc);
1723
1724         rc = __mdd_index_insert(ctxt, mdd_pobj, mdo2fid(son),
1725                                 name, S_ISDIR(attr->la_mode), handle);
1726
1727         if (rc)
1728                 GOTO(cleanup, rc);
1729
1730         inserted = 1;
1731         rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
1732         if (rc) {
1733                 CERROR("error on stripe info copy %d \n", rc);
1734                 GOTO(cleanup, rc);
1735         }
1736
1737         if (S_ISLNK(attr->la_mode)) {
1738                 struct dt_object *dt = mdd_object_child(son);
1739                 const char *target_name = spec->u.sp_symname;
1740                 int sym_len = strlen(target_name);
1741                 loff_t pos = 0;
1742
1743                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1744                                                 sym_len, &pos, handle);
1745                 if (rc == sym_len)
1746                         rc = 0;
1747                 else
1748                         rc = -EFAULT;
1749         }
1750
1751         *la_copy = ma->ma_attr;
1752         la_copy->la_valid = LA_CTIME | LA_MTIME;
1753         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1754         if (rc)
1755                 GOTO(cleanup, rc);
1756
1757         /* return attr back */
1758         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1759 cleanup:
1760         if (rc && created) {
1761                 int rc2 = 0;
1762
1763                 if (inserted) {
1764                         rc2 = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1765                         if (rc2)
1766                                 CERROR("error can not cleanup destroy %d\n",
1767                                        rc2);
1768                 }
1769                 if (rc2 == 0)
1770                         __mdd_ref_del(ctxt, son, handle);
1771         }
1772         if (lmm)
1773                 OBD_FREE(lmm, lmm_size);
1774         mdd_write_unlock(ctxt, mdd_pobj);
1775         mdd_trans_stop(ctxt, mdd, rc, handle);
1776         RETURN(rc);
1777 }
1778 /* partial operation */
1779 static int mdd_object_create(const struct lu_context *ctxt,
1780                              struct md_object *obj,
1781                              const struct md_create_spec *spec,
1782                              struct md_attr *ma)
1783 {
1784
1785         struct mdd_device *mdd = mdo2mdd(obj);
1786         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1787         struct thandle *handle;
1788         int rc;
1789         ENTRY;
1790
1791         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1792         handle = mdd_trans_start(ctxt, mdd);
1793         if (IS_ERR(handle))
1794                 RETURN(PTR_ERR(handle));
1795
1796         mdd_write_lock(ctxt, mdd_obj);
1797         rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
1798         if (rc == 0)
1799                 rc = __mdd_object_initialize(ctxt, spec->u.sp_pfid, mdd_obj,
1800                                      ma, handle);
1801         mdd_write_unlock(ctxt, mdd_obj);
1802
1803         if (rc == 0)
1804                 rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
1805
1806         mdd_trans_stop(ctxt, mdd, rc, handle);
1807         RETURN(rc);
1808 }
1809 /* partial operation */
1810 static int mdd_name_insert(const struct lu_context *ctxt,
1811                            struct md_object *pobj, const char *name,
1812                            const struct lu_fid *fid, int isdir)
1813 {
1814         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1815         struct thandle *handle;
1816         int rc;
1817         ENTRY;
1818
1819         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1820         handle = mdd_trans_start(ctxt, mdo2mdd(pobj));
1821         if (IS_ERR(handle))
1822                 RETURN(PTR_ERR(handle));
1823
1824         mdd_write_lock(ctxt, mdd_obj);
1825         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, isdir, handle);
1826         mdd_write_unlock(ctxt, mdd_obj);
1827
1828         mdd_trans_stop(ctxt, mdo2mdd(pobj), rc, handle);
1829         RETURN(rc);
1830 }
1831
1832 static int mdd_name_remove(const struct lu_context *ctxt,
1833                            struct md_object *pobj,
1834                            const char *name)
1835 {
1836         struct mdd_device *mdd = mdo2mdd(pobj);
1837         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1838         struct thandle *handle;
1839         int rc;
1840         ENTRY;
1841
1842         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1843         handle = mdd_trans_start(ctxt, mdd);
1844         if (IS_ERR(handle))
1845                 RETURN(PTR_ERR(handle));
1846
1847         mdd_write_lock(ctxt, mdd_obj);
1848
1849         rc = __mdd_index_delete(ctxt, mdd_obj, name, handle);
1850
1851         mdd_write_unlock(ctxt, mdd_obj);
1852
1853         mdd_trans_stop(ctxt, mdd, rc, handle);
1854         RETURN(rc);
1855 }
1856
1857 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1858                           struct md_object *tobj, const struct lu_fid *lf,
1859                           const char *name)
1860 {
1861         struct mdd_device *mdd = mdo2mdd(pobj);
1862         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1863         struct mdd_object *mdd_tobj = NULL;
1864         struct thandle *handle;
1865         int rc;
1866         ENTRY;
1867
1868         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1869         handle = mdd_trans_start(ctxt, mdd);
1870         if (IS_ERR(handle))
1871                 RETURN(PTR_ERR(handle));
1872
1873         if (tobj)
1874                 mdd_tobj = md2mdd_obj(tobj);
1875
1876         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1877
1878         /*TODO rename sanity checking*/
1879         if (tobj) {
1880                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1881                 if (rc)
1882                         GOTO(cleanup, rc);
1883         }
1884
1885         rc = __mdd_index_insert_only(ctxt, mdd_tpobj, lf, name, handle);
1886         if (rc)
1887                 GOTO(cleanup, rc);
1888
1889         if (tobj && lu_object_exists(&tobj->mo_lu))
1890                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1891 cleanup:
1892         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1893         mdd_trans_stop(ctxt, mdd, rc, handle);
1894         RETURN(rc);
1895 }
1896
1897 static int mdd_root_get(const struct lu_context *ctx,
1898                         struct md_device *m, struct lu_fid *f)
1899 {
1900         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1901
1902         ENTRY;
1903         *f = mdd->mdd_root_fid;
1904         RETURN(0);
1905 }
1906
1907 static int mdd_statfs(const struct lu_context *ctx,
1908                       struct md_device *m, struct kstatfs *sfs)
1909 {
1910         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1911         int rc;
1912
1913         ENTRY;
1914
1915         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1916
1917         RETURN(rc);
1918 }
1919
1920 static int mdd_get_maxsize(const struct lu_context *ctx,
1921                            struct md_device *m, int *md_size,
1922                            int *cookie_size)
1923 {
1924         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1925         ENTRY;
1926
1927         *md_size =  mdd_lov_mdsize(ctx, mdd);
1928         *cookie_size = mdd_lov_cookiesize(ctx, mdd);
1929
1930         RETURN(0);
1931 }
1932
1933 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1934                          struct thandle *handle)
1935 {
1936         struct dt_object *next;
1937
1938         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1939         next = mdd_object_child(obj);
1940         next->do_ops->do_ref_add(ctxt, next, handle);
1941 }
1942
1943 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1944 {
1945         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1946         struct mdd_device *mdd = mdo2mdd(obj);
1947         struct thandle *handle;
1948         ENTRY;
1949
1950         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1951         handle = mdd_trans_start(ctxt, mdd);
1952         if (IS_ERR(handle))
1953                 RETURN(-ENOMEM);
1954
1955         mdd_write_lock(ctxt, mdd_obj);
1956         __mdd_ref_add(ctxt, mdd_obj, handle);
1957         mdd_write_unlock(ctxt, mdd_obj);
1958
1959         mdd_trans_stop(ctxt, mdd, 0, handle);
1960
1961         RETURN(0);
1962 }
1963
1964 static void
1965 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1966               struct thandle *handle)
1967 {
1968         struct dt_object *next = mdd_object_child(obj);
1969
1970         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1971
1972         next->do_ops->do_ref_del(ctxt, next, handle);
1973 }
1974
1975 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1976                        struct md_attr *ma)
1977 {
1978         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1979         struct mdd_device *mdd = mdo2mdd(obj);
1980         struct thandle *handle;
1981         int isdir;
1982         int rc;
1983         ENTRY;
1984
1985         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1986         handle = mdd_trans_start(ctxt, mdd);
1987         if (IS_ERR(handle))
1988                 RETURN(-ENOMEM);
1989
1990         mdd_write_lock(ctxt, mdd_obj);
1991
1992         isdir = S_ISDIR(lu_object_attr(&obj->mo_lu));
1993         /* rmdir checks */
1994         if (isdir && dt_try_as_dir(ctxt, mdd_object_child(mdd_obj))) {
1995                 rc = mdd_dir_is_empty(ctxt, mdd_obj);
1996                 if (rc != 0)
1997                         GOTO(cleanup, rc);
1998         }
1999
2000         __mdd_ref_del(ctxt, mdd_obj, handle);
2001
2002         if (isdir) {
2003                 /* unlink dot */
2004                 __mdd_ref_del(ctxt, mdd_obj, handle);
2005         }
2006
2007         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle);
2008
2009 cleanup:
2010         mdd_write_unlock(ctxt, mdd_obj);
2011         mdd_trans_stop(ctxt, mdd, rc, handle);
2012         RETURN(rc);
2013 }
2014
2015 /* do NOT or the MAY_*'s, you'll get the weakest */
2016 static int accmode(struct mdd_object *mdd_obj, int flags)
2017 {
2018         int res = 0;
2019
2020 #if 0
2021         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2022          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2023          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2024          * owner can write to a file even if it is marked readonly to hide
2025          * its brokenness. (bug 5781) */
2026         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2027                 return 0;
2028 #endif
2029         if (flags & FMODE_READ)
2030                 res = MAY_READ;
2031         if (flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
2032                 res |= MAY_WRITE;
2033         if (flags & MDS_FMODE_EXEC)
2034                 res = MAY_EXEC;
2035         return res;
2036 }
2037
2038 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
2039                     int flags)
2040 {
2041         int mode = accmode(md2mdd_obj(obj), flags);
2042
2043         if (mode & MAY_WRITE) {
2044                 if (mdd_is_immutable(md2mdd_obj(obj)))
2045                         RETURN(-EACCES);
2046         }
2047
2048         atomic_inc(&md2mdd_obj(obj)->mod_count);
2049         return 0;
2050 }
2051
2052 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
2053                      struct md_attr *ma)
2054 {
2055         int rc;
2056         struct mdd_object *mdd_obj;
2057         struct thandle *handle = NULL;
2058         ENTRY;
2059
2060         mdd_obj = md2mdd_obj(obj);
2061         mdd_read_lock(ctxt, mdd_obj);
2062         rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
2063         if (rc)
2064                 GOTO(out_locked, rc);
2065
2066         if (atomic_dec_and_test(&mdd_obj->mod_count)) {
2067                 if (ma->ma_attr.la_nlink == 0) {
2068                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
2069                         if (rc)
2070                                 GOTO(out_locked, rc);
2071                         mdd_read_unlock(ctxt, mdd_obj);
2072                         /* let's remove obj from the orphan list */
2073                         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
2074                         handle = mdd_trans_start(ctxt, mdo2mdd(obj));
2075                         if (IS_ERR(handle))
2076                                 GOTO(out, rc = -ENOMEM);
2077                         
2078                         rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
2079
2080                         mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
2081                         GOTO(out, rc);
2082                 }
2083         }
2084 out_locked:
2085         mdd_read_unlock(ctxt, mdd_obj);
2086 out:
2087         RETURN(rc);
2088 }
2089
2090 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
2091                         const struct lu_rdpg *rdpg)
2092 {
2093         struct dt_object *next;
2094         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2095         int rc;
2096
2097         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2098         next = mdd_object_child(mdd_obj);
2099
2100         mdd_read_lock(ctxt, mdd_obj);
2101         if (S_ISDIR(mdd_object_type(mdd_obj)) &&
2102             dt_try_as_dir(ctxt, next))
2103                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
2104         else
2105                 rc = -ENOTDIR;
2106         mdd_read_unlock(ctxt, mdd_obj);
2107         return rc;
2108 }
2109
2110 struct md_device_operations mdd_ops = {
2111         .mdo_root_get       = mdd_root_get,
2112         .mdo_statfs         = mdd_statfs,
2113         .mdo_get_maxsize    = mdd_get_maxsize,
2114 };
2115
2116 static struct md_dir_operations mdd_dir_ops = {
2117         .mdo_lookup        = mdd_lookup,
2118         .mdo_create        = mdd_create,
2119         .mdo_rename        = mdd_rename,
2120         .mdo_link          = mdd_link,
2121         .mdo_unlink        = mdd_unlink,
2122         .mdo_name_insert   = mdd_name_insert,
2123         .mdo_name_remove   = mdd_name_remove,
2124         .mdo_rename_tgt    = mdd_rename_tgt,
2125         .mdo_create_data   = mdd_create_data
2126 };
2127
2128
2129 static struct md_object_operations mdd_obj_ops = {
2130         .moo_attr_get      = mdd_attr_get,
2131         .moo_attr_set      = mdd_attr_set,
2132         .moo_xattr_get     = mdd_xattr_get,
2133         .moo_xattr_set     = mdd_xattr_set,
2134         .moo_xattr_list    = mdd_xattr_list,
2135         .moo_xattr_del     = mdd_xattr_del,
2136         .moo_object_create = mdd_object_create,
2137         .moo_ref_add       = mdd_ref_add,
2138         .moo_ref_del       = mdd_ref_del,
2139         .moo_open          = mdd_open,
2140         .moo_close         = mdd_close,
2141         .moo_readpage      = mdd_readpage,
2142         .moo_readlink      = mdd_readlink
2143 };
2144
2145 static struct obd_ops mdd_obd_device_ops = {
2146         .o_owner = THIS_MODULE
2147 };
2148
2149 static struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
2150                                           struct lu_device_type *t,
2151                                           struct lustre_cfg *lcfg)
2152 {
2153         struct lu_device  *l;
2154         struct mdd_device *m;
2155
2156         OBD_ALLOC_PTR(m);
2157         if (m == NULL) {
2158                 l = ERR_PTR(-ENOMEM);
2159         } else {
2160                 md_device_init(&m->mdd_md_dev, t);
2161                 l = mdd2lu_dev(m);
2162                 l->ld_ops = &mdd_lu_ops;
2163                 m->mdd_md_dev.md_ops = &mdd_ops;
2164         }
2165
2166         return l;
2167 }
2168
2169 static void mdd_device_free(const struct lu_context *ctx,
2170                             struct lu_device *lu)
2171 {
2172         struct mdd_device *m = lu2mdd_dev(lu);
2173
2174         LASSERT(atomic_read(&lu->ld_ref) == 0);
2175         md_device_fini(&m->mdd_md_dev);
2176         OBD_FREE_PTR(m);
2177 }
2178
2179 static int mdd_type_init(struct lu_device_type *t)
2180 {
2181         return lu_context_key_register(&mdd_thread_key);
2182 }
2183
2184 static void mdd_type_fini(struct lu_device_type *t)
2185 {
2186         lu_context_key_degister(&mdd_thread_key);
2187 }
2188
2189 static struct lu_device_type_operations mdd_device_type_ops = {
2190         .ldto_init = mdd_type_init,
2191         .ldto_fini = mdd_type_fini,
2192
2193         .ldto_device_alloc = mdd_device_alloc,
2194         .ldto_device_free  = mdd_device_free,
2195
2196         .ldto_device_init    = mdd_device_init,
2197         .ldto_device_fini    = mdd_device_fini
2198 };
2199
2200 static struct lu_device_type mdd_device_type = {
2201         .ldt_tags     = LU_DEVICE_MD,
2202         .ldt_name     = LUSTRE_MDD0_NAME,
2203         .ldt_ops      = &mdd_device_type_ops,
2204         .ldt_ctx_tags = LCT_MD_THREAD
2205 };
2206
2207 static void *mdd_key_init(const struct lu_context *ctx,
2208                           struct lu_context_key *key)
2209 {
2210         struct mdd_thread_info *info;
2211
2212         OBD_ALLOC_PTR(info);
2213         if (info == NULL)
2214                 info = ERR_PTR(-ENOMEM);
2215         return info;
2216 }
2217
2218 static void mdd_key_fini(const struct lu_context *ctx,
2219                          struct lu_context_key *key, void *data)
2220 {
2221         struct mdd_thread_info *info = data;
2222         OBD_FREE_PTR(info);
2223 }
2224
2225 static struct lu_context_key mdd_thread_key = {
2226         .lct_tags = LCT_MD_THREAD,
2227         .lct_init = mdd_key_init,
2228         .lct_fini = mdd_key_fini
2229 };
2230
2231 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
2232         { 0 }
2233 };
2234
2235 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
2236         { 0 }
2237 };
2238
2239 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
2240
2241 static int __init mdd_mod_init(void)
2242 {
2243         struct lprocfs_static_vars lvars;
2244
2245         lprocfs_init_vars(mdd, &lvars);
2246         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
2247                                    LUSTRE_MDD0_NAME, &mdd_device_type);
2248 }
2249
2250 static void __exit mdd_mod_exit(void)
2251 {
2252         class_unregister_type(LUSTRE_MDD0_NAME);
2253 }
2254
2255 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2256 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
2257 MODULE_LICENSE("GPL");
2258
2259 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);