Whamcloud - gitweb
LU-2056 mdd: create transaction before locking object
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index cb86461..c3f6958 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -40,9 +40,6 @@
  * Author: Wang Di <wangdi@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
@@ -61,6 +58,7 @@
 #include "mdd_internal.h"
 
 static const struct lu_object_operations mdd_lu_obj_ops;
+extern cfs_mem_cache_t *mdd_object_kmem;
 
 static int mdd_xattr_get(const struct lu_env *env,
                          struct md_object *obj, struct lu_buf *buf,
@@ -121,11 +119,10 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 
 void mdd_buf_put(struct lu_buf *buf)
 {
-        if (buf == NULL || buf->lb_buf == NULL)
-                return;
-        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-        buf->lb_buf = NULL;
-        buf->lb_len = 0;
+       if (buf == NULL || buf->lb_buf == NULL)
+               return;
+       OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       *buf = LU_BUF_NULL;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
@@ -141,19 +138,19 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
 
 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
 {
-        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 
-        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-                buf->lb_buf = NULL;
-        }
-        if (buf->lb_buf == NULL) {
-                buf->lb_len = len;
-                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
-                if (buf->lb_buf == NULL)
-                        buf->lb_len = 0;
-        }
-        return buf;
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+               *buf = LU_BUF_NULL;
+       }
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               buf->lb_len = len;
+               OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
+               if (buf->lb_buf == NULL)
+                       *buf = LU_BUF_NULL;
+       }
+       return buf;
 }
 
 /** Increase the size of the \a mti_big_buf.
@@ -182,68 +179,13 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len)
         return 0;
 }
 
-struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
-                                       struct mdd_device *mdd)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-        int                     max_cookie_size;
-
-        max_cookie_size = mdd_lov_cookiesize(env, mdd);
-        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
-                if (mti->mti_max_cookie)
-                        OBD_FREE_LARGE(mti->mti_max_cookie,
-                                       mti->mti_max_cookie_size);
-                mti->mti_max_cookie = NULL;
-                mti->mti_max_cookie_size = 0;
-        }
-        if (unlikely(mti->mti_max_cookie == NULL)) {
-                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
-                if (likely(mti->mti_max_cookie != NULL))
-                        mti->mti_max_cookie_size = max_cookie_size;
-        }
-        if (likely(mti->mti_max_cookie != NULL))
-                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
-        return mti->mti_max_cookie;
-}
-
-struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-
-        if (unlikely(mti->mti_max_lmm_size < size)) {
-                int rsize = size_roundup_power2(size);
-
-                if (mti->mti_max_lmm_size > 0) {
-                        LASSERT(mti->mti_max_lmm);
-                        OBD_FREE_LARGE(mti->mti_max_lmm,
-                                       mti->mti_max_lmm_size);
-                        mti->mti_max_lmm = NULL;
-                        mti->mti_max_lmm_size = 0;
-                }
-
-                OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
-                if (likely(mti->mti_max_lmm != NULL))
-                        mti->mti_max_lmm_size = rsize;
-        }
-        return mti->mti_max_lmm;
-}
-
-struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
-                                   struct mdd_device *mdd)
-{
-        int max_lmm_size;
-
-        max_lmm_size = mdd_lov_mdsize(env, mdd);
-        return mdd_max_lmm_buffer(env, max_lmm_size);
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
 {
         struct mdd_object *mdd_obj;
 
-        OBD_ALLOC_PTR(mdd_obj);
+       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
         if (mdd_obj != NULL) {
                 struct lu_object *o;
 
@@ -293,7 +235,7 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
         struct mdd_object *mdd = lu2mdd_obj(o);
 
         lu_object_fini(o);
-        OBD_FREE_PTR(mdd);
+       OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
 }
 
 static int mdd_object_print(const struct lu_env *env, void *cookie,
@@ -476,9 +418,9 @@ static int mdd_path_current(const struct lu_env *env,
         /* Verify that our path hasn't changed since we started the lookup.
            Record the current index, and verify the path resolves to the
            same fid. If it does, then the path is correct as of this index. */
-        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
-        pli->pli_currec = mdd->mdd_cl.mc_index;
-        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       pli->pli_currec = mdd->mdd_cl.mc_index;
+       spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -573,290 +515,17 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
         RETURN(rc);
 }
 
-/* get only inode attributes */
-int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct md_attr *ma)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (ma->ma_valid & MA_INODE)
-                RETURN(0);
-
-        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                          mdd_object_capa(env, mdd_obj));
-        if (rc == 0)
-                ma->ma_valid |= MA_INODE;
-        RETURN(rc);
-}
-
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
-{
-        struct lov_desc *ldesc;
-        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
-        struct lov_user_md *lum = (struct lov_user_md*)lmm;
-        ENTRY;
-
-        if (!lum)
-                RETURN(0);
-
-        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
-        LASSERT(ldesc != NULL);
-
-        lum->lmm_magic = LOV_MAGIC_V1;
-        lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
-        lum->lmm_pattern = ldesc->ld_pattern;
-        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
-        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
-        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
-
-        RETURN(sizeof(*lum));
-}
-
-static int is_rootdir(struct mdd_object *mdd_obj)
-{
-        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
-        const struct lu_fid *fid = mdo2fid(mdd_obj);
-
-        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
-}
-
-int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
-{
-        struct mdd_thread_info *info = mdd_env_info(env);
-        int size;
-        int rc;
-        ENTRY;
-
-        LASSERT(info != NULL);
-        LASSERT(ma->ma_lmm_size > 0);
-        LASSERT(ma->ma_big_lmm_used == 0);
-
-        rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
-                           mdd_object_capa(env, obj));
-        if (rc < 0)
-                RETURN(rc);
-
-        /* big_lmm may need to grow */
-        size = rc;
-        mdd_max_lmm_buffer(env, size);
-        if (info->mti_max_lmm == NULL)
-                RETURN(-ENOMEM);
-
-        LASSERT(info->mti_max_lmm_size >= size);
-        rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
-                        XATTR_NAME_LOV);
-        if (rc < 0)
-                RETURN(rc);
-
-        ma->ma_big_lmm_used = 1;
-        ma->ma_valid |= MA_LOV;
-        ma->ma_lmm = info->mti_max_lmm;
-        ma->ma_lmm_size = size;
-        LASSERT(size == rc);
-        RETURN(rc);
-}
-
-/* get lov EA only */
-static int __mdd_lmm_get(const struct lu_env *env,
-                         struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        if (ma->ma_valid & MA_LOV)
-                RETURN(0);
-
-        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
-                        XATTR_NAME_LOV);
-        if (rc == -ERANGE)
-                rc = mdd_big_lmm_get(env, mdd_obj, ma);
-        else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
-                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
-
-        if (rc > 0) {
-                ma->ma_lmm_size = rc;
-                ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
-                ma->ma_valid |= MA_LOV | MA_LAY_GEN;
-                rc = 0;
-        }
-        RETURN(rc);
-}
-
-/* get the first parent fid from link EA */
-static int mdd_pfid_get(const struct lu_env *env,
-                        struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        struct lu_buf *buf;
-        struct link_ea_header *leh;
-        struct link_ea_entry *lee;
-        struct lu_fid *pfid = &ma->ma_pfid;
-        ENTRY;
-
-        if (ma->ma_valid & MA_PFID)
-                RETURN(0);
-
-        buf = mdd_links_get(env, mdd_obj);
-        if (IS_ERR(buf))
-                RETURN(PTR_ERR(buf));
-
-        leh = buf->lb_buf;
-        lee = (struct link_ea_entry *)(leh + 1);
-        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
-        fid_be_to_cpu(pfid, pfid);
-        ma->ma_valid |= MA_PFID;
-        if (buf->lb_len > OBD_ALLOC_BIG)
-                /* if we vmalloced a large buffer drop it */
-                mdd_buf_put(buf);
-        RETURN(0);
-}
-
-int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lmm_get(env, mdd_obj, ma);
-        mdd_read_unlock(env, mdd_obj);
-        RETURN(rc);
-}
-
-/* get lmv EA only*/
-static int __mdd_lmv_get(const struct lu_env *env,
-                         struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        if (ma->ma_valid & MA_LMV)
-                RETURN(0);
-
-        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
-                        XATTR_NAME_LMV);
-        if (rc > 0) {
-                ma->ma_valid |= MA_LMV;
-                rc = 0;
-        }
-        RETURN(rc);
-}
-
-static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct md_attr *ma)
-{
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lustre_mdt_attrs *lma =
-                                 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
-        int lma_size;
-        int rc;
-        ENTRY;
-
-        /* If all needed data are already valid, nothing to do */
-        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
-            (ma->ma_need & (MA_HSM | MA_SOM)))
-                RETURN(0);
-
-        /* Read LMA from disk EA */
-        lma_size = sizeof(info->mti_xattr_buf);
-        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
-        if (rc <= 0)
-                RETURN(rc);
-
-        /* Useless to check LMA incompatibility because this is already done in
-         * osd_ea_fid_get(), and this will fail long before this code is
-         * called.
-         * So, if we are here, LMA is compatible.
-         */
-
-        lustre_lma_swab(lma);
-
-        /* Swab and copy LMA */
-        if (ma->ma_need & MA_HSM) {
-                if (lma->lma_compat & LMAC_HSM)
-                        ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
-                else
-                        ma->ma_hsm.mh_flags = 0;
-                ma->ma_valid |= MA_HSM;
-        }
-
-        /* Copy SOM */
-        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
-                LASSERT(ma->ma_som != NULL);
-                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
-                ma->ma_som->msd_size    = lma->lma_som_size;
-                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
-                ma->ma_som->msd_mountid = lma->lma_som_mountid;
-                ma->ma_valid |= MA_SOM;
-        }
-
-        RETURN(0);
-}
-
-int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
-                                 struct md_attr *ma)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (ma->ma_need & MA_INODE)
-                rc = mdd_iattr_get(env, mdd_obj, ma);
-
-        if (rc == 0 && ma->ma_need & MA_LOV) {
-                if (S_ISREG(mdd_object_type(mdd_obj)) ||
-                    S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lmm_get(env, mdd_obj, ma);
-        }
-        if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
-                if (S_ISREG(mdd_object_type(mdd_obj)))
-                        rc = mdd_pfid_get(env, mdd_obj, ma);
-        }
-        if (rc == 0 && ma->ma_need & MA_LMV) {
-                if (S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lmv_get(env, mdd_obj, ma);
-        }
-        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
-                if (S_ISREG(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lma_get(env, mdd_obj, ma);
-        }
-#ifdef CONFIG_FS_POSIX_ACL
-        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
-                if (S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = mdd_def_acl_get(env, mdd_obj, ma);
-        }
-#endif
-        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
-               rc, ma->ma_valid, ma->ma_lmm);
-        RETURN(rc);
-}
-
-int mdd_attr_get_internal_locked(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        int needlock = ma->ma_need &
-                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
-
-        if (needlock)
-                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_attr_get_internal(env, mdd_obj, ma);
-        if (needlock)
-                mdd_read_unlock(env, mdd_obj);
-        return rc;
-}
-
 /*
  * No permission check is needed.
  */
