Whamcloud - gitweb
- fix bug with getting object in mdd. If it is not located then it should be put
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43
44 #include "mdd_internal.h"
45
46
47 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
48                                        struct mdd_device *);
49 static void mdd_trans_stop(const struct lu_context *ctxt,
50                            struct mdd_device *mdd, int rc,
51                            struct thandle *handle);
52 static struct dt_object* mdd_object_child(struct mdd_object *o);
53 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
54                           struct thandle *handle);
55 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
56                           struct thandle *handle);
57 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
58                       const char *name, struct lu_fid* fid);
59
60 static struct md_object_operations mdd_obj_ops;
61 static struct md_dir_operations    mdd_dir_ops;
62 static struct lu_object_operations mdd_lu_obj_ops;
63
64 static struct lu_context_key       mdd_thread_key;
65
66 static const char *mdd_root_dir_name = "root";
67 static const char dot[] = ".";
68 static const char dotdot[] = "..";
69
70
71 struct mdd_thread_info *mdd_ctx_info(const struct lu_context *ctx)
72 {
73         struct mdd_thread_info *info;
74
75         info = lu_context_key_get(ctx, &mdd_thread_key);
76         LASSERT(info != NULL);
77         return info;
78 }
79
80 static struct lu_object *mdd_object_alloc(const struct lu_context *ctxt,
81                                           const struct lu_object_header *hdr,
82                                           struct lu_device *d)
83 {
84         struct mdd_object *mdd_obj;
85
86         OBD_ALLOC_PTR(mdd_obj);
87         if (mdd_obj != NULL) {
88                 struct lu_object *o;
89                 
90                 o = mdd2lu_obj(mdd_obj);
91                 lu_object_init(o, NULL, d);
92                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
93                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
94                 mdd_obj->mod_count = 0;
95                 o->lo_ops = &mdd_lu_obj_ops;
96                 return o;
97         } else {
98                 return NULL;
99         }
100 }
101
102 static int mdd_object_init(const struct lu_context *ctxt, struct lu_object *o)
103 {
104         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
105         struct lu_object  *below;
106         struct lu_device  *under;
107         ENTRY;
108
109         under = &d->mdd_child->dd_lu_dev;
110         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
111
112         if (below == NULL)
113                 RETURN(-ENOMEM);
114
115         lu_object_add(o, below);
116         RETURN(0);
117 }
118
119 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj);
120
121 static int mdd_object_start(const struct lu_context *ctxt, struct lu_object *o)
122 {
123         if (lu_object_exists(o))
124                 return mdd_get_flags(ctxt, lu2mdd_obj(o));
125         else
126                 return 0;
127 }
128
129 static void mdd_object_free(const struct lu_context *ctxt, struct lu_object *o)
130 {
131         struct mdd_object *mdd = lu2mdd_obj(o);
132         
133         lu_object_fini(o);
134         OBD_FREE_PTR(mdd);
135 }
136
137 struct mdd_object *mdd_object_find(const struct lu_context *ctxt,
138                                    struct mdd_device *d,
139                                    const struct lu_fid *f)
140 {
141         struct lu_object *o, *lo;
142         struct mdd_object *m;
143         ENTRY;
144
145         o = lu_object_find(ctxt, mdd2lu_dev(d)->ld_site, f);
146         if (IS_ERR(o))
147                 m = (struct mdd_object *)o;
148         else {
149                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
150                 /* remote object can't be located and should be put then */
151                 if (lo == NULL)
152                         lu_object_put(ctxt, o);
153                 m = lu2mdd_obj(lo);
154         }
155         RETURN(m);
156 }
157
158 static inline int mdd_is_immutable(struct mdd_object *obj)
159 {
160         return obj->mod_flags & IMMUTE_OBJ;
161 }
162
163 static inline int mdd_is_append(struct mdd_object *obj)
164 {
165         return obj->mod_flags & APPEND_OBJ;
166 }
167
168 static inline void mdd_set_dead_obj(struct mdd_object *obj)
169 {
170         if (obj)
171                 obj->mod_flags |= DEAD_OBJ;
172 }
173
174 static inline int mdd_is_dead_obj(struct mdd_object *obj)
175 {
176         return obj && obj->mod_flags & DEAD_OBJ;
177 }
178
179 /*Check whether it may create the cobj under the pobj*/
180 static int mdd_may_create(const struct lu_context *ctxt,
181                           struct mdd_object *pobj, struct mdd_object *cobj)
182 {
183         ENTRY;
184         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
185                 RETURN(-EEXIST);
186
187         if (mdd_is_dead_obj(pobj))
188                 RETURN(-ENOENT);
189
190         /*check pobj may create or not*/
191         RETURN(0);
192 }
193
194 static inline int __mdd_la_get(const struct lu_context *ctxt,
195                                struct mdd_object *obj, struct lu_attr *la)
196 {
197         struct dt_object  *next = mdd_object_child(obj);
198         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
199         return next->do_ops->do_attr_get(ctxt, next, la);
200 }
201
202 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
203 {
204         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
205
206         if (flags & LUSTRE_APPEND_FL)
207                 obj->mod_flags |= APPEND_OBJ;
208
209         if (flags & LUSTRE_IMMUTABLE_FL)
210                 obj->mod_flags |= IMMUTE_OBJ;
211 }
212
213 static int mdd_get_flags(const struct lu_context *ctxt, struct mdd_object *obj)
214 {
215         struct lu_attr *la = &mdd_ctx_info(ctxt)->mti_la;
216         int rc;
217
218         ENTRY;
219         mdd_read_lock(ctxt, obj);
220         rc = __mdd_la_get(ctxt, obj, la);
221         mdd_read_unlock(ctxt, obj);
222         if (rc == 0)
223                 mdd_flags_xlate(obj, la->la_flags);
224         RETURN(rc);
225 }
226
227 /*Check whether it may delete the cobj under the pobj*/
228 static int mdd_may_delete(const struct lu_context *ctxt,
229                           struct mdd_object *pobj, struct mdd_object *cobj,
230                           int is_dir)
231 {
232         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
233         int rc = 0;
234         ENTRY;
235
236         LASSERT(cobj);
237
238         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
239                 RETURN(-ENOENT);
240
241         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
242                 RETURN(-EPERM);
243
244         if (is_dir) {
245                 if (!S_ISDIR(mdd_object_type(cobj)))
246                         RETURN(-ENOTDIR);
247
248                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
249                         RETURN(-EBUSY);
250
251         } else if (S_ISDIR(mdd_object_type(cobj)))
252                         RETURN(-EISDIR);
253
254         if (pobj && mdd_is_dead_obj(pobj))
255                 RETURN(-ENOENT);
256
257         RETURN(rc);
258 }
259 /* get only inode attributes */
260 static int __mdd_iattr_get(const struct lu_context *ctxt,
261                            struct mdd_object *mdd_obj, struct md_attr *ma)
262 {
263         int rc = 0;
264         ENTRY;
265
266         rc = __mdd_la_get(ctxt, mdd_obj, &ma->ma_attr);
267         if (rc == 0)
268                 ma->ma_valid = MA_INODE;
269         RETURN(rc);
270 }
271 /* get lov EA only */
272 static int __mdd_lmm_get(const struct lu_context *ctxt,
273                          struct mdd_object *mdd_obj, struct md_attr *ma)
274 {
275         int rc;
276         ENTRY;
277
278         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
279         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
280                         MDS_LOV_MD_NAME);
281         if (rc > 0) {
282                 ma->ma_valid |= MA_LOV;
283                 rc = 0;
284         }
285         RETURN(rc);
286 }
287
288 /* get lmv EA only*/
289 static int __mdd_lmv_get(const struct lu_context *ctxt,
290                          struct mdd_object *mdd_obj, struct md_attr *ma)
291 {
292         int rc;
293
294         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
295                         MDS_LMV_MD_NAME);
296         if (rc > 0) {
297                 ma->ma_valid |= MA_LMV;
298                 rc = 0;
299         }
300         RETURN(rc);
301 }
302
303 static int mdd_attr_get_internal(const struct lu_context *ctxt,
304                                  struct mdd_object *mdd_obj,
305                                  struct md_attr *ma)
306 {
307         int rc = 0;
308         ENTRY;
309
310         if (ma->ma_need & MA_INODE)
311                 rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
312
313         if (rc == 0 && ma->ma_need & MA_LOV) {
314                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
315                     S_ISDIR(mdd_object_type(mdd_obj)))
316                         rc = __mdd_lmm_get(ctxt, mdd_obj, ma);
317         }
318         if (rc == 0 && ma->ma_need & MA_LMV) {
319                 if (S_ISDIR(mdd_object_type(mdd_obj)))
320                         rc = __mdd_lmv_get(ctxt, mdd_obj, ma);
321         }
322         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
323                         rc, ma->ma_valid);
324         RETURN(rc);
325 }
326
327 static inline int mdd_attr_get_internal_locked(const struct lu_context *ctxt,
328                                                struct mdd_object *mdd_obj,
329                                                struct md_attr *ma)
330 {
331         int rc;
332         mdd_read_lock(ctxt, mdd_obj);
333         rc = mdd_attr_get_internal(ctxt, mdd_obj, ma);
334         mdd_read_unlock(ctxt, mdd_obj);
335         return rc;
336 }
337
338 static int mdd_attr_get(const struct lu_context *ctxt,
339                         struct md_object *obj, struct md_attr *ma)
340 {
341         struct mdd_object *mdd_obj = md2mdd_obj(obj);
342         int                rc;
343
344         ENTRY;
345         rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
346         RETURN(rc);
347 }
348
349 static int mdd_xattr_get(const struct lu_context *ctxt, struct md_object *obj,
350                          void *buf, int buf_len, const char *name)
351 {
352         struct mdd_object *mdd_obj = md2mdd_obj(obj);
353         struct dt_object  *next;
354         int rc;
355
356         ENTRY;
357
358         LASSERT(lu_object_exists(&obj->mo_lu));
359
360         next = mdd_object_child(mdd_obj);
361         mdd_read_lock(ctxt, mdd_obj);
362         rc = next->do_ops->do_xattr_get(ctxt, next, buf, buf_len, name);
363         mdd_read_unlock(ctxt, mdd_obj);
364
365         RETURN(rc);
366 }
367
368 static int mdd_readlink(const struct lu_context *ctxt, struct md_object *obj,
369                         void *buf, int buf_len)
370 {
371         struct mdd_object *mdd_obj = md2mdd_obj(obj);
372         struct dt_object  *next;
373         loff_t             pos = 0;
374         int                rc;
375         ENTRY;
376
377         LASSERT(lu_object_exists(&obj->mo_lu));
378
379         next = mdd_object_child(mdd_obj);
380         rc = next->do_body_ops->dbo_read(ctxt, next, buf, buf_len, &pos);
381         RETURN(rc);
382 }
383 static int mdd_xattr_list(const struct lu_context *ctxt, struct md_object *obj,
384                           void *buf, int buf_len)
385 {
386         struct mdd_object *mdd_obj = md2mdd_obj(obj);
387         struct dt_object  *next;
388         int rc;
389
390         ENTRY;
391
392         LASSERT(lu_object_exists(&obj->mo_lu));
393
394         next = mdd_object_child(mdd_obj);
395         rc = next->do_ops->do_xattr_list(ctxt, next, buf, buf_len);
396
397         RETURN(rc);
398 }
399
400 enum mdd_txn_op {
401         MDD_TXN_OBJECT_DESTROY_OP,
402         MDD_TXN_OBJECT_CREATE_OP,
403         MDD_TXN_ATTR_SET_OP,
404         MDD_TXN_XATTR_SET_OP,
405         MDD_TXN_INDEX_INSERT_OP,
406         MDD_TXN_INDEX_DELETE_OP,
407         MDD_TXN_LINK_OP,
408         MDD_TXN_UNLINK_OP,
409         MDD_TXN_RENAME_OP,
410         MDD_TXN_CREATE_DATA_OP,
411         MDD_TXN_MKDIR_OP
412 };
413
414 struct mdd_txn_op_descr {
415         enum mdd_txn_op mod_op;
416         unsigned int    mod_credits;
417 };
418
419 enum {
420         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
421         MDD_TXN_OBJECT_CREATE_CREDITS  = 20,
422         MDD_TXN_ATTR_SET_CREDITS       = 20,
423         MDD_TXN_XATTR_SET_CREDITS      = 20,
424         MDD_TXN_INDEX_INSERT_CREDITS   = 20,
425         MDD_TXN_INDEX_DELETE_CREDITS   = 20,
426         MDD_TXN_LINK_CREDITS           = 20,
427         MDD_TXN_UNLINK_CREDITS         = 20,
428         MDD_TXN_RENAME_CREDITS         = 20,
429         MDD_TXN_CREATE_DATA_CREDITS    = 20,
430         MDD_TXN_MKDIR_CREDITS          = 20
431 };
432
433 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
434 static const struct mdd_txn_op_descr opname = { \
435         .mod_op      = opname ## _OP,           \
436         .mod_credits = opname ## _CREDITS,      \
437 }
438
439 /*
440  * number of blocks to reserve for particular operations. Should be function
441  * of ... something. Stub for now.
442  */
443 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
444 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
445 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
446 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
447 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
448 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
449 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
450 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
451 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
452 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
453 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
454
455 static void mdd_txn_param_build(const struct lu_context *ctx,
456                                 const struct mdd_txn_op_descr *opd)
457 {
458         mdd_ctx_info(ctx)->mti_param.tp_credits = opd->mod_credits;
459 }
460
461 static int mdd_object_print(const struct lu_context *ctxt, void *cookie,
462                             lu_printer_t p, const struct lu_object *o)
463 {
464         return (*p)(ctxt, cookie, LUSTRE_MDD_NAME"-object@%p", o);
465 }
466
467 static int mdd_mount(const struct lu_context *ctx, struct mdd_device *mdd)
468 {
469         int rc;
470         struct dt_object *root;
471         ENTRY;
472
473         root = dt_store_open(ctx, mdd->mdd_child, mdd_root_dir_name,
474                              &mdd->mdd_root_fid);
475         if (!IS_ERR(root)) {
476                 LASSERT(root != NULL);
477                 lu_object_put(ctx, &root->do_lu);
478                 rc = orph_index_init(ctx, mdd);
479         } else
480                 rc = PTR_ERR(root);
481
482         RETURN(rc);
483 }
484
485 static int mdd_txn_start_cb(const struct lu_context *ctx,
486                             struct txn_param *param, void *cookie)
487 {
488         return 0;
489 }
490
491 static int mdd_txn_stop_cb(const struct lu_context *ctx,
492                            struct thandle *txn, void *cookie)
493 {
494         struct mdd_device *mdd = cookie;
495         struct obd_device *obd = mdd2obd_dev(mdd);
496
497         return mds_lov_write_objids(obd);
498 }
499
500 static int mdd_txn_commit_cb(const struct lu_context *ctx,
501                              struct thandle *txn, void *cookie)
502 {
503         return 0;
504 }
505
506 static int mdd_device_init(const struct lu_context *ctx,
507                            struct lu_device *d, struct lu_device *next)
508 {
509         struct mdd_device *mdd = lu2mdd_dev(d);
510         struct dt_device  *dt;
511         int rc = 0;
512         ENTRY;
513
514         mdd->mdd_child = lu2dt_dev(next);
515
516         dt = mdd->mdd_child;
517         /* prepare transactions callbacks */
518         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
519         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
520         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
521         mdd->mdd_txn_cb.dtc_cookie = mdd;
522
523         dt_txn_callback_add(dt, &mdd->mdd_txn_cb);
524
525         RETURN(rc);
526 }
527
528 static struct lu_device *mdd_device_fini(const struct lu_context *ctx,
529                                          struct lu_device *d)
530 {
531         struct mdd_device *mdd = lu2mdd_dev(d);
532         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
533
534         dt_txn_callback_del(mdd->mdd_child, &mdd->mdd_txn_cb);
535
536         return next;
537 }
538
539 static void mdd_device_shutdown(const struct lu_context *ctxt,
540                                 struct mdd_device *m)
541 {
542         if (m->mdd_obd_dev)
543                 mdd_fini_obd(ctxt, m);
544         orph_index_fini(ctxt, m);
545 }
546
547 static int mdd_process_config(const struct lu_context *ctxt,
548                               struct lu_device *d, struct lustre_cfg *cfg)
549 {
550         struct mdd_device *m    = lu2mdd_dev(d);
551         struct dt_device  *dt   = m->mdd_child;
552         struct lu_device  *next = &dt->dd_lu_dev;
553         int rc;
554         ENTRY;
555
556         switch (cfg->lcfg_command) {
557         case LCFG_SETUP:
558                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
559                 if (rc)
560                         GOTO(out, rc);
561                 dt->dd_ops->dt_conf_get(ctxt, dt, &m->mdd_dt_conf);
562
563                 rc = mdd_mount(ctxt, m);
564                 if (rc)
565                         GOTO(out, rc);
566                 rc = mdd_init_obd(ctxt, m, cfg);
567                 if (rc) {
568                         CERROR("lov init error %d \n", rc);
569                         GOTO(out, rc);
570                 }
571                 break;
572         case LCFG_CLEANUP:
573                 mdd_device_shutdown(ctxt, m);
574         default:
575                 rc = next->ld_ops->ldo_process_config(ctxt, next, cfg);
576                 break;
577         }
578 out:
579         RETURN(rc);
580 }
581
582 static int mdd_recovery_complete(const struct lu_context *ctxt,
583                                  struct lu_device *d)
584 {
585         struct mdd_device *mdd = lu2mdd_dev(d);
586         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
587         struct obd_device *obd = mdd2obd_dev(mdd);
588         int rc;
589         ENTRY;
590         /* TODO:
591         rc = mdd_lov_set_nextid(ctx, mdd);
592         if (rc) {
593                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
594                        obd->obd_name, rc);
595                 GOTO(out, rc);
596         }
597         rc = mdd_cleanup_unlink_llog(ctx, mdd);
598
599         obd_notify(obd->u.mds.mds_osc_obd, NULL,
600                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
601                    OBD_NOTIFY_SYNC, NULL);
602 */
603         LASSERT(mdd);
604         LASSERT(obd);
605
606         obd->obd_recovering = 0;
607         obd->obd_type->typ_dt_ops->o_postrecov(obd);
608         /* TODO: orphans handling */
609         rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
610
611         RETURN(rc);
612 }
613
614 struct lu_device_operations mdd_lu_ops = {
615         .ldo_object_alloc      = mdd_object_alloc,
616         .ldo_process_config    = mdd_process_config,
617         .ldo_recovery_complete = mdd_recovery_complete
618 };
619
620 static struct lu_object_operations mdd_lu_obj_ops = {
621         .loo_object_init    = mdd_object_init,
622         .loo_object_start   = mdd_object_start,
623         .loo_object_free    = mdd_object_free,
624         .loo_object_print   = mdd_object_print
625 };
626
627 void mdd_write_lock(const struct lu_context *ctxt, struct mdd_object *obj)
628 {
629         struct dt_object  *next = mdd_object_child(obj);
630
631         next->do_ops->do_write_lock(ctxt, next);
632 }
633
634 void mdd_read_lock(const struct lu_context *ctxt, struct mdd_object *obj)
635 {
636         struct dt_object  *next = mdd_object_child(obj);
637
638         next->do_ops->do_read_lock(ctxt, next);
639 }
640
641 void mdd_write_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
642 {
643         struct dt_object  *next = mdd_object_child(obj);
644
645         next->do_ops->do_write_unlock(ctxt, next);
646 }
647
648 void mdd_read_unlock(const struct lu_context *ctxt, struct mdd_object *obj)
649 {
650         struct dt_object  *next = mdd_object_child(obj);
651
652         next->do_ops->do_read_unlock(ctxt, next);
653 }
654
655 static void mdd_lock2(const struct lu_context *ctxt,
656                       struct mdd_object *o0, struct mdd_object *o1)
657 {
658         mdd_write_lock(ctxt, o0);
659         mdd_write_lock(ctxt, o1);
660 }
661
662 static void mdd_unlock2(const struct lu_context *ctxt,
663                         struct mdd_object *o0, struct mdd_object *o1)
664 {
665         mdd_write_unlock(ctxt, o1);
666         mdd_write_unlock(ctxt, o0);
667 }
668
669 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
670                                        struct mdd_device *mdd)
671 {
672         struct txn_param *p = &mdd_ctx_info(ctxt)->mti_param;
673
674         return mdd_child_ops(mdd)->dt_trans_start(ctxt, mdd->mdd_child, p);
675 }
676
677 static void mdd_trans_stop(const struct lu_context *ctxt,
678                            struct mdd_device *mdd, int result,
679                            struct thandle *handle)
680 {
681         handle->th_result = result;
682         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
683 }
684
685 static int __mdd_object_create(const struct lu_context *ctxt,
686                                struct mdd_object *obj, struct md_attr *ma,
687                                struct thandle *handle)
688 {
689         struct dt_object *next;
690         struct lu_attr *attr = &ma->ma_attr;
691         int rc;
692         ENTRY;
693
694         if (!lu_object_exists(mdd2lu_obj(obj))) {
695                 next = mdd_object_child(obj);
696                 rc = next->do_ops->do_create(ctxt, next, attr, handle);
697         } else
698                 rc = -EEXIST;
699
700         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
701
702         RETURN(rc);
703 }
704
705 int mdd_attr_set_internal(const struct lu_context *ctxt, struct mdd_object *o,
706                           const struct lu_attr *attr, struct thandle *handle)
707 {
708         struct dt_object *next;
709
710         LASSERT(lu_object_exists(mdd2lu_obj(o)));
711         next = mdd_object_child(o);
712         return next->do_ops->do_attr_set(ctxt, next, attr, handle);
713 }
714
715 int mdd_attr_set_internal_locked(const struct lu_context *ctxt,
716                                  struct mdd_object *o,
717                                  const struct lu_attr *attr,
718                                  struct thandle *handle)
719 {
720         int rc;
721         mdd_write_lock(ctxt, o);
722         rc = mdd_attr_set_internal(ctxt, o, attr, handle);
723         mdd_write_unlock(ctxt, o);
724         return rc;
725 }
726
727 static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
728                            const void *buf, int buf_len, const char *name,
729                            int fl, struct thandle *handle)
730 {
731         struct dt_object *next;
732         int rc = 0;
733         ENTRY;
734
735         LASSERT(lu_object_exists(mdd2lu_obj(o)));
736         next = mdd_object_child(o);
737         if (buf && buf_len > 0) {
738                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
739                                                 0, handle);
740         }else if (buf == NULL && buf_len == 0) {
741                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
742         }
743         RETURN(rc);
744 }
745 /* this gives the same functionality as the code between
746  * sys_chmod and inode_setattr
747  * chown_common and inode_setattr
748  * utimes and inode_setattr
749  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
750  * and port to
751  */
752 int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj,
753                  const struct md_attr *ma, struct lu_attr *la)
754 {
755         struct lu_attr   *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
756         time_t            now = CURRENT_SECONDS;
757         int               rc;
758         ENTRY;
759
760         rc = __mdd_la_get(ctxt, obj, tmp_la);
761         if (rc)
762                 RETURN(rc);
763         /*XXX Check permission */
764         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
765
766                 /*If only change flags of the object, we should
767                  * let it pass, but also need capability check
768                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
769                  * fix it, when implement capable in mds*/
770                 if (la->la_valid & ~LA_FLAGS)
771                         RETURN(-EPERM);
772
773                 /*According to Ext3 implementation on this, the
774                  *Ctime will be changed, but not clear why?*/
775                 la->la_ctime = now;
776                 la->la_valid |= LA_CTIME;
777                 RETURN(0);
778         }
779         if (!(la->la_valid & LA_CTIME)) {
780                 la->la_ctime = now;
781                 la->la_valid |= LA_CTIME;
782         }
783
784 #if 0
785         /* times */
786         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
787                 if (current->fsuid != inode->i_uid &&
788                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
789                         RETURN(error);
790         }
791         if (ia_valid & ATTR_SIZE &&
792             /* NFSD hack for open(O_CREAT|O_TRUNC)=mknod+truncate (bug 5781) */
793             !(rec->ur_uc.luc_fsuid == inode->i_uid &&
794               ia_valid & MDS_OPEN_OWNEROVERRIDE)) {
795                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
796                         RETURN(error);
797         }
798 #endif
799
800         if (la->la_valid & (LA_UID | LA_GID)) {
801                 /* chown */
802
803                 if (mdd_is_immutable(obj) || mdd_is_append(obj))
804                         RETURN(-EPERM);
805                 if (la->la_uid == (uid_t) -1)
806                         la->la_uid = tmp_la->la_uid;
807                 if (la->la_gid == (gid_t) -1)
808                         la->la_gid = tmp_la->la_gid;
809                 if (!(la->la_valid & LA_MODE))
810                         la->la_mode = tmp_la->la_mode;
811                 /*
812                  * If the user or group of a non-directory has been
813                  * changed by a non-root user, remove the setuid bit.
814                  * 19981026 David C Niemi <niemi@tux.org>
815                  *
816                  * Changed this to apply to all users, including root,
817                  * to avoid some races. This is the behavior we had in
818                  * 2.0. The check for non-root was definitely wrong
819                  * for 2.2 anyway, as it should have been using
820                  * CAP_FSETID rather than fsuid -- 19990830 SD.
821                  */
822                 if ((tmp_la->la_mode & S_ISUID) == S_ISUID &&
823                     !S_ISDIR(tmp_la->la_mode)) {
824                         la->la_mode &= ~S_ISUID;
825                         la->la_valid |= LA_MODE;
826                 }
827                 /*
828                  * Likewise, if the user or group of a non-directory
829                  * has been changed by a non-root user, remove the
830                  * setgid bit UNLESS there is no group execute bit
831                  * (this would be a file marked for mandatory
832                  * locking).  19981026 David C Niemi <niemi@tux.org>
833                  *
834                  * Removed the fsuid check (see the comment above) --
835                  * 19990830 SD.
836                  */
837                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
838                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
839                         la->la_mode &= ~S_ISGID;
840                         la->la_valid |= LA_MODE;
841                 }
842         } else if (la->la_valid & LA_MODE) {
843                 int mode = la->la_mode;
844                 /* chmod */
845                 if (la->la_mode == (umode_t)-1)
846                         mode = tmp_la->la_mode;
847                 la->la_mode =
848                         (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO);
849         }
850
851         /* For the "Size-on-MDS" setattr update, merge coming attributes with
852          * the set in the inode. */
853         if (la->la_valid & LA_SIZE) {
854                 if ((la->la_valid & LA_ATIME) &&
855                     (la->la_atime < tmp_la->la_atime))
856                         la->la_valid &= ~LA_ATIME;
857
858                 if ((la->la_valid & LA_CTIME) &&
859                     (la->la_ctime < tmp_la->la_ctime))
860                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
861         }
862
863         RETURN(rc);
864 }
865
866
867 /* set attr and LOV EA at once, return updated attr */
868 static int mdd_attr_set(const struct lu_context *ctxt,
869                         struct md_object *obj,
870                         const struct md_attr *ma)
871 {
872         struct mdd_object *mdd_obj = md2mdd_obj(obj);
873         struct mdd_device *mdd = mdo2mdd(obj);
874         struct thandle *handle;
875         struct lov_mds_md *lmm = NULL;
876         int  rc = 0, lmm_size = 0, max_size = 0;
877         struct lu_attr *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
878         ENTRY;
879
880         mdd_txn_param_build(ctxt, &MDD_TXN_ATTR_SET);
881         handle = mdd_trans_start(ctxt, mdd);
882         if (IS_ERR(handle))
883                 RETURN(PTR_ERR(handle));
884         /*TODO: add lock here*/
885         /* start a log jounal handle if needed */
886         if (S_ISREG(mdd_object_type(mdd_obj)) &&
887             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
888                 max_size = mdd_lov_mdsize(ctxt, mdd);
889                 OBD_ALLOC(lmm, max_size);
890                 if (lmm == NULL)
891                         GOTO(cleanup, rc = -ENOMEM);
892
893                 rc = mdd_get_md_locked(ctxt, mdd_obj, lmm, &lmm_size,
894                                 MDS_LOV_MD_NAME);
895
896                 if (rc < 0)
897                         GOTO(cleanup, rc);
898         }
899
900         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
901                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
902                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
903
904         *la_copy = ma->ma_attr;
905         mdd_write_lock(ctxt, mdd_obj);
906         rc = mdd_fix_attr(ctxt, mdd_obj, ma, la_copy);
907         mdd_write_unlock(ctxt, mdd_obj);
908         if (rc)
909                 GOTO(cleanup, rc);
910
911         if (la_copy->la_valid & LA_FLAGS) {
912                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
913                                                   handle);
914                 if (rc == 0)
915                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
916         } else if (la_copy->la_valid) {            /* setattr */
917                 rc = mdd_attr_set_internal_locked(ctxt, mdd_obj, la_copy,
918                                                   handle);
919                 /* journal chown/chgrp in llog, just like unlink */
920                 if (rc == 0 && lmm_size){
921                         /*TODO set_attr llog */
922                 }
923         }
924
925         if (rc == 0 && ma->ma_valid & MA_LOV) {
926                 umode_t mode;
927
928                 mode = mdd_object_type(mdd_obj);
929                 if (S_ISREG(mode) || S_ISDIR(mode)) {
930                         /*TODO check permission*/
931                         rc = mdd_lov_set_md(ctxt, NULL, mdd_obj, ma->ma_lmm,
932                                             ma->ma_lmm_size, handle, 1);
933                 }
934
935         }
936 cleanup:
937         mdd_trans_stop(ctxt, mdd, rc, handle);
938         if (rc == 0 && lmm_size) {
939                 /*set obd attr, if needed*/
940                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
941         }
942         if (lmm != NULL) {
943                 OBD_FREE(lmm, max_size);
944         }
945
946         RETURN(rc);
947 }
948
949 int mdd_xattr_set_txn(const struct lu_context *ctxt, struct mdd_object *obj,
950                       const void *buf, int buf_len, const char *name, int fl,
951                       struct thandle *handle)
952 {
953         int  rc;
954         ENTRY;
955
956         mdd_write_lock(ctxt, obj);
957         rc = __mdd_xattr_set(ctxt, obj, buf, buf_len, name, fl, handle);
958         mdd_write_unlock(ctxt, obj);
959
960         RETURN(rc);
961 }
962
963 static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
964                          const void *buf, int buf_len, const char *name,
965                          int fl)
966 {
967         struct mdd_device *mdd = mdo2mdd(obj);
968         struct thandle *handle;
969         int  rc;
970         ENTRY;
971
972         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
973         handle = mdd_trans_start(ctxt, mdd);
974         if (IS_ERR(handle))
975                 RETURN(PTR_ERR(handle));
976
977         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
978                                fl, handle);
979 #ifdef HAVE_SPLIT_SUPPORT
980         if (rc == 0) {
981                 /* very ugly hack, if setting lmv, it means splitting
982                  * sucess, we should return -ERESTART to notify the
983                  * client, so transno for this splitting should be
984                  * zero according to the replay rules. so return -ERESTART
985                  * here let mdt trans stop callback know this.
986                  */
987                  if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
988                         rc = -ERESTART;
989         }
990 #endif
991         mdd_trans_stop(ctxt, mdd, rc, handle);
992
993         RETURN(rc);
994 }
995
996 static int __mdd_xattr_del(const struct lu_context *ctxt,struct mdd_device *mdd,
997                            struct mdd_object *obj,
998                            const char *name, struct thandle *handle)
999 {
1000         struct dt_object *next;
1001
1002         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1003         next = mdd_object_child(obj);
1004         return next->do_ops->do_xattr_del(ctxt, next, name, handle);
1005 }
1006
1007 int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
1008                   const char *name)
1009 {
1010         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1011         struct mdd_device *mdd = mdo2mdd(obj);
1012         struct thandle *handle;
1013         int  rc;
1014         ENTRY;
1015
1016         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1017         handle = mdd_trans_start(ctxt, mdd);
1018         if (IS_ERR(handle))
1019                 RETURN(PTR_ERR(handle));
1020
1021         mdd_write_lock(ctxt, mdd_obj);
1022         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
1023         mdd_write_unlock(ctxt, mdd_obj);
1024
1025         mdd_trans_stop(ctxt, mdd, rc, handle);
1026
1027         RETURN(rc);
1028 }
1029
1030 static int __mdd_index_insert_only(const struct lu_context *ctxt,
1031                                    struct mdd_object *pobj,
1032                                    const struct lu_fid *lf,
1033                                    const char *name, struct thandle *th)
1034 {
1035         int rc;
1036         struct dt_object *next = mdd_object_child(pobj);
1037         ENTRY;
1038
1039         if (dt_try_as_dir(ctxt, next))
1040                 rc = next->do_index_ops->dio_insert(ctxt, next,
1041                                          (struct dt_rec *)lf,
1042                                          (struct dt_key *)name, th);
1043         else
1044                 rc = -ENOTDIR;
1045         RETURN(rc);
1046 }
1047
1048 /* insert new index, add reference if isdir, update times */
1049 static int __mdd_index_insert(const struct lu_context *ctxt,
1050                              struct mdd_object *pobj, const struct lu_fid *lf,
1051                              const char *name, int isdir, struct thandle *th)
1052 {
1053         int rc;
1054         struct dt_object *next = mdd_object_child(pobj);
1055         ENTRY;
1056
1057 #if 0
1058         struct lu_attr   *la = &mdd_ctx_info(ctxt)->mti_la;
1059 #endif
1060
1061         if (dt_try_as_dir(ctxt, next))
1062                 rc = next->do_index_ops->dio_insert(ctxt, next,
1063                                          (struct dt_rec *)lf,
1064                                          (struct dt_key *)name, th);
1065         else
1066                 rc = -ENOTDIR;
1067
1068         if (rc == 0) {
1069                 if (isdir)
1070                         __mdd_ref_add(ctxt, pobj, th);
1071 #if 0
1072                 la->la_valid = LA_MTIME|LA_CTIME;
1073                 la->la_atime = ma->ma_attr.la_atime;
1074                 la->la_ctime = ma->ma_attr.la_ctime;
1075                 rc = mdd_attr_set_internal(ctxt, mdd_obj, la, handle);
1076 #endif
1077         }
1078         return rc;
1079 }
1080
1081 static int __mdd_index_delete(const struct lu_context *ctxt,
1082                               struct mdd_object *pobj, const char *name,
1083                               struct thandle *handle)
1084 {
1085         int rc;
1086         struct dt_object *next = mdd_object_child(pobj);
1087         ENTRY;
1088
1089         if (dt_try_as_dir(ctxt, next))
1090                 rc = next->do_index_ops->dio_delete(ctxt, next,
1091                                         (struct dt_key *)name, handle);
1092         else
1093                 rc = -ENOTDIR;
1094         RETURN(rc);
1095 }
1096
1097 static int mdd_link_sanity_check(const struct lu_context *ctxt,
1098                                  struct mdd_object *tgt_obj,
1099                                  struct mdd_object *src_obj)
1100 {
1101         int rc;
1102
1103         rc = mdd_may_create(ctxt, tgt_obj, NULL);
1104         if (rc)
1105                 RETURN(rc);
1106         if (S_ISDIR(mdd_object_type(src_obj)))
1107                 RETURN(-EPERM);
1108
1109         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1110                 RETURN(-EPERM);
1111
1112         RETURN(rc);
1113 }
1114
1115 static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
1116                     struct md_object *src_obj, const char *name,
1117                     struct md_attr *ma)
1118 {
1119         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1120         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1121         struct mdd_device *mdd = mdo2mdd(src_obj);
1122         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1123         struct thandle *handle;
1124         int rc;
1125         ENTRY;
1126
1127         mdd_txn_param_build(ctxt, &MDD_TXN_LINK);
1128         handle = mdd_trans_start(ctxt, mdd);
1129         if (IS_ERR(handle))
1130                 RETURN(PTR_ERR(handle));
1131
1132         mdd_lock2(ctxt, mdd_tobj, mdd_sobj);
1133
1134         rc = mdd_link_sanity_check(ctxt, mdd_tobj, mdd_sobj);
1135         if (rc)
1136                 GOTO(out, rc);
1137
1138         rc = __mdd_index_insert_only(ctxt, mdd_tobj, mdo2fid(mdd_sobj),
1139                                      name, handle);
1140         if (rc == 0)
1141                 __mdd_ref_add(ctxt, mdd_sobj, handle);
1142
1143         *la_copy = ma->ma_attr;
1144         la_copy->la_valid = LA_CTIME;
1145         rc = mdd_attr_set_internal(ctxt, mdd_sobj, la_copy, handle);
1146         if (rc)
1147                 GOTO(out, rc);
1148
1149         la_copy->la_valid = LA_CTIME | LA_MTIME;
1150         rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1151
1152 out:
1153         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
1154         mdd_trans_stop(ctxt, mdd, rc, handle);
1155         RETURN(rc);
1156 }
1157
1158 /*
1159  * Check that @dir contains no entries except (possibly) dot and dotdot.
1160  *
1161  * Returns:
1162  *
1163  *             0        empty
1164  *    -ENOTEMPTY        not empty
1165  *           -ve        other error
1166  *
1167  */
1168 static int mdd_dir_is_empty(const struct lu_context *ctx,
1169                             struct mdd_object *dir)
1170 {
1171         struct dt_it     *it;
1172         struct dt_object *obj;
1173         struct dt_it_ops *iops;
1174         int result;
1175
1176         obj = mdd_object_child(dir);
1177         iops = &obj->do_index_ops->dio_it;
1178         it = iops->init(ctx, obj, 0);
1179         if (it != NULL) {
1180                 result = iops->get(ctx, it, (const void *)"");
1181                 if (result > 0) {
1182                         int i;
1183                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1184                                 result = iops->next(ctx, it);
1185                         if (result == 0)
1186                                 result = -ENOTEMPTY;
1187                         else if (result == +1)
1188                                 result = 0;
1189                 } else if (result == 0)
1190                         /*
1191                          * Huh? Index contains no zero key?
1192                          */
1193                         result = -EIO;
1194                 
1195                 iops->put(ctx, it);
1196                 iops->fini(ctx, it);
1197         } else
1198                 result = -ENOMEM;
1199         return result;
1200 }
1201
1202 /* return md_attr back,
1203  * if it is last unlink then return lov ea + llog cookie*/
1204 int __mdd_object_kill(const struct lu_context *ctxt,
1205                       struct mdd_object *obj,
1206                       struct md_attr *ma)
1207 {
1208         int rc = 0;
1209         ENTRY;
1210
1211         mdd_set_dead_obj(obj);
1212         if (S_ISREG(mdd_object_type(obj)) && ma->ma_need&MA_LOV) {
1213                 rc = __mdd_lmm_get(ctxt, obj, ma);
1214                 if (ma->ma_valid&MA_LOV && ma->ma_need&MA_COOKIE)
1215                         rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
1216                                             obj, ma);
1217         }
1218         RETURN(rc);
1219 }
1220 /* caller should take a lock before calling */
1221 static int __mdd_finish_unlink(const struct lu_context *ctxt,
1222                                struct mdd_object *obj, struct md_attr *ma,
1223                                struct thandle *th)
1224 {
1225         int rc;
1226         ENTRY;
1227
1228         rc = __mdd_iattr_get(ctxt, obj, ma);
1229         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1230                 if (obj->mod_count == 0) {
1231                         rc = __mdd_object_kill(ctxt, obj, ma);
1232                 } else {
1233                         /* add new orphan */
1234                         rc = __mdd_orphan_add(ctxt, obj, th);
1235                 }
1236         }
1237         RETURN(rc);
1238 }
1239
1240 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
1241                                    struct mdd_object *pobj,
1242                                    struct mdd_object *cobj,
1243                                    struct md_attr *ma)
1244 {
1245         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1246         int rc = 0;
1247         ENTRY;
1248
1249         rc = mdd_may_delete(ctxt, pobj, cobj, S_ISDIR(ma->ma_attr.la_mode));
1250         if (rc)
1251                 RETURN(rc);
1252
1253         if (S_ISDIR(mdd_object_type(cobj)) &&
1254             dt_try_as_dir(ctxt, dt_cobj)) {
1255                 rc = mdd_dir_is_empty(ctxt, cobj);
1256                 if (rc != 0)
1257                         RETURN(rc);
1258         }
1259
1260         RETURN(rc);
1261 }
1262
1263 static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
1264                       struct md_object *cobj, const char *name,
1265                       struct md_attr *ma)
1266 {
1267         struct mdd_device *mdd = mdo2mdd(pobj);
1268         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1269         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1270         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1271         struct thandle    *handle;
1272         int rc;
1273         ENTRY;
1274
1275         mdd_txn_param_build(ctxt, &MDD_TXN_UNLINK);
1276         handle = mdd_trans_start(ctxt, mdd);
1277         if (IS_ERR(handle))
1278                 RETURN(PTR_ERR(handle));
1279
1280         mdd_lock2(ctxt, mdd_pobj, mdd_cobj);
1281
1282         rc = mdd_unlink_sanity_check(ctxt, mdd_pobj, mdd_cobj, ma);
1283         if (rc)
1284                 GOTO(cleanup, rc);
1285
1286
1287         rc = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1288         if (rc)
1289                 GOTO(cleanup, rc);
1290
1291         __mdd_ref_del(ctxt, mdd_cobj, handle);
1292         *la_copy = ma->ma_attr;
1293         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
1294                 /* unlink dot */
1295                 __mdd_ref_del(ctxt, mdd_cobj, handle);
1296                 /* unlink dotdot */
1297                 __mdd_ref_del(ctxt, mdd_pobj, handle);
1298         } else {
1299                 la_copy->la_valid = LA_CTIME;
1300                 rc = mdd_attr_set_internal(ctxt, mdd_cobj, la_copy, handle);
1301                 if (rc)
1302                         GOTO(cleanup, rc);
1303         }
1304
1305         la_copy->la_valid = LA_CTIME | LA_MTIME;
1306         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1307         if (rc)
1308                 GOTO(cleanup, rc);
1309
1310         rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma, handle);
1311
1312 cleanup:
1313         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
1314         mdd_trans_stop(ctxt, mdd, rc, handle);
1315         RETURN(rc);
1316 }
1317 /* partial unlink */
1318 static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
1319                        struct md_attr *ma)
1320 {
1321         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1322         struct mdd_device *mdd = mdo2mdd(obj);
1323         struct thandle *handle;
1324         int rc;
1325         ENTRY;
1326
1327         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
1328         handle = mdd_trans_start(ctxt, mdd);
1329         if (IS_ERR(handle))
1330                 RETURN(-ENOMEM);
1331
1332         mdd_write_lock(ctxt, mdd_obj);
1333
1334         rc = mdd_unlink_sanity_check(ctxt, NULL, mdd_obj, ma);
1335         if (rc)
1336                 GOTO(cleanup, rc);
1337
1338         __mdd_ref_del(ctxt, mdd_obj, handle);
1339
1340         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1341                 /* unlink dot */
1342                 __mdd_ref_del(ctxt, mdd_obj, handle);
1343         }
1344
1345         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle);
1346
1347         EXIT;
1348 cleanup:
1349         mdd_write_unlock(ctxt, mdd_obj);
1350         mdd_trans_stop(ctxt, mdd, rc, handle);
1351         return rc;
1352 }
1353
1354 static int mdd_parent_fid(const struct lu_context *ctxt,
1355                           struct mdd_object *obj,
1356                           struct lu_fid *fid)
1357 {
1358         return mdd_lookup(ctxt, &obj->mod_obj, dotdot, fid);
1359 }
1360
1361 /*
1362  * return 1: if lf is the fid of the ancestor of p1;
1363  * return 0: if not;
1364  *
1365  * return -EREMOTE: if remote object is found, in this
1366  * case fid of remote object is saved to @pf;
1367  *
1368  * otherwise: values < 0, errors.
1369  */
1370 static int mdd_is_parent(const struct lu_context *ctxt,
1371                          struct mdd_device *mdd,
1372                          struct mdd_object *p1,
1373                          const struct lu_fid *lf,
1374                          struct lu_fid *pf)
1375 {
1376         struct mdd_object *parent = NULL;
1377         struct lu_fid *pfid;
1378         int rc;
1379         ENTRY;
1380
1381         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1382         pfid = &mdd_ctx_info(ctxt)->mti_fid;
1383
1384         /* Do not lookup ".." in root, they do not exist there. */
1385         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1386                 RETURN(0);
1387
1388         for(;;) {
1389                 rc = mdd_parent_fid(ctxt, p1, pfid);
1390                 if (rc)
1391                         GOTO(out, rc);
1392                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1393                         GOTO(out, rc = 0);
1394                 if (lu_fid_eq(pfid, lf))
1395                         GOTO(out, rc = 1);
1396                 if (parent)
1397                         mdd_object_put(ctxt, parent);
1398                 parent = mdd_object_find(ctxt, mdd, pfid);
1399
1400                 /* cross-ref parent */
1401                 if (parent == NULL) {
1402                         if (pf != NULL)
1403                                 *pf = *pfid;
1404                         GOTO(out, rc = -EREMOTE);
1405                 } else if (IS_ERR(parent))
1406                         GOTO(out, rc = PTR_ERR(parent));
1407                 p1 = parent;
1408         }
1409         EXIT;
1410 out:
1411         if (parent && !IS_ERR(parent))
1412                 mdd_object_put(ctxt, parent);
1413         return rc;
1414 }
1415
1416 static int mdd_rename_lock(const struct lu_context *ctxt,
1417                            struct mdd_device *mdd,
1418                            struct mdd_object *src_pobj,
1419                            struct mdd_object *tgt_pobj)
1420 {
1421         int rc;
1422         ENTRY;
1423
1424         if (src_pobj == tgt_pobj) {
1425                 mdd_write_lock(ctxt, src_pobj);
1426                 RETURN(0);
1427         }
1428
1429         /* compared the parent child relationship of src_p&tgt_p */
1430         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1431                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
1432                 RETURN(0);
1433         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1434                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1435                 RETURN(0);
1436         }
1437
1438         rc = mdd_is_parent(ctxt, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1439         if (rc < 0)
1440                 RETURN(rc);
1441
1442         if (rc == 1) {
1443                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
1444                 RETURN(0);
1445         }
1446
1447         mdd_lock2(ctxt, src_pobj, tgt_pobj);
1448
1449         RETURN(0);
1450 }
1451
1452 static void mdd_rename_unlock(const struct lu_context *ctxt,
1453                               struct mdd_object *src_pobj,
1454                               struct mdd_object *tgt_pobj)
1455 {
1456         mdd_write_unlock(ctxt, src_pobj);
1457         if (src_pobj != tgt_pobj)
1458                 mdd_write_unlock(ctxt, tgt_pobj);
1459 }
1460
1461 static int mdd_rename_sanity_check(const struct lu_context *ctxt,
1462                                    struct mdd_object *src_pobj,
1463                                    struct mdd_object *tgt_pobj,
1464                                    const struct lu_fid *sfid,
1465                                    int src_is_dir,
1466                                    struct mdd_object *tobj)
1467 {
1468         int rc = 0, tgt_is_dir;
1469         ENTRY;
1470
1471         if (mdd_is_dead_obj(src_pobj))
1472                 RETURN(-ENOENT);
1473
1474         if (!tobj) {
1475                 rc = mdd_may_create(ctxt, tgt_pobj, NULL);
1476         } else {
1477                 rc = mdd_may_delete(ctxt, tgt_pobj, tobj, src_is_dir);
1478                 if (rc == 0) {
1479                         tgt_is_dir = S_ISDIR(mdd_object_type(tobj));
1480                         if (tgt_is_dir && mdd_dir_is_empty(ctxt, tobj))
1481                                 rc = -ENOTEMPTY;
1482                 }
1483         }
1484         if (rc)
1485                 RETURN(rc);
1486
1487         RETURN(rc);
1488 }
1489 /* src object can be remote that is why we use only fid and type of object */
1490 static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
1491                       struct md_object *tgt_pobj, const struct lu_fid *lf,
1492                       const char *sname, struct md_object *tobj,
1493                       const char *tname, struct md_attr *ma)
1494 {
1495         struct mdd_device *mdd = mdo2mdd(src_pobj);
1496         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1497         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1498         struct mdd_object *mdd_sobj = NULL;
1499         struct mdd_object *mdd_tobj = NULL;
1500         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1501         struct thandle *handle;
1502         int is_dir;
1503         int rc;
1504         ENTRY;
1505
1506         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1507         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1508         if (ma->ma_attr.la_valid & LA_FLAGS &&
1509             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1510                 RETURN(-EPERM);
1511
1512         if (tobj)
1513                 mdd_tobj = md2mdd_obj(tobj);
1514
1515         /*XXX: shouldn't this check be done under lock below? */
1516         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
1517                                      lf, is_dir, mdd_tobj);
1518         if (rc)
1519                 RETURN(rc);
1520
1521         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
1522         handle = mdd_trans_start(ctxt, mdd);
1523         if (IS_ERR(handle))
1524                 RETURN(PTR_ERR(handle));
1525
1526         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1527         rc = mdd_rename_lock(ctxt, mdd, mdd_spobj, mdd_tpobj);
1528         if (rc)
1529                 GOTO(cleanup_unlocked, rc);
1530
1531         rc = __mdd_index_delete(ctxt, mdd_spobj, sname, handle);
1532         if (rc)
1533                 GOTO(cleanup, rc);
1534
1535         /*if sobj is dir, its parent object nlink should be dec too*/
1536         if (is_dir)
1537                 __mdd_ref_del(ctxt, mdd_spobj, handle);
1538
1539         rc = __mdd_index_delete(ctxt, mdd_tpobj, tname, handle);
1540         /* tobj can be remote one,
1541          * so we do index_delete unconditionally and -ENOENT is allowed */
1542         if (rc != 0 && rc != -ENOENT)
1543                 GOTO(cleanup, rc);
1544
1545         rc = __mdd_index_insert(ctxt, mdd_tpobj, lf, tname, is_dir, handle);
1546         if (rc)
1547                 GOTO(cleanup, rc);
1548
1549         mdd_sobj = mdd_object_find(ctxt, mdd, lf);
1550         *la_copy = ma->ma_attr;
1551         la_copy->la_valid = LA_CTIME;
1552         if (mdd_sobj) {
1553                 /*XXX: how to update ctime for remote sobj? */
1554                 rc = mdd_attr_set_internal_locked(ctxt, mdd_sobj, la_copy, handle);
1555                 if (rc)
1556                         GOTO(cleanup, rc);
1557         }
1558         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1559                 mdd_write_lock(ctxt, mdd_tobj);
1560                 __mdd_ref_del(ctxt, mdd_tobj, handle);
1561                 /* remove dot reference */
1562                 if (is_dir)
1563                         __mdd_ref_del(ctxt, mdd_tobj, handle);
1564
1565                 la_copy->la_valid = LA_CTIME;
1566                 rc = mdd_attr_set_internal(ctxt, mdd_tobj, la_copy, handle);
1567                 if (rc)
1568                         GOTO(cleanup, rc);
1569
1570                 rc = __mdd_finish_unlink(ctxt, mdd_tobj, ma, handle);
1571                 mdd_write_unlock(ctxt, mdd_tobj);
1572                 if (rc)
1573                         GOTO(cleanup, rc);
1574         }
1575
1576         la_copy->la_valid = LA_CTIME | LA_MTIME;
1577         rc = mdd_attr_set_internal(ctxt, mdd_spobj, la_copy, handle);
1578         if (rc)
1579                 GOTO(cleanup, rc);
1580
1581         if (mdd_spobj != mdd_tpobj) {
1582                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1583                 rc = mdd_attr_set_internal(ctxt, mdd_tpobj, la_copy, handle);
1584         }
1585
1586 cleanup:
1587         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
1588 cleanup_unlocked:
1589         mdd_trans_stop(ctxt, mdd, rc, handle);
1590         if (mdd_sobj)
1591                 mdd_object_put(ctxt, mdd_sobj);
1592         RETURN(rc);
1593 }
1594
1595 static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
1596                       const char *name, struct lu_fid* fid)
1597 {
1598         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1599         struct dt_object    *dir = mdd_object_child(mdd_obj);
1600         struct dt_rec       *rec    = (struct dt_rec *)fid;
1601         const struct dt_key *key = (const struct dt_key *)name;
1602         int rc;
1603         ENTRY;
1604
1605         if (mdd_is_dead_obj(mdd_obj))
1606                 RETURN(-ESTALE);
1607         mdd_read_lock(ctxt, mdd_obj);
1608         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(ctxt, dir))
1609                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
1610         else
1611                 rc = -ENOTDIR;
1612         mdd_read_unlock(ctxt, mdd_obj);
1613         RETURN(rc);
1614 }
1615
1616 /*
1617  * returns 1: if fid is ancestor of @mo;
1618  * returns 0: if fid is not a ancestor of @mo;
1619  *
1620  * returns EREMOTE if remote object is found, fid of remote object is saved to
1621  * @fid;
1622  *
1623  * returns < 0: if error
1624  */
1625 static int mdd_is_subdir(const struct lu_context *ctx, struct md_object *mo,
1626                          const struct lu_fid *fid, struct lu_fid *sfid)
1627 {
1628         struct mdd_device *mdd = mdo2mdd(mo);
1629         int rc;
1630         ENTRY;
1631
1632         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
1633                 RETURN(0);
1634
1635         rc = mdd_is_parent(ctx, mdd, md2mdd_obj(mo), fid, sfid);
1636         if (rc == -EREMOTE)
1637                 rc = EREMOTE;
1638
1639         RETURN(rc);
1640 }
1641
1642 static int __mdd_object_initialize(const struct lu_context *ctxt,
1643                                    const struct lu_fid *pfid,
1644                                    struct mdd_object *child,
1645                                    struct md_attr *ma, struct thandle *handle)
1646 {
1647         int rc;
1648         ENTRY;
1649
1650         /* update attributes for child.
1651          * FIXME:
1652          *  (1) the valid bits should be converted between Lustre and Linux;
1653          *  (2) maybe, the child attributes should be set in OSD when creation.
1654          */
1655
1656         rc = mdd_attr_set_internal(ctxt, child, &ma->ma_attr, handle);
1657         if (rc != 0)
1658                 RETURN(rc);
1659
1660         if (S_ISDIR(ma->ma_attr.la_mode)) {
1661                 /* add . and .. for newly created dir */
1662                 __mdd_ref_add(ctxt, child, handle);
1663                 rc = __mdd_index_insert_only(ctxt, child, mdo2fid(child),
1664                                              dot, handle);
1665                 if (rc == 0) {
1666                         rc = __mdd_index_insert_only(ctxt, child, pfid,
1667                                                      dotdot, handle);
1668                         if (rc != 0) {
1669                                 int rc2;
1670
1671                                 rc2 = __mdd_index_delete(ctxt,
1672                                                          child, dot, handle);
1673                                 if (rc2 != 0)
1674                                         CERROR("Failure to cleanup after dotdot"
1675                                                " creation: %d (%d)\n", rc2, rc);
1676                                 else
1677                                         __mdd_ref_del(ctxt, child, handle);
1678                         }
1679                 }
1680         }
1681         RETURN(rc);
1682 }
1683
1684 static int mdd_create_data(const struct lu_context *ctxt,
1685                            struct md_object *pobj, struct md_object *cobj,
1686                            const struct md_create_spec *spec,
1687                            struct md_attr *ma)
1688 {
1689         struct mdd_device *mdd = mdo2mdd(cobj);
1690         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
1691         struct mdd_object *son = md2mdd_obj(cobj);
1692         struct lu_attr    *attr = &ma->ma_attr;
1693         struct lov_mds_md *lmm = NULL;
1694         int                lmm_size = 0;
1695         struct thandle    *handle;
1696         int                rc;
1697         ENTRY;
1698
1699         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
1700                         !(spec->sp_cr_flags & FMODE_WRITE))
1701                 RETURN(0);
1702         rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
1703                             attr);
1704         if (rc)
1705                 RETURN(rc);
1706
1707         mdd_txn_param_build(ctxt, &MDD_TXN_CREATE_DATA);
1708         handle = mdd_trans_start(ctxt, mdd);
1709         if (IS_ERR(handle))
1710                 RETURN(rc = PTR_ERR(handle));
1711
1712         /*XXX: setting the lov ea is not locked
1713          * but setting the attr is locked? */
1714
1715         /* replay creates has objects already */
1716         if (spec->u.sp_ea.no_lov_create) {
1717                 CDEBUG(D_INFO, "we already have lov ea\n");
1718                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son,
1719                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1720                                     spec->u.sp_ea.eadatalen, handle, 0);
1721         } else
1722                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm,
1723                                     lmm_size, handle, 0);
1724
1725         if (rc == 0)
1726                rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1727
1728         /* finish mdd_lov_create() stuff */
1729         mdd_lov_create_finish(ctxt, mdd, rc);
1730         mdd_trans_stop(ctxt, mdd, rc, handle);
1731         if (lmm)
1732                 OBD_FREE(lmm, lmm_size);
1733         RETURN(rc);
1734 }
1735
1736 static int mdd_create_sanity_check(const struct lu_context *ctxt,
1737                                    struct mdd_device *mdd,
1738                                    struct md_object *pobj,
1739                                    const char *name, struct md_attr *ma)
1740 {
1741         struct mdd_thread_info *info = mdd_ctx_info(ctxt);
1742         struct lu_attr    *la        = &info->mti_la;
1743         struct lu_fid     *fid       = &info->mti_fid;
1744         struct mdd_object *obj       = md2mdd_obj(pobj);
1745         int rc;
1746
1747         ENTRY;
1748         /* EEXIST check */
1749         if (mdd_is_dead_obj(obj))
1750                 RETURN(-ENOENT);
1751         rc = mdd_lookup(ctxt, pobj, name, fid);
1752         if (rc != -ENOENT)
1753                 RETURN(rc ? : -EEXIST);
1754
1755         /* sgid check */
1756         mdd_read_lock(ctxt, obj);
1757         rc = __mdd_la_get(ctxt, obj, la);
1758         mdd_read_unlock(ctxt, obj);
1759         if (rc != 0)
1760                 RETURN(rc);
1761
1762         if (la->la_mode & S_ISGID) {
1763                 ma->ma_attr.la_gid = la->la_gid;
1764                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1765                         ma->ma_attr.la_mode |= S_ISGID;
1766                         ma->ma_attr.la_valid |= LA_MODE;
1767                 }
1768         }
1769
1770         switch (ma->ma_attr.la_mode & S_IFMT) {
1771         case S_IFREG:
1772         case S_IFDIR:
1773         case S_IFLNK:
1774         case S_IFCHR:
1775         case S_IFBLK:
1776         case S_IFIFO:
1777         case S_IFSOCK:
1778                 rc = 0;
1779                 break;
1780         default:
1781                 rc = -EINVAL;
1782                 break;
1783         }
1784         RETURN(rc);
1785 }
1786
1787 /*
1788  * Create object and insert it into namespace.
1789  */
1790 static int mdd_create(const struct lu_context *ctxt, struct md_object *pobj,
1791                       const char *name, struct md_object *child,
1792                       const struct md_create_spec *spec,
1793                       struct md_attr* ma)
1794 {
1795         struct mdd_device *mdd = mdo2mdd(pobj);
1796         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1797         struct mdd_object *son = md2mdd_obj(child);
1798         struct lu_attr    *la_copy = &mdd_ctx_info(ctxt)->mti_la_for_fix;
1799         struct lu_attr    *attr = &ma->ma_attr;
1800         struct lov_mds_md *lmm = NULL;
1801         struct thandle    *handle;
1802         int rc, created = 0, inserted = 0, lmm_size = 0;
1803         ENTRY;
1804
1805         /* sanity checks before big job */
1806         rc = mdd_create_sanity_check(ctxt, mdd, pobj, name, ma);
1807         if (rc)
1808                 RETURN(rc);
1809         /* no RPC inside the transaction, so OST objects should be created at
1810          * first */
1811         if (S_ISREG(attr->la_mode)) {
1812                 rc = mdd_lov_create(ctxt, mdd, mdd_pobj, son, &lmm, &lmm_size,
1813                                     spec, attr);
1814                 if (rc)
1815                         RETURN(rc);
1816         }
1817
1818         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
1819         handle = mdd_trans_start(ctxt, mdd);
1820         if (IS_ERR(handle))
1821                 RETURN(PTR_ERR(handle));
1822
1823         mdd_write_lock(ctxt, mdd_pobj);
1824
1825         /*
1826          * XXX check that link can be added to the parent in mkdir case.
1827          */
1828
1829         /*
1830          * Two operations have to be performed:
1831          *
1832          *  - allocation of new object (->do_create()), and
1833          *
1834          *  - insertion into parent index (->dio_insert()).
1835          *
1836          * Due to locking, operation order is not important, when both are
1837          * successful, *but* error handling cases are quite different:
1838          *
1839          *  - if insertion is done first, and following object creation fails,
1840          *  insertion has to be rolled back, but this operation might fail
1841          *  also leaving us with dangling index entry.
1842          *
1843          *  - if creation is done first, is has to be undone if insertion
1844          *  fails, leaving us with leaked space, which is neither good, nor
1845          *  fatal.
1846          *
1847          * It seems that creation-first is simplest solution, but it is
1848          * sub-optimal in the frequent
1849          *
1850          *         $ mkdir foo
1851          *         $ mkdir foo
1852          *
1853          * case, because second mkdir is bound to create object, only to
1854          * destroy it immediately.
1855          *
1856          * Note that local file systems do
1857          *
1858          *     0. lookup -> -EEXIST
1859          *
1860          *     1. create
1861          *
1862          *     2. insert
1863          *
1864          * Maybe we should do the same. For now: creation-first.
1865          */
1866
1867         mdd_write_lock(ctxt, son);
1868         rc = __mdd_object_create(ctxt, son, ma, handle);
1869         if (rc) {
1870                 mdd_write_unlock(ctxt, son);
1871                 GOTO(cleanup, rc);
1872         }
1873
1874         created = 1;
1875
1876         rc = __mdd_object_initialize(ctxt, mdo2fid(mdd_pobj),
1877                                      son, ma, handle);
1878         mdd_write_unlock(ctxt, son);
1879         if (rc)
1880                 /*
1881                  * Object has no links, so it will be destroyed when last
1882                  * reference is released. (XXX not now.)
1883                  */
1884                 GOTO(cleanup, rc);
1885
1886         rc = __mdd_index_insert(ctxt, mdd_pobj, mdo2fid(son),
1887                                 name, S_ISDIR(attr->la_mode), handle);
1888
1889         if (rc)
1890                 GOTO(cleanup, rc);
1891
1892         inserted = 1;
1893         /* replay creates has objects already */
1894         if (spec->u.sp_ea.no_lov_create) {
1895                 CDEBUG(D_INFO, "we already have lov ea\n");
1896                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son,
1897                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1898                                     spec->u.sp_ea.eadatalen, handle, 0);
1899         } else
1900                 rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm,
1901                                     lmm_size, handle, 0);
1902         if (rc) {
1903                 CERROR("error on stripe info copy %d \n", rc);
1904                 GOTO(cleanup, rc);
1905         }
1906
1907         if (S_ISLNK(attr->la_mode)) {
1908                 struct dt_object *dt = mdd_object_child(son);
1909                 const char *target_name = spec->u.sp_symname;
1910                 int sym_len = strlen(target_name);
1911                 loff_t pos = 0;
1912
1913                 rc = dt->do_body_ops->dbo_write(ctxt, dt, target_name,
1914                                                 sym_len, &pos, handle);
1915                 if (rc == sym_len)
1916                         rc = 0;
1917                 else
1918                         rc = -EFAULT;
1919         }
1920
1921         *la_copy = ma->ma_attr;
1922         la_copy->la_valid = LA_CTIME | LA_MTIME;
1923         rc = mdd_attr_set_internal(ctxt, mdd_pobj, la_copy, handle);
1924         if (rc)
1925                 GOTO(cleanup, rc);
1926
1927         /* return attr back */
1928         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
1929 cleanup:
1930         if (rc && created) {
1931                 int rc2 = 0;
1932
1933                 if (inserted) {
1934                         rc2 = __mdd_index_delete(ctxt, mdd_pobj, name, handle);
1935                         if (rc2)
1936                                 CERROR("error can not cleanup destroy %d\n",
1937                                        rc2);
1938                 }
1939                 if (rc2 == 0)
1940                         __mdd_ref_del(ctxt, son, handle);
1941         }
1942         /* finish mdd_lov_create() stuff */
1943         mdd_lov_create_finish(ctxt, mdd, rc);
1944         if (lmm)
1945                 OBD_FREE(lmm, lmm_size);
1946         mdd_write_unlock(ctxt, mdd_pobj);
1947         mdd_trans_stop(ctxt, mdd, rc, handle);
1948         RETURN(rc);
1949 }
1950 /* partial operation */
1951 static int mdd_object_create(const struct lu_context *ctxt,
1952                              struct md_object *obj,
1953                              const struct md_create_spec *spec,
1954                              struct md_attr *ma)
1955 {
1956
1957         struct mdd_device *mdd = mdo2mdd(obj);
1958         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1959         struct thandle *handle;
1960         const struct lu_fid *pfid = spec->u.sp_pfid;
1961         int rc;
1962         ENTRY;
1963
1964         mdd_txn_param_build(ctxt, &MDD_TXN_OBJECT_CREATE);
1965         handle = mdd_trans_start(ctxt, mdd);
1966         if (IS_ERR(handle))
1967                 RETURN(PTR_ERR(handle));
1968
1969         mdd_write_lock(ctxt, mdd_obj);
1970         rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
1971         if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1972                 /* if creating the slave object, set slave EA here */
1973                 rc = __mdd_xattr_set(ctxt, mdd_obj, spec->u.sp_ea.eadata,
1974                                      spec->u.sp_ea.eadatalen, MDS_LMV_MD_NAME,
1975                                      0, handle);
1976                 pfid = spec->u.sp_ea.fid;
1977                 CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
1978                        PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
1979         }
1980
1981         if (rc == 0)
1982                 rc = __mdd_object_initialize(ctxt, pfid, mdd_obj, ma, handle);
1983         mdd_write_unlock(ctxt, mdd_obj);
1984
1985         if (rc == 0)
1986                 rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
1987         mdd_trans_stop(ctxt, mdd, rc, handle);
1988         RETURN(rc);
1989 }
1990 /* partial operation */
1991 static int mdd_name_insert(const struct lu_context *ctxt,
1992                            struct md_object *pobj, const char *name,
1993                            const struct lu_fid *fid, int isdir)
1994 {
1995         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1996         struct thandle *handle;
1997         int rc;
1998         ENTRY;
1999
2000         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_INSERT);
2001         handle = mdd_trans_start(ctxt, mdo2mdd(pobj));
2002         if (IS_ERR(handle))
2003                 RETURN(PTR_ERR(handle));
2004
2005         mdd_write_lock(ctxt, mdd_obj);
2006         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, isdir, handle);
2007         mdd_write_unlock(ctxt, mdd_obj);
2008
2009         mdd_trans_stop(ctxt, mdo2mdd(pobj), rc, handle);
2010         RETURN(rc);
2011 }
2012
2013 static int mdd_name_remove(const struct lu_context *ctxt,
2014                            struct md_object *pobj,
2015                            const char *name)
2016 {
2017         struct mdd_device *mdd = mdo2mdd(pobj);
2018         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2019         struct thandle *handle;
2020         int rc;
2021         ENTRY;
2022
2023         mdd_txn_param_build(ctxt, &MDD_TXN_INDEX_DELETE);
2024         handle = mdd_trans_start(ctxt, mdd);
2025         if (IS_ERR(handle))
2026                 RETURN(PTR_ERR(handle));
2027
2028         mdd_write_lock(ctxt, mdd_obj);
2029
2030         rc = __mdd_index_delete(ctxt, mdd_obj, name, handle);
2031
2032         mdd_write_unlock(ctxt, mdd_obj);
2033
2034         mdd_trans_stop(ctxt, mdd, rc, handle);
2035         RETURN(rc);
2036 }
2037
2038 static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
2039                           struct md_object *tobj, const struct lu_fid *lf,
2040                           const char *name, struct md_attr *ma)
2041 {
2042         struct mdd_device *mdd = mdo2mdd(pobj);
2043         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
2044         struct mdd_object *mdd_tobj = NULL;
2045         struct thandle *handle;
2046         int rc;
2047         ENTRY;
2048
2049         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
2050         handle = mdd_trans_start(ctxt, mdd);
2051         if (IS_ERR(handle))
2052                 RETURN(PTR_ERR(handle));
2053
2054         if (tobj)
2055                 mdd_tobj = md2mdd_obj(tobj);
2056
2057         mdd_lock2(ctxt, mdd_tpobj, mdd_tobj);
2058
2059         /*TODO rename sanity checking*/
2060         if (tobj) {
2061                 rc = __mdd_index_delete(ctxt, mdd_tpobj, name, handle);
2062                 if (rc)
2063                         GOTO(cleanup, rc);
2064         }
2065
2066         rc = __mdd_index_insert_only(ctxt, mdd_tpobj, lf, name, handle);
2067         if (rc)
2068                 GOTO(cleanup, rc);
2069
2070         if (tobj && lu_object_exists(&tobj->mo_lu))
2071                 __mdd_ref_del(ctxt, mdd_tobj, handle);
2072 cleanup:
2073         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
2074         mdd_trans_stop(ctxt, mdd, rc, handle);
2075         RETURN(rc);
2076 }
2077
2078 static int mdd_root_get(const struct lu_context *ctx,
2079                         struct md_device *m, struct lu_fid *f)
2080 {
2081         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2082
2083         ENTRY;
2084         *f = mdd->mdd_root_fid;
2085         RETURN(0);
2086 }
2087
2088 static int mdd_statfs(const struct lu_context *ctx,
2089                       struct md_device *m, struct kstatfs *sfs)
2090 {
2091         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2092         int rc;
2093
2094         ENTRY;
2095
2096         rc = mdd_child_ops(mdd)->dt_statfs(ctx, mdd->mdd_child, sfs);
2097
2098         RETURN(rc);
2099 }
2100
2101 static int mdd_maxsize_get(const struct lu_context *ctx,
2102                            struct md_device *m, int *md_size,
2103                            int *cookie_size)
2104 {
2105         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2106         ENTRY;
2107
2108         *md_size =  mdd_lov_mdsize(ctx, mdd);
2109         *cookie_size = mdd_lov_cookiesize(ctx, mdd);
2110
2111         RETURN(0);
2112 }
2113
2114 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
2115                          struct thandle *handle)
2116 {
2117         struct dt_object *next;
2118
2119         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2120         next = mdd_object_child(obj);
2121         next->do_ops->do_ref_add(ctxt, next, handle);
2122 }
2123
2124 static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
2125 {
2126         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2127         struct mdd_device *mdd = mdo2mdd(obj);
2128         struct thandle *handle;
2129         ENTRY;
2130
2131         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
2132         handle = mdd_trans_start(ctxt, mdd);
2133         if (IS_ERR(handle))
2134                 RETURN(-ENOMEM);
2135
2136         mdd_write_lock(ctxt, mdd_obj);
2137         __mdd_ref_add(ctxt, mdd_obj, handle);
2138         mdd_write_unlock(ctxt, mdd_obj);
2139
2140         mdd_trans_stop(ctxt, mdd, 0, handle);
2141
2142         RETURN(0);
2143 }
2144
2145 static void
2146 __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
2147               struct thandle *handle)
2148 {
2149         struct dt_object *next = mdd_object_child(obj);
2150
2151         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2152
2153         next->do_ops->do_ref_del(ctxt, next, handle);
2154 }
2155
2156 /* do NOT or the MAY_*'s, you'll get the weakest */
2157 static int accmode(struct mdd_object *mdd_obj, int flags)
2158 {
2159         int res = 0;
2160
2161 #if 0
2162         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2163          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2164          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2165          * owner can write to a file even if it is marked readonly to hide
2166          * its brokenness. (bug 5781) */
2167         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2168                 return 0;
2169 #endif
2170         if (flags & FMODE_READ)
2171                 res = MAY_READ;
2172         if (flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
2173                 res |= MAY_WRITE;
2174         if (flags & MDS_FMODE_EXEC)
2175                 res = MAY_EXEC;
2176         return res;
2177 }
2178
2179 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj,
2180                     int flags)
2181 {
2182         int mode = accmode(md2mdd_obj(obj), flags);
2183         int rc = 0;
2184
2185         mdd_write_lock(ctxt, md2mdd_obj(obj));
2186
2187         if (mode & MAY_WRITE) {
2188                 if (mdd_is_immutable(md2mdd_obj(obj)))
2189                         rc = -EACCES;
2190         }
2191
2192         if (rc == 0)
2193                 md2mdd_obj(obj)->mod_count ++;
2194
2195         mdd_write_unlock(ctxt, md2mdd_obj(obj));
2196         return rc;
2197 }
2198
2199 static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
2200                      struct md_attr *ma)
2201 {
2202         int rc;
2203         struct mdd_object *mdd_obj;
2204         struct thandle *handle = NULL;
2205         ENTRY;
2206
2207         mdd_obj = md2mdd_obj(obj);
2208         mdd_txn_param_build(ctxt, &MDD_TXN_MKDIR);
2209         handle = mdd_trans_start(ctxt, mdo2mdd(obj));
2210         if (IS_ERR(handle))
2211                 RETURN(-ENOMEM);
2212
2213         mdd_write_lock(ctxt, mdd_obj);
2214         rc = __mdd_iattr_get(ctxt, mdd_obj, ma);
2215         if (rc == 0 && (-- mdd_obj->mod_count) == 0) {
2216                 if (ma->ma_attr.la_nlink == 0) {
2217                         rc = __mdd_object_kill(ctxt, mdd_obj, ma);
2218                         if (rc == 0)
2219                                 /* let's remove obj from the orphan list */
2220                                 rc = __mdd_orphan_del(ctxt, mdd_obj, handle);
2221                 }
2222         }
2223         mdd_write_unlock(ctxt, mdd_obj);
2224         mdd_trans_stop(ctxt, mdo2mdd(obj), rc, handle);
2225         RETURN(rc);
2226 }
2227
2228 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
2229                         const struct lu_rdpg *rdpg)
2230 {
2231         struct dt_object *next;
2232         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2233         int rc;
2234
2235         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2236         next = mdd_object_child(mdd_obj);
2237
2238         mdd_read_lock(ctxt, mdd_obj);
2239         if (S_ISDIR(mdd_object_type(mdd_obj)) &&
2240             dt_try_as_dir(ctxt, next))
2241                 rc = next->do_ops->do_readpage(ctxt, next, rdpg);
2242         else
2243                 rc = -ENOTDIR;
2244         mdd_read_unlock(ctxt, mdd_obj);
2245         return rc;
2246 }
2247
2248 struct md_device_operations mdd_ops = {
2249         .mdo_statfs         = mdd_statfs,
2250         .mdo_root_get       = mdd_root_get,
2251         .mdo_maxsize_get    = mdd_maxsize_get,
2252 };
2253
2254 static struct md_dir_operations mdd_dir_ops = {
2255         .mdo_is_subdir     = mdd_is_subdir,
2256         .mdo_lookup        = mdd_lookup,
2257         .mdo_create        = mdd_create,
2258         .mdo_rename        = mdd_rename,
2259         .mdo_link          = mdd_link,
2260         .mdo_unlink        = mdd_unlink,
2261         .mdo_name_insert   = mdd_name_insert,
2262         .mdo_name_remove   = mdd_name_remove,
2263         .mdo_rename_tgt    = mdd_rename_tgt,
2264         .mdo_create_data   = mdd_create_data
2265 };
2266
2267
2268 static struct md_object_operations mdd_obj_ops = {
2269         .moo_attr_get      = mdd_attr_get,
2270         .moo_attr_set      = mdd_attr_set,
2271         .moo_xattr_get     = mdd_xattr_get,
2272         .moo_xattr_set     = mdd_xattr_set,
2273         .moo_xattr_list    = mdd_xattr_list,
2274         .moo_xattr_del     = mdd_xattr_del,
2275         .moo_object_create = mdd_object_create,
2276         .moo_ref_add       = mdd_ref_add,
2277         .moo_ref_del       = mdd_ref_del,
2278         .moo_open          = mdd_open,
2279         .moo_close         = mdd_close,
2280         .moo_readpage      = mdd_readpage,
2281         .moo_readlink      = mdd_readlink
2282 };
2283
2284 static struct obd_ops mdd_obd_device_ops = {
2285         .o_owner = THIS_MODULE
2286 };
2287
2288 static struct lu_device *mdd_device_alloc(const struct lu_context *ctx,
2289                                           struct lu_device_type *t,
2290                                           struct lustre_cfg *lcfg)
2291 {
2292         struct lu_device  *l;
2293         struct mdd_device *m;
2294
2295         OBD_ALLOC_PTR(m);
2296         if (m == NULL) {
2297                 l = ERR_PTR(-ENOMEM);
2298         } else {
2299                 md_device_init(&m->mdd_md_dev, t);
2300                 l = mdd2lu_dev(m);
2301                 l->ld_ops = &mdd_lu_ops;
2302                 m->mdd_md_dev.md_ops = &mdd_ops;
2303         }
2304
2305         return l;
2306 }
2307
2308 static void mdd_device_free(const struct lu_context *ctx,
2309                             struct lu_device *lu)
2310 {
2311         struct mdd_device *m = lu2mdd_dev(lu);
2312
2313         LASSERT(atomic_read(&lu->ld_ref) == 0);
2314         md_device_fini(&m->mdd_md_dev);
2315         OBD_FREE_PTR(m);
2316 }
2317
2318 static int mdd_type_init(struct lu_device_type *t)
2319 {
2320         return lu_context_key_register(&mdd_thread_key);
2321 }
2322
2323 static void mdd_type_fini(struct lu_device_type *t)
2324 {
2325         lu_context_key_degister(&mdd_thread_key);
2326 }
2327
2328 static struct lu_device_type_operations mdd_device_type_ops = {
2329         .ldto_init = mdd_type_init,
2330         .ldto_fini = mdd_type_fini,
2331
2332         .ldto_device_alloc = mdd_device_alloc,
2333         .ldto_device_free  = mdd_device_free,
2334
2335         .ldto_device_init    = mdd_device_init,
2336         .ldto_device_fini    = mdd_device_fini
2337 };
2338
2339 static struct lu_device_type mdd_device_type = {
2340         .ldt_tags     = LU_DEVICE_MD,
2341         .ldt_name     = LUSTRE_MDD_NAME,
2342         .ldt_ops      = &mdd_device_type_ops,
2343         .ldt_ctx_tags = LCT_MD_THREAD
2344 };
2345
2346 static void *mdd_key_init(const struct lu_context *ctx,
2347                           struct lu_context_key *key)
2348 {
2349         struct mdd_thread_info *info;
2350
2351         OBD_ALLOC_PTR(info);
2352         if (info == NULL)
2353                 info = ERR_PTR(-ENOMEM);
2354         return info;
2355 }
2356
2357 static void mdd_key_fini(const struct lu_context *ctx,
2358                          struct lu_context_key *key, void *data)
2359 {
2360         struct mdd_thread_info *info = data;
2361         OBD_FREE_PTR(info);
2362 }
2363
2364 static struct lu_context_key mdd_thread_key = {
2365         .lct_tags = LCT_MD_THREAD,
2366         .lct_init = mdd_key_init,
2367         .lct_fini = mdd_key_fini
2368 };
2369
2370 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
2371         { 0 }
2372 };
2373
2374 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
2375         { 0 }
2376 };
2377
2378 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
2379
2380 static int __init mdd_mod_init(void)
2381 {
2382         struct lprocfs_static_vars lvars;
2383         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
2384         lprocfs_init_vars(mdd, &lvars);
2385         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
2386                                    LUSTRE_MDD_NAME, &mdd_device_type);
2387 }
2388
2389 static void __exit mdd_mod_exit(void)
2390 {
2391         class_unregister_type(LUSTRE_MDD_NAME);
2392 }
2393
2394 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2395 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
2396 MODULE_LICENSE("GPL");
2397
2398 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);