Whamcloud - gitweb
LU-2509 quota: various code cleanups
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index ddb26db..cb742d5 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Author: Wang Di <wangdi@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs_jbd2.h>
-#else
-#include <linux/jbd.h>
-#endif
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
+#include <obd_lov.h>
 
 #include <lustre_param.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs.h>
-#else
-#include <linux/ldiskfs_fs.h>
-#endif
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
 #include "mdd_internal.h"
 
 static const struct lu_object_operations mdd_lu_obj_ops;
+extern cfs_mem_cache_t *mdd_object_kmem;
 
 static int mdd_xattr_get(const struct lu_env *env,
                          struct md_object *obj, struct lu_buf *buf,
@@ -78,8 +67,11 @@ static int mdd_xattr_get(const struct lu_env *env,
 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                  void **data)
 {
-        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
-                 PFID(mdd_object_fid(obj)));
+        if (mdd_object_exists(obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
+                return -ENOENT;
+        }
         mdo_data_get(env, obj, data);
         return 0;
 }
@@ -87,8 +79,11 @@ int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa)
 {
-        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
-                 PFID(mdd_object_fid(obj)));
+        if (mdd_object_exists(obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
+                return -ENOENT;
+        }
         return mdo_attr_get(env, obj, la, capa);
 }
 
@@ -124,13 +119,10 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 
 void mdd_buf_put(struct lu_buf *buf)
 {
-        if (buf == NULL || buf->lb_buf == NULL)
-                return;
-        if (buf->lb_vmalloc)
-                OBD_VFREE(buf->lb_buf, buf->lb_len);
-        else
-                OBD_FREE(buf->lb_buf, buf->lb_len);
-        buf->lb_buf = NULL;
+       if (buf == NULL || buf->lb_buf == NULL)
+               return;
+       OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       *buf = LU_BUF_NULL;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
@@ -144,32 +136,21 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
         return buf;
 }
 
-#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
 {
-        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 
-        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-                if (buf->lb_vmalloc)
-                        OBD_VFREE(buf->lb_buf, buf->lb_len);
-                else
-                        OBD_FREE(buf->lb_buf, buf->lb_len);
-                buf->lb_buf = NULL;
-        }
-        if (buf->lb_buf == NULL) {
-                buf->lb_len = len;
-                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
-                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 0;
-                }
-                if (buf->lb_buf == NULL) {
-                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 1;
-                }
-                if (buf->lb_buf == NULL)
-                        buf->lb_len = 0;
-        }
-        return buf;
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+               *buf = LU_BUF_NULL;
+       }
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               buf->lb_len = len;
+               OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
+               if (buf->lb_buf == NULL)
+                       *buf = LU_BUF_NULL;
+       }
+       return buf;
 }
 
 /** Increase the size of the \a mti_big_buf.
@@ -183,80 +164,28 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len)
         struct lu_buf buf;
 
         LASSERT(len >= oldbuf->lb_len);
-        if (len > BUF_VMALLOC_SIZE) {
-                OBD_VMALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 1;
-        } else {
-                OBD_ALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 0;
-        }
+        OBD_ALLOC_LARGE(buf.lb_buf, len);
+
         if (buf.lb_buf == NULL)
                 return -ENOMEM;
 
         buf.lb_len = len;
         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
 
-        if (oldbuf->lb_vmalloc)
-                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
-        else
-                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
 
         memcpy(oldbuf, &buf, sizeof(buf));
 
         return 0;
 }
 
-struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
-                                       struct mdd_device *mdd)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-        int                     max_cookie_size;
-
-        max_cookie_size = mdd_lov_cookiesize(env, mdd);
-        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
-                if (mti->mti_max_cookie)
-                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
-                mti->mti_max_cookie = NULL;
-                mti->mti_max_cookie_size = 0;
-        }
-        if (unlikely(mti->mti_max_cookie == NULL)) {
-                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
-                if (likely(mti->mti_max_cookie != NULL))
-                        mti->mti_max_cookie_size = max_cookie_size;
-        }
-        if (likely(mti->mti_max_cookie != NULL))
-                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
-        return mti->mti_max_cookie;
-}
-
-struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
-                                   struct mdd_device *mdd)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-        int                     max_lmm_size;
-
-        max_lmm_size = mdd_lov_mdsize(env, mdd);
-        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
-                if (mti->mti_max_lmm)
-                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
-                mti->mti_max_lmm = NULL;
-                mti->mti_max_lmm_size = 0;
-        }
-        if (unlikely(mti->mti_max_lmm == NULL)) {
-                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
-                if (unlikely(mti->mti_max_lmm != NULL))
-                        mti->mti_max_lmm_size = max_lmm_size;
-        }
-        return mti->mti_max_lmm;
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
 {
         struct mdd_object *mdd_obj;
 
-        OBD_ALLOC_PTR(mdd_obj);
+       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
         if (mdd_obj != NULL) {
                 struct lu_object *o;
 
@@ -306,7 +235,7 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
         struct mdd_object *mdd = lu2mdd_obj(o);
 
         lu_object_fini(o);
-        OBD_FREE_PTR(mdd);
+       OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
 }
 
 static int mdd_object_print(const struct lu_env *env, void *cookie,
@@ -314,7 +243,7 @@ static int mdd_object_print(const struct lu_env *env, void *cookie,
 {
         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
-                    "valid=%x, cltime=%llu, flags=%lx)",
+                    "valid=%x, cltime="LPU64", flags=%lx)",
                     mdd, mdd->mod_count, mdd->mod_valid,
                     mdd->mod_cltime, mdd->mod_flags);
 }
@@ -371,7 +300,7 @@ static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                 if (obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(obj))
-                        GOTO(out, rc = -PTR_ERR(obj));
+                        GOTO(out, rc = PTR_ERR(obj));
                 /* get child fid from parent and name */
                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                 mdd_object_put(env, obj);
@@ -434,7 +363,7 @@ static int mdd_path_current(const struct lu_env *env,
                 if (mdd_obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(mdd_obj))
-                        GOTO(out, rc = -PTR_ERR(mdd_obj));
+                        GOTO(out, rc = PTR_ERR(mdd_obj));
                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                 if (rc <= 0) {
                         mdd_object_put(env, mdd_obj);
@@ -489,9 +418,9 @@ static int mdd_path_current(const struct lu_env *env,
         /* Verify that our path hasn't changed since we started the lookup.
            Record the current index, and verify the path resolves to the
            same fid. If it does, then the path is correct as of this index. */
-        spin_lock(&mdd->mdd_cl.mc_lock);
-        pli->pli_currec = mdd->mdd_cl.mc_index;
-        spin_unlock(&mdd->mdd_cl.mc_lock);
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       pli->pli_currec = mdd->mdd_cl.mc_index;
+       spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -503,12 +432,12 @@ static int mdd_path_current(const struct lu_env *env,
                        PFID(&pli->pli_fid));
                 GOTO(out, rc = -EAGAIN);
         }
-
+        ptr++; /* skip leading / */
         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
 
         EXIT;
 out:
-        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
                 /* if we vmalloced a large buffer drop it */
                 mdd_buf_put(buf);
 
@@ -534,8 +463,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj,
                 RETURN(-EOVERFLOW);
 
         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
-                path[0] = '/';
-                path[1] = '\0';
+                path[0] = '\0';
                 RETURN(0);
         }
 
@@ -583,217 +511,21 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
         if (rc == 0) {
                 mdd_flags_xlate(obj, la->la_flags);
-                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
-                        obj->mod_flags |= MNLINK_OBJ;
-        }
-        RETURN(rc);
-}
-
-/* get only inode attributes */
-int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct md_attr *ma)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (ma->ma_valid & MA_INODE)
-                RETURN(0);
-
-        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                          mdd_object_capa(env, mdd_obj));
-        if (rc == 0)
-                ma->ma_valid |= MA_INODE;
-        RETURN(rc);
-}
-
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
-                       int *size)
-{
-        struct lov_desc *ldesc;
-        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
-        ENTRY;
-
-        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
-        LASSERT(ldesc != NULL);
-
-        if (!lmm)
-                RETURN(0);
-
-        lmm->lmm_magic = LOV_MAGIC_V1;
-        lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
-        lmm->lmm_pattern = ldesc->ld_pattern;
-        lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
-        lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
-        *size = sizeof(struct lov_mds_md);
-
-        RETURN(sizeof(struct lov_mds_md));
-}
-
-/* get lov EA only */
-static int __mdd_lmm_get(const struct lu_env *env,
-                         struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        if (ma->ma_valid & MA_LOV)
-                RETURN(0);
-
-        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
-                        XATTR_NAME_LOV);
-
-        if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
-                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
-                                &ma->ma_lmm_size);
-        }
-
-        if (rc > 0) {
-                ma->ma_valid |= MA_LOV;
-                rc = 0;
-        }
-        RETURN(rc);
-}
-
-int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lmm_get(env, mdd_obj, ma);
-        mdd_read_unlock(env, mdd_obj);
-        RETURN(rc);
-}
-
-/* get lmv EA only*/
-static int __mdd_lmv_get(const struct lu_env *env,
-                         struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        if (ma->ma_valid & MA_LMV)
-                RETURN(0);
-
-        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
-                        XATTR_NAME_LMV);
-        if (rc > 0) {
-                ma->ma_valid |= MA_LMV;
-                rc = 0;
-        }
-        RETURN(rc);
-}
-
-static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct md_attr *ma)
-{
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lustre_mdt_attrs *lma =
-                                 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
-        int lma_size;
-        int rc;
-        ENTRY;
-
-        /* If all needed data are already valid, nothing to do */
-        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
-            (ma->ma_need & (MA_HSM | MA_SOM)))
-                RETURN(0);
-
-        /* Read LMA from disk EA */
-        lma_size = sizeof(info->mti_xattr_buf);
-        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
-        if (rc <= 0)
-                RETURN(rc);
-
-        /* Useless to check LMA incompatibility because this is already done in
-         * osd_ea_fid_get(), and this will fail long before this code is
-         * called.
-         * So, if we are here, LMA is compatible.
-         */
-
-        lustre_lma_swab(lma);
-
-        /* Swab and copy LMA */
-        if (ma->ma_need & MA_HSM) {
-                if (lma->lma_compat & LMAC_HSM)
-                        ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
-                else
-                        ma->ma_hsm_flags = 0;
-                ma->ma_valid |= MA_HSM;
-        }
-        if (ma->ma_need & MA_SOM) {
-
-                /* XXX: Here, copy and swab SoM data, and then remove this
-                 * assert. */
-                LASSERT(!(ma->ma_need & MA_SOM));
-
-                ma->ma_valid |= MA_SOM;
-        }
-
-        RETURN(0);
-}
-
-static int mdd_attr_get_internal(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
-                                 struct md_attr *ma)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (ma->ma_need & MA_INODE)
-                rc = mdd_iattr_get(env, mdd_obj, ma);
-
-        if (rc == 0 && ma->ma_need & MA_LOV) {
-                if (S_ISREG(mdd_object_type(mdd_obj)) ||
-                    S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lmm_get(env, mdd_obj, ma);
         }
-        if (rc == 0 && ma->ma_need & MA_LMV) {
-                if (S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lmv_get(env, mdd_obj, ma);
-        }
-        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
-                if (S_ISREG(mdd_object_type(mdd_obj)))
-                        rc = __mdd_lma_get(env, mdd_obj, ma);
-        }
-#ifdef CONFIG_FS_POSIX_ACL
-        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
-                if (S_ISDIR(mdd_object_type(mdd_obj)))
-                        rc = mdd_def_acl_get(env, mdd_obj, ma);
-        }
-#endif
-        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
-               rc, ma->ma_valid);
         RETURN(rc);
 }
 
