Whamcloud - gitweb
- added comment about access capa fields in mdt_body_unpack().
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_env *env,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_env *env,
51                            struct mdd_device *mdd, int rc,
52                            struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
55                           struct thandle *handle);
56 static void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
57                           struct thandle *handle);
58 static int __mdd_lookup(const struct lu_env *env,
59                         struct md_object *pobj,
60                         const char *name, const struct lu_fid* fid,
61                         int mask);
62 static int __mdd_lookup_locked(const struct lu_env *env,
63                                struct md_object *pobj,
64                                const char *name, const struct lu_fid* fid,
65                                int mask);
66 static int mdd_exec_permission_lite(const struct lu_env *env,
67                                     struct mdd_object *obj);
68 static int __mdd_permission_internal(const struct lu_env *env,
69                                      struct mdd_object *obj,
70                                      int mask, int getattr);
71
72 static struct md_object_operations mdd_obj_ops;
73 static struct md_dir_operations    mdd_dir_ops;
74 static struct lu_object_operations mdd_lu_obj_ops;
75
76 static struct lu_context_key       mdd_thread_key;
77
78 static const char *mdd_root_dir_name = "root";
79 static const char dot[] = ".";
80 static const char dotdot[] = "..";
81
82 enum mdd_txn_op {
83         MDD_TXN_OBJECT_DESTROY_OP,
84         MDD_TXN_OBJECT_CREATE_OP,
85         MDD_TXN_ATTR_SET_OP,
86         MDD_TXN_XATTR_SET_OP,
87         MDD_TXN_INDEX_INSERT_OP,
88         MDD_TXN_INDEX_DELETE_OP,
89         MDD_TXN_LINK_OP,
90         MDD_TXN_UNLINK_OP,
91         MDD_TXN_RENAME_OP,
92         MDD_TXN_RENAME_TGT_OP,
93         MDD_TXN_CREATE_DATA_OP,
94         MDD_TXN_MKDIR_OP
95 };
96
97 struct mdd_txn_op_descr {
98         enum mdd_txn_op mod_op;
99         unsigned int    mod_credits;
100 };
101
102 enum {
103         MDD_TXN_OBJECT_DESTROY_CREDITS = 0,
104         MDD_TXN_OBJECT_CREATE_CREDITS = 0,
105         MDD_TXN_ATTR_SET_CREDITS = 0,
106         MDD_TXN_XATTR_SET_CREDITS = 0,
107         MDD_TXN_INDEX_INSERT_CREDITS = 0,
108         MDD_TXN_INDEX_DELETE_CREDITS = 0,
109         MDD_TXN_LINK_CREDITS = 0,
110         MDD_TXN_UNLINK_CREDITS = 0,
111         MDD_TXN_RENAME_CREDITS = 0,
112         MDD_TXN_RENAME_TGT_CREDITS = 0,
113         MDD_TXN_CREATE_DATA_CREDITS = 0,
114         MDD_TXN_MKDIR_CREDITS = 0
115 };
116 #define DEFINE_MDD_TXN_OP_ARRAY(opname, base)       \
117 [opname ## _OP - base ## _OP]= { \
118         .mod_op      = opname ## _OP,           \
119         .mod_credits = opname ## _CREDITS,      \
120 }
121
122 /*
123  * number of blocks to reserve for particular operations. Should be function
124  * of ... something. Stub for now.
125  */
126
127 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
128         DEFINE_MDD_TXN_OP_ARRAY(opname, MDD_TXN_OBJECT_DESTROY)
129
130 static struct mdd_txn_op_descr mdd_txn_descrs[] = {
131         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY),
132         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE),
133         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET),
134         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET),
135         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT),
136         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE),
137         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK),
138         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK),
139         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME),
140         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME_TGT),
141         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA),
142         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR)
143 };
144 struct rw_semaphore    mdd_txn_sem;
145
146 static void mdd_txn_param_build(const struct lu_env *env, int op)
147 {
148         int num_entries, i;
149         /* init credits for each ops */
150         num_entries = sizeof (mdd_txn_descrs) / sizeof(struct mdd_txn_op_descr);
151
152         LASSERT(num_entries > 0);
153
154         down_read(&mdd_txn_sem);
155         for (i =0; i < num_entries; i++) {
156                 if (mdd_txn_descrs[i].mod_op == op) {
157                         LASSERT(mdd_txn_descrs[i].mod_credits > 0);
158                         mdd_env_info(env)->mti_param.tp_credits = 
159                                         mdd_txn_descrs[i].mod_credits;
160                         up_read(&mdd_txn_sem);
161                         return;
162                 }
163         }
164         up_read(&mdd_txn_sem);
165         CERROR("Wrong operation %d \n", op);
166         LBUG();
167 }
168
169 static int mdd_credit_get(const struct lu_env *env, struct mdd_device *mdd,
170                           int op)
171 {
172         int credits;
173         credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child,
174                                                     op);
175         LASSERT(credits > 0);
176         return credits;
177 }
178
179 /* FIXME: we should calculate it by lsm count, 
180  * not ost count */
181 int mdd_init_txn_credits(const struct lu_env *env, struct mdd_device *mdd)
182 {
183         struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
184         int ost_count = mds->mds_lov_desc.ld_tgt_count;
185         int iam_credits, xattr_credits, log_credits, create_credits;
186         int num_entries, i, attr_credits;
187
188         /* init credits for each ops */
189         num_entries = sizeof (mdd_txn_descrs) / sizeof(struct mdd_txn_op_descr);
190         LASSERT(num_entries > 0);
191
192         /* init the basic credits from osd layer */
193         iam_credits = mdd_credit_get(env, mdd, INSERT_IAM);
194         log_credits = mdd_credit_get(env, mdd, LOG_REC);
195         attr_credits = mdd_credit_get(env, mdd, ATTR_SET);
196         xattr_credits = mdd_credit_get(env, mdd, XATTR_SET);
197         create_credits = mdd_credit_get(env, mdd, CREATE_OBJECT);
198         /* calculate the mdd credits */
199         down_write(&mdd_txn_sem);
200         for (i =0; i < num_entries; i++) {
201                 int opcode = mdd_txn_descrs[i].mod_op;
202                 switch(opcode) {
203                         case MDD_TXN_OBJECT_DESTROY_OP:
204                                 mdd_txn_descrs[i].mod_credits = 20;
205                                 break;
206                         case MDD_TXN_OBJECT_CREATE_OP:
207                                 /* OI_INSERT + CREATE OBJECT */
208                                 mdd_txn_descrs[i].mod_credits = 
209                                     iam_credits + create_credits; 
210                                 break;
211                         case MDD_TXN_ATTR_SET_OP:
212                                 /* ATTR set + XATTR(lsm, lmv) set */
213                                 mdd_txn_descrs[i].mod_credits = 
214                                     attr_credits + xattr_credits;
215                                 break;
216                         case MDD_TXN_XATTR_SET_OP:
217                                 mdd_txn_descrs[i].mod_credits = xattr_credits;
218                                 break;
219                         case MDD_TXN_INDEX_INSERT_OP:
220                                 mdd_txn_descrs[i].mod_credits = iam_credits;
221                                 break;
222                         case MDD_TXN_INDEX_DELETE_OP:
223                                 mdd_txn_descrs[i].mod_credits = iam_credits;
224                                 break;
225                         case MDD_TXN_LINK_OP:
226                                 mdd_txn_descrs[i].mod_credits = iam_credits;
227                                 break;
228                         case MDD_TXN_UNLINK_OP:
229                                 /* delete IAM + Unlink log */
230                                 mdd_txn_descrs[i].mod_credits = 
231                                         iam_credits + log_credits * ost_count;
232                                 break;
233                         case MDD_TXN_RENAME_OP:
234                                 /* 2 delete IAM + 1 insert + Unlink log */
235                                 mdd_txn_descrs[i].mod_credits = 
236                                     3 * iam_credits + log_credits * ost_count;
237                                 break;
238                         case MDD_TXN_RENAME_TGT_OP:
239                                 /* iam insert + iam delete */
240                                 mdd_txn_descrs[i].mod_credits = 2 * iam_credits;
241                                 break;
242                         case MDD_TXN_CREATE_DATA_OP:
243                                 /* same as set xattr(lsm) */
244                                 mdd_txn_descrs[i].mod_credits = xattr_credits;
245                                 break;
246                         case MDD_TXN_MKDIR_OP:
247                                 /* IAM_INSERT + OI_INSERT + CREATE_OBJECT_CREDITS
248                                  * SET_MD CREDITS is already counted in 
249                                  * CREATE_OBJECT CREDITS 
250                                  */
251                                 mdd_txn_descrs[i].mod_credits = 
252                                         2 * iam_credits + create_credits;
253                                 break;
254                         default:
255                                 CERROR("invalid op %d init its credit\n", opcode);
256                                 up_write(&mdd_txn_sem);
257                                 LBUG();
258                 }
259         }
260         up_write(&mdd_txn_sem);
261         RETURN(0);        
262 }
263
264 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
265 {
266         struct lu_buf *buf;
267
268         buf = &mdd_env_info(env)->mti_buf;
269         buf->lb_buf = area;
270         buf->lb_len = len;
271         return buf;
272 }
273
274 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
275                                        const void *area, ssize_t len)
276 {
277         struct lu_buf *buf;
278
279         buf = &mdd_env_info(env)->mti_buf;
280         buf->lb_buf = (void *)area;
281         buf->lb_len = len;
282         return buf;
283 }
284
285 #define mdd_get_group_info(group_info) do {             \
286         atomic_inc(&(group_info)->usage);               \
287 } while (0)
288
289 #define mdd_put_group_info(group_info) do {             \
290         if (atomic_dec_and_test(&(group_info)->usage))  \
291                 groups_free(group_info);                \
292 } while (0)
293
294 #define MDD_NGROUPS_PER_BLOCK       ((int)(CFS_PAGE_SIZE / sizeof(gid_t)))
295
296 #define MDD_GROUP_AT(gi, i) \
297     ((gi)->blocks[(i) / MDD_NGROUPS_PER_BLOCK][(i) % MDD_NGROUPS_PER_BLOCK])
298
299 /* groups_search() is copied from linux kernel! */
300 /* a simple bsearch */
301 static int mdd_groups_search(struct group_info *group_info, gid_t grp)
302 {
303         int left, right;
304
305         if (!group_info)
306                 return 0;
307
308         left = 0;
309         right = group_info->ngroups;
310         while (left < right) {
311                 int mid = (left + right) / 2;
312                 int cmp = grp - MDD_GROUP_AT(group_info, mid);
313
314                 if (cmp > 0)
315                         left = mid + 1;
316                 else if (cmp < 0)
317                         right = mid;
318                 else
319                         return 1;
320         }
321         return 0;
322 }
323
324 static int mdd_in_group_p(struct md_ucred *uc, gid_t grp)
325 {
326         int rc = 1;
327
328         if (grp != uc->mu_fsgid) {
329                 struct group_info *group_info = NULL;
330
331                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
332                         if ((grp == uc->mu_suppgids[0]) ||
333                             (grp == uc->mu_suppgids[1]))
334                                 return 1;
335
336                 if (uc->mu_ginfo)
337                         group_info = uc->mu_ginfo;
338                 else if (uc->mu_identity)
339                         group_info = uc->mu_identity->mi_ginfo;
340
341                 if (!group_info)
342                         return 0;
343
344                 mdd_get_group_info(group_info);
345                 rc = mdd_groups_search(group_info, grp);
346                 mdd_put_group_info(group_info);
347         }
348         return rc;
349 }
350
351 static inline int mdd_permission_internal(const struct lu_env *env,
352                                           struct mdd_object *obj, int mask)
353 {
354         return __mdd_permission_internal(env, obj, mask, 1);
355 }
356
357 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
358 {
359         struct mdd_thread_info *info;
360
361         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
362         LASSERT(info != NULL);
363         return info;
364 }
365
366 static struct lu_object *mdd_object_alloc(const struct lu_env *env,
367                                           const struct lu_object_header *hdr,
368                                           struct lu_device *d)
369 {
370         struct mdd_object *mdd_obj;
371
372         OBD_ALLOC_PTR(mdd_obj);
373         if (mdd_obj != NULL) {
374                 struct lu_object *o;
375                 
376                 o = mdd2lu_obj(mdd_obj);
377                 lu_object_init(o, NULL, d);
378                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
379                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
380                 mdd_obj->mod_count = 0;
381                 o->lo_ops = &mdd_lu_obj_ops;
382                 return o;
383         } else {
384                 return NULL;
385         }
386 }
387
388 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
389 {
390         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
391         struct lu_object  *below;
392         struct lu_device  *under;
393         ENTRY;
394
395         under = &d->mdd_child->dd_lu_dev;
396         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
397
398         if (below == NULL)
399                 RETURN(-ENOMEM);
400
401         lu_object_add(o, below);
402         RETURN(0);
403 }
404
405 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj);
406
407 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
408 {
409         if (lu_object_exists(o))
410                 return mdd_get_flags(env, lu2mdd_obj(o));
411         else
412                 return 0;
413 }
414
415 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
416 {
417         struct mdd_object *mdd = lu2mdd_obj(o);
418         
419         lu_object_fini(o);
420         OBD_FREE_PTR(mdd);
421 }
422
423 static int mdd_object_print(const struct lu_env *env, void *cookie,
424                             lu_printer_t p, const struct lu_object *o)
425 {
426         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
427 }
428
429 /* orphan handling is here */
430 static void mdd_object_delete(const struct lu_env *env,
431                                struct lu_object *o)
432 {
433         struct mdd_object *mdd_obj = lu2mdd_obj(o);
434         struct thandle *handle = NULL;
435         ENTRY;
436
437         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
438                 return;
439
440         if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
441                 mdd_txn_param_build(env, MDD_TXN_MKDIR_OP);
442                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
443                 if (IS_ERR(handle))
444                         CERROR("Cannot get thandle\n");
445                 else {
446                         mdd_write_lock(env, mdd_obj);
447                         /* let's remove obj from the orphan list */
448                         __mdd_orphan_del(env, mdd_obj, handle);
449                         mdd_write_unlock(env, mdd_obj);
450                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
451                                        0, handle);
452                 }
453         }
454 }
455
456 static struct lu_object_operations mdd_lu_obj_ops = {
457         .loo_object_init    = mdd_object_init,
458         .loo_object_start   = mdd_object_start,
459         .loo_object_free    = mdd_object_free,
460         .loo_object_print   = mdd_object_print,
461         .loo_object_delete  = mdd_object_delete
462 };
463
464 struct mdd_object *mdd_object_find(const struct lu_env *env,
465                                    struct mdd_device *d,
466                                    const struct lu_fid *f)
467 {
468         struct lu_object *o, *lo;
469         struct mdd_object *m;
470         ENTRY;
471
472         o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA);
473         if (IS_ERR(o))
474                 m = (struct mdd_object *)o;
475         else {
476                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
477                 /* remote object can't be located and should be put then */
478                 if (lo == NULL)
479                         lu_object_put(env, o);
480                 m = lu2mdd_obj(lo);
481         }
482         RETURN(m);
483 }
484
485 static inline int mdd_is_immutable(struct mdd_object *obj)
486 {
487         return obj->mod_flags & IMMUTE_OBJ;
488 }
489
490 static inline int mdd_is_append(struct mdd_object *obj)
491 {
492         return obj->mod_flags & APPEND_OBJ;
493 }
494
495 static inline void mdd_set_dead_obj(struct mdd_object *obj)
496 {
497         if (obj)
498                 obj->mod_flags |= DEAD_OBJ;
499 }
500
501 static inline int mdd_is_dead_obj(struct mdd_object *obj)
502 {
503         return obj && obj->mod_flags & DEAD_OBJ;
504 }
505
506 /*Check whether it may create the cobj under the pobj*/
507 static int mdd_may_create(const struct lu_env *env,
508                           struct mdd_object *pobj, struct mdd_object *cobj,
509                           int need_check)
510 {
511         int rc = 0;
512         ENTRY;
513
514         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
515                 RETURN(-EEXIST);
516
517         if (mdd_is_dead_obj(pobj))
518                 RETURN(-ENOENT);
519
520         /*check pobj may create or not*/
521         if (need_check)
522                 rc = mdd_permission_internal(env, pobj,
523                                              MAY_WRITE | MAY_EXEC);
524
525         RETURN(rc);
526 }
527
528 static inline int __mdd_la_get(const struct lu_env *env,
529                                struct mdd_object *obj, struct lu_attr *la)
530 {
531         struct dt_object  *next = mdd_object_child(obj);
532         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
533         return next->do_ops->do_attr_get(env, next, la);
534 }
535
536 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
537 {
538         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
539
540         if (flags & LUSTRE_APPEND_FL)
541                 obj->mod_flags |= APPEND_OBJ;
542
543         if (flags & LUSTRE_IMMUTABLE_FL)
544                 obj->mod_flags |= IMMUTE_OBJ;
545 }
546
547 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
548 {
549         struct lu_attr *la = &mdd_env_info(env)->mti_la;
550         int rc;
551
552         ENTRY;
553         mdd_read_lock(env, obj);
554         rc = __mdd_la_get(env, obj, la);
555         mdd_read_unlock(env, obj);
556         if (rc == 0)
557                 mdd_flags_xlate(obj, la->la_flags);
558         RETURN(rc);
559 }
560
561 #define mdd_cap_t(x) (x)
562
563 #define MDD_CAP_TO_MASK(x) (1 << (x))
564
565 #define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
566
567 /* capable() is copied from linux kernel! */
568 static inline int mdd_capable(struct md_ucred *uc, int cap)
569 {
570         if (mdd_cap_raised(uc->mu_cap, cap))
571                 return 1;
572         return 0;
573 }
574
575 /*
576  * It's inline, so penalty for filesystems that don't use sticky bit is
577  * minimal.
578  */
579 static inline int mdd_is_sticky(const struct lu_env *env,
580                                 struct mdd_object *pobj,
581                                 struct mdd_object *cobj)
582 {
583         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
584         struct md_ucred *uc = md_ucred(env);
585         int rc;
586
587         rc = __mdd_la_get(env, cobj, tmp_la);
588         if (rc) {
589                 return rc;
590         } else if (tmp_la->la_uid == uc->mu_fsuid) {
591                 return 0;
592         } else {
593                 rc = __mdd_la_get(env, pobj, tmp_la);
594                 if (rc)
595                         return rc;
596                 else if (!(tmp_la->la_mode & S_ISVTX))
597                         return 0;
598                 else if (tmp_la->la_uid == uc->mu_fsuid)
599                         return 0;
600                 else
601                         return !mdd_capable(uc, CAP_FOWNER);
602         }
603 }
604
605 /* Check whether it may delete the cobj under the pobj. */
606 static int mdd_may_delete(const struct lu_env *env,
607                           struct mdd_object *pobj,
608                           struct mdd_object *cobj,
609                           int is_dir, int need_check)
610 {
611         struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
612         int rc = 0;
613         ENTRY;
614
615         LASSERT(cobj);
616
617         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
618                 RETURN(-ENOENT);
619
620         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
621                 RETURN(-EPERM);
622
623         if (is_dir) {
624                 if (!S_ISDIR(mdd_object_type(cobj)))
625                         RETURN(-ENOTDIR);
626
627                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
628                         RETURN(-EBUSY);
629
630         } else if (S_ISDIR(mdd_object_type(cobj))) {
631                         RETURN(-EISDIR);
632         }
633
634         if (pobj) {
635                 if (mdd_is_dead_obj(pobj))
636                         RETURN(-ENOENT);
637
638                 if (mdd_is_sticky(env, pobj, cobj))
639                         RETURN(-EPERM);
640
641                 if (need_check)
642                         rc = mdd_permission_internal(env, pobj,
643                                                      MAY_WRITE | MAY_EXEC);
644         }
645         RETURN(rc);
646 }
647
648 /* get only inode attributes */
649 static int __mdd_iattr_get(const struct lu_env *env,
650                            struct mdd_object *mdd_obj, struct md_attr *ma)
651 {
652         int rc = 0;
653         ENTRY;
654
655         rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr);
656         if (rc == 0)
657                 ma->ma_valid = MA_INODE;
658         RETURN(rc);
659 }
660
661 /* get lov EA only */
662 static int __mdd_lmm_get(const struct lu_env *env,
663                          struct mdd_object *mdd_obj, struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
669         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
670                         MDS_LOV_MD_NAME);
671         if (rc > 0) {
672                 ma->ma_valid |= MA_LOV;
673                 rc = 0;
674         }
675         RETURN(rc);
676 }
677
678 /* get lmv EA only*/
679 static int __mdd_lmv_get(const struct lu_env *env,
680                          struct mdd_object *mdd_obj, struct md_attr *ma)
681 {
682         int rc;
683
684         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
685                         MDS_LMV_MD_NAME);
686         if (rc > 0) {
687                 ma->ma_valid |= MA_LMV;
688                 rc = 0;
689         }
690         RETURN(rc);
691 }
692
693 static int mdd_attr_get_internal(const struct lu_env *env,
694                                  struct mdd_object *mdd_obj,
695                                  struct md_attr *ma)
696 {
697         int rc = 0;
698         ENTRY;
699
700         if (ma->ma_need & MA_INODE)
701                 rc = __mdd_iattr_get(env, mdd_obj, ma);
702
703         if (rc == 0 && ma->ma_need & MA_LOV) {
704                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
705                     S_ISDIR(mdd_object_type(mdd_obj)))
706                         rc = __mdd_lmm_get(env, mdd_obj, ma);
707         }
708         if (rc == 0 && ma->ma_need & MA_LMV) {
709                 if (S_ISDIR(mdd_object_type(mdd_obj)))
710                         rc = __mdd_lmv_get(env, mdd_obj, ma);
711         }
712         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
713                         rc, ma->ma_valid);
714         RETURN(rc);
715 }
716
717 static inline int mdd_attr_get_internal_locked(const struct lu_env *env,
718                                                struct mdd_object *mdd_obj,
719                                                struct md_attr *ma)
720 {
721         int rc;
722         mdd_read_lock(env, mdd_obj);
723         rc = mdd_attr_get_internal(env, mdd_obj, ma);
724         mdd_read_unlock(env, mdd_obj);
725         return rc;
726 }
727
728 /*
729  * No permission check is needed.
730  */
731 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
732                         struct md_attr *ma)
733 {
734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
735         int                rc;
736
737         ENTRY;
738         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
739         RETURN(rc);
740 }
741
742 /*
743  * No permission check is needed.
744  */
745 static int mdd_xattr_get(const struct lu_env *env,
746                          struct md_object *obj, struct lu_buf *buf,
747                          const char *name)
748 {
749         struct mdd_object *mdd_obj = md2mdd_obj(obj);
750         struct dt_object  *next;
751         int rc;
752
753         ENTRY;
754
755         LASSERT(lu_object_exists(&obj->mo_lu));
756
757         next = mdd_object_child(mdd_obj);
758         mdd_read_lock(env, mdd_obj);
759         rc = next->do_ops->do_xattr_get(env, next, buf, name);
760         mdd_read_unlock(env, mdd_obj);
761
762         RETURN(rc);
763 }
764
765 /*
766  * Permission check is done when open,
767  * no need check again.
768  */
769 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
770                         struct lu_buf *buf)
771 {
772         struct mdd_object *mdd_obj = md2mdd_obj(obj);
773         struct dt_object  *next;
774         loff_t             pos = 0;
775         int                rc;
776         ENTRY;
777
778         LASSERT(lu_object_exists(&obj->mo_lu));
779
780         next = mdd_object_child(mdd_obj);
781         mdd_read_lock(env, mdd_obj);
782         rc = next->do_body_ops->dbo_read(env, next, buf, &pos);
783         mdd_read_unlock(env, mdd_obj);
784         RETURN(rc);
785 }
786
787 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
788                           struct lu_buf *buf)
789 {
790         struct mdd_object *mdd_obj = md2mdd_obj(obj);
791         struct dt_object  *next;
792         int rc;
793
794         ENTRY;
795
796         LASSERT(lu_object_exists(&obj->mo_lu));
797
798         next = mdd_object_child(mdd_obj);
799         mdd_read_lock(env, mdd_obj);
800         rc = next->do_ops->do_xattr_list(env, next, buf);
801         mdd_read_unlock(env, mdd_obj);
802
803         RETURN(rc);
804 }
805
806 static int mdd_txn_start_cb(const struct lu_env *env,
807                             struct txn_param *param, void *cookie)
808 {
809         return 0;
810 }
811
812 static int mdd_txn_stop_cb(const struct lu_env *env,
813                            struct thandle *txn, void *cookie)
814 {
815         struct mdd_device *mdd = cookie;
816         struct obd_device *obd = mdd2obd_dev(mdd);
817
818         LASSERT(obd);
819         return mds_lov_write_objids(obd);
820 }
821
822 static int mdd_txn_commit_cb(const struct lu_env *env,
823                              struct thandle *txn, void *cookie)
824 {
825         return 0;
826 }
827
828 static int mdd_device_init(const struct lu_env *env,
829                            struct lu_device *d, struct lu_device *next)
830 {
831         struct mdd_device *mdd = lu2mdd_dev(d);
832         struct dt_device  *dt;
833         int rc = 0;
834         ENTRY;
835
836         mdd->mdd_child = lu2dt_dev(next);
837
838         dt = mdd->mdd_child;
839         /* prepare transactions callbacks */
840         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
841         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
842         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
843         mdd->mdd_txn_cb.dtc_cookie = mdd;
844
845         /* init txn credits */
846         init_rwsem(&mdd_txn_sem);
847         RETURN(rc);
848 }
849
850 static struct lu_device *mdd_device_fini(const struct lu_env *env,
851                                          struct lu_device *d)
852 {
853         struct mdd_device *mdd = lu2mdd_dev(d);
854         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
855
856         return next;
857 }
858
859 static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
860 {
861         int rc;
862         struct dt_object *root;
863         ENTRY;
864
865         dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
866         root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
867                              &mdd->mdd_root_fid);
868         if (!IS_ERR(root)) {
869                 LASSERT(root != NULL);
870                 lu_object_put(env, &root->do_lu);
871                 rc = orph_index_init(env, mdd);
872         } else
873                 rc = PTR_ERR(root);
874
875         RETURN(rc);
876 }
877
878 static void mdd_device_shutdown(const struct lu_env *env,
879                                 struct mdd_device *m)
880 {
881         dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
882         if (m->mdd_obd_dev)
883                 mdd_fini_obd(env, m);
884         orph_index_fini(env, m);
885 }
886
887 static int mdd_process_config(const struct lu_env *env,
888                               struct lu_device *d, struct lustre_cfg *cfg)
889 {
890         struct mdd_device *m    = lu2mdd_dev(d);
891         struct dt_device  *dt   = m->mdd_child;
892         struct lu_device  *next = &dt->dd_lu_dev;
893         int rc;
894         ENTRY;
895
896         switch (cfg->lcfg_command) {
897         case LCFG_SETUP:
898                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
899                 if (rc)
900                         GOTO(out, rc);
901                 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
902
903                 rc = mdd_init_obd(env, m, cfg);
904                 if (rc) {
905                         CERROR("lov init error %d \n", rc);
906                         GOTO(out, rc);
907                 }
908                 rc = mdd_mount(env, m);
909                 if (rc)
910                         GOTO(out, rc);
911                 break;
912         case LCFG_CLEANUP:
913                 mdd_device_shutdown(env, m);
914         default:
915                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
916                 break;
917         }
918 out:
919         RETURN(rc);
920 }
921
922 static int mdd_recovery_complete(const struct lu_env *env,
923                                  struct lu_device *d)
924 {
925         struct mdd_device *mdd = lu2mdd_dev(d);
926         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
927         struct obd_device *obd = mdd2obd_dev(mdd);
928         int rc;
929         ENTRY;
930         /* TODO:
931         rc = mdd_lov_set_nextid(env, mdd);
932         if (rc) {
933                 CERROR("%s: mdd_lov_set_nextid failed %d\n",
934                        obd->obd_name, rc);
935                 GOTO(out, rc);
936         }
937         rc = mdd_cleanup_unlink_llog(env, mdd);
938
939         obd_notify(obd->u.mds.mds_osc_obd, NULL,
940                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
941                    OBD_NOTIFY_SYNC, NULL);
942         */
943         LASSERT(mdd);
944         LASSERT(obd);
945
946         obd->obd_recovering = 0;
947         obd->obd_type->typ_dt_ops->o_postrecov(obd);
948         /* TODO: orphans handling */
949         __mdd_orphan_cleanup(env, mdd);
950         rc = next->ld_ops->ldo_recovery_complete(env, next);
951
952         RETURN(rc);
953 }
954
955 struct lu_device_operations mdd_lu_ops = {
956         .ldo_object_alloc      = mdd_object_alloc,
957         .ldo_process_config    = mdd_process_config,
958         .ldo_recovery_complete = mdd_recovery_complete
959 };
960
961 void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
962 {
963         struct dt_object  *next = mdd_object_child(obj);
964
965         next->do_ops->do_write_lock(env, next);
966 }
967
968 void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
969 {
970         struct dt_object  *next = mdd_object_child(obj);
971
972         next->do_ops->do_read_lock(env, next);
973 }
974
975 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
976 {
977         struct dt_object  *next = mdd_object_child(obj);
978
979         next->do_ops->do_write_unlock(env, next);
980 }
981
982 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
983 {
984         struct dt_object  *next = mdd_object_child(obj);
985
986         next->do_ops->do_read_unlock(env, next);
987 }
988
989 static void mdd_lock2(const struct lu_env *env,
990                       struct mdd_object *o0, struct mdd_object *o1)
991 {
992         mdd_write_lock(env, o0);
993         mdd_write_lock(env, o1);
994 }
995
996 static void mdd_unlock2(const struct lu_env *env,
997                         struct mdd_object *o0, struct mdd_object *o1)
998 {
999         mdd_write_unlock(env, o1);
1000         mdd_write_unlock(env, o0);
1001 }
1002
1003 static struct thandle* mdd_trans_start(const struct lu_env *env,
1004                                        struct mdd_device *mdd)
1005 {
1006         struct txn_param *p = &mdd_env_info(env)->mti_param;
1007
1008         return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
1009 }
1010
1011 static void mdd_trans_stop(const struct lu_env *env,
1012                            struct mdd_device *mdd, int result,
1013                            struct thandle *handle)
1014 {
1015         handle->th_result = result;
1016         mdd_child_ops(mdd)->dt_trans_stop(env, handle);
1017 }
1018
1019 static int __mdd_object_create(const struct lu_env *env,
1020                                struct mdd_object *obj, struct md_attr *ma,
1021                                struct thandle *handle)
1022 {
1023         struct dt_object *next;
1024         struct lu_attr *attr = &ma->ma_attr;
1025         int rc;
1026         ENTRY;
1027
1028         if (!lu_object_exists(mdd2lu_obj(obj))) {
1029                 next = mdd_object_child(obj);
1030                 rc = next->do_ops->do_create(env, next, attr, handle);
1031         } else
1032                 rc = -EEXIST;
1033
1034         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
1035
1036         RETURN(rc);
1037 }
1038
1039 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
1040                           const struct lu_attr *attr, struct thandle *handle)
1041 {
1042         struct dt_object *next;
1043
1044         LASSERT(lu_object_exists(mdd2lu_obj(o)));
1045         next = mdd_object_child(o);
1046         return next->do_ops->do_attr_set(env, next, attr, handle);
1047 }
1048
1049 int mdd_attr_set_internal_locked(const struct lu_env *env,
1050                                  struct mdd_object *o,
1051                                  const struct lu_attr *attr,
1052                                  struct thandle *handle)
1053 {
1054         int rc;
1055         mdd_write_lock(env, o);
1056         rc = mdd_attr_set_internal(env, o, attr, handle);
1057         mdd_write_unlock(env, o);
1058         return rc;
1059 }
1060
1061 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
1062                            const struct lu_buf *buf, const char *name,
1063                            int fl, struct thandle *handle)
1064 {
1065         struct dt_object *next;
1066         int rc = 0;
1067         ENTRY;
1068
1069         LASSERT(lu_object_exists(mdd2lu_obj(o)));
1070         next = mdd_object_child(o);
1071         if (buf->lb_buf && buf->lb_len > 0) {
1072                 rc = next->do_ops->do_xattr_set(env, next, buf, name,
1073                                                 0, handle);
1074         } else if (buf->lb_buf == NULL && buf->lb_len == 0) {
1075                 rc = next->do_ops->do_xattr_del(env, next, name, handle);
1076         }
1077         RETURN(rc);
1078 }
1079
1080 /* this gives the same functionality as the code between
1081  * sys_chmod and inode_setattr
1082  * chown_common and inode_setattr
1083  * utimes and inode_setattr
1084  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1085  * and port to
1086  */
1087 int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1088                  struct lu_attr *la)
1089 {
1090         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1091         struct md_ucred  *uc         = md_ucred(env);
1092         time_t            now        = CURRENT_SECONDS;
1093         int               rc;
1094         ENTRY;
1095
1096         if (!la->la_valid)
1097                 RETURN(0);
1098
1099         /* Do not permit change file type */
1100         if (la->la_valid & LA_TYPE)
1101                 RETURN(-EPERM);
1102
1103         /* They should not be processed by setattr */
1104         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1105                 RETURN(-EPERM);
1106
1107         rc = __mdd_la_get(env, obj, tmp_la);
1108         if (rc)
1109                 RETURN(rc);
1110
1111         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
1112
1113                 /*
1114                  * If only change flags of the object, we should
1115                  * let it pass, but also need capability check
1116                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
1117                  * fix it, when implement capable in mds
1118                  */
1119                 if (la->la_valid & ~LA_FLAGS)
1120                         RETURN(-EPERM);
1121
1122                 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
1123                         RETURN(-EPERM);
1124
1125                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1126                     !mdd_capable(uc, CAP_FOWNER))
1127                         RETURN(-EPERM);
1128
1129                 /*
1130                  * According to Ext3 implementation on this, the
1131                  * Ctime will be changed, but not clear why?
1132                  */
1133                 la->la_ctime = now;
1134                 la->la_valid |= LA_CTIME;
1135                 RETURN(0);
1136         }
1137
1138         /* Check for setting the obj time. */
1139         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1140             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1141                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1142                     !mdd_capable(uc, CAP_FOWNER))
1143                         RETURN(-EPERM);
1144         }
1145
1146         /* Make sure a caller can chmod. */
1147         if (la->la_valid & LA_MODE) {
1148                 /*
1149                  * Bypass la_vaild == LA_MODE,
1150                  * this is for changing file with SUID or SGID.
1151                  */
1152                 if ((la->la_valid & ~LA_MODE) &&
1153                     (uc->mu_fsuid != tmp_la->la_uid) &&
1154                     !mdd_capable(uc, CAP_FOWNER))
1155                         RETURN(-EPERM);
1156
1157                 if (la->la_mode == (umode_t) -1)
1158                         la->la_mode = tmp_la->la_mode;
1159                 else
1160                         la->la_mode = (la->la_mode & S_IALLUGO) |
1161                                       (tmp_la->la_mode & ~S_IALLUGO);
1162
1163                 /* Also check the setgid bit! */
1164                 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
1165                                 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
1166                         la->la_mode &= ~S_ISGID;
1167         } else {
1168                la->la_mode = tmp_la->la_mode;
1169         }
1170
1171         /* Make sure a caller can chown. */
1172         if (la->la_valid & LA_UID) {
1173                 if (la->la_uid == (uid_t) -1)
1174                         la->la_uid = tmp_la->la_uid;
1175                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1176                     (la->la_uid != tmp_la->la_uid)) &&
1177                     !mdd_capable(uc, CAP_CHOWN))
1178                         RETURN(-EPERM);
1179
1180                 /*
1181                  * If the user or group of a non-directory has been
1182                  * changed by a non-root user, remove the setuid bit.
1183                  * 19981026 David C Niemi <niemi@tux.org>
1184                  *
1185                  * Changed this to apply to all users, including root,
1186                  * to avoid some races. This is the behavior we had in
1187                  * 2.0. The check for non-root was definitely wrong
1188                  * for 2.2 anyway, as it should have been using
1189                  * CAP_FSETID rather than fsuid -- 19990830 SD.
1190                  */
1191                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192                     !S_ISDIR(tmp_la->la_mode)) {
1193                         la->la_mode &= ~S_ISUID;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196         }
1197
1198         /* Make sure caller can chgrp. */
1199         if (la->la_valid & LA_GID) {
1200                 if (la->la_gid == (gid_t) -1)
1201                         la->la_gid = tmp_la->la_gid;
1202                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203                     ((la->la_gid != tmp_la->la_gid) &&
1204                     !mdd_in_group_p(uc, la->la_gid))) &&
1205                     !mdd_capable(uc, CAP_CHOWN))
1206                         RETURN(-EPERM);
1207
1208                 /*
1209                  * Likewise, if the user or group of a non-directory
1210                  * has been changed by a non-root user, remove the
1211                  * setgid bit UNLESS there is no group execute bit
1212                  * (this would be a file marked for mandatory
1213                  * locking).  19981026 David C Niemi <niemi@tux.org>
1214                  *
1215                  * Removed the fsuid check (see the comment above) --
1216                  * 19990830 SD.
1217                  */
1218                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1219                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1220                         la->la_mode &= ~S_ISGID;
1221                         la->la_valid |= LA_MODE;
1222                 }
1223         }
1224
1225         /* For tuncate (or setsize), we should have MAY_WRITE perm */
1226         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1227                 rc = mdd_permission_internal(env, obj, MAY_WRITE);
1228                 if (rc)
1229                         RETURN(rc);
1230
1231                 /*
1232                  * For the "Size-on-MDS" setattr update, merge coming
1233                  * attributes with the set in the inode. BUG 10641
1234                  */
1235                 if ((la->la_valid & LA_ATIME) &&
1236                     (la->la_atime < tmp_la->la_atime))
1237                         la->la_valid &= ~LA_ATIME;
1238
1239                 if ((la->la_valid & LA_CTIME) &&
1240                     (la->la_ctime < tmp_la->la_ctime))
1241                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1242
1243                 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
1244                         la->la_mtime = now;
1245                         la->la_valid |= LA_MTIME;
1246                 }
1247         }
1248
1249         /* For last, ctime must be fixed */
1250         if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
1251                 la->la_ctime = now;
1252                 la->la_valid |= LA_CTIME;
1253         }
1254
1255         RETURN(0);
1256 }
1257
1258 /* set attr and LOV EA at once, return updated attr */
1259 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1260                         const struct md_attr *ma)
1261 {
1262         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1263         struct mdd_device *mdd = mdo2mdd(obj);
1264         struct thandle *handle;
1265         struct lov_mds_md *lmm = NULL;
1266         int  rc = 0, lmm_size = 0, max_size = 0;
1267         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1268         ENTRY;
1269
1270         mdd_txn_param_build(env, MDD_TXN_ATTR_SET_OP);
1271         handle = mdd_trans_start(env, mdd);
1272         if (IS_ERR(handle))
1273                 RETURN(PTR_ERR(handle));
1274         /*TODO: add lock here*/
1275         /* start a log jounal handle if needed */
1276         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1277             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1278                 max_size = mdd_lov_mdsize(env, mdd);
1279                 OBD_ALLOC(lmm, max_size);
1280                 if (lmm == NULL)
1281                         GOTO(cleanup, rc = -ENOMEM);
1282
1283                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1284                                 MDS_LOV_MD_NAME);
1285
1286                 if (rc < 0)
1287                         GOTO(cleanup, rc);
1288         }
1289
1290         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
1291                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1292                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1293
1294         *la_copy = ma->ma_attr;
1295         mdd_write_lock(env, mdd_obj);
1296         rc = mdd_fix_attr(env, mdd_obj, la_copy);
1297         mdd_write_unlock(env, mdd_obj);
1298         if (rc)
1299                 GOTO(cleanup, rc);
1300
1301         if (la_copy->la_valid & LA_FLAGS) {
1302                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1303                                                   handle);
1304                 if (rc == 0)
1305                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1306         } else if (la_copy->la_valid) {            /* setattr */
1307                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1308                                                   handle);
1309                 /* journal chown/chgrp in llog, just like unlink */
1310                 if (rc == 0 && lmm_size){
1311                         /*TODO set_attr llog */
1312                 }
1313         }
1314
1315         if (rc == 0 && ma->ma_valid & MA_LOV) {
1316                 umode_t mode;
1317
1318                 mode = mdd_object_type(mdd_obj);
1319                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1320                         /*TODO check permission*/
1321                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1322                                             ma->ma_lmm_size, handle, 1);
1323                 }
1324
1325         }
1326 cleanup:
1327         mdd_trans_stop(env, mdd, rc, handle);
1328         if (rc == 0 && lmm_size) {
1329                 /*set obd attr, if needed*/
1330                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
1331         }
1332         if (lmm != NULL) {
1333                 OBD_FREE(lmm, max_size);
1334         }
1335
1336         RETURN(rc);
1337 }
1338
1339 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1340                       const struct lu_buf *buf, const char *name, int fl,
1341                       struct thandle *handle)
1342 {
1343         int  rc;
1344         ENTRY;
1345
1346         mdd_write_lock(env, obj);
1347         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1348         mdd_write_unlock(env, obj);
1349
1350         RETURN(rc);
1351 }
1352
1353 static int mdd_xattr_sanity_check(const struct lu_env *env,
1354                                   struct mdd_object *obj)
1355 {
1356         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1357         struct md_ucred *uc     = md_ucred(env);
1358         int rc;
1359         ENTRY;
1360
1361         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1362                 RETURN(-EPERM);
1363
1364         mdd_read_lock(env, obj);
1365         rc = __mdd_la_get(env, obj, tmp_la);
1366         mdd_read_unlock(env, obj);
1367         if (rc)
1368                 RETURN(rc);
1369
1370         if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
1371                 RETURN(-EPERM);
1372
1373         RETURN(rc);
1374 }
1375
1376 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1377                          const struct lu_buf *buf, const char *name, int fl)
1378 {
1379         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1380         struct mdd_device *mdd = mdo2mdd(obj);
1381         struct thandle *handle;
1382         int  rc;
1383         ENTRY;
1384
1385         rc = mdd_xattr_sanity_check(env, mdd_obj);
1386         if (rc)
1387                 RETURN(rc);
1388
1389         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1390         handle = mdd_trans_start(env, mdd);
1391         if (IS_ERR(handle))
1392                 RETURN(PTR_ERR(handle));
1393
1394         rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, name,
1395                                fl, handle);
1396 #ifdef HAVE_SPLIT_SUPPORT
1397         if (rc == 0) {
1398                 /* very ugly hack, if setting lmv, it means splitting
1399                  * sucess, we should return -ERESTART to notify the
1400                  * client, so transno for this splitting should be
1401                  * zero according to the replay rules. so return -ERESTART
1402                  * here let mdt trans stop callback know this.
1403                  */
1404                  if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
1405                         rc = -ERESTART;
1406         }
1407 #endif
1408         mdd_trans_stop(env, mdd, rc, handle);
1409
1410         RETURN(rc);
1411 }
1412
1413 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
1414                            struct mdd_object *obj,
1415                            const char *name, struct thandle *handle)
1416 {
1417         struct dt_object *next;
1418
1419         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1420         next = mdd_object_child(obj);
1421         return next->do_ops->do_xattr_del(env, next, name, handle);
1422 }
1423
1424 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1425                   const char *name)
1426 {
1427         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1428         struct mdd_device *mdd = mdo2mdd(obj);
1429         struct thandle *handle;
1430         int  rc;
1431         ENTRY;
1432
1433         rc = mdd_xattr_sanity_check(env, mdd_obj);
1434         if (rc)
1435                 RETURN(rc);
1436
1437         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1438         handle = mdd_trans_start(env, mdd);
1439         if (IS_ERR(handle))
1440                 RETURN(PTR_ERR(handle));
1441
1442         mdd_write_lock(env, mdd_obj);
1443         rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
1444         mdd_write_unlock(env, mdd_obj);
1445
1446         mdd_trans_stop(env, mdd, rc, handle);
1447
1448         RETURN(rc);
1449 }
1450
1451 static int __mdd_index_insert_only(const struct lu_env *env,
1452                                    struct mdd_object *pobj,
1453                                    const struct lu_fid *lf,
1454                                    const char *name, struct thandle *th)
1455 {
1456         int rc;
1457         struct dt_object *next = mdd_object_child(pobj);
1458         ENTRY;
1459
1460         if (dt_try_as_dir(env, next))
1461                 rc = next->do_index_ops->dio_insert(env, next,
1462                                          (struct dt_rec *)lf,
1463                                          (struct dt_key *)name, th);
1464         else
1465                 rc = -ENOTDIR;
1466         RETURN(rc);
1467 }
1468
1469 /* insert new index, add reference if isdir, update times */
1470 static int __mdd_index_insert(const struct lu_env *env,
1471                              struct mdd_object *pobj, const struct lu_fid *lf,
1472                              const char *name, int isdir, struct thandle *th)
1473 {
1474         int rc;
1475         struct dt_object *next = mdd_object_child(pobj);
1476         ENTRY;
1477
1478 #if 0
1479         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
1480 #endif
1481
1482         if (dt_try_as_dir(env, next))
1483                 rc = next->do_index_ops->dio_insert(env, next,
1484                                                     (struct dt_rec *)lf,
1485                                                     (struct dt_key *)name,
1486                                                     th);
1487         else
1488                 rc = -ENOTDIR;
1489
1490         if (rc == 0) {
1491                 if (isdir)
1492                         __mdd_ref_add(env, pobj, th);
1493 #if 0
1494                 la->la_valid = LA_MTIME|LA_CTIME;
1495                 la->la_atime = ma->ma_attr.la_atime;
1496                 la->la_ctime = ma->ma_attr.la_ctime;
1497                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle);
1498 #endif
1499         }
1500         return rc;
1501 }
1502
1503 static int __mdd_index_delete(const struct lu_env *env,
1504                               struct mdd_object *pobj, const char *name,
1505                               int is_dir, struct thandle *handle)
1506 {
1507         int rc;
1508         struct dt_object *next = mdd_object_child(pobj);
1509         ENTRY;
1510
1511         if (dt_try_as_dir(env, next)) {
1512                 rc = next->do_index_ops->dio_delete(env, next,
1513                                                     (struct dt_key *)name,
1514                                                     handle);
1515                 if (rc == 0 && is_dir)
1516                         __mdd_ref_del(env, pobj, handle);
1517         } else
1518                 rc = -ENOTDIR;
1519         RETURN(rc);
1520 }
1521
1522 static int mdd_link_sanity_check(const struct lu_env *env,
1523                                  struct mdd_object *tgt_obj,
1524                                  struct mdd_object *src_obj)
1525 {
1526         int rc;
1527         ENTRY;
1528
1529         rc = mdd_may_create(env, tgt_obj, NULL, 1);
1530         if (rc)
1531                 RETURN(rc);
1532
1533         if (S_ISDIR(mdd_object_type(src_obj)))
1534                 RETURN(-EPERM);
1535
1536         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1537                 RETURN(-EPERM);
1538
1539         RETURN(rc);
1540 }
1541
1542 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1543                     struct md_object *src_obj, const char *name,
1544                     struct md_attr *ma)
1545 {
1546         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1547         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1548         struct mdd_device *mdd = mdo2mdd(src_obj);
1549         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1550         struct thandle *handle;
1551         int rc;
1552         ENTRY;
1553
1554         mdd_txn_param_build(env, MDD_TXN_LINK_OP);
1555         handle = mdd_trans_start(env, mdd);
1556         if (IS_ERR(handle))
1557                 RETURN(PTR_ERR(handle));
1558
1559         mdd_lock2(env, mdd_tobj, mdd_sobj);
1560
1561         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
1562         if (rc)
1563                 GOTO(out, rc);
1564
1565         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1566                                      name, handle);
1567         if (rc == 0)
1568                 __mdd_ref_add(env, mdd_sobj, handle);
1569
1570         *la_copy = ma->ma_attr;
1571         la_copy->la_valid = LA_CTIME;
1572         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle);
1573         if (rc)
1574                 GOTO(out, rc);
1575
1576         la_copy->la_valid = LA_CTIME | LA_MTIME;
1577         rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
1578
1579 out:
1580         mdd_unlock2(env, mdd_tobj, mdd_sobj);
1581         mdd_trans_stop(env, mdd, rc, handle);
1582         RETURN(rc);
1583 }
1584
1585 /*
1586  * Check that @dir contains no entries except (possibly) dot and dotdot.
1587  *
1588  * Returns:
1589  *
1590  *             0        empty
1591  *    -ENOTEMPTY        not empty
1592  *           -ve        other error
1593  *
1594  */
1595 static int mdd_dir_is_empty(const struct lu_env *env,
1596                             struct mdd_object *dir)
1597 {
1598         struct dt_it     *it;
1599         struct dt_object *obj;
1600         struct dt_it_ops *iops;
1601         int result;
1602
1603         obj = mdd_object_child(dir);
1604         iops = &obj->do_index_ops->dio_it;
1605         it = iops->init(env, obj, 0);
1606         if (it != NULL) {
1607                 result = iops->get(env, it, (const void *)"");
1608                 if (result > 0) {
1609                         int i;
1610                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1611                                 result = iops->next(env, it);
1612                         if (result == 0)
1613                                 result = -ENOTEMPTY;
1614                         else if (result == +1)
1615                                 result = 0;
1616                 } else if (result == 0)
1617                         /*
1618                          * Huh? Index contains no zero key?
1619                          */
1620                         result = -EIO;
1621
1622                 iops->put(env, it);
1623                 iops->fini(env, it);
1624         } else
1625                 result = -ENOMEM;
1626         return result;
1627 }
1628
1629 /* return md_attr back,
1630  * if it is last unlink then return lov ea + llog cookie*/
1631 int __mdd_object_kill(const struct lu_env *env,
1632                       struct mdd_object *obj,
1633                       struct md_attr *ma)
1634 {
1635         int rc = 0;
1636         ENTRY;
1637
1638         mdd_set_dead_obj(obj);
1639         if (S_ISREG(mdd_object_type(obj))) {
1640                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1641                  * Caller must be ready for that. */
1642                 rc = __mdd_lmm_get(env, obj, ma);
1643                 if ((ma->ma_valid & MA_LOV))
1644                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1645                                             obj, ma);
1646         }
1647         RETURN(rc);
1648 }
1649
1650 /* caller should take a lock before calling */
1651 static int __mdd_finish_unlink(const struct lu_env *env,
1652                                struct mdd_object *obj, struct md_attr *ma,
1653                                struct thandle *th)
1654 {
1655         int rc;
1656         ENTRY;
1657
1658         rc = __mdd_iattr_get(env, obj, ma);
1659         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1660                 /* add new orphan and the object
1661                  * will be deleted during the object_put() */
1662                 if (__mdd_orphan_add(env, obj, th) == 0)
1663                         set_bit(LU_OBJECT_ORPHAN,
1664                                 &mdd2lu_obj(obj)->lo_header->loh_flags);
1665
1666                 if (obj->mod_count == 0)
1667                         rc = __mdd_object_kill(env, obj, ma);
1668         }
1669         RETURN(rc);
1670 }
1671
1672 static int mdd_unlink_sanity_check(const struct lu_env *env,
1673                                    struct mdd_object *pobj,
1674                                    struct mdd_object *cobj,
1675                                    struct md_attr *ma)
1676 {
1677         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1678         int rc = 0;
1679         ENTRY;
1680
1681         rc = mdd_may_delete(env, pobj, cobj,
1682                             S_ISDIR(ma->ma_attr.la_mode), 1);
1683         if (rc)
1684                 RETURN(rc);
1685
1686         if (S_ISDIR(mdd_object_type(cobj))) {
1687                 if (dt_try_as_dir(env, dt_cobj))
1688                         rc = mdd_dir_is_empty(env, cobj);
1689                 else
1690                         rc = -ENOTDIR;
1691         }
1692
1693         RETURN(rc);
1694 }
1695
1696 static int mdd_unlink(const struct lu_env *env,
1697                       struct md_object *pobj, struct md_object *cobj,
1698                       const char *name, struct md_attr *ma)
1699 {
1700         struct mdd_device *mdd = mdo2mdd(pobj);
1701         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1702         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1703         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1704         struct thandle    *handle;
1705         int rc, is_dir;
1706         ENTRY;
1707
1708         mdd_txn_param_build(env, MDD_TXN_UNLINK_OP);
1709         handle = mdd_trans_start(env, mdd);
1710         if (IS_ERR(handle))
1711                 RETURN(PTR_ERR(handle));
1712
1713         mdd_lock2(env, mdd_pobj, mdd_cobj);
1714
1715         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
1716         if (rc)
1717                 GOTO(cleanup, rc);
1718
1719         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
1720         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
1721         if (rc)
1722                 GOTO(cleanup, rc);
1723
1724         __mdd_ref_del(env, mdd_cobj, handle);
1725         *la_copy = ma->ma_attr;
1726         if (is_dir) {
1727                 /* unlink dot */
1728                 __mdd_ref_del(env, mdd_cobj, handle);
1729         } else {
1730                 la_copy->la_valid = LA_CTIME;
1731                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle);
1732                 if (rc)
1733                         GOTO(cleanup, rc);
1734         }
1735
1736         la_copy->la_valid = LA_CTIME | LA_MTIME;
1737         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
1738         if (rc)
1739                 GOTO(cleanup, rc);
1740
1741         rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle);
1742
1743         if (rc == 0)
1744                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
1745                                    strlen("unlinked"), "unlinked", 0,
1746                                    NULL, NULL);
1747
1748 cleanup:
1749         mdd_unlock2(env, mdd_pobj, mdd_cobj);
1750         mdd_trans_stop(env, mdd, rc, handle);
1751         RETURN(rc);
1752 }
1753
1754 /* partial unlink */
1755 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1756                        struct md_attr *ma)
1757 {
1758         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1759         struct mdd_device *mdd = mdo2mdd(obj);
1760         struct thandle *handle;
1761         int rc;
1762         ENTRY;
1763
1764         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1765         handle = mdd_trans_start(env, mdd);
1766         if (IS_ERR(handle))
1767                 RETURN(-ENOMEM);
1768
1769         mdd_write_lock(env, mdd_obj);
1770
1771         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1772         if (rc)
1773                 GOTO(cleanup, rc);
1774
1775         __mdd_ref_del(env, mdd_obj, handle);
1776
1777         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1778                 /* unlink dot */
1779                 __mdd_ref_del(env, mdd_obj, handle);
1780         }
1781
1782         rc = __mdd_finish_unlink(env, mdd_obj, ma, handle);
1783
1784         EXIT;
1785 cleanup:
1786         mdd_write_unlock(env, mdd_obj);
1787         mdd_trans_stop(env, mdd, rc, handle);
1788         return rc;
1789 }
1790
1791 static int mdd_parent_fid(const struct lu_env *env,
1792                           struct mdd_object *obj,
1793                           struct lu_fid *fid)
1794 {
1795         return __mdd_lookup_locked(env, &obj->mod_obj,
1796                                    dotdot, fid, 0);
1797 }
1798
1799 /*
1800  * return 1: if lf is the fid of the ancestor of p1;
1801  * return 0: if not;
1802  *
1803  * return -EREMOTE: if remote object is found, in this
1804  * case fid of remote object is saved to @pf;
1805  *
1806  * otherwise: values < 0, errors.
1807  */
1808 static int mdd_is_parent(const struct lu_env *env,
1809                          struct mdd_device *mdd,
1810                          struct mdd_object *p1,
1811                          const struct lu_fid *lf,
1812                          struct lu_fid *pf)
1813 {
1814         struct mdd_object *parent = NULL;
1815         struct lu_fid *pfid;
1816         int rc;
1817         ENTRY;
1818
1819         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1820         pfid = &mdd_env_info(env)->mti_fid;
1821
1822         /* Do not lookup ".." in root, they do not exist there. */
1823         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1824                 RETURN(0);
1825
1826         for(;;) {
1827                 rc = mdd_parent_fid(env, p1, pfid);
1828                 if (rc)
1829                         GOTO(out, rc);
1830                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
1831                         GOTO(out, rc = 0);
1832                 if (lu_fid_eq(pfid, lf))
1833                         GOTO(out, rc = 1);
1834                 if (parent)
1835                         mdd_object_put(env, parent);
1836                 parent = mdd_object_find(env, mdd, pfid);
1837
1838                 /* cross-ref parent */
1839                 if (parent == NULL) {
1840                         if (pf != NULL)
1841                                 *pf = *pfid;
1842                         GOTO(out, rc = EREMOTE);
1843                 } else if (IS_ERR(parent))
1844                         GOTO(out, rc = PTR_ERR(parent));
1845                 p1 = parent;
1846         }
1847         EXIT;
1848 out:
1849         if (parent && !IS_ERR(parent))
1850                 mdd_object_put(env, parent);
1851         return rc;
1852 }
1853
1854 static int mdd_rename_lock(const struct lu_env *env,
1855                            struct mdd_device *mdd,
1856                            struct mdd_object *src_pobj,
1857                            struct mdd_object *tgt_pobj)
1858 {
1859         int rc;
1860         ENTRY;
1861
1862         if (src_pobj == tgt_pobj) {
1863                 mdd_write_lock(env, src_pobj);
1864                 RETURN(0);
1865         }
1866
1867         /* compared the parent child relationship of src_p&tgt_p */
1868         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1869                 mdd_lock2(env, src_pobj, tgt_pobj);
1870                 RETURN(0);
1871         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1872                 mdd_lock2(env, tgt_pobj, src_pobj);
1873                 RETURN(0);
1874         }
1875
1876         rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1877         if (rc < 0)
1878                 RETURN(rc);
1879
1880         if (rc == 1) {
1881                 mdd_lock2(env, tgt_pobj, src_pobj);
1882                 RETURN(0);
1883         }
1884
1885         mdd_lock2(env, src_pobj, tgt_pobj);
1886
1887         RETURN(0);
1888 }
1889
1890 static void mdd_rename_unlock(const struct lu_env *env,
1891                               struct mdd_object *src_pobj,
1892                               struct mdd_object *tgt_pobj)
1893 {
1894         mdd_write_unlock(env, src_pobj);
1895         if (src_pobj != tgt_pobj)
1896                 mdd_write_unlock(env, tgt_pobj);
1897 }
1898
1899 static int mdd_rename_sanity_check(const struct lu_env *env,
1900                                    struct mdd_object *src_pobj,
1901                                    struct mdd_object *tgt_pobj,
1902                                    const struct lu_fid *sfid,
1903                                    int src_is_dir,
1904                                    struct mdd_object *sobj,
1905                                    struct mdd_object *tobj)
1906 {
1907         struct mdd_device *mdd = mdo2mdd(&src_pobj->mod_obj);
1908         int rc = 0, need_check = 1;
1909         ENTRY;
1910
1911         mdd_read_lock(env, src_pobj);
1912         rc = mdd_may_delete(env, src_pobj, sobj, src_is_dir, need_check);
1913         mdd_read_unlock(env, src_pobj);
1914         if (rc)
1915                 RETURN(rc);
1916
1917         if (src_pobj == tgt_pobj)
1918                 need_check = 0;
1919
1920         if (!tobj) {
1921                 mdd_read_lock(env, tgt_pobj);
1922                 rc = mdd_may_create(env, tgt_pobj, NULL, need_check);
1923                 mdd_read_unlock(env, tgt_pobj);
1924         } else {
1925                 mdd_read_lock(env, tgt_pobj);
1926                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1927                                     need_check);
1928                 mdd_read_unlock(env, tgt_pobj);
1929                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
1930                     mdd_dir_is_empty(env, tobj))
1931                                 RETURN(-ENOTEMPTY);
1932         }
1933
1934         /* source should not be ancestor of target dir */
1935         if (!rc && src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
1936                 RETURN(-EINVAL);
1937
1938         RETURN(rc);
1939 }
1940 /* src object can be remote that is why we use only fid and type of object */
1941 static int mdd_rename(const struct lu_env *env,
1942                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1943                       const struct lu_fid *lf, const char *sname,
1944                       struct md_object *tobj, const char *tname,
1945                       struct md_attr *ma)
1946 {
1947         struct mdd_device *mdd = mdo2mdd(src_pobj);
1948         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1949         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1950         struct mdd_object *mdd_sobj = mdd_object_find(env, mdd, lf);
1951         struct mdd_object *mdd_tobj = NULL;
1952         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1953         struct thandle *handle;
1954         int is_dir;
1955         int rc;
1956         ENTRY;
1957
1958         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1959         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1960         if (ma->ma_attr.la_valid & LA_FLAGS &&
1961             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1962                 GOTO(out, rc = -EPERM);
1963
1964         if (tobj)
1965                 mdd_tobj = md2mdd_obj(tobj);
1966
1967         /*XXX: shouldn't this check be done under lock below? */
1968         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1969                                      lf, is_dir, mdd_sobj, mdd_tobj);
1970         if (rc)
1971                 GOTO(out, rc);
1972
1973         mdd_txn_param_build(env, MDD_TXN_RENAME_OP);
1974         handle = mdd_trans_start(env, mdd);
1975         if (IS_ERR(handle))
1976                 GOTO(out, rc = PTR_ERR(handle));
1977
1978         /*FIXME: Should consider tobj and sobj too in rename_lock*/
1979         rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
1980         if (rc)
1981                 GOTO(cleanup_unlocked, rc);
1982
1983         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
1984         if (rc)
1985                 GOTO(cleanup, rc);
1986
1987         /* tobj can be remote one,
1988          * so we do index_delete unconditionally and -ENOENT is allowed */
1989         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
1990         if (rc != 0 && rc != -ENOENT)
1991                 GOTO(cleanup, rc);
1992
1993         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle);
1994         if (rc)
1995                 GOTO(cleanup, rc);
1996
1997         *la_copy = ma->ma_attr;
1998         la_copy->la_valid = LA_CTIME;
1999         if (mdd_sobj) {
2000                 /*XXX: how to update ctime for remote sobj? */
2001                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
2002                 if (rc)
2003                         GOTO(cleanup, rc);
2004         }
2005         if (tobj && lu_object_exists(&tobj->mo_lu)) {
2006                 mdd_write_lock(env, mdd_tobj);
2007                 __mdd_ref_del(env, mdd_tobj, handle);
2008                 /* remove dot reference */
2009                 if (is_dir)
2010                         __mdd_ref_del(env, mdd_tobj, handle);
2011
2012                 la_copy->la_valid = LA_CTIME;
2013                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle);
2014                 if (rc)
2015                         GOTO(cleanup, rc);
2016
2017                 rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle);
2018                 mdd_write_unlock(env, mdd_tobj);
2019                 if (rc)
2020                         GOTO(cleanup, rc);
2021         }
2022
2023         la_copy->la_valid = LA_CTIME | LA_MTIME;
2024         rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle);
2025         if (rc)
2026                 GOTO(cleanup, rc);
2027
2028         if (mdd_spobj != mdd_tpobj) {
2029                 la_copy->la_valid = LA_CTIME | LA_MTIME;
2030                 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle);
2031         }
2032
2033 cleanup:
2034         mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
2035 cleanup_unlocked:
2036         mdd_trans_stop(env, mdd, rc, handle);
2037 out:
2038         if (mdd_sobj)
2039                 mdd_object_put(env, mdd_sobj);
2040         RETURN(rc);
2041 }
2042
2043 static int
2044 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
2045              const char *name, const struct lu_fid* fid, int mask)
2046 {
2047         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
2048         struct dt_object    *dir = mdd_object_child(mdd_obj);
2049         struct dt_rec       *rec = (struct dt_rec *)fid;
2050         const struct dt_key *key = (const struct dt_key *)name;
2051         int rc;
2052         ENTRY;
2053
2054         if (mdd_is_dead_obj(mdd_obj))
2055                 RETURN(-ESTALE);
2056
2057         if (mask == MAY_EXEC)
2058                 rc = mdd_exec_permission_lite(env, mdd_obj);
2059         else
2060                 rc = mdd_permission_internal(env, mdd_obj, mask);
2061         if (rc)
2062                 RETURN(rc);
2063
2064         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
2065                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key);
2066         else
2067                 rc = -ENOTDIR;
2068
2069         RETURN(rc);
2070 }
2071
2072 static int
2073 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
2074                     const char *name, const struct lu_fid* fid, int mask)
2075 {
2076         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2077         int rc;
2078
2079         mdd_read_lock(env, mdd_obj);
2080         rc = __mdd_lookup(env, pobj, name, fid, mask);
2081         mdd_read_unlock(env, mdd_obj);
2082
2083        return rc;
2084 }
2085
2086 static int mdd_lookup(const struct lu_env *env,
2087                       struct md_object *pobj, const char *name,
2088                       struct lu_fid* fid)
2089 {
2090         int rc;
2091         ENTRY;
2092         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
2093         RETURN(rc);
2094 }
2095
2096 /*
2097  * returns 1: if fid is ancestor of @mo;
2098  * returns 0: if fid is not a ancestor of @mo;
2099  *
2100  * returns EREMOTE if remote object is found, fid of remote object is saved to
2101  * @fid;
2102  *
2103  * returns < 0: if error
2104  */
2105 static int mdd_is_subdir(const struct lu_env *env,
2106                          struct md_object *mo, const struct lu_fid *fid,
2107                          struct lu_fid *sfid)
2108 {
2109         struct mdd_device *mdd = mdo2mdd(mo);
2110         int rc;
2111         ENTRY;
2112
2113         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
2114                 RETURN(0);
2115
2116         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
2117
2118         RETURN(rc);
2119 }
2120
2121 static int __mdd_object_initialize(const struct lu_env *env,
2122                                    const struct lu_fid *pfid,
2123                                    struct mdd_object *child,
2124                                    struct md_attr *ma, struct thandle *handle)
2125 {
2126         int rc;
2127         ENTRY;
2128
2129         /* update attributes for child.
2130          * FIXME:
2131          *  (1) the valid bits should be converted between Lustre and Linux;
2132          *  (2) maybe, the child attributes should be set in OSD when creation.
2133          */
2134
2135         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle);
2136         if (rc != 0)
2137                 RETURN(rc);
2138
2139         if (S_ISDIR(ma->ma_attr.la_mode)) {
2140                 /* add . and .. for newly created dir */
2141                 __mdd_ref_add(env, child, handle);
2142                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2143                                              dot, handle);
2144                 if (rc == 0) {
2145                         rc = __mdd_index_insert_only(env, child, pfid,
2146                                                      dotdot, handle);
2147                         if (rc != 0) {
2148                                 int rc2;
2149
2150                                 rc2 = __mdd_index_delete(env,
2151                                                          child, dot, 0, handle);
2152                                 if (rc2 != 0)
2153                                         CERROR("Failure to cleanup after dotdot"
2154                                                " creation: %d (%d)\n", rc2, rc);
2155                                 else
2156                                         __mdd_ref_del(env, child, handle);
2157                         }
2158                 }
2159         }
2160         RETURN(rc);
2161 }
2162
2163 /*
2164  * XXX: Need MAY_WRITE to be checked?
2165  */
2166 static int mdd_cd_sanity_check(const struct lu_env *env,
2167                                struct mdd_object *obj)
2168 {
2169         int rc = 0;
2170         ENTRY;
2171
2172         /* EEXIST check */
2173         if (!obj || mdd_is_dead_obj(obj))
2174                 RETURN(-ENOENT);
2175
2176 #if 0
2177         mdd_read_lock(env, obj);
2178         rc = mdd_permission_internal(env, obj, MAY_WRITE);
2179         mdd_read_unlock(env, obj);
2180 #endif
2181
2182         RETURN(rc);
2183
2184 }
2185
2186 static int mdd_create_data(const struct lu_env *env,
2187                            struct md_object *pobj, struct md_object *cobj,
2188                            const struct md_create_spec *spec,
2189                            struct md_attr *ma)
2190 {
2191         struct mdd_device *mdd = mdo2mdd(cobj);
2192         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
2193         struct mdd_object *son = md2mdd_obj(cobj);
2194         struct lu_attr    *attr = &ma->ma_attr;
2195         struct lov_mds_md *lmm = NULL;
2196         int                lmm_size = 0;
2197         struct thandle    *handle;
2198         int                rc;
2199         ENTRY;
2200
2201         rc = mdd_cd_sanity_check(env, son);
2202         if (rc)
2203                 RETURN(rc);
2204
2205         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
2206                         !(spec->sp_cr_flags & FMODE_WRITE))
2207                 RETURN(0);
2208         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
2209                             attr);
2210         if (rc)
2211                 RETURN(rc);
2212
2213         mdd_txn_param_build(env, MDD_TXN_CREATE_DATA_OP);
2214         handle = mdd_trans_start(env, mdd);
2215         if (IS_ERR(handle))
2216                 RETURN(rc = PTR_ERR(handle));
2217
2218         /*XXX: setting the lov ea is not locked
2219          * but setting the attr is locked? */
2220
2221         /* replay creates has objects already */
2222         if (spec->u.sp_ea.no_lov_create) {
2223                 CDEBUG(D_INFO, "we already have lov ea\n");
2224                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2225                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2226                                     spec->u.sp_ea.eadatalen, handle, 0);
2227         } else
2228                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2229                                     lmm_size, handle, 0);
2230
2231         if (rc == 0)
2232                rc = mdd_attr_get_internal_locked(env, son, ma);
2233
2234         /* finish mdd_lov_create() stuff */
2235         mdd_lov_create_finish(env, mdd, rc);
2236         mdd_trans_stop(env, mdd, rc, handle);
2237         if (lmm)
2238                 OBD_FREE(lmm, lmm_size);
2239         RETURN(rc);
2240 }
2241
2242 static int mdd_create_sanity_check(const struct lu_env *env,
2243                                    struct md_object *pobj,
2244                                    const char *name, struct md_attr *ma)
2245 {
2246         struct mdd_thread_info *info = mdd_env_info(env);
2247         struct lu_attr    *la        = &info->mti_la;
2248         struct lu_fid     *fid       = &info->mti_fid;
2249         struct mdd_object *obj       = md2mdd_obj(pobj);
2250         int rc;
2251
2252         ENTRY;
2253         /* EEXIST check */
2254         if (mdd_is_dead_obj(obj))
2255                 RETURN(-ENOENT);
2256
2257         rc = __mdd_lookup_locked(env, pobj, name, fid,
2258                                  MAY_WRITE | MAY_EXEC);
2259         if (rc != -ENOENT)
2260                 RETURN(rc ? : -EEXIST);
2261
2262         /* sgid check */
2263         mdd_read_lock(env, obj);
2264         rc = __mdd_la_get(env, obj, la);
2265         mdd_read_unlock(env, obj);
2266         if (rc)
2267                 RETURN(rc);
2268
2269         if (la->la_mode & S_ISGID) {
2270                 ma->ma_attr.la_gid = la->la_gid;
2271                 if (S_ISDIR(ma->ma_attr.la_mode)) {
2272                         ma->ma_attr.la_mode |= S_ISGID;
2273                         ma->ma_attr.la_valid |= LA_MODE;
2274                 }
2275         }
2276
2277         switch (ma->ma_attr.la_mode & S_IFMT) {
2278         case S_IFREG:
2279         case S_IFDIR:
2280         case S_IFLNK:
2281         case S_IFCHR:
2282         case S_IFBLK:
2283         case S_IFIFO:
2284         case S_IFSOCK:
2285                 rc = 0;
2286                 break;
2287         default:
2288                 rc = -EINVAL;
2289                 break;
2290         }
2291         RETURN(rc);
2292 }
2293
2294 /*
2295  * Create object and insert it into namespace.
2296  */
2297 static int mdd_create(const struct lu_env *env,
2298                       struct md_object *pobj, const char *name,
2299                       struct md_object *child,
2300                       const struct md_create_spec *spec,
2301                       struct md_attr* ma)
2302 {
2303         struct mdd_device *mdd = mdo2mdd(pobj);
2304         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2305         struct mdd_object *son = md2mdd_obj(child);
2306         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2307         struct lu_attr    *attr = &ma->ma_attr;
2308         struct lov_mds_md *lmm = NULL;
2309         struct thandle    *handle;
2310         int rc, created = 0, inserted = 0, lmm_size = 0;
2311         ENTRY;
2312
2313         /* sanity checks before big job */
2314         rc = mdd_create_sanity_check(env, pobj, name, ma);
2315         if (rc)
2316                 RETURN(rc);
2317
2318         /* no RPC inside the transaction, so OST objects should be created at
2319          * first */
2320         if (S_ISREG(attr->la_mode)) {
2321                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2322                                     spec, attr);
2323                 if (rc)
2324                         RETURN(rc);
2325         }
2326
2327         mdd_txn_param_build(env, MDD_TXN_MKDIR_OP);
2328         handle = mdd_trans_start(env, mdd);
2329         if (IS_ERR(handle))
2330                 RETURN(PTR_ERR(handle));
2331
2332         mdd_write_lock(env, mdd_pobj);
2333
2334         /*
2335          * XXX check that link can be added to the parent in mkdir case.
2336          */
2337
2338         /*
2339          * Two operations have to be performed:
2340          *
2341          *  - allocation of new object (->do_create()), and
2342          *
2343          *  - insertion into parent index (->dio_insert()).
2344          *
2345          * Due to locking, operation order is not important, when both are
2346          * successful, *but* error handling cases are quite different:
2347          *
2348          *  - if insertion is done first, and following object creation fails,
2349          *  insertion has to be rolled back, but this operation might fail
2350          *  also leaving us with dangling index entry.
2351          *
2352          *  - if creation is done first, is has to be undone if insertion
2353          *  fails, leaving us with leaked space, which is neither good, nor
2354          *  fatal.
2355          *
2356          * It seems that creation-first is simplest solution, but it is
2357          * sub-optimal in the frequent
2358          *
2359          *         $ mkdir foo
2360          *         $ mkdir foo
2361          *
2362          * case, because second mkdir is bound to create object, only to
2363          * destroy it immediately.
2364          *
2365          * Note that local file systems do
2366          *
2367          *     0. lookup -> -EEXIST
2368          *
2369          *     1. create
2370          *
2371          *     2. insert
2372          *
2373          * Maybe we should do the same. For now: creation-first.
2374          */
2375
2376         mdd_write_lock(env, son);
2377         rc = __mdd_object_create(env, son, ma, handle);
2378         if (rc) {
2379                 mdd_write_unlock(env, son);
2380                 GOTO(cleanup, rc);
2381         }
2382
2383         created = 1;
2384
2385         rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj),
2386                                      son, ma, handle);
2387         mdd_write_unlock(env, son);
2388         if (rc)
2389                 /*
2390                  * Object has no links, so it will be destroyed when last
2391                  * reference is released. (XXX not now.)
2392                  */
2393                 GOTO(cleanup, rc);
2394
2395         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2396                                 name, S_ISDIR(attr->la_mode), handle);
2397
2398         if (rc)
2399                 GOTO(cleanup, rc);
2400
2401         inserted = 1;
2402         /* replay creates has objects already */
2403         if (spec->u.sp_ea.no_lov_create) {
2404                 CDEBUG(D_INFO, "we already have lov ea\n");
2405                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2406                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2407                                     spec->u.sp_ea.eadatalen, handle, 0);
2408         } else
2409                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2410                                     lmm_size, handle, 0);
2411         if (rc) {
2412                 CERROR("error on stripe info copy %d \n", rc);
2413                 GOTO(cleanup, rc);
2414         }
2415
2416         if (S_ISLNK(attr->la_mode)) {
2417                 struct dt_object *dt = mdd_object_child(son);
2418                 const char *target_name = spec->u.sp_symname;
2419                 int sym_len = strlen(target_name);
2420                 loff_t pos = 0;
2421
2422                 rc = dt->do_body_ops->dbo_write(env, dt,
2423                                                 mdd_buf_get_const(env,
2424                                                                   target_name,
2425                                                                   sym_len),
2426                                                 &pos, handle);
2427                 if (rc == sym_len)
2428                         rc = 0;
2429                 else
2430                         rc = -EFAULT;
2431         }
2432
2433         *la_copy = ma->ma_attr;
2434         la_copy->la_valid = LA_CTIME | LA_MTIME;
2435         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle);
2436         if (rc)
2437                 GOTO(cleanup, rc);
2438
2439         /* return attr back */
2440         rc = mdd_attr_get_internal_locked(env, son, ma);
2441 cleanup:
2442         if (rc && created) {
2443                 int rc2 = 0;
2444
2445                 if (inserted) {
2446                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2447                                                  S_ISDIR(attr->la_mode),
2448                                                  handle);
2449                         if (rc2)
2450                                 CERROR("error can not cleanup destroy %d\n",
2451                                        rc2);
2452                 }
2453                 if (rc2 == 0)
2454                         __mdd_ref_del(env, son, handle);
2455         }
2456         /* finish mdd_lov_create() stuff */
2457         mdd_lov_create_finish(env, mdd, rc);
2458         if (lmm)
2459                 OBD_FREE(lmm, lmm_size);
2460         mdd_write_unlock(env, mdd_pobj);
2461         mdd_trans_stop(env, mdd, rc, handle);
2462         RETURN(rc);
2463 }
2464
2465 /* partial operation */
2466 static int mdd_oc_sanity_check(const struct lu_env *env,
2467                                struct mdd_object *obj,
2468                                struct md_attr *ma)
2469 {
2470         int rc;
2471         ENTRY;
2472
2473         /* EEXIST check */
2474         if (lu_object_exists(&obj->mod_obj.mo_lu))
2475                 RETURN(-EEXIST);
2476
2477         switch (ma->ma_attr.la_mode & S_IFMT) {
2478         case S_IFREG:
2479         case S_IFDIR:
2480         case S_IFLNK:
2481         case S_IFCHR:
2482         case S_IFBLK:
2483         case S_IFIFO:
2484         case S_IFSOCK:
2485                 rc = 0;
2486                 break;
2487         default:
2488                 rc = -EINVAL;
2489                 break;
2490         }
2491         RETURN(rc);
2492 }
2493
2494 static int mdd_object_create(const struct lu_env *env,
2495                              struct md_object *obj,
2496                              const struct md_create_spec *spec,
2497                              struct md_attr *ma)
2498 {
2499
2500         struct mdd_device *mdd = mdo2mdd(obj);
2501         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2502         struct thandle *handle;
2503         const struct lu_fid *pfid = spec->u.sp_pfid;
2504         int rc;
2505         ENTRY;
2506
2507         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2508         if (rc)
2509                 RETURN(rc);
2510
2511         mdd_txn_param_build(env, MDD_TXN_OBJECT_CREATE_OP);
2512         handle = mdd_trans_start(env, mdd);
2513         if (IS_ERR(handle))
2514                 RETURN(PTR_ERR(handle));
2515
2516         mdd_write_lock(env, mdd_obj);
2517         rc = __mdd_object_create(env, mdd_obj, ma, handle);
2518         if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2519                 /* if creating the slave object, set slave EA here */
2520                 rc = __mdd_xattr_set(env, mdd_obj,
2521                                      mdd_buf_get_const(env,
2522                                                        spec->u.sp_ea.eadata,
2523                                                        spec->u.sp_ea.eadatalen),
2524                                      MDS_LMV_MD_NAME, 0, handle);
2525                 pfid = spec->u.sp_ea.fid;
2526                 CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
2527                        PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
2528         }
2529
2530         if (rc == 0)
2531                 rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
2532         mdd_write_unlock(env, mdd_obj);
2533
2534         if (rc == 0)
2535                 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
2536
2537         mdd_trans_stop(env, mdd, rc, handle);
2538         RETURN(rc);
2539 }
2540
2541 /*
2542  * Partial operation. Be aware, this is called with write lock taken, so we use
2543  * locksless version of __mdd_lookup() here.
2544  */
2545 static int mdd_ni_sanity_check(const struct lu_env *env,
2546                                struct md_object *pobj,
2547                                const char *name,
2548                                const struct lu_fid *fid)
2549 {
2550         struct mdd_object *obj       = md2mdd_obj(pobj);
2551         int rc;
2552         ENTRY;
2553
2554         /* EEXIST check */
2555         if (mdd_is_dead_obj(obj))
2556                 RETURN(-ENOENT);
2557
2558         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2559         if (rc != -ENOENT)
2560                 RETURN(rc ? : -EEXIST);
2561         else
2562                 RETURN(0);
2563 }
2564
2565 static int mdd_name_insert(const struct lu_env *env,
2566                            struct md_object *pobj,
2567                            const char *name, const struct lu_fid *fid,
2568                            int isdir)
2569 {
2570         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2571         struct thandle *handle;
2572         int rc;
2573         ENTRY;
2574
2575         mdd_txn_param_build(env, MDD_TXN_INDEX_INSERT_OP);
2576         handle = mdd_trans_start(env, mdo2mdd(pobj));
2577         if (IS_ERR(handle))
2578                 RETURN(PTR_ERR(handle));
2579
2580         mdd_write_lock(env, mdd_obj);
2581         rc = mdd_ni_sanity_check(env, pobj, name, fid);
2582         if (rc)
2583                 GOTO(out_unlock, rc);
2584
2585         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle);
2586
2587 out_unlock:
2588         mdd_write_unlock(env, mdd_obj);
2589
2590         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
2591         RETURN(rc);
2592 }
2593
2594 /*
2595  * Be aware, this is called with write lock taken, so we use locksless version
2596  * of __mdd_lookup() here.
2597  */
2598 static int mdd_nr_sanity_check(const struct lu_env *env,
2599                                struct md_object *pobj,
2600                                const char *name)
2601 {
2602         struct mdd_thread_info *info = mdd_env_info(env);
2603         struct lu_fid     *fid       = &info->mti_fid;
2604         struct mdd_object *obj       = md2mdd_obj(pobj);
2605         int rc;
2606         ENTRY;
2607
2608         /* EEXIST check */
2609         if (mdd_is_dead_obj(obj))
2610                 RETURN(-ENOENT);
2611
2612         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2613         RETURN(rc);
2614 }
2615
2616 static int mdd_name_remove(const struct lu_env *env,
2617                            struct md_object *pobj,
2618                            const char *name, int is_dir)
2619 {
2620         struct mdd_device *mdd = mdo2mdd(pobj);
2621         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2622         struct thandle *handle;
2623         int rc;
2624         ENTRY;
2625
2626         mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP);
2627         handle = mdd_trans_start(env, mdd);
2628         if (IS_ERR(handle))
2629                 RETURN(PTR_ERR(handle));
2630
2631         mdd_write_lock(env, mdd_obj);
2632         rc = mdd_nr_sanity_check(env, pobj, name);
2633         if (rc)
2634                 GOTO(out_unlock, rc);
2635
2636         rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle);
2637
2638 out_unlock:
2639         mdd_write_unlock(env, mdd_obj);
2640
2641         mdd_trans_stop(env, mdd, rc, handle);
2642         RETURN(rc);
2643 }
2644
2645 static int mdd_rt_sanity_check(const struct lu_env *env,
2646                                struct mdd_object *tgt_pobj,
2647                                struct mdd_object *tobj,
2648                                const struct lu_fid *sfid,
2649                                const char *name, struct md_attr *ma)
2650 {
2651         struct mdd_device *mdd = mdo2mdd(&tgt_pobj->mod_obj);
2652         int rc, src_is_dir;
2653         ENTRY;
2654
2655         /* EEXIST check */
2656         if (mdd_is_dead_obj(tgt_pobj))
2657                 RETURN(-ENOENT);
2658
2659         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
2660         if (tobj) {
2661                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
2662                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
2663                      mdd_dir_is_empty(env, tobj))
2664                                 RETURN(-ENOTEMPTY);
2665         } else {
2666                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
2667         }
2668
2669         /* source should not be ancestor of target dir */
2670         if (!rc &&& src_is_dir && mdd_is_parent(env, mdd, tgt_pobj, sfid, NULL))
2671                 RETURN(-EINVAL);
2672
2673         RETURN(rc);
2674 }
2675
2676 static int mdd_rename_tgt(const struct lu_env *env,
2677                           struct md_object *pobj, struct md_object *tobj,
2678                           const struct lu_fid *lf, const char *name,
2679                           struct md_attr *ma)
2680 {
2681         struct mdd_device *mdd = mdo2mdd(pobj);
2682         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
2683         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
2684         struct thandle *handle;
2685         int rc;
2686         ENTRY;
2687
2688         mdd_txn_param_build(env, MDD_TXN_RENAME_TGT_OP);
2689         handle = mdd_trans_start(env, mdd);
2690         if (IS_ERR(handle))
2691                 RETURN(PTR_ERR(handle));
2692
2693         if (mdd_tobj)
2694                 mdd_lock2(env, mdd_tpobj, mdd_tobj);
2695         else
2696                 mdd_write_lock(env, mdd_tpobj);
2697
2698         /*TODO rename sanity checking*/
2699         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
2700         if (rc)
2701                 GOTO(cleanup, rc);
2702
2703         /* if rename_tgt is called then we should just re-insert name with
2704          * correct fid, no need to dec/inc parent nlink if obj is dir */
2705         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle);
2706         if (rc)
2707                 GOTO(cleanup, rc);
2708
2709         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle);
2710         if (rc)
2711                 GOTO(cleanup, rc);
2712
2713         if (tobj && lu_object_exists(&tobj->mo_lu))
2714                 __mdd_ref_del(env, mdd_tobj, handle);
2715 cleanup:
2716         if (tobj)
2717                 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
2718         else
2719                 mdd_write_unlock(env, mdd_tpobj);
2720         mdd_trans_stop(env, mdd, rc, handle);
2721         RETURN(rc);
2722 }
2723
2724 /*
2725  * No permission check is needed.
2726  */
2727 static int mdd_root_get(const struct lu_env *env,
2728                         struct md_device *m, struct lu_fid *f)
2729 {
2730         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2731
2732         ENTRY;
2733         *f = mdd->mdd_root_fid;
2734         RETURN(0);
2735 }
2736
2737 /*
2738  * No permission check is needed.
2739  */
2740 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
2741                       struct kstatfs *sfs)
2742 {
2743         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2744         int rc;
2745
2746         ENTRY;
2747
2748         rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
2749
2750         RETURN(rc);
2751 }
2752
2753 /*
2754  * No permission check is needed.
2755  */
2756 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
2757                            int *md_size, int *cookie_size)
2758 {
2759         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2760         ENTRY;
2761
2762         *md_size = mdd_lov_mdsize(env, mdd);
2763         *cookie_size = mdd_lov_cookiesize(env, mdd);
2764
2765         RETURN(0);
2766 }
2767
2768 static int mdd_init_capa_keys(struct md_device *m,
2769                               struct lustre_capa_key *keys)
2770 {
2771         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2772         struct mds_obd    *mds = &mdd2obd_dev(mdd)->u.mds;
2773         ENTRY;
2774
2775         mds->mds_capa_keys = keys;
2776         RETURN(0);
2777 }
2778
2779 static int mdd_update_capa_key(const struct lu_env *env,
2780                                struct md_device *m,
2781                                struct lustre_capa_key *key)
2782 {
2783         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
2784         struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
2785         int rc;
2786         ENTRY;
2787
2788         rc = obd_set_info_async(lov_exp, strlen(KEY_CAPA_KEY), KEY_CAPA_KEY,
2789                                 sizeof(*key), key, NULL);
2790         RETURN(rc);
2791 }
2792
2793 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
2794                          struct thandle *handle)
2795 {
2796         struct dt_object *next;
2797
2798         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2799         next = mdd_object_child(obj);
2800         next->do_ops->do_ref_add(env, next, handle);
2801 }
2802
2803 /*
2804  * XXX: if permission check is needed here?
2805  */
2806 static int mdd_ref_add(const struct lu_env *env,
2807                        struct md_object *obj)
2808 {
2809         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2810         struct mdd_device *mdd = mdo2mdd(obj);
2811         struct thandle *handle;
2812         ENTRY;
2813
2814         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
2815         handle = mdd_trans_start(env, mdd);
2816         if (IS_ERR(handle))
2817                 RETURN(-ENOMEM);
2818
2819         mdd_write_lock(env, mdd_obj);
2820         __mdd_ref_add(env, mdd_obj, handle);
2821         mdd_write_unlock(env, mdd_obj);
2822
2823         mdd_trans_stop(env, mdd, 0, handle);
2824
2825         RETURN(0);
2826 }
2827
2828 static void
2829 __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
2830               struct thandle *handle)
2831 {
2832         struct dt_object *next = mdd_object_child(obj);
2833
2834         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
2835
2836         next->do_ops->do_ref_del(env, next, handle);
2837 }
2838
2839 /* do NOT or the MAY_*'s, you'll get the weakest */
2840 static int accmode(struct mdd_object *mdd_obj, int flags)
2841 {
2842         int res = 0;
2843
2844 #if 0
2845         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2846          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2847          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2848          * owner can write to a file even if it is marked readonly to hide
2849          * its brokenness. (bug 5781) */
2850         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
2851                 return 0;
2852 #endif
2853         if (flags & FMODE_READ)
2854                 res = MAY_READ;
2855         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2856                 res |= MAY_WRITE;
2857         if (flags & MDS_FMODE_EXEC)
2858                 res = MAY_EXEC;
2859         return res;
2860 }
2861
2862 static int mdd_open_sanity_check(const struct lu_env *env,
2863                                  struct mdd_object *obj, int flag)
2864 {
2865         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2866         int mode = accmode(obj, flag);
2867         int rc;
2868         ENTRY;
2869
2870         /* EEXIST check */
2871         if (mdd_is_dead_obj(obj))
2872                 RETURN(-ENOENT);
2873
2874         rc = __mdd_la_get(env, obj, tmp_la);
2875         if (rc)
2876                RETURN(rc);
2877
2878         if (S_ISLNK(tmp_la->la_mode))
2879                 RETURN(-ELOOP);
2880
2881         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2882                 RETURN(-EISDIR);
2883
2884         if (!(flag & MDS_OPEN_CREATED)) {
2885                 rc = __mdd_permission_internal(env, obj, mode, 0);
2886                 if (rc)
2887                         RETURN(rc);
2888         }
2889
2890         /*
2891          * FIFO's, sockets and device files are special: they don't
2892          * actually live on the filesystem itself, and as such you
2893          * can write to them even if the filesystem is read-only.
2894          */
2895         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2896             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2897                 flag &= ~O_TRUNC;
2898
2899         /*
2900          * An append-only file must be opened in append mode for writing.
2901          */
2902         if (mdd_is_append(obj)) {
2903                 if ((flag & FMODE_WRITE) && !(flag & O_APPEND))
2904                         RETURN(-EPERM);
2905                 if (flag & O_TRUNC)
2906                         RETURN(-EPERM);
2907         }
2908
2909         /* O_NOATIME can only be set by the owner or superuser */
2910         if (flag & O_NOATIME) {
2911                 struct md_ucred *uc = md_ucred(env);
2912
2913                 if (uc->mu_fsuid != tmp_la->la_uid &&
2914                     !mdd_capable(uc, CAP_FOWNER))
2915                         RETURN(-EPERM);
2916         }
2917
2918         RETURN(0);
2919 }
2920
2921 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2922                     int flags)
2923 {
2924         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2925         int rc = 0;
2926
2927         mdd_write_lock(env, mdd_obj);
2928
2929         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2930         if (rc == 0)
2931                 mdd_obj->mod_count ++;
2932
2933         mdd_write_unlock(env, mdd_obj);
2934         return rc;
2935 }
2936
2937 /*
2938  * No permission check is needed.
2939  */
2940 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2941                      struct md_attr *ma)
2942 {
2943         int rc;
2944         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2945         ENTRY;
2946
2947         mdd_write_lock(env, mdd_obj);
2948         /* release open count */
2949         mdd_obj->mod_count --;
2950
2951         rc = __mdd_iattr_get(env, mdd_obj, ma);
2952         if (rc == 0 && mdd_obj->mod_count == 0) {
2953                 if (ma->ma_attr.la_nlink == 0)
2954                         rc = __mdd_object_kill(env, mdd_obj, ma);
2955         }
2956         mdd_write_unlock(env, mdd_obj);
2957         RETURN(rc);
2958 }
2959
2960 static int mdd_readpage_sanity_check(const struct lu_env *env,
2961                                      struct mdd_object *obj)
2962 {
2963         struct dt_object *next = mdd_object_child(obj);
2964         int rc;
2965         ENTRY;
2966
2967         if (S_ISDIR(mdd_object_type(obj)) &&
2968             dt_try_as_dir(env, next))
2969                 rc = mdd_permission_internal(env, obj, MAY_READ);
2970         else
2971                 rc = -ENOTDIR;
2972
2973         RETURN(rc);
2974 }
2975
2976 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2977                         const struct lu_rdpg *rdpg)
2978 {
2979         struct dt_object *next;
2980         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2981         int rc;
2982         ENTRY;
2983
2984         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
2985         next = mdd_object_child(mdd_obj);
2986
2987         mdd_read_lock(env, mdd_obj);
2988         rc = mdd_readpage_sanity_check(env, mdd_obj);
2989         if (rc)
2990                 GOTO(out_unlock, rc);
2991
2992         rc = next->do_ops->do_readpage(env, next, rdpg);
2993
2994 out_unlock:
2995         mdd_read_unlock(env, mdd_obj);
2996         RETURN(rc);
2997 }
2998
2999 #ifdef CONFIG_FS_POSIX_ACL
3000 #include <linux/posix_acl_xattr.h>
3001 #include <linux/posix_acl.h>
3002
3003 static int mdd_posix_acl_permission(struct md_ucred *uc, struct lu_attr *la,
3004                                     int want, posix_acl_xattr_entry *entry,
3005                                     int count)
3006 {
3007         posix_acl_xattr_entry *pa, *pe, *mask_obj;
3008         int found = 0;
3009         ENTRY;
3010
3011         if (count <= 0)
3012                 RETURN(-EACCES);
3013
3014         pa = &entry[0];
3015         pe = &entry[count - 1];
3016         for (; pa <= pe; pa++) {
3017                 switch(pa->e_tag) {
3018                         case ACL_USER_OBJ:
3019                                 /* (May have been checked already) */
3020                                 if (la->la_uid == uc->mu_fsuid)
3021                                         goto check_perm;
3022                                 break;
3023                         case ACL_USER:
3024                                 if (pa->e_id == uc->mu_fsuid)
3025                                         goto mask;
3026                                 break;
3027                         case ACL_GROUP_OBJ:
3028                                 if (mdd_in_group_p(uc, la->la_gid)) {
3029                                         found = 1;
3030                                         if ((pa->e_perm & want) == want)
3031                                                 goto mask;
3032                                 }
3033                                 break;
3034                         case ACL_GROUP:
3035                                 if (mdd_in_group_p(uc, pa->e_id)) {
3036                                         found = 1;
3037                                         if ((pa->e_perm & want) == want)
3038                                                 goto mask;
3039                                 }
3040                                 break;
3041                         case ACL_MASK:
3042                                 break;
3043                         case ACL_OTHER:
3044                                 if (found)
3045                                         RETURN(-EACCES);
3046                                 else
3047                                         goto check_perm;
3048                         default:
3049                                 RETURN(-EIO);
3050                 }
3051         }
3052         RETURN(-EIO);
3053
3054 mask:
3055         for (mask_obj = pa + 1; mask_obj <= pe; mask_obj++) {
3056                 if (mask_obj->e_tag == ACL_MASK) {
3057                         if ((pa->e_perm & mask_obj->e_perm & want) == want)
3058                                 RETURN(0);
3059
3060                         RETURN(-EACCES);
3061                 }
3062         }
3063
3064 check_perm:
3065         if ((pa->e_perm & want) == want)
3066                 RETURN(0);
3067
3068         RETURN(-EACCES);
3069 }
3070 #endif
3071
3072 static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
3073                          struct lu_attr* la, int mask)
3074 {
3075 #ifdef CONFIG_FS_POSIX_ACL
3076         struct dt_object *next;
3077         struct lu_buf    *buf = &mdd_env_info(env)->mti_buf;
3078         struct md_ucred  *uc  = md_ucred(env);
3079         posix_acl_xattr_entry *entry;
3080         int entry_count;
3081         int rc;
3082         ENTRY;
3083
3084         next = mdd_object_child(obj);
3085
3086         buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
3087         buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
3088         rc = next->do_ops->do_xattr_get(env, next, buf,
3089                                         XATTR_NAME_ACL_ACCESS);
3090         if (rc <= 0)
3091                 RETURN(rc ? : -EACCES);
3092
3093         entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
3094         entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
3095
3096         rc = mdd_posix_acl_permission(uc, la, mask, entry, entry_count);
3097         RETURN(rc);
3098 #else
3099         ENTRY;
3100         RETURN(-EAGAIN);
3101 #endif
3102 }
3103
3104 static int mdd_exec_permission_lite(const struct lu_env *env,
3105                                     struct mdd_object *obj)
3106 {
3107         struct lu_attr  *la = &mdd_env_info(env)->mti_la;
3108         struct md_ucred *uc = md_ucred(env);
3109         umode_t mode;
3110         int rc;
3111         ENTRY;
3112
3113         /* These means unnecessary for permission check */
3114         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3115                 RETURN(0);
3116
3117         /* Invalid user credit */
3118         if (uc->mu_valid == UCRED_INVALID)
3119                 RETURN(-EACCES);
3120
3121         rc = __mdd_la_get(env, obj, la);
3122         if (rc)
3123                 RETURN(rc);
3124
3125         mode = la->la_mode;
3126         if (uc->mu_fsuid == la->la_uid)
3127                 mode >>= 6;
3128         else if (mdd_in_group_p(uc, la->la_gid))
3129                 mode >>= 3;
3130
3131         if (mode & MAY_EXEC)
3132                 RETURN(0);
3133
3134         if (((la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) &&
3135             mdd_capable(uc, CAP_DAC_OVERRIDE))
3136                 RETURN(0);
3137
3138         if (S_ISDIR(la->la_mode) && mdd_capable(uc, CAP_DAC_READ_SEARCH))
3139                 RETURN(0);
3140
3141         RETURN(-EACCES);
3142 }
3143
3144 static int __mdd_permission_internal(const struct lu_env *env,
3145                                      struct mdd_object *obj,
3146                                      int mask, int getattr)
3147 {
3148         struct lu_attr  *la = &mdd_env_info(env)->mti_la;
3149         struct md_ucred *uc = md_ucred(env);
3150         __u32 mode;
3151         int rc;
3152
3153         ENTRY;
3154
3155         if (mask == 0)
3156                 RETURN(0);
3157
3158         /* These means unnecessary for permission check */
3159         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3160                 RETURN(0);
3161
3162         /* Invalid user credit */
3163         if (uc->mu_valid == UCRED_INVALID)
3164                 RETURN(-EACCES);
3165
3166         /*
3167          * Nobody gets write access to an immutable file.
3168          */
3169         if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
3170                 RETURN(-EACCES);
3171
3172         if (getattr) {
3173                 rc = __mdd_la_get(env, obj, la);
3174                 if (rc)
3175                         RETURN(rc);
3176         }
3177
3178         mode = la->la_mode;
3179         if (uc->mu_fsuid == la->la_uid) {
3180                 mode >>= 6;
3181         } else {
3182                 if (mode & S_IRWXG) {
3183                         if (((mode >> 3) & mask & S_IRWXO) != mask)
3184                                 goto check_groups;
3185
3186                         rc = mdd_check_acl(env, obj, la, mask);
3187                         if (rc == -EACCES)
3188                                 goto check_capabilities;
3189                         else if ((rc != -EAGAIN) && (rc != -EOPNOTSUPP))
3190                                 RETURN(rc);
3191                 }
3192
3193 check_groups:
3194                 if (mdd_in_group_p(uc, la->la_gid))
3195                         mode >>= 3;
3196         }
3197
3198         /*
3199          * If the DACs are ok we don't need any capability check.
3200          */
3201         if (((mode & mask & S_IRWXO) == mask))
3202                 RETURN(0);
3203
3204 check_capabilities:
3205
3206         /*
3207          * Read/write DACs are always overridable.
3208          * Executable DACs are overridable if at least one exec bit is set.
3209          * Dir's DACs are always overridable.
3210          */
3211         if (!(mask & MAY_EXEC) ||
3212             (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
3213                 if (mdd_capable(uc, CAP_DAC_OVERRIDE))
3214                         RETURN(0);
3215
3216         /*
3217          * Searching includes executable on directories, else just read.
3218          */
3219         if ((mask == MAY_READ) ||
3220             (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
3221                 if (mdd_capable(uc, CAP_DAC_READ_SEARCH))
3222                         RETURN(0);
3223
3224         RETURN(-EACCES);
3225 }
3226
3227 static inline int mdd_permission_internal_locked(const struct lu_env *env,
3228                                                  struct mdd_object *obj,
3229                                                  int mask)
3230 {
3231         int rc;
3232
3233         mdd_read_lock(env, obj);
3234         rc = mdd_permission_internal(env, obj, mask);
3235         mdd_read_unlock(env, obj);
3236
3237         return rc;
3238 }
3239
3240 static int mdd_permission(const struct lu_env *env, struct md_object *obj,
3241                           int mask)
3242 {
3243         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3244         int rc;
3245         ENTRY;
3246
3247         rc = mdd_permission_internal_locked(env, mdd_obj, mask);
3248
3249         RETURN(rc);
3250 }
3251
3252 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
3253                         struct lustre_capa *capa)
3254 {
3255         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3256         struct mdd_device *mdd = mdo2mdd(obj);
3257         struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site;
3258         struct lustre_capa_key *key = &ls->ls_capa_keys[1];
3259         struct obd_capa *ocapa;
3260         int rc;
3261         ENTRY;
3262
3263         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3264
3265         capa->lc_fid = *mdo2fid(mdd_obj);
3266         if (ls->ls_capa_timeout < CAPA_TIMEOUT)
3267                 capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
3268         if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid))
3269                 capa->lc_flags |= CAPA_FL_ROOT;
3270         capa->lc_flags = ls->ls_capa_alg << 24;
3271
3272         /* TODO: get right permission here after remote uid landing */
3273         ocapa = capa_lookup(capa);
3274         if (ocapa) {
3275                 LASSERT(!capa_is_expired(ocapa));
3276                 capa_cpy(capa, ocapa);
3277                 capa_put(ocapa);
3278                 RETURN(0);
3279         }
3280
3281         capa->lc_keyid = key->lk_keyid;
3282         capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout;
3283         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
3284         if (rc)
3285                 RETURN(rc);
3286
3287         capa_add(capa);
3288         RETURN(0);
3289 }
3290
3291 struct md_device_operations mdd_ops = {
3292         .mdo_statfs         = mdd_statfs,
3293         .mdo_root_get       = mdd_root_get,
3294         .mdo_maxsize_get    = mdd_maxsize_get,
3295         .mdo_init_capa_keys = mdd_init_capa_keys,
3296         .mdo_update_capa_key= mdd_update_capa_key,
3297 };
3298
3299 static struct md_dir_operations mdd_dir_ops = {
3300         .mdo_is_subdir     = mdd_is_subdir,
3301         .mdo_lookup        = mdd_lookup,
3302         .mdo_create        = mdd_create,
3303         .mdo_rename        = mdd_rename,
3304         .mdo_link          = mdd_link,
3305         .mdo_unlink        = mdd_unlink,
3306         .mdo_name_insert   = mdd_name_insert,
3307         .mdo_name_remove   = mdd_name_remove,
3308         .mdo_rename_tgt    = mdd_rename_tgt,
3309         .mdo_create_data   = mdd_create_data
3310 };
3311
3312 static struct md_object_operations mdd_obj_ops = {
3313         .moo_permission    = mdd_permission,
3314         .moo_attr_get      = mdd_attr_get,
3315         .moo_attr_set      = mdd_attr_set,
3316         .moo_xattr_get     = mdd_xattr_get,
3317         .moo_xattr_set     = mdd_xattr_set,
3318         .moo_xattr_list    = mdd_xattr_list,
3319         .moo_xattr_del     = mdd_xattr_del,
3320         .moo_object_create = mdd_object_create,
3321         .moo_ref_add       = mdd_ref_add,
3322         .moo_ref_del       = mdd_ref_del,
3323         .moo_open          = mdd_open,
3324         .moo_close         = mdd_close,
3325         .moo_readpage      = mdd_readpage,
3326         .moo_readlink      = mdd_readlink,
3327         .moo_capa_get      = mdd_capa_get
3328 };
3329
3330 static struct obd_ops mdd_obd_device_ops = {
3331         .o_owner = THIS_MODULE
3332 };
3333
3334 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
3335                                           struct lu_device_type *t,
3336                                           struct lustre_cfg *lcfg)
3337 {
3338         struct lu_device  *l;
3339         struct mdd_device *m;
3340
3341         OBD_ALLOC_PTR(m);
3342         if (m == NULL) {
3343                 l = ERR_PTR(-ENOMEM);
3344         } else {
3345                 md_device_init(&m->mdd_md_dev, t);
3346                 l = mdd2lu_dev(m);
3347                 l->ld_ops = &mdd_lu_ops;
3348                 m->mdd_md_dev.md_ops = &mdd_ops;
3349         }
3350
3351         return l;
3352 }
3353
3354 static void mdd_device_free(const struct lu_env *env,
3355                             struct lu_device *lu)
3356 {
3357         struct mdd_device *m = lu2mdd_dev(lu);
3358
3359         LASSERT(atomic_read(&lu->ld_ref) == 0);
3360         md_device_fini(&m->mdd_md_dev);
3361         OBD_FREE_PTR(m);
3362 }
3363
3364 static void *mdd_ucred_key_init(const struct lu_context *ctx,
3365                                 struct lu_context_key *key)
3366 {
3367         struct md_ucred *uc;
3368
3369         OBD_ALLOC_PTR(uc);
3370         if (uc == NULL)
3371                 uc = ERR_PTR(-ENOMEM);
3372         return uc;
3373 }
3374
3375 static void mdd_ucred_key_fini(const struct lu_context *ctx,
3376                              struct lu_context_key *key, void *data)
3377 {
3378         struct md_ucred *uc = data;
3379         OBD_FREE_PTR(uc);
3380 }
3381
3382 static struct lu_context_key mdd_ucred_key = {
3383         .lct_tags = LCT_SESSION,
3384         .lct_init = mdd_ucred_key_init,
3385         .lct_fini = mdd_ucred_key_fini
3386 };
3387
3388 struct md_ucred *md_ucred(const struct lu_env *env)
3389 {
3390         LASSERT(env->le_ses != NULL);
3391         return lu_context_key_get(env->le_ses, &mdd_ucred_key);
3392 }
3393 EXPORT_SYMBOL(md_ucred);
3394
3395 static int mdd_type_init(struct lu_device_type *t)
3396 {
3397         int result;
3398
3399         result = lu_context_key_register(&mdd_thread_key);
3400         if (result == 0)
3401                 result = lu_context_key_register(&mdd_ucred_key);
3402         return result;
3403 }
3404
3405 static void mdd_type_fini(struct lu_device_type *t)
3406 {
3407         lu_context_key_degister(&mdd_ucred_key);
3408         lu_context_key_degister(&mdd_thread_key);
3409 }
3410
3411 static struct lu_device_type_operations mdd_device_type_ops = {
3412         .ldto_init = mdd_type_init,
3413         .ldto_fini = mdd_type_fini,
3414
3415         .ldto_device_alloc = mdd_device_alloc,
3416         .ldto_device_free  = mdd_device_free,
3417
3418         .ldto_device_init    = mdd_device_init,
3419         .ldto_device_fini    = mdd_device_fini
3420 };
3421
3422 static struct lu_device_type mdd_device_type = {
3423         .ldt_tags     = LU_DEVICE_MD,
3424         .ldt_name     = LUSTRE_MDD_NAME,
3425         .ldt_ops      = &mdd_device_type_ops,
3426         .ldt_ctx_tags = LCT_MD_THREAD
3427 };
3428
3429 static void *mdd_key_init(const struct lu_context *ctx,
3430                           struct lu_context_key *key)
3431 {
3432         struct mdd_thread_info *info;
3433
3434         OBD_ALLOC_PTR(info);
3435         if (info == NULL)
3436                 info = ERR_PTR(-ENOMEM);
3437         return info;
3438 }
3439
3440 static void mdd_key_fini(const struct lu_context *ctx,
3441                          struct lu_context_key *key, void *data)
3442 {
3443         struct mdd_thread_info *info = data;
3444         OBD_FREE_PTR(info);
3445 }
3446
3447 static struct lu_context_key mdd_thread_key = {
3448         .lct_tags = LCT_MD_THREAD,
3449         .lct_init = mdd_key_init,
3450         .lct_fini = mdd_key_fini
3451 };
3452
3453 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
3454         { 0 }
3455 };
3456
3457 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
3458         { 0 }
3459 };
3460
3461 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
3462
3463 static int __init mdd_mod_init(void)
3464 {
3465         struct lprocfs_static_vars lvars;
3466         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
3467         lprocfs_init_vars(mdd, &lvars);
3468         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
3469                                    LUSTRE_MDD_NAME, &mdd_device_type);
3470 }
3471
3472 static void __exit mdd_mod_exit(void)
3473 {
3474         class_unregister_type(LUSTRE_MDD_NAME);
3475 }
3476
3477 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3478 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
3479 MODULE_LICENSE("GPL");
3480
3481 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);