Whamcloud - gitweb
Add acl initialization for cross-ref create.
[fs/lustre-release.git] / lustre / mdd / mdd_handler.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "mdd_internal.h"
46
47
48 static struct thandle* mdd_trans_start(const struct lu_env *env,
49                                        struct mdd_device *);
50 static void mdd_trans_stop(const struct lu_env *env,
51                            struct mdd_device *mdd, int rc,
52                            struct thandle *handle);
53 static struct dt_object* mdd_object_child(struct mdd_object *o);
54 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
55                           struct thandle *handle);
56 static void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
57                           struct thandle *handle);
58 static int __mdd_lookup(const struct lu_env *env,
59                         struct md_object *pobj,
60                         const char *name, const struct lu_fid* fid,
61                         int mask);
62 static int __mdd_lookup_locked(const struct lu_env *env,
63                                struct md_object *pobj,
64                                const char *name, const struct lu_fid* fid,
65                                int mask);
66 #if 0
67 static int mdd_exec_permission_lite(const struct lu_env *env,
68                                     struct mdd_object *obj);
69 #endif
70 static int __mdd_permission_internal(const struct lu_env *env,
71                                      struct mdd_object *obj,
72                                      int mask, int getattr);
73
74 static struct md_object_operations mdd_obj_ops;
75 static struct md_dir_operations    mdd_dir_ops;
76 static struct lu_object_operations mdd_lu_obj_ops;
77
78 static struct lu_context_key       mdd_thread_key;
79
80 static const char *mdd_root_dir_name = "root";
81 static const char dot[] = ".";
82 static const char dotdot[] = "..";
83
84 enum mdd_txn_op {
85         MDD_TXN_OBJECT_DESTROY_OP,
86         MDD_TXN_OBJECT_CREATE_OP,
87         MDD_TXN_ATTR_SET_OP,
88         MDD_TXN_XATTR_SET_OP,
89         MDD_TXN_INDEX_INSERT_OP,
90         MDD_TXN_INDEX_DELETE_OP,
91         MDD_TXN_LINK_OP,
92         MDD_TXN_UNLINK_OP,
93         MDD_TXN_RENAME_OP,
94         MDD_TXN_RENAME_TGT_OP,
95         MDD_TXN_CREATE_DATA_OP,
96         MDD_TXN_MKDIR_OP
97 };
98
99 struct mdd_txn_op_descr {
100         enum mdd_txn_op mod_op;
101         unsigned int    mod_credits;
102 };
103
104 enum {
105         MDD_TXN_OBJECT_DESTROY_CREDITS = 0,
106         MDD_TXN_OBJECT_CREATE_CREDITS = 0,
107         MDD_TXN_ATTR_SET_CREDITS = 0,
108         MDD_TXN_XATTR_SET_CREDITS = 0,
109         MDD_TXN_INDEX_INSERT_CREDITS = 0,
110         MDD_TXN_INDEX_DELETE_CREDITS = 0,
111         MDD_TXN_LINK_CREDITS = 0,
112         MDD_TXN_UNLINK_CREDITS = 0,
113         MDD_TXN_RENAME_CREDITS = 0,
114         MDD_TXN_RENAME_TGT_CREDITS = 0,
115         MDD_TXN_CREATE_DATA_CREDITS = 0,
116         MDD_TXN_MKDIR_CREDITS = 0
117 };
118
119 #define DEFINE_MDD_TXN_OP_ARRAY(opname, base)   \
120 [opname ## _OP - base ## _OP]= {                \
121         .mod_op      = opname ## _OP,           \
122         .mod_credits = opname ## _CREDITS,      \
123 }
124
125 /*
126  * number of blocks to reserve for particular operations. Should be function
127  * of ... something. Stub for now.
128  */
129
130 #define DEFINE_MDD_TXN_OP_DESC(opname)          \
131         DEFINE_MDD_TXN_OP_ARRAY(opname, MDD_TXN_OBJECT_DESTROY)
132
133 static struct mdd_txn_op_descr mdd_txn_descrs[] = {
134         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_DESTROY),
135         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_OBJECT_CREATE),
136         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_ATTR_SET),
137         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_XATTR_SET),
138         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_INSERT),
139         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_INDEX_DELETE),
140         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_LINK),
141         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_UNLINK),
142         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME),
143         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_RENAME_TGT),
144         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_CREATE_DATA),
145         DEFINE_MDD_TXN_OP_DESC(MDD_TXN_MKDIR)
146 };
147
148 static void mdd_txn_param_build(const struct lu_env *env, enum mdd_txn_op op)
149 {
150         LASSERT(0 <= op && op < ARRAY_SIZE(mdd_txn_descrs));
151
152         mdd_env_info(env)->mti_param.tp_credits =
153                 mdd_txn_descrs[op].mod_credits;
154 }
155
156 static int mdd_credit_get(const struct lu_env *env, struct mdd_device *mdd,
157                           int op)
158 {
159         int credits;
160         credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child,
161                                                     op);
162         LASSERT(credits > 0);
163         return credits;
164 }
165
166 /* XXX: we should calculate it by lsm count, not ost count. */
167 int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd)
168 {
169         struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
170         int ost_count = mds->mds_lov_desc.ld_tgt_count;
171
172         int index_create_credits;
173         int index_delete_credits;
174
175         int xattr_credits;
176         int log_credits;
177         int create_credits;
178         int destroy_credits;
179         int attr_credits;
180         int num_entries;
181         int i;
182
183         /* Init credits for each ops. */
184         num_entries = ARRAY_SIZE(mdd_txn_descrs);
185         LASSERT(num_entries > 0);
186
187         /* Init the basic credits from osd layer. */
188         index_create_credits = mdd_credit_get(env, mdd, DTO_INDEX_INSERT);
189         index_delete_credits = mdd_credit_get(env, mdd, DTO_INDEX_DELETE);
190         log_credits = mdd_credit_get(env, mdd, DTO_LOG_REC);
191         attr_credits = mdd_credit_get(env, mdd, DTO_ATTR_SET);
192         xattr_credits = mdd_credit_get(env, mdd, DTO_XATTR_SET);
193         create_credits = mdd_credit_get(env, mdd, DTO_OBJECT_CREATE);
194         destroy_credits = mdd_credit_get(env, mdd, DTO_OBJECT_DELETE);
195
196         /* Calculate the mdd credits. */
197         for (i = 0; i < num_entries; i++) {
198                 int opcode = mdd_txn_descrs[i].mod_op;
199                 int *c = &mdd_txn_descrs[i].mod_credits;
200                 switch(opcode) {
201                         case MDD_TXN_OBJECT_DESTROY_OP:
202                                 *c = destroy_credits;
203                                 break;
204                         case MDD_TXN_OBJECT_CREATE_OP:
205                                 /* OI_INSERT + CREATE OBJECT */
206                                 *c = index_create_credits + create_credits;
207                                 break;
208                         case MDD_TXN_ATTR_SET_OP:
209                                 /* ATTR set + XATTR(lsm, lmv) set */
210                                 *c = attr_credits + xattr_credits;
211                                 break;
212                         case MDD_TXN_XATTR_SET_OP:
213                                 *c = xattr_credits;
214                                 break;
215                         case MDD_TXN_INDEX_INSERT_OP:
216                                 *c = index_create_credits;
217                                 break;
218                         case MDD_TXN_INDEX_DELETE_OP:
219                                 *c = index_delete_credits;
220                                 break;
221                         case MDD_TXN_LINK_OP:
222                                 *c = index_create_credits;
223                                 break;
224                         case MDD_TXN_UNLINK_OP:
225                                 /* delete index + Unlink log */
226                                 *c = index_delete_credits +
227                                         log_credits * ost_count;
228                                 break;
229                         case MDD_TXN_RENAME_OP:
230                                 /* 2 delete index + 1 insert + Unlink log */
231                                 *c = 2 * index_delete_credits +
232                                         index_create_credits +
233                                         log_credits * ost_count;
234                                 break;
235                         case MDD_TXN_RENAME_TGT_OP:
236                                 /* index insert + index delete */
237                                 *c = index_delete_credits +
238                                         index_create_credits;
239                                 break;
240                         case MDD_TXN_CREATE_DATA_OP:
241                                 /* same as set xattr(lsm) */
242                                 *c = xattr_credits;
243                                 break;
244                         case MDD_TXN_MKDIR_OP:
245                                 /* INDEX INSERT + OI INSERT + CREATE_OBJECT_CREDITS
246                                  * SET_MD CREDITS is already counted in
247                                  * CREATE_OBJECT CREDITS
248                                  */
249                                  *c = 2 * index_create_credits + create_credits;
250                                 break;
251                         default:
252                                 CERROR("Invalid op %d init its credit\n",
253                                        opcode);
254                                 LBUG();
255                 }
256         }
257         RETURN(0);
258 }
259
260 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
261 {
262         struct lu_buf *buf;
263
264         buf = &mdd_env_info(env)->mti_buf;
265         buf->lb_buf = area;
266         buf->lb_len = len;
267         return buf;
268 }
269
270 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
271                                        const void *area, ssize_t len)
272 {
273         struct lu_buf *buf;
274
275         buf = &mdd_env_info(env)->mti_buf;
276         buf->lb_buf = (void *)area;
277         buf->lb_len = len;
278         return buf;
279 }
280
281 #define mdd_get_group_info(group_info) do {             \
282         atomic_inc(&(group_info)->usage);               \
283 } while (0)
284
285 #define mdd_put_group_info(group_info) do {             \
286         if (atomic_dec_and_test(&(group_info)->usage))  \
287                 groups_free(group_info);                \
288 } while (0)
289
290 #define MDD_NGROUPS_PER_BLOCK       ((int)(CFS_PAGE_SIZE / sizeof(gid_t)))
291
292 #define MDD_GROUP_AT(gi, i) \
293     ((gi)->blocks[(i) / MDD_NGROUPS_PER_BLOCK][(i) % MDD_NGROUPS_PER_BLOCK])
294
295 /* groups_search() is copied from linux kernel! */
296 /* a simple bsearch */
297 static int mdd_groups_search(struct group_info *group_info, gid_t grp)
298 {
299         int left, right;
300
301         if (!group_info)
302                 return 0;
303
304         left = 0;
305         right = group_info->ngroups;
306         while (left < right) {
307                 int mid = (left + right) / 2;
308                 int cmp = grp - MDD_GROUP_AT(group_info, mid);
309
310                 if (cmp > 0)
311                         left = mid + 1;
312                 else if (cmp < 0)
313                         right = mid;
314                 else
315                         return 1;
316         }
317         return 0;
318 }
319
320 static int mdd_in_group_p(struct md_ucred *uc, gid_t grp)
321 {
322         int rc = 1;
323
324         if (grp != uc->mu_fsgid) {
325                 struct group_info *group_info = NULL;
326
327                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD) ||
328                     (!uc->mu_ginfo && !uc->mu_identity))
329                         if ((grp == uc->mu_suppgids[0]) ||
330                             (grp == uc->mu_suppgids[1]))
331                                 return 1;
332
333                 if (uc->mu_ginfo)
334                         group_info = uc->mu_ginfo;
335                 else if (uc->mu_identity)
336                         group_info = uc->mu_identity->mi_ginfo;
337
338                 if (!group_info)
339                         return 0;
340
341                 mdd_get_group_info(group_info);
342                 rc = mdd_groups_search(group_info, grp);
343                 mdd_put_group_info(group_info);
344         }
345         return rc;
346 }
347
348 static inline int mdd_permission_internal(const struct lu_env *env,
349                                           struct mdd_object *obj, int mask)
350 {
351         return __mdd_permission_internal(env, obj, mask, 1);
352 }
353
354 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
355 {
356         struct mdd_thread_info *info;
357
358         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
359         LASSERT(info != NULL);
360         return info;
361 }
362
363 static struct lu_object *mdd_object_alloc(const struct lu_env *env,
364                                           const struct lu_object_header *hdr,
365                                           struct lu_device *d)
366 {
367         struct mdd_object *mdd_obj;
368
369         OBD_ALLOC_PTR(mdd_obj);
370         if (mdd_obj != NULL) {
371                 struct lu_object *o;
372
373                 o = mdd2lu_obj(mdd_obj);
374                 lu_object_init(o, NULL, d);
375                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
376                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
377                 mdd_obj->mod_count = 0;
378                 o->lo_ops = &mdd_lu_obj_ops;
379                 return o;
380         } else {
381                 return NULL;
382         }
383 }
384
385 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
386 {
387         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
388         struct lu_object  *below;
389         struct lu_device  *under;
390         ENTRY;
391
392         under = &d->mdd_child->dd_lu_dev;
393         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
394
395         if (below == NULL)
396                 RETURN(-ENOMEM);
397
398         lu_object_add(o, below);
399         RETURN(0);
400 }
401
402 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj);
403
404 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
405 {
406         if (lu_object_exists(o))
407                 return mdd_get_flags(env, lu2mdd_obj(o));
408         else
409                 return 0;
410 }
411
412 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
413 {
414         struct mdd_object *mdd = lu2mdd_obj(o);
415         
416         lu_object_fini(o);
417         OBD_FREE_PTR(mdd);
418 }
419
420 static int mdd_object_print(const struct lu_env *env, void *cookie,
421                             lu_printer_t p, const struct lu_object *o)
422 {
423         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
424 }
425
426 /* orphan handling is here */
427 static void mdd_object_delete(const struct lu_env *env,
428                                struct lu_object *o)
429 {
430         struct mdd_object *mdd_obj = lu2mdd_obj(o);
431         struct thandle *handle = NULL;
432         ENTRY;
433
434         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
435                 return;
436
437         if (test_bit(LU_OBJECT_ORPHAN, &o->lo_header->loh_flags)) {
438                 mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP);
439                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
440                 if (IS_ERR(handle))
441                         CERROR("Cannot get thandle\n");
442                 else {
443                         mdd_write_lock(env, mdd_obj);
444                         /* let's remove obj from the orphan list */
445                         __mdd_orphan_del(env, mdd_obj, handle);
446                         mdd_write_unlock(env, mdd_obj);
447                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
448                                        0, handle);
449                 }
450         }
451 }
452
453 static struct lu_object_operations mdd_lu_obj_ops = {
454         .loo_object_init    = mdd_object_init,
455         .loo_object_start   = mdd_object_start,
456         .loo_object_free    = mdd_object_free,
457         .loo_object_print   = mdd_object_print,
458         .loo_object_delete  = mdd_object_delete
459 };
460
461 struct mdd_object *mdd_object_find(const struct lu_env *env,
462                                    struct mdd_device *d,
463                                    const struct lu_fid *f)
464 {
465         struct lu_object *o, *lo;
466         struct mdd_object *m;
467         ENTRY;
468
469         o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
470         if (IS_ERR(o))
471                 m = (struct mdd_object *)o;
472         else {
473                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
474                 /* remote object can't be located and should be put then */
475                 if (lo == NULL)
476                         lu_object_put(env, o);
477                 m = lu2mdd_obj(lo);
478         }
479         RETURN(m);
480 }
481
482 static inline int mdd_is_immutable(struct mdd_object *obj)
483 {
484         return obj->mod_flags & IMMUTE_OBJ;
485 }
486
487 static inline int mdd_is_append(struct mdd_object *obj)
488 {
489         return obj->mod_flags & APPEND_OBJ;
490 }
491
492 static inline void mdd_set_dead_obj(struct mdd_object *obj)
493 {
494         if (obj)
495                 obj->mod_flags |= DEAD_OBJ;
496 }
497
498 static inline int mdd_is_dead_obj(struct mdd_object *obj)
499 {
500         return obj && obj->mod_flags & DEAD_OBJ;
501 }
502
503 /*Check whether it may create the cobj under the pobj*/
504 static int mdd_may_create(const struct lu_env *env,
505                           struct mdd_object *pobj, struct mdd_object *cobj,
506                           int need_check)
507 {
508         int rc = 0;
509         ENTRY;
510
511         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
512                 RETURN(-EEXIST);
513
514         if (mdd_is_dead_obj(pobj))
515                 RETURN(-ENOENT);
516
517         /*check pobj may create or not*/
518         if (need_check)
519                 rc = mdd_permission_internal(env, pobj,
520                                              MAY_WRITE | MAY_EXEC);
521
522         RETURN(rc);
523 }
524
525 static inline int __mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
526                                struct lu_attr *la, struct lustre_capa *capa)
527 {
528         struct dt_object *next = mdd_object_child(obj);
529         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
530         return next->do_ops->do_attr_get(env, next, la, capa);
531 }
532
533 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
534 {
535         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
536
537         if (flags & LUSTRE_APPEND_FL)
538                 obj->mod_flags |= APPEND_OBJ;
539
540         if (flags & LUSTRE_IMMUTABLE_FL)
541                 obj->mod_flags |= IMMUTE_OBJ;
542 }
543
544 static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
545 {
546         struct lu_attr *la = &mdd_env_info(env)->mti_la;
547         int rc;
548
549         ENTRY;
550         mdd_read_lock(env, obj);
551         rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
552         mdd_read_unlock(env, obj);
553         if (rc == 0)
554                 mdd_flags_xlate(obj, la->la_flags);
555         RETURN(rc);
556 }
557
558 #define mdd_cap_t(x) (x)
559
560 #define MDD_CAP_TO_MASK(x) (1 << (x))
561
562 #define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
563
564 /* capable() is copied from linux kernel! */
565 static inline int mdd_capable(struct md_ucred *uc, int cap)
566 {
567         if (mdd_cap_raised(uc->mu_cap, cap))
568                 return 1;
569         return 0;
570 }
571
572 /*
573  * It's inline, so penalty for filesystems that don't use sticky bit is
574  * minimal.
575  */
576 static inline int mdd_is_sticky(const struct lu_env *env,
577                                 struct mdd_object *pobj,
578                                 struct mdd_object *cobj)
579 {
580         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
581         struct md_ucred *uc = md_ucred(env);
582         int rc;
583
584         rc = __mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
585         if (rc) {
586                 return rc;
587         } else if (tmp_la->la_uid == uc->mu_fsuid) {
588                 return 0;
589         } else {
590                 rc = __mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
591                 if (rc)
592                         return rc;
593                 else if (!(tmp_la->la_mode & S_ISVTX))
594                         return 0;
595                 else if (tmp_la->la_uid == uc->mu_fsuid)
596                         return 0;
597                 else
598                         return !mdd_capable(uc, CAP_FOWNER);
599         }
600 }
601
602 /* Check whether it may delete the cobj under the pobj. */
603 static int mdd_may_delete(const struct lu_env *env,
604                           struct mdd_object *pobj,
605                           struct mdd_object *cobj,
606                           int is_dir, int need_check)
607 {
608         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
609         int rc = 0;
610         ENTRY;
611
612         LASSERT(cobj);
613
614         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
615                 RETURN(-ENOENT);
616
617         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
618                 RETURN(-EPERM);
619
620         if (is_dir) {
621                 if (!S_ISDIR(mdd_object_type(cobj)))
622                         RETURN(-ENOTDIR);
623
624                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
625                         RETURN(-EBUSY);
626
627         } else if (S_ISDIR(mdd_object_type(cobj))) {
628                         RETURN(-EISDIR);
629         }
630
631         if (pobj) {
632                 if (mdd_is_dead_obj(pobj))
633                         RETURN(-ENOENT);
634
635                 if (mdd_is_sticky(env, pobj, cobj))
636                         RETURN(-EPERM);
637
638                 if (need_check)
639                         rc = mdd_permission_internal(env, pobj,
640                                                      MAY_WRITE | MAY_EXEC);
641         }
642         RETURN(rc);
643 }
644
645 /* get only inode attributes */
646 static int __mdd_iattr_get(const struct lu_env *env,
647                            struct mdd_object *mdd_obj, struct md_attr *ma)
648 {
649         int rc = 0;
650         ENTRY;
651
652         rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr,
653                           mdd_object_capa(env, mdd_obj));
654         if (rc == 0)
655                 ma->ma_valid = MA_INODE;
656         RETURN(rc);
657 }
658
659 /* get lov EA only */
660 static int __mdd_lmm_get(const struct lu_env *env,
661                          struct mdd_object *mdd_obj, struct md_attr *ma)
662 {
663         int rc;
664         ENTRY;
665
666         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
667         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
668                         MDS_LOV_MD_NAME);
669         if (rc > 0) {
670                 ma->ma_valid |= MA_LOV;
671                 rc = 0;
672         }
673         RETURN(rc);
674 }
675
676 /* get lmv EA only*/
677 static int __mdd_lmv_get(const struct lu_env *env,
678                          struct mdd_object *mdd_obj, struct md_attr *ma)
679 {
680         int rc;
681
682         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
683                         MDS_LMV_MD_NAME);
684         if (rc > 0) {
685                 ma->ma_valid |= MA_LMV;
686                 rc = 0;
687         }
688         RETURN(rc);
689 }
690
691 #ifdef CONFIG_FS_POSIX_ACL
692 /* get default acl EA only */
693 static int __mdd_acl_def_get(const struct lu_env *env,
694                              struct mdd_object *mdd_obj, struct md_attr *ma)
695 {
696         struct dt_object *next = mdd_object_child(mdd_obj);
697         int rc;
698
699         rc = next->do_ops->do_xattr_get(env, next,
700                                         mdd_buf_get(env, ma->ma_lmv,
701                                                     ma->ma_lmv_size),
702                                         XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
703         if (rc > 0) {
704                 ma->ma_lmv_size = rc;
705                 ma->ma_valid |= MA_ACL_DEF;
706                 rc = 0;
707         } else if ((rc == -EOPNOTSUPP) || (rc == -ENODATA)) {
708                 rc = 0;
709         }
710         RETURN(rc);
711 }
712 #endif
713
714 static int mdd_attr_get_internal(const struct lu_env *env,
715                                  struct mdd_object *mdd_obj,
716                                  struct md_attr *ma)
717 {
718         int rc = 0;
719         ENTRY;
720
721         if (ma->ma_need & MA_INODE)
722                 rc = __mdd_iattr_get(env, mdd_obj, ma);
723
724         if (rc == 0 && ma->ma_need & MA_LOV) {
725                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726                     S_ISDIR(mdd_object_type(mdd_obj)))
727                         rc = __mdd_lmm_get(env, mdd_obj, ma);
728         }
729         if (rc == 0 && ma->ma_need & MA_LMV) {
730                 if (S_ISDIR(mdd_object_type(mdd_obj)))
731                         rc = __mdd_lmv_get(env, mdd_obj, ma);
732         }
733 #ifdef CONFIG_FS_POSIX_ACL
734         else if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
735                 if (S_ISDIR(mdd_object_type(mdd_obj)))
736                         rc = __mdd_acl_def_get(env, mdd_obj, ma);
737         }
738 #endif
739         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
740                         rc, ma->ma_valid);
741         RETURN(rc);
742 }
743
744 static inline int mdd_attr_get_internal_locked(const struct lu_env *env,
745                                                struct mdd_object *mdd_obj,
746                                                struct md_attr *ma)
747 {
748         int rc;
749         mdd_read_lock(env, mdd_obj);
750         rc = mdd_attr_get_internal(env, mdd_obj, ma);
751         mdd_read_unlock(env, mdd_obj);
752         return rc;
753 }
754
755 /*
756  * No permission check is needed.
757  */
758 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
759                         struct md_attr *ma)
760 {
761         struct mdd_object *mdd_obj = md2mdd_obj(obj);
762         int                rc;
763
764         ENTRY;
765         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
766         RETURN(rc);
767 }
768
769 /*
770  * No permission check is needed.
771  */
772 static int mdd_xattr_get(const struct lu_env *env,
773                          struct md_object *obj, struct lu_buf *buf,
774                          const char *name)
775 {
776         struct mdd_object *mdd_obj = md2mdd_obj(obj);
777         struct dt_object  *next;
778         int rc;
779
780         ENTRY;
781
782         LASSERT(lu_object_exists(&obj->mo_lu));
783
784         next = mdd_object_child(mdd_obj);
785         mdd_read_lock(env, mdd_obj);
786         rc = next->do_ops->do_xattr_get(env, next, buf, name,
787                                         mdd_object_capa(env, mdd_obj));
788         mdd_read_unlock(env, mdd_obj);
789
790         RETURN(rc);
791 }
792
793 /*
794  * Permission check is done when open,
795  * no need check again.
796  */
797 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
798                         struct lu_buf *buf)
799 {
800         struct mdd_object *mdd_obj = md2mdd_obj(obj);
801         struct dt_object  *next;
802         loff_t             pos = 0;
803         int                rc;
804         ENTRY;
805
806         LASSERT(lu_object_exists(&obj->mo_lu));
807
808         next = mdd_object_child(mdd_obj);
809         mdd_read_lock(env, mdd_obj);
810         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
811                                          mdd_object_capa(env, mdd_obj));
812         mdd_read_unlock(env, mdd_obj);
813         RETURN(rc);
814 }
815
816 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
817                           struct lu_buf *buf)
818 {
819         struct mdd_object *mdd_obj = md2mdd_obj(obj);
820         struct dt_object  *next;
821         int rc;
822
823         ENTRY;
824
825         LASSERT(lu_object_exists(&obj->mo_lu));
826
827         next = mdd_object_child(mdd_obj);
828         mdd_read_lock(env, mdd_obj);
829         rc = next->do_ops->do_xattr_list(env, next, buf,
830                                          mdd_object_capa(env, mdd_obj));
831         mdd_read_unlock(env, mdd_obj);
832
833         RETURN(rc);
834 }
835
836 static int mdd_txn_start_cb(const struct lu_env *env,
837                             struct txn_param *param, void *cookie)
838 {
839         return 0;
840 }
841
842 static int mdd_txn_stop_cb(const struct lu_env *env,
843                            struct thandle *txn, void *cookie)
844 {
845         struct mdd_device *mdd = cookie;
846         struct obd_device *obd = mdd2obd_dev(mdd);
847
848         LASSERT(obd);
849         return mds_lov_write_objids(obd);
850 }
851
852 static int mdd_txn_commit_cb(const struct lu_env *env,
853                              struct thandle *txn, void *cookie)
854 {
855         return 0;
856 }
857
858 static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
859                            const char *name, struct lu_device *next)
860 {
861         struct mdd_device *mdd = lu2mdd_dev(d);
862         struct dt_device  *dt;
863         int rc = 0;
864         ENTRY;
865
866         mdd->mdd_child = lu2dt_dev(next);
867
868         dt = mdd->mdd_child;
869         /* prepare transactions callbacks */
870         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
871         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
872         mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
873         mdd->mdd_txn_cb.dtc_cookie = mdd;
874         RETURN(rc);
875 }
876
877 static struct lu_device *mdd_device_fini(const struct lu_env *env,
878                                          struct lu_device *d)
879 {
880         struct mdd_device *mdd = lu2mdd_dev(d);
881         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
882
883         return next;
884 }
885
886 static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
887 {
888         int rc;
889         struct dt_object *root;
890         ENTRY;
891
892         dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
893         root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
894                              &mdd->mdd_root_fid);
895         if (!IS_ERR(root)) {
896                 LASSERT(root != NULL);
897                 lu_object_put(env, &root->do_lu);
898                 rc = orph_index_init(env, mdd);
899         } else
900                 rc = PTR_ERR(root);
901
902         RETURN(rc);
903 }
904
905 static void mdd_device_shutdown(const struct lu_env *env,
906                                 struct mdd_device *m)
907 {
908         dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
909         if (m->mdd_obd_dev)
910                 mdd_fini_obd(env, m);
911         orph_index_fini(env, m);
912 }
913
914 static int mdd_process_config(const struct lu_env *env,
915                               struct lu_device *d, struct lustre_cfg *cfg)
916 {
917         struct mdd_device *m    = lu2mdd_dev(d);
918         struct dt_device  *dt   = m->mdd_child;
919         struct lu_device  *next = &dt->dd_lu_dev;
920         int rc;
921         ENTRY;
922
923         switch (cfg->lcfg_command) {
924         case LCFG_SETUP:
925                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
926                 if (rc)
927                         GOTO(out, rc);
928                 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
929
930                 rc = mdd_init_obd(env, m, cfg);
931                 if (rc) {
932                         CERROR("lov init error %d \n", rc);
933                         GOTO(out, rc);
934                 }
935                 rc = mdd_mount(env, m);
936                 if (rc)
937                         GOTO(out, rc);
938                 rc = mdd_txn_init_credits(env, m);
939                 break;
940         case LCFG_CLEANUP:
941                 mdd_device_shutdown(env, m);
942         default:
943                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
944                 break;
945         }
946 out:
947         RETURN(rc);
948 }
949 #if 0
950 static int mdd_lov_set_nextid(const struct lu_env *env,
951                               struct mdd_device *mdd)
952 {
953         struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
954         int rc;
955         ENTRY;
956
957         LASSERT(mds->mds_lov_objids != NULL);
958         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
959                                 KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
960                                 mds->mds_lov_objids, NULL);
961
962         RETURN(rc);
963 }
964
965 static int mdd_cleanup_unlink_llog(const struct lu_env *env,
966                                    struct mdd_device *mdd)
967 {
968         /* XXX: to be implemented! */
969         return 0;
970 }
971 #endif
972
973 static int mdd_recovery_complete(const struct lu_env *env,
974                                  struct lu_device *d)
975 {
976         struct mdd_device *mdd = lu2mdd_dev(d);
977         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
978         struct obd_device *obd = mdd2obd_dev(mdd);
979         int rc;
980         ENTRY;
981
982         LASSERT(mdd != NULL);
983         LASSERT(obd != NULL);
984 #if 0
985         /* XXX: Do we need this in new stack? */
986         rc = mdd_lov_set_nextid(env, mdd);
987         if (rc) {
988                 CERROR("mdd_lov_set_nextid() failed %d\n",
989                        rc);
990                 RETURN(rc);
991         }
992
993         /* XXX: cleanup unlink. */
994         rc = mdd_cleanup_unlink_llog(env, mdd);
995         if (rc) {
996                 CERROR("mdd_cleanup_unlink_llog() failed %d\n",
997                        rc);
998                 RETURN(rc);
999         }
1000 #endif
1001         obd_notify(obd->u.mds.mds_osc_obd, NULL,
1002                    (obd->obd_async_recov ?
1003                     OBD_NOTIFY_SYNC_NONBLOCK :
1004                     OBD_NOTIFY_SYNC), NULL);
1005
1006         obd->obd_recovering = 0;
1007         obd->obd_type->typ_dt_ops->o_postrecov(obd);
1008
1009         /* XXX: orphans handling. */
1010         __mdd_orphan_cleanup(env, mdd);
1011         rc = next->ld_ops->ldo_recovery_complete(env, next);
1012
1013         RETURN(rc);
1014 }
1015
1016 struct lu_device_operations mdd_lu_ops = {
1017         .ldo_object_alloc      = mdd_object_alloc,
1018         .ldo_process_config    = mdd_process_config,
1019         .ldo_recovery_complete = mdd_recovery_complete
1020 };
1021
1022 void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
1023 {
1024         struct dt_object  *next = mdd_object_child(obj);
1025
1026         next->do_ops->do_write_lock(env, next);
1027 }
1028
1029 void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
1030 {
1031         struct dt_object  *next = mdd_object_child(obj);
1032
1033         next->do_ops->do_read_lock(env, next);
1034 }
1035
1036 void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
1037 {
1038         struct dt_object  *next = mdd_object_child(obj);
1039
1040         next->do_ops->do_write_unlock(env, next);
1041 }
1042
1043 void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
1044 {
1045         struct dt_object  *next = mdd_object_child(obj);
1046
1047         next->do_ops->do_read_unlock(env, next);
1048 }
1049
1050 static void mdd_lock2(const struct lu_env *env,
1051                       struct mdd_object *o0, struct mdd_object *o1)
1052 {
1053         mdd_write_lock(env, o0);
1054         mdd_write_lock(env, o1);
1055 }
1056
1057 static void mdd_unlock2(const struct lu_env *env,
1058                         struct mdd_object *o0, struct mdd_object *o1)
1059 {
1060         mdd_write_unlock(env, o1);
1061         mdd_write_unlock(env, o0);
1062 }
1063
1064 static struct thandle* mdd_trans_start(const struct lu_env *env,
1065                                        struct mdd_device *mdd)
1066 {
1067         struct txn_param *p = &mdd_env_info(env)->mti_param;
1068
1069         return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
1070 }
1071
1072 static void mdd_trans_stop(const struct lu_env *env,
1073                            struct mdd_device *mdd, int result,
1074                            struct thandle *handle)
1075 {
1076         handle->th_result = result;
1077         mdd_child_ops(mdd)->dt_trans_stop(env, handle);
1078 }
1079
1080 static int __mdd_object_create(const struct lu_env *env,
1081                                struct mdd_object *obj, struct md_attr *ma,
1082                                struct thandle *handle)
1083 {
1084         struct dt_object *next;
1085         struct lu_attr *attr = &ma->ma_attr;
1086         int rc;
1087         ENTRY;
1088
1089         if (!lu_object_exists(mdd2lu_obj(obj))) {
1090                 next = mdd_object_child(obj);
1091                 rc = next->do_ops->do_create(env, next, attr, handle);
1092         } else
1093                 rc = -EEXIST;
1094
1095         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
1096
1097         RETURN(rc);
1098 }
1099
1100 #ifdef CONFIG_FS_POSIX_ACL
1101 #include <linux/posix_acl_xattr.h>
1102 #include <linux/posix_acl.h>
1103
1104 /*
1105  * Modify the ACL for the chmod.
1106  */
1107 static int mdd_posix_acl_chmod_masq(posix_acl_xattr_entry *entry,
1108                                     __u32 mode, int count)
1109 {
1110         posix_acl_xattr_entry *group_obj = NULL, *mask_obj = NULL, *pa, *pe;
1111
1112         pa = &entry[0];
1113         pe = &entry[count - 1];
1114         for (; pa <= pe; pa++) {
1115                 switch(pa->e_tag) {
1116                         case ACL_USER_OBJ:
1117                                 pa->e_perm = (mode & S_IRWXU) >> 6;
1118                                 break;
1119
1120                         case ACL_USER:
1121                         case ACL_GROUP:
1122                                 break;
1123
1124                         case ACL_GROUP_OBJ:
1125                                 group_obj = pa;
1126                                 break;
1127
1128                         case ACL_MASK:
1129                                 mask_obj = pa;
1130                                 break;
1131
1132                         case ACL_OTHER:
1133                                 pa->e_perm = (mode & S_IRWXO);
1134                                 break;
1135
1136                         default:
1137                                 return -EIO;
1138                 }
1139         }
1140
1141         if (mask_obj) {
1142                 mask_obj->e_perm = (mode & S_IRWXG) >> 3;
1143         } else {
1144                 if (!group_obj)
1145                         return -EIO;
1146                 group_obj->e_perm = (mode & S_IRWXG) >> 3;
1147         }
1148
1149         return 0;
1150 }
1151
1152 static int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o,
1153                          __u32 mode, struct thandle *handle)
1154 {
1155         struct dt_object        *next;
1156         struct lu_buf           *buf;
1157         posix_acl_xattr_entry   *entry;
1158         int                      entry_count;
1159         int                      rc;
1160
1161         ENTRY;
1162
1163         next = mdd_object_child(o);
1164         buf = &mdd_env_info(env)->mti_buf;
1165         buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
1166         buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
1167         rc = next->do_ops->do_xattr_get(env, next, buf,
1168                                         XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1169         if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
1170                 RETURN(0);
1171         else if (rc <= 0)
1172                 RETURN(rc);
1173
1174         buf->lb_len = rc;
1175         entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
1176         entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
1177         if (entry_count <= 0)
1178                 RETURN(0);
1179        
1180         rc = mdd_posix_acl_chmod_masq(entry, mode, entry_count);
1181         if (rc)
1182                 RETURN(rc);
1183
1184         rc = next->do_ops->do_xattr_set(env, next, buf, XATTR_NAME_ACL_ACCESS,
1185                                         0, handle, BYPASS_CAPA);
1186         RETURN(rc);
1187 }
1188 #endif
1189
1190 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
1191                           const struct lu_attr *attr, struct thandle *handle,
1192                           const int needacl)
1193 {
1194         struct dt_object *next;
1195         int rc;
1196
1197         LASSERT(lu_object_exists(mdd2lu_obj(o)));
1198         next = mdd_object_child(o);
1199         rc = next->do_ops->do_attr_set(env, next, attr, handle,
1200                                        mdd_object_capa(env, o));
1201 #ifdef CONFIG_FS_POSIX_ACL
1202         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1203                 rc = mdd_acl_chmod(env, o, attr->la_mode, handle);
1204 #endif
1205         return rc;
1206 }
1207
1208 int mdd_attr_set_internal_locked(const struct lu_env *env,
1209                                  struct mdd_object *o,
1210                                  const struct lu_attr *attr,
1211                                  struct thandle *handle)
1212 {
1213         int rc;
1214         mdd_write_lock(env, o);
1215         rc = mdd_attr_set_internal(env, o, attr, handle, 1);
1216         mdd_write_unlock(env, o);
1217         return rc;
1218 }
1219
1220 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
1221                            const struct lu_buf *buf, const char *name,
1222                            int fl, struct thandle *handle)
1223 {
1224         struct dt_object *next;
1225         struct lustre_capa *capa = mdd_object_capa(env, o);
1226         int rc = 0;
1227         ENTRY;
1228
1229         LASSERT(lu_object_exists(mdd2lu_obj(o)));
1230         next = mdd_object_child(o);
1231         if (buf->lb_buf && buf->lb_len > 0) {
1232                 rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle,
1233                                                 capa);
1234         } else if (buf->lb_buf == NULL && buf->lb_len == 0) {
1235                 rc = next->do_ops->do_xattr_del(env, next, name, handle, capa);
1236         }
1237         RETURN(rc);
1238 }
1239
1240 /* this gives the same functionality as the code between
1241  * sys_chmod and inode_setattr
1242  * chown_common and inode_setattr
1243  * utimes and inode_setattr
1244  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1245  * and port to
1246  */
1247 int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1248                  struct lu_attr *la)
1249 {
1250         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1251         struct md_ucred  *uc         = md_ucred(env);
1252         time_t            now        = CURRENT_SECONDS;
1253         int               rc;
1254         ENTRY;
1255
1256         if (!la->la_valid)
1257                 RETURN(0);
1258
1259         /* Do not permit change file type */
1260         if (la->la_valid & LA_TYPE)
1261                 RETURN(-EPERM);
1262
1263         /* They should not be processed by setattr */
1264         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1265                 RETURN(-EPERM);
1266
1267         rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1268         if (rc)
1269                 RETURN(rc);
1270
1271         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
1272
1273                 /*
1274                  * If only change flags of the object, we should
1275                  * let it pass, but also need capability check
1276                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
1277                  * fix it, when implement capable in mds
1278                  */
1279                 if (la->la_valid & ~LA_FLAGS)
1280                         RETURN(-EPERM);
1281
1282                 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
1283                         RETURN(-EPERM);
1284
1285                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1286                     !mdd_capable(uc, CAP_FOWNER))
1287                         RETURN(-EPERM);
1288
1289                 /*
1290                  * According to Ext3 implementation on this, the
1291                  * Ctime will be changed, but not clear why?
1292                  */
1293                 la->la_ctime = now;
1294                 la->la_valid |= LA_CTIME;
1295                 RETURN(0);
1296         }
1297
1298         /* Check for setting the obj time. */
1299         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1300             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1301                 rc = __mdd_permission_internal(env, obj, MAY_WRITE, 0);
1302                 if (rc)
1303                         RETURN(rc);
1304         }
1305
1306         /* Make sure a caller can chmod. */
1307         if (la->la_valid & LA_MODE) {
1308                 /*
1309                  * Bypass la_vaild == LA_MODE,
1310                  * this is for changing file with SUID or SGID.
1311                  */
1312                 if ((la->la_valid & ~LA_MODE) &&
1313                     (uc->mu_fsuid != tmp_la->la_uid) &&
1314                     !mdd_capable(uc, CAP_FOWNER))
1315                         RETURN(-EPERM);
1316
1317                 if (la->la_mode == (umode_t) -1)
1318                         la->la_mode = tmp_la->la_mode;
1319                 else
1320                         la->la_mode = (la->la_mode & S_IALLUGO) |
1321                                       (tmp_la->la_mode & ~S_IALLUGO);
1322
1323                 /* Also check the setgid bit! */
1324                 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
1325                                 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
1326                         la->la_mode &= ~S_ISGID;
1327         } else {
1328                la->la_mode = tmp_la->la_mode;
1329         }
1330
1331         /* Make sure a caller can chown. */
1332         if (la->la_valid & LA_UID) {
1333                 if (la->la_uid == (uid_t) -1)
1334                         la->la_uid = tmp_la->la_uid;
1335                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1336                     (la->la_uid != tmp_la->la_uid)) &&
1337                     !mdd_capable(uc, CAP_CHOWN))
1338                         RETURN(-EPERM);
1339
1340                 /*
1341                  * If the user or group of a non-directory has been
1342                  * changed by a non-root user, remove the setuid bit.
1343                  * 19981026 David C Niemi <niemi@tux.org>
1344                  *
1345                  * Changed this to apply to all users, including root,
1346                  * to avoid some races. This is the behavior we had in
1347                  * 2.0. The check for non-root was definitely wrong
1348                  * for 2.2 anyway, as it should have been using
1349                  * CAP_FSETID rather than fsuid -- 19990830 SD.
1350                  */
1351                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1352                     !S_ISDIR(tmp_la->la_mode)) {
1353                         la->la_mode &= ~S_ISUID;
1354                         la->la_valid |= LA_MODE;
1355                 }
1356         }
1357
1358         /* Make sure caller can chgrp. */
1359         if (la->la_valid & LA_GID) {
1360                 if (la->la_gid == (gid_t) -1)
1361                         la->la_gid = tmp_la->la_gid;
1362                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1363                     ((la->la_gid != tmp_la->la_gid) &&
1364                     !mdd_in_group_p(uc, la->la_gid))) &&
1365                     !mdd_capable(uc, CAP_CHOWN))
1366                         RETURN(-EPERM);
1367
1368                 /*
1369                  * Likewise, if the user or group of a non-directory
1370                  * has been changed by a non-root user, remove the
1371                  * setgid bit UNLESS there is no group execute bit
1372                  * (this would be a file marked for mandatory
1373                  * locking).  19981026 David C Niemi <niemi@tux.org>
1374                  *
1375                  * Removed the fsuid check (see the comment above) --
1376                  * 19990830 SD.
1377                  */
1378                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1379                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1380                         la->la_mode &= ~S_ISGID;
1381                         la->la_valid |= LA_MODE;
1382                 }
1383         }
1384
1385         /* For tuncate (or setsize), we should have MAY_WRITE perm */
1386         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1387                 rc = mdd_permission_internal(env, obj, MAY_WRITE);
1388                 if (rc)
1389                         RETURN(rc);
1390
1391                 /*
1392                  * For the "Size-on-MDS" setattr update, merge coming
1393                  * attributes with the set in the inode. BUG 10641
1394                  */
1395                 if ((la->la_valid & LA_ATIME) &&
1396                     (la->la_atime < tmp_la->la_atime))
1397                         la->la_valid &= ~LA_ATIME;
1398
1399                 if ((la->la_valid & LA_CTIME) &&
1400                     (la->la_ctime < tmp_la->la_ctime))
1401                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1402
1403                 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
1404                         la->la_mtime = now;
1405                         la->la_valid |= LA_MTIME;
1406                 }
1407         }
1408
1409         /* For last, ctime must be fixed */
1410         if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
1411                 la->la_ctime = now;
1412                 la->la_valid |= LA_CTIME;
1413         }
1414
1415         RETURN(0);
1416 }
1417
1418 /* set attr and LOV EA at once, return updated attr */
1419 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1420                         const struct md_attr *ma)
1421 {
1422         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1423         struct mdd_device *mdd = mdo2mdd(obj);
1424         struct thandle *handle;
1425         struct lov_mds_md *lmm = NULL;
1426         int  rc = 0, lmm_size = 0, max_size = 0;
1427         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1428         ENTRY;
1429
1430         mdd_txn_param_build(env, MDD_TXN_ATTR_SET_OP);
1431         handle = mdd_trans_start(env, mdd);
1432         if (IS_ERR(handle))
1433                 RETURN(PTR_ERR(handle));
1434         /*TODO: add lock here*/
1435         /* start a log jounal handle if needed */
1436         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1437             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1438                 max_size = mdd_lov_mdsize(env, mdd);
1439                 OBD_ALLOC(lmm, max_size);
1440                 if (lmm == NULL)
1441                         GOTO(cleanup, rc = -ENOMEM);
1442
1443                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1444                                 MDS_LOV_MD_NAME);
1445
1446                 if (rc < 0)
1447                         GOTO(cleanup, rc);
1448         }
1449
1450         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
1451                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1452                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1453
1454         *la_copy = ma->ma_attr;
1455         mdd_write_lock(env, mdd_obj);
1456         rc = mdd_fix_attr(env, mdd_obj, la_copy);
1457         mdd_write_unlock(env, mdd_obj);
1458         if (rc)
1459                 GOTO(cleanup, rc);
1460
1461         if (la_copy->la_valid & LA_FLAGS) {
1462                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1463                                                   handle);
1464                 if (rc == 0)
1465                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1466         } else if (la_copy->la_valid) {            /* setattr */
1467                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1468                                                   handle);
1469                 /* journal chown/chgrp in llog, just like unlink */
1470                 if (rc == 0 && lmm_size){
1471                         /*TODO set_attr llog */
1472                 }
1473         }
1474
1475         if (rc == 0 && ma->ma_valid & MA_LOV) {
1476                 umode_t mode;
1477
1478                 mode = mdd_object_type(mdd_obj);
1479                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1480                         /*TODO check permission*/
1481                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1482                                             ma->ma_lmm_size, handle, 1);
1483                 }
1484
1485         }
1486 cleanup:
1487         mdd_trans_stop(env, mdd, rc, handle);
1488         if (rc == 0 && lmm_size) {
1489                 /*set obd attr, if needed*/
1490                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
1491         }
1492         if (lmm != NULL) {
1493                 OBD_FREE(lmm, max_size);
1494         }
1495
1496         RETURN(rc);
1497 }
1498
1499 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1500                       const struct lu_buf *buf, const char *name, int fl,
1501                       struct thandle *handle)
1502 {
1503         int  rc;
1504         ENTRY;
1505
1506         mdd_write_lock(env, obj);
1507         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1508         mdd_write_unlock(env, obj);
1509
1510         RETURN(rc);
1511 }
1512
1513 static int mdd_xattr_sanity_check(const struct lu_env *env,
1514                                   struct mdd_object *obj)
1515 {
1516         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1517         struct md_ucred *uc     = md_ucred(env);
1518         int rc;
1519         ENTRY;
1520
1521         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1522                 RETURN(-EPERM);
1523
1524         mdd_read_lock(env, obj);
1525         rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1526         mdd_read_unlock(env, obj);
1527         if (rc)
1528                 RETURN(rc);
1529
1530         if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
1531                 RETURN(-EPERM);
1532
1533         RETURN(rc);
1534 }
1535
1536 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1537                          const struct lu_buf *buf, const char *name, int fl)
1538 {
1539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1540         struct mdd_device *mdd = mdo2mdd(obj);
1541         struct thandle *handle;
1542         int  rc;
1543         ENTRY;
1544
1545         rc = mdd_xattr_sanity_check(env, mdd_obj);
1546         if (rc)
1547                 RETURN(rc);
1548
1549         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1550         handle = mdd_trans_start(env, mdd);
1551         if (IS_ERR(handle))
1552                 RETURN(PTR_ERR(handle));
1553
1554         rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, name,
1555                                fl, handle);
1556 #ifdef HAVE_SPLIT_SUPPORT
1557         if (rc == 0) {
1558                 /*
1559                  * XXX: Very ugly hack, if setting lmv, it means splitting
1560                  * sucess, we should return -ERESTART to notify the client, so
1561                  * transno for this splitting should be zero according to the
1562                  * replay rules. so return -ERESTART here let mdt trans stop
1563                  * callback know this.
1564                  */
1565                  if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
1566                          rc = -ERESTART;
1567         }
1568 #endif
1569         mdd_trans_stop(env, mdd, rc, handle);
1570
1571         RETURN(rc);
1572 }
1573
1574 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
1575                            struct mdd_object *obj,
1576                            const char *name, struct thandle *handle)
1577 {
1578         struct dt_object *next;
1579
1580         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
1581         next = mdd_object_child(obj);
1582         return next->do_ops->do_xattr_del(env, next, name, handle,
1583                                           mdd_object_capa(env, obj));
1584 }
1585
1586 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1587                   const char *name)
1588 {
1589         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590         struct mdd_device *mdd = mdo2mdd(obj);
1591         struct thandle *handle;
1592         int  rc;
1593         ENTRY;
1594
1595         rc = mdd_xattr_sanity_check(env, mdd_obj);
1596         if (rc)
1597                 RETURN(rc);
1598
1599         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
1600         handle = mdd_trans_start(env, mdd);
1601         if (IS_ERR(handle))
1602                 RETURN(PTR_ERR(handle));
1603
1604         mdd_write_lock(env, mdd_obj);
1605         rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
1606         mdd_write_unlock(env, mdd_obj);
1607
1608         mdd_trans_stop(env, mdd, rc, handle);
1609
1610         RETURN(rc);
1611 }
1612
1613 static int __mdd_index_insert_only(const struct lu_env *env,
1614                                    struct mdd_object *pobj,
1615                                    const struct lu_fid *lf,
1616                                    const char *name, struct thandle *th,
1617                                    struct lustre_capa *capa)
1618 {
1619         int rc;
1620         struct dt_object *next = mdd_object_child(pobj);
1621         ENTRY;
1622
1623         if (dt_try_as_dir(env, next))
1624                 rc = next->do_index_ops->dio_insert(env, next,
1625                                          (struct dt_rec *)lf,
1626                                          (struct dt_key *)name, th, capa);
1627         else
1628                 rc = -ENOTDIR;
1629         RETURN(rc);
1630 }
1631
1632 /* insert new index, add reference if isdir, update times */
1633 static int __mdd_index_insert(const struct lu_env *env,
1634                              struct mdd_object *pobj, const struct lu_fid *lf,
1635                              const char *name, int isdir, struct thandle *th,
1636                              struct lustre_capa *capa)
1637 {
1638         int rc;
1639         struct dt_object *next = mdd_object_child(pobj);
1640         ENTRY;
1641
1642 #if 0
1643         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
1644 #endif
1645
1646         if (dt_try_as_dir(env, next))
1647                 rc = next->do_index_ops->dio_insert(env, next,
1648                                                     (struct dt_rec *)lf,
1649                                                     (struct dt_key *)name,
1650                                                     th, capa);
1651         else
1652                 rc = -ENOTDIR;
1653
1654         if (rc == 0) {
1655                 if (isdir)
1656                         __mdd_ref_add(env, pobj, th);
1657 #if 0
1658                 la->la_valid = LA_MTIME|LA_CTIME;
1659                 la->la_atime = ma->ma_attr.la_atime;
1660                 la->la_ctime = ma->ma_attr.la_ctime;
1661                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
1662 #endif
1663         }
1664         return rc;
1665 }
1666
1667 static int __mdd_index_delete(const struct lu_env *env,
1668                               struct mdd_object *pobj, const char *name,
1669                               int is_dir, struct thandle *handle,
1670                               struct lustre_capa *capa)
1671 {
1672         int rc;
1673         struct dt_object *next = mdd_object_child(pobj);
1674         ENTRY;
1675
1676         if (dt_try_as_dir(env, next)) {
1677                 rc = next->do_index_ops->dio_delete(env, next,
1678                                                     (struct dt_key *)name,
1679                                                     handle, capa);
1680                 if (rc == 0 && is_dir)
1681                         __mdd_ref_del(env, pobj, handle);
1682         } else
1683                 rc = -ENOTDIR;
1684         RETURN(rc);
1685 }
1686
1687 static int mdd_link_sanity_check(const struct lu_env *env,
1688                                  struct mdd_object *tgt_obj,
1689                                  struct mdd_object *src_obj)
1690 {
1691         int rc = 0;
1692         ENTRY;
1693
1694         if (tgt_obj) {
1695                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
1696                 if (rc)
1697                         RETURN(rc);
1698         }
1699
1700         if (S_ISDIR(mdd_object_type(src_obj)))
1701                 RETURN(-EPERM);
1702
1703         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
1704                 RETURN(-EPERM);
1705
1706         RETURN(rc);
1707 }
1708
1709 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1710                     struct md_object *src_obj, const char *name,
1711                     struct md_attr *ma)
1712 {
1713         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1714         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1715         struct mdd_device *mdd = mdo2mdd(src_obj);
1716         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1717         struct thandle *handle;
1718         int rc;
1719         ENTRY;
1720
1721         mdd_txn_param_build(env, MDD_TXN_LINK_OP);
1722         handle = mdd_trans_start(env, mdd);
1723         if (IS_ERR(handle))
1724                 RETURN(PTR_ERR(handle));
1725
1726         mdd_lock2(env, mdd_tobj, mdd_sobj);
1727
1728         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
1729         if (rc)
1730                 GOTO(out, rc);
1731
1732         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1733                                      name, handle,
1734                                      mdd_object_capa(env, mdd_tobj));
1735         if (rc == 0)
1736                 __mdd_ref_add(env, mdd_sobj, handle);
1737
1738         *la_copy = ma->ma_attr;
1739         la_copy->la_valid = LA_CTIME;
1740         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
1741         if (rc)
1742                 GOTO(out, rc);
1743
1744         la_copy->la_valid = LA_CTIME | LA_MTIME;
1745         rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1746
1747 out:
1748         mdd_unlock2(env, mdd_tobj, mdd_sobj);
1749         mdd_trans_stop(env, mdd, rc, handle);
1750         RETURN(rc);
1751 }
1752
1753 /*
1754  * Check that @dir contains no entries except (possibly) dot and dotdot.
1755  *
1756  * Returns:
1757  *
1758  *             0        empty
1759  *    -ENOTEMPTY        not empty
1760  *           -ve        other error
1761  *
1762  */
1763 static int mdd_dir_is_empty(const struct lu_env *env,
1764                             struct mdd_object *dir)
1765 {
1766         struct dt_it     *it;
1767         struct dt_object *obj;
1768         struct dt_it_ops *iops;
1769         int result;
1770         ENTRY;
1771
1772         obj = mdd_object_child(dir);
1773         iops = &obj->do_index_ops->dio_it;
1774         it = iops->init(env, obj, 0);
1775         if (it != NULL) {
1776                 result = iops->get(env, it, (const void *)"");
1777                 if (result > 0) {
1778                         int i;
1779                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
1780                                 result = iops->next(env, it);
1781                         if (result == 0)
1782                                 result = -ENOTEMPTY;
1783                         else if (result == +1)
1784                                 result = 0;
1785                 } else if (result == 0)
1786                         /*
1787                          * Huh? Index contains no zero key?
1788                          */
1789                         result = -EIO;
1790
1791                 iops->put(env, it);
1792                 iops->fini(env, it);
1793         } else
1794                 result = -ENOMEM;
1795         RETURN(result);
1796 }
1797
1798 /* return md_attr back,
1799  * if it is last unlink then return lov ea + llog cookie*/
1800 int __mdd_object_kill(const struct lu_env *env,
1801                       struct mdd_object *obj,
1802                       struct md_attr *ma)
1803 {
1804         int rc = 0;
1805         ENTRY;
1806
1807         mdd_set_dead_obj(obj);
1808         if (S_ISREG(mdd_object_type(obj))) {
1809                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1810                  * Caller must be ready for that. */
1811                 rc = __mdd_lmm_get(env, obj, ma);
1812                 if ((ma->ma_valid & MA_LOV))
1813                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1814                                             obj, ma);
1815         }
1816         RETURN(rc);
1817 }
1818
1819 /* caller should take a lock before calling */
1820 static int __mdd_finish_unlink(const struct lu_env *env,
1821                                struct mdd_object *obj, struct md_attr *ma,
1822                                struct thandle *th)
1823 {
1824         int rc;
1825         ENTRY;
1826
1827         rc = __mdd_iattr_get(env, obj, ma);
1828         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1829                 /* add new orphan and the object
1830                  * will be deleted during the object_put() */
1831                 if (__mdd_orphan_add(env, obj, th) == 0)
1832                         set_bit(LU_OBJECT_ORPHAN,
1833                                 &mdd2lu_obj(obj)->lo_header->loh_flags);
1834
1835                 if (obj->mod_count == 0)
1836                         rc = __mdd_object_kill(env, obj, ma);
1837         }
1838         RETURN(rc);
1839 }
1840
1841 static int mdd_unlink_sanity_check(const struct lu_env *env,
1842                                    struct mdd_object *pobj,
1843                                    struct mdd_object *cobj,
1844                                    struct md_attr *ma)
1845 {
1846         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
1847         int rc = 0;
1848         ENTRY;
1849
1850         rc = mdd_may_delete(env, pobj, cobj,
1851                             S_ISDIR(ma->ma_attr.la_mode), 1);
1852         if (rc)
1853                 RETURN(rc);
1854
1855         if (S_ISDIR(mdd_object_type(cobj))) {
1856                 if (dt_try_as_dir(env, dt_cobj))
1857                         rc = mdd_dir_is_empty(env, cobj);
1858                 else
1859                         rc = -ENOTDIR;
1860         }
1861
1862         RETURN(rc);
1863 }
1864
1865 static int mdd_unlink(const struct lu_env *env,
1866                       struct md_object *pobj, struct md_object *cobj,
1867                       const char *name, struct md_attr *ma)
1868 {
1869         struct mdd_device *mdd = mdo2mdd(pobj);
1870         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1871         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
1872         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1873         struct thandle    *handle;
1874         int rc, is_dir;
1875         ENTRY;
1876
1877         mdd_txn_param_build(env, MDD_TXN_UNLINK_OP);
1878         handle = mdd_trans_start(env, mdd);
1879         if (IS_ERR(handle))
1880                 RETURN(PTR_ERR(handle));
1881
1882         mdd_lock2(env, mdd_pobj, mdd_cobj);
1883
1884         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
1885         if (rc)
1886                 GOTO(cleanup, rc);
1887
1888         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
1889         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1890                                 mdd_object_capa(env, mdd_pobj));
1891         if (rc)
1892                 GOTO(cleanup, rc);
1893
1894         __mdd_ref_del(env, mdd_cobj, handle);
1895         *la_copy = ma->ma_attr;
1896         if (is_dir) {
1897                 /* unlink dot */
1898                 __mdd_ref_del(env, mdd_cobj, handle);
1899         } else {
1900                 la_copy->la_valid = LA_CTIME;
1901                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
1902                 if (rc)
1903                         GOTO(cleanup, rc);
1904         }
1905
1906         la_copy->la_valid = LA_CTIME | LA_MTIME;
1907         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
1908         if (rc)
1909                 GOTO(cleanup, rc);
1910
1911         rc = __mdd_finish_unlink(env, mdd_cobj, ma, handle);
1912
1913         if (rc == 0)
1914                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
1915                                    strlen("unlinked"), "unlinked", 0,
1916                                    NULL, NULL);
1917
1918 cleanup:
1919         mdd_unlock2(env, mdd_pobj, mdd_cobj);
1920         mdd_trans_stop(env, mdd, rc, handle);
1921         RETURN(rc);
1922 }
1923
1924 /* partial unlink */
1925 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1926                        struct md_attr *ma)
1927 {
1928         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1929         struct mdd_device *mdd = mdo2mdd(obj);
1930         struct thandle *handle;
1931         int rc;
1932         ENTRY;
1933
1934         mdd_txn_param_build(env, MDD_TXN_UNLINK_OP);
1935         handle = mdd_trans_start(env, mdd);
1936         if (IS_ERR(handle))
1937                 RETURN(-ENOMEM);
1938
1939         mdd_write_lock(env, mdd_obj);
1940
1941         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1942         if (rc)
1943                 GOTO(cleanup, rc);
1944
1945         __mdd_ref_del(env, mdd_obj, handle);
1946
1947         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1948                 /* unlink dot */
1949                 __mdd_ref_del(env, mdd_obj, handle);
1950         }
1951
1952         rc = __mdd_finish_unlink(env, mdd_obj, ma, handle);
1953
1954         EXIT;
1955 cleanup:
1956         mdd_write_unlock(env, mdd_obj);
1957         mdd_trans_stop(env, mdd, rc, handle);
1958         return rc;
1959 }
1960
1961 static int mdd_parent_fid(const struct lu_env *env,
1962                           struct mdd_object *obj,
1963                           struct lu_fid *fid)
1964 {
1965         return __mdd_lookup_locked(env, &obj->mod_obj,
1966                                    dotdot, fid, 0);
1967 }
1968
1969 /*
1970  * return 1: if lf is the fid of the ancestor of p1;
1971  * return 0: if not;
1972  *
1973  * return -EREMOTE: if remote object is found, in this
1974  * case fid of remote object is saved to @pf;
1975  *
1976  * otherwise: values < 0, errors.
1977  */
1978 static int mdd_is_parent(const struct lu_env *env,
1979                          struct mdd_device *mdd,
1980                          struct mdd_object *p1,
1981                          const struct lu_fid *lf,
1982                          struct lu_fid *pf)
1983 {
1984         struct mdd_object *parent = NULL;
1985         struct lu_fid *pfid;
1986         int rc;
1987         ENTRY;
1988
1989         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
1990         pfid = &mdd_env_info(env)->mti_fid;
1991
1992         /* Do not lookup ".." in root, they do not exist there. */
1993         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
1994                 RETURN(0);
1995
1996         for(;;) {
1997                 rc = mdd_parent_fid(env, p1, pfid);
1998                 if (rc)
1999                         GOTO(out, rc);
2000                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
2001                         GOTO(out, rc = 0);
2002                 if (lu_fid_eq(pfid, lf))
2003                         GOTO(out, rc = 1);
2004                 if (parent)
2005                         mdd_object_put(env, parent);
2006                 parent = mdd_object_find(env, mdd, pfid);
2007
2008                 /* cross-ref parent */
2009                 if (parent == NULL) {
2010                         if (pf != NULL)
2011                                 *pf = *pfid;
2012                         GOTO(out, rc = EREMOTE);
2013                 } else if (IS_ERR(parent))
2014                         GOTO(out, rc = PTR_ERR(parent));
2015                 p1 = parent;
2016         }
2017         EXIT;
2018 out:
2019         if (parent && !IS_ERR(parent))
2020                 mdd_object_put(env, parent);
2021         return rc;
2022 }
2023
2024 static int mdd_rename_lock(const struct lu_env *env,
2025                            struct mdd_device *mdd,
2026                            struct mdd_object *src_pobj,
2027                            struct mdd_object *tgt_pobj)
2028 {
2029         int rc;
2030         ENTRY;
2031
2032         if (src_pobj == tgt_pobj) {
2033                 mdd_write_lock(env, src_pobj);
2034                 RETURN(0);
2035         }
2036
2037         /* compared the parent child relationship of src_p&tgt_p */
2038         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2039                 mdd_lock2(env, src_pobj, tgt_pobj);
2040                 RETURN(0);
2041         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2042                 mdd_lock2(env, tgt_pobj, src_pobj);
2043                 RETURN(0);
2044         }
2045
2046         rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2047         if (rc < 0)
2048                 RETURN(rc);
2049
2050         if (rc == 1) {
2051                 mdd_lock2(env, tgt_pobj, src_pobj);
2052                 RETURN(0);
2053         }
2054
2055         mdd_lock2(env, src_pobj, tgt_pobj);
2056         RETURN(0);
2057 }
2058
2059 static void mdd_rename_unlock(const struct lu_env *env,
2060                               struct mdd_object *src_pobj,
2061                               struct mdd_object *tgt_pobj)
2062 {
2063         mdd_write_unlock(env, src_pobj);
2064         if (src_pobj != tgt_pobj)
2065                 mdd_write_unlock(env, tgt_pobj);
2066 }
2067
2068 static int mdd_rename_sanity_check(const struct lu_env *env,
2069                                    struct mdd_object *src_pobj,
2070                                    struct mdd_object *tgt_pobj,
2071                                    const struct lu_fid *sfid,
2072                                    int src_is_dir,
2073                                    struct mdd_object *tobj)
2074 {
2075         int rc;
2076         ENTRY;
2077
2078         if (mdd_is_dead_obj(src_pobj))
2079                 RETURN(-ENOENT);
2080
2081         /* The sobj maybe on the remote, check parent permission only here */
2082         rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
2083         if (rc)
2084                 RETURN(rc);
2085
2086         if (!tobj) {
2087                 rc = mdd_may_create(env, tgt_pobj, NULL,
2088                                     (src_pobj != tgt_pobj));
2089         } else {
2090                 mdd_read_lock(env, tobj);
2091                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
2092                                     (src_pobj != tgt_pobj));
2093                 if (rc == 0)
2094                         if (S_ISDIR(mdd_object_type(tobj))
2095                             && mdd_dir_is_empty(env, tobj))
2096                                 rc = -ENOTEMPTY;
2097                 mdd_read_unlock(env, tobj);
2098         }
2099
2100         RETURN(rc);
2101 }
2102 /* src object can be remote that is why we use only fid and type of object */
2103 static int mdd_rename(const struct lu_env *env,
2104                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2105                       const struct lu_fid *lf, const char *sname,
2106                       struct md_object *tobj, const char *tname,
2107                       struct md_attr *ma)
2108 {
2109         struct mdd_device *mdd = mdo2mdd(src_pobj);
2110         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
2111         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2112         struct mdd_object *mdd_sobj = NULL;
2113         struct mdd_object *mdd_tobj = NULL;
2114         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2115         struct thandle *handle;
2116         int is_dir;
2117         int rc;
2118         ENTRY;
2119
2120         LASSERT(ma->ma_attr.la_mode & S_IFMT);
2121         is_dir = S_ISDIR(ma->ma_attr.la_mode);
2122         if (ma->ma_attr.la_valid & LA_FLAGS &&
2123             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
2124                 RETURN(-EPERM);
2125
2126         if (tobj)
2127                 mdd_tobj = md2mdd_obj(tobj);
2128
2129         mdd_txn_param_build(env, MDD_TXN_RENAME_OP);
2130         handle = mdd_trans_start(env, mdd);
2131         if (IS_ERR(handle))
2132                 RETURN(PTR_ERR(handle));
2133
2134         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2135         rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
2136         if (rc)
2137                 GOTO(cleanup_unlocked, rc);
2138
2139         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2140                                      lf, is_dir, mdd_tobj);
2141         if (rc)
2142                 GOTO(cleanup, rc);
2143
2144         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2145                                 mdd_object_capa(env, mdd_spobj));
2146         if (rc)
2147                 GOTO(cleanup, rc);
2148
2149         /*
2150          * Here tobj can be remote one, so we do index_delete unconditionally
2151          * and -ENOENT is allowed.
2152          */
2153         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2154                                 mdd_object_capa(env, mdd_tpobj));
2155         if (rc != 0 && rc != -ENOENT)
2156                 GOTO(cleanup, rc);
2157
2158         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2159                                 mdd_object_capa(env, mdd_tpobj));
2160         if (rc)
2161                 GOTO(cleanup, rc);
2162
2163         mdd_sobj = mdd_object_find(env, mdd, lf);
2164         *la_copy = ma->ma_attr;
2165         la_copy->la_valid = LA_CTIME;
2166         if (mdd_sobj) {
2167                 /*XXX: how to update ctime for remote sobj? */
2168                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
2169                 if (rc)
2170                         GOTO(cleanup, rc);
2171         }
2172         if (tobj && lu_object_exists(&tobj->mo_lu)) {
2173                 mdd_write_lock(env, mdd_tobj);
2174                 __mdd_ref_del(env, mdd_tobj, handle);
2175                 /* remove dot reference */
2176                 if (is_dir)
2177                         __mdd_ref_del(env, mdd_tobj, handle);
2178
2179                 la_copy->la_valid = LA_CTIME;
2180                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
2181                 if (rc)
2182                         GOTO(cleanup, rc);
2183
2184                 rc = __mdd_finish_unlink(env, mdd_tobj, ma, handle);
2185                 mdd_write_unlock(env, mdd_tobj);
2186                 if (rc)
2187                         GOTO(cleanup, rc);
2188         }
2189
2190         la_copy->la_valid = LA_CTIME | LA_MTIME;
2191         rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
2192         if (rc)
2193                 GOTO(cleanup, rc);
2194
2195         if (mdd_spobj != mdd_tpobj) {
2196                 la_copy->la_valid = LA_CTIME | LA_MTIME;
2197                 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
2198         }
2199
2200 cleanup:
2201         mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
2202 cleanup_unlocked:
2203         mdd_trans_stop(env, mdd, rc, handle);
2204         if (mdd_sobj)
2205                 mdd_object_put(env, mdd_sobj);
2206         RETURN(rc);
2207 }
2208
2209 static int
2210 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
2211              const char *name, const struct lu_fid* fid, int mask)
2212 {
2213         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
2214         struct dt_object    *dir = mdd_object_child(mdd_obj);
2215         struct dt_rec       *rec = (struct dt_rec *)fid;
2216         const struct dt_key *key = (const struct dt_key *)name;
2217         int rc;
2218         ENTRY;
2219
2220         if (mdd_is_dead_obj(mdd_obj))
2221                 RETURN(-ESTALE);
2222
2223         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
2224         if (rc == 0)
2225                 RETURN(-ESTALE);
2226         else if (rc < 0) {
2227                 CERROR("Object "DFID" locates on remote server\n",
2228                         PFID(mdo2fid(mdd_obj)));
2229                 LBUG();
2230         }
2231
2232 #if 0
2233         if (mask == MAY_EXEC)
2234                 rc = mdd_exec_permission_lite(env, mdd_obj);
2235         else
2236 #endif
2237         rc = mdd_permission_internal(env, mdd_obj, mask);
2238         if (rc)
2239                 RETURN(rc);
2240
2241         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
2242                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
2243                                                    mdd_object_capa(env, mdd_obj));
2244         else
2245                 rc = -ENOTDIR;
2246
2247         RETURN(rc);
2248 }
2249
2250 static int
2251 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
2252                     const char *name, const struct lu_fid* fid, int mask)
2253 {
2254         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2255         int rc;
2256
2257         mdd_read_lock(env, mdd_obj);
2258         rc = __mdd_lookup(env, pobj, name, fid, mask);
2259         mdd_read_unlock(env, mdd_obj);
2260
2261        return rc;
2262 }
2263
2264 static int mdd_lookup(const struct lu_env *env,
2265                       struct md_object *pobj, const char *name,
2266                       struct lu_fid* fid)
2267 {
2268         int rc;
2269         ENTRY;
2270         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
2271         RETURN(rc);
2272 }
2273
2274 /*
2275  * No permission check is needed.
2276  *
2277  * returns 1: if fid is ancestor of @mo;
2278  * returns 0: if fid is not a ancestor of @mo;
2279  *
2280  * returns EREMOTE if remote object is found, fid of remote object is saved to
2281  * @fid;
2282  *
2283  * returns < 0: if error
2284  */
2285 static int mdd_is_subdir(const struct lu_env *env,
2286                          struct md_object *mo, const struct lu_fid *fid,
2287                          struct lu_fid *sfid)
2288 {
2289         struct mdd_device *mdd = mdo2mdd(mo);
2290         int rc;
2291         ENTRY;
2292
2293         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
2294                 RETURN(0);
2295
2296         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
2297
2298         RETURN(rc);
2299 }
2300
2301 static int __mdd_object_initialize(const struct lu_env *env,
2302                                    const struct lu_fid *pfid,
2303                                    struct mdd_object *child,
2304                                    struct md_attr *ma, struct thandle *handle)
2305 {
2306         int rc;
2307         ENTRY;
2308
2309         /* update attributes for child.
2310          * FIXME:
2311          *  (1) the valid bits should be converted between Lustre and Linux;
2312          *  (2) maybe, the child attributes should be set in OSD when creation.
2313          */
2314
2315         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
2316         if (rc != 0)
2317                 RETURN(rc);
2318
2319         if (S_ISDIR(ma->ma_attr.la_mode)) {
2320                 /* add . and .. for newly created dir */
2321                 __mdd_ref_add(env, child, handle);
2322                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
2323                                              dot, handle, BYPASS_CAPA);
2324                 if (rc == 0) {
2325                         rc = __mdd_index_insert_only(env, child, pfid,
2326                                                      dotdot, handle,
2327                                                      BYPASS_CAPA);
2328                         if (rc != 0) {
2329                                 int rc2;
2330
2331                                 rc2 = __mdd_index_delete(env, child, dot, 0,
2332                                                          handle, BYPASS_CAPA);
2333                                 if (rc2 != 0)
2334                                         CERROR("Failure to cleanup after dotdot"
2335                                                " creation: %d (%d)\n", rc2, rc);
2336                                 else
2337                                         __mdd_ref_del(env, child, handle);
2338                         }
2339                 }
2340         }
2341         RETURN(rc);
2342 }
2343
2344 /*
2345  * The permission has been checked when obj created,
2346  * no need check again.
2347  */
2348 static int mdd_cd_sanity_check(const struct lu_env *env,
2349                                struct mdd_object *obj)
2350 {
2351         int rc = 0;
2352         ENTRY;
2353
2354         /* EEXIST check */
2355         if (!obj || mdd_is_dead_obj(obj))
2356                 RETURN(-ENOENT);
2357
2358 #if 0
2359         mdd_read_lock(env, obj);
2360         rc = mdd_permission_internal(env, obj, MAY_WRITE);
2361         mdd_read_unlock(env, obj);
2362 #endif
2363
2364         RETURN(rc);
2365
2366 }
2367
2368 static int mdd_create_data(const struct lu_env *env,
2369                            struct md_object *pobj, struct md_object *cobj,
2370                            const struct md_create_spec *spec,
2371                            struct md_attr *ma)
2372 {
2373         struct mdd_device *mdd = mdo2mdd(cobj);
2374         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
2375         struct mdd_object *son = md2mdd_obj(cobj);
2376         struct lu_attr    *attr = &ma->ma_attr;
2377         struct lov_mds_md *lmm = NULL;
2378         int                lmm_size = 0;
2379         struct thandle    *handle;
2380         int                rc;
2381         ENTRY;
2382
2383         rc = mdd_cd_sanity_check(env, son);
2384         if (rc)
2385                 RETURN(rc);
2386
2387         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
2388                         !(spec->sp_cr_flags & FMODE_WRITE))
2389                 RETURN(0);
2390         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
2391                             attr);
2392         if (rc)
2393                 RETURN(rc);
2394
2395         mdd_txn_param_build(env, MDD_TXN_CREATE_DATA_OP);
2396         handle = mdd_trans_start(env, mdd);
2397         if (IS_ERR(handle))
2398                 RETURN(rc = PTR_ERR(handle));
2399
2400         /*
2401          * XXX: Setting the lov ea is not locked but setting the attr is locked?
2402          */
2403
2404         /* Replay creates has objects already */
2405         if (spec->u.sp_ea.no_lov_create) {
2406                 CDEBUG(D_INFO, "we already have lov ea\n");
2407                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2408                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2409                                     spec->u.sp_ea.eadatalen, handle, 0);
2410         } else
2411                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2412                                     lmm_size, handle, 0);
2413
2414         if (rc == 0)
2415                rc = mdd_attr_get_internal_locked(env, son, ma);
2416
2417         /* Finish mdd_lov_create() stuff. */
2418         mdd_lov_create_finish(env, mdd, rc);
2419         mdd_trans_stop(env, mdd, rc, handle);
2420         if (lmm)
2421                 OBD_FREE(lmm, lmm_size);
2422         RETURN(rc);
2423 }
2424
2425 #ifdef CONFIG_FS_POSIX_ACL
2426 /*
2427  * Modify acl when creating a new obj.
2428  *
2429  * mode_p initially must contain the mode parameter to the open() / creat()
2430  * system calls. All permissions that are not granted by the acl are removed.
2431  * The permissions in the acl are changed to reflect the mode_p parameter.
2432  */
2433 static int mdd_posix_acl_create_masq(posix_acl_xattr_entry *entry,
2434                                      __u32 *mode_p, int count)
2435 {
2436         posix_acl_xattr_entry *group_obj = NULL, *mask_obj = NULL, *pa, *pe;
2437         __u32 mode = *mode_p;
2438         int not_equiv = 0;
2439
2440         pa = &entry[0];
2441         pe = &entry[count - 1];
2442         for (; pa <= pe; pa++) {
2443                 switch(pa->e_tag) {
2444                         case ACL_USER_OBJ:
2445                                 pa->e_perm &= (mode >> 6) | ~S_IRWXO;
2446                                 mode &= (pa->e_perm << 6) | ~S_IRWXU;
2447                                 break;
2448
2449                         case ACL_USER:
2450                         case ACL_GROUP:
2451                                 not_equiv = 1;
2452                                 break;
2453
2454                         case ACL_GROUP_OBJ:
2455                                 group_obj = pa;
2456                                 break;
2457
2458                         case ACL_OTHER:
2459                                 pa->e_perm &= mode | ~S_IRWXO;
2460                                 mode &= pa->e_perm | ~S_IRWXO;
2461                                 break;
2462
2463                         case ACL_MASK:
2464                                 mask_obj = pa;
2465                                 not_equiv = 1;
2466                                 break;
2467
2468                         default:
2469                                 return -EIO;
2470                 }
2471         }
2472
2473         if (mask_obj) {
2474                 mask_obj->e_perm &= (mode >> 3) | ~S_IRWXO;
2475                 mode &= (mask_obj->e_perm << 3) | ~S_IRWXG;
2476         } else {
2477                 if (!group_obj)
2478                         return -EIO;
2479                 group_obj->e_perm &= (mode >> 3) | ~S_IRWXO;
2480                 mode &= (group_obj->e_perm << 3) | ~S_IRWXG;
2481         }
2482
2483         *mode_p = (*mode_p & ~S_IRWXUGO) | mode;
2484         return not_equiv;
2485 }
2486
2487 static int __mdd_acl_init(const struct lu_env *env, struct mdd_object *obj,
2488                           struct lu_buf *buf, __u32 *mode,
2489                           struct thandle *handle)
2490 {
2491         struct dt_object        *next;
2492         posix_acl_xattr_entry   *entry;
2493         int                      entry_count;
2494         int                      rc;
2495
2496         ENTRY;
2497
2498         entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
2499         entry_count = (buf->lb_len - 4) / sizeof(posix_acl_xattr_entry);
2500         if (entry_count <= 0)
2501                 RETURN(0);
2502        
2503         next = mdd_object_child(obj);
2504         if (S_ISDIR(*mode)) {
2505                 rc = next->do_ops->do_xattr_set(env, next, buf,
2506                                                 XATTR_NAME_ACL_DEFAULT,
2507                                                 0, handle, BYPASS_CAPA);
2508                 if (rc)
2509                         RETURN(rc);
2510         }
2511
2512         rc = mdd_posix_acl_create_masq(entry, mode, entry_count);
2513         if (rc < 0)
2514                 RETURN(rc);
2515         else if (rc > 0)
2516                 rc = next->do_ops->do_xattr_set(env, next, buf,
2517                                                 XATTR_NAME_ACL_ACCESS,
2518                                                 0, handle, BYPASS_CAPA);
2519         RETURN(rc);
2520 }
2521
2522 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
2523                         struct mdd_object *cobj, __u32 *mode,
2524                         struct thandle *handle)
2525 {
2526         struct dt_object        *next = mdd_object_child(pobj);
2527         struct lu_buf           *buf = &mdd_env_info(env)->mti_buf;
2528         int                      rc;
2529
2530         ENTRY;
2531
2532         if (S_ISLNK(*mode))
2533                 RETURN(0);
2534
2535         buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
2536         buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
2537         rc = next->do_ops->do_xattr_get(env, next, buf,
2538                                         XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
2539         if ((rc == -EOPNOTSUPP) || (rc == -ENODATA))
2540                 RETURN(0);
2541         else if (rc <= 0)
2542                 RETURN(rc);
2543
2544         buf->lb_len = rc;
2545         rc = __mdd_acl_init(env, cobj, buf, mode, handle);
2546         RETURN(rc);
2547 }
2548 #endif
2549
2550 static int mdd_create_sanity_check(const struct lu_env *env,
2551                                    struct md_object *pobj,
2552                                    const char *name, struct md_attr *ma)
2553 {
2554         struct mdd_thread_info *info = mdd_env_info(env);
2555         struct lu_attr    *la        = &info->mti_la;
2556         struct lu_fid     *fid       = &info->mti_fid;
2557         struct mdd_object *obj       = md2mdd_obj(pobj);
2558         int rc;
2559         ENTRY;
2560
2561         /* EEXIST check */
2562         if (mdd_is_dead_obj(obj))
2563                 RETURN(-ENOENT);
2564
2565         /*
2566          * Check if the name already exist, though it will be checked
2567          * in _index_insert also, for avoiding rolling back if exists
2568          * _index_insert.
2569          */
2570         rc = __mdd_lookup_locked(env, pobj, name, fid,
2571                                  MAY_WRITE | MAY_EXEC);
2572         if (rc != -ENOENT)
2573                 RETURN(rc ? : -EEXIST);
2574
2575         /* sgid check */
2576         mdd_read_lock(env, obj);
2577         rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
2578         mdd_read_unlock(env, obj);
2579         if (rc != 0)
2580                 RETURN(rc);
2581
2582         if (la->la_mode & S_ISGID) {
2583                 ma->ma_attr.la_gid = la->la_gid;
2584                 if (S_ISDIR(ma->ma_attr.la_mode)) {
2585                         ma->ma_attr.la_mode |= S_ISGID;
2586                         ma->ma_attr.la_valid |= LA_MODE;
2587                 }
2588         }
2589
2590         switch (ma->ma_attr.la_mode & S_IFMT) {
2591         case S_IFREG:
2592         case S_IFDIR:
2593         case S_IFLNK:
2594         case S_IFCHR:
2595         case S_IFBLK:
2596         case S_IFIFO:
2597         case S_IFSOCK:
2598                 rc = 0;
2599                 break;
2600         default:
2601                 rc = -EINVAL;
2602                 break;
2603         }
2604         RETURN(rc);
2605 }
2606
2607 /*
2608  * Create object and insert it into namespace.
2609  */
2610 static int mdd_create(const struct lu_env *env,
2611                       struct md_object *pobj, const char *name,
2612                       struct md_object *child,
2613                       struct md_create_spec *spec,
2614                       struct md_attr* ma)
2615 {
2616         struct mdd_device *mdd = mdo2mdd(pobj);
2617         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
2618         struct mdd_object *son = md2mdd_obj(child);
2619         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2620         struct lu_attr    *attr = &ma->ma_attr;
2621         struct lov_mds_md *lmm = NULL;
2622         struct thandle    *handle;
2623         int rc, created = 0, inserted = 0, lmm_size = 0;
2624         ENTRY;
2625
2626         /*
2627          * Two operations have to be performed:
2628          *
2629          *  - allocation of new object (->do_create()), and
2630          *
2631          *  - insertion into parent index (->dio_insert()).
2632          *
2633          * Due to locking, operation order is not important, when both are
2634          * successful, *but* error handling cases are quite different:
2635          *
2636          *  - if insertion is done first, and following object creation fails,
2637          *  insertion has to be rolled back, but this operation might fail
2638          *  also leaving us with dangling index entry.
2639          *
2640          *  - if creation is done first, is has to be undone if insertion
2641          *  fails, leaving us with leaked space, which is neither good, nor
2642          *  fatal.
2643          *
2644          * It seems that creation-first is simplest solution, but it is
2645          * sub-optimal in the frequent
2646          *
2647          *         $ mkdir foo
2648          *         $ mkdir foo
2649          *
2650          * case, because second mkdir is bound to create object, only to
2651          * destroy it immediately.
2652          *
2653          * To avoid this follow local file systems that do double lookup:
2654          *
2655          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2656          *
2657          *     1. create            (__mdd_object_create())
2658          *
2659          *     2. insert            (__mdd_index_insert(), lookup again)
2660          */
2661
2662         /* sanity checks before big job */
2663         rc = mdd_create_sanity_check(env, pobj, name, ma);
2664         if (rc)
2665                 RETURN(rc);
2666
2667         /* no RPC inside the transaction, so OST objects should be created at
2668          * first */
2669         if (S_ISREG(attr->la_mode)) {
2670                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2671                                     spec, attr);
2672                 if (rc)
2673                         RETURN(rc);
2674         }
2675
2676         mdd_txn_param_build(env, MDD_TXN_MKDIR_OP);
2677         handle = mdd_trans_start(env, mdd);
2678         if (IS_ERR(handle))
2679                 RETURN(PTR_ERR(handle));
2680
2681         mdd_write_lock(env, mdd_pobj);
2682
2683         /*
2684          * XXX check that link can be added to the parent in mkdir case.
2685          */
2686
2687         mdd_write_lock(env, son);
2688         rc = __mdd_object_create(env, son, ma, handle);
2689         if (rc) {
2690                 mdd_write_unlock(env, son);
2691                 GOTO(cleanup, rc);
2692         }
2693
2694         created = 1;
2695
2696 #ifdef CONFIG_FS_POSIX_ACL
2697         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
2698         if (rc) {
2699                 mdd_write_unlock(env, son);
2700                 GOTO(cleanup, rc);
2701         } else {
2702                 ma->ma_attr.la_valid |= LA_MODE;
2703         }
2704 #endif
2705
2706         rc = __mdd_object_initialize(env, mdo2fid(mdd_pobj),
2707                                      son, ma, handle);
2708         mdd_write_unlock(env, son);
2709         if (rc)
2710                 /*
2711                  * Object has no links, so it will be destroyed when last
2712                  * reference is released. (XXX not now.)
2713                  */
2714                 GOTO(cleanup, rc);
2715
2716         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2717                                 name, S_ISDIR(attr->la_mode), handle,
2718                                 mdd_object_capa(env, mdd_pobj));
2719
2720         if (rc)
2721                 GOTO(cleanup, rc);
2722
2723         inserted = 1;
2724         /* replay creates has objects already */
2725         if (spec->u.sp_ea.no_lov_create) {
2726                 CDEBUG(D_INFO, "we already have lov ea\n");
2727                 rc = mdd_lov_set_md(env, mdd_pobj, son,
2728                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
2729                                     spec->u.sp_ea.eadatalen, handle, 0);
2730         } else
2731                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
2732                                     lmm_size, handle, 0);
2733         if (rc) {
2734                 CERROR("error on stripe info copy %d \n", rc);
2735                 GOTO(cleanup, rc);
2736         }
2737
2738         if (S_ISLNK(attr->la_mode)) {
2739                 struct dt_object *dt = mdd_object_child(son);
2740                 const char *target_name = spec->u.sp_symname;
2741                 int sym_len = strlen(target_name);
2742                 const struct lu_buf *buf;
2743                 loff_t pos = 0;
2744
2745                 buf = mdd_buf_get_const(env, target_name, sym_len);
2746                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2747                                                 mdd_object_capa(env, son));
2748                 if (rc == sym_len)
2749                         rc = 0;
2750                 else
2751                         rc = -EFAULT;
2752         }
2753
2754         *la_copy = ma->ma_attr;
2755         la_copy->la_valid = LA_CTIME | LA_MTIME;
2756         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
2757         if (rc)
2758                 GOTO(cleanup, rc);
2759
2760         /* return attr back */
2761         rc = mdd_attr_get_internal_locked(env, son, ma);
2762 cleanup:
2763         if (rc && created) {
2764                 int rc2 = 0;
2765
2766                 if (inserted) {
2767                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2768                                                  S_ISDIR(attr->la_mode),
2769                                                  handle, BYPASS_CAPA);
2770                         if (rc2)
2771                                 CERROR("error can not cleanup destroy %d\n",
2772                                        rc2);
2773                 }
2774                 if (rc2 == 0) {
2775                         mdd_write_lock(env, son);
2776                         __mdd_ref_del(env, son, handle);
2777                         mdd_write_unlock(env, son);
2778                 }
2779         }
2780         /* finish mdd_lov_create() stuff */
2781         mdd_lov_create_finish(env, mdd, rc);
2782         if (lmm)
2783                 OBD_FREE(lmm, lmm_size);
2784         mdd_write_unlock(env, mdd_pobj);
2785         mdd_trans_stop(env, mdd, rc, handle);
2786         RETURN(rc);
2787 }
2788
2789 /* partial operation */
2790 static int mdd_oc_sanity_check(const struct lu_env *env,
2791                                struct mdd_object *obj,
2792                                struct md_attr *ma)
2793 {
2794         int rc;
2795         ENTRY;
2796
2797         switch (ma->ma_attr.la_mode & S_IFMT) {
2798         case S_IFREG:
2799         case S_IFDIR:
2800         case S_IFLNK:
2801         case S_IFCHR:
2802         case S_IFBLK:
2803         case S_IFIFO:
2804         case S_IFSOCK:
2805                 rc = 0;
2806                 break;
2807         default:
2808                 rc = -EINVAL;
2809                 break;
2810         }
2811         RETURN(rc);
2812 }
2813
2814 static int mdd_object_create(const struct lu_env *env,
2815                              struct md_object *obj,
2816                              const struct md_create_spec *spec,
2817                              struct md_attr *ma)
2818 {
2819
2820         struct mdd_device *mdd = mdo2mdd(obj);
2821         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2822         const struct lu_fid *pfid = spec->u.sp_pfid;
2823         struct thandle *handle;
2824         int rc;
2825         ENTRY;
2826
2827         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2828         if (rc)
2829                 RETURN(rc);
2830
2831         mdd_txn_param_build(env, MDD_TXN_OBJECT_CREATE_OP);
2832         handle = mdd_trans_start(env, mdd);
2833         if (IS_ERR(handle))
2834                 RETURN(PTR_ERR(handle));
2835
2836         mdd_write_lock(env, mdd_obj);
2837         rc = __mdd_object_create(env, mdd_obj, ma, handle);
2838         if (rc)
2839                 GOTO(unlock, rc);
2840
2841         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2842                 /* If creating the slave object, set slave EA here. */
2843                 int lmv_size = spec->u.sp_ea.eadatalen;
2844                 struct lmv_stripe_md *lmv;
2845
2846                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2847                 LASSERT(lmv != NULL && lmv_size > 0);
2848
2849                 rc = __mdd_xattr_set(env, mdd_obj,
2850                                      mdd_buf_get_const(env, lmv, lmv_size),
2851                                      MDS_LMV_MD_NAME, 0, handle);
2852                 if (rc)
2853                         GOTO(unlock, rc);
2854                 pfid = spec->u.sp_ea.fid;
2855
2856                 CDEBUG(D_INFO, "Set slave ea "DFID", eadatalen %d, rc %d\n",
2857                        PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
2858                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0);
2859         } else {
2860 #ifdef CONFIG_FS_POSIX_ACL
2861                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2862                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2863
2864                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2865                         buf->lb_len = spec->u.sp_ea.eadatalen;
2866                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2867                                 rc = __mdd_acl_init(env, mdd_obj, buf, 
2868                                                     &ma->ma_attr.la_mode,
2869                                                     handle);
2870                                 if (rc)
2871                                         GOTO(unlock, rc);
2872                                 else
2873                                         ma->ma_attr.la_valid |= LA_MODE;
2874                         }
2875                 }
2876 #endif
2877                 rc = __mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
2878         }
2879         EXIT;
2880 unlock:
2881         mdd_write_unlock(env, mdd_obj);
2882         if (rc == 0)
2883                 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
2884
2885         mdd_trans_stop(env, mdd, rc, handle);
2886         return rc;
2887 }
2888
2889 /*
2890  * Partial operation. Be aware, this is called with write lock taken, so we use
2891  * locksless version of __mdd_lookup() here.
2892  */
2893 static int mdd_ni_sanity_check(const struct lu_env *env,
2894                                struct md_object *pobj,
2895                                const char *name,
2896                                const struct lu_fid *fid)
2897 {
2898         struct mdd_object *obj       = md2mdd_obj(pobj);
2899 #if 0
2900         int rc;
2901 #endif
2902         ENTRY;
2903
2904         /* EEXIST check */
2905         if (mdd_is_dead_obj(obj))
2906                 RETURN(-ENOENT);
2907
2908          /* The exist of the name will be checked in _index_insert. */
2909 #if 0
2910         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2911         if (rc != -ENOENT)
2912                 RETURN(rc ? : -EEXIST);
2913         else
2914                 RETURN(0);
2915 #endif
2916         RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
2917 }
2918
2919 static int mdd_name_insert(const struct lu_env *env,
2920                            struct md_object *pobj,
2921                            const char *name, const struct lu_fid *fid,
2922                            int isdir)
2923 {
2924         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2925         struct thandle *handle;
2926         int rc;
2927         ENTRY;
2928
2929         mdd_txn_param_build(env, MDD_TXN_INDEX_INSERT_OP);
2930         handle = mdd_trans_start(env, mdo2mdd(pobj));
2931         if (IS_ERR(handle))
2932                 RETURN(PTR_ERR(handle));
2933
2934         mdd_write_lock(env, mdd_obj);
2935         rc = mdd_ni_sanity_check(env, pobj, name, fid);
2936         if (rc)
2937                 GOTO(out_unlock, rc);
2938
2939         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
2940                                 BYPASS_CAPA);
2941
2942 out_unlock:
2943         mdd_write_unlock(env, mdd_obj);
2944
2945         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
2946         RETURN(rc);
2947 }
2948
2949 /*
2950  * Be aware, this is called with write lock taken, so we use locksless version
2951  * of __mdd_lookup() here.
2952  */
2953 static int mdd_nr_sanity_check(const struct lu_env *env,
2954                                struct md_object *pobj,
2955                                const char *name)
2956 {
2957         struct mdd_object *obj       = md2mdd_obj(pobj);
2958 #if 0
2959         struct mdd_thread_info *info = mdd_env_info(env);
2960         struct lu_fid     *fid       = &info->mti_fid;
2961         int rc;
2962 #endif
2963         ENTRY;
2964
2965         /* EEXIST check */
2966         if (mdd_is_dead_obj(obj))
2967                 RETURN(-ENOENT);
2968
2969          /* The exist of the name will be checked in _index_delete. */
2970 #if 0
2971         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
2972         RETURN(rc);
2973 #endif
2974         RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
2975 }
2976
2977 static int mdd_name_remove(const struct lu_env *env,
2978                            struct md_object *pobj,
2979                            const char *name, int is_dir)
2980 {
2981         struct mdd_device *mdd = mdo2mdd(pobj);
2982         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
2983         struct thandle *handle;
2984         int rc;
2985         ENTRY;
2986
2987         mdd_txn_param_build(env, MDD_TXN_INDEX_DELETE_OP);
2988         handle = mdd_trans_start(env, mdd);
2989         if (IS_ERR(handle))
2990                 RETURN(PTR_ERR(handle));
2991
2992         mdd_write_lock(env, mdd_obj);
2993         rc = mdd_nr_sanity_check(env, pobj, name);
2994         if (rc)
2995                 GOTO(out_unlock, rc);
2996
2997         rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
2998                                 BYPASS_CAPA);
2999
3000 out_unlock:
3001         mdd_write_unlock(env, mdd_obj);
3002
3003         mdd_trans_stop(env, mdd, rc, handle);
3004         RETURN(rc);
3005 }
3006
3007 static int mdd_rt_sanity_check(const struct lu_env *env,
3008                                struct mdd_object *tgt_pobj,
3009                                struct mdd_object *tobj,
3010                                const struct lu_fid *sfid,
3011                                const char *name, struct md_attr *ma)
3012 {
3013         int rc, src_is_dir;
3014         ENTRY;
3015
3016         /* EEXIST check */
3017         if (mdd_is_dead_obj(tgt_pobj))
3018                 RETURN(-ENOENT);
3019
3020         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
3021         if (tobj) {
3022                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
3023                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
3024                      mdd_dir_is_empty(env, tobj))
3025                                 RETURN(-ENOTEMPTY);
3026         } else {
3027                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
3028         }
3029
3030         RETURN(rc);
3031 }
3032
3033 static int mdd_rename_tgt(const struct lu_env *env,
3034                           struct md_object *pobj, struct md_object *tobj,
3035                           const struct lu_fid *lf, const char *name,
3036                           struct md_attr *ma)
3037 {
3038         struct mdd_device *mdd = mdo2mdd(pobj);
3039         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
3040         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
3041         struct thandle *handle;
3042         int rc;
3043         ENTRY;
3044
3045         mdd_txn_param_build(env, MDD_TXN_RENAME_TGT_OP);
3046         handle = mdd_trans_start(env, mdd);
3047         if (IS_ERR(handle))
3048                 RETURN(PTR_ERR(handle));
3049
3050         if (mdd_tobj)
3051                 mdd_lock2(env, mdd_tpobj, mdd_tobj);
3052         else
3053                 mdd_write_lock(env, mdd_tpobj);
3054
3055         /*TODO rename sanity checking*/
3056         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
3057         if (rc)
3058                 GOTO(cleanup, rc);
3059
3060         /* if rename_tgt is called then we should just re-insert name with
3061          * correct fid, no need to dec/inc parent nlink if obj is dir */
3062         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
3063         if (rc)
3064                 GOTO(cleanup, rc);
3065
3066         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
3067                                      BYPASS_CAPA);
3068         if (rc)
3069                 GOTO(cleanup, rc);
3070
3071         if (tobj && lu_object_exists(&tobj->mo_lu))
3072                 __mdd_ref_del(env, mdd_tobj, handle);
3073 cleanup:
3074         if (tobj)
3075                 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
3076         else
3077                 mdd_write_unlock(env, mdd_tpobj);
3078         mdd_trans_stop(env, mdd, rc, handle);
3079         RETURN(rc);
3080 }
3081
3082 /*
3083  * No permission check is needed.
3084  */
3085 static int mdd_root_get(const struct lu_env *env,
3086                         struct md_device *m, struct lu_fid *f)
3087 {
3088         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3089
3090         ENTRY;
3091         *f = mdd->mdd_root_fid;
3092         RETURN(0);
3093 }
3094
3095 /*
3096  * No permission check is needed.
3097  */
3098 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
3099                       struct kstatfs *sfs)
3100 {
3101         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3102         int rc;
3103
3104         ENTRY;
3105
3106         rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
3107
3108         RETURN(rc);
3109 }
3110
3111 /*
3112  * No permission check is needed.
3113  */
3114 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
3115                            int *md_size, int *cookie_size)
3116 {
3117         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3118         ENTRY;
3119
3120         *md_size = mdd_lov_mdsize(env, mdd);
3121         *cookie_size = mdd_lov_cookiesize(env, mdd);
3122
3123         RETURN(0);
3124 }
3125
3126 static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m,
3127                               int mode, unsigned long timeout, __u32 alg,
3128                               struct lustre_capa_key *keys)
3129 {
3130         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3131         struct mds_obd    *mds = &mdd2obd_dev(mdd)->u.mds;
3132         int rc;
3133         ENTRY;
3134
3135         mds->mds_capa_keys = keys;
3136         rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, mode,
3137                                                    timeout, alg, keys);
3138         RETURN(rc);
3139 }
3140
3141 static int mdd_update_capa_key(const struct lu_env *env,
3142                                struct md_device *m,
3143                                struct lustre_capa_key *key)
3144 {
3145         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
3146         struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_osc_exp;
3147         int rc;
3148         ENTRY;
3149
3150         rc = obd_set_info_async(lov_exp, strlen(KEY_CAPA_KEY), KEY_CAPA_KEY,
3151                                 sizeof(*key), key, NULL);
3152         RETURN(rc);
3153 }
3154
3155 static void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
3156                          struct thandle *handle)
3157 {
3158         struct dt_object *next;
3159
3160         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
3161         next = mdd_object_child(obj);
3162         next->do_ops->do_ref_add(env, next, handle);
3163 }
3164
3165 /*
3166  * XXX: if permission check is needed here?
3167  */
3168 static int mdd_ref_add(const struct lu_env *env,
3169                        struct md_object *obj)
3170 {
3171         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3172         struct mdd_device *mdd = mdo2mdd(obj);
3173         struct thandle *handle;
3174         int rc;
3175         ENTRY;
3176
3177         mdd_txn_param_build(env, MDD_TXN_XATTR_SET_OP);
3178         handle = mdd_trans_start(env, mdd);
3179         if (IS_ERR(handle))
3180                 RETURN(-ENOMEM);
3181
3182         mdd_write_lock(env, mdd_obj);
3183         rc = mdd_link_sanity_check(env, NULL, mdd_obj);
3184         if (!rc)
3185                 __mdd_ref_add(env, mdd_obj, handle);
3186         mdd_write_unlock(env, mdd_obj);
3187
3188         mdd_trans_stop(env, mdd, 0, handle);
3189
3190         RETURN(0);
3191 }
3192
3193 static void
3194 __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
3195               struct thandle *handle)
3196 {
3197         struct dt_object *next = mdd_object_child(obj);
3198         ENTRY;
3199
3200         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
3201
3202         next->do_ops->do_ref_del(env, next, handle);
3203         EXIT;
3204 }
3205
3206 /* do NOT or the MAY_*'s, you'll get the weakest */
3207 static int accmode(struct mdd_object *mdd_obj, int flags)
3208 {
3209         int res = 0;
3210
3211 #if 0
3212         /* Sadly, NFSD reopens a file repeatedly during operation, so the
3213          * "acc_mode = 0" allowance for newly-created files isn't honoured.
3214          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
3215          * owner can write to a file even if it is marked readonly to hide
3216          * its brokenness. (bug 5781) */
3217         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
3218                 return 0;
3219 #endif
3220         if (flags & FMODE_READ)
3221                 res = MAY_READ;
3222         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
3223                 res |= MAY_WRITE;
3224         if (flags & MDS_FMODE_EXEC)
3225                 res = MAY_EXEC;
3226         return res;
3227 }
3228
3229 static int mdd_open_sanity_check(const struct lu_env *env,
3230                                  struct mdd_object *obj, int flag)
3231 {
3232         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
3233         int mode = accmode(obj, flag);
3234         int rc;
3235         ENTRY;
3236
3237         /* EEXIST check */
3238         if (mdd_is_dead_obj(obj))
3239                 RETURN(-ENOENT);
3240
3241         rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
3242         if (rc)
3243                RETURN(rc);
3244
3245         if (S_ISLNK(tmp_la->la_mode))
3246                 RETURN(-ELOOP);
3247
3248         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
3249                 RETURN(-EISDIR);
3250
3251         if (!(flag & MDS_OPEN_CREATED)) {
3252                 rc = __mdd_permission_internal(env, obj, mode, 0);
3253                 if (rc)
3254                         RETURN(rc);
3255         }
3256
3257         /*
3258          * FIFO's, sockets and device files are special: they don't
3259          * actually live on the filesystem itself, and as such you
3260          * can write to them even if the filesystem is read-only.
3261          */
3262         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
3263             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
3264                 flag &= ~O_TRUNC;
3265
3266         /*
3267          * An append-only file must be opened in append mode for writing.
3268          */
3269         if (mdd_is_append(obj)) {
3270                 if ((flag & FMODE_WRITE) && !(flag & O_APPEND))
3271                         RETURN(-EPERM);
3272                 if (flag & O_TRUNC)
3273                         RETURN(-EPERM);
3274         }
3275
3276         /* O_NOATIME can only be set by the owner or superuser */
3277         if (flag & O_NOATIME) {
3278                 struct md_ucred *uc = md_ucred(env);
3279
3280                 if (uc->mu_fsuid != tmp_la->la_uid &&
3281                     !mdd_capable(uc, CAP_FOWNER))
3282                         RETURN(-EPERM);
3283         }
3284
3285         RETURN(0);
3286 }
3287
3288 static int mdd_open(const struct lu_env *env, struct md_object *obj,
3289                     int flags)
3290 {
3291         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3292         int rc = 0;
3293
3294         mdd_write_lock(env, mdd_obj);
3295
3296         rc = mdd_open_sanity_check(env, mdd_obj, flags);
3297         if (rc == 0)
3298                 mdd_obj->mod_count++;
3299
3300         mdd_write_unlock(env, mdd_obj);
3301         return rc;
3302 }
3303
3304 /*
3305  * No permission check is needed.
3306  */
3307 static int mdd_close(const struct lu_env *env, struct md_object *obj,
3308                      struct md_attr *ma)
3309 {
3310         int rc;
3311         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3312         ENTRY;
3313
3314         mdd_write_lock(env, mdd_obj);
3315         /* release open count */
3316         mdd_obj->mod_count --;
3317
3318         rc = __mdd_iattr_get(env, mdd_obj, ma);
3319         if (rc == 0 && mdd_obj->mod_count == 0) {
3320                 if (ma->ma_attr.la_nlink == 0)
3321                         rc = __mdd_object_kill(env, mdd_obj, ma);
3322         }
3323         mdd_write_unlock(env, mdd_obj);
3324         RETURN(rc);
3325 }
3326
3327 /*
3328  * Permission check is done when open,
3329  * no need check again.
3330  */
3331 static int mdd_readpage_sanity_check(const struct lu_env *env,
3332                                      struct mdd_object *obj)
3333 {
3334         struct dt_object *next = mdd_object_child(obj);
3335         int rc;
3336         ENTRY;
3337
3338         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
3339 #if 0
3340                 rc = mdd_permission_internal(env, obj, MAY_READ);
3341 #else
3342                 rc = 0;
3343 #endif
3344         else
3345                 rc = -ENOTDIR;
3346
3347         RETURN(rc);
3348 }
3349
3350 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
3351                         const struct lu_rdpg *rdpg)
3352 {
3353         struct dt_object *next;
3354         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3355         int rc;
3356         ENTRY;
3357
3358         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3359         next = mdd_object_child(mdd_obj);
3360
3361         mdd_read_lock(env, mdd_obj);
3362         rc = mdd_readpage_sanity_check(env, mdd_obj);
3363         if (rc)
3364                 GOTO(out_unlock, rc);
3365
3366         rc = next->do_ops->do_readpage(env, next, rdpg,
3367                                        mdd_object_capa(env, mdd_obj));
3368
3369 out_unlock:
3370         mdd_read_unlock(env, mdd_obj);
3371         RETURN(rc);
3372 }
3373
3374 #ifdef CONFIG_FS_POSIX_ACL
3375 static int mdd_posix_acl_permission(struct md_ucred *uc, struct lu_attr *la,
3376                                     int want, posix_acl_xattr_entry *entry,
3377                                     int count)
3378 {
3379         posix_acl_xattr_entry *pa, *pe, *mask_obj;
3380         int found = 0;
3381         ENTRY;
3382
3383         if (count <= 0)
3384                 RETURN(-EACCES);
3385
3386         pa = &entry[0];
3387         pe = &entry[count - 1];
3388         for (; pa <= pe; pa++) {
3389                 switch(pa->e_tag) {
3390                         case ACL_USER_OBJ:
3391                                 /* (May have been checked already) */
3392                                 if (la->la_uid == uc->mu_fsuid)
3393                                         goto check_perm;
3394                                 break;
3395                         case ACL_USER:
3396                                 if (pa->e_id == uc->mu_fsuid)
3397                                         goto mask;
3398                                 break;
3399                         case ACL_GROUP_OBJ:
3400                                 if (mdd_in_group_p(uc, la->la_gid)) {
3401                                         found = 1;
3402                                         if ((pa->e_perm & want) == want)
3403                                                 goto mask;
3404                                 }
3405                                 break;
3406                         case ACL_GROUP:
3407                                 if (mdd_in_group_p(uc, pa->e_id)) {
3408                                         found = 1;
3409                                         if ((pa->e_perm & want) == want)
3410                                                 goto mask;
3411                                 }
3412                                 break;
3413                         case ACL_MASK:
3414                                 break;
3415                         case ACL_OTHER:
3416                                 if (found)
3417                                         RETURN(-EACCES);
3418                                 else
3419                                         goto check_perm;
3420                         default:
3421                                 RETURN(-EIO);
3422                 }
3423         }
3424         RETURN(-EIO);
3425
3426 mask:
3427         for (mask_obj = pa + 1; mask_obj <= pe; mask_obj++) {
3428                 if (mask_obj->e_tag == ACL_MASK) {
3429                         if ((pa->e_perm & mask_obj->e_perm & want) == want)
3430                                 RETURN(0);
3431
3432                         RETURN(-EACCES);
3433                 }
3434         }
3435
3436 check_perm:
3437         if ((pa->e_perm & want) == want)
3438                 RETURN(0);
3439
3440         RETURN(-EACCES);
3441 }
3442 #endif
3443
3444 static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
3445                          struct lu_attr* la, int mask)
3446 {
3447 #ifdef CONFIG_FS_POSIX_ACL
3448         struct dt_object *next;
3449         struct lu_buf    *buf = &mdd_env_info(env)->mti_buf;
3450         struct md_ucred  *uc  = md_ucred(env);
3451         posix_acl_xattr_entry *entry;
3452         int entry_count;
3453         int rc;
3454         ENTRY;
3455
3456         next = mdd_object_child(obj);
3457
3458         buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
3459         buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
3460         rc = next->do_ops->do_xattr_get(env, next, buf,
3461                                         XATTR_NAME_ACL_ACCESS,
3462                                         mdd_object_capa(env, obj));
3463         if (rc <= 0)
3464                 RETURN(rc ? : -EACCES);
3465
3466         entry = ((posix_acl_xattr_header *)(buf->lb_buf))->a_entries;
3467         entry_count = (rc - 4) / sizeof(posix_acl_xattr_entry);
3468
3469         rc = mdd_posix_acl_permission(uc, la, mask, entry, entry_count);
3470         RETURN(rc);
3471 #else
3472         ENTRY;
3473         RETURN(-EAGAIN);
3474 #endif
3475 }
3476
3477 #if 0
3478 static int mdd_exec_permission_lite(const struct lu_env *env,
3479                                     struct mdd_object *obj)
3480 {
3481         struct lu_attr  *la = &mdd_env_info(env)->mti_la;
3482         struct md_ucred *uc = md_ucred(env);
3483         umode_t mode;
3484         int rc;
3485         ENTRY;
3486
3487         /* These means unnecessary for permission check */
3488         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3489                 RETURN(0);
3490
3491         /* Invalid user credit */
3492         if (uc->mu_valid == UCRED_INVALID)
3493                 RETURN(-EACCES);
3494
3495         rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
3496         if (rc)
3497                 RETURN(rc);
3498
3499         mode = la->la_mode;
3500         if (uc->mu_fsuid == la->la_uid)
3501                 mode >>= 6;
3502         else if (mdd_in_group_p(uc, la->la_gid))
3503                 mode >>= 3;
3504
3505         if (mode & MAY_EXEC)
3506                 RETURN(0);
3507
3508         if (((la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) &&
3509             mdd_capable(uc, CAP_DAC_OVERRIDE))
3510                 RETURN(0);
3511
3512         if (S_ISDIR(la->la_mode) && mdd_capable(uc, CAP_DAC_READ_SEARCH))
3513                 RETURN(0);
3514
3515         RETURN(-EACCES);
3516 }
3517 #endif
3518
3519 static int __mdd_permission_internal(const struct lu_env *env,
3520                                      struct mdd_object *obj,
3521                                      int mask, int getattr)
3522 {
3523         struct lu_attr  *la = &mdd_env_info(env)->mti_la;
3524         struct md_ucred *uc = md_ucred(env);
3525         __u32 mode;
3526         int rc;
3527
3528         ENTRY;
3529
3530         if (mask == 0)
3531                 RETURN(0);
3532
3533         /* These means unnecessary for permission check */
3534         if ((uc == NULL) || (uc->mu_valid == UCRED_INIT))
3535                 RETURN(0);
3536
3537         /* Invalid user credit */
3538         if (uc->mu_valid == UCRED_INVALID)
3539                 RETURN(-EACCES);
3540
3541         /*
3542          * Nobody gets write access to an immutable file.
3543          */
3544         if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
3545                 RETURN(-EACCES);
3546
3547         if (getattr) {
3548                 rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
3549                 if (rc)
3550                         RETURN(rc);
3551         }
3552
3553         mode = la->la_mode;
3554         if (uc->mu_fsuid == la->la_uid) {
3555                 mode >>= 6;
3556         } else {
3557                 if (mode & S_IRWXG) {
3558                         rc = mdd_check_acl(env, obj, la, mask);
3559                         if (rc == -EACCES)
3560                                 goto check_capabilities;
3561                         else if ((rc != -EAGAIN) && (rc != -EOPNOTSUPP) &&
3562                                  (rc != -ENODATA))
3563                                 RETURN(rc);
3564                 }
3565                 if (mdd_in_group_p(uc, la->la_gid))
3566                         mode >>= 3;
3567         }
3568
3569         /*
3570          * If the DACs are ok we don't need any capability check.
3571          */
3572         if (((mode & mask & S_IRWXO) == mask))
3573                 RETURN(0);
3574
3575 check_capabilities:
3576
3577         /*
3578          * Read/write DACs are always overridable.
3579          * Executable DACs are overridable if at least one exec bit is set.
3580          * Dir's DACs are always overridable.
3581          */
3582         if (!(mask & MAY_EXEC) ||
3583             (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
3584                 if (mdd_capable(uc, CAP_DAC_OVERRIDE))
3585                         RETURN(0);
3586
3587         /*
3588          * Searching includes executable on directories, else just read.
3589          */
3590         if ((mask == MAY_READ) ||
3591             (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
3592                 if (mdd_capable(uc, CAP_DAC_READ_SEARCH))
3593                         RETURN(0);
3594
3595         RETURN(-EACCES);
3596 }
3597
3598 static inline int mdd_permission_internal_locked(const struct lu_env *env,
3599                                                  struct mdd_object *obj,
3600                                                  int mask)
3601 {
3602         int rc;
3603
3604         mdd_read_lock(env, obj);
3605         rc = mdd_permission_internal(env, obj, mask);
3606         mdd_read_unlock(env, obj);
3607
3608         return rc;
3609 }
3610
3611 static int mdd_permission(const struct lu_env *env, struct md_object *obj,
3612                           int mask)
3613 {
3614         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3615         int rc;
3616         ENTRY;
3617
3618         rc = mdd_permission_internal_locked(env, mdd_obj, mask);
3619
3620         RETURN(rc);
3621 }
3622
3623 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
3624                         struct lustre_capa *capa, int renewal)
3625 {
3626         struct dt_object *next;
3627         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3628         struct obd_capa *oc;
3629         int rc = 0;
3630         ENTRY;
3631
3632         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
3633         next = mdd_object_child(mdd_obj);
3634
3635         oc = next->do_ops->do_capa_get(env, next, renewal ? capa : NULL,
3636                                        capa->lc_opc);
3637         if (IS_ERR(oc)) {
3638                 rc = PTR_ERR(oc);
3639         } else {
3640                 capa_cpy(capa, oc);
3641                 capa_put(oc);
3642         }
3643
3644         RETURN(rc);
3645 }
3646
3647 struct md_device_operations mdd_ops = {
3648         .mdo_statfs         = mdd_statfs,
3649         .mdo_root_get       = mdd_root_get,
3650         .mdo_maxsize_get    = mdd_maxsize_get,
3651         .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
3652         .mdo_update_capa_key= mdd_update_capa_key,
3653 };
3654
3655 static struct md_dir_operations mdd_dir_ops = {
3656         .mdo_is_subdir     = mdd_is_subdir,
3657         .mdo_lookup        = mdd_lookup,
3658         .mdo_create        = mdd_create,
3659         .mdo_rename        = mdd_rename,
3660         .mdo_link          = mdd_link,
3661         .mdo_unlink        = mdd_unlink,
3662         .mdo_name_insert   = mdd_name_insert,
3663         .mdo_name_remove   = mdd_name_remove,
3664         .mdo_rename_tgt    = mdd_rename_tgt,
3665         .mdo_create_data   = mdd_create_data
3666 };
3667
3668 static struct md_object_operations mdd_obj_ops = {
3669         .moo_permission    = mdd_permission,
3670         .moo_attr_get      = mdd_attr_get,
3671         .moo_attr_set      = mdd_attr_set,
3672         .moo_xattr_get     = mdd_xattr_get,
3673         .moo_xattr_set     = mdd_xattr_set,
3674         .moo_xattr_list    = mdd_xattr_list,
3675         .moo_xattr_del     = mdd_xattr_del,
3676         .moo_object_create = mdd_object_create,
3677         .moo_ref_add       = mdd_ref_add,
3678         .moo_ref_del       = mdd_ref_del,
3679         .moo_open          = mdd_open,
3680         .moo_close         = mdd_close,
3681         .moo_readpage      = mdd_readpage,
3682         .moo_readlink      = mdd_readlink,
3683         .moo_capa_get      = mdd_capa_get
3684 };
3685
3686 static struct obd_ops mdd_obd_device_ops = {
3687         .o_owner = THIS_MODULE
3688 };
3689
3690 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
3691                                           struct lu_device_type *t,
3692                                           struct lustre_cfg *lcfg)
3693 {
3694         struct lu_device  *l;
3695         struct mdd_device *m;
3696
3697         OBD_ALLOC_PTR(m);
3698         if (m == NULL) {
3699                 l = ERR_PTR(-ENOMEM);
3700         } else {
3701                 md_device_init(&m->mdd_md_dev, t);
3702                 l = mdd2lu_dev(m);
3703                 l->ld_ops = &mdd_lu_ops;
3704                 m->mdd_md_dev.md_ops = &mdd_ops;
3705         }
3706
3707         return l;
3708 }
3709
3710 static void mdd_device_free(const struct lu_env *env,
3711                             struct lu_device *lu)
3712 {
3713         struct mdd_device *m = lu2mdd_dev(lu);
3714
3715         LASSERT(atomic_read(&lu->ld_ref) == 0);
3716         md_device_fini(&m->mdd_md_dev);
3717         OBD_FREE_PTR(m);
3718 }
3719
3720 static void *mdd_ucred_key_init(const struct lu_context *ctx,
3721                                 struct lu_context_key *key)
3722 {
3723         struct md_ucred *uc;
3724
3725         OBD_ALLOC_PTR(uc);
3726         if (uc == NULL)
3727                 uc = ERR_PTR(-ENOMEM);
3728         return uc;
3729 }
3730
3731 static void mdd_ucred_key_fini(const struct lu_context *ctx,
3732                              struct lu_context_key *key, void *data)
3733 {
3734         struct md_ucred *uc = data;
3735         OBD_FREE_PTR(uc);
3736 }
3737
3738 static struct lu_context_key mdd_ucred_key = {
3739         .lct_tags = LCT_SESSION,
3740         .lct_init = mdd_ucred_key_init,
3741         .lct_fini = mdd_ucred_key_fini
3742 };
3743
3744 struct md_ucred *md_ucred(const struct lu_env *env)
3745 {
3746         LASSERT(env->le_ses != NULL);
3747         return lu_context_key_get(env->le_ses, &mdd_ucred_key);
3748 }
3749 EXPORT_SYMBOL(md_ucred);
3750
3751 static void *mdd_capainfo_key_init(const struct lu_context *ctx,
3752                                    struct lu_context_key *key)
3753 {
3754         struct md_capainfo *ci;
3755
3756         OBD_ALLOC_PTR(ci);
3757         if (ci == NULL)
3758                 ci = ERR_PTR(-ENOMEM);
3759         return ci;
3760 }
3761
3762 static void mdd_capainfo_key_fini(const struct lu_context *ctx,
3763                                   struct lu_context_key *key, void *data)
3764 {
3765         struct md_capainfo *ci = data;
3766         OBD_FREE_PTR(ci);
3767 }
3768
3769 struct lu_context_key mdd_capainfo_key = {
3770         .lct_tags = LCT_SESSION,
3771         .lct_init = mdd_capainfo_key_init,
3772         .lct_fini = mdd_capainfo_key_fini
3773 };
3774
3775 struct md_capainfo *md_capainfo(const struct lu_env *env)
3776 {
3777         /* NB, in mdt_init0 */
3778         if (env->le_ses == NULL)
3779                 return NULL;
3780         return lu_context_key_get(env->le_ses, &mdd_capainfo_key);
3781 }
3782 EXPORT_SYMBOL(md_capainfo);
3783
3784 static int mdd_type_init(struct lu_device_type *t)
3785 {
3786         int result;
3787
3788         result = lu_context_key_register(&mdd_thread_key);
3789         if (result == 0)
3790                 result = lu_context_key_register(&mdd_ucred_key);
3791         if (result == 0)
3792                 result = lu_context_key_register(&mdd_capainfo_key);
3793         return result;
3794 }
3795
3796 static void mdd_type_fini(struct lu_device_type *t)
3797 {
3798         lu_context_key_degister(&mdd_capainfo_key);
3799         lu_context_key_degister(&mdd_ucred_key);
3800         lu_context_key_degister(&mdd_thread_key);
3801 }
3802
3803 static struct lu_device_type_operations mdd_device_type_ops = {
3804         .ldto_init = mdd_type_init,
3805         .ldto_fini = mdd_type_fini,
3806
3807         .ldto_device_alloc = mdd_device_alloc,
3808         .ldto_device_free  = mdd_device_free,
3809
3810         .ldto_device_init    = mdd_device_init,
3811         .ldto_device_fini    = mdd_device_fini
3812 };
3813
3814 static struct lu_device_type mdd_device_type = {
3815         .ldt_tags     = LU_DEVICE_MD,
3816         .ldt_name     = LUSTRE_MDD_NAME,
3817         .ldt_ops      = &mdd_device_type_ops,
3818         .ldt_ctx_tags = LCT_MD_THREAD
3819 };
3820
3821 static void *mdd_key_init(const struct lu_context *ctx,
3822                           struct lu_context_key *key)
3823 {
3824         struct mdd_thread_info *info;
3825
3826         OBD_ALLOC_PTR(info);
3827         if (info == NULL)
3828                 info = ERR_PTR(-ENOMEM);
3829         return info;
3830 }
3831
3832 static void mdd_key_fini(const struct lu_context *ctx,
3833                          struct lu_context_key *key, void *data)
3834 {
3835         struct mdd_thread_info *info = data;
3836         OBD_FREE_PTR(info);
3837 }
3838
3839 static struct lu_context_key mdd_thread_key = {
3840         .lct_tags = LCT_MD_THREAD,
3841         .lct_init = mdd_key_init,
3842         .lct_fini = mdd_key_fini
3843 };
3844
3845 struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
3846         { 0 }
3847 };
3848
3849 struct lprocfs_vars lprocfs_mdd_module_vars[] = {
3850         { 0 }
3851 };
3852
3853 LPROCFS_INIT_VARS(mdd, lprocfs_mdd_module_vars, lprocfs_mdd_obd_vars);
3854
3855 static int __init mdd_mod_init(void)
3856 {
3857         struct lprocfs_static_vars lvars;
3858         printk(KERN_INFO "Lustre: MetaData Device; info@clusterfs.com\n");
3859         lprocfs_init_vars(mdd, &lvars);
3860         return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
3861                                    LUSTRE_MDD_NAME, &mdd_device_type);
3862 }
3863
3864 static void __exit mdd_mod_exit(void)
3865 {
3866         class_unregister_type(LUSTRE_MDD_NAME);
3867 }
3868
3869 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3870 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
3871 MODULE_LICENSE("GPL");
3872
3873 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);