-int mdd_attr_get_internal_locked(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        int needlock = ma->ma_need &
-                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
-
-        if (needlock)
-                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_attr_get_internal(env, mdd_obj, ma);
-        if (needlock)
-                mdd_read_unlock(env, mdd_obj);
-        return rc;
-}
-
 /*
  * No permission check is needed.
  */
-static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
-                        struct md_attr *ma)
+int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
+                struct md_attr *ma)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        int                rc;
-
+       int rc;
         ENTRY;
-        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
+
+       return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
+                         mdd_object_capa(env, md2mdd_obj(obj)));
         RETURN(rc);
 }
 
@@ -809,7 +541,11 @@ static int mdd_xattr_get(const struct lu_env *env,
 
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_get(env, mdd_obj, buf, name,
@@ -832,7 +568,11 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
         int                rc;
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         next = mdd_object_child(mdd_obj);
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -860,40 +600,57 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         RETURN(rc);
 }
 
+int mdd_declare_object_create_internal(const struct lu_env *env,
+                                      struct mdd_object *p,
+                                      struct mdd_object *c,
+                                      struct lu_attr *attr,
+                                      struct thandle *handle,
+                                      const struct md_op_spec *spec)
+{
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
+        int rc;
+        ENTRY;
+
+       if (feat != &dt_directory_features && feat != NULL) {
+                dof->dof_type = DFT_INDEX;
+               dof->u.dof_idx.di_feat = feat;
+
+       } else {
+               dof->dof_type = dt_mode_to_dft(attr->la_mode);
+               if (dof->dof_type == DFT_REGULAR) {
+                       dof->u.dof_reg.striped =
+                               md_should_create(spec->sp_cr_flags);
+                       if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
+                               dof->u.dof_reg.striped = 0;
+                       /* is this replay? */
+                       if (spec->no_create)
+                               dof->u.dof_reg.striped = 0;
+               }
+       }
+
+       rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
+
+        RETURN(rc);
+}
+
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
-                               struct mdd_object *c, struct md_attr *ma,
+                              struct mdd_object *c, struct lu_attr *attr,
                                struct thandle *handle,
                                const struct md_op_spec *spec)
 {
-        struct lu_attr *attr = &ma->ma_attr;
         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
-        const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
-        if (!mdd_object_exists(c)) {
-                struct dt_object *next = mdd_object_child(c);
-                LASSERT(next);
-
-                if (feat != &dt_directory_features && feat != NULL)
-                        dof->dof_type = DFT_INDEX;
-                else
-                        dof->dof_type = dt_mode_to_dft(attr->la_mode);
-
-                dof->u.dof_idx.di_feat = feat;
+       LASSERT(!mdd_object_exists(c));
 
-                /* @hint will be initialized by underlying device. */
-                next->do_ops->do_ah_init(env, hint,
-                                         p ? mdd_object_child(p) : NULL,
-                                         attr->la_mode & S_IFMT);
+       rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
-                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-        } else
-                rc = -EEXIST;
+       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /**
@@ -921,11 +678,9 @@ static inline int mdd_attr_check(const struct lu_env *env,
         RETURN(0);
 }
 
-int mdd_attr_set_internal(const struct lu_env *env,
-                          struct mdd_object *obj,
-                          struct lu_attr *attr,
-                          struct thandle *handle,
-                          int needacl)
+int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
+                         struct lu_attr *attr, struct thandle *handle,
+                         int needacl)
 {
         int rc;
         ENTRY;
@@ -939,10 +694,8 @@ int mdd_attr_set_internal(const struct lu_env *env,
 }
 
 int mdd_attr_check_set_internal(const struct lu_env *env,
-                                struct mdd_object *obj,
-                                struct lu_attr *attr,
-                                struct thandle *handle,
-                                int needacl)
+                               struct mdd_object *obj, struct lu_attr *attr,
+                               struct thandle *handle, int needacl)
 {
         int rc;
         ENTRY;
@@ -956,58 +709,6 @@ int mdd_attr_check_set_internal(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_attr_set_internal_locked(const struct lu_env *env,
-                                        struct mdd_object *obj,
-                                        struct lu_attr *attr,
-                                        struct thandle *handle,
-                                        int needacl)
-{
-        int rc;
-        ENTRY;
-
-        needacl = needacl && (attr->la_valid & LA_MODE);
-        if (needacl)
-                mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
-        if (needacl)
-                mdd_write_unlock(env, obj);
-        RETURN(rc);
-}
-
-int mdd_attr_check_set_internal_locked(const struct lu_env *env,
-                                       struct mdd_object *obj,
-                                       struct lu_attr *attr,
-                                       struct thandle *handle,
-                                       int needacl)
-{
-        int rc;
-        ENTRY;
-
-        needacl = needacl && (attr->la_valid & LA_MODE);
-        if (needacl)
-                mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
-        if (needacl)
-                mdd_write_unlock(env, obj);
-        RETURN(rc);
-}
-
-int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
-                    const struct lu_buf *buf, const char *name,
-                    int fl, struct thandle *handle)
-{
-        struct lustre_capa *capa = mdd_object_capa(env, obj);
-        int rc = -EINVAL;
-        ENTRY;
-
-        if (buf->lb_buf && buf->lb_len > 0)
-                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
-        else if (buf->lb_buf == NULL && buf->lb_len == 0)
-                rc = mdo_xattr_del(env, obj, name, handle, capa);
-
-        RETURN(rc);
-}
-
 /*
  * This gives the same functionality as the code between
  * sys_chmod and inode_setattr
@@ -1016,10 +717,10 @@ int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
  */
 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
-                        struct lu_attr *la, const struct md_attr *ma)
+                       struct lu_attr *la, const unsigned long flags)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc         = md_ucred(env);
+       struct lu_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -1034,16 +735,21 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
+       /* export destroy does not have ->le_ses, but we may want
+        * to drop LUSTRE_SOM_FL. */
+       uc = lu_ucred_check(env);
+       if (uc == NULL)
+               RETURN(0);
+
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
 
         if (la->la_valid == LA_CTIME) {
-                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
+               if (!(flags & MDS_PERM_BYPASS))
                         /* This is only for set ctime when rename's source is
                          * on remote MDS. */
-                        rc = mdd_may_delete(env, NULL, obj,
-                                            (struct md_attr *)ma, 1, 0);
+                       rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                         la->la_valid &= ~LA_CTIME;
                 RETURN(rc);
@@ -1051,8 +757,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if (la->la_valid == LA_ATIME) {
                 /* This is atime only set for read atime update on close. */
-                if (la->la_atime <= tmp_la->la_atime +
-                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff)
+                if (la->la_atime >= tmp_la->la_atime &&
+                    la->la_atime < (tmp_la->la_atime +
+                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                         la->la_valid &= ~LA_ATIME;
                 RETURN(0);
         }
@@ -1063,9 +770,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 unsigned int newflags = la->la_flags &
                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                  * only be changed by the relevant capability. */
@@ -1083,33 +790,50 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
             (la->la_valid & ~LA_FLAGS) &&
-            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
+           !(flags & MDS_PERM_BYPASS))
                 RETURN(-EPERM);
 
-        /* Check for setting the obj time. */
-        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
-            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
-                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
-                                                            MAY_WRITE,
-                                                            MOR_TGT_CHILD);
-                        if (rc)
-                                RETURN(rc);
+       /* Check for setting the obj time. */
+       if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
+           !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER)) {
+                       rc = mdd_permission_internal(env, obj, tmp_la,
+                                                    MAY_WRITE);
+                       if (rc)
+                               RETURN(rc);
+               }
+       }
+
+        if (la->la_valid & LA_KILL_SUID) {
+                la->la_valid &= ~LA_KILL_SUID;
+                if ((tmp_la->la_mode & S_ISUID) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
+                }
+                la->la_mode &= ~S_ISUID;
+        }
+
+        if (la->la_valid & LA_KILL_SGID) {
+                la->la_valid &= ~LA_KILL_SGID;
+                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
+                                        (S_ISGID | S_IXGRP)) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
                 }
