Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_env *env,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_env *env,
51                            struct mdd_device *mdd, int rc,
52                            struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
55                           struct thandle *handle);
56 static void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
57                           struct thandle *handle);
58 static int __mdd_lookup(const struct lu_env *env,
59                         struct md_object *pobj,
60                         const char *name, const struct lu_fid* fid,
61                         int mask, struct md_ucred *uc);
62 static int __mdd_lookup_locked(const struct lu_env *env,
63                                struct md_object *pobj,
64                                const char *name, const struct lu_fid* fid,
65                                int mask, struct md_ucred *uc);
66 static int mdd_exec_permission_lite(const struct lu_env *env,
67                                     struct mdd_object *obj,
68                                     struct md_ucred *uc);
69 static int __mdd_permission_internal(const struct lu_env *env,
70                                      struct mdd_object *obj,
71                                      int mask, int getattr,
72                                      struct md_ucred *uc);
73
74 static struct md_object_operations mdd_obj_ops;
75 static struct md_dir_operations    mdd_dir_ops;
76 static struct lu_object_operations mdd_lu_obj_ops;
77
78 static struct lu_context_key       mdd_thread_key;
79
80 static const char *mdd_root_dir_name = "root";
81 static const char dot[] = ".";
82 static const char dotdot[] = "..";
83
84 enum mdd_txn_op {
85         MDD_TXN_OBJECT_DESTROY_OP,
86         MDD_TXN_OBJECT_CREATE_OP,
87         MDD_TXN_ATTR_SET_OP,
88         MDD_TXN_XATTR_SET_OP,
89         MDD_TXN_INDEX_INSERT_OP,
90         MDD_TXN_INDEX_DELETE_OP,
91         MDD_TXN_LINK_OP,
92         MDD_TXN_UNLINK_OP,
93         MDD_TXN_RENAME_OP,
94         MDD_TXN_CREATE_DATA_OP,
95         MDD_TXN_MKDIR_OP
96 };
97
98 struct mdd_txn_op_descr {
99         enum mdd_txn_op mod_op;
100         unsigned int    mod_credits;
101 };
102
103 /* Calculate the credits of each transaction here */
104 /* Note: we did not count into QUOTA here, If we mount with --data_journal
105  * we may need more*/
106 enum {
107 /* Insert/Delete IAM
108  * EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) + EXT3_SINGLEDATA_TRANS_BLOCKS 8
109  * XXX Note: maybe iam need more,since iam have more level than Ext3 htree
110  */
111         INSERT_IAM_CREDITS  = 16,
112
113 /* Insert/Delete Oi
114  * same as IAM insert/delete 16
115  * */
116         INSERT_OI_CREDITS = 16,
117
118 /* Create a object
119  * Same as create object in Ext3 filesystem, but did not count QUOTA i
120  * EXT3_DATA_TRANS_BLOCKS(12) + INDEX_EXTRA_BLOCKS(8) +
121  * 3(inode bits,groups, GDT)*/
122         CREATE_OBJECT_CREDITS = 23,
123
124 /* XATTR_SET
125  * SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS
126  * XXX Note: in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
127  * also counted in. Do not know why? */
128         XATTR_SET_CREDITS = 12,
129
130  /* A log rec need EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
131  *                        EXT3_SINGLEDATA_TRANS_BLOCKS(8))
132  */
133         LOG_REC_CREDIT = 16
134 };
135
136 /* XXX we should know the ost count to calculate the llog */
137 #define DEFAULT_LSM_COUNT 4  /* FIXME later */
138 enum {
139         MDD_TXN_OBJECT_DESTROY_CREDITS = 20,
140         /* OBJECT CREATE :OI_INSERT + CREATE */
141         MDD_TXN_OBJECT_CREATE_CREDITS  = (INSERT_OI_CREDITS + \
142                                         CREATE_OBJECT_CREDITS),
143         /* ATTR SET: XATTR_SET + ATTR set(3)*/
144         MDD_TXN_ATTR_SET_CREDITS       = (XATTR_SET_CREDITS + 3),
145
146         MDD_TXN_XATTR_SET_CREDITS      = XATTR_SET_CREDITS,
147
148         MDD_TXN_INDEX_INSERT_CREDITS   = INSERT_IAM_CREDITS,
149         MDD_TXN_INDEX_DELETE_CREDITS   = INSERT_IAM_CREDITS,
150         MDD_TXN_LINK_CREDITS           = INSERT_IAM_CREDITS,
151
152 /*
153  * UNLINK CREDITS
154  * IAM_INSERT_CREDITS + UNLINK log
155  * Unlink log = ((EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
156  *                        EXT3_SINGLEDATA_TRANS_BLOCKS(8)) * lsm stripe count
157  * XXX we should know the ost count to calculate the llog
158  */
159         MDD_TXN_UNLINK_CREDITS      = (INSERT_IAM_CREDITS +
160                                        LOG_REC_CREDIT*DEFAULT_LSM_COUNT),
161 /*
162  * RENAME CREDITS
163  * 2 IAM_INSERT + 1 IAM_DELETE + UNLINK LOG
164  */
165         MDD_TXN_RENAME_CREDITS      = (3 * INSERT_IAM_CREDITS + \
166                                        LOG_REC_CREDIT * DEFAULT_LSM_COUNT),
167 /* CREATE_DATA CREDITS
168  * SET_XATTR
169  * */
170         MDD_TXN_CREATE_DATA_CREDITS = XATTR_SET_CREDITS,
171 /* CREATE
172  * IAM_INSERT + OI_INSERT + CREATE_OBJECT_CREDITS
173  * SET_MD CREDITS is already counted in CREATE_OBJECT CREDITS */
174         MDD_TXN_MKDIR_CREDITS       =  (INSERT_IAM_CREDITS + INSERT_OI_CREDITS \
175                                        + CREATE_OBJECT_CREDITS)
176 };
177
178 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
179 static const struct mdd_txn_op_descr opname = { \
180         .mod_op      = opname ## _OP,           \
181         .mod_credits = opname ## _CREDITS,      \
182 }
183
184 /*
185  * number of blocks to reserve for particular operations. Should be function
186  * of ... something. Stub for now.
187  */
188 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY);
189 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE);
190 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET);
191 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET);
192 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT);
193 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE);
194 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK);
195 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK);
196 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME);
197 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA);
198 DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR);
199
200 static void mdd_txn_param_build(const struct lu_env *env,
201                                 const struct mdd_txn_op_descr *opd)
202 {
203         mdd_env_info(env)->mti_param.tp_credits = opd->mod_credits;
204 }
205
206 #define mdd_get_group_info(group_info) do {             \
207         atomic_inc(&(group_info)->usage);               \
208 } while (0)
209
210 #define mdd_put_group_info(group_info) do {             \
211         if (atomic_dec_and_test(&(group_info)->usage))  \
212                 groups_free(group_info);                \
213 } while (0)
214
215 #define MDD_NGROUPS_PER_BLOCK       ((int)(CFS_PAGE_SIZE / sizeof(gid_t)))
216
217 #define MDD_GROUP_AT(gi, i) \
218     ((gi)->blocks[(i) / MDD_NGROUPS_PER_BLOCK][(i) % MDD_NGROUPS_PER_BLOCK])
219
220 /* groups_search() is copied from linux kernel! */
221 /* a simple bsearch */
222 static int mdd_groups_search(struct group_info *group_info, gid_t grp)
223 {
224         int left, right;
225
226         if (!group_info)
227                 return 0;
228
229         left = 0;
230         right = group_info->ngroups;
231         while (left < right) {
232                 int mid = (left + right) / 2;
233                 int cmp = grp - MDD_GROUP_AT(group_info, mid);
234
235                 if (cmp > 0)
236                         left = mid + 1;
237                 else if (cmp < 0)
238                         right = mid;
239                 else
240                         return 1;
241         }
242         return 0;
243 }
244
245 static int mdd_in_group_p(struct md_ucred *uc, gid_t grp)
246 {
247         int rc = 1;
248
249         if (grp != uc->mu_fsgid) {
250                 struct group_info *group_info = NULL;
251
252                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
253                         if ((grp == uc->mu_suppgids[0]) ||
254                             (grp == uc->mu_suppgids[1]))
255                                 return 1;
256
257                 if (uc->mu_ginfo)
258                         group_info = uc->mu_ginfo;
259                 else if (uc->mu_identity)
260                         group_info = uc->mu_identity->mi_ginfo;
261
262                 if (!group_info)
263                         return 0;
264
265                 mdd_get_group_info(group_info);
266                 rc = mdd_groups_search(group_info, grp);
267                 mdd_put_group_info(group_info);
268         }
269         return rc;
270 }
271
272 static inline int mdd_permission_internal(const struct lu_env *env,
273                                           struct mdd_object *obj, int mask,
274                                           struct md_ucred *uc)
275 {
276         return __mdd_permission_internal(env, obj, mask, 1, uc);
277 }
278
279 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
280 {
281         struct mdd_thread_info *info;
282
283         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
284         LASSERT(info != NULL);
285         return info;
286 }
287
288 static struct lu_object *mdd_object_alloc(const struct lu_env *env,
289                                           const struct lu_object_header *hdr,
290                                           struct lu_device *d)
291 {
292         struct mdd_object *mdd_obj;
293
294         OBD_ALLOC_PTR(mdd_obj);
295         if (mdd_obj != NULL) {
296                 struct lu_object *o;
297                 
298                 o = mdd2lu_obj(mdd_obj);
299                 lu_object_init(o, NULL, d);
300                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
301                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
302                 mdd_obj->mod_count = 0;
303                 o->lo_ops = &mdd_lu_obj_ops;
304                 return o;
305         } else {
306                 return NULL;
307         }
308 }
309
310 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
311 {
312         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
313         struct lu_object  *below;
314         struct lu_device  *under;
315         ENTRY;
316
317         under = &d->mdd_child->dd_lu_dev;
318         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
319
320         if (below == NULL)
321                 RETURN(-ENOMEM);
322
323         lu_object_add(o, below);
324         RETURN(0);
325 }
326
327 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj);
328
329 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
330 {
331         if (lu_object_exists(o))
332                 return mdd_get_flags(env, lu2mdd_obj(o));
333         else
334                 return 0;
335 }
336
337 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
338 {
339         struct mdd_object *mdd = lu2mdd_obj(o);
340         
341         lu_object_fini(o);
342         OBD_FREE_PTR(mdd);
343 }
344
345 static int mdd_object_print(const struct lu_env *env, void *cookie,
346                             lu_printer_t p, const struct lu_object *o)
347 {
348         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
349 }
350
351 /* orphan handling is here */
352 static void mdd_object_delete(const struct lu_env *env,
353                                struct lu_object *o)
354 {
355         struct mdd_object *mdd_obj = lu2mdd_obj(o);
356         struct thandle *handle = NULL;
357         ENTRY;
358
359         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
360                 return;
361
362         if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
363                 mdd_txn_param_build(env, &MDD_TXN_MKDIR);
364                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
365                 if (IS_ERR(handle))
366                         CERROR("Cannot get thandle\n");
367                 else {
368                         mdd_write_lock(env, mdd_obj);
369                         /* let's remove obj from the orphan list */
370                         __mdd_orphan_del(env, mdd_obj, handle);
371                         mdd_write_unlock(env, mdd_obj);
372                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
373                                        0, handle);
374                 }
375         }
376 }
377
378 static struct lu_object_operations mdd_lu_obj_ops = {
379         .loo_object_init    = mdd_object_init,
380         .loo_object_start   = mdd_object_start,
381         .loo_object_free    = mdd_object_free,
382         .loo_object_print   = mdd_object_print,
383         .loo_object_delete  = mdd_object_delete
384 };
385
386 struct mdd_object *mdd_object_find(const struct lu_env *env,
387                                    struct mdd_device *d,
388                                    const struct lu_fid *f)
389 {
390         struct lu_object *o, *lo;
391         struct mdd_object *m;
392         ENTRY;
393
394         o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA);
395         if (IS_ERR(o))
396                 m = (struct mdd_object *)o;
397         else {
398                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
399                 /* remote object can't be located and should be put then */
400                 if (lo == NULL)
401                         lu_object_put(env, o);
402                 m = lu2mdd_obj(lo);
403         }
404         RETURN(m);
405 }
406
407 static inline int mdd_is_immutable(struct mdd_object *obj)
408 {
409         return obj->mod_flags & IMMUTE_OBJ;
410 }
411
412 static inline int mdd_is_append(struct mdd_object *obj)
413 {
414         return obj->mod_flags & APPEND_OBJ;
415 }
416
417 static inline void mdd_set_dead_obj(struct mdd_object *obj)
418 {
419         if (obj)
420                 obj->mod_flags |= DEAD_OBJ;
421 }
422
423 static inline int mdd_is_dead_obj(struct mdd_object *obj)
424 {
425         return obj && obj->mod_flags & DEAD_OBJ;
426 }
427
428 /*Check whether it may create the cobj under the pobj*/
429 static int mdd_may_create(const struct lu_env *env,
430                           struct mdd_object *pobj, struct mdd_object *cobj,
431                           int need_check, struct md_ucred *uc)
432 {
433         int rc = 0;
434         ENTRY;
435
436         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
437                 RETURN(-EEXIST);
438
439         if (mdd_is_dead_obj(pobj))
440                 RETURN(-ENOENT);
441
442         /*check pobj may create or not*/
443         if (need_check)
444                 rc = mdd_permission_internal(env, pobj,
445                                              MAY_WRITE | MAY_EXEC, uc);
446
447         RETURN(rc);
448 }
449
450 static inline int __mdd_la_get(const struct lu_env *env,
451                                struct mdd_object *obj, struct lu_attr *la)
452 {
453         struct dt_object  *next = mdd_object_child(obj);
454         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
455         return next->do_ops->do_attr_get(env, next, la);
456 }
457
458 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
459 {
460         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
461
462         if (flags & LUSTRE_APPEND_FL)
463                 obj->mod_flags |= APPEND_OBJ;
464
465         if (flags & LUSTRE_IMMUTABLE_FL)
466                 obj->mod_flags |= IMMUTE_OBJ;
467 }
468
469 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
470 {
471         struct lu_attr *la = &mdd_env_info(env)->mti_la;
472         int rc;
473
474         ENTRY;
475         mdd_read_lock(env, obj);
476         rc = __mdd_la_get(env, obj, la);
477         mdd_read_unlock(env, obj);
478         if (rc == 0)
479                 mdd_flags_xlate(obj, la->la_flags);
480         RETURN(rc);
481 }
482
483 #define mdd_cap_t(x) (x)
484
485 #define MDD_CAP_TO_MASK(x) (1 << (x))
486
487 #define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
488
489 /* capable() is copied from linux kernel! */
490 static inline int mdd_capable(struct md_ucred *uc, int cap)
491 {
492         if (mdd_cap_raised(uc->mu_cap, cap))
493                 return 1;
494         return 0;
495 }
496
497 /*
498  * It's inline, so penalty for filesystems that don't use sticky bit is
499  * minimal.
500  */
501 static inline int mdd_is_sticky(const struct lu_env *env,
502                                 struct mdd_object *pobj,
503                                 struct mdd_object *cobj,
504                                 struct md_ucred *uc)
505 {
506         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
507         int rc;
508
509         rc = __mdd_la_get(env, cobj, tmp_la);
510         if (rc) {
511                 return rc;
512         } else if (tmp_la->la_uid == uc->mu_fsuid) {
513                 return 0;
514         } else {
515                 rc = __mdd_la_get(env, pobj, tmp_la);
516                 if (rc)
517                         return rc;
518                 else if (!(tmp_la->la_mode & S_ISVTX))
519                         return 0;
520                 else if (tmp_la->la_uid == uc->mu_fsuid)
521                         return 0;
522                 else
523                         return !mdd_capable(uc, CAP_FOWNER);
524         }
525 }
526
527 /*Check whether it may delete the cobj under the pobj*/
528 static int mdd_may_delete(const struct lu_env *env,
529                           struct mdd_object *pobj, struct mdd_object *cobj,
530                           int is_dir, int need_check, struct md_ucred *uc)
531 {
532         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
533         int rc = 0;
534         ENTRY;
535
536         LASSERT(cobj);
537
538         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
539                 RETURN(-ENOENT);
540
541         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
542                 RETURN(-EPERM);
543
544         if (is_dir) {
545                 if (!S_ISDIR(mdd_object_type(cobj)))
546                         RETURN(-ENOTDIR);
547
548                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
549                         RETURN(-EBUSY);
550
551         } else if (S_ISDIR(mdd_object_type(cobj))) {
552                         RETURN(-EISDIR);
553         }
554
555         if (pobj) {
556                 if (mdd_is_dead_obj(pobj))
557                         RETURN(-ENOENT);
558
559                 if (mdd_is_sticky(env, pobj, cobj, uc))
560                         RETURN(-EPERM);
561
562                 if (need_check)
563                         rc = mdd_permission_internal(env, pobj,
564                                                      MAY_WRITE | MAY_EXEC, uc);
565         }
566         RETURN(rc);
567 }
568
569 /* get only inode attributes */
570 static int __mdd_iattr_get(const struct lu_env *env,
571                            struct mdd_object *mdd_obj, struct md_attr *ma)
572 {
573         int rc = 0;
574         ENTRY;
575
576         rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr);
577         if (rc == 0)
578                 ma->ma_valid = MA_INODE;
579         RETURN(rc);
580 }
581
582 /* get lov EA only */
583 static int __mdd_lmm_get(const struct lu_env *env,
584                          struct mdd_object *mdd_obj, struct md_attr *ma)
585 {
586         int rc;
587         ENTRY;
588
589         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
590         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
591                         MDS_LOV_MD_NAME);
592         if (rc > 0) {
593                 ma->ma_valid |= MA_LOV;
594                 rc = 0;
595         }
596         RETURN(rc);
597 }
598
599 /* get lmv EA only*/
600 static int __mdd_lmv_get(const struct lu_env *env,
601                          struct mdd_object *mdd_obj, struct md_attr *ma)
602 {
603         int rc;
604
605         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
606                         MDS_LMV_MD_NAME);
607         if (rc > 0) {
608                 ma->ma_valid |= MA_LMV;
609                 rc = 0;
610         }
611         RETURN(rc);
612 }
613
614 static int mdd_attr_get_internal(const struct lu_env *env,
615                                  struct mdd_object *mdd_obj,
616                                  struct md_attr *ma)
617 {
618         int rc = 0;
619         ENTRY;
620
621         if (ma->ma_need & MA_INODE)
622                 rc = __mdd_iattr_get(env, mdd_obj, ma);
623
624         if (rc == 0 && ma->ma_need & MA_LOV) {
625                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
626                     S_ISDIR(mdd_object_type(mdd_obj)))
627                         rc = __mdd_lmm_get(env, mdd_obj, ma);
628         }
629         if (rc == 0 && ma->ma_need & MA_LMV) {
630                 if (S_ISDIR(mdd_object_type(mdd_obj)))
631                         rc = __mdd_lmv_get(env, mdd_obj, ma);
632         }
633         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
634                         rc, ma->ma_valid);
635         RETURN(rc);
636 }
637
638 static inline int mdd_attr_get_internal_locked(const struct lu_env *env,
639                                                struct mdd_object *mdd_obj,
640                                                struct md_attr *ma)
641 {
642         int rc;
643         mdd_read_lock(env, mdd_obj);
644         rc = mdd_attr_get_internal(env, mdd_obj, ma);
645         mdd_read_unlock(env, mdd_obj);
646         return rc;
647 }
648
649 /*
650  * No permission check is needed.
651  */
652 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
653                         struct md_attr *ma, struct md_ucred *uc)
654 {
655         struct mdd_object *mdd_obj = md2mdd_obj(obj);
656         int                rc;
657
658         ENTRY;
659         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
660         RETURN(rc);
661 }
662
663 /*
664  * No permission check is needed.
665  */
666 static int mdd_xattr_get(const struct lu_env *env,
667                          struct md_object *obj, void *buf, int buf_len,
668                          const char *name, struct md_ucred *uc)
669 {
670         struct mdd_object *mdd_obj = md2mdd_obj(obj);
671         struct dt_object  *next;
672         int rc;
673
674         ENTRY;
675
676         LASSERT(lu_object_exists(&obj->mo_lu));
677
678         next = mdd_object_child(mdd_obj);
679         mdd_read_lock(env, mdd_obj);
680         rc = next->do_ops->do_xattr_get(env, next, buf, buf_len, name);
681         mdd_read_unlock(env, mdd_obj);
682
683         RETURN(rc);
684 }
685
686 /*
687  * Permission check is done when open,
688  * no need check again.
689  */
690 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
691                         void *buf, int buf_len, struct md_ucred *uc)
692 {
693         struct mdd_object *mdd_obj = md2mdd_obj(obj);
694         struct dt_object  *next;
695         loff_t             pos = 0;
696         int                rc;
697         ENTRY;
698
699         LASSERT(lu_object_exists(&obj->mo_lu));
700
701         next = mdd_object_child(mdd_obj);
702         mdd_read_lock(env, mdd_obj);
703         rc = next->do_body_ops->dbo_read(env, next, buf, buf_len, &pos);
704         mdd_read_unlock(env, mdd_obj);
705         RETURN(rc);
706 }
707
708 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
709                           void *buf, int buf_len, struct md_ucred *uc)
710 {
711         struct mdd_object *mdd_obj = md2mdd_obj(obj);
712         struct dt_object  *next;
713         int rc;
714
715         ENTRY;
716
717         LASSERT(lu_object_exists(&obj->mo_lu));
718
719         next = mdd_object_child(mdd_obj);
720         mdd_read_lock(env, mdd_obj);
721         rc = next->do_ops->do_xattr_list(env, next, buf, buf_len);
722         mdd_read_unlock(env, mdd_obj);
723
724         RETURN(rc);
725 }
726
727 static int mdd_txn_start_cb(const struct lu_env *env,
728                             struct txn_param *param, void *cookie)
729 {
730         return 0;
731 }
732
733 static int mdd_txn_stop_cb(const struct lu_env *env,
734                            struct thandle *txn, void *cookie)
735 {
736         struct mdd_device *mdd = cookie;
737         struct obd_device *obd = mdd2obd_dev(mdd);
738
739         LASSERT(obd);
740         return mds_lov_write_objids(obd);
741 }
742
743 static int mdd_txn_commit_cb(const struct lu_env *env,
744                              struct thandle *txn, void *cookie)
745 {
746         return 0;
747 }
748
749 static int mdd_device_init(const struct lu_env *env,
750                            struct lu_device *d, struct lu_device *next)
751 {
752         struct mdd_device *mdd = lu2mdd_dev(d);
753         struct dt_device  *dt;
754         int rc = 0;
755         ENTRY;
756
757         mdd->mdd_child = lu2dt_dev(next);
758
759         dt = mdd->mdd_child;
760         /* prepare transactions callbacks */
761         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
762         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
763         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
764         mdd->mdd_txn_cb.dtc_cookie = mdd;
765
766         RETURN(rc);
767 }
768
769 static struct lu_device *mdd_device_fini(const struct lu_env *env,
770                                          struct lu_device *d)
771 {
772         struct mdd_device *mdd = lu2mdd_dev(d);
773         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
774
775         return next;
776 }
777
778 static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
779 {
780         int rc;
781         struct dt_object *root;
782         ENTRY;
783
784         dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
785         root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
786                              &mdd->mdd_root_fid);
787         if (!IS_ERR(root)) {
788                 LASSERT(root != NULL);
789                 lu_object_put(env, &root->do_lu);
790                 rc = orph_index_init(env, mdd);
791         } else
792                 rc = PTR_ERR(root);
793
794         RETURN(rc);
795 }
796
797 static void mdd_device_shutdown(const struct lu_env *env,
798                                 struct mdd_device *m)
799 {
800         dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
801         if (m->mdd_obd_dev)
802                 mdd_fini_obd(env, m);
803         orph_index_fini(env, m);
804 }
805
806 static int mdd_process_config(const struct lu_env *env,
807                               struct lu_device *d, struct lustre_cfg *cfg)
808 {
809         struct mdd_device *m    = lu2mdd_dev(d);
810         struct dt_device  *dt   = m->mdd_child;
811         struct lu_device  *next = &dt->dd_lu_dev;
812         int rc;
813         ENTRY;
814
815         switch (cfg->lcfg_command) {
816         case LCFG_SETUP:
817                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
818                 if (rc)
819                         GOTO(out, rc);
820                 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
821
822                 rc = mdd_init_obd(env, m, cfg);
823                 if (rc) {
824                         CERROR("lov init error %d \n", rc);
825                         GOTO(out, rc);
826                 }
827                 rc = mdd_mount(env, m);
828                 if (rc)
829                         GOTO(out, rc);
830                 break;
831         case LCFG_CLEANUP:
832                 mdd_device_shutdown(env, m);
833         default:
834                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
835                 break;
836         }
837 out:
838         RETURN(rc);
839 }
840
841 static int mdd_recovery_complete(const struct lu_env *env,
842                                  struct lu_device *d)
843 {
844         struct mdd_device *mdd = lu2mdd_dev(d);
845         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
846         struct obd_device *obd = mdd2obd_dev(mdd);
847         int rc;
848         ENTRY;
849         /* TODO:
850         rc = mdd_lov_set_nextid(env, mdd);
851         if (rc) {
852                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
853                        obd->obd_name, rc);
854                 GOTO(out, rc);
855         }
856         rc = mdd_cleanup_unlink_llog(env, mdd);
857
858         obd_notify(obd->u.mds.mds_osc_obd, NULL,
859                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
860                    OBD_NOTIFY_SYNC, NULL);
861         */
862         LASSERT(mdd);
863         LASSERT(obd);
864
865         obd->obd_recovering = 0;
866         obd->obd_type->typ_dt_ops->o_postrecov(obd);
867         /* TODO: orphans handling */
868         __mdd_orphan_cleanup(env, mdd);
869         rc = next->ld_ops->ldo_recovery_complete(env, next);
870
871         RETURN(rc);
872 }
873
874 struct lu_device_operations mdd_lu_ops = {
875         .ldo_object_alloc      = mdd_object_alloc,
876         .ldo_process_config    = mdd_process_config,
877         .ldo_recovery_complete = mdd_recovery_complete
878 };
879
880 void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
881 {
882         struct dt_object  *next = mdd_object_child(obj);
883
884         next->do_ops->do_write_lock(env, next);
885 }
886
887 void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
888 {
889         struct dt_object  *next = mdd_object_child(obj);
890
891         next->do_ops->do_read_lock(env, next);
892 }
893
894 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
895 {
896         struct dt_object  *next = mdd_object_child(obj);
897
898         next->do_ops->do_write_unlock(env, next);
899 }
900
901 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
902 {
903         struct dt_object  *next = mdd_object_child(obj);
904
905         next->do_ops->do_read_unlock(env, next);
906 }
907
908 static void mdd_lock2(const struct lu_env *env,
909                       struct mdd_object *o0, struct mdd_object *o1)
910 {
911         mdd_write_lock(env, o0);
912         mdd_write_lock(env, o1);
913 }
914
915 static void mdd_unlock2(const struct lu_env *env,
916                         struct mdd_object *o0, struct mdd_object *o1)
917 {
918         mdd_write_unlock(env, o1);
919         mdd_write_unlock(env, o0);
920 }
921
922 static struct thandle* mdd_trans_start(const struct lu_env *env,
923                                        struct mdd_device *mdd)
924 {
925         struct txn_param *p = &mdd_env_info(env)->mti_param;
926
927         return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
928 }
929
930 static void mdd_trans_stop(const struct lu_env *env,
931                            struct mdd_device *mdd, int result,
932                            struct thandle *handle)
933 {
934         handle->th_result = result;
935         mdd_child_ops(mdd)->dt_trans_stop(env, handle);
936 }
937
938 static int __mdd_object_create(const struct lu_env *env,
939                                struct mdd_object *obj, struct md_attr *ma,
940                                struct thandle *handle)
941 {
942         struct dt_object *next;
943         struct lu_attr *attr = &ma->ma_attr;
944         int rc;
945         ENTRY;
946
947         if (!lu_object_exists(mdd2lu_obj(obj))) {
948                 next = mdd_object_child(obj);
949                 rc = next->do_ops->do_create(env, next, attr, handle);
950         } else
951                 rc = -EEXIST;
952
953         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
954
955         RETURN(rc);
956 }
957
958 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
959                           const struct lu_attr *attr, struct thandle *handle)
960 {
961         struct dt_object *next;
962
963         LASSERT(lu_object_exists(mdd2lu_obj(o)));
964         next = mdd_object_child(o);
965         return next->do_ops->do_attr_set(env, next, attr, handle);
966 }
967
968 int mdd_attr_set_internal_locked(const struct lu_env *env,
969                                  struct mdd_object *o,
970                                  const struct lu_attr *attr,
971                                  struct thandle *handle)
972 {
973         int rc;
974         mdd_write_lock(env, o);
975         rc = mdd_attr_set_internal(env, o, attr, handle);
976         mdd_write_unlock(env, o);
977         return rc;
978 }
979
980 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
981                            const void *buf, int buf_len, const char *name,
982                            int fl, struct thandle *handle)
983 {
984         struct dt_object *next;
985         int rc = 0;
986         ENTRY;
987
988         LASSERT(lu_object_exists(mdd2lu_obj(o)));
989         next = mdd_object_child(o);
990         if (buf && buf_len > 0) {
991                 rc = next->do_ops->do_xattr_set(env, next, buf, buf_len, name,
992                                                 0, handle);
993         }else if (buf == NULL && buf_len == 0) {
994                 rc = next->do_ops->do_xattr_del(env, next, name, handle);
995         }
996         RETURN(rc);
997 }
998
999 /* this gives the same functionality as the code between
1000  * sys_chmod and inode_setattr
1001  * chown_common and inode_setattr
1002  * utimes and inode_setattr
1003  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1004  * and port to
1005  */
1006 int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1007                  struct lu_attr *la, struct md_ucred *uc)
1008 {
1009         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1010         time_t            now        = CURRENT_SECONDS;
1011         int               rc;
1012         ENTRY;
1013
1014         if (!la->la_valid)
1015                 RETURN(0);
1016
1017         /* Do not permit change file type */
1018         if (la->la_valid & LA_TYPE)
1019                 RETURN(-EPERM);
1020
1021         /* They should not be processed by setattr */
1022         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1023                 RETURN(-EPERM);
1024
1025         rc = __mdd_la_get(env, obj, tmp_la);
1026         if (rc)
1027                 RETURN(rc);
1028
1029         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
1030
1031                 /*
1032                  * If only change flags of the object, we should
1033                  * let it pass, but also need capability check
1034                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
1035                  * fix it, when implement capable in mds
1036                  */
1037                 if (la->la_valid & ~LA_FLAGS)
1038                         RETURN(-EPERM);
1039
1040                 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
1041                         RETURN(-EPERM);
1042
1043                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1044                     !mdd_capable(uc, CAP_FOWNER))
1045                         RETURN(-EPERM);
1046
1047                 /*
1048                  * According to Ext3 implementation on this, the
1049                  * Ctime will be changed, but not clear why?
1050                  */
1051                 la->la_ctime = now;
1052                 la->la_valid |= LA_CTIME;
1053                 RETURN(0);
1054         }
1055
1056         /* Check for setting the obj time. */
1057         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1058             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1059                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1060                     !mdd_capable(uc, CAP_FOWNER))
1061                         RETURN(-EPERM);
1062         }
1063
1064         /* Make sure a caller can chmod. */
1065         if (la->la_valid & LA_MODE) {
1066                 /*
1067                  * Bypass la_vaild == LA_MODE,
1068                  * this is for changing file with SUID or SGID.
1069                  */
1070                 if ((la->la_valid & ~LA_MODE) &&
1071                     (uc->mu_fsuid != tmp_la->la_uid) &&
1072                     !mdd_capable(uc, CAP_FOWNER))
1073                         RETURN(-EPERM);
1074
1075                 if (la->la_mode == (umode_t) -1)
1076                         la->la_mode = tmp_la->la_mode;
1077                 else
1078                         la->la_mode = (la->la_mode & S_IALLUGO) |
1079                                       (tmp_la->la_mode & ~S_IALLUGO);
1080
1081                 /* Also check the setgid bit! */
1082                 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
1083                                 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
1084                         la->la_mode &= ~S_ISGID;
1085         } else {
1086                la->la_mode = tmp_la->la_mode;
1087         }
1088
1089         /* Make sure a caller can chown. */
1090         if (la->la_valid & LA_UID) {
1091                 if (la->la_uid == (uid_t) -1)
1092                         la->la_uid = tmp_la->la_uid;
1093                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1094                     (la->la_uid != tmp_la->la_uid)) &&
1095                     !mdd_capable(uc, CAP_CHOWN))
1096                         RETURN(-EPERM);
1097
1098                 /*
1099                  * If the user or group of a non-directory has been
1100                  * changed by a non-root user, remove the setuid bit.
1101                  * 19981026 David C Niemi <niemi@tux.org>
1102                  *
1103                  * Changed this to apply to all users, including root,
1104                  * to avoid some races. This is the behavior we had in
1105                  * 2.0. The check for non-root was definitely wrong
1106                  * for 2.2 anyway, as it should have been using
1107                  * CAP_FSETID rather than fsuid -- 19990830 SD.
1108                  */
1109                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1110                     !S_ISDIR(tmp_la->la_mode)) {
1111                         la->la_mode &= ~S_ISUID;
1112                         la->la_valid |= LA_MODE;
1113                 }
1114         }
1115
1116         /* Make sure caller can chgrp. */
1117         if (la->la_valid & LA_GID) {
1118                 if (la->la_gid == (gid_t) -1)
1119                         la->la_gid = tmp_la->la_gid;
1120                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1121                     ((la->la_gid != tmp_la->la_gid) &&
1122                     !mdd_in_group_p(uc, la->la_gid))) &&
1123                     !mdd_capable(uc, CAP_CHOWN))
1124                         RETURN(-EPERM);
1125
1126                 /*
1127                  * Likewise, if the user or group of a non-directory
1128                  * has been changed by a non-root user, remove the
1129                  * setgid bit UNLESS there is no group execute bit
1130                  * (this would be a file marked for mandatory
1131                  * locking).  19981026 David C Niemi <niemi@tux.org>
1132                  *
1133                  * Removed the fsuid check (see the comment above) --
1134                  * 19990830 SD.
1135                  */
1136                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1137                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1138                         la->la_mode &= ~S_ISGID;
1139                         la->la_valid |= LA_MODE;
1140                 }
1141         }
1142
1143         /* For tuncate (or setsize), we should have MAY_WRITE perm */
1144         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1145                 rc = mdd_permission_internal(env, obj, MAY_WRITE, uc);
1146                 if (rc)
1147                         RETURN(rc);
1148
1149                 /*
1150                  * For the "Size-on-MDS" setattr update, merge coming
1151                  * attributes with the set in the inode. BUG 10641
1152                  */
1153                 if ((la->la_valid & LA_ATIME) &&
1154                     (la->la_atime < tmp_la->la_atime))
1155                         la->la_valid &= ~LA_ATIME;
1156
1157                 if ((la->la_valid & LA_CTIME) &&
1158                     (la->la_ctime < tmp_la->la_ctime))
1159                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1160
1161                 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
1162                         la->la_mtime = now;
1163                         la->la_valid |= LA_MTIME;
1164                 }
1165         }
1166
1167         /* For last, ctime must be fixed */
1168         if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
1169                 la->la_ctime = now;
1170                 la->la_valid |= LA_CTIME;
1171         }
1172
1173         RETURN(0);
1174 }
1175
1176 /* set attr and LOV EA at once, return updated attr */
1177 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1178                         const struct md_attr *ma, struct md_ucred *uc)
1179 {
1180         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1181         struct mdd_device *mdd = mdo2mdd(obj);
1182         struct thandle *handle;
1183         struct lov_mds_md *lmm = NULL;
1184         int  rc = 0, lmm_size = 0, max_size = 0;
1185         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1186         ENTRY;
1187
1188         mdd_txn_param_build(env, &MDD_TXN_ATTR_SET);
1189         handle = mdd_trans_start(env, mdd);
1190         if (IS_ERR(handle))
1191                 RETURN(PTR_ERR(handle));
1192         /*TODO: add lock here*/
1193         /* start a log jounal handle if needed */
1194         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1195             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1196                 max_size = mdd_lov_mdsize(env, mdd);
1197                 OBD_ALLOC(lmm, max_size);
1198                 if (lmm == NULL)
1199                         GOTO(cleanup, rc = -ENOMEM);
1200
1201                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1202                                 MDS_LOV_MD_NAME);
1203
1204                 if (rc < 0)
1205                         GOTO(cleanup, rc);
1206         }
1207
1208         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
1209                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1210                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1211
1212         *la_copy = ma->ma_attr;
1213         mdd_write_lock(env, mdd_obj);
1214         rc = mdd_fix_attr(env, mdd_obj, la_copy, uc);
1215         mdd_write_unlock(env, mdd_obj);
1216         if (rc)
1217                 GOTO(cleanup, rc);
1218
1219         if (la_copy->la_valid & LA_FLAGS) {
1220                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1221                                                   handle);
1222                 if (rc == 0)
1223                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1224         } else if (la_copy->la_valid) {            /* setattr */
1225                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1226                                                   handle);
1227                 /* journal chown/chgrp in llog, just like unlink */
1228                 if (rc == 0 && lmm_size){
1229                         /*TODO set_attr llog */
1230                 }
1231         }
1232
1233         if (rc == 0 && ma->ma_valid & MA_LOV) {
1234                 umode_t mode;
1235
1236                 mode = mdd_object_type(mdd_obj);
1237                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1238                         /*TODO check permission*/
1239                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1240                                             ma->ma_lmm_size, handle, 1);
1241                 }
1242
1243         }
1244 cleanup:
1245         mdd_trans_stop(env, mdd, rc, handle);
1246         if (rc == 0 && lmm_size) {
1247                 /*set obd attr, if needed*/
1248                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
1249         }
1250         if (lmm != NULL) {
1251                 OBD_FREE(lmm, max_size);
1252         }
1253
1254         RETURN(rc);
1255 }
1256
1257 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1258                       const void *buf, int buf_len, const char *name, int fl,
1259                       struct thandle *handle)
1260 {
1261         int  rc;
1262         ENTRY;
1263
1264         mdd_write_lock(env, obj);
1265         rc = __mdd_xattr_set(env, obj, buf, buf_len, name, fl, handle);
1266         mdd_write_unlock(env, obj);
1267
1268         RETURN(rc);
1269 }
1270
1271 static int mdd_xattr_sanity_check(const struct lu_env *env,
1272                                   struct mdd_object *obj,
1273                                   struct md_ucred *uc)
1274 {
1275         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1276         int rc;
1277         ENTRY;
1278
1279         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1280                 RETURN(-EPERM);
1281
1282         mdd_read_lock(env, obj);
1283         rc = __mdd_la_get(env, obj, tmp_la);
1284         mdd_read_unlock(env, obj);
1285         if (rc)
1286                 RETURN(rc);
1287
1288         if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
1289                 RETURN(-EPERM);
1290
1291         RETURN(rc);
1292 }
1293
1294 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1295                          const void *buf, int buf_len, const char *name, int fl,
1296                          struct md_ucred *uc)
1297 {
1298         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1299         struct mdd_device *mdd = mdo2mdd(obj);
1300         struct thandle *handle;
1301         int  rc;
1302         ENTRY;
1303
1304         rc = mdd_xattr_sanity_check(env, mdd_obj, uc);
1305         if (rc)
1306                 RETURN(rc);
1307
1308         mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1309         handle = mdd_trans_start(env, mdd);
1310         if (IS_ERR(handle))
1311                 RETURN(PTR_ERR(handle));
1312
1313         rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, buf_len, name,
1314                                fl, handle);
1315 #ifdef HAVE_SPLIT_SUPPORT
1316         if (rc == 0) {
1317                 /* very ugly hack, if setting lmv, it means splitting
1318                  * sucess, we should return -ERESTART to notify the
1319                  * client, so transno for this splitting should be
1320                  * zero according to the replay rules. so return -ERESTART
1321                  * here let mdt trans stop callback know this.
1322                  */
1323                  if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
1324                         rc = -ERESTART;
1325         }
1326 #endif
1327         mdd_trans_stop(env, mdd, rc, handle);
1328
1329         RETURN(rc);
1330 }
1331
1332 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
1333                            struct mdd_object *obj,
1334                            const char *name, struct thandle *handle)
1335 {
1336         struct dt_object *next;
1337
1338         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1339         next = mdd_object_child(obj);
1340         return next->do_ops->do_xattr_del(env, next, name, handle);
1341 }
1342
1343 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1344                   const char *name, struct md_ucred *uc)
1345 {
1346         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1347         struct mdd_device *mdd = mdo2mdd(obj);
1348         struct thandle *handle;
1349         int  rc;
1350         ENTRY;
1351
1352         rc = mdd_xattr_sanity_check(env, mdd_obj, uc);
1353         if (rc)
1354                 RETURN(rc);
1355
1356         mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1357         handle = mdd_trans_start(env, mdd);
1358         if (IS_ERR(handle))
1359                 RETURN(PTR_ERR(handle));
1360
1361         mdd_write_lock(env, mdd_obj);
1362         rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
1363         mdd_write_unlock(env, mdd_obj);
1364
1365         mdd_trans_stop(env, mdd, rc, handle);
1366
1367         RETURN(rc);
1368 }
1369
1370 static int __mdd_index_insert_only(const struct lu_env *env,
1371                                    struct mdd_object *pobj,
1372                                    const struct lu_fid *lf,
1373                                    const char *name, struct thandle *th)
1374 {
1375         int rc;
1376         struct dt_object *next = mdd_object_child(pobj);
1377         ENTRY;
1378
1379         if (dt_try_as_dir(env, next))
1380                 rc = next->do_index_ops->dio_insert(env, next,
1381                                          (struct dt_rec *)lf,
1382                                          (struct dt_key *)name, th);
1383         else
1384                 rc = -ENOTDIR;
1385         RETURN(rc);
1386 }
1387
1388 /* insert new index, add reference if isdir, update times */
1389 static int __mdd_index_insert(const struct lu_env *env,
1390                              struct mdd_object *pobj, const struct lu_fid *lf,
1391                              const char *name, int isdir, struct thandle *th)
1392 {
1393         int rc;
1394         struct dt_object *next = mdd_object_child(pobj);
1395         ENTRY;
1396
1397 #if 0
1398         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
1399 #endif
1400
1401         if (dt_try_as_dir(env, next))
1402                 rc = next->do_index_ops->dio_insert(env, next,
1403                                          (struct dt_rec *)lf,
1404                                          (struct dt_key *)name, th);
1405         else
1406                 rc = -ENOTDIR;
1407
1408         if (rc == 0) {
1409                 if (isdir)
1410                         __mdd_ref_add(env, pobj, th);
1411 #if 0
1412                 la->la_valid = LA_MTIME|LA_CTIME;
1413                 la->la_atime = ma->ma_attr.la_atime;
1414                 la->la_ctime = ma->ma_attr.la_ctime;
1415                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle);
1416 #endif
1417         }
1418         return rc;
1419 }
1420
1421 static int __mdd_index_delete(const struct lu_env *env,
1422                               struct mdd_object *pobj, const char *name,
1423                               struct thandle *handle)
1424 {
1425         int rc;
1426         struct dt_object *next = mdd_object_child(pobj);
1427         ENTRY;
1428
1429         if (dt_try_as_dir(env, next))
1430                 rc = next->do_index_ops->dio_delete(env, next,
1431                                         (struct dt_key *)name, handle);
1432         else
1433                 rc = -ENOTDIR;
1434         RETURN(rc);
1435 }
1436
1437 static int mdd_link_sanity_check(const struct lu_env *env,
1438                                  struct mdd_object *tgt_obj,
1439                                  struct mdd_object *src_obj,
1440                                  struct md_ucred *uc)
1441 {
1442         int rc;
1443         ENTRY;
1444
1445         rc = mdd_may_create(env, tgt_obj, NULL, 1, uc);
1446         if (rc)
1447                 RETURN(rc);
1448
1449         if (S_ISDIR(mdd_object_type(src_obj)))
1450                 RETURN(-EPERM);
1451
1452         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1453                 RETURN(-EPERM);
1454
1455         RETURN(rc);
1456 }
1457
1458 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1459                     struct md_object *src_obj, const char *name,
1460                     struct md_attr *ma, struct md_ucred *uc)
1461 {
1462         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1463         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1464         struct mdd_device *mdd = mdo2mdd(src_obj);
1465         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1466         struct thandle *handle;
1467         int rc;
1468         ENTRY;
1469
1470         mdd_txn_param_build(env, &MDD_TXN_LINK);
1471         handle = mdd_trans_start(env, mdd);
1472         if (IS_ERR(handle))
1473                 RETURN(PTR_ERR(handle));
1474
1475         mdd_lock2(env, mdd_tobj, mdd_sobj);
1476
1477         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj, uc);
1478         if (rc)
1479                 GOTO(out, rc);
1480
1481         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1482                                      name, handle);
1483         if (rc == 0)
1484                 __mdd_ref_add(env, mdd_sobj, handle);
1485
1486         *la_copy = ma->ma_attr;
1487         la_copy->la_valid = LA_CTIME;
1488         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle);
1489         if (rc)
1490                 GOTO(out, rc);
1491
1492         la_copy->la_valid = LA_CTIME | LA_MTIME;
1493         rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
1494
1495 out:
1496         mdd_unlock2(env, mdd_tobj, mdd_sobj);
1497         mdd_trans_stop(env, mdd, rc, handle);
1498         RETURN(rc);
1499 }
1500
1501 /*
1502  * Check that @dir contains no entries except (possibly) dot and dotdot.
1503  *
1504  * Returns:
1505  *
1506  *             0        empty
1507  *    -ENOTEMPTY        not empty
1508  *           -ve        other error
1509  *
1510  */
1511 static int mdd_dir_is_empty(const struct lu_env *env,
1512                             struct mdd_object *dir)
1513 {
1514         struct dt_it     *it;
1515         struct dt_object *obj;
1516         struct dt_it_ops *iops;
1517         int result;
1518
1519         obj = mdd_object_child(dir);
1520         iops = &obj->do_index_ops->dio_it;
1521         it = iops->init(env, obj, 0);
1522         if (it != NULL) {
1523                 result = iops->get(env, it, (const void *)"");
1524                 if (result > 0) {
1525                         int i;
1526                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1527                                 result = iops->next(env, it);
1528                         if (result == 0)
1529                                 result = -ENOTEMPTY;
1530                         else if (result == +1)
1531                                 result = 0;
1532                 } else if (result == 0)
1533                         /*
1534                          * Huh? Index contains no zero key?
1535                          */
1536                         result = -EIO;
1537
1538                 iops->put(env, it);
1539                 iops->fini(env, it);
1540         } else
1541                 result = -ENOMEM;
1542         return result;
1543 }
1544
1545 /* return md_attr back,
1546  * if it is last unlink then return lov ea + llog cookie*/
1547 int __mdd_object_kill(const struct lu_env *env,
1548                       struct mdd_object *obj,
1549                       struct md_attr *ma)
1550 {
1551         int rc = 0;
1552         ENTRY;
1553
1554         mdd_set_dead_obj(obj);
1555         if (S_ISREG(mdd_object_type(obj))) {
1556                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1557                  * Caller must be ready for that. */
1558                 rc = __mdd_lmm_get(env, obj, ma);
1559                 if ((ma->ma_valid & MA_LOV))
1560                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1561                                             obj, ma);
1562         }
1563         RETURN(rc);
1564 }
1565
1566 /* caller should take a lock before calling */
1567 static int __mdd_finish_unlink(const struct lu_env *env,
1568                                struct mdd_object *obj, struct md_attr *ma,
1569                                struct thandle *th)
1570 {
1571         int rc;
1572         ENTRY;
1573
1574         rc = __mdd_iattr_get(env, obj, ma);
1575         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1576                 /* add new orphan and the object
1577                  * will be deleted during the object_put() */
1578                 if (__mdd_orphan_add(env, obj, th) == 0)
1579                         set_bit(LU_OBJECT_ORPHAN,
1580                                 &mdd2lu_obj(obj)->lo_header->loh_flags);
1581
1582                 if (obj->mod_count == 0)
1583                         rc = __mdd_object_kill(env, obj, ma);
1584         }
1585         RETURN(rc);
1586 }
1587
1588 static int mdd_unlink_sanity_check(const struct lu_env *env,
1589                                    struct mdd_object *pobj,
1590                                    struct mdd_object *cobj,
1591                                    struct md_attr *ma,
1592                                    struct md_ucred *uc)
1593 {
1594         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1595         int rc = 0;
1596         ENTRY;
1597
1598         rc = mdd_may_delete(env, pobj, cobj,
1599                             S_ISDIR(ma->ma_attr.la_mode), 1, uc);
1600         if (rc)
1601                 RETURN(rc);
1602
1603         if (S_ISDIR(mdd_object_type(cobj))) {
1604                 if (dt_try_as_dir(env, dt_cobj))
1605                         rc = mdd_dir_is_empty(env, cobj);
1606                 else
1607                         rc = -ENOTDIR;
1608         }
1609
1610         RETURN(rc);
1611 }
1612
1613 static int mdd_unlink(const struct lu_env *env,
1614                       struct md_object *pobj, struct md_object *cobj,
1615                       const char *name, struct md_attr *ma, struct md_ucred *uc)
1616 {
1617         struct mdd_device *mdd = mdo2mdd(pobj);
1618         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1619         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1620         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1621         struct thandle    *handle;
1622         int rc;
1623         ENTRY;
1624
1625         mdd_txn_param_build(env, &MDD_TXN_UNLINK);
1626         handle = mdd_trans_start(env, mdd);
1627         if (IS_ERR(handle))
1628                 RETURN(PTR_ERR(handle));
1629
1630         mdd_lock2(env, mdd_pobj, mdd_cobj);
1631
1632         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma, uc);
1633         if (rc)
1634                 GOTO(cleanup, rc);
1635
1636         rc = __mdd_index_delete(env, mdd_pobj, name, handle);
1637         if (rc)
1638                 GOTO(cleanup, rc);
1639
1640         __mdd_ref_del(env, mdd_cobj, handle);
1641         *la_copy = ma->ma_attr;
1642         if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
1643                 /* unlink dot */
1644                 __mdd_ref_del(env, mdd_cobj, handle);
1645                 /* unlink dotdot */
1646                 __mdd_ref_del(env, mdd_pobj, handle);
1647         } else {
1648                 la_copy->la_valid = LA_CTIME;
1649                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle);
1650                 if (rc)
1651                         GOTO(cleanup, rc);
1652         }
1653
1654         la_copy->la_valid = LA_CTIME | LA_MTIME;
1655         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
1656         if (rc)
1657                 GOTO(cleanup, rc);
1658
1659         rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle);
1660
1661         if (rc == 0)
1662                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
1663                                    strlen("unlinked"), "unlinked", 0,
1664                                    NULL, NULL);
1665
1666 cleanup:
1667         mdd_unlock2(env, mdd_pobj, mdd_cobj);
1668         mdd_trans_stop(env, mdd, rc, handle);
1669         RETURN(rc);
1670 }
1671
1672 /* partial unlink */
1673 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1674                        struct md_attr *ma, struct md_ucred *uc)
1675 {
1676         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1677         struct mdd_device *mdd = mdo2mdd(obj);
1678         struct thandle *handle;
1679         int rc;
1680         ENTRY;
1681
1682         mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
1683         handle = mdd_trans_start(env, mdd);
1684         if (IS_ERR(handle))
1685                 RETURN(-ENOMEM);
1686
1687         mdd_write_lock(env, mdd_obj);
1688
1689         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma, uc);
1690         if (rc)
1691                 GOTO(cleanup, rc);
1692
1693         __mdd_ref_del(env, mdd_obj, handle);
1694
1695         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1696                 /* unlink dot */
1697                 __mdd_ref_del(env, mdd_obj, handle);
1698         }
1699
1700         rc = __mdd_finish_unlink(env, mdd_obj, ma, handle);
1701
1702         EXIT;
1703 cleanup:
1704         mdd_write_unlock(env, mdd_obj);
1705         mdd_trans_stop(env, mdd, rc, handle);
1706         return rc;
1707 }
1708
1709 static int mdd_parent_fid(const struct lu_env *env,
1710                           struct mdd_object *obj,
1711                           struct lu_fid *fid)
1712 {
1713         return __mdd_lookup_locked(env, &obj->mod_obj,
1714                                    dotdot, fid, 0, NULL);
1715 }
1716
1717 /*
1718  * return 1: if lf is the fid of the ancestor of p1;
1719  * return 0: if not;
1720  *
1721  * return -EREMOTE: if remote object is found, in this
1722  * case fid of remote object is saved to @pf;
1723  *
1724  * otherwise: values < 0, errors.
1725  */
1726 static int mdd_is_parent(const struct lu_env *env,
1727                          struct mdd_device *mdd,
1728                          struct mdd_object *p1,
1729                          const struct lu_fid *lf,
1730                          struct lu_fid *pf)
1731 {
1732         struct mdd_object *parent = NULL;
1733         struct lu_fid *pfid;
1734         int rc;
1735         ENTRY;
1736
1737         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1738         pfid = &mdd_env_info(env)->mti_fid;
1739
1740         /* Do not lookup ".." in root, they do not exist there. */
1741         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1742                 RETURN(0);
1743
1744         for(;;) {
1745                 rc = mdd_parent_fid(env, p1, pfid);
1746                 if (rc)
1747                         GOTO(out, rc);
1748                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1749                         GOTO(out, rc = 0);
1750                 if (lu_fid_eq(pfid, lf))
1751                         GOTO(out, rc = 1);
1752                 if (parent)
1753                         mdd_object_put(env, parent);
1754                 parent = mdd_object_find(env, mdd, pfid);
1755
1756                 /* cross-ref parent */
1757                 if (parent == NULL) {
1758                         if (pf != NULL)
1759                                 *pf = *pfid;
1760                         GOTO(out, rc = EREMOTE);
1761                 } else if (IS_ERR(parent))
1762                         GOTO(out, rc = PTR_ERR(parent));
1763                 p1 = parent;
1764         }
1765         EXIT;
1766 out:
1767         if (parent && !IS_ERR(parent))
1768                 mdd_object_put(env, parent);
1769         return rc;
1770 }
1771
1772 static int mdd_rename_lock(const struct lu_env *env,
1773                            struct mdd_device *mdd,
1774                            struct mdd_object *src_pobj,
1775                            struct mdd_object *tgt_pobj)
1776 {
1777         int rc;
1778         ENTRY;
1779
1780         if (src_pobj == tgt_pobj) {
1781                 mdd_write_lock(env, src_pobj);
1782                 RETURN(0);
1783         }
1784
1785         /* compared the parent child relationship of src_p&tgt_p */
1786         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1787                 mdd_lock2(env, src_pobj, tgt_pobj);
1788                 RETURN(0);
1789         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1790                 mdd_lock2(env, tgt_pobj, src_pobj);
1791                 RETURN(0);
1792         }
1793
1794         rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1795         if (rc < 0)
1796                 RETURN(rc);
1797
1798         if (rc == 1) {
1799                 mdd_lock2(env, tgt_pobj, src_pobj);
1800                 RETURN(0);
1801         }
1802
1803         mdd_lock2(env, src_pobj, tgt_pobj);
1804
1805         RETURN(0);
1806 }
1807
1808 static void mdd_rename_unlock(const struct lu_env *env,
1809                               struct mdd_object *src_pobj,
1810                               struct mdd_object *tgt_pobj)
1811 {
1812         mdd_write_unlock(env, src_pobj);
1813         if (src_pobj != tgt_pobj)
1814                 mdd_write_unlock(env, tgt_pobj);
1815 }
1816
1817 static int mdd_rename_sanity_check(const struct lu_env *env,
1818                                    struct mdd_object *src_pobj,
1819                                    struct mdd_object *tgt_pobj,
1820                                    const struct lu_fid *sfid,
1821                                    int src_is_dir,
1822                                    struct mdd_object *sobj,
1823                                    struct mdd_object *tobj,
1824                                    struct md_ucred *uc)
1825 {
1826         struct mdd_device *mdd = mdo2mdd(&src_pobj->mod_obj);
1827         int rc = 0, need_check = 1;
1828         ENTRY;
1829
1830         mdd_read_lock(env, src_pobj);
1831         rc = mdd_may_delete(env, src_pobj, sobj, src_is_dir, need_check, uc);
1832         mdd_read_unlock(env, src_pobj);
1833         if (rc)
1834                 RETURN(rc);
1835
1836         if (src_pobj == tgt_pobj)
1837                 need_check = 0;
1838
1839         if (!tobj) {
1840                 mdd_read_lock(env, tgt_pobj);
1841                 rc = mdd_may_create(env, tgt_pobj, NULL, need_check, uc);
1842                 mdd_read_unlock(env, tgt_pobj);
1843         } else {
1844                 mdd_read_lock(env, tgt_pobj);
1845                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1846                                     need_check, uc);
1847                 mdd_read_unlock(env, tgt_pobj);
1848                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
1849                     mdd_dir_is_empty(env, tobj))
1850                                 RETURN(-ENOTEMPTY);
1851         }
1852
1853         /* source should not be ancestor of target dir */
1854         if (!rc && src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
1855                 RETURN(-EINVAL);
1856
1857         RETURN(rc);
1858 }
1859 /* src object can be remote that is why we use only fid and type of object */
1860 static int mdd_rename(const struct lu_env *env,
1861                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1862                       const struct lu_fid *lf, const char *sname,
1863                       struct md_object *tobj, const char *tname,
1864                       struct md_attr *ma, struct md_ucred *uc)
1865 {
1866         struct mdd_device *mdd = mdo2mdd(src_pobj);
1867         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1868         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1869         struct mdd_object *mdd_sobj = mdd_object_find(env, mdd, lf);
1870         struct mdd_object *mdd_tobj = NULL;
1871         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1872         struct thandle *handle;
1873         int is_dir;
1874         int rc;
1875         ENTRY;
1876
1877         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1878         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1879         if (ma->ma_attr.la_valid & LA_FLAGS &&
1880             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1881                 GOTO(out, rc = -EPERM);
1882
1883         if (tobj)
1884                 mdd_tobj = md2mdd_obj(tobj);
1885
1886         /*XXX: shouldn't this check be done under lock below? */
1887         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1888                                      lf, is_dir, mdd_sobj, mdd_tobj, uc);
1889         if (rc)
1890                 GOTO(out, rc);
1891
1892         mdd_txn_param_build(env, &MDD_TXN_RENAME);
1893         handle = mdd_trans_start(env, mdd);
1894         if (IS_ERR(handle))
1895                 GOTO(out, rc = PTR_ERR(handle));
1896
1897         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1898         rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
1899         if (rc)
1900                 GOTO(cleanup_unlocked, rc);
1901
1902         rc = __mdd_index_delete(env, mdd_spobj, sname, handle);
1903         if (rc)
1904                 GOTO(cleanup, rc);
1905
1906         /*if sobj is dir, its parent object nlink should be dec too*/
1907         if (is_dir)
1908                 __mdd_ref_del(env, mdd_spobj, handle);
1909
1910         rc = __mdd_index_delete(env, mdd_tpobj, tname, handle);
1911         /* tobj can be remote one,
1912          * so we do index_delete unconditionally and -ENOENT is allowed */
1913         if (rc != 0 && rc != -ENOENT)
1914                 GOTO(cleanup, rc);
1915
1916         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle);
1917         if (rc)
1918                 GOTO(cleanup, rc);
1919
1920         *la_copy = ma->ma_attr;
1921         la_copy->la_valid = LA_CTIME;
1922         if (mdd_sobj) {
1923                 /*XXX: how to update ctime for remote sobj? */
1924                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
1925                 if (rc)
1926                         GOTO(cleanup, rc);
1927         }
1928         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1929                 mdd_write_lock(env, mdd_tobj);
1930                 __mdd_ref_del(env, mdd_tobj, handle);
1931                 /* remove dot reference */
1932                 if (is_dir)
1933                         __mdd_ref_del(env, mdd_tobj, handle);
1934
1935                 la_copy->la_valid = LA_CTIME;
1936                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
1937                 if (rc)
1938                         GOTO(cleanup, rc);
1939
1940                 rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle);
1941                 mdd_write_unlock(env, mdd_tobj);
1942                 if (rc)
1943                         GOTO(cleanup, rc);
1944         }
1945
1946         la_copy->la_valid = LA_CTIME | LA_MTIME;
1947         rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle);
1948         if (rc)
1949                 GOTO(cleanup, rc);
1950
1951         if (mdd_spobj != mdd_tpobj) {
1952                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1953                 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle);
1954         }
1955
1956 cleanup:
1957         mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
1958 cleanup_unlocked:
1959         mdd_trans_stop(env, mdd, rc, handle);
1960 out:
1961         if (mdd_sobj)
1962                 mdd_object_put(env, mdd_sobj);
1963         RETURN(rc);
1964 }
1965
1966 static int
1967 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1968              const char *name, const struct lu_fid* fid, int mask,
1969              struct md_ucred *uc)
1970 {
1971         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1972         struct dt_object    *dir = mdd_object_child(mdd_obj);
1973         struct dt_rec       *rec = (struct dt_rec *)fid;
1974         const struct dt_key *key = (const struct dt_key *)name;
1975         int rc;
1976         ENTRY;
1977
1978         if (mdd_is_dead_obj(mdd_obj))
1979                 RETURN(-ESTALE);
1980
1981         if (mask == MAY_EXEC)
1982                 rc = mdd_exec_permission_lite(env, mdd_obj, uc);
1983         else
1984                 rc = mdd_permission_internal(env, mdd_obj, mask, uc);
1985         if (rc)
1986                 RETURN(rc);
1987
1988         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
1989                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key);
1990         else
1991                 rc = -ENOTDIR;
1992
1993         RETURN(rc);
1994 }
1995
1996 static int
1997 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
1998                     const char *name, const struct lu_fid* fid, int mask,
1999                     struct md_ucred *uc)
2000 {
2001         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2002         int rc;
2003
2004         mdd_read_lock(env, mdd_obj);
2005         rc = __mdd_lookup(env, pobj, name, fid, mask, uc);
2006         mdd_read_unlock(env, mdd_obj);
2007
2008        return rc;
2009 }
2010
2011 static int mdd_lookup(const struct lu_env *env,
2012                       struct md_object *pobj, const char *name,
2013                       struct lu_fid* fid, struct md_ucred *uc)
2014 {
2015         int rc;
2016         ENTRY;
2017         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC, uc);
2018         RETURN(rc);
2019 }
2020
2021 /*
2022  * returns 1: if fid is ancestor of @mo;
2023  * returns 0: if fid is not a ancestor of @mo;
2024  *
2025  * returns EREMOTE if remote object is found, fid of remote object is saved to
2026  * @fid;
2027  *
2028  * returns < 0: if error
2029  */
2030 static int mdd_is_subdir(const struct lu_env *env,
2031                          struct md_object *mo, const struct lu_fid *fid,
2032                          struct lu_fid *sfid, struct md_ucred *uc)
2033 {
2034         struct mdd_device *mdd = mdo2mdd(mo);
2035         int rc;
2036         ENTRY;
2037
2038         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
2039                 RETURN(0);
2040
2041         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
2042
2043         RETURN(rc);
2044 }
2045
2046 static int __mdd_object_initialize(const struct lu_env *env,
2047                                    const struct lu_fid *pfid,
2048                                    struct mdd_object *child,
2049                                    struct md_attr *ma, struct thandle *handle)
2050 {
2051         int rc;
2052         ENTRY;
2053
2054         /* update attributes for child.
2055          * FIXME:
2056          *  (1) the valid bits should be converted between Lustre and Linux;
2057          *  (2) maybe, the child attributes should be set in OSD when creation.
2058          */
2059
2060         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle);
2061         if (rc != 0)
2062                 RETURN(rc);
2063
2064         if (S_ISDIR(ma->ma_attr.la_mode)) {
2065                 /* add . and .. for newly created dir */
2066                 __mdd_ref_add(env, child, handle);
2067                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2068                                              dot, handle);
2069                 if (rc == 0) {
2070                         rc = __mdd_index_insert_only(env, child, pfid,
2071                                                      dotdot, handle);
2072                         if (rc != 0) {
2073                                 int rc2;
2074
2075                                 rc2 = __mdd_index_delete(env,
2076                                                          child, dot, handle);
2077                                 if (rc2 != 0)
2078                                         CERROR("Failure to cleanup after dotdot"
2079                                                " creation: %d (%d)\n", rc2, rc);
2080                                 else
2081                                         __mdd_ref_del(env, child, handle);
2082                         }
2083                 }
2084         }
2085         RETURN(rc);
2086 }
2087
2088 /*
2089  * XXX: Need MAY_WRITE to be checked?
2090  */
2091 static int mdd_cd_sanity_check(const struct lu_env *env,
2092                                struct mdd_object *obj, struct md_ucred *uc)
2093 {
2094         int rc = 0;
2095         ENTRY;
2096
2097         /* EEXIST check */
2098         if (!obj || mdd_is_dead_obj(obj))
2099                 RETURN(-ENOENT);
2100
2101 #if 0
2102         mdd_read_lock(env, obj);
2103         rc = mdd_permission_internal(env, obj, MAY_WRITE, uc);
2104         mdd_read_unlock(env, obj);
2105 #endif
2106
2107         RETURN(rc);
2108
2109 }
2110
2111 static int mdd_create_data(const struct lu_env *env,
2112                            struct md_object *pobj, struct md_object *cobj,
2113                            const struct md_create_spec *spec,
2114                            struct md_attr *ma, struct md_ucred *uc)
2115 {
2116         struct mdd_device *mdd = mdo2mdd(cobj);
2117         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
2118         struct mdd_object *son = md2mdd_obj(cobj);
2119         struct lu_attr    *attr = &ma->ma_attr;
2120         struct lov_mds_md *lmm = NULL;
2121         int                lmm_size = 0;
2122         struct thandle    *handle;
2123         int                rc;
2124         ENTRY;
2125
2126         rc = mdd_cd_sanity_check(env, son, uc);
2127         if (rc)
2128                 RETURN(rc);
2129
2130         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
2131                         !(spec->sp_cr_flags & FMODE_WRITE))
2132                 RETURN(0);
2133         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
2134                             attr);
2135         if (rc)
2136                 RETURN(rc);
2137
2138         mdd_txn_param_build(env, &MDD_TXN_CREATE_DATA);
2139         handle = mdd_trans_start(env, mdd);
2140         if (IS_ERR(handle))
2141                 RETURN(rc = PTR_ERR(handle));
2142
2143         /*XXX: setting the lov ea is not locked
2144          * but setting the attr is locked? */
2145
2146         /* replay creates has objects already */
2147         if (spec->u.sp_ea.no_lov_create) {
2148                 CDEBUG(D_INFO, "we already have lov ea\n");
2149                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2150                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2151                                     spec->u.sp_ea.eadatalen, handle, 0);
2152         } else
2153                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2154                                     lmm_size, handle, 0);
2155
2156         if (rc == 0)
2157                rc = mdd_attr_get_internal_locked(env, son, ma);
2158
2159         /* finish mdd_lov_create() stuff */
2160         mdd_lov_create_finish(env, mdd, rc);
2161         mdd_trans_stop(env, mdd, rc, handle);
2162         if (lmm)
2163                 OBD_FREE(lmm, lmm_size);
2164         RETURN(rc);
2165 }
2166
2167 static int mdd_create_sanity_check(const struct lu_env *env,
2168                                    struct md_object *pobj,
2169                                    const char *name, struct md_attr *ma,
2170                                    struct md_ucred *uc)
2171 {
2172         struct mdd_thread_info *info = mdd_env_info(env);
2173         struct lu_attr    *la        = &info->mti_la;
2174         struct lu_fid     *fid       = &info->mti_fid;
2175         struct mdd_object *obj       = md2mdd_obj(pobj);
2176         int rc;
2177
2178         ENTRY;
2179         /* EEXIST check */
2180         if (mdd_is_dead_obj(obj))
2181                 RETURN(-ENOENT);
2182
2183         rc = __mdd_lookup_locked(env, pobj, name, fid,
2184                                  MAY_WRITE | MAY_EXEC, uc);
2185         if (rc != -ENOENT)
2186                 RETURN(rc ? : -EEXIST);
2187
2188         /* sgid check */
2189         mdd_read_lock(env, obj);
2190         rc = __mdd_la_get(env, obj, la);
2191         mdd_read_unlock(env, obj);
2192         if (rc)
2193                 RETURN(rc);
2194
2195         if (la->la_mode & S_ISGID) {
2196                 ma->ma_attr.la_gid = la->la_gid;
2197                 if (S_ISDIR(ma->ma_attr.la_mode)) {
2198                         ma->ma_attr.la_mode |= S_ISGID;
2199                         ma->ma_attr.la_valid |= LA_MODE;
2200                 }
2201         }
2202
2203         switch (ma->ma_attr.la_mode & S_IFMT) {
2204         case S_IFREG:
2205         case S_IFDIR:
2206         case S_IFLNK:
2207         case S_IFCHR:
2208         case S_IFBLK:
2209         case S_IFIFO:
2210         case S_IFSOCK:
2211                 rc = 0;
2212                 break;
2213         default:
2214                 rc = -EINVAL;
2215                 break;
2216         }
2217         RETURN(rc);
2218 }
2219
2220 /*
2221  * Create object and insert it into namespace.
2222  */
2223 static int mdd_create(const struct lu_env *env,
2224                       struct md_object *pobj, const char *name,
2225                       struct md_object *child,
2226                       const struct md_create_spec *spec,
2227                       struct md_attr* ma, struct md_ucred *uc)
2228 {
2229         struct mdd_device *mdd = mdo2mdd(pobj);
2230         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2231         struct mdd_object *son = md2mdd_obj(child);
2232         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2233         struct lu_attr    *attr = &ma->ma_attr;
2234         struct lov_mds_md *lmm = NULL;
2235         struct thandle    *handle;
2236         int rc, created = 0, inserted = 0, lmm_size = 0;
2237         ENTRY;
2238
2239         /* sanity checks before big job */
2240         rc = mdd_create_sanity_check(env, pobj, name, ma, uc);
2241         if (rc)
2242                 RETURN(rc);
2243
2244         /* no RPC inside the transaction, so OST objects should be created at
2245          * first */
2246         if (S_ISREG(attr->la_mode)) {
2247                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2248                                     spec, attr);
2249                 if (rc)
2250                         RETURN(rc);
2251         }
2252
2253         mdd_txn_param_build(env, &MDD_TXN_MKDIR);
2254         handle = mdd_trans_start(env, mdd);
2255         if (IS_ERR(handle))
2256                 RETURN(PTR_ERR(handle));
2257
2258         mdd_write_lock(env, mdd_pobj);
2259
2260         /*
2261          * XXX check that link can be added to the parent in mkdir case.
2262          */
2263
2264         /*
2265          * Two operations have to be performed:
2266          *
2267          *  - allocation of new object (->do_create()), and
2268          *
2269          *  - insertion into parent index (->dio_insert()).
2270          *
2271          * Due to locking, operation order is not important, when both are
2272          * successful, *but* error handling cases are quite different:
2273          *
2274          *  - if insertion is done first, and following object creation fails,
2275          *  insertion has to be rolled back, but this operation might fail
2276          *  also leaving us with dangling index entry.
2277          *
2278          *  - if creation is done first, is has to be undone if insertion
2279          *  fails, leaving us with leaked space, which is neither good, nor
2280          *  fatal.
2281          *
2282          * It seems that creation-first is simplest solution, but it is
2283          * sub-optimal in the frequent
2284          *
2285          *         $ mkdir foo
2286          *         $ mkdir foo
2287          *
2288          * case, because second mkdir is bound to create object, only to
2289          * destroy it immediately.
2290          *
2291          * Note that local file systems do
2292          *
2293          *     0. lookup -> -EEXIST
2294          *
2295          *     1. create
2296          *
2297          *     2. insert
2298          *
2299          * Maybe we should do the same. For now: creation-first.
2300          */
2301
2302         mdd_write_lock(env, son);
2303         rc = __mdd_object_create(env, son, ma, handle);
2304         if (rc) {
2305                 mdd_write_unlock(env, son);
2306                 GOTO(cleanup, rc);
2307         }
2308
2309         created = 1;
2310
2311         rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj),
2312                                      son, ma, handle);
2313         mdd_write_unlock(env, son);
2314         if (rc)
2315                 /*
2316                  * Object has no links, so it will be destroyed when last
2317                  * reference is released. (XXX not now.)
2318                  */
2319                 GOTO(cleanup, rc);
2320
2321         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2322                                 name, S_ISDIR(attr->la_mode), handle);
2323
2324         if (rc)
2325                 GOTO(cleanup, rc);
2326
2327         inserted = 1;
2328         /* replay creates has objects already */
2329         if (spec->u.sp_ea.no_lov_create) {
2330                 CDEBUG(D_INFO, "we already have lov ea\n");
2331                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2332                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2333                                     spec->u.sp_ea.eadatalen, handle, 0);
2334         } else
2335                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2336                                     lmm_size, handle, 0);
2337         if (rc) {
2338                 CERROR("error on stripe info copy %d \n", rc);
2339                 GOTO(cleanup, rc);
2340         }
2341
2342         if (S_ISLNK(attr->la_mode)) {
2343                 struct dt_object *dt = mdd_object_child(son);
2344                 const char *target_name = spec->u.sp_symname;
2345                 int sym_len = strlen(target_name);
2346                 loff_t pos = 0;
2347
2348                 rc = dt->do_body_ops->dbo_write(env, dt, target_name,
2349                                                 sym_len, &pos, handle);
2350                 if (rc == sym_len)
2351                         rc = 0;
2352                 else
2353                         rc = -EFAULT;
2354         }
2355
2356         *la_copy = ma->ma_attr;
2357         la_copy->la_valid = LA_CTIME | LA_MTIME;
2358         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
2359         if (rc)
2360                 GOTO(cleanup, rc);
2361
2362         /* return attr back */
2363         rc = mdd_attr_get_internal_locked(env, son, ma);
2364 cleanup:
2365         if (rc && created) {
2366                 int rc2 = 0;
2367
2368                 if (inserted) {
2369                         rc2 = __mdd_index_delete(env, mdd_pobj, name, handle);
2370                         if (rc2)
2371                                 CERROR("error can not cleanup destroy %d\n",
2372                                        rc2);
2373                 }
2374                 if (rc2 == 0)
2375                         __mdd_ref_del(env, son, handle);
2376         }
2377         /* finish mdd_lov_create() stuff */
2378         mdd_lov_create_finish(env, mdd, rc);
2379         if (lmm)
2380                 OBD_FREE(lmm, lmm_size);
2381         mdd_write_unlock(env, mdd_pobj);
2382         mdd_trans_stop(env, mdd, rc, handle);
2383         RETURN(rc);
2384 }
2385
2386 /* partial operation */
2387 static int mdd_oc_sanity_check(const struct lu_env *env,
2388                                struct mdd_object *obj,
2389                                struct md_attr *ma,
2390                                struct md_ucred *uc)
2391 {
2392         int rc;
2393         ENTRY;
2394
2395         /* EEXIST check */
2396         if (lu_object_exists(&obj->mod_obj.mo_lu))
2397                 RETURN(-EEXIST);
2398
2399         switch (ma->ma_attr.la_mode & S_IFMT) {
2400         case S_IFREG:
2401         case S_IFDIR:
2402         case S_IFLNK:
2403         case S_IFCHR:
2404         case S_IFBLK:
2405         case S_IFIFO:
2406         case S_IFSOCK:
2407                 rc = 0;
2408                 break;
2409         default:
2410                 rc = -EINVAL;
2411                 break;
2412         }
2413         RETURN(rc);
2414 }
2415
2416 static int mdd_object_create(const struct lu_env *env,
2417                              struct md_object *obj,
2418                              const struct md_create_spec *spec,
2419                              struct md_attr *ma,
2420                              struct md_ucred *uc)
2421 {
2422
2423         struct mdd_device *mdd = mdo2mdd(obj);
2424         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2425         struct thandle *handle;
2426         const struct lu_fid *pfid = spec->u.sp_pfid;
2427         int rc;
2428         ENTRY;
2429
2430         rc = mdd_oc_sanity_check(env, mdd_obj, ma, uc);
2431         if (rc)
2432                 RETURN(rc);
2433
2434         mdd_txn_param_build(env, &MDD_TXN_OBJECT_CREATE);
2435         handle = mdd_trans_start(env, mdd);
2436         if (IS_ERR(handle))
2437                 RETURN(PTR_ERR(handle));
2438
2439         mdd_write_lock(env, mdd_obj);
2440         rc = __mdd_object_create(env, mdd_obj, ma, handle);
2441         if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2442                 /* if creating the slave object, set slave EA here */
2443                 rc = __mdd_xattr_set(env, mdd_obj, spec->u.sp_ea.eadata,
2444                                      spec->u.sp_ea.eadatalen, MDS_LMV_MD_NAME,
2445                                      0, handle);
2446                 pfid = spec->u.sp_ea.fid;
2447                 CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
2448                        PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
2449         }
2450
2451         if (rc == 0)
2452                 rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
2453         mdd_write_unlock(env, mdd_obj);
2454
2455         if (rc == 0)
2456                 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
2457
2458         mdd_trans_stop(env, mdd, rc, handle);
2459         RETURN(rc);
2460 }
2461
2462 /*
2463  * Partial operation. Be aware, this is called with write lock taken, so we use
2464  * locksless version of __mdd_lookup() here.
2465  */
2466 static int mdd_ni_sanity_check(const struct lu_env *env,
2467                                struct md_object *pobj,
2468                                const char *name,
2469                                const struct lu_fid *fid,
2470                                struct md_ucred *uc)
2471 {
2472         struct mdd_object *obj       = md2mdd_obj(pobj);
2473         int rc;
2474         ENTRY;
2475
2476         /* EEXIST check */
2477         if (mdd_is_dead_obj(obj))
2478                 RETURN(-ENOENT);
2479
2480         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
2481         if (rc != -ENOENT)
2482                 RETURN(rc ? : -EEXIST);
2483         else
2484                 RETURN(0);
2485 }
2486
2487 static int mdd_name_insert(const struct lu_env *env,
2488                            struct md_object *pobj,
2489                            const char *name, const struct lu_fid *fid,
2490                            int isdir, struct md_ucred *uc)
2491 {
2492         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2493         struct thandle *handle;
2494         int rc;
2495         ENTRY;
2496
2497         mdd_txn_param_build(env, &MDD_TXN_INDEX_INSERT);
2498         handle = mdd_trans_start(env, mdo2mdd(pobj));
2499         if (IS_ERR(handle))
2500                 RETURN(PTR_ERR(handle));
2501
2502         mdd_write_lock(env, mdd_obj);
2503         rc = mdd_ni_sanity_check(env, pobj, name, fid, uc);
2504         if (rc)
2505                 GOTO(out_unlock, rc);
2506
2507         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle);
2508
2509 out_unlock:
2510         mdd_write_unlock(env, mdd_obj);
2511
2512         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
2513         RETURN(rc);
2514 }
2515
2516 /*
2517  * Be aware, this is called with write lock taken, so we use locksless version
2518  * of __mdd_lookup() here.
2519  */
2520 static int mdd_nr_sanity_check(const struct lu_env *env,
2521                                struct md_object *pobj,
2522                                const char *name,
2523                                struct md_ucred *uc)
2524 {
2525         struct mdd_thread_info *info = mdd_env_info(env);
2526         struct lu_fid     *fid       = &info->mti_fid;
2527         struct mdd_object *obj       = md2mdd_obj(pobj);
2528         int rc;
2529         ENTRY;
2530
2531         /* EEXIST check */
2532         if (mdd_is_dead_obj(obj))
2533                 RETURN(-ENOENT);
2534
2535         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
2536         RETURN(rc);
2537 }
2538
2539 static int mdd_name_remove(const struct lu_env *env,
2540                            struct md_object *pobj,
2541                            const char *name,
2542                            struct md_ucred *uc)
2543 {
2544         struct mdd_device *mdd = mdo2mdd(pobj);
2545         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2546         struct thandle *handle;
2547         int rc;
2548         ENTRY;
2549
2550         mdd_txn_param_build(env, &MDD_TXN_INDEX_DELETE);
2551         handle = mdd_trans_start(env, mdd);
2552         if (IS_ERR(handle))
2553                 RETURN(PTR_ERR(handle));
2554
2555         mdd_write_lock(env, mdd_obj);
2556         rc = mdd_nr_sanity_check(env, pobj, name, uc);
2557         if (rc)
2558                 GOTO(out_unlock, rc);
2559
2560         rc = __mdd_index_delete(env, mdd_obj, name, handle);
2561
2562 out_unlock:
2563         mdd_write_unlock(env, mdd_obj);
2564
2565         mdd_trans_stop(env, mdd, rc, handle);
2566         RETURN(rc);
2567 }
2568
2569 static int mdd_rt_sanity_check(const struct lu_env *env,
2570                                struct mdd_object *tgt_pobj,
2571                                struct mdd_object *tobj,
2572                                const struct lu_fid *sfid,
2573                                const char *name, struct md_attr *ma,
2574                                struct md_ucred *uc)
2575 {
2576         struct mdd_device *mdd = mdo2mdd(&tgt_pobj->mod_obj);
2577         int rc, src_is_dir;
2578         ENTRY;
2579
2580         /* EEXIST check */
2581         if (mdd_is_dead_obj(tgt_pobj))
2582                 RETURN(-ENOENT);
2583
2584         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
2585         if (tobj) {
2586                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1, uc);
2587                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
2588                      mdd_dir_is_empty(env, tobj))
2589                                 RETURN(-ENOTEMPTY);
2590         } else {
2591                 rc = mdd_may_create(env, tgt_pobj, NULL, 1, uc);
2592         }
2593
2594         /* source should not be ancestor of target dir */
2595         if (!rc &&& src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
2596                 RETURN(-EINVAL);
2597
2598         RETURN(rc);
2599 }
2600
2601 static int mdd_rename_tgt(const struct lu_env *env,
2602                           struct md_object *pobj, struct md_object *tobj,
2603                           const struct lu_fid *lf, const char *name,
2604                           struct md_attr *ma, struct md_ucred *uc)
2605 {
2606         struct mdd_device *mdd = mdo2mdd(pobj);
2607         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
2608         struct mdd_object *mdd_tobj = NULL;
2609         struct thandle *handle;
2610         int rc;
2611         ENTRY;
2612
2613         mdd_txn_param_build(env, &MDD_TXN_RENAME);
2614         handle = mdd_trans_start(env, mdd);
2615         if (IS_ERR(handle))
2616                 RETURN(PTR_ERR(handle));
2617
2618         if (tobj) {
2619                 mdd_tobj = md2mdd_obj(tobj);
2620                 mdd_lock2(env, mdd_tpobj, mdd_tobj);
2621         } else {
2622                 mdd_write_lock(env, mdd_tpobj);
2623         }
2624
2625         /*TODO rename sanity checking*/
2626         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma, uc);
2627         if (rc)
2628                 GOTO(cleanup, rc);
2629
2630         if (tobj) {
2631                 rc = __mdd_index_delete(env, mdd_tpobj, name, handle);
2632                 if (rc)
2633                         GOTO(cleanup, rc);
2634         }
2635
2636         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle);
2637         if (rc)
2638                 GOTO(cleanup, rc);
2639
2640         if (tobj && lu_object_exists(&tobj->mo_lu))
2641                 __mdd_ref_del(env, mdd_tobj, handle);
2642 cleanup:
2643         if (tobj)
2644                 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
2645         else
2646                 mdd_write_unlock(env, mdd_tpobj);
2647         mdd_trans_stop(env, mdd, rc, handle);
2648         RETURN(rc);
2649 }
2650
2651 /*
2652  * No permission check is needed.
2653  */
2654 static int mdd_root_get(const struct lu_env *env,
2655                         struct md_device *m, struct lu_fid *f,
2656                         struct md_ucred *uc)
2657 {
2658         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2659
2660         ENTRY;
2661         *f = mdd->mdd_root_fid;
2662         RETURN(0);
2663 }
2664
2665 /*
2666  * No permission check is needed.
2667  */
2668 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
2669                       struct kstatfs *sfs, struct md_ucred *uc)
2670 {
2671         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2672         int rc;
2673
2674         ENTRY;
2675
2676         rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
2677
2678         RETURN(rc);
2679 }
2680
2681 /*
2682  * No permission check is needed.
2683  */
2684 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
2685                            int *md_size, int *cookie_size, struct md_ucred *uc)
2686 {
2687         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2688         ENTRY;
2689
2690         *md_size = mdd_lov_mdsize(env, mdd);
2691         *cookie_size = mdd_lov_cookiesize(env, mdd);
2692
2693         RETURN(0);
2694 }
2695
2696 static int mdd_init_capa_keys(struct md_device *m,
2697                               struct lustre_capa_key *keys)
2698 {
2699         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2700         struct mds_obd    *mds = &mdd2obd_dev(mdd)->u.mds;
2701         ENTRY;
2702
2703         mds->mds_capa_keys = keys;
2704         RETURN(0);
2705 }
2706
2707 static int mdd_update_capa_key(const struct lu_env *env,
2708                                struct md_device *m,
2709                                struct lustre_capa_key *key)
2710 {
2711         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2712         struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
2713         int rc;
2714         ENTRY;
2715
2716         rc = obd_set_info_async(lov_exp, strlen(KEY_CAPA_KEY), KEY_CAPA_KEY,
2717                                 sizeof(*key), key, NULL);
2718         RETURN(rc);
2719 }
2720
2721 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
2722                          struct thandle *handle)
2723 {
2724         struct dt_object *next;
2725
2726         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2727         next = mdd_object_child(obj);
2728         next->do_ops->do_ref_add(env, next, handle);
2729 }
2730
2731 /*
2732  * XXX: if permission check is needed here?
2733  */
2734 static int mdd_ref_add(const struct lu_env *env,
2735                        struct md_object *obj, struct md_ucred *uc)
2736 {
2737         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2738         struct mdd_device *mdd = mdo2mdd(obj);
2739         struct thandle *handle;
2740         ENTRY;
2741
2742         mdd_txn_param_build(env, &MDD_TXN_XATTR_SET);
2743         handle = mdd_trans_start(env, mdd);
2744         if (IS_ERR(handle))
2745                 RETURN(-ENOMEM);
2746
2747         mdd_write_lock(env, mdd_obj);
2748         __mdd_ref_add(env, mdd_obj, handle);
2749         mdd_write_unlock(env, mdd_obj);
2750
2751         mdd_trans_stop(env, mdd, 0, handle);
2752
2753         RETURN(0);
2754 }
2755
2756 static void
2757 __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
2758               struct thandle *handle)
2759 {
2760         struct dt_object *next = mdd_object_child(obj);
2761
2762         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2763
2764         next->do_ops->do_ref_del(env, next, handle);
2765 }
2766
2767 /* do NOT or the MAY_*'s, you'll get the weakest */
2768 static int accmode(struct mdd_object *mdd_obj, int flags)
2769 {
2770         int res = 0;
2771
2772 #if 0
2773         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2774          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2775          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2776          * owner can write to a file even if it is marked readonly to hide
2777          * its brokenness. (bug 5781) */
2778         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2779                 return 0;
2780 #endif
2781         if (flags & FMODE_READ)
2782                 res = MAY_READ;
2783         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2784                 res |= MAY_WRITE;
2785         if (flags & MDS_FMODE_EXEC)
2786                 res = MAY_EXEC;
2787         return res;
2788 }
2789
2790 static int mdd_open_sanity_check(const struct lu_env *env,
2791                                  struct mdd_object *obj, int flag,
2792                                  struct md_ucred *uc)
2793 {
2794         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2795         int mode = accmode(obj, flag);
2796         int rc;
2797         ENTRY;
2798
2799         /* EEXIST check */
2800         if (mdd_is_dead_obj(obj))
2801                 RETURN(-ENOENT);
2802
2803         rc = __mdd_la_get(env, obj, tmp_la);
2804         if (rc)
2805                RETURN(rc);
2806
2807         if (S_ISLNK(tmp_la->la_mode))
2808                 RETURN(-ELOOP);
2809
2810         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2811                 RETURN(-EISDIR);
2812
2813         if (!(flag & MDS_OPEN_CREATED)) {
2814                 rc = __mdd_permission_internal(env, obj, mode, 0, uc);
2815                 if (rc)
2816                         RETURN(rc);
2817         }
2818
2819         /*
2820          * FIFO's, sockets and device files are special: they don't
2821          * actually live on the filesystem itself, and as such you
2822          * can write to them even if the filesystem is read-only.
2823          */
2824         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2825             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2826                 flag &= ~O_TRUNC;
2827
2828         /*
2829          * An append-only file must be opened in append mode for writing.
2830          */
2831         if (mdd_is_append(obj)) {
2832                 if ((flag & FMODE_WRITE) && !(flag & O_APPEND))
2833                         RETURN(-EPERM);
2834                 if (flag & O_TRUNC)
2835                         RETURN(-EPERM);
2836         }
2837
2838         /* O_NOATIME can only be set by the owner or superuser */
2839         if (flag & O_NOATIME)
2840                 if (uc->mu_fsuid != tmp_la->la_uid && !mdd_capable(uc, CAP_FOWNER))
2841                         RETURN(-EPERM);
2842
2843         RETURN(0);
2844 }
2845
2846 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2847                     int flags, struct md_ucred *uc)
2848 {
2849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2850         int rc = 0;
2851
2852         mdd_write_lock(env, mdd_obj);
2853
2854         rc = mdd_open_sanity_check(env, mdd_obj, flags, uc);
2855         if (rc == 0)
2856                 mdd_obj->mod_count ++;
2857
2858         mdd_write_unlock(env, mdd_obj);
2859         return rc;
2860 }
2861
2862 /*
2863  * No permission check is needed.
2864  */
2865 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2866                      struct md_attr *ma, struct md_ucred *uc)
2867 {
2868         int rc;
2869         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2870         ENTRY;
2871
2872         mdd_write_lock(env, mdd_obj);
2873         /* release open count */
2874         mdd_obj->mod_count --;
2875
2876         rc = __mdd_iattr_get(env, mdd_obj, ma);
2877         if (rc == 0 && mdd_obj->mod_count == 0) {
2878                 if (ma->ma_attr.la_nlink == 0)
2879                         rc = __mdd_object_kill(env, mdd_obj, ma);
2880         }
2881         mdd_write_unlock(env, mdd_obj);
2882         RETURN(rc);
2883 }
2884
2885 static int mdd_readpage_sanity_check(const struct lu_env *env,
2886                                      struct mdd_object *obj,
2887                                      struct md_ucred *uc)
2888 {
2889         struct dt_object *next = mdd_object_child(obj);
2890         int rc;
2891         ENTRY;
2892
2893         if (S_ISDIR(mdd_object_type(obj)) &&
2894             dt_try_as_dir(env, next))
2895                 rc = mdd_permission_internal(env, obj, MAY_READ, uc);
2896         else
2897                 rc = -ENOTDIR;
2898
2899         RETURN(rc);
2900 }
2901
2902 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2903                         const struct lu_rdpg *rdpg, struct md_ucred *uc)
2904 {
2905         struct dt_object *next;
2906         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2907         int rc;
2908         ENTRY;
2909
2910         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2911         next = mdd_object_child(mdd_obj);
2912
2913         mdd_read_lock(env, mdd_obj);
2914         rc = mdd_readpage_sanity_check(env, mdd_obj, uc);
2915         if (rc)
2916                 GOTO(out_unlock, rc);
2917
2918         rc = next->do_ops->do_readpage(env, next, rdpg);
2919
2920 out_unlock:
2921         mdd_read_unlock(env, mdd_obj);
2922         RETURN(rc);
2923 }
2924
2925 #ifdef CONFIG_FS_POSIX_ACL
2926 #include <linux/posix_acl_xattr.h>
2927 #include <linux/posix_acl.h>
2928
2929 static int mdd_posix_acl_permission(struct md_ucred *uc, struct lu_attr *la,
2930                                     int want, posix_acl_xattr_entry *entry,
2931                                     int count)
2932 {
2933         posix_acl_xattr_entry *pa, *pe, *mask_obj;
2934         int found = 0;
2935         ENTRY;
2936
2937         if (count <= 0)
2938                 RETURN(-EACCES);
2939
2940         pa = &entry[0];
2941         pe = &entry[count - 1];
2942         for (; pa <= pe; pa++) {
2943                 switch(pa->e_tag) {
2944                         case ACL_USER_OBJ:
2945                                 /* (May have been checked already) */
2946                                 if (la->la_uid == uc->mu_fsuid)
2947                                         goto check_perm;
2948                                 break;
2949                         case ACL_USER:
2950                                 if (pa->e_id == uc->mu_fsuid)
2951                                         goto mask;
2952                                 break;
2953                         case ACL_GROUP_OBJ:
2954                                 if (mdd_in_group_p(uc, la->la_gid)) {
2955                                         found = 1;
2956                                         if ((pa->e_perm & want) == want)
2957                                                 goto mask;
2958                                 }
2959                                 break;
2960                         case ACL_GROUP:
2961                                 if (mdd_in_group_p(uc, pa->e_id)) {
2962                                         found = 1;
2963                                         if ((pa->e_perm & want) == want)
2964                                                 goto mask;
2965                                 }
2966                                 break;
2967                         case ACL_MASK:
2968                                 break;
2969                         case ACL_OTHER:
2970                                 if (found)
2971                                         RETURN(-EACCES);
2972                                 else
2973                                         goto check_perm;
2974                         default:
2975                                 RETURN(-EIO);
2976                 }
2977         }
2978         RETURN(-EIO);
2979
2980 mask:
2981         for (mask_obj = pa + 1; mask_obj <= pe; mask_obj++) {
2982                 if (mask_obj->e_tag == ACL_MASK) {
2983                         if ((pa->e_perm & mask_obj->e_perm & want) == want)
2984                                 RETURN(0);
2985
2986                         RETURN(-EACCES);
2987                 }
2988         }
2989
2990 check_perm:
2991         if ((pa->e_perm & want) == want)
2992                 RETURN(0);
2993
2994         RETURN(-EACCES);
2995 }
2996 #endif
2997
2998 static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
2999                          struct lu_attr* la, int mask, struct md_ucred *uc)
3000 {
3001 #ifdef CONFIG_FS_POSIX_ACL
3002         struct dt_object  *next;
3003         void *buf;
3004         int buf_len;
3005         posix_acl_xattr_entry *entry;
3006         int entry_count;
3007         int rc;
3008         ENTRY;
3009
3010         next = mdd_object_child(obj);
3011         buf_len = next->do_ops->do_xattr_get(env, next, NULL, 0, "");
3012         if (buf_len <= 0)
3013                 RETURN(buf_len ? : -EACCES);
3014
3015         OBD_ALLOC(buf, buf_len);
3016         if (buf == NULL)
3017                 RETURN(-ENOMEM);
3018
3019         rc = next->do_ops->do_xattr_get(env, next, buf, buf_len, "");
3020         if (rc <= 0)
3021                 GOTO(out, rc = rc ? : -EACCES);
3022
3023         entry = ((posix_acl_xattr_header *)buf)->a_entries;
3024         entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
3025
3026         rc = mdd_posix_acl_permission(uc, la, mask, entry, entry_count);
3027
3028 out:
3029         OBD_FREE(buf, buf_len);
3030         RETURN(rc);
3031 #else
3032         ENTRY;
3033         RETURN(-EAGAIN);
3034 #endif
3035 }
3036
3037 static int mdd_exec_permission_lite(const struct lu_env *env,
3038                                     struct mdd_object *obj,
3039                                     struct md_ucred *uc)
3040 {
3041         struct lu_attr *la = &mdd_env_info(env)->mti_la;
3042         umode_t mode;
3043         int rc;
3044         ENTRY;
3045
3046         /* These means unnecessary for permission check */
3047         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3048                 RETURN(0);
3049
3050         /* Invalid user credit */
3051         if (uc->mu_valid == UCRED_INVALID)
3052                 RETURN(-EACCES);
3053
3054         rc = __mdd_la_get(env, obj, la);
3055         if (rc)
3056                 RETURN(rc);
3057
3058         mode = la->la_mode;
3059         if (uc->mu_fsuid == la->la_uid)
3060                 mode >>= 6;
3061         else if (mdd_in_group_p(uc, la->la_gid))
3062                 mode >>= 3;
3063
3064         if (mode & MAY_EXEC)
3065                 RETURN(0);
3066
3067         if (((la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) &&
3068             mdd_capable(uc, CAP_DAC_OVERRIDE))
3069                 RETURN(0);
3070
3071         if (S_ISDIR(la->la_mode) && mdd_capable(uc, CAP_DAC_READ_SEARCH))
3072                 RETURN(0);
3073
3074         RETURN(-EACCES);
3075 }
3076
3077 static int __mdd_permission_internal(const struct lu_env *env,
3078                                      struct mdd_object *obj,
3079                                      int mask, int getattr,
3080                                      struct md_ucred *uc)
3081 {
3082         struct lu_attr *la = &mdd_env_info(env)->mti_la;
3083         __u32 mode;
3084         int rc;
3085
3086         ENTRY;
3087
3088         if (mask == 0)
3089                 RETURN(0);
3090
3091         /* These means unnecessary for permission check */
3092         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3093                 RETURN(0);
3094
3095         /* Invalid user credit */
3096         if (uc->mu_valid == UCRED_INVALID)
3097                 RETURN(-EACCES);
3098
3099         /*
3100          * Nobody gets write access to an immutable file.
3101          */
3102         if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
3103                 RETURN(-EACCES);
3104
3105         if (getattr) {
3106                 rc = __mdd_la_get(env, obj, la);
3107                 if (rc)
3108                         RETURN(rc);
3109         }
3110
3111         mode = la->la_mode;
3112         if (uc->mu_fsuid == la->la_uid) {
3113                 mode >>= 6;
3114         } else {
3115                 if (mode & S_IRWXG) {
3116                         if (((mode >> 3) & mask & S_IRWXO) != mask)
3117                                 goto check_groups;
3118
3119                         rc = mdd_check_acl(env, obj, la, mask, uc);
3120                         if (rc == -EACCES)
3121                                 goto check_capabilities;
3122                         else if ((rc != -EAGAIN) && (rc != -EOPNOTSUPP))
3123                                 RETURN(rc);
3124                 }
3125
3126 check_groups:
3127                 if (mdd_in_group_p(uc, la->la_gid))
3128                         mode >>= 3;
3129         }
3130
3131         /*
3132          * If the DACs are ok we don't need any capability check.
3133          */
3134         if (((mode & mask & S_IRWXO) == mask))
3135                 RETURN(0);
3136
3137 check_capabilities:
3138
3139         /*
3140          * Read/write DACs are always overridable.
3141          * Executable DACs are overridable if at least one exec bit is set.
3142          * Dir's DACs are always overridable.
3143          */
3144         if (!(mask & MAY_EXEC) ||
3145             (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
3146                 if (mdd_capable(uc, CAP_DAC_OVERRIDE))
3147                         RETURN(0);
3148
3149         /*
3150          * Searching includes executable on directories, else just read.
3151          */
3152         if ((mask == MAY_READ) ||
3153             (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
3154                 if (mdd_capable(uc, CAP_DAC_READ_SEARCH))
3155                         RETURN(0);
3156
3157         RETURN(-EACCES);
3158 }
3159
3160 static inline int mdd_permission_internal_locked(const struct lu_env *env,
3161                                                  struct mdd_object *obj,
3162                                                  int mask, struct md_ucred *uc)
3163 {
3164         int rc;
3165
3166         mdd_read_lock(env, obj);
3167         rc = mdd_permission_internal(env, obj, mask, uc);
3168         mdd_read_unlock(env, obj);
3169
3170         return rc;
3171 }
3172
3173 static int mdd_permission(const struct lu_env *env, struct md_object *obj,
3174                           int mask, struct md_ucred *uc)
3175 {
3176         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3177         int rc;
3178         ENTRY;
3179
3180         rc = mdd_permission_internal_locked(env, mdd_obj, mask, uc);
3181
3182         RETURN(rc);
3183 }
3184
3185 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
3186                         struct lustre_capa *capa)
3187 {
3188         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3189         struct mdd_device *mdd = mdo2mdd(obj);
3190         struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site;
3191         struct lustre_capa_key *key = &ls->ls_capa_keys[1];
3192         struct obd_capa *ocapa;
3193         int rc;
3194         ENTRY;
3195
3196         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3197
3198         capa->lc_fid = *mdo2fid(mdd_obj);
3199         if (ls->ls_capa_timeout < CAPA_TIMEOUT)
3200                 capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
3201         if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid))
3202                 capa->lc_flags |= CAPA_FL_ROOT;
3203         capa->lc_flags = ls->ls_capa_alg << 24;
3204
3205         /* TODO: get right permission here after remote uid landing */
3206         ocapa = capa_lookup(capa);
3207         if (ocapa) {
3208                 LASSERT(!capa_is_expired(ocapa));
3209                 capa_cpy(capa, ocapa);
3210                 capa_put(ocapa);
3211                 RETURN(0);
3212         }
3213
3214         capa->lc_keyid = key->lk_keyid;
3215         capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout;
3216         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
3217         if (rc)
3218                 RETURN(rc);
3219
3220         capa_add(capa);
3221         RETURN(0);
3222 }
3223
3224 struct md_device_operations mdd_ops = {
3225         .mdo_statfs         = mdd_statfs,
3226         .mdo_root_get       = mdd_root_get,
3227         .mdo_maxsize_get    = mdd_maxsize_get,
3228         .mdo_init_capa_keys = mdd_init_capa_keys,
3229         .mdo_update_capa_key= mdd_update_capa_key,
3230 };
3231
3232 static struct md_dir_operations mdd_dir_ops = {
3233         .mdo_is_subdir     = mdd_is_subdir,
3234         .mdo_lookup        = mdd_lookup,
3235         .mdo_create        = mdd_create,
3236         .mdo_rename        = mdd_rename,
3237         .mdo_link          = mdd_link,
3238         .mdo_unlink        = mdd_unlink,
3239         .mdo_name_insert   = mdd_name_insert,
3240         .mdo_name_remove   = mdd_name_remove,
3241         .mdo_rename_tgt    = mdd_rename_tgt,
3242         .mdo_create_data   = mdd_create_data
3243 };
3244
3245 static struct md_object_operations mdd_obj_ops = {
3246         .moo_permission    = mdd_permission,
3247         .moo_attr_get      = mdd_attr_get,
3248         .moo_attr_set      = mdd_attr_set,
3249         .moo_xattr_get     = mdd_xattr_get,
3250         .moo_xattr_set     = mdd_xattr_set,
3251         .moo_xattr_list    = mdd_xattr_list,
3252         .moo_xattr_del     = mdd_xattr_del,
3253         .moo_object_create = mdd_object_create,
3254         .moo_ref_add       = mdd_ref_add,
3255         .moo_ref_del       = mdd_ref_del,
3256         .moo_open          = mdd_open,
3257         .moo_close         = mdd_close,
3258         .moo_readpage      = mdd_readpage,
3259         .moo_readlink      = mdd_readlink,
3260         .moo_capa_get      = mdd_capa_get
3261 };
3262
3263 static struct obd_ops mdd_obd_device_ops = {
3264         .o_owner = THIS_MODULE
3265 };
3266
3267 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
3268                                           struct lu_device_type *t,
3269                                           struct lustre_cfg *lcfg)
3270 {
3271         struct lu_device  *l;
3272         struct mdd_device *m;
3273
3274         OBD_ALLOC_PTR(m);
3275         if (m == NULL) {
3276                 l = ERR_PTR(-ENOMEM);
3277         } else {
3278                 md_device_init(&m->mdd_md_dev, t);
3279                 l = mdd2lu_dev(m);
3280                 l->ld_ops = &mdd_lu_ops;
3281                 m->mdd_md_dev.md_ops = &mdd_ops;
3282         }
3283
3284         return l;
3285 }
3286
3287 static void mdd_device_free(const struct lu_env *env,
3288                             struct lu_device *lu)
3289 {
3290         struct mdd_device *m = lu2mdd_dev(lu);
3291
3292         LASSERT(atomic_read(&lu->ld_ref) == 0);
3293         md_device_fini(&m->mdd_md_dev);
3294         OBD_FREE_PTR(m);
3295 }
3296
3297 static int mdd_type_init(struct lu_device_type *t)
3298 {
3299         return lu_context_key_register(&mdd_thread_key);
3300 }
3301
3302 static void mdd_type_fini(struct lu_device_type *t)
3303 {
3304         lu_context_key_degister(&mdd_thread_key);
3305 }
3306
3307 static struct lu_device_type_operations mdd_device_type_ops = {
3308         .ldto_init = mdd_type_init,
3309         .ldto_fini = mdd_type_fini,
3310
3311         .ldto_device_alloc = mdd_device_alloc,
3312         .ldto_device_free  = mdd_device_free,
3313
3314         .ldto_device_init    = mdd_device_init,
3315         .ldto_device_fini    = mdd_device_fini
3316 };
3317
3318 static struct lu_device_type mdd_device_type = {
3319         .ldt_tags     = LU_DEVICE_MD,
3320         .ldt_name     = LUSTRE_MDD_NAME,
3321         .ldt_ops      = &mdd_device_type_ops,
3322         .ldt_ctx_tags = LCT_MD_THREAD
3323 };
3324
3325 static void *mdd_key_init(const struct lu_context *ctx,
3326                           struct lu_context_key *key)
3327 {
3328         struct mdd_thread_info *info;
3329
3330         OBD_ALLOC_PTR(info);
3331         if (info == NULL)
3332                 info = ERR_PTR(-ENOMEM);
3333         return info;
3334 }
3335
3336 static void mdd_key_fini(const struct lu_context *ctx,
3337                          struct lu_context_key *key, void *data)
3338 {
3339         struct mdd_thread_info *info = data;
3340         OBD_FREE_PTR(info);
3341 }
3342
3343 static struct lu_context_key mdd_thread_key = {
3344         .lct_tags = LCT_MD_THREAD,
3345         .lct_init = mdd_key_init,
3346         .lct_fini = mdd_key_fini
3347 };
3348
3349 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
3350         { 0 }
3351 };
3352
3353 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
3354         { 0 }
3355 };
3356
3357 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
3358
3359 static int __init mdd_mod_init(void)
3360 {
3361         struct lprocfs_static_vars lvars;
3362         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
3363         lprocfs_init_vars(mdd, &lvars);
3364         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
3365                                    LUSTRE_MDD_NAME, &mdd_device_type);
3366 }
3367
3368 static void __exit mdd_mod_exit(void)
3369 {
3370         class_unregister_type(LUSTRE_MDD_NAME);
3371 }
3372
3373 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3374 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
3375 MODULE_LICENSE("GPL");
3376
3377 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);