Whamcloud - gitweb
LU-1518 mdd: Fixup mdd_{obf,dot_lustre}_obj_ops.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 4882609..a8dbadb 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -28,6 +26,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Author: Wang Di <wangdi@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs_jbd2.h>
-#else
-#include <linux/jbd.h>
-#endif
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
+#include <obd_lov.h>
 
 #include <lustre_param.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs.h>
-#else
-#include <linux/ldiskfs_fs.h>
-#endif
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
@@ -78,8 +66,11 @@ static int mdd_xattr_get(const struct lu_env *env,
 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                  void **data)
 {
-        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
-                 PFID(mdd_object_fid(obj)));
+        if (mdd_object_exists(obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
+                return -ENOENT;
+        }
         mdo_data_get(env, obj, data);
         return 0;
 }
@@ -87,8 +78,11 @@ int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa)
 {
-        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
-                 PFID(mdd_object_fid(obj)));
+        if (mdd_object_exists(obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
+                return -ENOENT;
+        }
         return mdo_attr_get(env, obj, la, capa);
 }
 
@@ -126,10 +120,7 @@ void mdd_buf_put(struct lu_buf *buf)
 {
         if (buf == NULL || buf->lb_buf == NULL)
                 return;
-        if (buf->lb_vmalloc)
-                OBD_VFREE(buf->lb_buf, buf->lb_len);
-        else
-                OBD_FREE(buf->lb_buf, buf->lb_len);
+        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
         buf->lb_buf = NULL;
         buf->lb_len = 0;
 }
@@ -145,28 +136,17 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
         return buf;
 }
 
-#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
 {
         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 
         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-                if (buf->lb_vmalloc)
-                        OBD_VFREE(buf->lb_buf, buf->lb_len);
-                else
-                        OBD_FREE(buf->lb_buf, buf->lb_len);
+                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
                 buf->lb_buf = NULL;
         }
         if (buf->lb_buf == NULL) {
                 buf->lb_len = len;
-                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
-                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 0;
-                }
-                if (buf->lb_buf == NULL) {
-                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 1;
-                }
+                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
                 if (buf->lb_buf == NULL)
                         buf->lb_len = 0;
         }
@@ -184,23 +164,15 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len)
         struct lu_buf buf;
 
         LASSERT(len >= oldbuf->lb_len);
-        if (len > BUF_VMALLOC_SIZE) {
-                OBD_VMALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 1;
-        } else {
-                OBD_ALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 0;
-        }
+        OBD_ALLOC_LARGE(buf.lb_buf, len);
+
         if (buf.lb_buf == NULL)
                 return -ENOMEM;
 
         buf.lb_len = len;
         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
 
-        if (oldbuf->lb_vmalloc)
-                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
-        else
-                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
 
         memcpy(oldbuf, &buf, sizeof(buf));
 
@@ -216,12 +188,13 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
         max_cookie_size = mdd_lov_cookiesize(env, mdd);
         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                 if (mti->mti_max_cookie)
-                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
+                        OBD_FREE_LARGE(mti->mti_max_cookie,
+                                       mti->mti_max_cookie_size);
                 mti->mti_max_cookie = NULL;
                 mti->mti_max_cookie_size = 0;
         }
         if (unlikely(mti->mti_max_cookie == NULL)) {
-                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
+                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
                 if (likely(mti->mti_max_cookie != NULL))
                         mti->mti_max_cookie_size = max_cookie_size;
         }
@@ -230,27 +203,37 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
         return mti->mti_max_cookie;
 }
 
-struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
-                                   struct mdd_device *mdd)
+struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
 {
         struct mdd_thread_info *mti = mdd_env_info(env);
-        int                     max_lmm_size;
 
-        max_lmm_size = mdd_lov_mdsize(env, mdd);
-        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
-                if (mti->mti_max_lmm)
-                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
-                mti->mti_max_lmm = NULL;
-                mti->mti_max_lmm_size = 0;
-        }
-        if (unlikely(mti->mti_max_lmm == NULL)) {
-                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
+        if (unlikely(mti->mti_max_lmm_size < size)) {
+                int rsize = size_roundup_power2(size);
+
+                if (mti->mti_max_lmm_size > 0) {
+                        LASSERT(mti->mti_max_lmm);
+                        OBD_FREE_LARGE(mti->mti_max_lmm,
+                                       mti->mti_max_lmm_size);
+                        mti->mti_max_lmm = NULL;
+                        mti->mti_max_lmm_size = 0;
+                }
+
+                OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
                 if (likely(mti->mti_max_lmm != NULL))
-                        mti->mti_max_lmm_size = max_lmm_size;
+                        mti->mti_max_lmm_size = rsize;
         }
         return mti->mti_max_lmm;
 }
 
+struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
+                                   struct mdd_device *mdd)
+{
+        int max_lmm_size;
+
+        max_lmm_size = mdd_lov_mdsize(env, mdd);
+        return mdd_max_lmm_buffer(env, max_lmm_size);
+}
+
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
@@ -372,7 +355,7 @@ static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                 if (obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(obj))
-                        GOTO(out, rc = -PTR_ERR(obj));
+                        GOTO(out, rc = PTR_ERR(obj));
                 /* get child fid from parent and name */
                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                 mdd_object_put(env, obj);
@@ -435,7 +418,7 @@ static int mdd_path_current(const struct lu_env *env,
                 if (mdd_obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(mdd_obj))
-                        GOTO(out, rc = -PTR_ERR(mdd_obj));
+                        GOTO(out, rc = PTR_ERR(mdd_obj));
                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                 if (rc <= 0) {
                         mdd_object_put(env, mdd_obj);
@@ -509,7 +492,7 @@ static int mdd_path_current(const struct lu_env *env,
 
         EXIT;
 out:
-        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
                 /* if we vmalloced a large buffer drop it */
                 mdd_buf_put(buf);
 
@@ -583,8 +566,6 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
         if (rc == 0) {
                 mdd_flags_xlate(obj, la->la_flags);
-                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
-                        obj->mod_flags |= MNLINK_OBJ;
         }
         RETURN(rc);
 }
@@ -620,7 +601,7 @@ int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
         LASSERT(ldesc != NULL);
 
         lum->lmm_magic = LOV_MAGIC_V1;
-        lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
+        lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
         lum->lmm_pattern = ldesc->ld_pattern;
         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
@@ -629,6 +610,56 @@ int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
         RETURN(sizeof(*lum));
 }
 
+static int is_rootdir(struct mdd_object *mdd_obj)
+{
+        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
+        const struct lu_fid *fid = mdo2fid(mdd_obj);
+
+        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
+}
+
+int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
+                    struct md_attr *ma)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       int                     size;
+       int                     rc   = -EINVAL;
+       ENTRY;
+
+       LASSERT(info != NULL);
+       LASSERT(ma->ma_big_lmm_used == 0);
+
+       if (ma->ma_lmm_size == 0) {
+               CERROR("No buffer to hold %s xattr of object "DFID"\n",
+                      XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
+               RETURN(rc);
+       }
+
+        rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
+                           mdd_object_capa(env, obj));
+        if (rc < 0)
+                RETURN(rc);
+
+        /* big_lmm may need to grow */
+        size = rc;
+        mdd_max_lmm_buffer(env, size);
+        if (info->mti_max_lmm == NULL)
+                RETURN(-ENOMEM);
+
+        LASSERT(info->mti_max_lmm_size >= size);
+        rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
+                        XATTR_NAME_LOV);
+        if (rc < 0)
+                RETURN(rc);
+
+        ma->ma_big_lmm_used = 1;
+        ma->ma_valid |= MA_LOV;
+        ma->ma_lmm = info->mti_max_lmm;
+        ma->ma_lmm_size = size;
+        LASSERT(size == rc);
+        RETURN(rc);
+}
+
 /* get lov EA only */
 static int __mdd_lmm_get(const struct lu_env *env,
                          struct mdd_object *mdd_obj, struct md_attr *ma)