-static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
-                        struct md_attr *ma)
+int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
+                struct md_attr *ma)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        int                rc;
-
+       int rc;
         ENTRY;
-        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
+
+       return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
+                         mdd_object_capa(env, md2mdd_obj(obj)));
         RETURN(rc);
 }
 
@@ -932,63 +601,56 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
 }
 
 int mdd_declare_object_create_internal(const struct lu_env *env,
-                                       struct mdd_object *p,
-                                       struct mdd_object *c,
-                                       struct md_attr *ma,
-                                       struct thandle *handle,
-                                       const struct md_op_spec *spec)
+                                      struct mdd_object *p,
+                                      struct mdd_object *c,
+                                      struct lu_attr *attr,
+                                      struct thandle *handle,
+                                      const struct md_op_spec *spec)
 {
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
         const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
-        if (feat != &dt_directory_features && feat != NULL)
+       if (feat != &dt_directory_features && feat != NULL) {
                 dof->dof_type = DFT_INDEX;
-        else
-                dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
-
-        dof->u.dof_idx.di_feat = feat;
-
-        rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+               dof->u.dof_idx.di_feat = feat;
+
+       } else {
+               dof->dof_type = dt_mode_to_dft(attr->la_mode);
+               if (dof->dof_type == DFT_REGULAR) {
+                       dof->u.dof_reg.striped =
+                               md_should_create(spec->sp_cr_flags);
+                       if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
+                               dof->u.dof_reg.striped = 0;
+                       /* is this replay? */
+                       if (spec->no_create)
+                               dof->u.dof_reg.striped = 0;
+               }
+       }
+
+       rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
 
         RETURN(rc);
 }
 
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
-                               struct mdd_object *c, struct md_attr *ma,
+                              struct mdd_object *c, struct lu_attr *attr,
                                struct thandle *handle,
                                const struct md_op_spec *spec)
 {
-        struct lu_attr *attr = &ma->ma_attr;
         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
-        const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
-        if (!mdd_object_exists(c)) {
-                struct dt_object *next = mdd_object_child(c);
-                LASSERT(next);
-
-                if (feat != &dt_directory_features && feat != NULL)
-                        dof->dof_type = DFT_INDEX;
-                else
-                        dof->dof_type = dt_mode_to_dft(attr->la_mode);
-
-                dof->u.dof_idx.di_feat = feat;
+       LASSERT(!mdd_object_exists(c));
 
-                /* @hint will be initialized by underlying device. */
-                next->do_ops->do_ah_init(env, hint,
-                                         p ? mdd_object_child(p) : NULL,
-                                         attr->la_mode & S_IFMT);
+       rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
-                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-        } else
-                rc = -EEXIST;
+       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /**
@@ -1016,11 +678,9 @@ static inline int mdd_attr_check(const struct lu_env *env,
         RETURN(0);
 }
 
-int mdd_attr_set_internal(const struct lu_env *env,
-                          struct mdd_object *obj,
-                          struct lu_attr *attr,
-                          struct thandle *handle,
-                          int needacl)
+int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
+                         struct lu_attr *attr, struct thandle *handle,
+                         int needacl)
 {
         int rc;
         ENTRY;
@@ -1034,10 +694,8 @@ int mdd_attr_set_internal(const struct lu_env *env,
 }
 
 int mdd_attr_check_set_internal(const struct lu_env *env,
-                                struct mdd_object *obj,
-                                struct lu_attr *attr,
-                                struct thandle *handle,
-                                int needacl)
+                               struct mdd_object *obj, struct lu_attr *attr,
+                               struct thandle *handle, int needacl)
 {
         int rc;
         ENTRY;
@@ -1051,58 +709,6 @@ int mdd_attr_check_set_internal(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_attr_set_internal_locked(const struct lu_env *env,
-                                        struct mdd_object *obj,
-                                        struct lu_attr *attr,
-                                        struct thandle *handle,
-                                        int needacl)
-{
-        int rc;
-        ENTRY;
-
-        needacl = needacl && (attr->la_valid & LA_MODE);
-        if (needacl)
-                mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
-        if (needacl)
-                mdd_write_unlock(env, obj);
-        RETURN(rc);
-}
-
-int mdd_attr_check_set_internal_locked(const struct lu_env *env,
-                                       struct mdd_object *obj,
-                                       struct lu_attr *attr,
-                                       struct thandle *handle,
-                                       int needacl)
-{
-        int rc;
-        ENTRY;
-
-        needacl = needacl && (attr->la_valid & LA_MODE);
-        if (needacl)
-                mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
-        if (needacl)
-                mdd_write_unlock(env, obj);
-        RETURN(rc);
-}
-
-int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
-                    const struct lu_buf *buf, const char *name,
-                    int fl, struct thandle *handle)
-{
-        struct lustre_capa *capa = mdd_object_capa(env, obj);
-        int rc = -EINVAL;
-        ENTRY;
-
-        if (buf->lb_buf && buf->lb_len > 0)
-                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
-        else if (buf->lb_buf == NULL && buf->lb_len == 0)
-                rc = mdo_xattr_del(env, obj, name, handle, capa);
-
-        RETURN(rc);
-}
-
 /*
  * This gives the same functionality as the code between
  * sys_chmod and inode_setattr
@@ -1111,10 +717,10 @@ int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
  */
 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
-                        struct lu_attr *la, const struct md_attr *ma)
+                       struct lu_attr *la, const unsigned long flags)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc;
+       struct lu_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -1129,23 +735,21 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
-        /* export destroy does not have ->le_ses, but we may want
-         * to drop LUSTRE_SOM_FL. */
-        if (!env->le_ses)
-                RETURN(0);
-
-        uc = md_ucred(env);
+       /* export destroy does not have ->le_ses, but we may want
+        * to drop LUSTRE_SOM_FL. */
+       uc = lu_ucred_check(env);
+       if (uc == NULL)
+               RETURN(0);
 
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
 
         if (la->la_valid == LA_CTIME) {
-                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
+               if (!(flags & MDS_PERM_BYPASS))
                         /* This is only for set ctime when rename's source is
                          * on remote MDS. */
-                        rc = mdd_may_delete(env, NULL, obj,
-                                            (struct md_attr *)ma, 1, 0);
+                       rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                         la->la_valid &= ~LA_CTIME;
                 RETURN(rc);
@@ -1166,9 +770,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 unsigned int newflags = la->la_flags &
                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                  * only be changed by the relevant capability. */
@@ -1186,21 +790,20 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
             (la->la_valid & ~LA_FLAGS) &&
-            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
+           !(flags & MDS_PERM_BYPASS))
                 RETURN(-EPERM);
 
