Whamcloud - gitweb
25a3478c2e9abcf2e5365394c791cad1048b858e
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43
44 #include "mdd_internal.h"
45
46
47 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
48                                        struct mdd_device *);
49 static void mdd_trans_stop(const struct lu_context *ctxt,
50                            struct mdd_device *mdd, int rc,
51                            struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
54                           struct thandle *handle);
55 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
56                           struct thandle *handle);
57 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
58                       const char *name, struct lu_fid* fid);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 static const char *mdd_root_dir_name = "root";
67 static const char dot[] = ".";
68 static const char dotdot[] = "..";
69
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdd_obj;
85
86         OBD_ALLOC_PTR(mdd_obj);
87         if (mdd_obj != NULL) {
88                 struct lu_object *o;
89                 
90                 o = mdd2lu_obj(mdd_obj);
91                 lu_object_init(o, NULL, d);
92                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 atomic_set(&mdd_obj->mod_count, 0);
95                 o->lo_ops = &mdd_lu_obj_ops;
96                 return o;
97         } else {
98                 return NULL;
99         }
100 }
101
102 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 {
104         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
105         struct lu_object  *below;
106         struct lu_device  *under;
107         ENTRY;
108
109         under = &d->mdd_child->dd_lu_dev;
110         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
111
112         if (below == NULL)
113                 RETURN(-ENOMEM);
114
115         lu_object_add(o, below);
116         RETURN(0);
117 }
118
119 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj);
120
121 static int mdd_object_start(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         if (lu_object_exists(o))
124                 return mdd_get_flags(ctxt, lu2mdd_obj(o));
125         else
126                 return 0;
127 }
128
129 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
130 {
131         struct mdd_object *mdd = lu2mdd_obj(o);
132         
133         lu_object_fini(o);
134         OBD_FREE_PTR(mdd);
135 }
136
137 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
138                                    struct mdd_device *d,
139                                    const struct lu_fid *f)
140 {
141         struct lu_object *o;
142         struct mdd_object *m;
143         ENTRY;
144
145         o = lu_object_find(ctxt, d->mdd_md_dev.md_lu_dev.ld_site, f);
146         if (IS_ERR(o))
147                 m = (struct mdd_object *)o;
148         else
149                 m = lu2mdd_obj(lu_object_locate(o->lo_header,
150                                  d->mdd_md_dev.md_lu_dev.ld_type));
151         RETURN(m);
152 }
153
154 static inline void mdd_object_put(const struct lu_context *ctxt,
155                                   struct mdd_object *o)
156 {
157         lu_object_put(ctxt, &o->mod_obj.mo_lu);
158 }
159
160 static inline int mdd_is_immutable(struct mdd_object *obj)
161 {
162         return obj->mod_flags & IMMUTE_OBJ;
163 }
164
165 static inline int mdd_is_append(struct mdd_object *obj)
166 {
167         return obj->mod_flags & APPEND_OBJ;
168 }
169
170 static void mdd_set_dead_obj(struct mdd_object *obj)
171 {
172         if (obj)
173                 obj->mod_flags |= DEAD_OBJ;
174 }
175
176 static int mdd_is_dead_obj(struct mdd_object *obj)
177 {
178         return obj && obj->mod_flags & DEAD_OBJ;
179 }
180
181 /*Check whether it may create the cobj under the pobj*/
182 static int mdd_may_create(const struct lu_context *ctxt,
183                           struct mdd_object *pobj, struct mdd_object *cobj)
184 {
185         ENTRY;
186         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
187                 RETURN(-EEXIST);
188
189         if (mdd_is_dead_obj(pobj))
190                 RETURN(-ENOENT);
191
192         /*check pobj may create or not*/
193         RETURN(0);
194 }
195
196 static inline int __mdd_la_get(const struct lu_context *ctxt,
197                                struct mdd_object *obj, struct lu_attr *la)
198 {
199         struct dt_object  *next = mdd_object_child(obj);
200         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
201         return next->do_ops->do_attr_get(ctxt, next, la);
202 }
203
204 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
205 {
206         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
207
208         if (flags & LUSTRE_APPEND_FL)
209                 obj->mod_flags |= APPEND_OBJ;
210
211         if (flags & LUSTRE_IMMUTABLE_FL)
212                 obj->mod_flags |= IMMUTE_OBJ;
213 }
214
215 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj)
216 {
217         struct lu_attr *la = &mdd_ctx_info(ctxt)->mti_la;
218         int rc;
219
220         ENTRY;
221         mdd_read_lock(ctxt, obj);
222         rc = __mdd_la_get(ctxt, obj, la);
223         mdd_read_unlock(ctxt, obj);
224         if (rc == 0)
225                 mdd_flags_xlate(obj, la->la_flags);
226         RETURN(rc);
227 }
228
229 /*Check whether it may delete the cobj under the pobj*/
230 static int mdd_may_delete(const struct lu_context *ctxt,
231                           struct mdd_object *pobj, struct mdd_object *cobj,
232                           int is_dir)
233 {
234         struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
235         int rc = 0;
236         ENTRY;
237
238         LASSERT(cobj && pobj);
239
240         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
241                 RETURN(-ENOENT);
242
243         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
244                 RETURN(-EPERM);
245
246         if (is_dir) {
247                 if (!S_ISDIR(mdd_object_type(cobj)))
248                         RETURN(-ENOTDIR);
249
250                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
251                         RETURN(-EBUSY);
252
253         } else if (S_ISDIR(mdd_object_type(cobj)))
254                         RETURN(-EISDIR);
255
256         if (mdd_is_dead_obj(pobj))
257                 RETURN(-ENOENT);
258
259         RETURN(rc);
260 }
261 /* get only inode attributes */
262 static int __mdd_iattr_get(const struct lu_context *ctxt,
263                            struct mdd_object *mdd_obj, struct md_attr *ma)
264 {
265         int rc = 0;
266         ENTRY;
267
268         rc = __mdd_la_get(ctxt, mdd_obj, &ma->ma_attr);
269         if (rc == 0)
270                 ma->ma_valid = MA_INODE;
271         RETURN(rc);
272 }
273 /* get lov EA only */
274 static int __mdd_lmm_get(const struct lu_context *ctxt,
275                          struct mdd_object *mdd_obj, struct md_attr *ma)
276 {
277         int rc;
278
279         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
280         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, 0, 
281                         MDS_LOV_MD_NAME);
282         if (rc > 0) {
283                 ma->ma_valid |= MA_LOV;
284                 rc = 0;
285         }
286         RETURN(rc);
287 }
288
289 #ifdef HAVE_SPLIT_SUPPORT
290 /* get lmv EA only*/
291 static int __mdd_lmv_get(const struct lu_context *ctxt,
292                          struct mdd_object *mdd_obj, struct md_attr *ma)
293 {
294         int rc;
295
296         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size, 0,
297                         MDS_LMV_MD_NAME);
298         if (rc > 0) {
299                 ma->ma_valid |= MA_LMV;
300                 rc = 0;
301         }
302         RETURN(rc);
303 }
304 #endif
305
306 static int mdd_attr_get_internal(const struct lu_context *ctxt,
307                                  struct mdd_object *mdd_obj,
308                                  struct md_attr *ma)
309 {
310         int rc = 0;
311         ENTRY;
312
313         if (ma->ma_need & MA_INODE)
314                 rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
315
316         if (rc == 0 && ma->ma_need & MA_LOV) {
317                 if (S_ISREG(mdd_object_type(mdd_obj)) || 
318                     S_ISDIR(mdd_object_type(mdd_obj)))
319                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
320         }
321 #ifdef HAVE_SPLIT_SUPPORT 
322         if (rc == 0 && ma->ma_need & MA_LMV) {
323                 if (S_ISDIR(mdd_object_type(mdd_obj)))
324                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
325         }
326 #endif        
327         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
328                         rc, ma->ma_valid);
329         RETURN(rc);
330 }
331
332 static inline int mdd_attr_get_internal_locked(const struct lu_context *ctxt,
333                                                struct mdd_object *mdd_obj,
334                                                struct md_attr *ma)
335 {
336         int rc;
337         mdd_read_lock(ctxt, mdd_obj);
338         rc = mdd_attr_get_internal(ctxt, mdd_obj, ma);
339         mdd_read_unlock(ctxt, mdd_obj);
340         return rc;
341 }
342
343 static int mdd_attr_get(const struct lu_context *ctxt,
344                         struct md_object *obj, struct md_attr *ma)
345 {
346         struct mdd_object *mdd_obj = md2mdd_obj(obj);
347         int                rc;
348
349         ENTRY;
350         rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
351         RETURN(rc);
352 }
353
354 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
355                          void *buf, int buf_len, const char *name)
356 {
357         struct mdd_object *mdd_obj = md2mdd_obj(obj);
358         struct dt_object  *next;
359         int rc;
360
361         ENTRY;
362
363         LASSERT(lu_object_exists(&obj->mo_lu));
364
365         next = mdd_object_child(mdd_obj);
366         mdd_read_lock(ctxt, mdd_obj);
367         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
368         mdd_read_unlock(ctxt, mdd_obj);
369
370         RETURN(rc);
371 }
372
373 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
374                         void *buf, int buf_len)
375 {
376         struct mdd_object *mdd_obj = md2mdd_obj(obj);
377         struct dt_object  *next;
378         loff_t             pos = 0;
379         int                rc;
380         ENTRY;
381
382         LASSERT(lu_object_exists(&obj->mo_lu));
383
384         next = mdd_object_child(mdd_obj);
385         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
386         RETURN(rc);
387 }
388 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
389                           void *buf, int buf_len)
390 {
391         struct mdd_object *mdd_obj = md2mdd_obj(obj);
392         struct dt_object  *next;
393         int rc;
394
395         ENTRY;
396
397         LASSERT(lu_object_exists(&obj->mo_lu));
398
399         next = mdd_object_child(mdd_obj);
400         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
401
402         RETURN(rc);
403 }
404
405 enum mdd_txn_op {
406         MDD_TXN_OBJECT_DESTROY_OP,
407         MDD_TXN_OBJECT_CREATE_OP,
408         MDD_TXN_ATTR_SET_OP,
409         MDD_TXN_XATTR_SET_OP,
410         MDD_TXN_INDEX_INSERT_OP,
411         MDD_TXN_INDEX_DELETE_OP,
412         MDD_TXN_LINK_OP,
413         MDD_TXN_UNLINK_OP,
414         MDD_TXN_RENAME_OP,
415         MDD_TXN_CREATE_DATA_OP,
416         MDD_TXN_MKDIR_OP
417 };
418
419 struct mdd_txn_op_descr {
420         enum mdd_txn_op mod_op;
421         unsigned int    mod_credits;
422 };
423
424 enum {
425         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
426         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
427         MDD_TXN_ATTR_SET_CREDITS       = 20,
428         MDD_TXN_XATTR_SET_CREDITS      = 20,
429         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
430         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
431         MDD_TXN_LINK_CREDITS           = 20,
432         MDD_TXN_UNLINK_CREDITS         = 20,
433         MDD_TXN_RENAME_CREDITS         = 20,
434         MDD_TXN_CREATE_DATA_CREDITS    = 20,
435         MDD_TXN_MKDIR_CREDITS          = 20
436 };
437
438 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
439 static const struct mdd_txn_op_descr opname = { \
440         .mod_op      = opname ## _OP,           \
441         .mod_credits = opname ## _CREDITS,      \
442 }
443
444 /*
445  * number of blocks to reserve for particular operations. Should be function
446  * of ... something. Stub for now.
447  */
448 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
449 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
450 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
451 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
452 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
453 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
454 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
455 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
456 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
457 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
458 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
459
460 static void mdd_txn_param_build(const struct lu_context *ctx,
461                                 const struct mdd_txn_op_descr *opd)
462 {
463         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
464 }
465
466 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
467                             lu_printer_t p, const struct lu_object *o)
468 {
469         return (*p)(ctxt, cookie, LUSTRE_MDD0_NAME"-object@%p", o);
470 }
471
472 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
473 {
474         int rc;
475         struct dt_object *root;
476         ENTRY;
477
478         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
479                              &mdd->mdd_root_fid);
480         if (!IS_ERR(root)) {
481                 LASSERT(root != NULL);
482                 lu_object_put(ctx, &root->do_lu);
483                 rc = orph_index_init(ctx, mdd);
484         } else
485                 rc = PTR_ERR(root);
486
487         RETURN(rc);
488 }
489
490 static int mdd_device_init(const struct lu_context *ctx,
491                            struct lu_device *d, struct lu_device *next)
492 {
493         struct mdd_device *mdd = lu2mdd_dev(d);
494         int rc = 0;
495         ENTRY;
496
497         mdd->mdd_child = lu2dt_dev(next);
498
499         RETURN(rc);
500 }
501
502 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
503                                          struct lu_device *d)
504 {
505         struct mdd_device *m = lu2mdd_dev(d);
506         struct lu_device *next = &m->mdd_child->dd_lu_dev;
507
508         return next;
509 }
510 static void mdd_device_shutdown(const struct lu_context *ctxt,
511                                 struct mdd_device *m)
512 {
513         if (m->mdd_obd_dev)
514                 mdd_fini_obd(ctxt, m);
515         orph_index_fini(ctxt, m);
516 }
517
518 static int mdd_process_config(const struct lu_context *ctxt,
519                               struct lu_device *d, struct lustre_cfg *cfg)
520 {
521         struct mdd_device *m    = lu2mdd_dev(d);
522         struct dt_device  *dt   = m->mdd_child;
523         struct lu_device  *next = &dt->dd_lu_dev;
524         char              *dev = lustre_cfg_string(cfg, 0);
525         int rc;
526
527         switch (cfg->lcfg_command) {
528         case LCFG_SETUP:
529                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
530                 if (rc)
531                         GOTO(out, rc);
532                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
533
534                 rc = mdd_mount(ctxt, m);
535                 if (rc)
536                         GOTO(out, rc);
537                 rc = mdd_init_obd(ctxt, m, dev);
538                 if (rc) {
539                         CERROR("lov init error %d \n", rc);
540                         GOTO(out, rc);
541                 }
542                 break;
543         case LCFG_CLEANUP:
544                 mdd_device_shutdown(ctxt, m);
545         default:
546                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
547                 break;
548         }
549 out:
550         RETURN(rc);
551 }
552
553 static int mdd_recovery_complete(const struct lu_context *ctxt,
554                                  struct lu_device *d)
555 {
556         struct mdd_device *mdd = lu2mdd_dev(d);
557         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
558         int rc;
559         ENTRY;
560 /* TODO:
561         rc = mdd_lov_set_nextid(ctx, mdd);
562         if (rc) {
563                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
564                        obd->obd_name, rc);
565                 GOTO(out, rc);
566         }
567         rc = mdd_cleanup_unlink_llog(ctx, mdd);
568
569         obd_notify(obd->u.mds.mds_osc_obd, NULL,
570                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
571                    OBD_NOTIFY_SYNC, NULL);
572 */
573         /* TODO: orphans handling */
574         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
575         RETURN(rc);
576 }
577
578 struct lu_device_operations mdd_lu_ops = {
579         .ldo_object_alloc      = mdd_object_alloc,
580         .ldo_process_config    = mdd_process_config,
581         .ldo_recovery_complete = mdd_recovery_complete
582 };
583
584 static struct lu_object_operations mdd_lu_obj_ops = {
585         .loo_object_init    = mdd_object_init,
586         .loo_object_start   = mdd_object_start,
587         .loo_object_free    = mdd_object_free,
588         .loo_object_print   = mdd_object_print
589 };
590
591 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
592 {
593         struct dt_object  *next = mdd_object_child(obj);
594
595         next->do_ops->do_write_lock(ctxt, next);
596 }
597
598 void mdd_read_lock(const struct lu_context *ctxt, struct mdd_object *obj)
599 {
600         struct dt_object  *next = mdd_object_child(obj);
601
602         next->do_ops->do_read_lock(ctxt, next);
603 }
604
605 void mdd_write_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
606 {
607         struct dt_object  *next = mdd_object_child(obj);
608
609         next->do_ops->do_write_unlock(ctxt, next);
610 }
611
612 void mdd_read_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
613 {
614         struct dt_object  *next = mdd_object_child(obj);
615
616         next->do_ops->do_read_unlock(ctxt, next);
617 }
618
619 static void mdd_lock2(const struct lu_context *ctxt,
620                       struct mdd_object *o0, struct mdd_object *o1)
621 {
622         mdd_write_lock(ctxt, o0);
623         mdd_write_lock(ctxt, o1);
624 }
625
626 static void mdd_unlock2(const struct lu_context *ctxt,
627                         struct mdd_object *o0, struct mdd_object *o1)
628 {
629         mdd_write_unlock(ctxt, o1);
630         mdd_write_unlock(ctxt, o0);
631 }
632
633 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
634                                        struct mdd_device *mdd)
635 {
636         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
637
638         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
639 }
640
641 static void mdd_trans_stop(const struct lu_context *ctxt,
642                            struct mdd_device *mdd, int result,
643                            struct thandle *handle)
644 {
645         handle->th_result = result;
646         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
647 }
648
649 static int __mdd_object_create(const struct lu_context *ctxt,
650                                struct mdd_object *obj, struct md_attr *ma,
651                                struct thandle *handle)
652 {
653         struct dt_object *next;
654         struct lu_attr *attr = &ma->ma_attr;
655         int rc;
656         ENTRY;
657
658         if (!lu_object_exists(mdd2lu_obj(obj))) {
659                 next = mdd_object_child(obj);
660                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
661         } else
662                 rc = -EEXIST;
663
664         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
665
666         RETURN(rc);
667 }
668
669 int mdd_attr_set_internal(const struct lu_context *ctxt, struct mdd_object *o,
670                           const struct lu_attr *attr, struct thandle *handle)
671 {
672         struct dt_object *next;
673
674         LASSERT(lu_object_exists(mdd2lu_obj(o)));
675         next = mdd_object_child(o);
676         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
677 }
678
679 int mdd_attr_set_internal_locked(const struct lu_context *ctxt,
680                                  struct mdd_object *o,
681                                  const struct lu_attr *attr,
682                                  struct thandle *handle)
683 {
684         int rc;
685         mdd_write_lock(ctxt, o);
686         rc = mdd_attr_set_internal(ctxt, o, attr, handle);
687         mdd_write_unlock(ctxt, o);
688         return rc;
689 }
690
691 static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
692                            const void *buf, int buf_len, const char *name,
693                            int fl, struct thandle *handle)
694 {
695         struct dt_object *next;
696         int rc = 0;
697         ENTRY;
698
699         LASSERT(lu_object_exists(mdd2lu_obj(o)));
700         next = mdd_object_child(o);
701         if (buf && buf_len > 0) {
702                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
703                                                 0, handle);
704         }else if (buf == NULL && buf_len == 0) {
705                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
706         }
707         RETURN(rc);
708 }
709 /* this gives the same functionality as the code between
710  * sys_chmod and inode_setattr
711  * chown_common and inode_setattr
712  * utimes and inode_setattr
713  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
714  * and port to
715  */
716 int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj,
717                  const struct md_attr *ma, struct lu_attr *la)
718 {
719         struct lu_attr   *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
720         time_t            now = CURRENT_SECONDS;
721         int               rc;
722         ENTRY;
723
724         rc = __mdd_la_get(ctxt, obj, tmp_la);
725         if (rc)
726                 RETURN(rc);
727         /*XXX Check permission */
728         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
729
730                 /*If only change flags of the object, we should
731                  * let it pass, but also need capability check
732                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
733                  * fix it, when implement capable in mds*/
734                 if (la->la_valid & ~LA_FLAGS)
735                         RETURN(-EPERM);
736
737                 /*According to Ext3 implementation on this, the
738                  *Ctime will be changed, but not clear why?*/
739                 la->la_ctime = now;
740                 la->la_valid |= LA_CTIME;
741                 RETURN(0);
742         }
743         if (!(la->la_valid & LA_CTIME)) {
744                 la->la_ctime = now;
745                 la->la_valid |= LA_CTIME;
746         }
747
748 #if 0
749         /* times */
750         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
751                 if (current->fsuid != inode->i_uid &&
752                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
753                         RETURN(error);
754         }
755         if (ia_valid & ATTR_SIZE &&
756             /* NFSD hack for open(O_CREAT|O_TRUNC)=mknod+truncate (bug 5781) */
757             !(rec->ur_uc.luc_fsuid == inode->i_uid &&
758               ia_valid & MDS_OPEN_OWNEROVERRIDE)) {
759                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
760                         RETURN(error);
761         }
762 #endif
763
764         if (la->la_valid & (LA_UID | LA_GID)) {
765                 /* chown */
766
767                 if (mdd_is_immutable(obj) || mdd_is_append(obj))
768                         RETURN(-EPERM);
769                 if (la->la_uid == (uid_t) -1)
770                         la->la_uid = tmp_la->la_uid;
771                 if (la->la_gid == (gid_t) -1)
772                         la->la_gid = tmp_la->la_gid;
773                 if (!(la->la_valid & LA_MODE))
774                         la->la_mode = tmp_la->la_mode;
775                 /*
776                  * If the user or group of a non-directory has been
777                  * changed by a non-root user, remove the setuid bit.
778                  * 19981026 David C Niemi <niemi@tux.org>
779                  *
780                  * Changed this to apply to all users, including root,
781                  * to avoid some races. This is the behavior we had in
782                  * 2.0. The check for non-root was definitely wrong
783                  * for 2.2 anyway, as it should have been using
784                  * CAP_FSETID rather than fsuid -- 19990830 SD.
785                  */
786                 if ((tmp_la->la_mode & S_ISUID) == S_ISUID &&
787                     !S_ISDIR(tmp_la->la_mode)) {
788                         la->la_mode &= ~S_ISUID;
789                         la->la_valid |= LA_MODE;
790                 }
791                 /*
792                  * Likewise, if the user or group of a non-directory
793                  * has been changed by a non-root user, remove the
794                  * setgid bit UNLESS there is no group execute bit
795                  * (this would be a file marked for mandatory
796                  * locking).  19981026 David C Niemi <niemi@tux.org>
797                  *
798                  * Removed the fsuid check (see the comment above) --
799                  * 19990830 SD.
800                  */
801                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
802                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
803                         la->la_mode &= ~S_ISGID;
804                         la->la_valid |= LA_MODE;
805                 }
806         } else if (la->la_valid & LA_MODE) {
807                 int mode = la->la_mode;
808                 /* chmod */
809                 if (la->la_mode == (umode_t)-1)
810                         mode = tmp_la->la_mode;
811                 la->la_mode =
812                         (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO);
813         }
814         RETURN(rc);
815 }
816
817
818 /* set attr and LOV EA at once, return updated attr */
819 static int mdd_attr_set(const struct lu_context *ctxt,
820                         struct md_object *obj,
821                         const struct md_attr *ma)
822 {
823         struct mdd_object *mdd_obj = md2mdd_obj(obj);
824         struct mdd_device *mdd = mdo2mdd(obj);
825         struct thandle *handle;
826         struct lov_mds_md *lmm = NULL;
827         int  rc = 0, lmm_size = 0, max_size;
828         struct lu_attr *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
829         ENTRY;
830
831         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
832         handle = mdd_trans_start(ctxt, mdd);
833         if (IS_ERR(handle))
834                 RETURN(PTR_ERR(handle));
835         /*TODO: add lock here*/
836         /* start a log jounal handle if needed */
837         if (S_ISREG(mdd_object_type(mdd_obj)) &&
838             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
839                 max_size = mdd_lov_mdsize(ctxt, mdd);
840                 OBD_ALLOC(lmm, max_size);
841                 if (lmm == NULL)
842                         GOTO(cleanup, rc = -ENOMEM);
843
844                 rc = mdd_get_md(ctxt, mdd_obj, lmm, &lmm_size, 1, 
845                                 MDS_LOV_MD_NAME);
846
847                 if (rc < 0)
848                         GOTO(cleanup, rc);
849         }
850
851         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
852                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
853                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
854
855         *la_copy = ma->ma_attr;
856         mdd_write_lock(ctxt, mdd_obj);
857         rc = mdd_fix_attr(ctxt, mdd_obj, ma, la_copy);
858         mdd_write_unlock(ctxt, mdd_obj);
859         if (rc)
860                 GOTO(cleanup, rc);
861
862         if (la_copy->la_valid & LA_FLAGS) {
863                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
864                                                   handle);
865                 if (rc == 0)
866                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
867         } else if (la_copy->la_valid) {            /* setattr */
868                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
869                                                   handle);
870                 /* journal chown/chgrp in llog, just like unlink */
871                 if (rc == 0 && lmm_size){
872                         /*TODO set_attr llog */
873                 }
874         }
875
876         if (rc == 0 && ma->ma_valid & MA_LOV) {
877                 umode_t mode;
878
879                 mode = mdd_object_type(mdd_obj);
880                 if (S_ISREG(mode) || S_ISDIR(mode)) {
881                         /*TODO check permission*/
882                         rc = mdd_lov_set_md(ctxt, NULL, mdd_obj, ma->ma_lmm,
883                                             ma->ma_lmm_size, handle, 1);
884                 }
885
886         }
887 cleanup:
888         mdd_trans_stop(ctxt, mdd, rc, handle);
889         if (rc == 0 && lmm_size) {
890                 /*set obd attr, if needed*/
891                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
892         }
893         if (lmm != NULL) {
894                 OBD_FREE(lmm, max_size);
895         }
896
897         RETURN(rc);
898 }
899
900 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct mdd_object *obj,
901                       const void *buf, int buf_len, const char *name, int fl,
902                       struct thandle *handle)
903 {
904         int  rc;
905         ENTRY;
906
907         mdd_write_lock(ctxt, obj);
908         rc = __mdd_xattr_set(ctxt, obj, buf, buf_len, name, fl, handle);
909         mdd_write_unlock(ctxt, obj);
910
911         RETURN(rc);
912 }
913
914 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
915                          const void *buf, int buf_len, const char *name,
916                          int fl)
917 {
918         struct mdd_device *mdd = mdo2mdd(obj);
919         struct thandle *handle;
920         int  rc;
921         ENTRY;
922
923         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
924         handle = mdd_trans_start(ctxt, mdd);
925         if (IS_ERR(handle))
926                 RETURN(PTR_ERR(handle));
927
928         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
929                                fl, handle);
930
931         mdd_trans_stop(ctxt, mdd, rc, handle);
932
933         RETURN(rc);
934 }
935
936 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
937                            struct mdd_object *obj,
938                            const char *name, struct thandle *handle)
939 {
940         struct dt_object *next;
941
942         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
943         next = mdd_object_child(obj);
944         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
945 }
946
947 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
948                   const char *name)
949 {
950         struct mdd_object *mdd_obj = md2mdd_obj(obj);
951         struct mdd_device *mdd = mdo2mdd(obj);
952         struct thandle *handle;
953         int  rc;
954         ENTRY;
955
956         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
957         handle = mdd_trans_start(ctxt, mdd);
958         if (IS_ERR(handle))
959                 RETURN(PTR_ERR(handle));
960
961         mdd_write_lock(ctxt, mdd_obj);
962         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
963         mdd_write_unlock(ctxt, mdd_obj);
964
965         mdd_trans_stop(ctxt, mdd, rc, handle);
966
967         RETURN(rc);
968 }
969
970 static int __mdd_index_insert_only(const struct lu_context *ctxt,
971                                    struct mdd_object *pobj,
972                                    const struct lu_fid *lf,
973                                    const char *name, struct thandle *th)
974 {
975         int rc;
976         struct dt_object *next = mdd_object_child(pobj);
977         ENTRY;
978
979         if (dt_try_as_dir(ctxt, next))
980                 rc = next->do_index_ops->dio_insert(ctxt, next,
981                                          (struct dt_rec *)lf,
982                                          (struct dt_key *)name, th);
983         else
984                 rc = -ENOTDIR;
985         RETURN(rc);
986 }
987
988 /* insert new index, add reference if isdir, update times */
989 static int __mdd_index_insert(const struct lu_context *ctxt,
990                              struct mdd_object *pobj, const struct lu_fid *lf,
991                              const char *name, int isdir, struct thandle *th)
992 {
993         int rc;
994         struct dt_object *next = mdd_object_child(pobj);
995         ENTRY;
996
997 #if 0
998         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
999 #endif
1000
1001         if (dt_try_as_dir(ctxt, next))
1002                 rc = next->do_index_ops->dio_insert(ctxt, next,
1003                                          (struct dt_rec *)lf,
1004                                          (struct dt_key *)name, th);
1005         else
1006                 rc = -ENOTDIR;
1007
1008         if (rc == 0) {
1009                 if (isdir)
1010                         __mdd_ref_add(ctxt, pobj, th);
1011 #if 0
1012                 la->la_valid = LA_MTIME|LA_CTIME;
1013                 la->la_atime = ma->ma_attr.la_atime;
1014                 la->la_ctime = ma->ma_attr.la_ctime;
1015                 rc = mdd_attr_set_internal(ctxt, mdd_obj, la, handle);
1016 #endif
1017         }
1018         return rc;
1019 }
1020
1021 static int __mdd_index_delete(const struct lu_context *ctxt,
1022                               struct mdd_object *pobj, const char *name,
1023                               struct thandle *handle)
1024 {
1025         int rc;
1026         struct dt_object *next = mdd_object_child(pobj);
1027         ENTRY;
1028
1029         if (dt_try_as_dir(ctxt, next))
1030                 rc = next->do_index_ops->dio_delete(ctxt, next,
1031                                         (struct dt_key *)name, handle);
1032         else
1033                 rc = -ENOTDIR;
1034         RETURN(rc);
1035 }
1036
1037 static int mdd_link_sanity_check(const struct lu_context *ctxt,
1038                                  struct mdd_object *tgt_obj,
1039                                  struct mdd_object *src_obj)
1040 {
1041         int rc;
1042
1043         rc = mdd_may_create(ctxt, tgt_obj, NULL);
1044         if (rc)
1045                 RETURN(rc);
1046         if (S_ISDIR(mdd_object_type(src_obj)))
1047                 RETURN(-EPERM);
1048
1049         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1050                 RETURN(-EPERM);
1051
1052         RETURN(rc);
1053 }
1054
1055 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
1056                     struct md_object *src_obj, const char *name)
1057 {
1058         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1059         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1060         struct mdd_device *mdd = mdo2mdd(src_obj);
1061         struct thandle *handle;
1062         int rc;
1063         ENTRY;
1064
1065         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
1066         handle = mdd_trans_start(ctxt, mdd);
1067         if (IS_ERR(handle))
1068                 RETURN(PTR_ERR(handle));
1069
1070         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
1071
1072         rc = mdd_link_sanity_check(ctxt, mdd_tobj, mdd_sobj);
1073         if (rc)
1074                 GOTO(out, rc);
1075
1076         rc = __mdd_index_insert_only(ctxt, mdd_tobj, mdo2fid(mdd_sobj),
1077                                      name, handle);
1078         if (rc == 0)
1079                 __mdd_ref_add(ctxt, mdd_sobj, handle);
1080
1081 out:
1082         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
1083         mdd_trans_stop(ctxt, mdd, rc, handle);
1084         RETURN(rc);
1085 }
1086
1087 /*
1088  * Check that @dir contains no entries except (possibly) dot and dotdot.
1089  *
1090  * Returns:
1091  *
1092  *             0        empty
1093  *    -ENOTEMPTY        not empty
1094  *           -ve        other error
1095  *
1096  */
1097 static int mdd_dir_is_empty(const struct lu_context *ctx,
1098                             struct mdd_object *dir)
1099 {
1100         struct dt_it     *it;
1101         struct dt_object *obj;
1102         struct dt_it_ops *iops;
1103         int result;
1104
1105         obj = mdd_object_child(dir);
1106         iops = &obj->do_index_ops->dio_it;
1107         it = iops->init(ctx, obj);
1108         if (it != NULL) {
1109                 result = iops->get(ctx, it, (const void *)"");
1110                 if (result > 0) {
1111                         int i;
1112                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1113                                 result = iops->next(ctx, it);
1114                         iops->put(ctx, it);
1115                         if (result == 0)
1116                                 result = -ENOTEMPTY;
1117                         else if (result == +1)
1118                                 result = 0;
1119                 } else if (result == 0)
1120                         /*
1121                          * Huh? Index contains no zero key?
1122                          */
1123                         result = -EIO;
1124                 iops->fini(ctx, it);
1125         } else
1126                 result = -ENOMEM;
1127         return result;
1128 }
1129
1130 /* return md_attr back,
1131  * if it is last unlink then return lov ea + llog cookie*/
1132 static inline int __mdd_object_kill(const struct lu_context *ctxt,
1133                                     struct mdd_object *obj,
1134                                     struct md_attr *ma)
1135 {
1136         int rc = 0;
1137
1138         mdd_set_dead_obj(obj);
1139         if (S_ISREG(mdd_object_type(obj))) {
1140                 rc = __mdd_lmm_get(ctxt, obj, ma);
1141                 if (ma->ma_valid & MA_LOV)
1142                         rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
1143                                             obj, ma);
1144         }
1145         return rc;
1146 }
1147
1148 static int __mdd_finish_unlink(const struct lu_context *ctxt,
1149                                struct mdd_object *obj, struct md_attr *ma,
1150                                struct thandle *th)
1151 {
1152         int rc;
1153         ENTRY;
1154
1155         rc = __mdd_iattr_get(ctxt, obj, ma);
1156         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1157                 if (atomic_read(&obj->mod_count) == 0) {
1158                         rc = __mdd_object_kill(ctxt, obj, ma);
1159                 } else {
1160                         /* add new orphan */
1161                         rc = __mdd_orphan_add(ctxt, obj, th);
1162                 }
1163         }
1164         RETURN(rc);
1165 }
1166
1167 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
1168                                    struct mdd_object *pobj,
1169                                    struct mdd_object *cobj,
1170                                    struct md_attr *ma)
1171 {
1172         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1173         int rc = 0;
1174         ENTRY;
1175
1176         rc = mdd_may_delete(ctxt, pobj, cobj, S_ISDIR(ma->ma_attr.la_mode));
1177         if (rc)
1178                 RETURN(rc);
1179
1180         if (S_ISDIR(mdd_object_type(cobj)) &&
1181             dt_try_as_dir(ctxt, dt_cobj)) {
1182                 rc = mdd_dir_is_empty(ctxt, cobj);
1183                 if (rc != 0)
1184                         RETURN(rc);
1185         }
1186
1187         RETURN(rc);
1188 }
1189
1190 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
1191                       struct md_object *cobj, const char *name,
1192                       struct md_attr *ma)
1193 {
1194         struct mdd_device *mdd = mdo2mdd(pobj);
1195         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1196         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1197         struct thandle    *handle;
1198         int rc;
1199         ENTRY;
1200
1201         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
1202         handle = mdd_trans_start(ctxt, mdd);
1203         if (IS_ERR(handle))
1204                 RETURN(PTR_ERR(handle));
1205
1206         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
1207
1208         rc = mdd_unlink_sanity_check(ctxt, mdd_pobj, mdd_cobj, ma);
1209         if (rc)
1210                 GOTO(cleanup, rc);
1211
1212
1213         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1214         if (rc)
1215                 GOTO(cleanup, rc);
1216
1217         __mdd_ref_del(ctxt, mdd_cobj, handle);
1218         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
1219                 /* unlink dot */
1220                 __mdd_ref_del(ctxt, mdd_cobj, handle);
1221                 /* unlink dotdot */
1222                 __mdd_ref_del(ctxt, mdd_pobj, handle);
1223         }
1224
1225         rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma, handle);
1226
1227 cleanup:
1228         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
1229         mdd_trans_stop(ctxt, mdd, rc, handle);
1230         RETURN(rc);
1231 }
1232
1233 static int mdd_parent_fid(const struct lu_context *ctxt,
1234                           struct mdd_object *obj,
1235                           struct lu_fid *fid)
1236 {
1237         return mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
1238 }
1239
1240 /*
1241  * return 0: if p2 is the parent of p1
1242  * otherwise: other_value
1243  */
1244 static int mdd_is_parent(const struct lu_context *ctxt,
1245                          struct mdd_device *mdd,
1246                          struct mdd_object *p1,
1247                          struct mdd_object *p2)
1248 {
1249         struct lu_fid * pfid;
1250         struct mdd_object *parent = NULL;
1251         int rc;
1252         ENTRY;
1253
1254         pfid = &mdd_ctx_info(ctxt)->mti_fid;
1255         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1256                 RETURN(1);
1257         for(;;) {
1258                 rc = mdd_parent_fid(ctxt, p1, pfid);
1259                 if (rc)
1260                         GOTO(out, rc);
1261                 if (lu_fid_eq(pfid, mdo2fid(p2)))
1262                         GOTO(out, rc = 0);
1263                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1264                         GOTO(out, rc = 1);
1265                 if (parent)
1266                         mdd_object_put(ctxt, parent);
1267                 parent = mdd_object_find(ctxt, mdd, pfid);
1268                 if (IS_ERR(parent))
1269                         GOTO(out, rc = PTR_ERR(parent));
1270                 p1 = parent;
1271         }
1272 out:
1273         if (parent && !IS_ERR(parent))
1274                 mdd_object_put(ctxt, parent);
1275         RETURN(rc);
1276 }
1277
1278 static int mdd_rename_lock(const struct lu_context *ctxt,
1279                            struct mdd_device *mdd,
1280                            struct mdd_object *src_pobj,
1281                            struct mdd_object *tgt_pobj)
1282 {
1283         ENTRY;
1284
1285         if (src_pobj == tgt_pobj) {
1286                 mdd_write_lock(ctxt, src_pobj);
1287                 RETURN(0);
1288         }
1289         /*compared the parent child relationship of src_p&tgt_p*/
1290         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1291                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
1292                 RETURN(0);
1293         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1294                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1295                 RETURN(0);
1296         }
1297
1298         if (!mdd_is_parent(ctxt, mdd, src_pobj, tgt_pobj)) {
1299                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1300                 RETURN(0);
1301         }
1302
1303         mdd_lock2(ctxt, src_pobj, tgt_pobj);
1304
1305         RETURN(0);
1306 }
1307
1308 static void mdd_rename_unlock(const struct lu_context *ctxt,
1309                               struct mdd_object *src_pobj,
1310                               struct mdd_object *tgt_pobj)
1311 {
1312         mdd_write_unlock(ctxt, src_pobj);
1313         if (src_pobj != tgt_pobj)
1314                 mdd_write_unlock(ctxt, tgt_pobj);
1315 }
1316
1317 static int mdd_rename_sanity_check(const struct lu_context *ctxt,
1318                                    struct mdd_object *src_pobj,
1319                                    struct mdd_object *tgt_pobj,
1320                                    struct mdd_object *sobj,
1321                                    struct mdd_object *tobj)
1322 {
1323         struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj);
1324         int rc = 0, src_is_dir, tgt_is_dir;
1325         ENTRY;
1326
1327         src_is_dir = S_ISDIR(mdd_object_type(sobj));
1328         rc = mdd_may_delete(ctxt, src_pobj, sobj, src_is_dir);
1329         if (rc)
1330                 GOTO(out, rc);
1331
1332         if (!tobj) {
1333                 rc = mdd_may_create(ctxt, tgt_pobj, NULL);
1334                 if (rc)
1335                         GOTO(out, rc);
1336                 if (!mdd_is_parent(ctxt, mdd, tgt_pobj, sobj))
1337                         GOTO(out, rc = -EINVAL);
1338                 GOTO(out, rc);
1339         }
1340
1341         /* source should not be ancestor of target */
1342         if (!mdd_is_parent(ctxt, mdd, tobj, sobj))
1343                 GOTO(out, rc = -EINVAL);
1344
1345         rc = mdd_may_delete(ctxt, tgt_pobj, tobj, src_is_dir);
1346         if (rc)
1347                 GOTO(out, rc);
1348
1349         tgt_is_dir = S_ISDIR(mdd_object_type(tobj));
1350         if (tgt_is_dir && mdd_dir_is_empty(ctxt, tobj))
1351                 GOTO(out, rc = -ENOTEMPTY);
1352 out:
1353         mdd_object_put(ctxt, sobj);
1354         RETURN(rc);
1355 }
1356
1357 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
1358                       struct md_object *tgt_pobj, const struct lu_fid *lf,
1359                       const char *sname, struct md_object *tobj,
1360                       const char *tname, struct md_attr *ma)
1361 {
1362         struct mdd_device *mdd = mdo2mdd(src_pobj);
1363         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1364         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1365         struct mdd_object *mdd_sobj = mdd_object_find(ctxt, mdd, lf);
1366         struct mdd_object *mdd_tobj = NULL;
1367         struct thandle *handle;
1368         int is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
1369         int rc;
1370         ENTRY;
1371
1372         if (tobj)
1373                 mdd_tobj = md2mdd_obj(tobj);
1374
1375         /*XXX: shouldn't this check be done under lock below? */
1376         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
1377                                      mdd_sobj, mdd_tobj);
1378         if (rc)
1379                 RETURN(rc);
1380
1381         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1382         handle = mdd_trans_start(ctxt, mdd);
1383         if (IS_ERR(handle))
1384                 RETURN(PTR_ERR(handle));
1385
1386         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1387         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
1388         if (rc)
1389                 GOTO(cleanup_unlocked, rc);
1390
1391         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
1392         if (rc)
1393                 GOTO(cleanup, rc);
1394
1395         /*if sobj is dir, its parent object nlink should be dec too*/
1396         if (is_dir)
1397                 __mdd_ref_del(ctxt, mdd_spobj, handle);
1398
1399         if (tobj) {
1400                 rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
1401                 if (rc)
1402                         GOTO(cleanup, rc);
1403         }
1404
1405         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, is_dir, handle);
1406         if (rc)
1407                 GOTO(cleanup, rc);
1408
1409         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1410                 mdd_write_lock(ctxt, mdd_tobj);
1411                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1412                 /* remove dot reference */
1413                 if (is_dir)
1414                         __mdd_ref_del(ctxt, mdd_tobj, handle);
1415
1416                 rc = __mdd_finish_unlink(ctxt, mdd_tobj, ma, handle);
1417                 mdd_write_unlock(ctxt, mdd_tobj);
1418         }
1419 cleanup:
1420         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
1421 cleanup_unlocked:
1422         mdd_trans_stop(ctxt, mdd, rc, handle);
1423         RETURN(rc);
1424 }
1425
1426 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
1427                       const char *name, struct lu_fid* fid)
1428 {
1429         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1430         struct dt_object    *dir = mdd_object_child(mdd_obj);
1431         struct dt_rec       *rec    = (struct dt_rec *)fid;
1432         const struct dt_key *key = (const struct dt_key *)name;
1433         int rc;
1434         ENTRY;
1435
1436         if (mdd_is_dead_obj(mdd_obj))
1437                 RETURN(-ESTALE);
1438         mdd_read_lock(ctxt, mdd_obj);
1439         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(ctxt, dir))
1440                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
1441         else
1442                 rc = -ENOTDIR;
1443         mdd_read_unlock(ctxt, mdd_obj);
1444         RETURN(rc);
1445 }
1446
1447 static int __mdd_object_initialize(const struct lu_context *ctxt,
1448                                    const struct lu_fid *pfid,
1449                                    struct mdd_object *child,
1450                                    struct md_attr *ma, struct thandle *handle)
1451 {
1452         int rc;
1453         ENTRY;
1454
1455         /* update attributes for child.
1456          * FIXME:
1457          *  (1) the valid bits should be converted between Lustre and Linux;
1458          *  (2) maybe, the child attributes should be set in OSD when creation.
1459          */
1460
1461         rc = mdd_attr_set_internal(ctxt, child, &ma->ma_attr, handle);
1462         if (rc != 0)
1463                 RETURN(rc);
1464
1465         if (S_ISDIR(ma->ma_attr.la_mode)) {
1466                 /* add . and .. for newly created dir */
1467                 __mdd_ref_add(ctxt, child, handle);
1468                 rc = __mdd_index_insert_only(ctxt, child, mdo2fid(child),
1469                                              dot, handle);
1470                 if (rc == 0) {
1471                         rc = __mdd_index_insert_only(ctxt, child, pfid,
1472                                                      dotdot, handle);
1473                         if (rc != 0) {
1474                                 int rc2;
1475
1476                                 rc2 = __mdd_index_delete(ctxt,
1477                                                          child, dot, handle);
1478                                 if (rc2 != 0)
1479                                         CERROR("Failure to cleanup after dotdot"
1480                                                " creation: %d (%d)\n", rc2, rc);
1481                                 else
1482                                         __mdd_ref_del(ctxt, child, handle);
1483                         }
1484                 }
1485         }
1486         RETURN(rc);
1487 }
1488
1489 static int mdd_create_data(const struct lu_context *ctxt,
1490                            struct md_object *pobj, struct md_object *cobj,
1491                            const struct md_create_spec *spec,
1492                            struct md_attr *ma)
1493 {
1494         struct mdd_device *mdd = mdo2mdd(pobj);
1495         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1496         struct mdd_object *son = md2mdd_obj(cobj);
1497         struct lu_attr    *attr = &ma->ma_attr;
1498         struct lov_mds_md *lmm = NULL;
1499         int                lmm_size = 0;
1500         struct thandle    *handle;
1501         int                rc;
1502         ENTRY;
1503
1504         rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
1505                             attr);
1506         if (rc)
1507                 RETURN(rc);
1508
1509         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
1510         handle = mdd_trans_start(ctxt, mdd);
1511         if (IS_ERR(handle))
1512                 RETURN(PTR_ERR(handle));
1513
1514         /*XXX: setting the lov ea is not locked but setting the attr is locked? */
1515         if (rc == 0) {
1516                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size,
1517                                     handle, 0);
1518                 if (rc == 0)
1519                         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1520         }
1521
1522         mdd_trans_stop(ctxt, mdd, rc, handle);
1523         RETURN(rc);
1524 }
1525
1526 static int mdd_create_sanity_check(const struct lu_context *ctxt,
1527                                    struct mdd_device *mdd,
1528                                    struct md_object *pobj,
1529                                    const char *name, struct md_attr *ma)
1530 {
1531         struct mdd_thread_info *info = mdd_ctx_info(ctxt);
1532         struct lu_attr    *la        = &info->mti_la;
1533         struct lu_fid     *fid       = &info->mti_fid;
1534         struct mdd_object *obj       = md2mdd_obj(pobj);
1535         int rc;
1536
1537         ENTRY;
1538         /* EEXIST check */
1539         if (mdd_is_dead_obj(obj))
1540                 RETURN(-ENOENT);
1541         rc = mdd_lookup(ctxt, pobj, name, fid);
1542         if (rc != -ENOENT)
1543                 RETURN(rc ? : -EEXIST);
1544
1545         /* sgid check */
1546         mdd_read_lock(ctxt, obj);
1547         rc = __mdd_la_get(ctxt, obj, la);
1548         mdd_read_unlock(ctxt, obj);
1549         if (rc != 0)
1550                 RETURN(rc);
1551
1552         if (la->la_mode & S_ISGID) {
1553                 ma->ma_attr.la_gid = la->la_gid;
1554                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1555                         ma->ma_attr.la_mode |= S_ISGID;
1556                         ma->ma_attr.la_valid |= LA_MODE;
1557                 }
1558         }
1559
1560         switch (ma->ma_attr.la_mode & S_IFMT) {
1561         case S_IFREG:
1562         case S_IFDIR:
1563         case S_IFLNK:
1564         case S_IFCHR:
1565         case S_IFBLK:
1566         case S_IFIFO:
1567         case S_IFSOCK:
1568                 rc = 0;
1569                 break;
1570         default:
1571                 rc = -EINVAL;
1572                 break;
1573         }
1574         RETURN(rc);
1575 }
1576
1577 /*
1578  * Create object and insert it into namespace.
1579  */
1580 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
1581                       const char *name, struct md_object *child,
1582                       const struct md_create_spec *spec,
1583                       struct md_attr* ma)
1584 {
1585         struct mdd_device *mdd = mdo2mdd(pobj);
1586         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1587         struct mdd_object *son = md2mdd_obj(child);
1588         struct lu_attr *attr = &ma->ma_attr;
1589         struct lov_mds_md *lmm = NULL;
1590         struct thandle *handle;
1591         int rc, created = 0, inserted = 0, lmm_size = 0;
1592         ENTRY;
1593
1594         /* sanity checks before big job */
1595         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1596         if (rc)
1597                 RETURN(rc);
1598         /* no RPC inside the transaction, so OST objects should be created at
1599          * first */
1600         if (S_ISREG(attr->la_mode)) {
1601                 rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size,
1602                                     spec, attr);
1603                 if (rc)
1604                         RETURN(rc);
1605         }
1606
1607         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1608         handle = mdd_trans_start(ctxt, mdd);
1609         if (IS_ERR(handle))
1610                 RETURN(PTR_ERR(handle));
1611
1612         mdd_write_lock(ctxt, mdd_pobj);
1613
1614         /*
1615          * XXX check that link can be added to the parent in mkdir case.
1616          */
1617
1618         /*
1619          * Two operations have to be performed:
1620          *
1621          *  - allocation of new object (->do_create()), and
1622          *
1623          *  - insertion into parent index (->dio_insert()).
1624          *
1625          * Due to locking, operation order is not important, when both are
1626          * successful, *but* error handling cases are quite different:
1627          *
1628          *  - if insertion is done first, and following object creation fails,
1629          *  insertion has to be rolled back, but this operation might fail
1630          *  also leaving us with dangling index entry.
1631          *
1632          *  - if creation is done first, is has to be undone if insertion
1633          *  fails, leaving us with leaked space, which is neither good, nor
1634          *  fatal.
1635          *
1636          * It seems that creation-first is simplest solution, but it is
1637          * sub-optimal in the frequent
1638          *
1639          *         $ mkdir foo
1640          *         $ mkdir foo
1641          *
1642          * case, because second mkdir is bound to create object, only to
1643          * destroy it immediately.
1644          *
1645          * Note that local file systems do
1646          *
1647          *     0. lookup -> -EEXIST
1648          *
1649          *     1. create
1650          *
1651          *     2. insert
1652          *
1653          * Maybe we should do the same. For now: creation-first.
1654          */
1655
1656         mdd_write_lock(ctxt, son);
1657         rc = __mdd_object_create(ctxt, son, ma, handle);
1658         if (rc) {
1659                 mdd_write_unlock(ctxt, son);
1660                 GOTO(cleanup, rc);
1661         }
1662
1663         created = 1;
1664
1665         rc = __mdd_object_initialize(ctxt, mdo2fid(mdd_pobj),
1666                                      son, ma, handle);
1667         mdd_write_unlock(ctxt, son);
1668         if (rc)
1669                 /*
1670                  * Object has no links, so it will be destroyed when last
1671                  * reference is released. (XXX not now.)
1672                  */
1673                 GOTO(cleanup, rc);
1674
1675         rc = __mdd_index_insert(ctxt, mdd_pobj, mdo2fid(son),
1676                                 name, S_ISDIR(attr->la_mode), handle);
1677
1678         if (rc)
1679                 GOTO(cleanup, rc);
1680
1681         inserted = 1;
1682         rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
1683         if (rc) {
1684                 CERROR("error on stripe info copy %d \n", rc);
1685                 GOTO(cleanup, rc);
1686         }
1687
1688         if (S_ISLNK(attr->la_mode)) {
1689                 struct dt_object *dt = mdd_object_child(son);
1690                 const char *target_name = spec->u.sp_symname;
1691                 int sym_len = strlen(target_name);
1692                 loff_t pos = 0;
1693
1694                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1695                                                 sym_len, &pos, handle);
1696                 if (rc == sym_len)
1697                         rc = 0;
1698                 else
1699                         rc = -EFAULT;
1700         }
1701         /* return attr back */
1702         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1703 cleanup:
1704         if (rc && created) {
1705                 int rc2 = 0;
1706
1707                 if (inserted) {
1708                         rc2 = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1709                         if (rc2)
1710                                 CERROR("error can not cleanup destroy %d\n",
1711                                        rc2);
1712                 }
1713                 if (rc2 == 0)
1714                         __mdd_ref_del(ctxt, son, handle);
1715         }
1716         if (lmm)
1717                 OBD_FREE(lmm, lmm_size);
1718         mdd_write_unlock(ctxt, mdd_pobj);
1719         mdd_trans_stop(ctxt, mdd, rc, handle);
1720         RETURN(rc);
1721 }
1722 /* partial operation */
1723 static int mdd_object_create(const struct lu_context *ctxt,
1724                              struct md_object *obj,
1725                              const struct md_create_spec *spec,
1726                              struct md_attr *ma)
1727 {
1728
1729         struct mdd_device *mdd = mdo2mdd(obj);
1730         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1731         struct thandle *handle;
1732         int rc;
1733         ENTRY;
1734
1735         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1736         handle = mdd_trans_start(ctxt, mdd);
1737         if (IS_ERR(handle))
1738                 RETURN(PTR_ERR(handle));
1739
1740         mdd_write_lock(ctxt, mdd_obj);
1741         rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
1742         if (rc == 0)
1743                 rc = __mdd_object_initialize(ctxt, spec->u.sp_pfid, mdd_obj,
1744                                      ma, handle);
1745         mdd_write_unlock(ctxt, mdd_obj);
1746
1747         if (rc == 0)
1748                 rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
1749
1750         mdd_trans_stop(ctxt, mdd, rc, handle);
1751         RETURN(rc);
1752 }
1753 /* partial operation */
1754 static int mdd_name_insert(const struct lu_context *ctxt,
1755                            struct md_object *pobj, const char *name,
1756                            const struct lu_fid *fid, int isdir)
1757 {
1758         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1759         struct thandle *handle;
1760         int rc;
1761         ENTRY;
1762
1763         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
1764         handle = mdd_trans_start(ctxt, mdo2mdd(pobj));
1765         if (IS_ERR(handle))
1766                 RETURN(PTR_ERR(handle));
1767
1768         mdd_write_lock(ctxt, mdd_obj);
1769         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, isdir, handle);
1770         mdd_write_unlock(ctxt, mdd_obj);
1771
1772         mdd_trans_stop(ctxt, mdo2mdd(pobj), rc, handle);
1773         RETURN(rc);
1774 }
1775
1776 static int mdd_name_remove(const struct lu_context *ctxt,
1777                            struct md_object *pobj,
1778                            const char *name)
1779 {
1780         struct mdd_device *mdd = mdo2mdd(pobj);
1781         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1782         struct thandle *handle;
1783         int rc;
1784         ENTRY;
1785
1786         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
1787         handle = mdd_trans_start(ctxt, mdd);
1788         if (IS_ERR(handle))
1789                 RETURN(PTR_ERR(handle));
1790
1791         mdd_write_lock(ctxt, mdd_obj);
1792
1793         rc = __mdd_index_delete(ctxt, mdd_obj, name, handle);
1794
1795         mdd_write_unlock(ctxt, mdd_obj);
1796
1797         mdd_trans_stop(ctxt, mdd, rc, handle);
1798         RETURN(rc);
1799 }
1800
1801 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
1802                           struct md_object *tobj, const struct lu_fid *lf,
1803                           const char *name)
1804 {
1805         struct mdd_device *mdd = mdo2mdd(pobj);
1806         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1807         struct mdd_object *mdd_tobj = NULL;
1808         struct thandle *handle;
1809         int rc;
1810         ENTRY;
1811
1812         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1813         handle = mdd_trans_start(ctxt, mdd);
1814         if (IS_ERR(handle))
1815                 RETURN(PTR_ERR(handle));
1816
1817         if (tobj)
1818                 mdd_tobj = md2mdd_obj(tobj);
1819
1820         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
1821
1822         /*TODO rename sanity checking*/
1823         if (tobj) {
1824                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
1825                 if (rc)
1826                         GOTO(cleanup, rc);
1827         }
1828
1829         rc = __mdd_index_insert_only(ctxt, mdd_tpobj, lf, name, handle);
1830         if (rc)
1831                 GOTO(cleanup, rc);
1832
1833         if (tobj && lu_object_exists(&tobj->mo_lu))
1834                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1835 cleanup:
1836         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
1837         mdd_trans_stop(ctxt, mdd, rc, handle);
1838         RETURN(rc);
1839 }
1840
1841 static int mdd_root_get(const struct lu_context *ctx,
1842                         struct md_device *m, struct lu_fid *f)
1843 {
1844         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1845
1846         ENTRY;
1847         *f = mdd->mdd_root_fid;
1848         RETURN(0);
1849 }
1850
1851 static int mdd_statfs(const struct lu_context *ctx,
1852                       struct md_device *m, struct kstatfs *sfs)
1853 {
1854         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1855         int rc;
1856
1857         ENTRY;
1858
1859         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
1860
1861         RETURN(rc);
1862 }
1863
1864 static int mdd_get_maxsize(const struct lu_context *ctx,
1865                            struct md_device *m, int *md_size,
1866                            int *cookie_size)
1867 {
1868         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1869         ENTRY;
1870
1871         *md_size =  mdd_lov_mdsize(ctx, mdd);
1872         *cookie_size = mdd_lov_cookiesize(ctx, mdd);
1873
1874         RETURN(0);
1875 }
1876
1877 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
1878                          struct thandle *handle)
1879 {
1880         struct dt_object *next;
1881
1882         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1883         next = mdd_object_child(obj);
1884         next->do_ops->do_ref_add(ctxt, next, handle);
1885 }
1886
1887 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
1888 {
1889         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1890         struct mdd_device *mdd = mdo2mdd(obj);
1891         struct thandle *handle;
1892         ENTRY;
1893
1894         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1895         handle = mdd_trans_start(ctxt, mdd);
1896         if (IS_ERR(handle))
1897                 RETURN(-ENOMEM);
1898
1899         mdd_write_lock(ctxt, mdd_obj);
1900         __mdd_ref_add(ctxt, mdd_obj, handle);
1901         mdd_write_unlock(ctxt, mdd_obj);
1902
1903         mdd_trans_stop(ctxt, mdd, 0, handle);
1904
1905         RETURN(0);
1906 }
1907
1908 static void
1909 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
1910               struct thandle *handle)
1911 {
1912         struct dt_object *next = mdd_object_child(obj);
1913
1914         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1915
1916         next->do_ops->do_ref_del(ctxt, next, handle);
1917 }
1918
1919 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1920                        struct md_attr *ma)
1921 {
1922         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1923         struct mdd_device *mdd = mdo2mdd(obj);
1924         struct thandle *handle;
1925         int isdir;
1926         int rc;
1927         ENTRY;
1928
1929         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1930         handle = mdd_trans_start(ctxt, mdd);
1931         if (IS_ERR(handle))
1932                 RETURN(-ENOMEM);
1933
1934         mdd_write_lock(ctxt, mdd_obj);
1935
1936         isdir = S_ISDIR(lu_object_attr(&obj->mo_lu));
1937         /* rmdir checks */
1938         if (isdir && dt_try_as_dir(ctxt, mdd_object_child(mdd_obj))) {
1939                 rc = mdd_dir_is_empty(ctxt, mdd_obj);
1940                 if (rc != 0)
1941                         GOTO(cleanup, rc);
1942         }
1943
1944         __mdd_ref_del(ctxt, mdd_obj, handle);
1945
1946         if (isdir) {
1947                 /* unlink dot */
1948                 __mdd_ref_del(ctxt, mdd_obj, handle);
1949         }
1950
1951         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle);
1952
1953 cleanup:
1954         mdd_write_unlock(ctxt, mdd_obj);
1955         mdd_trans_stop(ctxt, mdd, rc, handle);
1956         RETURN(rc);
1957 }
1958
1959 /* do NOT or the MAY_*'s, you'll get the weakest */
1960 static int accmode(struct mdd_object *mdd_obj, int flags)
1961 {
1962         int res = 0;
1963
1964 #if 0
1965         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1966          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1967          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1968          * owner can write to a file even if it is marked readonly to hide
1969          * its brokenness. (bug 5781) */
1970         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
1971                 return 0;
1972 #endif
1973         if (flags & FMODE_READ)
1974                 res = MAY_READ;
1975         if (flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
1976                 res |= MAY_WRITE;
1977         if (flags & MDS_FMODE_EXEC)
1978                 res = MAY_EXEC;
1979         return res;
1980 }
1981
1982 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
1983                     int flags)
1984 {
1985         int mode = accmode(md2mdd_obj(obj), flags);
1986
1987         if (mode & MAY_WRITE) {
1988                 if (mdd_is_immutable(md2mdd_obj(obj)))
1989                         RETURN(-EACCES);
1990         }
1991
1992         atomic_inc(&md2mdd_obj(obj)->mod_count);
1993         return 0;
1994 }
1995
1996 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
1997                      struct md_attr *ma)
1998 {
1999         int rc;
2000         struct mdd_object *mdd_obj;
2001         struct thandle *handle = NULL;
2002         ENTRY;
2003
2004         mdd_obj = md2mdd_obj(obj);
2005         mdd_read_lock(ctxt, mdd_obj);
2006         rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
2007         if (rc)
2008                 GOTO(out_locked, rc);
2009
2010         if (atomic_dec_and_test(&mdd_obj->mod_count)) {
2011                 if (ma->ma_attr.la_nlink == 0) {
2012                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
2013                         if (rc)
2014                                 GOTO(out_locked, rc);
2015                         mdd_read_unlock(ctxt, mdd_obj);
2016                         /* let's remove obj from the orphan list */
2017                         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
2018                         handle = mdd_trans_start(ctxt, mdo2mdd(obj));
2019                         if (IS_ERR(handle))
2020                                 GOTO(out, rc = -ENOMEM);
2021                         
2022                         rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
2023
2024                         mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
2025                         GOTO(out, rc);
2026                 }
2027         }
2028 out_locked:
2029         mdd_read_unlock(ctxt, mdd_obj);
2030 out:
2031         RETURN(rc);
2032 }
2033
2034 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
2035                         const struct lu_rdpg *rdpg)
2036 {
2037         struct dt_object *next;
2038         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2039         int rc;
2040
2041         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2042         next = mdd_object_child(mdd_obj);
2043
2044         mdd_read_lock(ctxt, mdd_obj);
2045         if (S_ISDIR(mdd_object_type(mdd_obj)) &&
2046             dt_try_as_dir(ctxt, next))
2047                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
2048         else
2049                 rc = -ENOTDIR;
2050         mdd_read_unlock(ctxt, mdd_obj);
2051         return rc;
2052 }
2053
2054 struct md_device_operations mdd_ops = {
2055         .mdo_root_get       = mdd_root_get,
2056         .mdo_statfs         = mdd_statfs,
2057         .mdo_get_maxsize    = mdd_get_maxsize,
2058 };
2059
2060 static struct md_dir_operations mdd_dir_ops = {
2061         .mdo_lookup        = mdd_lookup,
2062         .mdo_create        = mdd_create,
2063         .mdo_rename        = mdd_rename,
2064         .mdo_link          = mdd_link,
2065         .mdo_unlink        = mdd_unlink,
2066         .mdo_name_insert   = mdd_name_insert,
2067         .mdo_name_remove   = mdd_name_remove,
2068         .mdo_rename_tgt    = mdd_rename_tgt,
2069         .mdo_create_data   = mdd_create_data
2070 };
2071
2072
2073 static struct md_object_operations mdd_obj_ops = {
2074         .moo_attr_get      = mdd_attr_get,
2075         .moo_attr_set      = mdd_attr_set,
2076         .moo_xattr_get     = mdd_xattr_get,
2077         .moo_xattr_set     = mdd_xattr_set,
2078         .moo_xattr_list    = mdd_xattr_list,
2079         .moo_xattr_del     = mdd_xattr_del,
2080         .moo_object_create = mdd_object_create,
2081         .moo_ref_add       = mdd_ref_add,
2082         .moo_ref_del       = mdd_ref_del,
2083         .moo_open          = mdd_open,
2084         .moo_close         = mdd_close,
2085         .moo_readpage      = mdd_readpage,
2086         .moo_readlink      = mdd_readlink
2087 };
2088
2089 static struct obd_ops mdd_obd_device_ops = {
2090         .o_owner = THIS_MODULE
2091 };
2092
2093 static struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
2094                                           struct lu_device_type *t,
2095                                           struct lustre_cfg *lcfg)
2096 {
2097         struct lu_device  *l;
2098         struct mdd_device *m;
2099
2100         OBD_ALLOC_PTR(m);
2101         if (m == NULL) {
2102                 l = ERR_PTR(-ENOMEM);
2103         } else {
2104                 md_device_init(&m->mdd_md_dev, t);
2105                 l = mdd2lu_dev(m);
2106                 l->ld_ops = &mdd_lu_ops;
2107                 m->mdd_md_dev.md_ops = &mdd_ops;
2108         }
2109
2110         return l;
2111 }
2112
2113 static void mdd_device_free(const struct lu_context *ctx,
2114                             struct lu_device *lu)
2115 {
2116         struct mdd_device *m = lu2mdd_dev(lu);
2117
2118         LASSERT(atomic_read(&lu->ld_ref) == 0);
2119         md_device_fini(&m->mdd_md_dev);
2120         OBD_FREE_PTR(m);
2121 }
2122
2123 static int mdd_type_init(struct lu_device_type *t)
2124 {
2125         return lu_context_key_register(&mdd_thread_key);
2126 }
2127
2128 static void mdd_type_fini(struct lu_device_type *t)
2129 {
2130         lu_context_key_degister(&mdd_thread_key);
2131 }
2132
2133 static struct lu_device_type_operations mdd_device_type_ops = {
2134         .ldto_init = mdd_type_init,
2135         .ldto_fini = mdd_type_fini,
2136
2137         .ldto_device_alloc = mdd_device_alloc,
2138         .ldto_device_free  = mdd_device_free,
2139
2140         .ldto_device_init    = mdd_device_init,
2141         .ldto_device_fini    = mdd_device_fini
2142 };
2143
2144 static struct lu_device_type mdd_device_type = {
2145         .ldt_tags     = LU_DEVICE_MD,
2146         .ldt_name     = LUSTRE_MDD0_NAME,
2147         .ldt_ops      = &mdd_device_type_ops,
2148         .ldt_ctx_tags = LCT_MD_THREAD
2149 };
2150
2151 static void *mdd_key_init(const struct lu_context *ctx,
2152                           struct lu_context_key *key)
2153 {
2154         struct mdd_thread_info *info;
2155
2156         OBD_ALLOC_PTR(info);
2157         if (info == NULL)
2158                 info = ERR_PTR(-ENOMEM);
2159         return info;
2160 }
2161
2162 static void mdd_key_fini(const struct lu_context *ctx,
2163                          struct lu_context_key *key, void *data)
2164 {
2165         struct mdd_thread_info *info = data;
2166         OBD_FREE_PTR(info);
2167 }
2168
2169 static struct lu_context_key mdd_thread_key = {
2170         .lct_tags = LCT_MD_THREAD,
2171         .lct_init = mdd_key_init,
2172         .lct_fini = mdd_key_fini
2173 };
2174
2175 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
2176         { 0 }
2177 };
2178
2179 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
2180         { 0 }
2181 };
2182
2183 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
2184
2185 static int __init mdd_mod_init(void)
2186 {
2187         struct lprocfs_static_vars lvars;
2188
2189         lprocfs_init_vars(mdd, &lvars);
2190         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
2191                                    LUSTRE_MDD0_NAME, &mdd_device_type);
2192 }
2193
2194 static void __exit mdd_mod_exit(void)
2195 {
2196         class_unregister_type(LUSTRE_MDD0_NAME);
2197 }
2198
2199 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2200 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD0_NAME")");
2201 MODULE_LICENSE("GPL");
2202
2203 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);