Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <obd_support.h>
38 #include <lprocfs_status.h>
39 /* fid_be_cpu(), fid_cpu_to_be(). */
40 #include <lustre_fid.h>
41
42 #include <linux/ldiskfs_fs.h>
43 #include <lustre_mds.h>
44 #include <lustre/lustre_idl.h>
45
46 #include "mdd_internal.h"
47
48 static struct lu_object_operations mdd_lu_obj_ops;
49
50 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
51                struct lu_attr *la, struct lustre_capa *capa)
52 {
53         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
54                  PFID(mdd_object_fid(obj)));
55         return mdo_attr_get(env, obj, la, capa);
56 }
57
58 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
59 {
60         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
61
62         if (flags & LUSTRE_APPEND_FL)
63                 obj->mod_flags |= APPEND_OBJ;
64
65         if (flags & LUSTRE_IMMUTABLE_FL)
66                 obj->mod_flags |= IMMUTE_OBJ;
67 }
68
69 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
70 {
71         struct lu_buf *buf;
72
73         buf = &mdd_env_info(env)->mti_buf;
74         buf->lb_buf = area;
75         buf->lb_len = len;
76         return buf;
77 }
78
79 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
80                                        struct mdd_device *mdd)
81 {
82         struct mdd_thread_info *mti = mdd_env_info(env);
83         int                     max_cookie_size;
84
85         max_cookie_size = mdd_lov_cookiesize(env, mdd);
86         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
87                 if (mti->mti_max_cookie)
88                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
89                 mti->mti_max_cookie = NULL;
90                 mti->mti_max_cookie_size = 0;
91         }
92         if (unlikely(mti->mti_max_cookie == NULL)) {
93                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
94                 if (unlikely(mti->mti_max_cookie != NULL))
95                         mti->mti_max_cookie_size = max_cookie_size;
96         }
97         return mti->mti_max_cookie;
98 }
99
100 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
101                                    struct mdd_device *mdd)
102 {
103         struct mdd_thread_info *mti = mdd_env_info(env);
104         int                     max_lmm_size;
105
106         max_lmm_size = mdd_lov_mdsize(env, mdd);
107         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
108                 if (mti->mti_max_lmm)
109                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
110                 mti->mti_max_lmm = NULL;
111                 mti->mti_max_lmm_size = 0;
112         }
113         if (unlikely(mti->mti_max_lmm == NULL)) {
114                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
115                 if (unlikely(mti->mti_max_lmm != NULL))
116                         mti->mti_max_lmm_size = max_lmm_size;
117         }
118         return mti->mti_max_lmm;
119 }
120
121 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
122                                        const void *area, ssize_t len)
123 {
124         struct lu_buf *buf;
125
126         buf = &mdd_env_info(env)->mti_buf;
127         buf->lb_buf = (void *)area;
128         buf->lb_len = len;
129         return buf;
130 }
131
132 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
133 {
134         struct mdd_thread_info *info;
135
136         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
137         LASSERT(info != NULL);
138         return info;
139 }
140
141 struct lu_object *mdd_object_alloc(const struct lu_env *env,
142                                    const struct lu_object_header *hdr,
143                                    struct lu_device *d)
144 {
145         struct mdd_object *mdd_obj;
146
147         OBD_ALLOC_PTR(mdd_obj);
148         if (mdd_obj != NULL) {
149                 struct lu_object *o;
150
151                 o = mdd2lu_obj(mdd_obj);
152                 lu_object_init(o, NULL, d);
153                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
154                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
155                 mdd_obj->mod_count = 0;
156                 o->lo_ops = &mdd_lu_obj_ops;
157                 return o;
158         } else {
159                 return NULL;
160         }
161 }
162
163 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
164 {
165         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
166         struct lu_object  *below;
167         struct lu_device  *under;
168         ENTRY;
169
170         under = &d->mdd_child->dd_lu_dev;
171         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
172         mdd_pdlock_init(lu2mdd_obj(o));
173         if (below == NULL)
174                 RETURN(-ENOMEM);
175
176         lu_object_add(o, below);
177         RETURN(0);
178 }
179
180 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
181 {
182         if (lu_object_exists(o))
183                 return mdd_get_flags(env, lu2mdd_obj(o));
184         else
185                 return 0;
186 }
187
188 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
189 {
190         struct mdd_object *mdd = lu2mdd_obj(o);
191         
192         lu_object_fini(o);
193         OBD_FREE_PTR(mdd);
194 }
195
196 static int mdd_object_print(const struct lu_env *env, void *cookie,
197                             lu_printer_t p, const struct lu_object *o)
198 {
199         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
200 }
201
202 /* orphan handling is here */
203 static void mdd_object_delete(const struct lu_env *env,
204                                struct lu_object *o)
205 {
206         struct mdd_object *mdd_obj = lu2mdd_obj(o);
207         struct thandle *handle = NULL;
208         ENTRY;
209
210         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
211                 return;
212
213         if (mdd_obj->mod_flags & ORPHAN_OBJ) {
214                 mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev),
215                                     MDD_TXN_INDEX_DELETE_OP);
216                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
217                 if (IS_ERR(handle))
218                         CERROR("Cannot get thandle\n");
219                 else {
220                         mdd_write_lock(env, mdd_obj);
221                         /* let's remove obj from the orphan list */
222                         __mdd_orphan_del(env, mdd_obj, handle);
223                         mdd_write_unlock(env, mdd_obj);
224                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
225                                        0, handle);
226                 }
227         }
228 }
229
230 static struct lu_object_operations mdd_lu_obj_ops = {
231         .loo_object_init    = mdd_object_init,
232         .loo_object_start   = mdd_object_start,
233         .loo_object_free    = mdd_object_free,
234         .loo_object_print   = mdd_object_print,
235         .loo_object_delete  = mdd_object_delete
236 };
237
238 struct mdd_object *mdd_object_find(const struct lu_env *env,
239                                    struct mdd_device *d,
240                                    const struct lu_fid *f)
241 {
242         struct lu_object *o, *lo;
243         struct mdd_object *m;
244         ENTRY;
245
246         o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
247         if (IS_ERR(o))
248                 m = (struct mdd_object *)o;
249         else {
250                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
251                 /* remote object can't be located and should be put then */
252                 if (lo == NULL)
253                         lu_object_put(env, o);
254                 m = lu2mdd_obj(lo);
255         }
256         RETURN(m);
257 }
258
259 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
260 {
261         struct lu_attr *la = &mdd_env_info(env)->mti_la;
262         int rc;
263
264         ENTRY;
265         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
266         if (rc == 0)
267                 mdd_flags_xlate(obj, la->la_flags);
268         RETURN(rc);
269 }
270
271 /* get only inode attributes */
272 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
273                   struct md_attr *ma)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (ma->ma_valid & MA_INODE)
279                 RETURN(0);
280
281         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
282                           mdd_object_capa(env, mdd_obj));
283         if (rc == 0)
284                 ma->ma_valid |= MA_INODE;
285         RETURN(rc);
286 }
287
288 static int mdd_get_default_md(struct mdd_object *mdd_obj,
289                 struct lov_mds_md *lmm, int *size)
290 {
291         struct lov_desc *ldesc;
292         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
293         ENTRY;
294
295         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
296         LASSERT(ldesc != NULL);
297
298         if (!lmm)
299                 RETURN(0);
300
301         lmm->lmm_magic = LOV_MAGIC_V1;
302         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
303         lmm->lmm_pattern = ldesc->ld_pattern;
304         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
305         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
306         *size = sizeof(struct lov_mds_md);
307
308         RETURN(sizeof(struct lov_mds_md));
309 }
310
311 /* get lov EA only */
312 static int __mdd_lmm_get(const struct lu_env *env,
313                          struct mdd_object *mdd_obj, struct md_attr *ma)
314 {
315         int rc;
316         ENTRY;
317
318         if (ma->ma_valid & MA_LOV)
319                 RETURN(0);
320
321         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
322                         MDS_LOV_MD_NAME);
323
324         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
325                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
326                                 &ma->ma_lmm_size);
327         }
328
329         if (rc > 0) {
330                 ma->ma_valid |= MA_LOV;
331                 rc = 0;
332         }
333         RETURN(rc);
334 }
335
336 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
337                        struct md_attr *ma)
338 {
339         int rc;
340         ENTRY;
341
342         mdd_read_lock(env, mdd_obj);
343         rc = __mdd_lmm_get(env, mdd_obj, ma);
344         mdd_read_unlock(env, mdd_obj);
345         RETURN(rc);
346 }
347
348 /* get lmv EA only*/
349 static int __mdd_lmv_get(const struct lu_env *env,
350                          struct mdd_object *mdd_obj, struct md_attr *ma)
351 {
352         int rc;
353         ENTRY;
354
355         if (ma->ma_valid & MA_LMV)
356                 RETURN(0);
357
358         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
359                         MDS_LMV_MD_NAME);
360         if (rc > 0) {
361                 ma->ma_valid |= MA_LMV;
362                 rc = 0;
363         }
364         RETURN(rc);
365 }
366
367 static int mdd_attr_get_internal(const struct lu_env *env,
368                                  struct mdd_object *mdd_obj,
369                                  struct md_attr *ma)
370 {
371         int rc = 0;
372         ENTRY;
373
374         if (ma->ma_need & MA_INODE)
375                 rc = mdd_iattr_get(env, mdd_obj, ma);
376
377         if (rc == 0 && ma->ma_need & MA_LOV) {
378                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
379                     S_ISDIR(mdd_object_type(mdd_obj)))
380                         rc = __mdd_lmm_get(env, mdd_obj, ma);
381         }
382         if (rc == 0 && ma->ma_need & MA_LMV) {
383                 if (S_ISDIR(mdd_object_type(mdd_obj)))
384                         rc = __mdd_lmv_get(env, mdd_obj, ma);
385         }
386 #ifdef CONFIG_FS_POSIX_ACL
387         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
388                 if (S_ISDIR(mdd_object_type(mdd_obj)))
389                         rc = mdd_acl_def_get(env, mdd_obj, ma);
390         }
391 #endif
392         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
393                rc, ma->ma_valid);
394         RETURN(rc);
395 }
396
397 int mdd_attr_get_internal_locked(const struct lu_env *env,
398                                  struct mdd_object *mdd_obj, struct md_attr *ma)
399 {
400         int rc;
401         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
402
403         if (needlock)
404                 mdd_read_lock(env, mdd_obj);
405         rc = mdd_attr_get_internal(env, mdd_obj, ma);
406         if (needlock)
407                 mdd_read_unlock(env, mdd_obj);
408         return rc;
409 }
410
411 /*
412  * No permission check is needed.
413  */
414 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
415                         struct md_attr *ma)
416 {
417         struct mdd_object *mdd_obj = md2mdd_obj(obj);
418         int                rc;
419
420         ENTRY;
421         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
422         RETURN(rc);
423 }
424
425 /*
426  * No permission check is needed.
427  */
428 static int mdd_xattr_get(const struct lu_env *env,
429                          struct md_object *obj, struct lu_buf *buf,
430                          const char *name)
431 {
432         struct mdd_object *mdd_obj = md2mdd_obj(obj);
433         int rc;
434
435         ENTRY;
436
437         LASSERT(mdd_object_exists(mdd_obj));
438
439         mdd_read_lock(env, mdd_obj);
440         rc = mdo_xattr_get(env, mdd_obj, buf, name,
441                            mdd_object_capa(env, mdd_obj));
442         mdd_read_unlock(env, mdd_obj);
443
444         RETURN(rc);
445 }
446
447 /*
448  * Permission check is done when open,
449  * no need check again.
450  */
451 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
452                         struct lu_buf *buf)
453 {
454         struct mdd_object *mdd_obj = md2mdd_obj(obj);
455         struct dt_object  *next;
456         loff_t             pos = 0;
457         int                rc;
458         ENTRY;
459
460         LASSERT(mdd_object_exists(mdd_obj));
461
462         next = mdd_object_child(mdd_obj);
463         mdd_read_lock(env, mdd_obj);
464         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
465                                          mdd_object_capa(env, mdd_obj));
466         mdd_read_unlock(env, mdd_obj);
467         RETURN(rc);
468 }
469
470 /*
471  * No permission check is needed.
472  */
473 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
474                           struct lu_buf *buf)
475 {
476         struct mdd_object *mdd_obj = md2mdd_obj(obj);
477         int rc;
478
479         ENTRY;
480
481         mdd_read_lock(env, mdd_obj);
482         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
483         mdd_read_unlock(env, mdd_obj);
484
485         RETURN(rc);
486 }
487
488 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
489                                struct mdd_object *c, struct md_attr *ma,
490                                struct thandle *handle)
491 {
492         struct lu_attr *attr = &ma->ma_attr;
493         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
494         int rc;
495         ENTRY;
496
497         if (!mdd_object_exists(c)) {
498                 struct dt_object *next = mdd_object_child(c);
499                 LASSERT(next);
500
501                 /* @hint will be initialized by underlying device. */
502                 next->do_ops->do_ah_init(env, hint,
503                                          p ? mdd_object_child(p) : NULL,
504                                          attr->la_mode & S_IFMT);
505                 rc = mdo_create_obj(env, c, attr, hint, handle);
506                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
507         } else
508                 rc = -EEXIST;
509
510         RETURN(rc);
511 }
512
513
514 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
515                           const struct lu_attr *attr, struct thandle *handle,
516                           const int needacl)
517 {
518         int rc;
519         ENTRY;
520
521         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
522 #ifdef CONFIG_FS_POSIX_ACL
523         if (!rc && (attr->la_valid & LA_MODE) && needacl)
524                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
525 #endif
526         RETURN(rc);
527 }
528
529 int mdd_attr_set_internal_locked(const struct lu_env *env,
530                                  struct mdd_object *o,
531                                  const struct lu_attr *attr,
532                                  struct thandle *handle, int needacl)
533 {
534         int rc;
535         ENTRY;
536
537         needacl = needacl && (attr->la_valid & LA_MODE);
538
539         if (needacl)
540                 mdd_write_lock(env, o);
541
542         rc = mdd_attr_set_internal(env, o, attr, handle, needacl);
543
544         if (needacl)
545                 mdd_write_unlock(env, o);
546         RETURN(rc);
547 }
548
549 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
550                            const struct lu_buf *buf, const char *name,
551                            int fl, struct thandle *handle)
552 {
553         struct lustre_capa *capa = mdd_object_capa(env, obj);
554         int rc = -EINVAL;
555         ENTRY;
556
557         if (buf->lb_buf && buf->lb_len > 0)
558                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
559         else if (buf->lb_buf == NULL && buf->lb_len == 0)
560                 rc = mdo_xattr_del(env, obj, name, handle, capa);
561
562         RETURN(rc);
563 }
564
565 /*
566  * This gives the same functionality as the code between
567  * sys_chmod and inode_setattr
568  * chown_common and inode_setattr
569  * utimes and inode_setattr
570  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
571  */
572 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
573                         struct lu_attr *la, const struct md_attr *ma)
574 {
575         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
576         struct md_ucred  *uc         = md_ucred(env);
577         int               rc;
578         ENTRY;
579
580         if (!la->la_valid)
581                 RETURN(0);
582
583         /* Do not permit change file type */
584         if (la->la_valid & LA_TYPE)
585                 RETURN(-EPERM);
586
587         /* They should not be processed by setattr */
588         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
589                 RETURN(-EPERM);
590
591         /* This is only for set ctime when rename's source is on remote MDS. */
592         if (unlikely(la->la_valid == LA_CTIME)) {
593                 rc = mdd_may_delete(env, NULL, obj, (struct md_attr *)ma, 1, 0);
594                 RETURN(rc);
595         }
596
597         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
598         if (rc)
599                 RETURN(rc);
600
601         if (la->la_valid == LA_ATIME) {
602                 /* This is atime only set for read atime update on close. */
603                 if (la->la_atime <= tmp_la->la_atime + 0/*XXX:mds_atime_diff*/)
604                         la->la_valid &= ~LA_ATIME;
605                 RETURN(0);
606         }
607  
608         /* Check if flags change. */
609         if (la->la_valid & LA_FLAGS) {
610                 unsigned int oldflags = 0;
611                 unsigned int newflags = la->la_flags &
612                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
613
614                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
615                     !mdd_capable(uc, CAP_FOWNER))
616                         RETURN(-EPERM);
617
618                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
619                  * only be changed by the relevant capability. */
620                 if (mdd_is_immutable(obj))
621                         oldflags |= LUSTRE_IMMUTABLE_FL;
622                 if (mdd_is_append(obj))
623                         oldflags |= LUSTRE_APPEND_FL; 
624                 if ((oldflags ^ newflags) &&
625                     !mdd_capable(uc, CAP_LINUX_IMMUTABLE))
626                         RETURN(-EPERM);
627
628                 if (!S_ISDIR(tmp_la->la_mode))
629                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
630         }
631
632         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
633             (la->la_valid & ~LA_FLAGS) &&
634             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
635                 RETURN(-EPERM);
636
637         /* Check for setting the obj time. */
638         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
639             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
640                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
641                     !mdd_capable(uc, CAP_FOWNER)) {
642                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
643                                                             MAY_WRITE);
644                         if (rc)
645                                 RETURN(rc);
646                 }
647         }
648
649         /* Make sure a caller can chmod. */
650         if (la->la_valid & LA_MODE) {
651                 /*
652                  * Bypass la_vaild == LA_MODE,
653                  * this is for changing file with SUID or SGID.
654                  */
655                 if ((la->la_valid & ~LA_MODE) &&
656                     (uc->mu_fsuid != tmp_la->la_uid) &&
657                     !mdd_capable(uc, CAP_FOWNER))
658                         RETURN(-EPERM);
659
660                 if (la->la_mode == (umode_t) -1)
661                         la->la_mode = tmp_la->la_mode;
662                 else
663                         la->la_mode = (la->la_mode & S_IALLUGO) |
664                                       (tmp_la->la_mode & ~S_IALLUGO);
665
666                 /* Also check the setgid bit! */
667                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
668                                 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
669                         la->la_mode &= ~S_ISGID;
670         } else {
671                la->la_mode = tmp_la->la_mode;
672         }
673
674         /* Make sure a caller can chown. */
675         if (la->la_valid & LA_UID) {
676                 if (la->la_uid == (uid_t) -1)
677                         la->la_uid = tmp_la->la_uid;
678                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
679                     (la->la_uid != tmp_la->la_uid)) &&
680                     !mdd_capable(uc, CAP_CHOWN))
681                         RETURN(-EPERM);
682
683                 /*
684                  * If the user or group of a non-directory has been
685                  * changed by a non-root user, remove the setuid bit.
686                  * 19981026 David C Niemi <niemi@tux.org>
687                  *
688                  * Changed this to apply to all users, including root,
689                  * to avoid some races. This is the behavior we had in
690                  * 2.0. The check for non-root was definitely wrong
691                  * for 2.2 anyway, as it should have been using
692                  * CAP_FSETID rather than fsuid -- 19990830 SD.
693                  */
694                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
695                     !S_ISDIR(tmp_la->la_mode)) {
696                         la->la_mode &= ~S_ISUID;
697                         la->la_valid |= LA_MODE;
698                 }
699         }
700
701         /* Make sure caller can chgrp. */
702         if (la->la_valid & LA_GID) {
703                 if (la->la_gid == (gid_t) -1)
704                         la->la_gid = tmp_la->la_gid;
705                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
706                     ((la->la_gid != tmp_la->la_gid) &&
707                     !lustre_in_group_p(uc, la->la_gid))) &&
708                     !mdd_capable(uc, CAP_CHOWN))
709                         RETURN(-EPERM);
710
711                 /*
712                  * Likewise, if the user or group of a non-directory
713                  * has been changed by a non-root user, remove the
714                  * setgid bit UNLESS there is no group execute bit
715                  * (this would be a file marked for mandatory
716                  * locking).  19981026 David C Niemi <niemi@tux.org>
717                  *
718                  * Removed the fsuid check (see the comment above) --
719                  * 19990830 SD.
720                  */
721                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
722                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
723                         la->la_mode &= ~S_ISGID;
724                         la->la_valid |= LA_MODE;
725                 }
726         }
727
728         /* For truncate (or setsize), we should have MAY_WRITE perm */
729         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
730                 if (!((la->la_valid & MDS_OPEN_OWNEROVERRIDE) &&
731                     (uc->mu_fsuid == tmp_la->la_uid)) &&
732                     !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
733                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
734                                                             MAY_WRITE);
735                         if (rc)
736                                 RETURN(rc);
737                 }
738
739                 /* For the "Size-on-MDS" setattr update, merge coming
740                  * attributes with the set in the inode. BUG 10641 */
741                 if ((la->la_valid & LA_ATIME) &&
742                     (la->la_atime <= tmp_la->la_atime))
743                         la->la_valid &= ~LA_ATIME;
744
745                 /* OST attributes do not have a priority over MDS attributes,
746                  * so drop times if ctime is equal. */
747                 if ((la->la_valid & LA_CTIME) &&
748                     (la->la_ctime <= tmp_la->la_ctime))
749                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
750         } else if (la->la_valid & LA_CTIME) {
751                 /* The pure setattr, it has the priority over what is already
752                  * set, do not drop it if ctime is equal. */
753                 if (la->la_ctime < tmp_la->la_ctime)
754                         la->la_valid &= ~(LA_ATIME | LA_MTIME | LA_CTIME);
755         }
756
757         RETURN(0);
758 }
759
760 /* set attr and LOV EA at once, return updated attr */
761 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
762                         const struct md_attr *ma)
763 {
764         struct mdd_object *mdd_obj = md2mdd_obj(obj);
765         struct mdd_device *mdd = mdo2mdd(obj);
766         struct thandle *handle;
767         struct lov_mds_md *lmm = NULL;
768         struct llog_cookie *logcookies = NULL;
769         int  rc, lmm_size = 0, cookie_size = 0;
770         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
771         ENTRY;
772
773         mdd_txn_param_build(env, mdd, MDD_TXN_ATTR_SET_OP);
774         handle = mdd_trans_start(env, mdd);
775         if (IS_ERR(handle))
776                 RETURN(PTR_ERR(handle));
777         /*TODO: add lock here*/
778         /* start a log jounal handle if needed */
779         if (S_ISREG(mdd_object_type(mdd_obj)) &&
780             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
781                 lmm_size = mdd_lov_mdsize(env, mdd);
782                 lmm = mdd_max_lmm_get(env, mdd);
783                 if (lmm == NULL)
784                         GOTO(cleanup, rc = -ENOMEM);
785
786                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
787                                 MDS_LOV_MD_NAME);
788
789                 if (rc < 0)
790                         GOTO(cleanup, rc);
791         }
792
793         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
794                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
795                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
796
797         *la_copy = ma->ma_attr;
798         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
799         if (rc)
800                 GOTO(cleanup, rc);
801
802         if (la_copy->la_valid & LA_FLAGS) {
803                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
804                                                   handle, 1);
805                 if (rc == 0)
806                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
807         } else if (la_copy->la_valid) {            /* setattr */
808                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
809                                                   handle, 1);
810                 /* journal chown/chgrp in llog, just like unlink */
811                 if (rc == 0 && lmm_size){
812                         cookie_size = mdd_lov_cookiesize(env, mdd);
813                         logcookies = mdd_max_cookie_get(env, mdd);
814                         if (logcookies == NULL)
815                                 GOTO(cleanup, rc = -ENOMEM);
816
817                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
818                                             logcookies, cookie_size) <= 0)
819                                 logcookies = NULL;
820                 }
821         }
822
823         if (rc == 0 && ma->ma_valid & MA_LOV) {
824                 umode_t mode;
825
826                 mode = mdd_object_type(mdd_obj);
827                 if (S_ISREG(mode) || S_ISDIR(mode)) {
828                         rc = mdd_lsm_sanity_check(env, mdd_obj);
829                         if (rc)
830                                 GOTO(cleanup, rc);
831
832                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
833                                             ma->ma_lmm_size, handle, 1);
834                 }
835
836         }
837 cleanup:
838         mdd_trans_stop(env, mdd, rc, handle);
839         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
840                 /*set obd attr, if needed*/
841                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
842                                            logcookies);
843         }
844         RETURN(rc);
845 }
846
847 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
848                       const struct lu_buf *buf, const char *name, int fl,
849                       struct thandle *handle)
850 {
851         int  rc;
852         ENTRY;
853
854         mdd_write_lock(env, obj);
855         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
856         mdd_write_unlock(env, obj);
857
858         RETURN(rc);
859 }
860
861 static int mdd_xattr_sanity_check(const struct lu_env *env,
862                                   struct mdd_object *obj)
863 {
864         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
865         struct md_ucred *uc     = md_ucred(env);
866         int rc;
867         ENTRY;
868
869         if (mdd_is_immutable(obj) || mdd_is_append(obj))
870                 RETURN(-EPERM);
871
872         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
873         if (rc)
874                 RETURN(rc);
875
876         if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
877                 RETURN(-EPERM);
878
879         RETURN(rc);
880 }
881
882 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
883                          const struct lu_buf *buf, const char *name, int fl)
884 {
885         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
886         struct mdd_object *mdd_obj = md2mdd_obj(obj);
887         struct mdd_device *mdd = mdo2mdd(obj);
888         struct thandle *handle;
889         int  rc;
890         ENTRY;
891
892         rc = mdd_xattr_sanity_check(env, mdd_obj);
893         if (rc)
894                 RETURN(rc);
895
896         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
897         handle = mdd_trans_start(env, mdd);
898         if (IS_ERR(handle))
899                 RETURN(PTR_ERR(handle));
900
901         rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, name,
902                                fl, handle);
903         if (rc == 0) {
904                 la_copy->la_ctime = CURRENT_SECONDS;
905                 la_copy->la_valid = LA_CTIME;
906                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
907                                                   handle, 0);
908         }
909         mdd_trans_stop(env, mdd, rc, handle);
910
911         RETURN(rc);
912 }
913
914 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
915                   const char *name)
916 {
917         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
918         struct mdd_object *mdd_obj = md2mdd_obj(obj);
919         struct mdd_device *mdd = mdo2mdd(obj);
920         struct thandle *handle;
921         int  rc;
922         ENTRY;
923
924         rc = mdd_xattr_sanity_check(env, mdd_obj);
925         if (rc)
926                 RETURN(rc);
927
928         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
929         handle = mdd_trans_start(env, mdd);
930         if (IS_ERR(handle))
931                 RETURN(PTR_ERR(handle));
932
933         mdd_write_lock(env, mdd_obj);
934         rc = mdo_xattr_del(env, md2mdd_obj(obj), name, handle,
935                            mdd_object_capa(env, mdd_obj));
936         mdd_write_unlock(env, mdd_obj);
937         if (rc == 0) {
938                 la_copy->la_ctime = CURRENT_SECONDS;
939                 la_copy->la_valid = LA_CTIME;
940                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
941                                                   handle, 0);
942         }
943
944         mdd_trans_stop(env, mdd, rc, handle);
945
946         RETURN(rc);
947 }
948
949 /* partial unlink */
950 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
951                        struct md_attr *ma)
952 {
953         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
954         struct mdd_object *mdd_obj = md2mdd_obj(obj);
955         struct mdd_device *mdd = mdo2mdd(obj);
956         struct thandle *handle;
957         int rc;
958         ENTRY;
959
960         /*
961          * Check -ENOENT early here because we need to get object type
962          * to calculate credits before transaction start
963          */
964         if (!mdd_object_exists(mdd_obj))
965                 RETURN(-ENOENT);
966
967         LASSERT(mdd_object_exists(mdd_obj) > 0);
968
969         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
970         if (rc)
971                 RETURN(rc);
972
973         handle = mdd_trans_start(env, mdd);
974         if (IS_ERR(handle))
975                 RETURN(-ENOMEM);
976
977         mdd_write_lock(env, mdd_obj);
978
979         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
980         if (rc)
981                 GOTO(cleanup, rc);
982
983         mdo_ref_del(env, mdd_obj, handle);
984
985         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
986                 /* unlink dot */
987                 mdo_ref_del(env, mdd_obj, handle);
988         }
989
990         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
991         la_copy->la_ctime = ma->ma_attr.la_ctime;
992
993         la_copy->la_valid = LA_CTIME;
994         rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 0);
995         if (rc)
996                 GOTO(cleanup, rc);
997
998         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
999
1000         EXIT;
1001 cleanup:
1002         mdd_write_unlock(env, mdd_obj);
1003         mdd_trans_stop(env, mdd, rc, handle);
1004         return rc;
1005 }
1006
1007 /* partial operation */
1008 static int mdd_oc_sanity_check(const struct lu_env *env,
1009                                struct mdd_object *obj,
1010                                struct md_attr *ma)
1011 {
1012         int rc;
1013         ENTRY;
1014
1015         switch (ma->ma_attr.la_mode & S_IFMT) {
1016         case S_IFREG:
1017         case S_IFDIR:
1018         case S_IFLNK:
1019         case S_IFCHR:
1020         case S_IFBLK:
1021         case S_IFIFO:
1022         case S_IFSOCK:
1023                 rc = 0;
1024                 break;
1025         default:
1026                 rc = -EINVAL;
1027                 break;
1028         }
1029         RETURN(rc);
1030 }
1031
1032 static int mdd_object_create(const struct lu_env *env,
1033                              struct md_object *obj,
1034                              const struct md_op_spec *spec,
1035                              struct md_attr *ma)
1036 {
1037
1038         struct mdd_device *mdd = mdo2mdd(obj);
1039         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1040         const struct lu_fid *pfid = spec->u.sp_pfid;
1041         struct thandle *handle;
1042         int rc;
1043         ENTRY;
1044
1045         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1046         handle = mdd_trans_start(env, mdd);
1047         if (IS_ERR(handle))
1048                 RETURN(PTR_ERR(handle));
1049
1050         mdd_write_lock(env, mdd_obj);
1051         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1052         if (rc)
1053                 GOTO(unlock, rc);
1054
1055         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle);
1056         if (rc)
1057                 GOTO(unlock, rc);
1058
1059         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1060                 /* If creating the slave object, set slave EA here. */
1061                 int lmv_size = spec->u.sp_ea.eadatalen;
1062                 struct lmv_stripe_md *lmv;
1063
1064                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1065                 LASSERT(lmv != NULL && lmv_size > 0);
1066
1067                 rc = __mdd_xattr_set(env, mdd_obj,
1068                                      mdd_buf_get_const(env, lmv, lmv_size),
1069                                      MDS_LMV_MD_NAME, 0, handle);
1070                 if (rc)
1071                         GOTO(unlock, rc);
1072
1073                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0);
1074         } else {
1075 #ifdef CONFIG_FS_POSIX_ACL
1076                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1077                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1078
1079                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1080                         buf->lb_len = spec->u.sp_ea.eadatalen;
1081                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1082                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1083                                                     &ma->ma_attr.la_mode,
1084                                                     handle);
1085                                 if (rc)
1086                                         GOTO(unlock, rc);
1087                                 else
1088                                         ma->ma_attr.la_valid |= LA_MODE;
1089                         }
1090
1091                         pfid = spec->u.sp_ea.fid;
1092                 }
1093 #endif
1094                 rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
1095         }
1096         EXIT;
1097 unlock:
1098         mdd_write_unlock(env, mdd_obj);
1099         if (rc == 0)
1100                 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
1101
1102         mdd_trans_stop(env, mdd, rc, handle);
1103         return rc;
1104 }
1105
1106 /* partial link */
1107 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1108                        const struct md_attr *ma)
1109 {
1110         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1111         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1112         struct mdd_device *mdd = mdo2mdd(obj);
1113         struct thandle *handle;
1114         int rc;
1115         ENTRY;
1116
1117         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1118         handle = mdd_trans_start(env, mdd);
1119         if (IS_ERR(handle))
1120                 RETURN(-ENOMEM);
1121
1122         mdd_write_lock(env, mdd_obj);
1123         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1124         if (rc == 0)
1125                 mdo_ref_add(env, mdd_obj, handle);
1126         mdd_write_unlock(env, mdd_obj);
1127         if (rc == 0) {
1128                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1129                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1130
1131                 la_copy->la_valid = LA_CTIME;
1132                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1133                                                   handle, 0);
1134         }
1135         mdd_trans_stop(env, mdd, 0, handle);
1136
1137         RETURN(rc);
1138 }
1139
1140 /*
1141  * do NOT or the MAY_*'s, you'll get the weakest
1142  */
1143 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1144 {
1145         int res = 0;
1146
1147         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1148          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1149          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1150          * owner can write to a file even if it is marked readonly to hide
1151          * its brokenness. (bug 5781) */
1152         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1153                 struct md_ucred *uc = md_ucred(env);
1154
1155                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1156                     (la->la_uid == uc->mu_fsuid))
1157                         return 0;
1158         }
1159
1160         if (flags & FMODE_READ)
1161                 res |= MAY_READ;
1162         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1163                 res |= MAY_WRITE;
1164         if (flags & MDS_FMODE_EXEC)
1165                 res |= MAY_EXEC;
1166         return res;
1167 }
1168
1169 static int mdd_open_sanity_check(const struct lu_env *env,
1170                                  struct mdd_object *obj, int flag)
1171 {
1172         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1173         int mode, rc;
1174         ENTRY;
1175
1176         /* EEXIST check */
1177         if (mdd_is_dead_obj(obj))
1178                 RETURN(-ENOENT);
1179
1180         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1181         if (rc)
1182                RETURN(rc);
1183
1184         if (S_ISLNK(tmp_la->la_mode))
1185                 RETURN(-ELOOP);
1186
1187         mode = accmode(env, tmp_la, flag);
1188
1189         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1190                 RETURN(-EISDIR);
1191
1192         if (!(flag & MDS_OPEN_CREATED)) {
1193                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1194                 if (rc)
1195                         RETURN(rc);
1196         }
1197
1198         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1199             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1200                 flag &= ~MDS_OPEN_TRUNC;
1201
1202         /* For writing append-only file must open it with append mode. */
1203         if (mdd_is_append(obj)) {
1204                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1205                         RETURN(-EPERM);
1206                 if (flag & MDS_OPEN_TRUNC)
1207                         RETURN(-EPERM);
1208         }
1209
1210 #if 0
1211         /*
1212          * Now, flag -- O_NOATIME does not be packed by client.
1213          */
1214         if (flag & O_NOATIME) {
1215                 struct md_ucred *uc = md_ucred(env);
1216
1217                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1218                     (uc->mu_valid == UCRED_NEW)) &&
1219                     (uc->mu_fsuid != tmp_la->la_uid) &&
1220                     !mdd_capable(uc, CAP_FOWNER))
1221                         RETURN(-EPERM);
1222         }
1223 #endif
1224
1225         RETURN(0);
1226 }
1227
1228 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1229                     int flags)
1230 {
1231         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1232         int rc = 0;
1233
1234         mdd_write_lock(env, mdd_obj);
1235
1236         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1237         if (rc == 0)
1238                 mdd_obj->mod_count++;
1239
1240         mdd_write_unlock(env, mdd_obj);
1241         return rc;
1242 }
1243
1244 /* return md_attr back,
1245  * if it is last unlink then return lov ea + llog cookie*/
1246 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1247                     struct md_attr *ma)
1248 {
1249         int rc = 0;
1250         ENTRY;
1251
1252         if (S_ISREG(mdd_object_type(obj))) {
1253                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1254                  * Caller must be ready for that. */
1255                 rc = __mdd_lmm_get(env, obj, ma);
1256                 if ((ma->ma_valid & MA_LOV))
1257                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1258                                             obj, ma);
1259         }
1260         RETURN(rc);
1261 }
1262
1263 /*
1264  * No permission check is needed.
1265  */
1266 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1267                      struct md_attr *ma)
1268 {
1269         int rc;
1270         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1271         struct thandle    *handle;
1272         ENTRY;
1273
1274         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1275         if (rc)
1276                 RETURN(rc);
1277         handle = mdd_trans_start(env, mdo2mdd(obj));
1278         if (IS_ERR(handle))
1279                 RETURN(PTR_ERR(handle));
1280
1281         mdd_write_lock(env, mdd_obj);
1282         /* release open count */
1283         mdd_obj->mod_count --;
1284
1285         rc = mdd_iattr_get(env, mdd_obj, ma);
1286         if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0)
1287                 rc = mdd_object_kill(env, mdd_obj, ma);
1288         else
1289                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1290         
1291         mdd_write_unlock(env, mdd_obj);
1292         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1293         RETURN(rc);
1294 }
1295
1296 /*
1297  * Permission check is done when open,
1298  * no need check again.
1299  */
1300 static int mdd_readpage_sanity_check(const struct lu_env *env,
1301                                      struct mdd_object *obj)
1302 {
1303         struct dt_object *next = mdd_object_child(obj);
1304         int rc;
1305         ENTRY;
1306
1307         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1308                 rc = 0;
1309         else
1310                 rc = -ENOTDIR;
1311
1312         RETURN(rc);
1313 }
1314
1315 static int mdd_dir_page_build(const struct lu_env *env, int first,
1316                               void *area, int nob, struct dt_it_ops *iops,
1317                               struct dt_it *it, __u32 *start, __u32 *end,
1318                               struct lu_dirent **last)
1319 {
1320         struct lu_fid          *fid  = &mdd_env_info(env)->mti_fid2;
1321         struct mdd_thread_info *info = mdd_env_info(env);
1322         struct lu_fid_pack     *pack = &info->mti_pack;
1323         int                     result;
1324         struct lu_dirent       *ent;
1325
1326         if (first) {
1327                 memset(area, 0, sizeof (struct lu_dirpage));
1328                 area += sizeof (struct lu_dirpage);
1329                 nob  -= sizeof (struct lu_dirpage);
1330         }
1331
1332         LASSERT(nob > sizeof *ent);
1333
1334         ent  = area;
1335         result = 0;
1336         do {
1337                 char  *name;
1338                 int    len;
1339                 int    recsize;
1340                 __u32  hash;
1341
1342                 name = (char *)iops->key(env, it);
1343                 len  = iops->key_size(env, it);
1344
1345                 pack = (struct lu_fid_pack *)iops->rec(env, it);
1346                 result = fid_unpack(pack, fid);
1347                 if (result != 0)
1348                         break;
1349
1350                 recsize = (sizeof(*ent) + len + 3) & ~3;
1351                 hash = iops->store(env, it);
1352                 *end = hash;
1353
1354                 CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n",
1355                        name, ent, nob, PFID(fid), hash, len, len, len, name);
1356
1357                 if (nob >= recsize) {
1358                         ent->lde_fid = *fid;
1359                         fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
1360                         ent->lde_hash = hash;
1361                         ent->lde_namelen = cpu_to_le16(len);
1362                         ent->lde_reclen  = cpu_to_le16(recsize);
1363                         memcpy(ent->lde_name, name, len);
1364                         if (first && ent == area)
1365                                 *start = hash;
1366                         *last = ent;
1367                         ent = (void *)ent + recsize;
1368                         nob -= recsize;
1369                         result = iops->next(env, it);
1370                 } else {
1371                         /*
1372                          * record doesn't fit into page, enlarge previous one.
1373                          */
1374                         LASSERT(*last != NULL);
1375                         (*last)->lde_reclen =
1376                                 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1377                                             nob);
1378                         break;
1379                 }
1380         } while (result == 0);
1381
1382         return result;
1383 }
1384
1385 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
1386                           const struct lu_rdpg *rdpg)
1387 {
1388         struct dt_it      *it;
1389         struct dt_object  *next = mdd_object_child(obj);
1390         struct dt_it_ops  *iops;
1391         struct page       *pg;
1392         struct lu_dirent  *last;
1393         int i;
1394         int rc;
1395         int nob;
1396         __u32 hash_start;
1397         __u32 hash_end;
1398
1399         LASSERT(rdpg->rp_pages != NULL);
1400         LASSERT(next->do_index_ops != NULL);
1401
1402         if (rdpg->rp_count <= 0)
1403                 return -EFAULT;
1404
1405         /*
1406          * iterate through directory and fill pages from @rdpg
1407          */
1408         iops = &next->do_index_ops->dio_it;
1409         it = iops->init(env, next, 0, mdd_object_capa(env, obj));
1410         if (IS_ERR(it))
1411                 return PTR_ERR(it);
1412
1413         rc = iops->load(env, it, rdpg->rp_hash);
1414
1415         if (rc == 0)
1416                 /*
1417                  * Iterator didn't find record with exactly the key requested.
1418                  *
1419                  * It is currently either
1420                  *
1421                  *     - positioned above record with key less than
1422                  *     requested---skip it.
1423                  *
1424                  *     - or not positioned at all (is in IAM_IT_SKEWED
1425                  *     state)---position it on the next item.
1426                  */
1427                 rc = iops->next(env, it);
1428         else if (rc > 0)
1429                 rc = 0;
1430
1431         /*
1432          * At this point and across for-loop:
1433          *
1434          *  rc == 0 -> ok, proceed.
1435          *  rc >  0 -> end of directory.
1436          *  rc <  0 -> error.
1437          */
1438         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
1439              i++, nob -= CFS_PAGE_SIZE) {
1440                 LASSERT(i < rdpg->rp_npages);
1441                 pg = rdpg->rp_pages[i];
1442                 rc = mdd_dir_page_build(env, !i, kmap(pg),
1443                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
1444                                         it, &hash_start, &hash_end, &last);
1445                 if (rc != 0 || i == rdpg->rp_npages - 1)
1446                         last->lde_reclen = 0;
1447                 kunmap(pg);
1448         }
1449         if (rc > 0) {
1450                 /*
1451                  * end of directory.
1452                  */
1453                 hash_end = DIR_END_OFF;
1454                 rc = 0;
1455         }
1456         if (rc == 0) {
1457                 struct lu_dirpage *dp;
1458
1459                 dp = kmap(rdpg->rp_pages[0]);
1460                 dp->ldp_hash_start = rdpg->rp_hash;
1461                 dp->ldp_hash_end   = hash_end;
1462                 if (i == 0)
1463                         /*
1464                          * No pages were processed, mark this.
1465                          */
1466                         dp->ldp_flags |= LDF_EMPTY;
1467                 dp->ldp_flags = cpu_to_le16(dp->ldp_flags);
1468                 kunmap(rdpg->rp_pages[0]);
1469         }
1470         iops->put(env, it);
1471         iops->fini(env, it);
1472
1473         return rc;
1474 }
1475
1476 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1477                         const struct lu_rdpg *rdpg)
1478 {
1479         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1480         int rc;
1481         ENTRY;
1482
1483         LASSERT(mdd_object_exists(mdd_obj));
1484
1485         mdd_read_lock(env, mdd_obj);
1486         rc = mdd_readpage_sanity_check(env, mdd_obj);
1487         if (rc)
1488                 GOTO(out_unlock, rc);
1489
1490         if (mdd_is_dead_obj(mdd_obj)) {
1491                 struct page *pg;
1492                 struct lu_dirpage *dp;
1493
1494                 /*
1495                  * According to POSIX, please do not return any entry to client:
1496                  * even dot and dotdot should not be returned.
1497                  */
1498                 CWARN("readdir from dead object: "DFID"\n",
1499                         PFID(mdd_object_fid(mdd_obj)));
1500
1501                 if (rdpg->rp_count <= 0)
1502                         GOTO(out_unlock, rc = -EFAULT);
1503                 LASSERT(rdpg->rp_pages != NULL);
1504
1505                 pg = rdpg->rp_pages[0];
1506                 dp = (struct lu_dirpage*)kmap(pg);
1507                 memset(dp, 0 , sizeof(struct lu_dirpage));
1508                 dp->ldp_hash_start = rdpg->rp_hash;
1509                 dp->ldp_hash_end   = DIR_END_OFF;
1510                 dp->ldp_flags |= LDF_EMPTY;
1511                 dp->ldp_flags = cpu_to_le16(dp->ldp_flags);
1512                 kunmap(pg);
1513                 GOTO(out_unlock, rc = 0);
1514         }
1515
1516         rc = __mdd_readpage(env, mdd_obj, rdpg);
1517
1518         EXIT;
1519 out_unlock:
1520         mdd_read_unlock(env, mdd_obj);
1521         return rc;
1522 }
1523
1524 struct md_object_operations mdd_obj_ops = {
1525         .moo_permission    = mdd_permission,
1526         .moo_attr_get      = mdd_attr_get,
1527         .moo_attr_set      = mdd_attr_set,
1528         .moo_xattr_get     = mdd_xattr_get,
1529         .moo_xattr_set     = mdd_xattr_set,
1530         .moo_xattr_list    = mdd_xattr_list,
1531         .moo_xattr_del     = mdd_xattr_del,
1532         .moo_object_create = mdd_object_create,
1533         .moo_ref_add       = mdd_ref_add,
1534         .moo_ref_del       = mdd_ref_del,
1535         .moo_open          = mdd_open,
1536         .moo_close         = mdd_close,
1537         .moo_readpage      = mdd_readpage,
1538         .moo_readlink      = mdd_readlink,
1539         .moo_capa_get      = mdd_capa_get
1540 };