+                la->la_mode &= ~S_ISGID;
         }
 
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
-                /* Bypass la_vaild == LA_MODE,
-                 * this is for changing file with SUID or SGID. */
-                if ((la->la_valid & ~LA_MODE) &&
-                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (!(flags & MDS_PERM_BYPASS) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
-                if (la->la_mode == (umode_t) -1)
+                if (la->la_mode == (cfs_umode_t) -1)
                         la->la_mode = tmp_la->la_mode;
                 else
                         la->la_mode = (la->la_mode & S_IALLUGO) |
@@ -1128,10 +852,10 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_UID) {
                 if (la->la_uid == (uid_t) -1)
                         la->la_uid = tmp_la->la_uid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    (la->la_uid != tmp_la->la_uid)) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    (la->la_uid != tmp_la->la_uid)) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* If the user or group of a non-directory has been
                  * changed by a non-root user, remove the setuid bit.
@@ -1153,11 +877,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_GID) {
                 if (la->la_gid == (gid_t) -1)
                         la->la_gid = tmp_la->la_gid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    ((la->la_gid != tmp_la->la_gid) &&
-                    !lustre_in_group_p(uc, la->la_gid))) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    ((la->la_gid != tmp_la->la_gid) &&
+                     !lustre_in_group_p(uc, la->la_gid))) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* Likewise, if the user or group of a non-directory
                  * has been changed by a non-root user, remove the
@@ -1176,11 +900,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         /* For both Size-on-MDS case and truncate case,
          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
-         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
+        * We distinguish them by "flags & MDS_SOM".
          * For SOM case, it is true, the MAY_WRITE perm has been checked
          * when open, no need check again. For truncate case, it is false,
          * the MAY_WRITE perm should be checked here. */
-        if (ma->ma_attr_flags & MDS_SOM) {
+       if (flags & MDS_SOM) {
                 /* For the "Size-on-MDS" setattr update, merge coming
                  * attributes with the set in the inode. BUG 10641 */
                 if ((la->la_valid & LA_ATIME) &&
@@ -1194,12 +918,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
         } else {
                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
-                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
-                              (uc->mu_fsuid == tmp_la->la_uid)) &&
-                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
-                                rc = mdd_permission_internal_locked(env, obj,
-                                                            tmp_la, MAY_WRITE,
-                                                            MOR_TGT_CHILD);
+                       if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
+                             (uc->uc_fsuid == tmp_la->la_uid)) &&
+                           !(flags & MDS_PERM_BYPASS)) {
+                               rc = mdd_permission_internal(env, obj,
+                                                            tmp_la, MAY_WRITE);
                                 if (rc)
                                         RETURN(rc);
                         }
@@ -1222,25 +945,28 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
  * \param mdd_obj - mdd_object of change
  * \param handle - transacion handle
  */
-static int mdd_changelog_data_store(const struct lu_env     *env,
-                                    struct mdd_device       *mdd,
-                                    enum changelog_rec_type type,
-                                    struct mdd_object       *mdd_obj,
-                                    struct thandle          *handle)
-{
-        const struct lu_fid *tfid = mdo2fid(mdd_obj);
-        struct llog_changelog_rec *rec;
-        struct lu_buf *buf;
-        int reclen;
-        int rc;
-
+static int mdd_changelog_data_store(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   enum changelog_rec_type type,
+                                   int flags, struct mdd_object *mdd_obj,
+                                   struct thandle *handle)
+{
+       const struct lu_fid             *tfid = mdo2fid(mdd_obj);
+       struct llog_changelog_rec       *rec;
+       struct lu_buf                   *buf;
+       int                              reclen;
+       int                              rc;
+
+        /* Not recording */
         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                 RETURN(0);
+        if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+                RETURN(0);
 
-        LASSERT(handle != NULL);
         LASSERT(mdd_obj != NULL);
+        LASSERT(handle != NULL);
 
-        if ((type == CL_SETATTR) &&
+        if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                 /* Don't need multiple updates in this log */
                 /* Don't check under lock - no big deal if we get an extra
@@ -1252,71 +978,45 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         buf = mdd_buf_alloc(env, reclen);
         if (buf->lb_buf == NULL)
                 RETURN(-ENOMEM);
-        rec = (struct llog_changelog_rec *)buf->lb_buf;
+       rec = buf->lb_buf;
 
-        rec->cr.cr_flags = CLF_VERSION;
+        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
         rec->cr.cr_type = (__u32)type;
         rec->cr.cr_tfid = *tfid;
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        rc = mdd_changelog_llog_write(mdd, rec, handle);
-        if (rc < 0) {
-                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
-                       rc, type, PFID(tfid));
-                return -EFAULT;
-        }
+       rc = mdd_changelog_store(env, mdd, rec, handle);
 
-        return 0;
+       RETURN(rc);
 }
 
-/**
- * Should be called with write lock held.
- *
- * \see mdd_lma_set_locked().
- */
-static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       const struct md_attr *ma, struct thandle *handle)
+int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
+                  int flags, struct md_object *obj)
 {
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_buf *buf;
-        struct lustre_mdt_attrs *lma =
-                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
-        int lmasize = sizeof(struct lustre_mdt_attrs);
-        int rc = 0;
-
+        struct thandle *handle;
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+        struct mdd_device *mdd = mdo2mdd(obj);
+        int rc;
         ENTRY;
 
-        memset(lma, 0, lmasize);
-
-        /* Either HSM or SOM part is not valid, we need to read it before */
-        if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
-                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
-                if (rc)
-                        RETURN(rc);
+        handle = mdd_trans_create(env, mdd);
+        if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
 
-                lustre_lma_swab(lma);
-        }
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
 
-        /* Copy HSM data */
-        if (ma->ma_valid & MA_HSM) {
-                lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
-                lma->lma_compat |= LMAC_HSM;
-        }
-        /* XXX: Copy SOM data */
-        if (ma->ma_valid & MA_SOM) {
-                /*
-                lma->lma_compat |= LMAC_SOM;
-                */
-                LASSERT(!(ma->ma_valid & MA_SOM));
-        }
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
 
-        /* Copy FID */
-        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
+        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
+                                      handle);
 
-        lustre_lma_swab(lma);
-        buf = mdd_buf_get(env, lma, lmasize);
-        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
+stop:
+        mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
 }
@@ -1328,241 +1028,338 @@ static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
  * not, LMA EA will be first read from disk, modified and write back.
  *
  */