-        /* Check for setting the obj time. */
-        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
-            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
-                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
-                                                            MAY_WRITE,
-                                                            MOR_TGT_CHILD);
-                        if (rc)
-                                RETURN(rc);
-                }
-        }
+       /* Check for setting the obj time. */
+       if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
+           !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER)) {
+                       rc = mdd_permission_internal(env, obj, tmp_la,
+                                                    MAY_WRITE);
+                       if (rc)
+                               RETURN(rc);
+               }
+       }
 
         if (la->la_valid & LA_KILL_SUID) {
                 la->la_valid &= ~LA_KILL_SUID;
@@ -1225,10 +828,10 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
-                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (!(flags & MDS_PERM_BYPASS) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 if (la->la_mode == (cfs_umode_t) -1)
                         la->la_mode = tmp_la->la_mode;
@@ -1249,10 +852,10 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_UID) {
                 if (la->la_uid == (uid_t) -1)
                         la->la_uid = tmp_la->la_uid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    (la->la_uid != tmp_la->la_uid)) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    (la->la_uid != tmp_la->la_uid)) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* If the user or group of a non-directory has been
                  * changed by a non-root user, remove the setuid bit.
@@ -1274,11 +877,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_GID) {
                 if (la->la_gid == (gid_t) -1)
                         la->la_gid = tmp_la->la_gid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    ((la->la_gid != tmp_la->la_gid) &&
-                    !lustre_in_group_p(uc, la->la_gid))) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    ((la->la_gid != tmp_la->la_gid) &&
+                     !lustre_in_group_p(uc, la->la_gid))) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* Likewise, if the user or group of a non-directory
                  * has been changed by a non-root user, remove the
@@ -1297,11 +900,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         /* For both Size-on-MDS case and truncate case,
          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
-         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
+        * We distinguish them by "flags & MDS_SOM".
          * For SOM case, it is true, the MAY_WRITE perm has been checked
          * when open, no need check again. For truncate case, it is false,
          * the MAY_WRITE perm should be checked here. */
-        if (ma->ma_attr_flags & MDS_SOM) {
+       if (flags & MDS_SOM) {
                 /* For the "Size-on-MDS" setattr update, merge coming
                  * attributes with the set in the inode. BUG 10641 */
                 if ((la->la_valid & LA_ATIME) &&
@@ -1315,12 +918,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
         } else {
                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
-                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
-                              (uc->mu_fsuid == tmp_la->la_uid)) &&
-                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
-                                rc = mdd_permission_internal_locked(env, obj,
-                                                            tmp_la, MAY_WRITE,
-                                                            MOR_TGT_CHILD);
+                       if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
+                             (uc->uc_fsuid == tmp_la->la_uid)) &&
+                           !(flags & MDS_PERM_BYPASS)) {
+                               rc = mdd_permission_internal(env, obj,
+                                                            tmp_la, MAY_WRITE);
                                 if (rc)
                                         RETURN(rc);
                         }
@@ -1343,19 +945,17 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
  * \param mdd_obj - mdd_object of change
  * \param handle - transacion handle
  */
-static int mdd_changelog_data_store(const struct lu_env     *env,
-                                    struct mdd_device       *mdd,
-                                    enum changelog_rec_type type,
-                                    int                     flags,
-                                    struct mdd_object       *mdd_obj,
-                                    struct thandle          *handle)
-{
-        const struct lu_fid *tfid = mdo2fid(mdd_obj);
-        struct llog_changelog_rec *rec;
-        struct thandle *th = NULL;
-        struct lu_buf *buf;
-        int reclen;
-        int rc;
+static int mdd_changelog_data_store(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   enum changelog_rec_type type,
+                                   int flags, struct mdd_object *mdd_obj,
+                                   struct thandle *handle)
+{
+       const struct lu_fid             *tfid = mdo2fid(mdd_obj);
+       struct llog_changelog_rec       *rec;
+       struct lu_buf                   *buf;
+       int                              reclen;
+       int                              rc;
 
         /* Not recording */
         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
@@ -1378,7 +978,7 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         buf = mdd_buf_alloc(env, reclen);
         if (buf->lb_buf == NULL)
                 RETURN(-ENOMEM);
-        rec = (struct llog_changelog_rec *)buf->lb_buf;
+       rec = buf->lb_buf;
 
         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
         rec->cr.cr_type = (__u32)type;
@@ -1386,18 +986,9 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
+       rc = mdd_changelog_store(env, mdd, rec, handle);
 
-        if (th)
-                mdd_trans_stop(env, mdd, rc, th);
-
-        if (rc < 0) {
-                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
-                       rc, type, PFID(tfid));
-                return -EFAULT;
-        }
-
-        return 0;
+       RETURN(rc);
 }
 
 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
@@ -1411,7 +1002,7 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
-                return(PTR_ERR(handle));
+               RETURN(PTR_ERR(handle));
 
         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
         if (rc)
@@ -1431,82 +1022,12 @@ stop:
 }
 
 /**
- * Should be called with write lock held.
- *
- * \see mdd_lma_set_locked().
- */
-static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       const struct md_attr *ma, struct thandle *handle)
-{
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_buf *buf;
-        struct lustre_mdt_attrs *lma =
-                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
-        int lmasize = sizeof(struct lustre_mdt_attrs);
-        int rc = 0;
-
-        ENTRY;
-
-        /* Either HSM or SOM part is not valid, we need to read it before */
-        if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
-                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
-                if (rc <= 0)
-                        RETURN(rc);
-
-                lustre_lma_swab(lma);
-        } else {
-                memset(lma, 0, lmasize);
-        }
-
-        /* Copy HSM data */
-        if (ma->ma_valid & MA_HSM) {
-                lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
-                lma->lma_compat |= LMAC_HSM;
-        }
-
-        /* Copy SOM data */
-        if (ma->ma_valid & MA_SOM) {
-                LASSERT(ma->ma_som != NULL);
-                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
-                        lma->lma_compat     &= ~LMAC_SOM;
-                } else {
-                        lma->lma_compat     |= LMAC_SOM;
-                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
-                        lma->lma_som_size    = ma->ma_som->msd_size;
-                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
-                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
-                }
-        }
-
-        /* Copy FID */
-        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
-
-        lustre_lma_swab(lma);
-        buf = mdd_buf_get(env, lma, lmasize);
-        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
-
-        RETURN(rc);
-}
-
-/**
  * Save LMA extended attributes with data from \a ma.
  *
  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
  * not, LMA EA will be first read from disk, modified and write back.
  *
  */