@@ -641,16 +672,48 @@ static int __mdd_lmm_get(const struct lu_env *env,
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                         XATTR_NAME_LOV);
-        if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
+        if (rc == -ERANGE)
+                rc = mdd_big_lmm_get(env, mdd_obj, ma);
+        else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
+
         if (rc > 0) {
                 ma->ma_lmm_size = rc;
-                ma->ma_valid |= MA_LOV;
+                ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
+                ma->ma_valid |= MA_LOV | MA_LAY_GEN;
                 rc = 0;
         }
         RETURN(rc);
 }
 
+/* get the first parent fid from link EA */
+static int mdd_pfid_get(const struct lu_env *env,
+                        struct mdd_object *mdd_obj, struct md_attr *ma)
+{
+        struct lu_buf *buf;
+        struct link_ea_header *leh;
+        struct link_ea_entry *lee;
+        struct lu_fid *pfid = &ma->ma_pfid;
+        ENTRY;
+
+        if (ma->ma_valid & MA_PFID)
+                RETURN(0);
+
+        buf = mdd_links_get(env, mdd_obj);
+        if (IS_ERR(buf))
+                RETURN(PTR_ERR(buf));
+
+        leh = buf->lb_buf;
+        lee = (struct link_ea_entry *)(leh + 1);
+        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
+        fid_be_to_cpu(pfid, pfid);
+        ma->ma_valid |= MA_PFID;
+        if (buf->lb_len > OBD_ALLOC_BIG)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+        RETURN(0);
+}
+
 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                        struct md_attr *ma)
 {
@@ -747,6 +810,10 @@ int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                     S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmm_get(env, mdd_obj, ma);
         }