-static int mdd_lma_set_locked(const struct lu_env *env,
-                              struct mdd_object *mdd_obj,
-                              const struct md_attr *ma, struct thandle *handle)
+/* Precedence for choosing record type when multiple
+ * attributes change: setattr > mtime > ctime > atime
+ * (ctime changes when mtime does, plus chmod/chown.
+ * atime and ctime are independent.) */
+static int mdd_attr_set_changelog(const struct lu_env *env,
+                                  struct md_object *obj, struct thandle *handle,
+                                  __u64 valid)
 {
-        int rc;
+        struct mdd_device *mdd = mdo2mdd(obj);
+        int bits, type = 0;
+
+        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
+        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
+        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
+        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
+        bits = bits & mdd->mdd_cl.mc_mask;
+        if (bits == 0)
+                return 0;
 
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
-        mdd_write_unlock(env, mdd_obj);
-        return rc;
+        /* The record type is the lowest non-masked set bit */
+        while (bits && ((bits & 1) == 0)) {
+                bits = bits >> 1;
+                type++;
+        }
+
+        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
+        return mdd_changelog_data_store(env, mdd, type, (int)valid,
+                                        md2mdd_obj(obj), handle);
+}
+
+static int mdd_declare_attr_set(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                               const struct lu_attr *attr,
+                                struct thandle *handle)
+{
+       int rc;
+
+       rc = mdo_declare_attr_set(env, obj, attr, handle);
+        if (rc)
+                return rc;
+
+#ifdef CONFIG_FS_POSIX_ACL
+       if (attr->la_valid & LA_MODE) {
+                mdd_read_lock(env, obj, MOR_TGT_CHILD);
+               rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
+                                  XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
+                mdd_read_unlock(env, obj);
+                if (rc == -EOPNOTSUPP || rc == -ENODATA)
+                        rc = 0;
+                else if (rc < 0)
+                        return rc;
+
+                if (rc != 0) {
+                       struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
+                        rc = mdo_declare_xattr_set(env, obj, buf,
+                                                   XATTR_NAME_ACL_ACCESS, 0,
+                                                   handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+#endif
+
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
 }
 
 /* set attr and LOV EA at once, return updated attr */
-static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
-                        const struct md_attr *ma)
+int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
+                const struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
-        struct lov_mds_md *lmm = NULL;
-        struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct obd_export *exp = md_quota(env)->mq_exp;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
-        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0, block_count = 0;
-        int inode_pending[MAXQUOTAS] = { 0, 0 };
-        int block_pending[MAXQUOTAS] = { 0, 0 };
-#endif
+       const struct lu_attr *la = &ma->ma_attr;
+       int rc;
         ENTRY;
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
+       /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
+       LASSERT((ma->ma_valid & MA_LOV) == 0);
+       LASSERT((ma->ma_valid & MA_HSM) == 0);
+       LASSERT((ma->ma_valid & MA_SOM) == 0);
+
+        *la_copy = ma->ma_attr;
+       rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
+       if (rc)
+                RETURN(rc);
+
+        /* setattr on "close" only change atime, or do nothing */
+       if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
+                RETURN(0);
+
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
-        /*TODO: add lock here*/
-        /* start a log jounal handle if needed */
-        if (S_ISREG(mdd_object_type(mdd_obj)) &&
-            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
-                lmm_size = mdd_lov_mdsize(env, mdd);
-                lmm = mdd_max_lmm_get(env, mdd);
-                if (lmm == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
-
-                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
-                                XATTR_NAME_LOV);
-
-                if (rc < 0)
-                        GOTO(cleanup, rc);
-        }
 
-        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
-                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
-                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
+       rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
+        if (rc)
+                GOTO(stop, rc);
 
-        *la_copy = ma->ma_attr;
-        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
+        rc = mdd_trans_start(env, mdd, handle);
         if (rc)
-                GOTO(cleanup, rc);
-
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
-                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
-
-                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
-                if (!rc) {
-                        quota_opc = FSFILT_OP_SETATTR;
-                        mdd_quota_wrapper(la_copy, qnids);
-                        mdd_quota_wrapper(la_tmp, qoids);
-                        /* get file quota for new owner */
-                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                        qnids, inode_pending, 1, NULL, 0,
-                                        NULL, 0);
-                        block_count = (la_tmp->la_blocks + 7) >> 3;
-                        if (block_count) {
-                                void *data = NULL;
-                                mdd_data_get(env, mdd_obj, &data);
-                                /* get block quota for new owner */
-                                lquota_chkquota(mds_quota_interface_ref, obd,
-                                                exp, qnids, block_pending,
-                                                block_count, NULL,
-                                                LQUOTA_FLAGS_BLK, data, 1);
-                        }
-                }
-        }
-#endif
+                GOTO(stop, rc);
+
+        /* permission changes may require sync operation */
+       if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync |= !!mdd->mdd_sync_permission;
+
+       if (la->la_valid & (LA_MTIME | LA_CTIME))
+                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
+                      la->la_mtime, la->la_ctime);
 
         if (la_copy->la_valid & LA_FLAGS) {
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
-                                                  handle, 1);
+               rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
                 if (rc == 0)
                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
         } else if (la_copy->la_valid) {            /* setattr */
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
-                                                  handle, 1);
-                /* journal chown/chgrp in llog, just like unlink */
-                if (rc == 0 && lmm_size){
-                        cookie_size = mdd_lov_cookiesize(env, mdd);
-                        logcookies = mdd_max_cookie_get(env, mdd);
-                        if (logcookies == NULL)
-                                GOTO(cleanup, rc = -ENOMEM);
-
-                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
-                                            logcookies, cookie_size) <= 0)
-                                logcookies = NULL;
-                }
+               rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
         }
 
-        if (rc == 0 && ma->ma_valid & MA_LOV) {
-                umode_t mode;
+        if (rc == 0)
+                rc = mdd_attr_set_changelog(env, obj, handle,
+                                           la->la_valid);
+stop:
+        mdd_trans_stop(env, mdd, rc, handle);
+        RETURN(rc);
+}
 
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode) || S_ISDIR(mode)) {
-                        rc = mdd_lsm_sanity_check(env, mdd_obj);
-                        if (rc)
-                                GOTO(cleanup, rc);
+static int mdd_xattr_sanity_check(const struct lu_env *env,
+                                 struct mdd_object *obj)
+{
+       struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
+       struct lu_ucred *uc     = lu_ucred_assert(env);
+       int rc;
+       ENTRY;
 
-                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
-                                            ma->ma_lmm_size, handle, 1);
-                }
+       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+               RETURN(-EPERM);
 
-        }
-        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
-                umode_t mode;
+       rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode))
-                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
+       if ((uc->uc_fsuid != tmp_la->la_uid) &&
+           !mdd_capable(uc, CFS_CAP_FOWNER))
+               RETURN(-EPERM);
 
-        }
-cleanup:
-        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
-                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
-                                              handle);
-        mdd_trans_stop(env, mdd, rc, handle);
-        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
-                /*set obd attr, if needed*/
-                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
-                                           logcookies);
-        }
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc) {
-                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
-                                      inode_pending, 0);
-                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
-                                      block_pending, 1);
-                /* Trigger dqrel/dqacq for original owner and new owner.
-                 * If failed, the next call for lquota_chkquota will
-                 * process it. */
-                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
-                              quota_opc);
-        }
-#endif
-        RETURN(rc);
+       RETURN(rc);
 }
 
-int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
-                      const struct lu_buf *buf, const char *name, int fl,
-                      struct thandle *handle)
+static int mdd_declare_xattr_set(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct lu_buf *buf,
+                                const char *name,
+                                int fl, struct thandle *handle)
 {
-        int  rc;
-        ENTRY;
+       int     rc;
 
-        mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
-        mdd_write_unlock(env, obj);
+       rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
+       if (rc)
+               return rc;
 
-        RETURN(rc);
-}
+       /* Only record user xattr changes */
+       if ((strncmp("user.", name, 5) == 0)) {
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               if (rc)
+                       return rc;
+       }
 
-static int mdd_xattr_sanity_check(const struct lu_env *env,
-                                  struct mdd_object *obj)
-{
-        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
-        struct md_ucred *uc     = md_ucred(env);
-        int rc;
-        ENTRY;
-
-        if (mdd_is_immutable(obj) || mdd_is_append(obj))
-                RETURN(-EPERM);
-
-        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
-        if (rc)
-                RETURN(rc);
+       /* If HSM data is modified, this could add a changelog */
+       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
 
-        if ((uc->mu_fsuid != tmp_la->la_uid) &&
-            !mdd_capable(uc, CFS_CAP_FOWNER))
-                RETURN(-EPERM);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
+}
 