-static int mdd_lma_set_locked(const struct lu_env *env,
-                              struct mdd_object *mdd_obj,
-                              const struct md_attr *ma, struct thandle *handle)
-{
-        int rc;
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
-        mdd_write_unlock(env, mdd_obj);
-        return rc;
-}
-
 /* Precedence for choosing record type when multiple
  * attributes change: setattr > mtime > ctime > atime
  * (ctime changes when mtime does, plus chmod/chown.
@@ -1540,44 +1061,20 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
 static int mdd_declare_attr_set(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
-                                const struct md_attr *ma,
-                                struct lov_mds_md *lmm,
+                               const struct lu_attr *attr,
                                 struct thandle *handle)
 {
-        struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
-        int             rc, i;
-
-        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
-        if (rc)
-                return rc;
+       int rc;
 
-        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       rc = mdo_declare_attr_set(env, obj, attr, handle);
         if (rc)
                 return rc;
 
-        if (ma->ma_valid & MA_LOV) {
-                buf->lb_buf = NULL;
-                buf->lb_len = ma->ma_lmm_size;
-                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
-                                           0, handle);
-                if (rc)
-                        return rc;
-        }
-
-        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
-                buf->lb_buf = NULL;
-                buf->lb_len = sizeof(struct lustre_mdt_attrs);
-                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
-                                           0, handle);
-                if (rc)
-                        return rc;
-        }
-
 #ifdef CONFIG_FS_POSIX_ACL
-        if (ma->ma_attr.la_valid & LA_MODE) {
+       if (attr->la_valid & LA_MODE) {
                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
-                rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
-                                   BYPASS_CAPA);
+               rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
+                                  XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
                 mdd_read_unlock(env, obj);
                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
                         rc = 0;
@@ -1585,8 +1082,7 @@ static int mdd_declare_attr_set(const struct lu_env *env,
                         return rc;
 
                 if (rc != 0) {
-                        buf->lb_buf = NULL;
-                        buf->lb_len = rc;
+                       struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
                         rc = mdo_declare_xattr_set(env, obj, buf,
                                                    XATTR_NAME_ACL_ACCESS, 0,
                                                    handle);
@@ -1596,92 +1092,41 @@ static int mdd_declare_attr_set(const struct lu_env *env,
         }
 #endif
 
-        /* basically the log is the same as in unlink case */
-        if (lmm) {
-                __u16 stripe;
-
-                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
-                                le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
-                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
-                               mdd->mdd_obd_dev->obd_name,
-                               le32_to_cpu(lmm->lmm_magic),
-                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
-                        return -EINVAL;
-                }
-
-                stripe = le16_to_cpu(lmm->lmm_stripe_count);
-                if (stripe == LOV_ALL_STRIPES) {
-                        struct lov_desc *ldesc;
-
-                        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
-                        LASSERT(ldesc != NULL);
-                        stripe = ldesc->ld_tgt_count;
-                }
-
-                for (i = 0; i < stripe; i++) {
-                        rc = mdd_declare_llog_record(env, mdd,
-                                        sizeof(struct llog_unlink_rec),
-                                        handle);
-                        if (rc)
-                                return rc;
-                }
-        }
-
-        return rc;
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
 }
 
 /* set attr and LOV EA at once, return updated attr */
-static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
-                        const struct md_attr *ma)
+int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
+                const struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
-        struct lov_mds_md *lmm = NULL;
-        struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
-        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0, block_count = 0;
-        int inode_pending[MAXQUOTAS] = { 0, 0 };
-        int block_pending[MAXQUOTAS] = { 0, 0 };
-#endif
+       const struct lu_attr *la = &ma->ma_attr;
+       int rc;
         ENTRY;
 
+       /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
+       LASSERT((ma->ma_valid & MA_LOV) == 0);
+       LASSERT((ma->ma_valid & MA_HSM) == 0);
+       LASSERT((ma->ma_valid & MA_SOM) == 0);
+
         *la_copy = ma->ma_attr;
-        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
-        if (rc != 0)
+       rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
+       if (rc)
                 RETURN(rc);
 
         /* setattr on "close" only change atime, or do nothing */
-        if (ma->ma_valid == MA_INODE &&
-            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
+       if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        if (S_ISREG(mdd_object_type(mdd_obj)) &&
-            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
-                lmm_size = mdd_lov_mdsize(env, mdd);
-                lmm = mdd_max_lmm_get(env, mdd);
-                if (lmm == NULL)
-                        RETURN(-ENOMEM);
-
-                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
-                                XATTR_NAME_LOV);
-
-                if (rc < 0)
-                        RETURN(rc);
-        }
-
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
-                                  lmm_size > 0 ? lmm : NULL, handle);
+       rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
         if (rc)
                 GOTO(stop, rc);
 
@@ -1690,166 +1135,70 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 GOTO(stop, rc);
 
         /* permission changes may require sync operation */
-        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+       if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                 handle->th_sync |= !!mdd->mdd_sync_permission;
 
-        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
+       if (la->la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
-                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
-
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
-                struct obd_export *exp = md_quota(env)->mq_exp;
-                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
-
-                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
-                if (!rc) {
-                        quota_opc = FSFILT_OP_SETATTR;
-                        mdd_quota_wrapper(la_copy, qnids);
-                        mdd_quota_wrapper(la_tmp, qoids);
-                        /* get file quota for new owner */
-                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                        qnids, inode_pending, 1, NULL, 0,
-                                        NULL, 0);
-                        block_count = (la_tmp->la_blocks + 7) >> 3;
-                        if (block_count) {
-                                void *data = NULL;
-                                mdd_data_get(env, mdd_obj, &data);
-                                /* get block quota for new owner */
-                                lquota_chkquota(mds_quota_interface_ref, obd,
-                                                exp, qnids, block_pending,
-                                                block_count, NULL,
-                                                LQUOTA_FLAGS_BLK, data, 1);
-                        }
-                }
-        }
-#endif
+                      la->la_mtime, la->la_ctime);
 
         if (la_copy->la_valid & LA_FLAGS) {
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
-                                                  handle, 1);
+               rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
                 if (rc == 0)
                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
         } else if (la_copy->la_valid) {            /* setattr */
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
-                                                  handle, 1);
-                /* journal chown/chgrp in llog, just like unlink */
-                if (rc == 0 && lmm_size){
-                        cookie_size = mdd_lov_cookiesize(env, mdd);
-                        logcookies = mdd_max_cookie_get(env, mdd);
-                        if (logcookies == NULL)
-                                GOTO(cleanup, rc = -ENOMEM);
-
-                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
-                                            logcookies, cookie_size) <= 0)
-                                logcookies = NULL;
-                }
+               rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
         }
 