+        if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
+                if (S_ISREG(mdd_object_type(mdd_obj)))
+                        rc = mdd_pfid_get(env, mdd_obj, ma);
+        }
         if (rc == 0 && ma->ma_need & MA_LMV) {
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmv_get(env, mdd_obj, ma);
@@ -771,7 +838,7 @@ int mdd_attr_get_internal_locked(const struct lu_env *env,
 {
         int rc;
         int needlock = ma->ma_need &
-                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
+                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
 
         if (needlock)
                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -784,8 +851,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env,
 /*
  * No permission check is needed.
  */
-static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
-                        struct md_attr *ma)
+int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
+                 struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         int                rc;
@@ -807,7 +874,11 @@ static int mdd_xattr_get(const struct lu_env *env,
 
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_get(env, mdd_obj, buf, name,
@@ -830,7 +901,11 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
         int                rc;
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         next = mdd_object_child(mdd_obj);
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -858,6 +933,30 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         RETURN(rc);
 }
 
+int mdd_declare_object_create_internal(const struct lu_env *env,
+                                       struct mdd_object *p,
+                                       struct mdd_object *c,
+                                       struct md_attr *ma,
+                                       struct thandle *handle,
+                                       const struct md_op_spec *spec)
+{
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
+        int rc;
+        ENTRY;
+
+        if (feat != &dt_directory_features && feat != NULL)
+                dof->dof_type = DFT_INDEX;
+        else
+                dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
+
+        dof->u.dof_idx.di_feat = feat;
+
+        rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+
+        RETURN(rc);
+}
+
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
                                struct thandle *handle,
@@ -1056,9 +1155,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if (la->la_valid == LA_ATIME) {
                 /* This is atime only set for read atime update on close. */
-                if (la->la_atime > tmp_la->la_atime &&
-                    la->la_atime <= (tmp_la->la_atime +
-                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
+                if (la->la_atime >= tmp_la->la_atime &&
+                    la->la_atime < (tmp_la->la_atime +
+                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                         la->la_valid &= ~LA_ATIME;
                 RETURN(0);
         }
@@ -1105,12 +1204,30 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 }
         }
 
+        if (la->la_valid & LA_KILL_SUID) {
+                la->la_valid &= ~LA_KILL_SUID;
+                if ((tmp_la->la_mode & S_ISUID) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
+                }
+                la->la_mode &= ~S_ISUID;
+        }
+
+        if (la->la_valid & LA_KILL_SGID) {
+                la->la_valid &= ~LA_KILL_SGID;
+                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
+                                        (S_ISGID | S_IXGRP)) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
+                }
+                la->la_mode &= ~S_ISGID;
+        }
+
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
-                /* Bypass la_vaild == LA_MODE,
-                 * this is for changing file with SUID or SGID. */
-                if ((la->la_valid & ~LA_MODE) &&
-                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
+                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                     (uc->mu_fsuid != tmp_la->la_uid) &&
                     !mdd_capable(uc, CFS_CAP_FOWNER))
                         RETURN(-EPERM);
@@ -1237,6 +1354,7 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
 {
         const struct lu_fid *tfid = mdo2fid(mdd_obj);
         struct llog_changelog_rec *rec;
+        struct thandle *th = NULL;
         struct lu_buf *buf;
         int reclen;
         int rc;
@@ -1247,8 +1365,8 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
                 RETURN(0);
 
-        LASSERT(handle != NULL);
         LASSERT(mdd_obj != NULL);
+        LASSERT(handle != NULL);
 
         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
@@ -1270,7 +1388,11 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        rc = mdd_changelog_llog_write(mdd, rec, handle);
+        rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
+
+        if (th)
+                mdd_trans_stop(env, mdd, rc, th);
+
         if (rc < 0) {
                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                        rc, type, PFID(tfid));
@@ -1289,14 +1411,22 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
         int rc;
         ENTRY;
 
-        handle = mdd_trans_start(env, mdd);
-
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 return(PTR_ERR(handle));
 
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                       handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1409,9 +1539,102 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
                                         md2mdd_obj(obj), handle);
 }
 
+static int mdd_declare_attr_set(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct md_attr *ma,
+                                struct lov_mds_md *lmm,
+                                struct thandle *handle)
+{
+        struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
+        int             rc, i;
+
+        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                return rc;
+
+        if (ma->ma_valid & MA_LOV) {
+                buf->lb_buf = NULL;
+                buf->lb_len = ma->ma_lmm_size;
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
+                buf->lb_buf = NULL;
+                buf->lb_len = sizeof(struct lustre_mdt_attrs);
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+#ifdef CONFIG_FS_POSIX_ACL
+        if (ma->ma_attr.la_valid & LA_MODE) {
+                mdd_read_lock(env, obj, MOR_TGT_CHILD);
+                rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
+                                   BYPASS_CAPA);
+                mdd_read_unlock(env, obj);
+                if (rc == -EOPNOTSUPP || rc == -ENODATA)
+                        rc = 0;
+                else if (rc < 0)
+                        return rc;
+
+                if (rc != 0) {
+                        buf->lb_buf = NULL;
+                        buf->lb_len = rc;
+                        rc = mdo_declare_xattr_set(env, obj, buf,
+                                                   XATTR_NAME_ACL_ACCESS, 0,
+                                                   handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+#endif
+
+        /* basically the log is the same as in unlink case */
+        if (lmm) {
+                __u16 stripe;
+
+                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
+                                le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
+                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+                               mdd->mdd_obd_dev->obd_name,
+                               le32_to_cpu(lmm->lmm_magic),
+                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+                        return -EINVAL;
+                }
+
+                stripe = le16_to_cpu(lmm->lmm_stripe_count);
+                if (stripe == LOV_ALL_STRIPES) {
+                        struct lov_desc *ldesc;
+
+                        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
+                        LASSERT(ldesc != NULL);
+                        stripe = ldesc->ld_tgt_count;
+                }
+
+                for (i = 0; i < stripe; i++) {
+                        rc = mdd_declare_llog_record(env, mdd,
+                                        sizeof(struct llog_unlink_rec),
+                                        handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+
+        return rc;
+}
+
 /* set attr and LOV EA at once, return updated attr */
-static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
-                        const struct md_attr *ma)
+int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
+                 const struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
@@ -1431,36 +1654,51 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
 #endif
         ENTRY;
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
-        /*TODO: add lock here*/
-        /* start a log jounal handle if needed */
+        *la_copy = ma->ma_attr;
+        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
+        if (rc != 0)
+                RETURN(rc);
+
+        /* setattr on "close" only change atime, or do nothing */
+        if (ma->ma_valid == MA_INODE &&
+            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
+                RETURN(0);
+
         if (S_ISREG(mdd_object_type(mdd_obj)) &&
             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                 lmm_size = mdd_lov_mdsize(env, mdd);
                 lmm = mdd_max_lmm_get(env, mdd);
                 if (lmm == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
+                        RETURN(-ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                 XATTR_NAME_LOV);
 
                 if (rc < 0)
-                        GOTO(cleanup, rc);
+                        RETURN(rc);
         }
 
+        handle = mdd_trans_create(env, mdd);
+        if (IS_ERR(handle))
+                RETURN(PTR_ERR(handle));
+
+        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
+                                  lmm_size > 0 ? lmm : NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* permission changes may require sync operation */
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync |= !!mdd->mdd_sync_permission;
+
         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
 
-        *la_copy = ma->ma_attr;
-        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
-        if (rc)
-                GOTO(cleanup, rc);
-
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                 struct obd_export *exp = md_quota(env)->mq_exp;
@@ -1536,6 +1774,7 @@ cleanup:
         if (rc == 0)
                 rc = mdd_attr_set_changelog(env, obj, handle,
                                             ma->ma_attr.la_valid);
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
@@ -1594,6 +1833,27 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_set(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const struct lu_buf *buf,
+                                 const char *name,
+                                 struct thandle *handle)
+
+{
+        int rc;
+
+        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1612,27 +1872,59 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
-            mdd->mdd_sync_permission == 1)
-                txn_param_sync(&mdd_env_info(env)->mti_param);
-
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+                handle->th_sync |= !!mdd->mdd_sync_permission;
+
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
+
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_xattr_del(env, obj, name, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1650,21 +1942,34 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) != 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1687,34 +1992,39 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
         /*
          * Check -ENOENT early here because we need to get object type
          * to calculate credits before transaction start
          */
-        if (!mdd_object_exists(mdd_obj))
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
                 RETURN(-ENOENT);
+        }
 
         LASSERT(mdd_object_exists(mdd_obj) > 0);
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
         if (rc)
                 GOTO(cleanup, rc);
 
-        __mdd_ref_del(env, mdd_obj, handle, 0);
+        mdo_ref_del(env, mdd_obj, handle);
 
         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                 /* unlink dot */
-                __mdd_ref_del(env, mdd_obj, handle, 1);
+                mdo_ref_del(env, mdd_obj, handle);
         }
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -1796,6 +2106,10 @@ static int mdd_object_create(const struct lu_env *env,
         int rc = 0;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota) {
                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
@@ -1821,11 +2135,12 @@ static int mdd_object_create(const struct lu_env *env,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
         if (rc)
@@ -1908,15 +2223,20 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
-                __mdd_ref_add(env, mdd_obj, handle);
+                mdo_ref_add(env, mdd_obj, handle);
         mdd_write_unlock(env, mdd_obj);
         if (rc == 0) {
                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -1956,7 +2276,7 @@ int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                 res |= MAY_WRITE;
         if (flags & MDS_FMODE_EXEC)
-                res |= MAY_EXEC;
+                res = MAY_EXEC;
         return res;
 }
 
@@ -2035,10 +2355,22 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
         return rc;
 }
 
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+                            struct md_attr *ma, struct thandle *handle)
+{
+        int rc;
+
+        rc = mdd_declare_unlink_log(env, obj, ma, handle);
+        if (rc)
+                return rc;
+
+        return mdo_declare_destroy(env, obj, handle);
+}
+
 /* return md_attr back,
  * if it is last unlink then return lov ea + llog cookie*/
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
+                    struct md_attr *ma, struct thandle *handle)
 {
         int rc = 0;
         ENTRY;
@@ -2046,26 +2378,43 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
         if (S_ISREG(mdd_object_type(obj))) {
                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
                  * Caller must be ready for that. */
-
                 rc = __mdd_lmm_get(env, obj, ma);
                 if ((ma->ma_valid & MA_LOV))
                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                             obj, ma);
         }
+
+        if (rc == 0)
+                rc = mdo_destroy(env, obj, handle);
+
         RETURN(rc);
 }
 
+static int mdd_declare_close(const struct lu_env *env,
+                             struct mdd_object *obj,
+                             struct md_attr *ma,
+                             struct thandle *handle)
+{
+        int rc;
+
+        rc = orph_declare_index_delete(env, obj, handle);
+        if (rc)
+                return rc;
+
+        return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
 /*
  * No permission check is needed.
  */
 static int mdd_close(const struct lu_env *env, struct md_object *obj,
-                     struct md_attr *ma)
+                     struct md_attr *ma, int mode)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle    *handle;
+        struct thandle    *handle = NULL;
         int rc;
-        int reset = 1;
+        int is_orphan = 0, reset = 1;
 
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
@@ -2075,24 +2424,55 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
 #endif
         ENTRY;
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
-        if (rc)
-                RETURN(rc);
-        handle = mdd_trans_start(env, mdo2mdd(obj));
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
+                mdd_obj->mod_count--;
+
+                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
+                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
+                               "list\n", PFID(mdd_object_fid(mdd_obj)));
+                RETURN(0);
+        }
+
+        /* check without any lock */
+        if (mdd_obj->mod_count == 1 &&
+            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
+ again:
+                handle = mdd_trans_create(env, mdo2mdd(obj));
+                if (IS_ERR(handle))
+                        RETURN(PTR_ERR(handle));
+
+                rc = mdd_declare_close(env, mdd_obj, ma, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                if (rc)
+                        GOTO(stop, rc);
+        }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+        if (handle == NULL && mdd_obj->mod_count == 1 &&
+            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+                mdd_write_unlock(env, mdd_obj);
+                goto again;
+        }
+
         /* release open count */
         mdd_obj->mod_count --;
 
         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                 /* remove link to object from orphan index */
+                LASSERT(handle != NULL);
                 rc = __mdd_orphan_del(env, mdd_obj, handle);
                 if (rc == 0) {
                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                                "list, OSS objects to be destroyed.\n",
                                PFID(mdd_object_fid(mdd_obj)));
+                        is_orphan = 1;
                 } else {
                         CERROR("Object "DFID" can not be deleted from orphan "
                                 "list, maybe cause OST objects can not be "
@@ -2108,7 +2488,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         rc = mdd_iattr_get(env, mdd_obj, ma);
         /* Object maybe not in orphan list originally, it is rare case for
          * mdd_finish_unlink() failure. */
-        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
 #ifdef HAVE_QUOTA_SUPPORT
                 if (mds->mds_quota) {
                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
@@ -2120,9 +2500,29 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                 } else {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
-                                if (rc == 0)
-                                        reset = 0;
+                        if (handle == NULL) {
+                                handle = mdd_trans_create(env, mdo2mdd(obj));
+                                if (IS_ERR(handle))
+                                        GOTO(out, rc = PTR_ERR(handle));
+
+                                rc = mdd_declare_object_kill(env, mdd_obj, ma,
+                                                             handle);
+                                if (rc)
+                                        GOTO(out, rc);
+
+                                rc = mdd_declare_changelog_store(env, mdd,
+                                                                 NULL, handle);
+                                if (rc)
+                                        GOTO(stop, rc);
+
+                                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                                if (rc)
+                                        GOTO(out, rc);
+                        }
+
+                        rc = mdd_object_kill(env, mdd_obj, ma, handle);
+                        if (rc == 0)
+                                reset = 0;
                 }
 
                 if (rc != 0)
@@ -2137,7 +2537,32 @@ out:
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         mdd_write_unlock(env, mdd_obj);
-        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
+
+        if (rc == 0 &&
+            (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
+            !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
+                if (handle == NULL) {
+                        handle = mdd_trans_create(env, mdo2mdd(obj));
+                        if (IS_ERR(handle))
+                                GOTO(stop, rc = IS_ERR(handle));
+
+                        rc = mdd_declare_changelog_store(env, mdd, NULL,
+                                                         handle);
+                        if (rc)
+                                GOTO(stop, rc);
+
+                        rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                        if (rc)
+                                GOTO(stop, rc);
+                }
+
+                mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
+                                         mdd_obj, handle);
+        }
+
+stop:
+        if (handle != NULL)
+                mdd_trans_stop(env, mdd, rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
                 /* Trigger dqrel on the owner of child. If failed,
@@ -2168,27 +2593,27 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              int first, void *area, int nob,
+                              struct lu_dirpage *dp, int nob,
                               const struct dt_it_ops *iops, struct dt_it *it,
-                              __u64 *start, __u64 *end,
-                              struct lu_dirent **last, __u32 attr)
+                              __u32 attr)
 {
+        void                   *area = dp;
         int                     result;
         __u64                   hash = 0;
         struct lu_dirent       *ent;
+        struct lu_dirent       *last = NULL;
+        int                     first = 1;
 
-        if (first) {
-                memset(area, 0, sizeof (struct lu_dirpage));
-                area += sizeof (struct lu_dirpage);
-                nob  -= sizeof (struct lu_dirpage);
-        }
+        memset(area, 0, sizeof (*dp));
+        area += sizeof (*dp);
+        nob  -= sizeof (*dp);
 
         ent  = area;
         do {
                 int    len;
                 int    recsize;
 
-                len  = iops->key_size(env, it);
+                len = iops->key_size(env, it);
 
                 /* IAM iterator can return record with zero len. */
                 if (len == 0)
@@ -2197,14 +2622,14 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 hash = iops->store(env, it);
                 if (unlikely(first)) {
                         first = 0;
-                        *start = hash;
+                        dp->ldp_hash_start = cpu_to_le64(hash);
                 }
 
                 /* calculate max space required for lu_dirent */
                 recsize = lu_dirent_calc_size(len, attr);
 
                 if (nob >= recsize) {
-                        result = iops->rec(env, it, ent, attr);
+                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                         if (result == -ESTALE)
                                 goto next;
                         if (result != 0)
@@ -2214,20 +2639,10 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
-                        /*
-                         * record doesn't fit into page, enlarge previous one.
-                         */
-                        if (*last) {
-                                (*last)->lde_reclen =
-                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
-                                                        nob);
-                                result = 0;
-                        } else
-                                result = -EINVAL;
-
+                        result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
                 }
-                *last = ent;
+                last = ent;
                 ent = (void *)ent + recsize;
                 nob -= recsize;
 
@@ -2238,7 +2653,12 @@ next:
         } while (result == 0);
 
 out:
-        *end = hash;
+        dp->ldp_hash_end = cpu_to_le64(hash);
+        if (last != NULL) {
+                if (last->lde_hash == dp->ldp_hash_end)
+                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+                last->lde_reclen = 0; /* end mark */
+        }
         return result;
 }
 
@@ -2249,13 +2669,11 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
         struct dt_object  *next = mdd_object_child(obj);
         const struct dt_it_ops  *iops;
         struct page       *pg;
-        struct lu_dirent  *last = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
         int i;
+        int nlupgs = 0;
         int rc;
         int nob;
-        __u64 hash_start;
-        __u64 hash_end = 0;
 
         LASSERT(rdpg->rp_pages != NULL);
         LASSERT(next->do_index_ops != NULL);
@@ -2267,13 +2685,13 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          * iterate through directory and fill pages from @rdpg
          */
         iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, mdd_object_capa(env, obj));
+        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
         if (IS_ERR(it))
                 return PTR_ERR(it);
 
         rc = iops->load(env, it, rdpg->rp_hash);
 
-        if (rc == 0){
+        if (rc == 0) {
                 /*
                  * Iterator didn't find record with exactly the key requested.
                  *
@@ -2298,39 +2716,51 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          */
         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
              i++, nob -= CFS_PAGE_SIZE) {
+                struct lu_dirpage *dp;
+
                 LASSERT(i < rdpg->rp_npages);
                 pg = rdpg->rp_pages[i];
-                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
-                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
-                                        it, &hash_start, &hash_end, &last,
-                                        rdpg->rp_attrs);
-                if (rc != 0 || i == rdpg->rp_npages - 1) {
-                        if (last)
-                                last->lde_reclen = 0;
+                dp = cfs_kmap(pg);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+repeat:
+#endif
+                rc = mdd_dir_page_build(env, mdd, dp,
+                                        min_t(int, nob, LU_PAGE_SIZE),
+                                        iops, it, rdpg->rp_attrs);
+                if (rc > 0) {
+                        /*
+                         * end of directory.
+                         */
+                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+                        nlupgs++;
+                } else if (rc < 0) {
+                        CWARN("build page failed: %d!\n", rc);
+                } else {
+                        nlupgs++;
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
+                                goto repeat;
+#endif
                 }
                 cfs_kunmap(pg);
         }
-        if (rc > 0) {
-                /*
-                 * end of directory.
-                 */
-                hash_end = DIR_END_OFF;
-                rc = 0;
-        }
-        if (rc == 0) {
+        if (rc >= 0) {
                 struct lu_dirpage *dp;
 
                 dp = cfs_kmap(rdpg->rp_pages[0]);
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(hash_end);
-                if (i == 0)
+                if (nlupgs == 0) {
                         /*
-                         * No pages were processed, mark this.
+                         * No pages were processed, mark this for first page
+                         * and send back.
                          */
-                        dp->ldp_flags |= LDF_EMPTY;
-
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                        dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
+                        nlupgs = 1;
+                }
                 cfs_kunmap(rdpg->rp_pages[0]);
+
+                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
         }
         iops->put(env, it);
         iops->fini(env, it);
@@ -2345,7 +2775,11 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_readpage_sanity_check(env, mdd_obj);
@@ -2371,11 +2805,10 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 dp = (struct lu_dirpage*)cfs_kmap(pg);
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
-                dp->ldp_flags |= LDF_EMPTY;
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
+                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                 cfs_kunmap(pg);
-                GOTO(out_unlock, rc = 0);
+                GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
         rc = __mdd_readpage(env, mdd_obj, rdpg);
@@ -2389,29 +2822,13 @@ out_unlock:
 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct dt_object *next;
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        next = mdd_object_child(mdd_obj);
-        return next->do_ops->do_object_sync(env, next);
-}
-
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
-                                        struct md_object *obj)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_get(env, mdd_object_child(mdd_obj));
-}
 
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
-                            dt_obj_version_t version)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        do_version_set(env, mdd_object_child(mdd_obj), version);
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
+        return dt_object_sync(env, mdd_object_child(mdd_obj));
 }
 
 const struct md_object_operations mdd_obj_ops = {
@@ -2432,7 +2849,7 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_changelog     = mdd_changelog,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
-        .moo_version_get   = mdd_version_get,
-        .moo_version_set   = mdd_version_set,
         .moo_path          = mdd_path,
+        .moo_file_lock     = mdd_file_lock,
+        .moo_file_unlock   = mdd_file_unlock,
 };