-        RETURN(rc);
+/*
+ * Compare current and future data of HSM EA and add a changelog if needed.
+ *
+ * Caller should have write-locked \param obj.
+ *
+ * \param buf - Future HSM EA content.
+ * \retval 0 if no changelog is needed or changelog was added properly.
+ * \retval -ve errno if there was a problem
+ */
+static int mdd_hsm_update_locked(const struct lu_env *env,
+                                struct md_object *obj,
+                                const struct lu_buf *buf,
+                                struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device      *mdd = mdo2mdd(obj);
+       struct mdd_object      *mdd_obj = md2mdd_obj(obj);
+       struct lu_buf          *current_buf = &info->mti_buf;
+       struct md_hsm          *current_mh;
+       struct md_hsm          *new_mh;
+       int                     rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(current_mh);
+       if (current_mh == NULL)
+               RETURN(-ENOMEM);
+
+       /* Read HSM attrs from disk */
+       current_buf->lb_buf = info->mti_xattr_buf;
+       current_buf->lb_len = sizeof(info->mti_xattr_buf);
+       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
+                          mdd_object_capa(env, mdd_obj));
+       rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(free, rc);
+       else if (rc == -ENODATA)
+               current_mh->mh_flags = 0;
+
+       /* Map future HSM xattr */
+       OBD_ALLOC_PTR(new_mh);
+       if (new_mh == NULL)
+               GOTO(free, rc = -ENOMEM);
+       lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
+
+       /* If HSM flags are different, add a changelog */
+       rc = 0;
+       if (current_mh->mh_flags != new_mh->mh_flags) {
+               int flags = 0;
+               hsm_set_cl_event(&flags, HE_STATE);
+               if (new_mh->mh_flags & HS_DIRTY)
+                       hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
+
+               rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
+                                             handle);
+       }
+
+       OBD_FREE_PTR(new_mh);
+free:
+       OBD_FREE_PTR(current_mh);
+       return(rc);
 }
 
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
  */
 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
+                        const struct lu_buf *buf, const char *name,
+                        int fl)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int  rc;
+       struct mdd_object       *mdd_obj = md2mdd_obj(obj);
+       struct mdd_device       *mdd = mdo2mdd(obj);
+       struct thandle          *handle;
+       int                      rc;
         ENTRY;
 
-        rc = mdd_xattr_sanity_check(env, mdd_obj);
-        if (rc)
-                RETURN(rc);
-
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
-            mdd->mdd_sync_permission == 1)
-                txn_param_sync(&mdd_env_info(env)->mti_param);
-
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
+               rc = mdd_acl_set(env, mdd_obj, buf, fl);
+               RETURN(rc);
+       }
+
+       rc = mdd_xattr_sanity_check(env, mdd_obj);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* security-replated changes may require sync */
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+               handle->th_sync |= !!mdd->mdd_sync_permission;
+
+       mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+
+       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
+               rc = mdd_hsm_update_locked(env, obj, buf, handle);
+               if (rc) {
+                       mdd_write_unlock(env, mdd_obj);
+                       GOTO(stop, rc);
+               }
+       }
+
+       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
+                          mdd_object_capa(env, mdd_obj));
+       mdd_write_unlock(env, mdd_obj);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* Only record system & user xattr changes */
+       if (strncmp(XATTR_USER_PREFIX, name,
+                       sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                       sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                       sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+               rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
+                                             handle);
+
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+
+       RETURN(rc);
+}
+
+static int mdd_declare_xattr_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
 
-        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
+        rc = mdo_declare_xattr_del(env, obj, name, handle);
+        if (rc)
+                return rc;
 
         /* Only record user xattr changes */
-        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
-            (strncmp("user.", name, 5) == 0))
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
-                                              handle);
-        mdd_trans_stop(env, mdd, rc, handle);
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
 
-        RETURN(rc);
+        return rc;
 }
 
 /**
@@ -1582,286 +1379,346 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
-
-        /* Only record user xattr changes */
-        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
-            (strncmp("user.", name, 5) != 0))
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+       if (rc)
+               GOTO(stop, rc);
+
+        /* Only record system & user xattr changes */
+       if (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
 }
 
-/* partial unlink */
-static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
-                       struct md_attr *ma)
-{
-        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0;
-#endif
-        int rc;
-        ENTRY;
-
-        /*
-         * Check -ENOENT early here because we need to get object type
-         * to calculate credits before transaction start
-         */
-        if (!mdd_object_exists(mdd_obj))
-                RETURN(-ENOENT);
-
-        LASSERT(mdd_object_exists(mdd_obj) > 0);
-
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(-ENOMEM);
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-
-        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        __mdd_ref_del(env, mdd_obj, handle, 0);
-
-        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
-                /* unlink dot */
-                __mdd_ref_del(env, mdd_obj, handle, 1);
-        }
-
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        la_copy->la_ctime = ma->ma_attr.la_ctime;
-
-        la_copy->la_valid = LA_CTIME;
-        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
-            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
-                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                mdd_quota_wrapper(&ma->ma_attr, qids);
-        }
-#endif
-
-
-        EXIT;
-cleanup:
-        mdd_write_unlock(env, mdd_obj);
-        mdd_trans_stop(env, mdd, rc, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc)
-                /* Trigger dqrel on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-#endif
-        return rc;
+/*
+ * read lov EA of an object
+ * return the lov EA in an allocated lu_buf
+ */
+static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
+                                    struct mdd_object *obj)
+{
+       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf   *lmm_buf = NULL;
+       int              rc, sz;
+       ENTRY;
+
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
+                          mdd_object_capa(env, obj));
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (rc == 0)
+               GOTO(out, rc = -ENODATA);
+
+       sz = rc;
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               /* mti_big_buf was not allocated, so we have to
+                * allocate it based on the ea size */
+               buf = mdd_buf_alloc(env, sz);
+               if (buf->lb_buf == NULL)
+                       GOTO(out, rc = -ENOMEM);
+               goto repeat;
+       }
+
+       OBD_ALLOC_PTR(lmm_buf);
+       if (!lmm_buf)
+               GOTO(out, rc = -ENOMEM);
+
+       OBD_ALLOC(lmm_buf->lb_buf, sz);
+       if (!lmm_buf->lb_buf)
+               GOTO(free, rc = -ENOMEM);
+
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       lmm_buf->lb_len = sz;
+
+       GOTO(out, rc = 0);
+
+free:
+       if (lmm_buf)
+               OBD_FREE_PTR(lmm_buf);
+out:
+       if (rc)
+               return ERR_PTR(rc);
+       return lmm_buf;
 }
 
-/* partial operation */
-static int mdd_oc_sanity_check(const struct lu_env *env,
-                               struct mdd_object *obj,
-                               struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        switch (ma->ma_attr.la_mode & S_IFMT) {
-        case S_IFREG:
-        case S_IFDIR:
-        case S_IFLNK:
-        case S_IFCHR:
-        case S_IFBLK:
-        case S_IFIFO:
-        case S_IFSOCK:
-                rc = 0;
-                break;
-        default:
-                rc = -EINVAL;
-                break;
-        }
-        RETURN(rc);
-}
 
-static int mdd_object_create(const struct lu_env *env,
-                             struct md_object *obj,
-                             const struct md_op_spec *spec,
-                             struct md_attr *ma)
+/*
+ *  check if layout swapping between 2 objects is allowed
+ *  the rules are:
+ *  - same type of objects
+ *  - same owner/group (so quotas are still valid)
+ */
+static int mdd_layout_swap_allowed(const struct lu_env *env,
+                                  struct mdd_object *o1,
+                                  struct mdd_object *o2)
 {
+       const struct lu_fid     *fid1, *fid2;
+       __u32                    uid, gid;
+       struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
+       int                      rc;
+       ENTRY;
 
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        const struct lu_fid *pfid = spec->u.sp_pfid;
-        struct thandle *handle;
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdd->mdd_obd_dev;
-        struct obd_export *exp = md_quota(env)->mq_exp;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0, block_count = 0;
-        int inode_pending[MAXQUOTAS] = { 0, 0 };
-        int block_pending[MAXQUOTAS] = { 0, 0 };
-#endif
-        int rc = 0;
-        ENTRY;
+       fid1 = mdo2fid(o1);
+       fid2 = mdo2fid(o2);
 
-#ifdef HAVE_QUOTA_SUPPORT
-        if (mds->mds_quota) {
-                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
-                mdd_quota_wrapper(&ma->ma_attr, qids);
-                /* get file quota for child */
-                lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                qids, inode_pending, 1, NULL, 0,
-                                NULL, 0);
-                switch (ma->ma_attr.la_mode & S_IFMT) {
-                case S_IFLNK:
-                case S_IFDIR:
-                        block_count = 2;
-                        break;
-                case S_IFREG:
-                        block_count = 1;
-                        break;
-                }
-                /* get block quota for child */
-                if (block_count)
-                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
-                                        qids, block_pending, block_count,
-                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
-        }
-#endif
+       if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
+           (mdd_object_type(o1) != mdd_object_type(o2)))
+               RETURN(-EPERM);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                GOTO(out_pending, rc = PTR_ERR(handle));
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+       uid = tmp_la->la_uid;
+       gid = tmp_la->la_gid;
 
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
-        if (rc)
-                GOTO(unlock, rc);
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
-        if (rc)
-                GOTO(unlock, rc);
+       if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
+               RETURN(-EPERM);
 