-        if (rc == 0 && ma->ma_valid & MA_LOV) {
-                cfs_umode_t mode;
-
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode) || S_ISDIR(mode)) {
-                        rc = mdd_lsm_sanity_check(env, mdd_obj);
-                        if (rc)
-                                GOTO(cleanup, rc);
-
-                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
-                                            ma->ma_lmm_size, handle, 1);
-                }
-
-        }
-        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
-                cfs_umode_t mode;
-
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode))
-                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
-
-        }
-cleanup:
         if (rc == 0)
                 rc = mdd_attr_set_changelog(env, obj, handle,
-                                            ma->ma_attr.la_valid);
+                                           la->la_valid);
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
-        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
-                /*set obd attr, if needed*/
-                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
-                                           logcookies);
-        }
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc) {
-                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
-                                      inode_pending, 0);
-                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
-                                      block_pending, 1);
-                /* Trigger dqrel/dqacq for original owner and new owner.
-                 * If failed, the next call for lquota_chkquota will
-                 * process it. */
-                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
-                              quota_opc);
-        }
-#endif
-        RETURN(rc);
-}
-
-int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
-                      const struct lu_buf *buf, const char *name, int fl,
-                      struct thandle *handle)
-{
-        int  rc;
-        ENTRY;
-
-        mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
-        mdd_write_unlock(env, obj);
-
         RETURN(rc);
 }
 
 static int mdd_xattr_sanity_check(const struct lu_env *env,
-                                  struct mdd_object *obj)
+                                 struct mdd_object *obj)
 {
-        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
-        struct md_ucred *uc     = md_ucred(env);
-        int rc;
-        ENTRY;
+       struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
+       struct lu_ucred *uc     = lu_ucred_assert(env);
+       int rc;
+       ENTRY;
 
-        if (mdd_is_immutable(obj) || mdd_is_append(obj))
-                RETURN(-EPERM);
+       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+               RETURN(-EPERM);
 
-        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
-        if (rc)
-                RETURN(rc);
+       rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-        if ((uc->mu_fsuid != tmp_la->la_uid) &&
-            !mdd_capable(uc, CFS_CAP_FOWNER))
-                RETURN(-EPERM);
+       if ((uc->uc_fsuid != tmp_la->la_uid) &&
+           !mdd_capable(uc, CFS_CAP_FOWNER))
+               RETURN(-EPERM);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_set(const struct lu_env *env,
-                                 struct mdd_device *mdd,
-                                 struct mdd_object *obj,
-                                 const struct lu_buf *buf,
-                                 const char *name,
-                                 struct thandle *handle)
-
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct lu_buf *buf,
+                                const char *name,
+                                int fl, struct thandle *handle)
 {
-        int rc;
+       int     rc;
 
-        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
-        if (rc)
-                return rc;
+       rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
+       if (rc)
+               return rc;
 
-        /* Only record user xattr changes */
-        if ((strncmp("user.", name, 5) == 0))
-                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       /* Only record user xattr changes */
+       if ((strncmp("user.", name, 5) == 0))
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
 
-        return rc;
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
 }
 
 /**
@@ -1857,51 +1206,61 @@ static int mdd_declare_xattr_set(const struct lu_env *env,
  * after xattr_set if needed.
  */
 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
+                        const struct lu_buf *buf, const char *name,
+                        int fl)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int  rc;
+       struct mdd_object       *mdd_obj = md2mdd_obj(obj);
+       struct mdd_device       *mdd = mdo2mdd(obj);
+       struct thandle          *handle;
+       int                      rc;
         ENTRY;
 
-        rc = mdd_xattr_sanity_check(env, mdd_obj);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
-
-        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
-        if (rc)
-                GOTO(stop, rc);
-
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
-
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
-                handle->th_sync |= !!mdd->mdd_sync_permission;
-
-        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
-
-        /* Only record system & user xattr changes */
-        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
-                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
-                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
-                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
-                                              handle);
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
+               rc = mdd_acl_set(env, mdd_obj, buf, fl);
+               RETURN(rc);
+       }
+
+       rc = mdd_xattr_sanity_check(env, mdd_obj);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* security-replated changes may require sync */
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+               handle->th_sync |= !!mdd->mdd_sync_permission;
+
+       mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
+                          mdd_object_capa(env, mdd_obj));
+       mdd_write_unlock(env, mdd_obj);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* Only record system & user xattr changes */
+       if (strncmp(XATTR_USER_PREFIX, name,
+                       sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                       sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                       sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+               rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
+                                             handle);
 
 stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       mdd_trans_stop(env, mdd, rc, handle);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_del(const struct lu_env *env,
@@ -1956,14 +1315,16 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
+       if (rc)
+               GOTO(stop, rc);
 
         /* Only record system & user xattr changes */
-        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+       if (strncmp(XATTR_USER_PREFIX, name,
                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
-                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
+                                 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
@@ -1973,280 +1334,311 @@ stop:
         RETURN(rc);
 }
 
-/* partial unlink */
-static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
-                       struct md_attr *ma)
+/*
+ * read lov EA of an object
+ * return the lov EA in an allocated lu_buf
+ */
+static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
+                                    struct mdd_object *obj)
 {
-        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0;
-#endif
-        int rc;
-        ENTRY;
-
-        /* XXX: this code won't be used ever:
-         * DNE uses slightly different approach */
-        LBUG();
-
-        /*
-         * Check -ENOENT early here because we need to get object type
-         * to calculate credits before transaction start
-         */
-        if (mdd_object_exists(mdd_obj) == 0) {
-                CERROR("%s: object "DFID" not found: rc = -2\n",
-                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
-                RETURN(-ENOENT);
-        }
-
-        LASSERT(mdd_object_exists(mdd_obj) > 0);
-
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(-ENOMEM);
+       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf   *lmm_buf = NULL;
+       int              rc, sz;
+       ENTRY;
 
-        rc = mdd_trans_start(env, mdd, handle);
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-
-        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        mdo_ref_del(env, mdd_obj, handle);
-
-        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
-                /* unlink dot */
-                mdo_ref_del(env, mdd_obj, handle);
-        }
-
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        la_copy->la_ctime = ma->ma_attr.la_ctime;
-
-        la_copy->la_valid = LA_CTIME;
-        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
-            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
-                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                mdd_quota_wrapper(&ma->ma_attr, qids);
-        }
-#endif
-
-
-        EXIT;
-cleanup:
-        mdd_write_unlock(env, mdd_obj);
-        mdd_trans_stop(env, mdd, rc, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc)
-                /* Trigger dqrel on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-#endif
-        return rc;
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
+                          mdd_object_capa(env, obj));
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (rc == 0)
+               GOTO(out, rc = -ENODATA);
+
+       sz = rc;
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               /* mti_big_buf was not allocated, so we have to
+                * allocate it based on the ea size */
+               buf = mdd_buf_alloc(env, sz);
+               if (buf->lb_buf == NULL)
+                       GOTO(out, rc = -ENOMEM);
+               goto repeat;
+       }
+
+       OBD_ALLOC_PTR(lmm_buf);
+       if (!lmm_buf)
+               GOTO(out, rc = -ENOMEM);
+
+       OBD_ALLOC(lmm_buf->lb_buf, sz);
+       if (!lmm_buf->lb_buf)
+               GOTO(free, rc = -ENOMEM);
+
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       lmm_buf->lb_len = sz;
+
+       GOTO(out, rc = 0);
+
+free:
+       if (lmm_buf)
+               OBD_FREE_PTR(lmm_buf);
+out:
+       if (rc)
+               return ERR_PTR(rc);
+       return lmm_buf;
 }
 
-/* partial operation */
-static int mdd_oc_sanity_check(const struct lu_env *env,
-                               struct mdd_object *obj,
-                               struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
 
-        switch (ma->ma_attr.la_mode & S_IFMT) {
-        case S_IFREG:
-        case S_IFDIR:
-        case S_IFLNK:
-        case S_IFCHR:
-        case S_IFBLK:
-        case S_IFIFO:
-        case S_IFSOCK:
-                rc = 0;
-                break;
-        default:
-                rc = -EINVAL;
-                break;
-        }
-        RETURN(rc);
-}
-
-static int mdd_object_create(const struct lu_env *env,
-                             struct md_object *obj,
-                             const struct md_op_spec *spec,
-                             struct md_attr *ma)
+/*
+ *  check if layout swapping between 2 objects is allowed
+ *  the rules are:
+ *  - same type of objects
+ *  - same owner/group (so quotas are still valid)
+ */
+static int mdd_layout_swap_allowed(const struct lu_env *env,
+                                  struct mdd_object *o1,
+                                  struct mdd_object *o2)
 {
+       const struct lu_fid     *fid1, *fid2;
+       __u32                    uid, gid;
+       struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
+       int                      rc;
+       ENTRY;
 
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        const struct lu_fid *pfid = spec->u.sp_pfid;
-        struct thandle *handle;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct obd_export *exp = md_quota(env)->mq_exp;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0, block_count = 0;
-        int inode_pending[MAXQUOTAS] = { 0, 0 };
-        int block_pending[MAXQUOTAS] = { 0, 0 };
-#endif
-        int rc = 0;
-        ENTRY;
+       fid1 = mdo2fid(o1);
+       fid2 = mdo2fid(o2);
 
-        /* XXX: this code won't be used ever:
-         * DNE uses slightly different approach */
-        LBUG();
-
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota) {
-                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
-                mdd_quota_wrapper(&ma->ma_attr, qids);
-                /* get file quota for child */
-                lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                qids, inode_pending, 1, NULL, 0,
-                                NULL, 0);
-                switch (ma->ma_attr.la_mode & S_IFMT) {
-                case S_IFLNK:
-                case S_IFDIR:
-                        block_count = 2;
-                        break;
-                case S_IFREG:
-                        block_count = 1;
-                        break;
-                }
-                /* get block quota for child */
-                if (block_count)
-                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                        qids, block_pending, block_count,
-                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
-        }
-#endif
+       if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
+           (mdd_object_type(o1) != mdd_object_type(o2)))
+               RETURN(-EPERM);
 
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                GOTO(out_pending, rc = PTR_ERR(handle));
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+       uid = tmp_la->la_uid;
+       gid = tmp_la->la_gid;
 
