Whamcloud - gitweb
42eda9a610733f8e98f3b87f04c2ddd5f146cd88
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43
44 #include "mdd_internal.h"
45
46
47 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
48                                        struct mdd_device *);
49 static void mdd_trans_stop(const struct lu_context *ctxt,
50                            struct mdd_device *mdd, int rc,
51                            struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
54                           struct thandle *handle);
55 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
56                           struct thandle *handle);
57 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
58                       const char *name, struct lu_fid* fid);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 static const char *mdd_root_dir_name = "root";
67 static const char dot[] = ".";
68 static const char dotdot[] = "..";
69
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdd_obj;
85
86         OBD_ALLOC_PTR(mdd_obj);
87         if (mdd_obj != NULL) {
88                 struct lu_object *o;
89                 
90                 o = mdd2lu_obj(mdd_obj);
91                 lu_object_init(o, NULL, d);
92                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 mdd_obj->mod_count = 0;
95                 o->lo_ops = &mdd_lu_obj_ops;
96                 return o;
97         } else {
98                 return NULL;
99         }
100 }
101
102 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 {
104         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
105         struct lu_object  *below;
106         struct lu_device  *under;
107         ENTRY;
108
109         under = &d->mdd_child->dd_lu_dev;
110         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
111
112         if (below == NULL)
113                 RETURN(-ENOMEM);
114
115         lu_object_add(o, below);
116         RETURN(0);
117 }
118
119 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj);
120
121 static int mdd_object_start(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         if (lu_object_exists(o))
124                 return mdd_get_flags(ctxt, lu2mdd_obj(o));
125         else
126                 return 0;
127 }
128
129 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
130 {
131         struct mdd_object *mdd = lu2mdd_obj(o);
132         
133         lu_object_fini(o);
134         OBD_FREE_PTR(mdd);
135 }
136
137 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
138                                    struct mdd_device *d,
139                                    const struct lu_fid *f)
140 {
141         struct lu_object *o;
142         struct mdd_object *m;
143         ENTRY;
144
145         o = lu_object_find(ctxt, mdd2lu_dev(d)->ld_site, f);
146         if (IS_ERR(o))
147                 m = (struct mdd_object *)o;
148         else {
149                 o = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
150                 m = o ? lu2mdd_obj(o) : NULL;
151         }
152         RETURN(m);
153 }
154
155 static inline int mdd_is_immutable(struct mdd_object *obj)
156 {
157         return obj->mod_flags & IMMUTE_OBJ;
158 }
159
160 static inline int mdd_is_append(struct mdd_object *obj)
161 {
162         return obj->mod_flags & APPEND_OBJ;
163 }
164
165 static void mdd_set_dead_obj(struct mdd_object *obj)
166 {
167         if (obj)
168                 obj->mod_flags |= DEAD_OBJ;
169 }
170
171 static int mdd_is_dead_obj(struct mdd_object *obj)
172 {
173         return obj && obj->mod_flags & DEAD_OBJ;
174 }
175
176 /*Check whether it may create the cobj under the pobj*/
177 static int mdd_may_create(const struct lu_context *ctxt,
178                           struct mdd_object *pobj, struct mdd_object *cobj)
179 {
180         ENTRY;
181         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
182                 RETURN(-EEXIST);
183
184         if (mdd_is_dead_obj(pobj))
185                 RETURN(-ENOENT);
186
187         /*check pobj may create or not*/
188         RETURN(0);
189 }
190
191 static inline int __mdd_la_get(const struct lu_context *ctxt,
192                                struct mdd_object *obj, struct lu_attr *la)
193 {
194         struct dt_object  *next = mdd_object_child(obj);
195         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
196         return next->do_ops->do_attr_get(ctxt, next, la);
197 }
198
199 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
200 {
201         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
202
203         if (flags & LUSTRE_APPEND_FL)
204                 obj->mod_flags |= APPEND_OBJ;
205
206         if (flags & LUSTRE_IMMUTABLE_FL)
207                 obj->mod_flags |= IMMUTE_OBJ;
208 }
209
210 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj)
211 {
212         struct lu_attr *la = &mdd_ctx_info(ctxt)->mti_la;
213         int rc;
214
215         ENTRY;
216         mdd_read_lock(ctxt, obj);
217         rc = __mdd_la_get(ctxt, obj, la);
218         mdd_read_unlock(ctxt, obj);
219         if (rc == 0)
220                 mdd_flags_xlate(obj, la->la_flags);
221         RETURN(rc);
222 }
223
224 /*Check whether it may delete the cobj under the pobj*/
225 static int mdd_may_delete(const struct lu_context *ctxt,
226                           struct mdd_object *pobj, struct mdd_object *cobj,
227                           int is_dir)
228 {
229         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
230         int rc = 0;
231         ENTRY;
232
233         LASSERT(cobj);
234
235         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
236                 RETURN(-ENOENT);
237
238         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
239                 RETURN(-EPERM);
240
241         if (is_dir) {
242                 if (!S_ISDIR(mdd_object_type(cobj)))
243                         RETURN(-ENOTDIR);
244
245                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
246                         RETURN(-EBUSY);
247
248         } else if (S_ISDIR(mdd_object_type(cobj)))
249                         RETURN(-EISDIR);
250
251         if (pobj && mdd_is_dead_obj(pobj))
252                 RETURN(-ENOENT);
253
254         RETURN(rc);
255 }
256 /* get only inode attributes */
257 static int __mdd_iattr_get(const struct lu_context *ctxt,
258                            struct mdd_object *mdd_obj, struct md_attr *ma)
259 {
260         int rc = 0;
261         ENTRY;
262
263         rc = __mdd_la_get(ctxt, mdd_obj, &ma->ma_attr);
264         if (rc == 0)
265                 ma->ma_valid = MA_INODE;
266         RETURN(rc);
267 }
268 /* get lov EA only */
269 static int __mdd_lmm_get(const struct lu_context *ctxt,
270                          struct mdd_object *mdd_obj, struct md_attr *ma)
271 {
272         int rc;
273
274         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
275         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
276                         MDS_LOV_MD_NAME);
277         if (rc > 0) {
278                 ma->ma_valid |= MA_LOV;
279                 rc = 0;
280         }
281         RETURN(rc);
282 }
283
284 /* get lmv EA only*/
285 static int __mdd_lmv_get(const struct lu_context *ctxt,
286                          struct mdd_object *mdd_obj, struct md_attr *ma)
287 {
288         int rc;
289
290         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
291                         MDS_LMV_MD_NAME);
292         if (rc > 0) {
293                 ma->ma_valid |= MA_LMV;
294                 rc = 0;
295         }
296         RETURN(rc);
297 }
298
299 static int mdd_attr_get_internal(const struct lu_context *ctxt,
300                                  struct mdd_object *mdd_obj,
301                                  struct md_attr *ma)
302 {
303         int rc = 0;
304         ENTRY;
305
306         if (ma->ma_need & MA_INODE)
307                 rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
308
309         if (rc == 0 && ma->ma_need & MA_LOV) {
310                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
311                     S_ISDIR(mdd_object_type(mdd_obj)))
312                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
313         }
314         if (rc == 0 && ma->ma_need & MA_LMV) {
315                 if (S_ISDIR(mdd_object_type(mdd_obj)))
316                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
317         }
318         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
319                         rc, ma->ma_valid);
320         RETURN(rc);
321 }
322
323 static inline int mdd_attr_get_internal_locked(const struct lu_context *ctxt,
324                                                struct mdd_object *mdd_obj,
325                                                struct md_attr *ma)
326 {
327         int rc;
328         mdd_read_lock(ctxt, mdd_obj);
329         rc = mdd_attr_get_internal(ctxt, mdd_obj, ma);
330         mdd_read_unlock(ctxt, mdd_obj);
331         return rc;
332 }
333
334 static int mdd_attr_get(const struct lu_context *ctxt,
335                         struct md_object *obj, struct md_attr *ma)
336 {
337         struct mdd_object *mdd_obj = md2mdd_obj(obj);
338         int                rc;
339
340         ENTRY;
341         rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
342         RETURN(rc);
343 }
344
345 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
346                          void *buf, int buf_len, const char *name)
347 {
348         struct mdd_object *mdd_obj = md2mdd_obj(obj);
349         struct dt_object  *next;
350         int rc;
351
352         ENTRY;
353
354         LASSERT(lu_object_exists(&obj->mo_lu));
355
356         next = mdd_object_child(mdd_obj);
357         mdd_read_lock(ctxt, mdd_obj);
358         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
359         mdd_read_unlock(ctxt, mdd_obj);
360
361         RETURN(rc);
362 }
363
364 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
365                         void *buf, int buf_len)
366 {
367         struct mdd_object *mdd_obj = md2mdd_obj(obj);
368         struct dt_object  *next;
369         loff_t             pos = 0;
370         int                rc;
371         ENTRY;
372
373         LASSERT(lu_object_exists(&obj->mo_lu));
374
375         next = mdd_object_child(mdd_obj);
376         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
377         RETURN(rc);
378 }
379 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
380                           void *buf, int buf_len)
381 {
382         struct mdd_object *mdd_obj = md2mdd_obj(obj);
383         struct dt_object  *next;
384         int rc;
385
386         ENTRY;
387
388         LASSERT(lu_object_exists(&obj->mo_lu));
389
390         next = mdd_object_child(mdd_obj);
391         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
392
393         RETURN(rc);
394 }
395
396 enum mdd_txn_op {
397         MDD_TXN_OBJECT_DESTROY_OP,
398         MDD_TXN_OBJECT_CREATE_OP,
399         MDD_TXN_ATTR_SET_OP,
400         MDD_TXN_XATTR_SET_OP,
401         MDD_TXN_INDEX_INSERT_OP,
402         MDD_TXN_INDEX_DELETE_OP,
403         MDD_TXN_LINK_OP,
404         MDD_TXN_UNLINK_OP,
405         MDD_TXN_RENAME_OP,
406         MDD_TXN_CREATE_DATA_OP,
407         MDD_TXN_MKDIR_OP
408 };
409
410 struct mdd_txn_op_descr {
411         enum mdd_txn_op mod_op;
412         unsigned int    mod_credits;
413 };
414
415 enum {
416         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
417         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
418         MDD_TXN_ATTR_SET_CREDITS       = 20,
419         MDD_TXN_XATTR_SET_CREDITS      = 20,
420         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
421         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
422         MDD_TXN_LINK_CREDITS           = 20,
423         MDD_TXN_UNLINK_CREDITS         = 20,
424         MDD_TXN_RENAME_CREDITS         = 20,
425         MDD_TXN_CREATE_DATA_CREDITS    = 20,
426         MDD_TXN_MKDIR_CREDITS          = 20
427 };
428
429 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
430 static const struct mdd_txn_op_descr opname = { \
431         .mod_op      = opname ## _OP,           \
432         .mod_credits = opname ## _CREDITS,      \
433 }
434
435 /*
436  * number of blocks to reserve for particular operations. Should be function
437  * of ... something. Stub for now.
438  */
439 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
440 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
441 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
442 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
443 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
444 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
445 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
446 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
447 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
448 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
449 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
450
451 static void mdd_txn_param_build(const struct lu_context *ctx,
452                                 const struct mdd_txn_op_descr *opd)
453 {
454         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
455 }
456
457 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
458                             lu_printer_t p, const struct lu_object *o)
459 {
460         return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
461 }
462
463 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
464 {
465         int rc;
466         struct dt_object *root;
467         ENTRY;
468
469         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
470                              &mdd->mdd_root_fid);
471         if (!IS_ERR(root)) {
472                 LASSERT(root != NULL);
473                 lu_object_put(ctx, &root->do_lu);
474                 rc = orph_index_init(ctx, mdd);
475         } else
476                 rc = PTR_ERR(root);
477
478         RETURN(rc);
479 }
480
481 static int mdd_txn_start_cb(const struct lu_context *ctx,
482                             struct txn_param *param, void *cookie)
483 {
484         return 0;
485 }
486
487 static int mdd_txn_stop_cb(const struct lu_context *ctx,
488                            struct thandle *txn, void *cookie)
489 {
490         struct mdd_device *mdd = cookie;
491         struct obd_device *obd = mdd2obd_dev(mdd);
492         
493         return mds_lov_write_objids(obd);
494 }
495
496 static int mdd_txn_commit_cb(const struct lu_context *ctx,
497                              struct thandle *txn, void *cookie)
498 {
499         return 0;
500 }
501
502 static int mdd_device_init(const struct lu_context *ctx,
503                            struct lu_device *d, struct lu_device *next)
504 {
505         struct mdd_device *mdd = lu2mdd_dev(d);
506         struct dt_device  *dt;
507         int rc = 0;
508         ENTRY;
509
510         mdd->mdd_child = lu2dt_dev(next);
511
512         dt = mdd->mdd_child;
513         /* prepare transactions callbacks */
514         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
515         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
516         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
517         mdd->mdd_txn_cb.dtc_cookie = mdd;
518
519         dt_txn_callback_add(dt, &mdd->mdd_txn_cb);
520
521         RETURN(rc);
522 }
523
524 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
525                                          struct lu_device *d)
526 {
527         struct mdd_device *mdd = lu2mdd_dev(d);
528         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
529
530         dt_txn_callback_del(mdd->mdd_child, &mdd->mdd_txn_cb);
531         
532         return next;
533 }
534
535 static void mdd_device_shutdown(const struct lu_context *ctxt,
536                                 struct mdd_device *m)
537 {
538         if (m->mdd_obd_dev)
539                 mdd_fini_obd(ctxt, m);
540         orph_index_fini(ctxt, m);
541 }
542
543 static int mdd_process_config(const struct lu_context *ctxt,
544                               struct lu_device *d, struct lustre_cfg *cfg)
545 {
546         struct mdd_device *m    = lu2mdd_dev(d);
547         struct dt_device  *dt   = m->mdd_child;
548         struct lu_device  *next = &dt->dd_lu_dev;
549         int rc;
550         ENTRY;
551
552         switch (cfg->lcfg_command) {
553         case LCFG_SETUP:
554                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
555                 if (rc)
556                         GOTO(out, rc);
557                 dt->dd_ops->dt_get_conf(ctxt, dt, &m->mdd_dt_conf);
558
559                 rc = mdd_mount(ctxt, m);
560                 if (rc)
561                         GOTO(out, rc);
562                 rc = mdd_init_obd(ctxt, m, cfg);
563                 if (rc) {
564                         CERROR("lov init error %d \n", rc);
565                         GOTO(out, rc);
566                 }
567                 break;
568         case LCFG_CLEANUP:
569                 mdd_device_shutdown(ctxt, m);
570         default:
571                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
572                 break;
573         }
574 out:
575         RETURN(rc);
576 }
577
578 static int mdd_recovery_complete(const struct lu_context *ctxt,
579                                  struct lu_device *d)
580 {
581         struct mdd_device *mdd = lu2mdd_dev(d);
582         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
583         int rc;
584         ENTRY;
585         /* TODO:
586         rc = mdd_lov_set_nextid(ctx, mdd);
587         if (rc) {
588                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
589                        obd->obd_name, rc);
590                 GOTO(out, rc);
591         }
592         rc = mdd_cleanup_unlink_llog(ctx, mdd);
593
594         obd_notify(obd->u.mds.mds_osc_obd, NULL,
595                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
596                    OBD_NOTIFY_SYNC, NULL);
597 */
598         LASSERT(mdd);
599         LASSERT(mdd->mdd_obd_dev);
600
601         mdd->mdd_obd_dev->obd_recovering = 0;
602         //mdd->mdd_obd_dev->obd_type->typ_dt_ops->
603         mds_postrecov(mdd->mdd_obd_dev);
604         /* TODO: orphans handling */
605         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
606         
607         RETURN(rc);
608 }
609
610 struct lu_device_operations mdd_lu_ops = {
611         .ldo_object_alloc      = mdd_object_alloc,
612         .ldo_process_config    = mdd_process_config,
613         .ldo_recovery_complete = mdd_recovery_complete
614 };
615
616 static struct lu_object_operations mdd_lu_obj_ops = {
617         .loo_object_init    = mdd_object_init,
618         .loo_object_start   = mdd_object_start,
619         .loo_object_free    = mdd_object_free,
620         .loo_object_print   = mdd_object_print
621 };
622
623 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
624 {
625         struct dt_object  *next = mdd_object_child(obj);
626
627         next->do_ops->do_write_lock(ctxt, next);
628 }
629
630 void mdd_read_lock(const struct lu_context *ctxt, struct mdd_object *obj)
631 {
632         struct dt_object  *next = mdd_object_child(obj);
633
634         next->do_ops->do_read_lock(ctxt, next);
635 }
636
637 void mdd_write_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
638 {
639         struct dt_object  *next = mdd_object_child(obj);
640
641         next->do_ops->do_write_unlock(ctxt, next);
642 }
643
644 void mdd_read_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
645 {
646         struct dt_object  *next = mdd_object_child(obj);
647
648         next->do_ops->do_read_unlock(ctxt, next);
649 }
650
651 static void mdd_lock2(const struct lu_context *ctxt,
652                       struct mdd_object *o0, struct mdd_object *o1)
653 {
654         mdd_write_lock(ctxt, o0);
655         mdd_write_lock(ctxt, o1);
656 }
657
658 static void mdd_unlock2(const struct lu_context *ctxt,
659                         struct mdd_object *o0, struct mdd_object *o1)
660 {
661         mdd_write_unlock(ctxt, o1);
662         mdd_write_unlock(ctxt, o0);
663 }
664
665 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
666                                        struct mdd_device *mdd)
667 {
668         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
669
670         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
671 }
672
673 static void mdd_trans_stop(const struct lu_context *ctxt,
674                            struct mdd_device *mdd, int result,
675                            struct thandle *handle)
676 {
677         handle->th_result = result;
678         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
679 }
680
681 static int __mdd_object_create(const struct lu_context *ctxt,
682                                struct mdd_object *obj, struct md_attr *ma,
683                                struct thandle *handle)
684 {
685         struct dt_object *next;
686         struct lu_attr *attr = &ma->ma_attr;
687         int rc;
688         ENTRY;
689
690         if (!lu_object_exists(mdd2lu_obj(obj))) {
691                 next = mdd_object_child(obj);
692                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
693         } else
694                 rc = -EEXIST;
695
696         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
697
698         RETURN(rc);
699 }
700
701 int mdd_attr_set_internal(const struct lu_context *ctxt, struct mdd_object *o,
702                           const struct lu_attr *attr, struct thandle *handle)
703 {
704         struct dt_object *next;
705
706         LASSERT(lu_object_exists(mdd2lu_obj(o)));
707         next = mdd_object_child(o);
708         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
709 }
710
711 int mdd_attr_set_internal_locked(const struct lu_context *ctxt,
712                                  struct mdd_object *o,
713                                  const struct lu_attr *attr,
714                                  struct thandle *handle)
715 {
716         int rc;
717         mdd_write_lock(ctxt, o);
718         rc = mdd_attr_set_internal(ctxt, o, attr, handle);
719         mdd_write_unlock(ctxt, o);
720         return rc;
721 }
722
723 static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
724                            const void *buf, int buf_len, const char *name,
725                            int fl, struct thandle *handle)
726 {
727         struct dt_object *next;
728         int rc = 0;
729         ENTRY;
730
731         LASSERT(lu_object_exists(mdd2lu_obj(o)));
732         next = mdd_object_child(o);
733         if (buf && buf_len > 0) {
734                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
735                                                 0, handle);
736 #ifdef HAVE_SPLIT_SUPPORT
737                 if (rc == 0) {
738                         /* very ugly hack, if setting lmv, it means splitting 
739                          * sucess, we should return -ERESTART to notify the 
740                          * client, so transno for this splitting should be
741                          * zero according to the replay rules. so return -ERESTART
742                          * here let mdt trans stop callback know this. 
743                          */
744                         if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0) 
745                                 rc = -ERESTART;
746                 }
747 #endif
748         }else if (buf == NULL && buf_len == 0) {
749                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
750         }
751         RETURN(rc);
752 }
753 /* this gives the same functionality as the code between
754  * sys_chmod and inode_setattr
755  * chown_common and inode_setattr
756  * utimes and inode_setattr
757  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
758  * and port to
759  */
760 int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj,
761                  const struct md_attr *ma, struct lu_attr *la)
762 {
763         struct lu_attr   *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
764         time_t            now = CURRENT_SECONDS;
765         int               rc;
766         ENTRY;
767
768         rc = __mdd_la_get(ctxt, obj, tmp_la);
769         if (rc)
770                 RETURN(rc);
771         /*XXX Check permission */
772         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
773
774                 /*If only change flags of the object, we should
775                  * let it pass, but also need capability check
776                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
777                  * fix it, when implement capable in mds*/
778                 if (la->la_valid & ~LA_FLAGS)
779                         RETURN(-EPERM);
780
781                 /*According to Ext3 implementation on this, the
782                  *Ctime will be changed, but not clear why?*/
783                 la->la_ctime = now;
784                 la->la_valid |= LA_CTIME;
785                 RETURN(0);
786         }
787         if (!(la->la_valid & LA_CTIME)) {
788                 la->la_ctime = now;
789                 la->la_valid |= LA_CTIME;
790         }
791
792 #if 0
793         /* times */
794         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
795                 if (current->fsuid != inode->i_uid &&
796                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
797                         RETURN(error);
798         }
799         if (ia_valid & ATTR_SIZE &&
800             /* NFSD hack for open(O_CREAT|O_TRUNC)=mknod+truncate (bug 5781) */
801             !(rec->ur_uc.luc_fsuid == inode->i_uid &&
802               ia_valid & MDS_OPEN_OWNEROVERRIDE)) {
803                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
804                         RETURN(error);
805         }
806 #endif
807
808         if (la->la_valid & (LA_UID | LA_GID)) {
809                 /* chown */
810
811                 if (mdd_is_immutable(obj) || mdd_is_append(obj))
812                         RETURN(-EPERM);
813                 if (la->la_uid == (uid_t) -1)
814                         la->la_uid = tmp_la->la_uid;
815                 if (la->la_gid == (gid_t) -1)
816                         la->la_gid = tmp_la->la_gid;
817                 if (!(la->la_valid & LA_MODE))
818                         la->la_mode = tmp_la->la_mode;
819                 /*
820                  * If the user or group of a non-directory has been
821                  * changed by a non-root user, remove the setuid bit.
822                  * 19981026 David C Niemi <niemi@tux.org>
823                  *
824                  * Changed this to apply to all users, including root,
825                  * to avoid some races. This is the behavior we had in
826                  * 2.0. The check for non-root was definitely wrong
827                  * for 2.2 anyway, as it should have been using
828                  * CAP_FSETID rather than fsuid -- 19990830 SD.
829                  */
830                 if ((tmp_la->la_mode & S_ISUID) == S_ISUID &&
831                     !S_ISDIR(tmp_la->la_mode)) {
832                         la->la_mode &= ~S_ISUID;
833                         la->la_valid |= LA_MODE;
834                 }
835                 /*
836                  * Likewise, if the user or group of a non-directory
837                  * has been changed by a non-root user, remove the
838                  * setgid bit UNLESS there is no group execute bit
839                  * (this would be a file marked for mandatory
840                  * locking).  19981026 David C Niemi <niemi@tux.org>
841                  *
842                  * Removed the fsuid check (see the comment above) --
843                  * 19990830 SD.
844                  */
845                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
846                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
847                         la->la_mode &= ~S_ISGID;
848                         la->la_valid |= LA_MODE;
849                 }
850         } else if (la->la_valid & LA_MODE) {
851                 int mode = la->la_mode;
852                 /* chmod */
853                 if (la->la_mode == (umode_t)-1)
854                         mode = tmp_la->la_mode;
855                 la->la_mode =
856                         (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO);
857         }
858         RETURN(rc);
859 }
860
861
862 /* set attr and LOV EA at once, return updated attr */
863 static int mdd_attr_set(const struct lu_context *ctxt,
864                         struct md_object *obj,
865                         const struct md_attr *ma)
866 {
867         struct mdd_object *mdd_obj = md2mdd_obj(obj);
868         struct mdd_device *mdd = mdo2mdd(obj);
869         struct thandle *handle;
870         struct lov_mds_md *lmm = NULL;
871         int  rc = 0, lmm_size = 0, max_size;
872         struct lu_attr *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
873         ENTRY;
874
875         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
876         handle = mdd_trans_start(ctxt, mdd);
877         if (IS_ERR(handle))
878                 RETURN(PTR_ERR(handle));
879         /*TODO: add lock here*/
880         /* start a log jounal handle if needed */
881         if (S_ISREG(mdd_object_type(mdd_obj)) &&
882             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
883                 max_size = mdd_lov_mdsize(ctxt, mdd);
884                 OBD_ALLOC(lmm, max_size);
885                 if (lmm == NULL)
886                         GOTO(cleanup, rc = -ENOMEM);
887
888                 rc = mdd_get_md_locked(ctxt, mdd_obj, lmm, &lmm_size,
889                                 MDS_LOV_MD_NAME);
890
891                 if (rc < 0)
892                         GOTO(cleanup, rc);
893         }
894
895         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
896                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
897                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
898
899         *la_copy = ma->ma_attr;
900         mdd_write_lock(ctxt, mdd_obj);
901         rc = mdd_fix_attr(ctxt, mdd_obj, ma, la_copy);
902         mdd_write_unlock(ctxt, mdd_obj);
903         if (rc)
904                 GOTO(cleanup, rc);
905
906         if (la_copy->la_valid & LA_FLAGS) {
907                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
908                                                   handle);
909                 if (rc == 0)
910                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
911         } else if (la_copy->la_valid) {            /* setattr */
912                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
913                                                   handle);
914                 /* journal chown/chgrp in llog, just like unlink */
915                 if (rc == 0 && lmm_size){
916                         /*TODO set_attr llog */
917                 }
918         }
919
920         if (rc == 0 && ma->ma_valid & MA_LOV) {
921                 umode_t mode;
922
923                 mode = mdd_object_type(mdd_obj);
924                 if (S_ISREG(mode) || S_ISDIR(mode)) {
925                         /*TODO check permission*/
926                         rc = mdd_lov_set_md(ctxt, NULL, mdd_obj, ma->ma_lmm,
927                                             ma->ma_lmm_size, handle, 1);
928                 }
929
930         }
931 cleanup:
932         mdd_trans_stop(ctxt, mdd, rc, handle);
933         if (rc == 0 && lmm_size) {
934                 /*set obd attr, if needed*/
935                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
936         }
937         if (lmm != NULL) {
938                 OBD_FREE(lmm, max_size);
939         }
940
941         RETURN(rc);
942 }
943
944 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct mdd_object *obj,
945                       const void *buf, int buf_len, const char *name, int fl,
946                       struct thandle *handle)
947 {
948         int  rc;
949         ENTRY;
950
951         mdd_write_lock(ctxt, obj);
952         rc = __mdd_xattr_set(ctxt, obj, buf, buf_len, name, fl, handle);
953         mdd_write_unlock(ctxt, obj);
954
955         RETURN(rc);
956 }
957
958 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
959                          const void *buf, int buf_len, const char *name,
960                          int fl)
961 {
962         struct mdd_device *mdd = mdo2mdd(obj);
963         struct thandle *handle;
964         int  rc;
965         ENTRY;
966
967         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
968         handle = mdd_trans_start(ctxt, mdd);
969         if (IS_ERR(handle))
970                 RETURN(PTR_ERR(handle));
971
972         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
973                                fl, handle);
974
975         mdd_trans_stop(ctxt, mdd, rc, handle);
976
977         RETURN(rc);
978 }
979
980 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
981                            struct mdd_object *obj,
982                            const char *name, struct thandle *handle)
983 {
984         struct dt_object *next;
985
986         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
987         next = mdd_object_child(obj);
988         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
989 }
990
991 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
992                   const char *name)
993 {
994         struct mdd_object *mdd_obj = md2mdd_obj(obj);
995         struct mdd_device *mdd = mdo2mdd(obj);
996         struct thandle *handle;
997         int  rc;
998         ENTRY;
999
1000         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1001         handle = mdd_trans_start(ctxt, mdd);
1002         if (IS_ERR(handle))
1003                 RETURN(PTR_ERR(handle));
1004
1005         mdd_write_lock(ctxt, mdd_obj);
1006         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
1007         mdd_write_unlock(ctxt, mdd_obj);
1008
1009         mdd_trans_stop(ctxt, mdd, rc, handle);
1010
1011         RETURN(rc);
1012 }
1013
1014 static int __mdd_index_insert_only(const struct lu_context *ctxt,
1015                                    struct mdd_object *pobj,
1016                                    const struct lu_fid *lf,
1017                                    const char *name, struct thandle *th)
1018 {
1019         int rc;
1020         struct dt_object *next = mdd_object_child(pobj);
1021         ENTRY;
1022
1023         if (dt_try_as_dir(ctxt, next))
1024                 rc = next->do_index_ops->dio_insert(ctxt, next,
1025                                          (struct dt_rec *)lf,
1026                                          (struct dt_key *)name, th);
1027         else
1028                 rc = -ENOTDIR;
1029         RETURN(rc);
1030 }
1031
1032 /* insert new index, add reference if isdir, update times */
1033 static int __mdd_index_insert(const struct lu_context *ctxt,
1034                              struct mdd_object *pobj, const struct lu_fid *lf,
1035                              const char *name, int isdir, struct thandle *th)
1036 {
1037         int rc;
1038         struct dt_object *next = mdd_object_child(pobj);
1039         ENTRY;
1040
1041 #if 0
1042         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
1043 #endif
1044
1045         if (dt_try_as_dir(ctxt, next))
1046                 rc = next->do_index_ops->dio_insert(ctxt, next,
1047                                          (struct dt_rec *)lf,
1048                                          (struct dt_key *)name, th);
1049         else
1050                 rc = -ENOTDIR;
1051
1052         if (rc == 0) {
1053                 if (isdir)
1054                         __mdd_ref_add(ctxt, pobj, th);
1055 #if 0
1056                 la->la_valid = LA_MTIME|LA_CTIME;
1057                 la->la_atime = ma->ma_attr.la_atime;
1058                 la->la_ctime = ma->ma_attr.la_ctime;
1059                 rc = mdd_attr_set_internal(ctxt, mdd_obj, la, handle);
1060 #endif
1061         }
1062         return rc;
1063 }
1064
1065 static int __mdd_index_delete(const struct lu_context *ctxt,
1066                               struct mdd_object *pobj, const char *name,
1067                               struct thandle *handle)
1068 {
1069         int rc;
1070         struct dt_object *next = mdd_object_child(pobj);
1071         ENTRY;
1072
1073         if (dt_try_as_dir(ctxt, next))
1074                 rc = next->do_index_ops->dio_delete(ctxt, next,
1075                                         (struct dt_key *)name, handle);
1076         else
1077                 rc = -ENOTDIR;
1078         RETURN(rc);
1079 }
1080
1081 static int mdd_link_sanity_check(const struct lu_context *ctxt,
1082                                  struct mdd_object *tgt_obj,
1083                                  struct mdd_object *src_obj)
1084 {
1085         int rc;
1086
1087         rc = mdd_may_create(ctxt, tgt_obj, NULL);
1088         if (rc)
1089                 RETURN(rc);
1090         if (S_ISDIR(mdd_object_type(src_obj)))
1091                 RETURN(-EPERM);
1092
1093         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1094                 RETURN(-EPERM);
1095
1096         RETURN(rc);
1097 }
1098
1099 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
1100                     struct md_object *src_obj, const char *name,
1101                     struct md_attr *ma)
1102 {
1103         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1104         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1105         struct mdd_device *mdd = mdo2mdd(src_obj);
1106         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1107         struct thandle *handle;
1108         int rc;
1109         ENTRY;
1110
1111         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
1112         handle = mdd_trans_start(ctxt, mdd);
1113         if (IS_ERR(handle))
1114                 RETURN(PTR_ERR(handle));
1115
1116         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
1117
1118         rc = mdd_link_sanity_check(ctxt, mdd_tobj, mdd_sobj);
1119         if (rc)
1120                 GOTO(out, rc);
1121
1122         rc = __mdd_index_insert_only(ctxt, mdd_tobj, mdo2fid(mdd_sobj),
1123                                      name, handle);
1124         if (rc == 0)
1125                 __mdd_ref_add(ctxt, mdd_sobj, handle);
1126
1127         *la_copy = ma->ma_attr;
1128         la_copy->la_valid = LA_CTIME;
1129         rc = mdd_attr_set_internal(ctxt, mdd_sobj, la_copy, handle);
1130         if (rc)
1131                 GOTO(out, rc);
1132
1133         la_copy->la_valid = LA_CTIME | LA_MTIME;
1134         rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1135
1136 out:
1137         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
1138         mdd_trans_stop(ctxt, mdd, rc, handle);
1139         RETURN(rc);
1140 }
1141
1142 /*
1143  * Check that @dir contains no entries except (possibly) dot and dotdot.
1144  *
1145  * Returns:
1146  *
1147  *             0        empty
1148  *    -ENOTEMPTY        not empty
1149  *           -ve        other error
1150  *
1151  */
1152 static int mdd_dir_is_empty(const struct lu_context *ctx,
1153                             struct mdd_object *dir)
1154 {
1155         struct dt_it     *it;
1156         struct dt_object *obj;
1157         struct dt_it_ops *iops;
1158         int result;
1159
1160         obj = mdd_object_child(dir);
1161         iops = &obj->do_index_ops->dio_it;
1162         it = iops->init(ctx, obj, 0);
1163         if (it != NULL) {
1164                 result = iops->get(ctx, it, (const void *)"");
1165                 if (result > 0) {
1166                         int i;
1167                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1168                                 result = iops->next(ctx, it);
1169                         iops->put(ctx, it);
1170                         if (result == 0)
1171                                 result = -ENOTEMPTY;
1172                         else if (result == +1)
1173                                 result = 0;
1174                 } else if (result == 0)
1175                         /*
1176                          * Huh? Index contains no zero key?
1177                          */
1178                         result = -EIO;
1179                 iops->fini(ctx, it);
1180         } else
1181                 result = -ENOMEM;
1182         return result;
1183 }
1184
1185 /* return md_attr back,
1186  * if it is last unlink then return lov ea + llog cookie*/
1187 int __mdd_object_kill(const struct lu_context *ctxt,
1188                       struct mdd_object *obj,
1189                       struct md_attr *ma)
1190 {
1191         int rc = 0;
1192
1193         mdd_set_dead_obj(obj);
1194         if (S_ISREG(mdd_object_type(obj))) {
1195                 rc = __mdd_lmm_get(ctxt, obj, ma);
1196                 if (ma->ma_valid & MA_LOV)
1197                         rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
1198                                             obj, ma);
1199         }
1200         return rc;
1201 }
1202 /* caller should take a lock before calling */
1203 static int __mdd_finish_unlink(const struct lu_context *ctxt,
1204                                struct mdd_object *obj, struct md_attr *ma,
1205                                struct thandle *th)
1206 {
1207         int rc;
1208         ENTRY;
1209
1210         rc = __mdd_iattr_get(ctxt, obj, ma);
1211         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1212                 if (obj->mod_count == 0) {
1213                         rc = __mdd_object_kill(ctxt, obj, ma);
1214                 } else {
1215                         /* add new orphan */
1216                         rc = __mdd_orphan_add(ctxt, obj, th);
1217                 }
1218         }
1219         RETURN(rc);
1220 }
1221
1222 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
1223                                    struct mdd_object *pobj,
1224                                    struct mdd_object *cobj,
1225                                    struct md_attr *ma)
1226 {
1227         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1228         int rc = 0;
1229         ENTRY;
1230
1231         rc = mdd_may_delete(ctxt, pobj, cobj, S_ISDIR(ma->ma_attr.la_mode));
1232         if (rc)
1233                 RETURN(rc);
1234
1235         if (S_ISDIR(mdd_object_type(cobj)) &&
1236             dt_try_as_dir(ctxt, dt_cobj)) {
1237                 rc = mdd_dir_is_empty(ctxt, cobj);
1238                 if (rc != 0)
1239                         RETURN(rc);
1240         }
1241
1242         RETURN(rc);
1243 }
1244
1245 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
1246                       struct md_object *cobj, const char *name,
1247                       struct md_attr *ma)
1248 {
1249         struct mdd_device *mdd = mdo2mdd(pobj);
1250         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1251         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1252         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1253         struct thandle    *handle;
1254         int rc;
1255         ENTRY;
1256
1257         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
1258         handle = mdd_trans_start(ctxt, mdd);
1259         if (IS_ERR(handle))
1260                 RETURN(PTR_ERR(handle));
1261
1262         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
1263
1264         rc = mdd_unlink_sanity_check(ctxt, mdd_pobj, mdd_cobj, ma);
1265         if (rc)
1266                 GOTO(cleanup, rc);
1267
1268
1269         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1270         if (rc)
1271                 GOTO(cleanup, rc);
1272
1273         __mdd_ref_del(ctxt, mdd_cobj, handle);
1274         *la_copy = ma->ma_attr;
1275         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
1276                 /* unlink dot */
1277                 __mdd_ref_del(ctxt, mdd_cobj, handle);
1278                 /* unlink dotdot */
1279                 __mdd_ref_del(ctxt, mdd_pobj, handle);
1280         } else {
1281                 la_copy->la_valid = LA_CTIME;
1282                 rc = mdd_attr_set_internal(ctxt, mdd_cobj, la_copy, handle);
1283                 if (rc)
1284                         GOTO(cleanup, rc);
1285         }
1286
1287         la_copy->la_valid = LA_CTIME | LA_MTIME;
1288         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1289         if (rc)
1290                 GOTO(cleanup, rc);
1291
1292         rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma, handle);
1293
1294 cleanup:
1295         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
1296         mdd_trans_stop(ctxt, mdd, rc, handle);
1297         RETURN(rc);
1298 }
1299 /* partial unlink */
1300 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1301                        struct md_attr *ma)
1302 {
1303         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1304         struct mdd_device *mdd = mdo2mdd(obj);
1305         struct thandle *handle;
1306         int rc;
1307         ENTRY;
1308
1309         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1310         handle = mdd_trans_start(ctxt, mdd);
1311         if (IS_ERR(handle))
1312                 RETURN(-ENOMEM);
1313
1314         mdd_write_lock(ctxt, mdd_obj);
1315
1316         rc = mdd_unlink_sanity_check(ctxt, NULL, mdd_obj, ma);
1317         if (rc)
1318                 GOTO(cleanup, rc);
1319
1320         __mdd_ref_del(ctxt, mdd_obj, handle);
1321
1322         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1323                 /* unlink dot */
1324                 __mdd_ref_del(ctxt, mdd_obj, handle);
1325         }
1326
1327         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle);
1328
1329 cleanup:
1330         mdd_write_unlock(ctxt, mdd_obj);
1331         mdd_trans_stop(ctxt, mdd, rc, handle);
1332         RETURN(rc);
1333 }
1334
1335 static int mdd_parent_fid(const struct lu_context *ctxt,
1336                           struct mdd_object *obj,
1337                           struct lu_fid *fid)
1338 {
1339         return mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
1340 }
1341
1342 /*
1343  * return 0: if p2 is the parent of p1
1344  * otherwise: other_value
1345  */
1346 static int mdd_is_parent(const struct lu_context *ctxt,
1347                          struct mdd_device *mdd,
1348                          struct mdd_object *p1,
1349                          struct mdd_object *p2)
1350 {
1351         struct lu_fid * pfid;
1352         struct mdd_object *parent = NULL;
1353         int rc;
1354         ENTRY;
1355
1356         pfid = &mdd_ctx_info(ctxt)->mti_fid;
1357         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1358                 RETURN(1);
1359         for(;;) {
1360                 rc = mdd_parent_fid(ctxt, p1, pfid);
1361                 if (rc)
1362                         GOTO(out, rc);
1363                 if (lu_fid_eq(pfid, mdo2fid(p2)))
1364                         GOTO(out, rc = 0);
1365                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1366                         GOTO(out, rc = 1);
1367                 if (parent)
1368                         mdd_object_put(ctxt, parent);
1369                 parent = mdd_object_find(ctxt, mdd, pfid);
1370                 /* cross-ref parent, not supported yet */
1371                 if (parent == NULL)
1372                         GOTO(out, rc = -EOPNOTSUPP);
1373                 else if (IS_ERR(parent))
1374                         GOTO(out, rc = PTR_ERR(parent));
1375                 p1 = parent;
1376         }
1377 out:
1378         if (parent && !IS_ERR(parent))
1379                 mdd_object_put(ctxt, parent);
1380         RETURN(rc);
1381 }
1382
1383 static int mdd_rename_lock(const struct lu_context *ctxt,
1384                            struct mdd_device *mdd,
1385                            struct mdd_object *src_pobj,
1386                            struct mdd_object *tgt_pobj)
1387 {
1388         ENTRY;
1389
1390         if (src_pobj == tgt_pobj) {
1391                 mdd_write_lock(ctxt, src_pobj);
1392                 RETURN(0);
1393         }
1394         /*compared the parent child relationship of src_p&tgt_p*/
1395         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1396                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
1397                 RETURN(0);
1398         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1399                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1400                 RETURN(0);
1401         }
1402
1403         if (!mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
1404                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1405                 RETURN(0);
1406         }
1407
1408         mdd_lock2(ctxt, src_pobj, tgt_pobj);
1409
1410         RETURN(0);
1411 }
1412
1413 static void mdd_rename_unlock(const struct lu_context *ctxt,
1414                               struct mdd_object *src_pobj,
1415                               struct mdd_object *tgt_pobj)
1416 {
1417         mdd_write_unlock(ctxt, src_pobj);
1418         if (src_pobj != tgt_pobj)
1419                 mdd_write_unlock(ctxt, tgt_pobj);
1420 }
1421
1422 static int mdd_rename_sanity_check(const struct lu_context *ctxt,
1423                                    struct mdd_object *src_pobj,
1424                                    struct mdd_object *tgt_pobj,
1425                                    struct mdd_object *sobj,
1426                                    struct mdd_object *tobj)
1427 {
1428         struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj);
1429         int rc = 0, src_is_dir, tgt_is_dir;
1430         ENTRY;
1431
1432         src_is_dir = S_ISDIR(mdd_object_type(sobj));
1433         rc = mdd_may_delete(ctxt, src_pobj, sobj, src_is_dir);
1434         if (rc)
1435                 GOTO(out, rc);
1436
1437         if (!tobj) {
1438                 rc = mdd_may_create(ctxt, tgt_pobj, NULL);
1439         } else {
1440                 rc = mdd_may_delete(ctxt, tgt_pobj, tobj, src_is_dir);
1441                 if (rc == 0) {
1442                         tgt_is_dir = S_ISDIR(mdd_object_type(tobj));
1443                         if (tgt_is_dir && mdd_dir_is_empty(ctxt, tobj))
1444                                 rc = -ENOTEMPTY;
1445                 }
1446         }
1447         if (rc)
1448                 GOTO(out, rc);
1449
1450         /* source should not be ancestor of target dir */
1451         if (src_is_dir && !mdd_is_parent(ctxt, mdd, tgt_pobj, sobj))
1452                 rc = -EINVAL;
1453
1454 out:
1455         mdd_object_put(ctxt, sobj);
1456         RETURN(rc);
1457 }
1458
1459 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
1460                       struct md_object *tgt_pobj, const struct lu_fid *lf,
1461                       const char *sname, struct md_object *tobj,
1462                       const char *tname, struct md_attr *ma)
1463 {
1464         struct mdd_device *mdd = mdo2mdd(src_pobj);
1465         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1466         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1467         struct mdd_object *mdd_sobj = mdd_object_find(ctxt, mdd, lf);
1468         struct mdd_object *mdd_tobj = NULL;
1469         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1470         struct thandle *handle;
1471         int is_dir;
1472         int rc;
1473         ENTRY;
1474
1475         /* the source can be remote one, not supported yet */
1476         if (mdd_sobj == NULL)
1477                 RETURN(-EOPNOTSUPP);
1478
1479         is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
1480         
1481         if (tobj)
1482                 mdd_tobj = md2mdd_obj(tobj);
1483
1484         /*XXX: shouldn't this check be done under lock below? */
1485         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
1486                                      mdd_sobj, mdd_tobj);
1487         if (rc)
1488                 RETURN(rc);
1489
1490         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1491         handle = mdd_trans_start(ctxt, mdd);
1492         if (IS_ERR(handle))
1493                 RETURN(PTR_ERR(handle));
1494
1495         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1496         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
1497         if (rc)
1498                 GOTO(cleanup_unlocked, rc);
1499
1500         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
1501         if (rc)
1502                 GOTO(cleanup, rc);
1503
1504         /*if sobj is dir, its parent object nlink should be dec too*/
1505         if (is_dir)
1506                 __mdd_ref_del(ctxt, mdd_spobj, handle);
1507
1508         if (tobj) {
1509                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
1510                 if (rc)
1511                         GOTO(cleanup, rc);
1512         }
1513
1514         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, is_dir, handle);
1515         if (rc)
1516                 GOTO(cleanup, rc);
1517
1518         *la_copy = ma->ma_attr;
1519         la_copy->la_valid = LA_CTIME;
1520         rc = mdd_attr_set_internal_locked(ctxt, mdd_sobj, la_copy, handle);
1521         if (rc)
1522                 GOTO(cleanup, rc);
1523
1524         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1525                 mdd_write_lock(ctxt, mdd_tobj);
1526                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1527                 /* remove dot reference */
1528                 if (is_dir)
1529                         __mdd_ref_del(ctxt, mdd_tobj, handle);
1530
1531                 la_copy->la_valid = LA_CTIME;
1532                 rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1533                 if (rc)
1534                         GOTO(cleanup, rc);
1535
1536                 rc = __mdd_finish_unlink(ctxt, mdd_tobj, ma, handle);
1537                 mdd_write_unlock(ctxt, mdd_tobj);
1538                 if (rc)
1539                         GOTO(cleanup, rc);
1540         }
1541
1542         la_copy->la_valid = LA_CTIME | LA_MTIME;
1543         rc = mdd_attr_set_internal(ctxt, mdd_spobj, la_copy, handle);
1544         if (rc)
1545                 GOTO(cleanup, rc);
1546
1547         if (mdd_spobj != mdd_tpobj) {
1548                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1549                 rc = mdd_attr_set_internal(ctxt, mdd_tpobj, la_copy, handle);
1550         }
1551
1552 cleanup:
1553         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
1554 cleanup_unlocked:
1555         mdd_trans_stop(ctxt, mdd, rc, handle);
1556         RETURN(rc);
1557 }
1558
1559 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
1560                       const char *name, struct lu_fid* fid)
1561 {
1562         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1563         struct dt_object    *dir = mdd_object_child(mdd_obj);
1564         struct dt_rec       *rec    = (struct dt_rec *)fid;
1565         const struct dt_key *key = (const struct dt_key *)name;
1566         int rc;
1567         ENTRY;
1568
1569         if (mdd_is_dead_obj(mdd_obj))
1570                 RETURN(-ESTALE);
1571         mdd_read_lock(ctxt, mdd_obj);
1572         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(ctxt, dir))
1573                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
1574         else
1575                 rc = -ENOTDIR;
1576         mdd_read_unlock(ctxt, mdd_obj);
1577         RETURN(rc);
1578 }
1579
1580 static int __mdd_object_initialize(const struct lu_context *ctxt,
1581                                    const struct lu_fid *pfid,
1582                                    struct mdd_object *child,
1583                                    struct md_attr *ma, struct thandle *handle)
1584 {
1585         int rc;
1586         ENTRY;
1587
1588         /* update attributes for child.
1589          * FIXME:
1590          *  (1) the valid bits should be converted between Lustre and Linux;
1591          *  (2) maybe, the child attributes should be set in OSD when creation.
1592          */
1593
1594         rc = mdd_attr_set_internal(ctxt, child, &ma->ma_attr, handle);
1595         if (rc != 0)
1596                 RETURN(rc);
1597
1598         if (S_ISDIR(ma->ma_attr.la_mode)) {
1599                 /* add . and .. for newly created dir */
1600                 __mdd_ref_add(ctxt, child, handle);
1601                 rc = __mdd_index_insert_only(ctxt, child, mdo2fid(child),
1602                                              dot, handle);
1603                 if (rc == 0) {
1604                         rc = __mdd_index_insert_only(ctxt, child, pfid,
1605                                                      dotdot, handle);
1606                         if (rc != 0) {
1607                                 int rc2;
1608
1609                                 rc2 = __mdd_index_delete(ctxt,
1610                                                          child, dot, handle);
1611                                 if (rc2 != 0)
1612                                         CERROR("Failure to cleanup after dotdot"
1613                                                " creation: %d (%d)\n", rc2, rc);
1614                                 else
1615                                         __mdd_ref_del(ctxt, child, handle);
1616                         }
1617                 }
1618         }
1619         RETURN(rc);
1620 }
1621
1622 static int mdd_create_data(const struct lu_context *ctxt,
1623                            struct md_object *pobj, struct md_object *cobj,
1624                            const struct md_create_spec *spec,
1625                            struct md_attr *ma)
1626 {
1627         struct mdd_device *mdd = mdo2mdd(cobj);
1628         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
1629         struct mdd_object *son = md2mdd_obj(cobj);
1630         struct lu_attr    *attr = &ma->ma_attr;
1631         struct lov_mds_md *lmm = NULL;
1632         int                lmm_size = 0;
1633         struct thandle    *handle;
1634         int                rc;
1635         ENTRY;
1636
1637         rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
1638                             attr);
1639         if (rc)
1640                 RETURN(rc);
1641
1642         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
1643         handle = mdd_trans_start(ctxt, mdd);
1644         if (IS_ERR(handle))
1645                 RETURN(rc = PTR_ERR(handle));
1646
1647         /*XXX: setting the lov ea is not locked
1648          * but setting the attr is locked? */
1649         rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
1650         if (rc == 0)
1651                rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1652
1653         /* finish mdd_lov_create() stuff */
1654         mdd_lov_create_finish(ctxt, mdd, rc);
1655         mdd_trans_stop(ctxt, mdd, rc, handle);
1656         if (lmm)
1657                 OBD_FREE(lmm, lmm_size);
1658         RETURN(rc);
1659 }
1660
1661 static int mdd_create_sanity_check(const struct lu_context *ctxt,
1662                                    struct mdd_device *mdd,
1663                                    struct md_object *pobj,
1664                                    const char *name, struct md_attr *ma)
1665 {
1666         struct mdd_thread_info *info = mdd_ctx_info(ctxt);
1667         struct lu_attr    *la        = &info->mti_la;
1668         struct lu_fid     *fid       = &info->mti_fid;
1669         struct mdd_object *obj       = md2mdd_obj(pobj);
1670         int rc;
1671
1672         ENTRY;
1673         /* EEXIST check */
1674         if (mdd_is_dead_obj(obj))
1675                 RETURN(-ENOENT);
1676         rc = mdd_lookup(ctxt, pobj, name, fid);
1677         if (rc != -ENOENT)
1678                 RETURN(rc ? : -EEXIST);
1679
1680         /* sgid check */
1681         mdd_read_lock(ctxt, obj);
1682         rc = __mdd_la_get(ctxt, obj, la);
1683         mdd_read_unlock(ctxt, obj);
1684         if (rc != 0)
1685                 RETURN(rc);
1686
1687         if (la->la_mode & S_ISGID) {
1688                 ma->ma_attr.la_gid = la->la_gid;
1689                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1690                         ma->ma_attr.la_mode |= S_ISGID;
1691                         ma->ma_attr.la_valid |= LA_MODE;
1692                 }
1693         }
1694
1695         switch (ma->ma_attr.la_mode & S_IFMT) {
1696         case S_IFREG:
1697         case S_IFDIR:
1698         case S_IFLNK:
1699         case S_IFCHR:
1700         case S_IFBLK:
1701         case S_IFIFO:
1702         case S_IFSOCK:
1703                 rc = 0;
1704                 break;
1705         default:
1706                 rc = -EINVAL;
1707                 break;
1708         }
1709         RETURN(rc);
1710 }
1711
1712 /*
1713  * Create object and insert it into namespace.
1714  */
1715 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
1716                       const char *name, struct md_object *child,
1717                       const struct md_create_spec *spec,
1718                       struct md_attr* ma)
1719 {
1720         struct mdd_device *mdd = mdo2mdd(pobj);
1721         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1722         struct mdd_object *son = md2mdd_obj(child);
1723         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1724         struct lu_attr    *attr = &ma->ma_attr;
1725         struct lov_mds_md *lmm = NULL;
1726         struct thandle    *handle;
1727         int rc, created = 0, inserted = 0, lmm_size = 0;
1728         ENTRY;
1729
1730         /* sanity checks before big job */
1731         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1732         if (rc)
1733                 RETURN(rc);
1734         /* no RPC inside the transaction, so OST objects should be created at
1735          * first */
1736         if (S_ISREG(attr->la_mode)) {
1737                 rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size,
1738                                     spec, attr);
1739                 if (rc)
1740                         RETURN(rc);
1741         }
1742
1743         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1744         handle = mdd_trans_start(ctxt, mdd);
1745         if (IS_ERR(handle))
1746                 RETURN(PTR_ERR(handle));
1747
1748         mdd_write_lock(ctxt, mdd_pobj);
1749
1750         /*
1751          * XXX check that link can be added to the parent in mkdir case.
1752          */
1753
1754         /*
1755          * Two operations have to be performed:
1756          *
1757          *  - allocation of new object (->do_create()), and
1758          *
1759          *  - insertion into parent index (->dio_insert()).
1760          *
1761          * Due to locking, operation order is not important, when both are
1762          * successful, *but* error handling cases are quite different:
1763          *
1764          *  - if insertion is done first, and following object creation fails,
1765          *  insertion has to be rolled back, but this operation might fail
1766          *  also leaving us with dangling index entry.
1767          *
1768          *  - if creation is done first, is has to be undone if insertion
1769          *  fails, leaving us with leaked space, which is neither good, nor
1770          *  fatal.
1771          *
1772          * It seems that creation-first is simplest solution, but it is
1773          * sub-optimal in the frequent
1774          *
1775          *         $ mkdir foo
1776          *         $ mkdir foo
1777          *
1778          * case, because second mkdir is bound to create object, only to
1779          * destroy it immediately.
1780          *
1781          * Note that local file systems do
1782          *
1783          *     0. lookup -> -EEXIST
1784          *
1785          *     1. create
1786          *
1787          *     2. insert
1788          *
1789          * Maybe we should do the same. For now: creation-first.
1790          */
1791
1792         mdd_write_lock(ctxt, son);
1793         rc = __mdd_object_create(ctxt, son, ma, handle);
1794         if (rc) {
1795                 mdd_write_unlock(ctxt, son);
1796                 GOTO(cleanup, rc);
1797         }
1798
1799         created = 1;
1800
1801         rc = __mdd_object_initialize(ctxt, mdo2fid(mdd_pobj),
1802                                      son, ma, handle);
1803         mdd_write_unlock(ctxt, son);
1804         if (rc)
1805                 /*
1806                  * Object has no links, so it will be destroyed when last
1807                  * reference is released. (XXX not now.)
1808                  */
1809                 GOTO(cleanup, rc);
1810
1811         rc = __mdd_index_insert(ctxt, mdd_pobj, mdo2fid(son),
1812                                 name, S_ISDIR(attr->la_mode), handle);
1813
1814         if (rc)
1815                 GOTO(cleanup, rc);
1816
1817         inserted = 1;
1818         /* replay creates has objects already */
1819         if (spec->u.sp_ea.no_lov_create)
1820                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son,
1821                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1822                                     spec->u.sp_ea.eadatalen, handle, 0);
1823         else
1824                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm,
1825                                     lmm_size, handle, 0);
1826         if (rc) {
1827                 CERROR("error on stripe info copy %d \n", rc);
1828                 GOTO(cleanup, rc);
1829         }
1830
1831         if (S_ISLNK(attr->la_mode)) {
1832                 struct dt_object *dt = mdd_object_child(son);
1833                 const char *target_name = spec->u.sp_symname;
1834                 int sym_len = strlen(target_name);
1835                 loff_t pos = 0;
1836
1837                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1838                                                 sym_len, &pos, handle);
1839                 if (rc == sym_len)
1840                         rc = 0;
1841                 else
1842                         rc = -EFAULT;
1843         }
1844
1845         *la_copy = ma->ma_attr;
1846         la_copy->la_valid = LA_CTIME | LA_MTIME;
1847         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1848         if (rc)
1849                 GOTO(cleanup, rc);
1850
1851         /* return attr back */
1852         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1853 cleanup:
1854         if (rc && created) {
1855                 int rc2 = 0;
1856
1857                 if (inserted) {
1858                         rc2 = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1859                         if (rc2)
1860                                 CERROR("error can not cleanup destroy %d\n",
1861                                        rc2);
1862                 }
1863                 if (rc2 == 0)
1864                         __mdd_ref_del(ctxt, son, handle);
1865         }
1866         /* finish mdd_lov_create() stuff */
1867         mdd_lov_create_finish(ctxt, mdd, rc);
1868         if (lmm)
1869                 OBD_FREE(lmm, lmm_size);
1870         mdd_write_unlock(ctxt, mdd_pobj);
1871         mdd_trans_stop(ctxt, mdd, rc, handle);
1872         RETURN(rc);
1873 }
1874 /* partial operation */
1875 static int mdd_object_create(const struct lu_context *ctxt,
1876                              struct md_object *obj,
1877                              const struct md_create_spec *spec,
1878                              struct md_attr *ma)
1879 {
1880
1881         struct mdd_device *mdd = mdo2mdd(obj);
1882         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1883         struct thandle *handle;
1884         int rc;
1885         ENTRY;
1886
1887         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1888         handle = mdd_trans_start(ctxt, mdd);
1889         if (IS_ERR(handle))
1890                 RETURN(PTR_ERR(handle));
1891
1892         mdd_write_lock(ctxt, mdd_obj);
1893         rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
1894         if (rc == 0)
1895                 rc = __mdd_object_initialize(ctxt, spec->u.sp_pfid, mdd_obj,
1896                                      ma, handle);
1897         mdd_write_unlock(ctxt, mdd_obj);
1898
1899         if (rc == 0)
1900                 rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
1901
1902         mdd_trans_stop(ctxt, mdd, rc, handle);
1903         RETURN(rc);
1904 }
1905 /* partial operation */
1906 static int mdd_name_insert(const struct lu_context *ctxt,
1907                            struct md_object *pobj, const char *name,
1908                            const struct lu_fid *fid, int isdir)
1909 {
1910         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1911         struct thandle *handle;
1912         int rc;
1913         ENTRY;
1914
1915         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1916         handle = mdd_trans_start(ctxt, mdo2mdd(pobj));
1917         if (IS_ERR(handle))
1918                 RETURN(PTR_ERR(handle));
1919
1920         mdd_write_lock(ctxt, mdd_obj);
1921         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, isdir, handle);
1922         mdd_write_unlock(ctxt, mdd_obj);
1923
1924         mdd_trans_stop(ctxt, mdo2mdd(pobj), rc, handle);
1925         RETURN(rc);
1926 }
1927
1928 static int mdd_name_remove(const struct lu_context *ctxt,
1929                            struct md_object *pobj,
1930                            const char *name)
1931 {
1932         struct mdd_device *mdd = mdo2mdd(pobj);
1933         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1934         struct thandle *handle;
1935         int rc;
1936         ENTRY;
1937
1938         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1939         handle = mdd_trans_start(ctxt, mdd);
1940         if (IS_ERR(handle))
1941                 RETURN(PTR_ERR(handle));
1942
1943         mdd_write_lock(ctxt, mdd_obj);
1944
1945         rc = __mdd_index_delete(ctxt, mdd_obj, name, handle);
1946
1947         mdd_write_unlock(ctxt, mdd_obj);
1948
1949         mdd_trans_stop(ctxt, mdd, rc, handle);
1950         RETURN(rc);
1951 }
1952
1953 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1954                           struct md_object *tobj, const struct lu_fid *lf,
1955                           const char *name, struct md_attr *ma)
1956 {
1957         struct mdd_device *mdd = mdo2mdd(pobj);
1958         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1959         struct mdd_object *mdd_tobj = NULL;
1960         struct thandle *handle;
1961         int rc;
1962         ENTRY;
1963
1964         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1965         handle = mdd_trans_start(ctxt, mdd);
1966         if (IS_ERR(handle))
1967                 RETURN(PTR_ERR(handle));
1968
1969         if (tobj)
1970                 mdd_tobj = md2mdd_obj(tobj);
1971
1972         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1973
1974         /*TODO rename sanity checking*/
1975         if (tobj) {
1976                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1977                 if (rc)
1978                         GOTO(cleanup, rc);
1979         }
1980
1981         rc = __mdd_index_insert_only(ctxt, mdd_tpobj, lf, name, handle);
1982         if (rc)
1983                 GOTO(cleanup, rc);
1984
1985         if (tobj && lu_object_exists(&tobj->mo_lu))
1986                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1987 cleanup:
1988         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1989         mdd_trans_stop(ctxt, mdd, rc, handle);
1990         RETURN(rc);
1991 }
1992
1993 static int mdd_get_root(const struct lu_context *ctx,
1994                         struct md_device *m, struct lu_fid *f)
1995 {
1996         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1997
1998         ENTRY;
1999         *f = mdd->mdd_root_fid;
2000         RETURN(0);
2001 }
2002
2003 static int mdd_statfs(const struct lu_context *ctx,
2004                       struct md_device *m, struct kstatfs *sfs)
2005 {
2006         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2007         int rc;
2008
2009         ENTRY;
2010
2011         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
2012
2013         RETURN(rc);
2014 }
2015
2016 static int mdd_get_maxsize(const struct lu_context *ctx,
2017                            struct md_device *m, int *md_size,
2018                            int *cookie_size)
2019 {
2020         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2021         ENTRY;
2022
2023         *md_size =  mdd_lov_mdsize(ctx, mdd);
2024         *cookie_size = mdd_lov_cookiesize(ctx, mdd);
2025
2026         RETURN(0);
2027 }
2028
2029 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
2030                          struct thandle *handle)
2031 {
2032         struct dt_object *next;
2033
2034         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2035         next = mdd_object_child(obj);
2036         next->do_ops->do_ref_add(ctxt, next, handle);
2037 }
2038
2039 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
2040 {
2041         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2042         struct mdd_device *mdd = mdo2mdd(obj);
2043         struct thandle *handle;
2044         ENTRY;
2045
2046         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
2047         handle = mdd_trans_start(ctxt, mdd);
2048         if (IS_ERR(handle))
2049                 RETURN(-ENOMEM);
2050
2051         mdd_write_lock(ctxt, mdd_obj);
2052         __mdd_ref_add(ctxt, mdd_obj, handle);
2053         mdd_write_unlock(ctxt, mdd_obj);
2054
2055         mdd_trans_stop(ctxt, mdd, 0, handle);
2056
2057         RETURN(0);
2058 }
2059
2060 static void
2061 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
2062               struct thandle *handle)
2063 {
2064         struct dt_object *next = mdd_object_child(obj);
2065
2066         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2067
2068         next->do_ops->do_ref_del(ctxt, next, handle);
2069 }
2070
2071 /* do NOT or the MAY_*'s, you'll get the weakest */
2072 static int accmode(struct mdd_object *mdd_obj, int flags)
2073 {
2074         int res = 0;
2075
2076 #if 0
2077         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2078          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2079          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2080          * owner can write to a file even if it is marked readonly to hide
2081          * its brokenness. (bug 5781) */
2082         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2083                 return 0;
2084 #endif
2085         if (flags & FMODE_READ)
2086                 res = MAY_READ;
2087         if (flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
2088                 res |= MAY_WRITE;
2089         if (flags & MDS_FMODE_EXEC)
2090                 res = MAY_EXEC;
2091         return res;
2092 }
2093
2094 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
2095                     int flags)
2096 {
2097         int mode = accmode(md2mdd_obj(obj), flags);
2098         int rc = 0;
2099
2100         mdd_write_lock(ctxt, md2mdd_obj(obj));
2101
2102         if (mode & MAY_WRITE) {
2103                 if (mdd_is_immutable(md2mdd_obj(obj)))
2104                         rc = -EACCES;
2105         }
2106         
2107         if (rc == 0)
2108                 md2mdd_obj(obj)->mod_count ++;
2109
2110         mdd_write_unlock(ctxt, md2mdd_obj(obj));
2111         return rc;
2112 }
2113
2114 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
2115                      struct md_attr *ma)
2116 {
2117         int rc;
2118         struct mdd_object *mdd_obj;
2119         struct thandle *handle = NULL;
2120         ENTRY;
2121
2122         mdd_obj = md2mdd_obj(obj);
2123         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
2124         handle = mdd_trans_start(ctxt, mdo2mdd(obj));
2125         if (IS_ERR(handle))
2126                 RETURN(-ENOMEM);
2127
2128         mdd_write_lock(ctxt, mdd_obj);
2129         rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
2130         if (rc == 0 && (-- mdd_obj->mod_count) == 0) {
2131                 if (ma->ma_attr.la_nlink == 0) {
2132                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
2133                         if (rc == 0)
2134                                 /* let's remove obj from the orphan list */
2135                                 rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
2136                 }
2137         }
2138         mdd_write_unlock(ctxt, mdd_obj);
2139         mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
2140         RETURN(rc);
2141 }
2142
2143 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
2144                         const struct lu_rdpg *rdpg)
2145 {
2146         struct dt_object *next;
2147         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2148         int rc;
2149
2150         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2151         next = mdd_object_child(mdd_obj);
2152
2153         mdd_read_lock(ctxt, mdd_obj);
2154         if (S_ISDIR(mdd_object_type(mdd_obj)) &&
2155             dt_try_as_dir(ctxt, next))
2156                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
2157         else
2158                 rc = -ENOTDIR;
2159         mdd_read_unlock(ctxt, mdd_obj);
2160         return rc;
2161 }
2162
2163 struct md_device_operations mdd_ops = {
2164         .mdo_statfs         = mdd_statfs,
2165         .mdo_get_root       = mdd_get_root,
2166         .mdo_get_maxsize    = mdd_get_maxsize,
2167 };
2168
2169 static struct md_dir_operations mdd_dir_ops = {
2170         .mdo_lookup        = mdd_lookup,
2171         .mdo_create        = mdd_create,
2172         .mdo_rename        = mdd_rename,
2173         .mdo_link          = mdd_link,
2174         .mdo_unlink        = mdd_unlink,
2175         .mdo_name_insert   = mdd_name_insert,
2176         .mdo_name_remove   = mdd_name_remove,
2177         .mdo_rename_tgt    = mdd_rename_tgt,
2178         .mdo_create_data   = mdd_create_data
2179 };
2180
2181
2182 static struct md_object_operations mdd_obj_ops = {
2183         .moo_attr_get      = mdd_attr_get,
2184         .moo_attr_set      = mdd_attr_set,
2185         .moo_xattr_get     = mdd_xattr_get,
2186         .moo_xattr_set     = mdd_xattr_set,
2187         .moo_xattr_list    = mdd_xattr_list,
2188         .moo_xattr_del     = mdd_xattr_del,
2189         .moo_object_create = mdd_object_create,
2190         .moo_ref_add       = mdd_ref_add,
2191         .moo_ref_del       = mdd_ref_del,
2192         .moo_open          = mdd_open,
2193         .moo_close         = mdd_close,
2194         .moo_readpage      = mdd_readpage,
2195         .moo_readlink      = mdd_readlink
2196 };
2197
2198 static struct obd_ops mdd_obd_device_ops = {
2199         .o_owner = THIS_MODULE
2200 };
2201
2202 static struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
2203                                           struct lu_device_type *t,
2204                                           struct lustre_cfg *lcfg)
2205 {
2206         struct lu_device  *l;
2207         struct mdd_device *m;
2208
2209         OBD_ALLOC_PTR(m);
2210         if (m == NULL) {
2211                 l = ERR_PTR(-ENOMEM);
2212         } else {
2213                 md_device_init(&m->mdd_md_dev, t);
2214                 l = mdd2lu_dev(m);
2215                 l->ld_ops = &mdd_lu_ops;
2216                 m->mdd_md_dev.md_ops = &mdd_ops;
2217         }
2218
2219         return l;
2220 }
2221
2222 static void mdd_device_free(const struct lu_context *ctx,
2223                             struct lu_device *lu)
2224 {
2225         struct mdd_device *m = lu2mdd_dev(lu);
2226
2227         LASSERT(atomic_read(&lu->ld_ref) == 0);
2228         md_device_fini(&m->mdd_md_dev);
2229         OBD_FREE_PTR(m);
2230 }
2231
2232 static int mdd_type_init(struct lu_device_type *t)
2233 {
2234         return lu_context_key_register(&mdd_thread_key);
2235 }
2236
2237 static void mdd_type_fini(struct lu_device_type *t)
2238 {
2239         lu_context_key_degister(&mdd_thread_key);
2240 }
2241
2242 static struct lu_device_type_operations mdd_device_type_ops = {
2243         .ldto_init = mdd_type_init,
2244         .ldto_fini = mdd_type_fini,
2245
2246         .ldto_device_alloc = mdd_device_alloc,
2247         .ldto_device_free  = mdd_device_free,
2248
2249         .ldto_device_init    = mdd_device_init,
2250         .ldto_device_fini    = mdd_device_fini
2251 };
2252
2253 static struct lu_device_type mdd_device_type = {
2254         .ldt_tags     = LU_DEVICE_MD,
2255         .ldt_name     = LUSTRE_MDD_NAME,
2256         .ldt_ops      = &mdd_device_type_ops,
2257         .ldt_ctx_tags = LCT_MD_THREAD
2258 };
2259
2260 static void *mdd_key_init(const struct lu_context *ctx,
2261                           struct lu_context_key *key)
2262 {
2263         struct mdd_thread_info *info;
2264
2265         OBD_ALLOC_PTR(info);
2266         if (info == NULL)
2267                 info = ERR_PTR(-ENOMEM);
2268         return info;
2269 }
2270
2271 static void mdd_key_fini(const struct lu_context *ctx,
2272                          struct lu_context_key *key, void *data)
2273 {
2274         struct mdd_thread_info *info = data;
2275         OBD_FREE_PTR(info);
2276 }
2277
2278 static struct lu_context_key mdd_thread_key = {
2279         .lct_tags = LCT_MD_THREAD,
2280         .lct_init = mdd_key_init,
2281         .lct_fini = mdd_key_fini
2282 };
2283
2284 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
2285         { 0 }
2286 };
2287
2288 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
2289         { 0 }
2290 };
2291
2292 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
2293
2294 static int __init mdd_mod_init(void)
2295 {
2296         struct lprocfs_static_vars lvars;
2297         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
2298         lprocfs_init_vars(mdd, &lvars);
2299         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
2300                                    LUSTRE_MDD_NAME, &mdd_device_type);
2301 }
2302
2303 static void __exit mdd_mod_exit(void)
2304 {
2305         class_unregister_type(LUSTRE_MDD_NAME);
2306 }
2307
2308 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2309 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
2310 MODULE_LICENSE("GPL");
2311
2312 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);