-        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
-                /* If creating the slave object, set slave EA here. */
-                int lmv_size = spec->u.sp_ea.eadatalen;
-                struct lmv_stripe_md *lmv;
+       RETURN(0);
+}
 
-                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
-                LASSERT(lmv != NULL && lmv_size > 0);
+/**
+ * swap layouts between 2 lustre objects
+ */
+static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
+                           struct md_object *obj2, __u64 flags)
+{
+       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
+       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
+       struct lu_buf           *fst_buf, *snd_buf;
+       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
+       struct thandle          *handle;
+       struct mdd_device       *mdd = mdo2mdd(obj1);
+       int                      rc;
+       __u16                    fst_gen, snd_gen;
+       ENTRY;
+
+       /* we have to sort the 2 obj, so locking will always
+        * be in the same order, even in case of 2 concurrent swaps */
+       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
+                      mdo2fid(md2mdd_obj(obj2)));
+       /* same fid ? */
+       if (rc == 0)
+               RETURN(-EPERM);
+
+       if (rc > 0) {
+               o1 = md2mdd_obj(obj1);
+               o2 = md2mdd_obj(obj2);
+       } else {
+               o1 = md2mdd_obj(obj2);
+               o2 = md2mdd_obj(obj1);
+       }
+
+       /* check if layout swapping is allowed */
+       rc = mdd_layout_swap_allowed(env, o1, o2);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       /* objects are already sorted */
+       mdd_write_lock(env, o1, MOR_TGT_CHILD);
+       mdd_write_lock(env, o2, MOR_TGT_CHILD);
+
+       lmm1_buf = mdd_get_lov_ea(env, o1);
+       if (IS_ERR(lmm1_buf)) {
+               rc = PTR_ERR(lmm1_buf);
+               lmm1_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       lmm2_buf = mdd_get_lov_ea(env, o2);
+       if (IS_ERR(lmm2_buf)) {
+               rc = PTR_ERR(lmm2_buf);
+               lmm2_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       /* swapping 2 non existant layouts is a success */
+       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+               GOTO(unlock, rc = 0);
+
+       /* to help inode migration between MDT, it is better to
+        * start by the no layout file (if one), so we order the swap */
+       if (lmm1_buf == NULL) {
+               fst_o = o1;
+               fst_buf = lmm1_buf;
+               snd_o = o2;
+               snd_buf = lmm2_buf;
+       } else {
+               fst_o = o2;
+               fst_buf = lmm2_buf;
+               snd_o = o1;
+               snd_buf = lmm1_buf;
+       }
+
+       /* lmm and generation layout initialization */
+       if (fst_buf) {
+               fst_lmm = fst_buf->lb_buf;
+               fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
+       } else {
+               fst_lmm = NULL;
+               fst_gen = 0;
+       }
+
+       if (snd_buf) {
+               snd_lmm = snd_buf->lb_buf;
+               snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
+       } else {
+               snd_lmm = NULL;
+               snd_gen = 0;
+       }
+
+       /* save the orignal lmm common header of first file
+        * to be able to roll back */
+       OBD_ALLOC_PTR(old_fst_lmm);
+       if (old_fst_lmm == NULL)
+               GOTO(unlock, rc = -ENOMEM);
+
+       memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
+
+       /* increase the generation layout numbers */
+       snd_gen++;
+       fst_gen++;
+
+       /* set the file specific informations in lmm */
+       if (fst_lmm) {
+               fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
+               fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
+               fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
+       }
+
+       if (snd_lmm) {
+               snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+               snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+               snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+       }
+
+       /* prepare transaction */
+       rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, fst_o));
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, snd_o));
+       if (rc) {
+               int     rc2;
+
+               /* failure on second file, but first was done, so we have
+                * to roll back first */
+               /* restore object_id, object_seq and generation number
+                * on first file */
+               if (fst_lmm) {
+                       fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+                       fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+               }
+
+               rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
+                                   LU_XATTR_REPLACE, handle,
+                                   mdd_object_capa(env, fst_o));
+               if (rc2) {
+                       /* very bad day */
+                       CERROR("%s: unable to roll back after swap layouts"
+                              " failure between "DFID" and "DFID
+                              " rc2 = %d rc = %d)\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
+                              rc2, rc);
+                       /* a solution to avoid journal commit is to panic,
+                        * but it has strong consequences so we use LBUG to
+                        * allow sysdamin to choose to panic or not
+                        */
+                       LBUG();
+               }
+               GOTO(stop, rc);
+       }
+       EXIT;
+
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+unlock:
+       mdd_write_unlock(env, o2);
+       mdd_write_unlock(env, o1);
 
-                rc = __mdd_xattr_set(env, mdd_obj,
-                                     mdd_buf_get_const(env, lmv, lmv_size),
-                                     XATTR_NAME_LMV, 0, handle);
-                if (rc)
-                        GOTO(unlock, rc);
+       if (lmm1_buf && lmm1_buf->lb_buf)
+               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
+       if (lmm1_buf)
+               OBD_FREE_PTR(lmm1_buf);
 
-                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
-                                           handle, 0);
-        } else {
-#ifdef CONFIG_FS_POSIX_ACL
-                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
-                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
-
-                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
-                        buf->lb_len = spec->u.sp_ea.eadatalen;
-                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
-                                rc = __mdd_acl_init(env, mdd_obj, buf,
-                                                    &ma->ma_attr.la_mode,
-                                                    handle);
-                                if (rc)
-                                        GOTO(unlock, rc);
-                                else
-                                        ma->ma_attr.la_valid |= LA_MODE;
-                        }
+       if (lmm2_buf && lmm2_buf->lb_buf)
+               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
+       if (lmm2_buf)
+               OBD_FREE_PTR(lmm2_buf);
 
-                        pfid = spec->u.sp_ea.fid;
-                }
-#endif
-                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
-                                           spec);
-        }
-        EXIT;
-unlock:
-        if (rc == 0)
-                rc = mdd_attr_get_internal(env, mdd_obj, ma);
-        mdd_write_unlock(env, mdd_obj);
+       if (old_fst_lmm)
+               OBD_FREE_PTR(old_fst_lmm);
 
-        mdd_trans_stop(env, mdd, rc, handle);
-out_pending:
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc) {
-                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
-                                      inode_pending, 0);
-                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
-                                      block_pending, 1);
-                /* Trigger dqacq on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it. */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-        }
-#endif
-        return rc;
+       return rc;
 }
 
-/* partial link */
-static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
-                       const struct md_attr *ma)
+void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
+               struct mdd_object *child, struct lu_attr *attr)
 {
-        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int rc;
-        ENTRY;
-
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(-ENOMEM);
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
-        if (rc == 0)
-                __mdd_ref_add(env, mdd_obj, handle);
-        mdd_write_unlock(env, mdd_obj);
-        if (rc == 0) {
-                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-                la_copy->la_ctime = ma->ma_attr.la_ctime;
-
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
-                                                        handle, 0);
-        }
-        mdd_trans_stop(env, mdd, 0, handle);
+       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+       struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
+       struct dt_object *nc = mdd_object_child(child);
 
-        RETURN(rc);
+       /* @hint will be initialized by underlying device. */
+       nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
 }
 
 /*
@@ -1869,28 +1726,27 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
  */
 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
 {
-        int res = 0;
-
-        /* Sadly, NFSD reopens a file repeatedly during operation, so the
-         * "acc_mode = 0" allowance for newly-created files isn't honoured.
-         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
-         * owner can write to a file even if it is marked readonly to hide
-         * its brokenness. (bug 5781) */
-        if (flags & MDS_OPEN_OWNEROVERRIDE) {
-                struct md_ucred *uc = md_ucred(env);
-
-                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
-                    (la->la_uid == uc->mu_fsuid))
-                        return 0;
-        }
+       int res = 0;
+
+       /* Sadly, NFSD reopens a file repeatedly during operation, so the
+        * "acc_mode = 0" allowance for newly-created files isn't honoured.
+        * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
+        * owner can write to a file even if it is marked readonly to hide
+        * its brokenness. (bug 5781) */
+       if (flags & MDS_OPEN_OWNEROVERRIDE) {
+               struct lu_ucred *uc = lu_ucred_check(env);
+
+               if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
+                       return 0;
+       }
 