-        rc = mdd_trans_start(env, mdd, handle);
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
-        if (rc)
-                GOTO(unlock, rc);
+       if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
+               RETURN(-EPERM);
 
-        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
-        if (rc)
-                GOTO(unlock, rc);
+       RETURN(0);
+}
 
-        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
-                /* If creating the slave object, set slave EA here. */
-                int lmv_size = spec->u.sp_ea.eadatalen;
-                struct lmv_stripe_md *lmv;
+/**
+ * swap layouts between 2 lustre objects
+ */
+static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
+                           struct md_object *obj2, __u64 flags)
+{
+       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
+       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
+       struct lu_buf           *fst_buf, *snd_buf;
+       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
+       struct thandle          *handle;
+       struct mdd_device       *mdd = mdo2mdd(obj1);
+       int                      rc;
+       __u16                    fst_gen, snd_gen;
+       ENTRY;
+
+       /* we have to sort the 2 obj, so locking will always
+        * be in the same order, even in case of 2 concurrent swaps */
+       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
+                      mdo2fid(md2mdd_obj(obj2)));
+       /* same fid ? */
+       if (rc == 0)
+               RETURN(-EPERM);
+
+       if (rc > 0) {
+               o1 = md2mdd_obj(obj1);
+               o2 = md2mdd_obj(obj2);
+       } else {
+               o1 = md2mdd_obj(obj2);
+               o2 = md2mdd_obj(obj1);
+       }
+
+       /* check if layout swapping is allowed */
+       rc = mdd_layout_swap_allowed(env, o1, o2);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       /* objects are already sorted */
+       mdd_write_lock(env, o1, MOR_TGT_CHILD);
+       mdd_write_lock(env, o2, MOR_TGT_CHILD);
+
+       lmm1_buf = mdd_get_lov_ea(env, o1);
+       if (IS_ERR(lmm1_buf)) {
+               rc = PTR_ERR(lmm1_buf);
+               lmm1_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       lmm2_buf = mdd_get_lov_ea(env, o2);
+       if (IS_ERR(lmm2_buf)) {
+               rc = PTR_ERR(lmm2_buf);
+               lmm2_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       /* swapping 2 non existant layouts is a success */
+       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+               GOTO(unlock, rc = 0);
+
+       /* to help inode migration between MDT, it is better to
+        * start by the no layout file (if one), so we order the swap */
+       if (lmm1_buf == NULL) {
+               fst_o = o1;
+               fst_buf = lmm1_buf;
+               snd_o = o2;
+               snd_buf = lmm2_buf;
+       } else {
+               fst_o = o2;
+               fst_buf = lmm2_buf;
+               snd_o = o1;
+               snd_buf = lmm1_buf;
+       }
+
+       /* lmm and generation layout initialization */
+       if (fst_buf) {
+               fst_lmm = fst_buf->lb_buf;
+               fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
+       } else {
+               fst_lmm = NULL;
+               fst_gen = 0;
+       }
+
+       if (snd_buf) {
+               snd_lmm = snd_buf->lb_buf;
+               snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
+       } else {
+               snd_lmm = NULL;
+               snd_gen = 0;
+       }
+
+       /* save the orignal lmm common header of first file
+        * to be able to roll back */
+       OBD_ALLOC_PTR(old_fst_lmm);
+       if (old_fst_lmm == NULL)
+               GOTO(unlock, rc = -ENOMEM);
+
+       memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
+
+       /* increase the generation layout numbers */
+       snd_gen++;
+       fst_gen++;
+
+       /* set the file specific informations in lmm */
+       if (fst_lmm) {
+               fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
+               fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
+               fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
+       }
+
+       if (snd_lmm) {
+               snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+               snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+               snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+       }
+
+       /* prepare transaction */
+       rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, fst_o));
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, snd_o));
+       if (rc) {
+               int     rc2;
+
+               /* failure on second file, but first was done, so we have
+                * to roll back first */
+               /* restore object_id, object_seq and generation number
+                * on first file */
+               if (fst_lmm) {
+                       fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+                       fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+               }
+
+               rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
+                                   LU_XATTR_REPLACE, handle,
+                                   mdd_object_capa(env, fst_o));
+               if (rc2) {
+                       /* very bad day */
+                       CERROR("%s: unable to roll back after swap layouts"
+                              " failure between "DFID" and "DFID
+                              " rc2 = %d rc = %d)\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
+                              rc2, rc);
+                       /* a solution to avoid journal commit is to panic,
+                        * but it has strong consequences so we use LBUG to
+                        * allow sysdamin to choose to panic or not
+                        */
+                       LBUG();
+               }
+               GOTO(stop, rc);
+       }
+       EXIT;
 
-                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
-                LASSERT(lmv != NULL && lmv_size > 0);
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+unlock:
+       mdd_write_unlock(env, o2);
+       mdd_write_unlock(env, o1);
 
-                rc = __mdd_xattr_set(env, mdd_obj,
-                                     mdd_buf_get_const(env, lmv, lmv_size),
-                                     XATTR_NAME_LMV, 0, handle);
-                if (rc)
-                        GOTO(unlock, rc);
+       if (lmm1_buf && lmm1_buf->lb_buf)
+               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
+       if (lmm1_buf)
+               OBD_FREE_PTR(lmm1_buf);
 
-                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
-                                           handle, 0);
-        } else {
-#ifdef CONFIG_FS_POSIX_ACL
-                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
-                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
-
-                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
-                        buf->lb_len = spec->u.sp_ea.eadatalen;
-                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
-                                rc = __mdd_acl_init(env, mdd_obj, buf,
-                                                    &ma->ma_attr.la_mode,
-                                                    handle);
-                                if (rc)
-                                        GOTO(unlock, rc);
-                                else
-                                        ma->ma_attr.la_valid |= LA_MODE;
-                        }
+       if (lmm2_buf && lmm2_buf->lb_buf)
+               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
+       if (lmm2_buf)
+               OBD_FREE_PTR(lmm2_buf);
 
-                        pfid = spec->u.sp_ea.fid;
-                }
-#endif
-                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
-                                           spec);
-        }
-        EXIT;
-unlock:
-        if (rc == 0)
-                rc = mdd_attr_get_internal(env, mdd_obj, ma);
-        mdd_write_unlock(env, mdd_obj);
+       if (old_fst_lmm)
+               OBD_FREE_PTR(old_fst_lmm);
 
-        mdd_trans_stop(env, mdd, rc, handle);
-out_pending:
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc) {
-                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
-                                      inode_pending, 0);
-                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
-                                      block_pending, 1);
-                /* Trigger dqacq on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it. */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-        }
-#endif
-        return rc;
+       return rc;
 }
 
-/* partial link */
-static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
-                       const struct md_attr *ma)
+void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
+               struct mdd_object *child, struct lu_attr *attr)
 {
-        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int rc;
-        ENTRY;
-
-        /* XXX: this code won't be used ever:
-         * DNE uses slightly different approach */
-        LBUG();
-
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(-ENOMEM);
-
-        rc = mdd_trans_start(env, mdd, handle);
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
-        if (rc == 0)
-                mdo_ref_add(env, mdd_obj, handle);
-        mdd_write_unlock(env, mdd_obj);
-        if (rc == 0) {
-                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-                la_copy->la_ctime = ma->ma_attr.la_ctime;
-
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
-                                                        handle, 0);
-        }
-        mdd_trans_stop(env, mdd, 0, handle);
+       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+       struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
+       struct dt_object *nc = mdd_object_child(child);
 
-        RETURN(rc);
+       /* @hint will be initialized by underlying device. */
+       nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
 }
 
 /*
@@ -2254,28 +1646,27 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
  */
 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
 {
-        int res = 0;
-
-        /* Sadly, NFSD reopens a file repeatedly during operation, so the
-         * "acc_mode = 0" allowance for newly-created files isn't honoured.
-         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
-         * owner can write to a file even if it is marked readonly to hide
-         * its brokenness. (bug 5781) */
-        if (flags & MDS_OPEN_OWNEROVERRIDE) {
-                struct md_ucred *uc = md_ucred(env);
-
-                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
-                    (la->la_uid == uc->mu_fsuid))
-                        return 0;
-        }
+       int res = 0;
+
+       /* Sadly, NFSD reopens a file repeatedly during operation, so the
+        * "acc_mode = 0" allowance for newly-created files isn't honoured.
+        * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
+        * owner can write to a file even if it is marked readonly to hide
+        * its brokenness. (bug 5781) */
+       if (flags & MDS_OPEN_OWNEROVERRIDE) {
+               struct lu_ucred *uc = lu_ucred_check(env);
+
+               if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
+                       return 0;
+       }
 
-        if (flags & FMODE_READ)
-                res |= MAY_READ;
-        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
-                res |= MAY_WRITE;
-        if (flags & MDS_FMODE_EXEC)
-                res = MAY_EXEC;
-        return res;
+       if (flags & FMODE_READ)
+               res |= MAY_READ;
+       if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
+               res |= MAY_WRITE;
+       if (flags & MDS_FMODE_EXEC)
+               res = MAY_EXEC;
+       return res;
 }
 
 static int mdd_open_sanity_check(const struct lu_env *env,
@@ -2324,13 +1715,13 @@ static int mdd_open_sanity_check(const struct lu_env *env,
          * Now, flag -- O_NOATIME does not be packed by client.
          */
         if (flag & O_NOATIME) {
-                struct md_ucred *uc = md_ucred(env);
+               struct lu_ucred *uc = lu_ucred(env);
 
-                if (uc && ((uc->mu_valid == UCRED_OLD) ||
-                    (uc->mu_valid == UCRED_NEW)) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (uc && ((uc->uc_valid == UCRED_OLD) ||
+                          (uc->uc_valid == UCRED_NEW)) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
         }
 #endif
 
@@ -2356,12 +1747,6 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
                             struct md_attr *ma, struct thandle *handle)
 {
-        int rc;
-
-        rc = mdd_declare_unlink_log(env, obj, ma, handle);
-        if (rc)
-                return rc;
-
         return mdo_declare_destroy(env, obj, handle);
 }
 
@@ -2370,20 +1755,10 @@ int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                     struct md_attr *ma, struct thandle *handle)
 {
-        int rc = 0;
+       int rc;
         ENTRY;
 
-        if (S_ISREG(mdd_object_type(obj))) {
-                /* Return LOV & COOKIES unconditionally here. We clean evth up.
-                 * Caller must be ready for that. */
-                rc = __mdd_lmm_get(env, obj, ma);
-                if ((ma->ma_valid & MA_LOV))
-                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
-                                            obj, ma);
-        }
-
-        if (rc == 0)
-                rc = mdo_destroy(env, obj, handle);
+       rc = mdo_destroy(env, obj, handle);
 
         RETURN(rc);
 }
@@ -2399,7 +1774,7 @@ static int mdd_declare_close(const struct lu_env *env,
         if (rc)
                 return rc;
 
-        return mdd_declare_object_kill(env, obj, ma, handle);
+       return mdo_declare_destroy(env, obj, handle);
 }
 
 /*
@@ -2411,19 +1786,13 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle    *handle = NULL;
-        int rc;
-        int is_orphan = 0, reset = 1;
-
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0;
-#endif
+       int rc, is_orphan = 0;
         ENTRY;
 
         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
-                mdd_obj->mod_count--;
+               mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+               mdd_obj->mod_count--;
+               mdd_write_unlock(env, mdd_obj);
 
                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
@@ -2431,10 +1800,14 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                 RETURN(0);
         }
 
-        /* check without any lock */
-        if (mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
- again:
+       /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
+        * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
+       /* check without any lock */
+       is_orphan = mdd_obj->mod_count == 1 &&
+                   (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
+
+again:
+       if (is_orphan) {
                 handle = mdd_trans_create(env, mdo2mdd(obj));
                 if (IS_ERR(handle))
                         RETURN(PTR_ERR(handle));
@@ -2453,87 +1826,62 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        if (handle == NULL && mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
-                mdd_write_unlock(env, mdd_obj);
-                goto again;
-        }
-
-        /* release open count */
-        mdd_obj->mod_count --;
-
-        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
-                /* remove link to object from orphan index */
-                LASSERT(handle != NULL);
-                rc = __mdd_orphan_del(env, mdd_obj, handle);
-                if (rc == 0) {
-                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
-                               "list, OSS objects to be destroyed.\n",
-                               PFID(mdd_object_fid(mdd_obj)));
-                        is_orphan = 1;
-                } else {
-                        CERROR("Object "DFID" can not be deleted from orphan "
-                                "list, maybe cause OST objects can not be "
-                                "destroyed (err: %d).\n",
-                                PFID(mdd_object_fid(mdd_obj)), rc);
-                        /* If object was not deleted from orphan list, do not
-                         * destroy OSS objects, which will be done when next
-                         * recovery. */
-                        GOTO(out, rc);
-                }
-        }
-
-        rc = mdd_iattr_get(env, mdd_obj, ma);
-        /* Object maybe not in orphan list originally, it is rare case for
-         * mdd_finish_unlink() failure. */
-        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
-#ifdef HAVE_QUOTA_SUPPORT
-                if (mds->mds_quota) {
-                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                        mdd_quota_wrapper(&ma->ma_attr, qids);
-                }
-#endif
-                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
-                if (ma->ma_valid & MA_FLAGS &&
-                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
-                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
-                } else {
-                        if (handle == NULL) {
-                                handle = mdd_trans_create(env, mdo2mdd(obj));
-                                if (IS_ERR(handle))
-                                        GOTO(out, rc = PTR_ERR(handle));
-
-                                rc = mdd_declare_object_kill(env, mdd_obj, ma,
-                                                             handle);
-                                if (rc)
-                                        GOTO(out, rc);
-
-                                rc = mdd_declare_changelog_store(env, mdd,
-                                                                 NULL, handle);
-                                if (rc)
-                                        GOTO(stop, rc);
-
-                                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
-                                if (rc)
-                                        GOTO(out, rc);
-                        }
-
-                        rc = mdd_object_kill(env, mdd_obj, ma, handle);
-                        if (rc == 0)
-                                reset = 0;
-                }
-
-                if (rc != 0)
-                        CERROR("Error when prepare to delete Object "DFID" , "
-                               "which will cause OST objects can not be "
-                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
-        }
-        EXIT;
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
+                       mdd_object_capa(env, mdd_obj));
+       if (rc != 0) {
+               CERROR("Failed to get lu_attr of "DFID": %d\n",
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               GOTO(out, rc);
+       }
+
+       /* check again with lock */
+       is_orphan = (mdd_obj->mod_count == 1) &&
+                   ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
+                    ma->ma_attr.la_nlink == 0);
+
+       if (is_orphan && handle == NULL) {
+               mdd_write_unlock(env, mdd_obj);
+               goto again;
+       }
+
+       mdd_obj->mod_count--; /*release open count */
+
+       if (!is_orphan)
+               GOTO(out, rc = 0);
+
+       /* Orphan object */
+       /* NB: Object maybe not in orphan list originally, it is rare case for
+        * mdd_finish_unlink() failure, in that case, the object doesn't have
+        * ORPHAN_OBJ flag */
+       if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+               /* remove link to object from orphan index */
+               LASSERT(handle != NULL);
+               rc = __mdd_orphan_del(env, mdd_obj, handle);
+               if (rc != 0) {
+                       CERROR("%s: unable to delete "DFID" from orphan list: "
+                              "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                              PFID(mdd_object_fid(mdd_obj)), rc);
+                       /* If object was not deleted from orphan list, do not
+                        * destroy OSS objects, which will be done when next
+                        * recovery. */
+                       GOTO(out, rc);
+               }
+
+               CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                      "list, OSS objects to be destroyed.\n",
+                      PFID(mdd_object_fid(mdd_obj)));
+       }
+
+       rc = mdo_destroy(env, mdd_obj, handle);
+
+       if (rc != 0) {
+               CERROR("%s: unable to delete "DFID" from orphan list: "
+                      "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+       }
+       EXIT;
 
 out:
-        if (reset)
-                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
-
         mdd_write_unlock(env, mdd_obj);
 
         if (rc == 0 &&
@@ -2561,13 +1909,6 @@ out:
 stop:
         if (handle != NULL)
                 mdd_trans_stop(env, mdd, rc, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc)
-                /* Trigger dqrel on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-#endif
         return rc;
 }
 
@@ -2590,17 +1931,17 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              struct lu_dirpage *dp, int nob,
-                              const struct dt_it_ops *iops, struct dt_it *it,
-                              __u32 attr)
+static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
+                             int nob, const struct dt_it_ops *iops,
+                             struct dt_it *it, __u32 attr, void *arg)
 {
-        void                   *area = dp;
-        int                     result;
-        __u64                   hash = 0;
-        struct lu_dirent       *ent;
-        struct lu_dirent       *last = NULL;
-        int                     first = 1;
+       struct lu_dirpage       *dp = &lp->lp_dir;
+       void                    *area = dp;
+       int                      result;
+       __u64                    hash = 0;
+       struct lu_dirent        *ent;
+       struct lu_dirent        *last = NULL;
+       int                      first = 1;
 
         memset(area, 0, sizeof (*dp));
         area += sizeof (*dp);
@@ -2657,115 +1998,14 @@ out:
                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
                 last->lde_reclen = 0; /* end mark */
         }
+       if (result > 0)
+               /* end of directory */
+               dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+       if (result < 0)
+               CWARN("build page failed: %d!\n", result);
         return result;
 }
 
-static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
-                          const struct lu_rdpg *rdpg)
-{
-        struct dt_it      *it;
-        struct dt_object  *next = mdd_object_child(obj);
-        const struct dt_it_ops  *iops;
-        struct page       *pg;
-        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        int i;
-        int nlupgs = 0;
-        int rc;
-        int nob;
-
-        LASSERT(rdpg->rp_pages != NULL);
-        LASSERT(next->do_index_ops != NULL);
-
-        if (rdpg->rp_count <= 0)
-                return -EFAULT;
-
-        /*
-         * iterate through directory and fill pages from @rdpg
-         */
-        iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
-        if (IS_ERR(it))
-                return PTR_ERR(it);
-
-        rc = iops->load(env, it, rdpg->rp_hash);
-
-        if (rc == 0) {
-                /*
-                 * Iterator didn't find record with exactly the key requested.
-                 *
-                 * It is currently either
-                 *
-                 *     - positioned above record with key less than
-                 *     requested---skip it.
-                 *
-                 *     - or not positioned at all (is in IAM_IT_SKEWED
-                 *     state)---position it on the next item.
-                 */
-                rc = iops->next(env, it);
-        } else if (rc > 0)
-                rc = 0;
-
-        /*
-         * At this point and across for-loop:
-         *
-         *  rc == 0 -> ok, proceed.
-         *  rc >  0 -> end of directory.
-         *  rc <  0 -> error.
-         */
-        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
-             i++, nob -= CFS_PAGE_SIZE) {
-                struct lu_dirpage *dp;
-
-                LASSERT(i < rdpg->rp_npages);
-                pg = rdpg->rp_pages[i];
-                dp = cfs_kmap(pg);
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-repeat:
-#endif
-                rc = mdd_dir_page_build(env, mdd, dp,
-                                        min_t(int, nob, LU_PAGE_SIZE),
-                                        iops, it, rdpg->rp_attrs);
-                if (rc > 0) {
-                        /*
-                         * end of directory.
-                         */
-                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
-                        nlupgs++;
-                } else if (rc < 0) {
-                        CWARN("build page failed: %d!\n", rc);
-                } else {
-                        nlupgs++;
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
-                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
-                                goto repeat;
-#endif
-                }
-                cfs_kunmap(pg);
-        }
-        if (rc >= 0) {
-                struct lu_dirpage *dp;
-
-                dp = cfs_kmap(rdpg->rp_pages[0]);
-                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                if (nlupgs == 0) {
-                        /*
-                         * No pages were processed, mark this for first page
-                         * and send back.
-                         */
-                        dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
-                        nlupgs = 1;
-                }
-                cfs_kunmap(rdpg->rp_pages[0]);
-
-                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
-        }
-        iops->put(env, it);
-        iops->fini(env, it);
-
-        return rc;
-}
-
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                  const struct lu_rdpg *rdpg)
 {
@@ -2788,12 +2028,12 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 struct page *pg;
                 struct lu_dirpage *dp;
 
-                /*
-                 * According to POSIX, please do not return any entry to client:
-                 * even dot and dotdot should not be returned.
-                 */
-                CWARN("readdir from dead object: "DFID"\n",
-                        PFID(mdd_object_fid(mdd_obj)));
+               /*
+                * According to POSIX, please do not return any entry to client:
+                * even dot and dotdot should not be returned.
+                */
+               CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
+                      PFID(mdd_object_fid(mdd_obj)));
 
                 if (rdpg->rp_count <= 0)
                         GOTO(out_unlock, rc = -EFAULT);