-        if (flags & FMODE_READ)
-                res |= MAY_READ;
-        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
-                res |= MAY_WRITE;
-        if (flags & MDS_FMODE_EXEC)
-                res |= MAY_EXEC;
-        return res;
+       if (flags & FMODE_READ)
+               res |= MAY_READ;
+       if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
+               res |= MAY_WRITE;
+       if (flags & MDS_FMODE_EXEC)
+               res = MAY_EXEC;
+       return res;
 }
 
 static int mdd_open_sanity_check(const struct lu_env *env,
@@ -1939,13 +1795,13 @@ static int mdd_open_sanity_check(const struct lu_env *env,
          * Now, flag -- O_NOATIME does not be packed by client.
          */
         if (flag & O_NOATIME) {
-                struct md_ucred *uc = md_ucred(env);
+               struct lu_ucred *uc = lu_ucred(env);
 
-                if (uc && ((uc->mu_valid == UCRED_OLD) ||
-                    (uc->mu_valid == UCRED_NEW)) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (uc && ((uc->uc_valid == UCRED_OLD) ||
+                          (uc->uc_valid == UCRED_NEW)) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
         }
 #endif
 
@@ -1968,116 +1824,171 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
         return rc;
 }
 
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+                            struct md_attr *ma, struct thandle *handle)
+{
+        return mdo_declare_destroy(env, obj, handle);
+}
+
 /* return md_attr back,
  * if it is last unlink then return lov ea + llog cookie*/
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
+                    struct md_attr *ma, struct thandle *handle)
 {
-        int rc = 0;
+       int rc;
         ENTRY;
 
-        if (S_ISREG(mdd_object_type(obj))) {
-                /* Return LOV & COOKIES unconditionally here. We clean evth up.
-                 * Caller must be ready for that. */
+       rc = mdo_destroy(env, obj, handle);
 
-                rc = __mdd_lmm_get(env, obj, ma);
-                if ((ma->ma_valid & MA_LOV))
-                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
-                                            obj, ma);
-        }
         RETURN(rc);
 }
 
+static int mdd_declare_close(const struct lu_env *env,
+                             struct mdd_object *obj,
+                             struct md_attr *ma,
+                             struct thandle *handle)
+{
+        int rc;
+
+        rc = orph_declare_index_delete(env, obj, handle);
+        if (rc)
+                return rc;
+
+       return mdo_declare_destroy(env, obj, handle);
+}
+
 /*
  * No permission check is needed.
  */
 static int mdd_close(const struct lu_env *env, struct md_object *obj,
-                     struct md_attr *ma)
+                     struct md_attr *ma, int mode)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle    *handle;
-        int rc;
-        int reset = 1;
-
-#ifdef HAVE_QUOTA_SUPPORT
-        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
-        struct mds_obd *mds = &obd->u.mds;
-        unsigned int qids[MAXQUOTAS] = { 0, 0 };
-        int quota_opc = 0;
-#endif
+        struct thandle    *handle = NULL;
+       int rc, is_orphan = 0;
         ENTRY;
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
-        if (rc)
-                RETURN(rc);
-        handle = mdd_trans_start(env, mdo2mdd(obj));
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
+               mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+               mdd_obj->mod_count--;
+               mdd_write_unlock(env, mdd_obj);
 
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        /* release open count */
-        mdd_obj->mod_count --;
-
-        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
-                /* remove link to object from orphan index */
-                rc = __mdd_orphan_del(env, mdd_obj, handle);
-                if (rc == 0) {
-                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
-                               "list, OSS objects to be destroyed.\n",
-                               PFID(mdd_object_fid(mdd_obj)));
-                } else {
-                        CERROR("Object "DFID" can not be deleted from orphan "
-                                "list, maybe cause OST objects can not be "
-                                "destroyed (err: %d).\n",
-                                PFID(mdd_object_fid(mdd_obj)), rc);
-                        /* If object was not deleted from orphan list, do not
-                         * destroy OSS objects, which will be done when next
-                         * recovery. */
-                        GOTO(out, rc);
-                }
+                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
+                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
+                               "list\n", PFID(mdd_object_fid(mdd_obj)));
+                RETURN(0);
         }
 
-        rc = mdd_iattr_get(env, mdd_obj, ma);
-        /* Object maybe not in orphan list originally, it is rare case for
-         * mdd_finish_unlink() failure. */
-        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
-#ifdef HAVE_QUOTA_SUPPORT
-                if (mds->mds_quota) {
-                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                        mdd_quota_wrapper(&ma->ma_attr, qids);
-                }
-#endif
-                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
-                if (ma->ma_valid & MA_FLAGS &&
-                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
-                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
-                } else {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
-                                if (rc == 0)
-                                        reset = 0;
-                }
+       /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
+        * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
+       /* check without any lock */
+       is_orphan = mdd_obj->mod_count == 1 &&
+                   (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
+
+again:
+       if (is_orphan) {
+                handle = mdd_trans_create(env, mdo2mdd(obj));
+                if (IS_ERR(handle))
+                        RETURN(PTR_ERR(handle));
+
+                rc = mdd_declare_close(env, mdd_obj, ma, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+                if (rc)
+                        GOTO(stop, rc);
 
-                if (rc != 0)
-                        CERROR("Error when prepare to delete Object "DFID" , "
-                               "which will cause OST objects can not be "
-                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
+                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                if (rc)
+                        GOTO(stop, rc);
         }
-        EXIT;
 
-out:
-        if (reset)
-                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
+                       mdd_object_capa(env, mdd_obj));
+       if (rc != 0) {
+               CERROR("Failed to get lu_attr of "DFID": %d\n",
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               GOTO(out, rc);
+       }
+
+       /* check again with lock */
+       is_orphan = (mdd_obj->mod_count == 1) &&
+                   ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
+                    ma->ma_attr.la_nlink == 0);
+
+       if (is_orphan && handle == NULL) {
+               mdd_write_unlock(env, mdd_obj);
+               goto again;
+       }
+
+       mdd_obj->mod_count--; /*release open count */
+
+       if (!is_orphan)
+               GOTO(out, rc = 0);
+
+       /* Orphan object */
+       /* NB: Object maybe not in orphan list originally, it is rare case for
+        * mdd_finish_unlink() failure, in that case, the object doesn't have
+        * ORPHAN_OBJ flag */
+       if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+               /* remove link to object from orphan index */
+               LASSERT(handle != NULL);
+               rc = __mdd_orphan_del(env, mdd_obj, handle);
+               if (rc != 0) {
+                       CERROR("%s: unable to delete "DFID" from orphan list: "
+                              "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                              PFID(mdd_object_fid(mdd_obj)), rc);
+                       /* If object was not deleted from orphan list, do not
+                        * destroy OSS objects, which will be done when next
+                        * recovery. */
+                       GOTO(out, rc);
+               }
+
+               CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                      "list, OSS objects to be destroyed.\n",
+                      PFID(mdd_object_fid(mdd_obj)));
+       }
+
+       rc = mdo_destroy(env, mdd_obj, handle);
+
+       if (rc != 0) {
+               CERROR("%s: unable to delete "DFID" from orphan list: "
+                      "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+       }
+       EXIT;
 
+out:
         mdd_write_unlock(env, mdd_obj);
-        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (quota_opc)
-                /* Trigger dqrel on the owner of child. If failed,
-                 * the next call for lquota_chkquota will process it */
-                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              quota_opc);
-#endif
+
+        if (rc == 0 &&
+            (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
+            !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
+                if (handle == NULL) {
+                        handle = mdd_trans_create(env, mdo2mdd(obj));
+                        if (IS_ERR(handle))
+                                GOTO(stop, rc = IS_ERR(handle));
+
+                        rc = mdd_declare_changelog_store(env, mdd, NULL,
+                                                         handle);
+                        if (rc)
+                                GOTO(stop, rc);
+
+                        rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                        if (rc)
+                                GOTO(stop, rc);
+                }
+
+                mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
+                                         mdd_obj, handle);
+        }
+
+stop:
+        if (handle != NULL)
+                mdd_trans_stop(env, mdd, rc, handle);
         return rc;
 }
 
@@ -2100,28 +2011,28 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              int first, void *area, int nob,
-                              const struct dt_it_ops *iops, struct dt_it *it,
-                              __u64 *start, __u64 *end,
-                              struct lu_dirent **last, __u32 attr)
+static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
+                             int nob, const struct dt_it_ops *iops,
+                             struct dt_it *it, __u32 attr, void *arg)
 {
-        int                     result;
-        __u64                   hash = 0;
-        struct lu_dirent       *ent;
-
-        if (first) {
-                memset(area, 0, sizeof (struct lu_dirpage));
-                area += sizeof (struct lu_dirpage);
-                nob  -= sizeof (struct lu_dirpage);
-        }
+       struct lu_dirpage       *dp = &lp->lp_dir;
+       void                    *area = dp;
+       int                      result;
+       __u64                    hash = 0;
+       struct lu_dirent        *ent;
+       struct lu_dirent        *last = NULL;
+       int                      first = 1;
+
+        memset(area, 0, sizeof (*dp));
+        area += sizeof (*dp);
+        nob  -= sizeof (*dp);
 
         ent  = area;
         do {
                 int    len;
                 int    recsize;
 
-                len  = iops->key_size(env, it);
+                len = iops->key_size(env, it);
 
                 /* IAM iterator can return record with zero len. */
                 if (len == 0)
@@ -2130,14 +2041,14 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 hash = iops->store(env, it);
                 if (unlikely(first)) {
                         first = 0;
-                        *start = hash;
+                        dp->ldp_hash_start = cpu_to_le64(hash);
                 }
 
                 /* calculate max space required for lu_dirent */
                 recsize = lu_dirent_calc_size(len, attr);
 
                 if (nob >= recsize) {
-                        result = iops->rec(env, it, ent, attr);
+                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                         if (result == -ESTALE)
                                 goto next;
                         if (result != 0)
@@ -2147,20 +2058,10 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
-                        /*
-                         * record doesn't fit into page, enlarge previous one.
-                         */
-                        if (*last) {
-                                (*last)->lde_reclen =
-                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
-                                                        nob);
-                                result = 0;
-                        } else
-                                result = -EINVAL;
-
+                        result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
                 }
-                *last = ent;
+                last = ent;
                 ent = (void *)ent + recsize;
                 nob -= recsize;
 
@@ -2171,106 +2072,20 @@ next:
         } while (result == 0);
 
 out:
-        *end = hash;
+        dp->ldp_hash_end = cpu_to_le64(hash);
+        if (last != NULL) {
+                if (last->lde_hash == dp->ldp_hash_end)
+                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+                last->lde_reclen = 0; /* end mark */
+        }
+       if (result > 0)
+               /* end of directory */
+               dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+       else if (result < 0)
+               CWARN("build page failed: %d!\n", result);
         return result;
 }
 
-static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
-                          const struct lu_rdpg *rdpg)
-{
-        struct dt_it      *it;
-        struct dt_object  *next = mdd_object_child(obj);
-        const struct dt_it_ops  *iops;
-        struct page       *pg;
-        struct lu_dirent  *last = NULL;
-        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        int i;
-        int rc;
-        int nob;
-        __u64 hash_start;
-        __u64 hash_end = 0;
-
-        LASSERT(rdpg->rp_pages != NULL);
-        LASSERT(next->do_index_ops != NULL);
-
-        if (rdpg->rp_count <= 0)
-                return -EFAULT;
-
-        /*
-         * iterate through directory and fill pages from @rdpg
-         */
-        iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, mdd_object_capa(env, obj));
-        if (IS_ERR(it))
-                return PTR_ERR(it);
-
-        rc = iops->load(env, it, rdpg->rp_hash);
-
-        if (rc == 0){
-                /*
-                 * Iterator didn't find record with exactly the key requested.
-                 *
-                 * It is currently either
-                 *
-                 *     - positioned above record with key less than
-                 *     requested---skip it.
-                 *
-                 *     - or not positioned at all (is in IAM_IT_SKEWED
-                 *     state)---position it on the next item.
-                 */
-                rc = iops->next(env, it);
-        } else if (rc > 0)
-                rc = 0;
-
-        /*
-         * At this point and across for-loop:
-         *
-         *  rc == 0 -> ok, proceed.
-         *  rc >  0 -> end of directory.
-         *  rc <  0 -> error.
-         */
-        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
-             i++, nob -= CFS_PAGE_SIZE) {
-                LASSERT(i < rdpg->rp_npages);
-                pg = rdpg->rp_pages[i];
-                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
-                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
-                                        it, &hash_start, &hash_end, &last,
-                                        rdpg->rp_attrs);
-                if (rc != 0 || i == rdpg->rp_npages - 1) {
-                        if (last)
-                                last->lde_reclen = 0;
-                }
-                cfs_kunmap(pg);
-        }
-        if (rc > 0) {
-                /*
-                 * end of directory.
-                 */
-                hash_end = DIR_END_OFF;
-                rc = 0;
-        }
-        if (rc == 0) {
-                struct lu_dirpage *dp;
-
-                dp = cfs_kmap(rdpg->rp_pages[0]);
-                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(hash_end);
-                if (i == 0)
-                        /*
-                         * No pages were processed, mark this.
-                         */
-                        dp->ldp_flags |= LDF_EMPTY;
-
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
-                cfs_kunmap(rdpg->rp_pages[0]);
-        }
-        iops->put(env, it);
-        iops->fini(env, it);
-
-        return rc;
-}
-
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                  const struct lu_rdpg *rdpg)
 {
@@ -2278,7 +2093,11 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        LASSERT(mdd_object_exists(mdd_obj));
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_readpage_sanity_check(env, mdd_obj);
@@ -2289,12 +2108,12 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 struct page *pg;
                 struct lu_dirpage *dp;
 
-                /*
-                 * According to POSIX, please do not return any entry to client:
-                 * even dot and dotdot should not be returned.
-                 */
-                CWARN("readdir from dead object: "DFID"\n",
-                        PFID(mdd_object_fid(mdd_obj)));
+               /*
+                * According to POSIX, please do not return any entry to client:
+                * even dot and dotdot should not be returned.
+                */
+               CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
+                      PFID(mdd_object_fid(mdd_obj)));
 
                 if (rdpg->rp_count <= 0)
                         GOTO(out_unlock, rc = -EFAULT);
@@ -2304,16 +2123,31 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 dp = (struct lu_dirpage*)cfs_kmap(pg);
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
-                dp->ldp_flags |= LDF_EMPTY;
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
+                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                 cfs_kunmap(pg);
-                GOTO(out_unlock, rc = 0);
-        }
-
-        rc = __mdd_readpage(env, mdd_obj, rdpg);
-
-        EXIT;
+                GOTO(out_unlock, rc = LU_PAGE_SIZE);
+        }
+
+       rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
+                          mdd_dir_page_build, NULL);
+       if (rc >= 0) {
+               struct lu_dirpage       *dp;
+
+               dp = cfs_kmap(rdpg->rp_pages[0]);
+               dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
+               if (rc == 0) {
+                       /*
+                        * No pages were processed, mark this for first page
+                        * and send back.
+                        */
+                       dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
+                       rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
+               }
+               cfs_kunmap(rdpg->rp_pages[0]);
+       }
+
+       GOTO(out_unlock, rc);
 out_unlock:
         mdd_read_unlock(env, mdd_obj);
         return rc;
@@ -2322,49 +2156,30 @@ out_unlock:
 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct dt_object *next;
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        next = mdd_object_child(mdd_obj);
-        return next->do_ops->do_object_sync(env, next);
-}
-
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
-                                        struct md_object *obj)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_get(env, mdd_object_child(mdd_obj));
-}
-
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
-                            dt_obj_version_t version)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
-        LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_set(env, mdd_object_child(mdd_obj), version);
+        if (mdd_object_exists(mdd_obj) == 0) {
+                CERROR("%s: object "DFID" not found: rc = -2\n",
+                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
+                return -ENOENT;
+        }
+        return dt_object_sync(env, mdd_object_child(mdd_obj));
 }
 
 const struct md_object_operations mdd_obj_ops = {
-        .moo_permission    = mdd_permission,
-        .moo_attr_get      = mdd_attr_get,
-        .moo_attr_set      = mdd_attr_set,
-        .moo_xattr_get     = mdd_xattr_get,
-        .moo_xattr_set     = mdd_xattr_set,
-        .moo_xattr_list    = mdd_xattr_list,
-        .moo_xattr_del     = mdd_xattr_del,
-        .moo_object_create = mdd_object_create,
-        .moo_ref_add       = mdd_ref_add,
-        .moo_ref_del       = mdd_ref_del,
-        .moo_open          = mdd_open,
-        .moo_close         = mdd_close,
-        .moo_readpage      = mdd_readpage,
-        .moo_readlink      = mdd_readlink,
-        .moo_capa_get      = mdd_capa_get,
-        .moo_object_sync   = mdd_object_sync,
-        .moo_version_get   = mdd_version_get,
-        .moo_version_set   = mdd_version_set,
-        .moo_path          = mdd_path,
+       .moo_permission         = mdd_permission,
+       .moo_attr_get           = mdd_attr_get,
+       .moo_attr_set           = mdd_attr_set,
+       .moo_xattr_get          = mdd_xattr_get,
+       .moo_xattr_set          = mdd_xattr_set,
+       .moo_xattr_list         = mdd_xattr_list,
+       .moo_xattr_del          = mdd_xattr_del,
+       .moo_swap_layouts       = mdd_swap_layouts,
+       .moo_open               = mdd_open,
+       .moo_close              = mdd_close,
+       .moo_readpage           = mdd_readpage,
+       .moo_readlink           = mdd_readlink,
+       .moo_changelog          = mdd_changelog,
+       .moo_capa_get           = mdd_capa_get,
+       .moo_object_sync        = mdd_object_sync,
+       .moo_path               = mdd_path,
 };