@@ -2809,9 +2049,25 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
-        rc = __mdd_readpage(env, mdd_obj, rdpg);
-
-        EXIT;
+       rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
+                          mdd_dir_page_build, NULL);
+       if (rc >= 0) {
+               struct lu_dirpage       *dp;
+
+               dp = cfs_kmap(rdpg->rp_pages[0]);
+               dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
+               if (rc == 0) {
+                       /*
+                        * No pages were processed, mark this for first page
+                        * and send back.
+                        */
+                       dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
+                       rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
+               }
+               cfs_kunmap(rdpg->rp_pages[0]);
+       }
+
+       GOTO(out_unlock, rc);
 out_unlock:
         mdd_read_unlock(env, mdd_obj);
         return rc;
@@ -2830,24 +2086,20 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 }
 
 const struct md_object_operations mdd_obj_ops = {
-        .moo_permission    = mdd_permission,
-        .moo_attr_get      = mdd_attr_get,
-        .moo_attr_set      = mdd_attr_set,
-        .moo_xattr_get     = mdd_xattr_get,
-        .moo_xattr_set     = mdd_xattr_set,
-        .moo_xattr_list    = mdd_xattr_list,
-        .moo_xattr_del     = mdd_xattr_del,
-        .moo_object_create = mdd_object_create,
-        .moo_ref_add       = mdd_ref_add,
-        .moo_ref_del       = mdd_ref_del,
-        .moo_open          = mdd_open,
-        .moo_close         = mdd_close,
-        .moo_readpage      = mdd_readpage,
-        .moo_readlink      = mdd_readlink,
-        .moo_changelog     = mdd_changelog,
-        .moo_capa_get      = mdd_capa_get,
-        .moo_object_sync   = mdd_object_sync,
-        .moo_path          = mdd_path,
-        .moo_file_lock     = mdd_file_lock,
-        .moo_file_unlock   = mdd_file_unlock,
+       .moo_permission         = mdd_permission,
+       .moo_attr_get           = mdd_attr_get,
+       .moo_attr_set           = mdd_attr_set,
+       .moo_xattr_get          = mdd_xattr_get,
+       .moo_xattr_set          = mdd_xattr_set,
+       .moo_xattr_list         = mdd_xattr_list,
+       .moo_xattr_del          = mdd_xattr_del,
+       .moo_swap_layouts       = mdd_swap_layouts,
+       .moo_open               = mdd_open,
+       .moo_close              = mdd_close,
+       .moo_readpage           = mdd_readpage,
+       .moo_readlink           = mdd_readlink,
+       .moo_changelog          = mdd_changelog,
+       .moo_capa_get           = mdd_capa_get,
+       .moo_object_sync        = mdd_object_sync,
+       .moo_path               = mdd_path,
 };