Whamcloud - gitweb
LU-2056 mdd: create transaction before locking object
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index ba4b81a..c3f6958 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -119,11 +119,10 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 
 void mdd_buf_put(struct lu_buf *buf)
 {
-        if (buf == NULL || buf->lb_buf == NULL)
-                return;
-        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-        buf->lb_buf = NULL;
-        buf->lb_len = 0;
+       if (buf == NULL || buf->lb_buf == NULL)
+               return;
+       OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       *buf = LU_BUF_NULL;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
@@ -139,19 +138,19 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
 
 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
 {
-        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 
-        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-                buf->lb_buf = NULL;
-        }
-        if (buf->lb_buf == NULL) {
-                buf->lb_len = len;
-                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
-                if (buf->lb_buf == NULL)
-                        buf->lb_len = 0;
-        }
-        return buf;
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+               *buf = LU_BUF_NULL;
+       }
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               buf->lb_len = len;
+               OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
+               if (buf->lb_buf == NULL)
+                       *buf = LU_BUF_NULL;
+       }
+       return buf;
 }
 
 /** Increase the size of the \a mti_big_buf.
@@ -180,61 +179,6 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len)
         return 0;
 }
 
-struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
-                                       struct mdd_device *mdd)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-        int                     max_cookie_size;
-
-        max_cookie_size = mdd_lov_cookiesize(env, mdd);
-        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
-                if (mti->mti_max_cookie)
-                        OBD_FREE_LARGE(mti->mti_max_cookie,
-                                       mti->mti_max_cookie_size);
-                mti->mti_max_cookie = NULL;
-                mti->mti_max_cookie_size = 0;
-        }
-        if (unlikely(mti->mti_max_cookie == NULL)) {
-                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
-                if (likely(mti->mti_max_cookie != NULL))
-                        mti->mti_max_cookie_size = max_cookie_size;
-        }
-        if (likely(mti->mti_max_cookie != NULL))
-                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
-        return mti->mti_max_cookie;
-}
-
-struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
-{
-        struct mdd_thread_info *mti = mdd_env_info(env);
-
-        if (unlikely(mti->mti_max_lmm_size < size)) {
-                int rsize = size_roundup_power2(size);
-
-                if (mti->mti_max_lmm_size > 0) {
-                        LASSERT(mti->mti_max_lmm);
-                        OBD_FREE_LARGE(mti->mti_max_lmm,
-                                       mti->mti_max_lmm_size);
-                        mti->mti_max_lmm = NULL;
-                        mti->mti_max_lmm_size = 0;
-                }
-
-                OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
-                if (likely(mti->mti_max_lmm != NULL))
-                        mti->mti_max_lmm_size = rsize;
-        }
-        return mti->mti_max_lmm;
-}
-
-struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
-                                   struct mdd_device *mdd)
-{
-        int max_lmm_size;
-
-        max_lmm_size = mdd_lov_mdsize(env, mdd);
-        return mdd_max_lmm_buffer(env, max_lmm_size);
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
@@ -474,9 +418,9 @@ static int mdd_path_current(const struct lu_env *env,
         /* Verify that our path hasn't changed since we started the lookup.
            Record the current index, and verify the path resolves to the
            same fid. If it does, then the path is correct as of this index. */
-        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
-        pli->pli_currec = mdd->mdd_cl.mc_index;
-        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       pli->pli_currec = mdd->mdd_cl.mc_index;
+       spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -571,145 +515,17 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
         RETURN(rc);
 }
 
-/* get only inode attributes */
-int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct md_attr *ma)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (ma->ma_valid & MA_INODE)
-                RETURN(0);
-
-        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                          mdd_object_capa(env, mdd_obj));
-        if (rc == 0)
-                ma->ma_valid |= MA_INODE;
-        RETURN(rc);
-}
-
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
-{
-        struct lov_desc *ldesc;
-        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
-        struct lov_user_md *lum = (struct lov_user_md*)lmm;
-        ENTRY;
-
-        if (!lum)
-                RETURN(0);
-
-        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
-        LASSERT(ldesc != NULL);
-
-        lum->lmm_magic = LOV_MAGIC_V1;
-        lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
-        lum->lmm_pattern = ldesc->ld_pattern;
-        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
-        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
-        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
-
-        RETURN(sizeof(*lum));
-}
-
-static int is_rootdir(struct mdd_object *mdd_obj)
-{
-        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
-        const struct lu_fid *fid = mdo2fid(mdd_obj);
-
-        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
-}
-
-int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
-{
-       struct mdd_thread_info *info = mdd_env_info(env);
-       int                     size;
-       int                     rc   = -EINVAL;
-       ENTRY;
-
-       LASSERT(info != NULL);
-       LASSERT(ma->ma_big_lmm_used == 0);
-
-       if (ma->ma_lmm_size == 0) {
-               CERROR("No buffer to hold %s xattr of object "DFID"\n",
-                      XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
-               RETURN(rc);
-       }
-
-        rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
-                           mdd_object_capa(env, obj));
-        if (rc < 0)
-                RETURN(rc);
-
-        /* big_lmm may need to grow */
-        size = rc;
-        mdd_max_lmm_buffer(env, size);
-        if (info->mti_max_lmm == NULL)
-                RETURN(-ENOMEM);
-
-        LASSERT(info->mti_max_lmm_size >= size);
-        rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
-                        XATTR_NAME_LOV);
-        if (rc < 0)
-                RETURN(rc);
-
-        ma->ma_big_lmm_used = 1;
-        ma->ma_valid |= MA_LOV;
-        ma->ma_lmm = info->mti_max_lmm;
-        ma->ma_lmm_size = size;
-        LASSERT(size == rc);
-        RETURN(rc);
-}
-
-/* get lov EA only */
-static int __mdd_lmm_get(const struct lu_env *env,
-                         struct mdd_object *mdd_obj, struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        if (ma->ma_valid & MA_LOV)
-                RETURN(0);
-
-        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
-                        XATTR_NAME_LOV);
-        if (rc == -ERANGE)
-                rc = mdd_big_lmm_get(env, mdd_obj, ma);
-        else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
-                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
-
-        if (rc > 0) {
-                ma->ma_lmm_size = rc;
-                ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
-                ma->ma_valid |= MA_LOV | MA_LAY_GEN;
-                rc = 0;
-        }
-        RETURN(rc);
-}
-
-int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       struct md_attr *ma)
-{
-        int rc;
-        ENTRY;
-
-        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lmm_get(env, mdd_obj, ma);
-        mdd_read_unlock(env, mdd_obj);
-        RETURN(rc);
-}
-
 /*
  * No permission check is needed.
  */
 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                 struct md_attr *ma)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        int                rc;
-
+       int rc;
         ENTRY;
-       rc = mdd_iattr_get(env, mdd_obj, ma);
+
+       return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
+                         mdd_object_capa(env, md2mdd_obj(obj)));
         RETURN(rc);
 }
 
@@ -721,9 +537,6 @@ static int mdd_xattr_get(const struct lu_env *env,
                          const char *name)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
-       struct mdd_device *mdd = mdo2mdd(obj);
-       struct lu_fid      rootfid;
-       int is_root;
         int rc;
 
         ENTRY;
@@ -739,20 +552,6 @@ static int mdd_xattr_get(const struct lu_env *env,
                            mdd_object_capa(env, mdd_obj));
         mdd_read_unlock(env, mdd_obj);
 
-       dt_root_get(env, mdd->mdd_child, &rootfid);
-       is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid);
-
-       /* XXX: a temp. solution till LOD/OSP is landed */
-       if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) {
-               if (buf->lb_buf == NULL) {
-                       rc = sizeof(struct lov_user_md);
-               } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
-                       rc = mdd_get_default_md(mdd_obj, buf->lb_buf);
-               } else {
-                       rc = -ERANGE;
-               }
-       }
-
         RETURN(rc);
 }
 
@@ -813,12 +612,22 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
         int rc;
         ENTRY;
 
-        if (feat != &dt_directory_features && feat != NULL)
+       if (feat != &dt_directory_features && feat != NULL) {
                 dof->dof_type = DFT_INDEX;
-        else
-               dof->dof_type = dt_mode_to_dft(attr->la_mode);
+               dof->u.dof_idx.di_feat = feat;
 
-        dof->u.dof_idx.di_feat = feat;
+       } else {
+               dof->dof_type = dt_mode_to_dft(attr->la_mode);
+               if (dof->dof_type == DFT_REGULAR) {
+                       dof->u.dof_reg.striped =
+                               md_should_create(spec->sp_cr_flags);
+                       if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
+                               dof->u.dof_reg.striped = 0;
+                       /* is this replay? */
+                       if (spec->no_create)
+                               dof->u.dof_reg.striped = 0;
+               }
+       }
 
        rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
 
@@ -832,27 +641,16 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
 {
         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
-        const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
-        if (!mdd_object_exists(c)) {
-                struct dt_object *next = mdd_object_child(c);
-                LASSERT(next);
-
-                if (feat != &dt_directory_features && feat != NULL)
-                        dof->dof_type = DFT_INDEX;
-                else
-                        dof->dof_type = dt_mode_to_dft(attr->la_mode);
+       LASSERT(!mdd_object_exists(c));
 
-                dof->u.dof_idx.di_feat = feat;
+       rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
-                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-        } else
-                rc = -EEXIST;
+       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 /**
@@ -911,40 +709,6 @@ int mdd_attr_check_set_internal(const struct lu_env *env,
         RETURN(rc);
 }
 
-int mdd_attr_check_set_internal_locked(const struct lu_env *env,
-                                       struct mdd_object *obj,
-                                       struct lu_attr *attr,
-                                       struct thandle *handle,
-                                       int needacl)
-{
-        int rc;
-        ENTRY;
-
-        needacl = needacl && (attr->la_valid & LA_MODE);
-        if (needacl)
-                mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
-        if (needacl)
-                mdd_write_unlock(env, obj);
-        RETURN(rc);
-}
-
-int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
-                    const struct lu_buf *buf, const char *name,
-                    int fl, struct thandle *handle)
-{
-        struct lustre_capa *capa = mdd_object_capa(env, obj);
-        int rc = -EINVAL;
-        ENTRY;
-
-        if (buf->lb_buf && buf->lb_len > 0)
-                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
-        else if (buf->lb_buf == NULL && buf->lb_len == 0)
-                rc = mdo_xattr_del(env, obj, name, handle, capa);
-
-        RETURN(rc);
-}
-
 /*
  * This gives the same functionality as the code between
  * sys_chmod and inode_setattr
@@ -956,7 +720,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const unsigned long flags)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc;
+       struct lu_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -971,12 +735,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
-        /* export destroy does not have ->le_ses, but we may want
-         * to drop LUSTRE_SOM_FL. */
-        if (!env->le_ses)
-                RETURN(0);
-
-        uc = md_ucred(env);
+       /* export destroy does not have ->le_ses, but we may want
+        * to drop LUSTRE_SOM_FL. */
+       uc = lu_ucred_check(env);
+       if (uc == NULL)
+               RETURN(0);
 
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
@@ -1007,9 +770,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 unsigned int newflags = la->la_flags &
                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                  * only be changed by the relevant capability. */
@@ -1030,17 +793,17 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
            !(flags & MDS_PERM_BYPASS))
                 RETURN(-EPERM);
 
-        /* Check for setting the obj time. */
-        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
-            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
+       /* Check for setting the obj time. */
+       if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
+           !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal(env, obj, tmp_la,
                                                     MAY_WRITE);
-                        if (rc)
-                                RETURN(rc);
-                }
-        }
+                       if (rc)
+                               RETURN(rc);
+               }
+       }
 
         if (la->la_valid & LA_KILL_SUID) {
                 la->la_valid &= ~LA_KILL_SUID;
@@ -1066,9 +829,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
                if (!(flags & MDS_PERM_BYPASS) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 if (la->la_mode == (cfs_umode_t) -1)
                         la->la_mode = tmp_la->la_mode;
@@ -1089,10 +852,10 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_UID) {
                 if (la->la_uid == (uid_t) -1)
                         la->la_uid = tmp_la->la_uid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    (la->la_uid != tmp_la->la_uid)) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    (la->la_uid != tmp_la->la_uid)) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* If the user or group of a non-directory has been
                  * changed by a non-root user, remove the setuid bit.
@@ -1114,11 +877,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_GID) {
                 if (la->la_gid == (gid_t) -1)
                         la->la_gid = tmp_la->la_gid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    ((la->la_gid != tmp_la->la_gid) &&
-                    !lustre_in_group_p(uc, la->la_gid))) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    ((la->la_gid != tmp_la->la_gid) &&
+                     !lustre_in_group_p(uc, la->la_gid))) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* Likewise, if the user or group of a non-directory
                  * has been changed by a non-root user, remove the
@@ -1156,7 +919,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         } else {
                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
-                              (uc->mu_fsuid == tmp_la->la_uid)) &&
+                             (uc->uc_fsuid == tmp_la->la_uid)) &&
                            !(flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal(env, obj,
                                                             tmp_la, MAY_WRITE);
@@ -1182,19 +945,17 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
  * \param mdd_obj - mdd_object of change
  * \param handle - transacion handle
  */
-static int mdd_changelog_data_store(const struct lu_env     *env,
-                                    struct mdd_device       *mdd,
-                                    enum changelog_rec_type type,
-                                    int                     flags,
-                                    struct mdd_object       *mdd_obj,
-                                    struct thandle          *handle)
-{
-        const struct lu_fid *tfid = mdo2fid(mdd_obj);
-        struct llog_changelog_rec *rec;
-        struct thandle *th = NULL;
-        struct lu_buf *buf;
-        int reclen;
-        int rc;
+static int mdd_changelog_data_store(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   enum changelog_rec_type type,
+                                   int flags, struct mdd_object *mdd_obj,
+                                   struct thandle *handle)
+{
+       const struct lu_fid             *tfid = mdo2fid(mdd_obj);
+       struct llog_changelog_rec       *rec;
+       struct lu_buf                   *buf;
+       int                              reclen;
+       int                              rc;
 
         /* Not recording */
         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
@@ -1217,7 +978,7 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         buf = mdd_buf_alloc(env, reclen);
         if (buf->lb_buf == NULL)
                 RETURN(-ENOMEM);
-        rec = (struct llog_changelog_rec *)buf->lb_buf;
+       rec = buf->lb_buf;
 
         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
         rec->cr.cr_type = (__u32)type;
@@ -1225,18 +986,9 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
-
-        if (th)
-                mdd_trans_stop(env, mdd, rc, th);
-
-        if (rc < 0) {
-                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
-                       rc, type, PFID(tfid));
-                return -EFAULT;
-        }
+       rc = mdd_changelog_store(env, mdd, rec, handle);
 
-        return 0;
+       RETURN(rc);
 }
 
 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
@@ -1250,7 +1002,7 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
-                return(PTR_ERR(handle));
+               RETURN(PTR_ERR(handle));
 
         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
         if (rc)
@@ -1270,82 +1022,12 @@ stop:
 }
 
 /**
- * Should be called with write lock held.
- *
- * \see mdd_lma_set_locked().
- */
-static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
-                       const struct md_attr *ma, struct thandle *handle)
-{
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_buf *buf;
-        struct lustre_mdt_attrs *lma =
-                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
-        int lmasize = sizeof(struct lustre_mdt_attrs);
-        int rc = 0;
-
-        ENTRY;
-
-        /* Either HSM or SOM part is not valid, we need to read it before */
-        if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
-                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
-                if (rc <= 0)
-                        RETURN(rc);
-
-                lustre_lma_swab(lma);
-        } else {
-                memset(lma, 0, lmasize);
-        }
-
-        /* Copy HSM data */
-        if (ma->ma_valid & MA_HSM) {
-                lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
-                lma->lma_compat |= LMAC_HSM;
-        }
-
-        /* Copy SOM data */
-        if (ma->ma_valid & MA_SOM) {
-                LASSERT(ma->ma_som != NULL);
-                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
-                        lma->lma_compat     &= ~LMAC_SOM;
-                } else {
-                        lma->lma_compat     |= LMAC_SOM;
-                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
-                        lma->lma_som_size    = ma->ma_som->msd_size;
-                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
-                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
-                }
-        }
-
-        /* Copy FID */
-        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
-
-        lustre_lma_swab(lma);
-        buf = mdd_buf_get(env, lma, lmasize);
-        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
-
-        RETURN(rc);
-}
-
-/**
  * Save LMA extended attributes with data from \a ma.
  *
  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
  * not, LMA EA will be first read from disk, modified and write back.
  *
  */
-static int mdd_lma_set_locked(const struct lu_env *env,
-                              struct mdd_object *mdd_obj,
-                              const struct md_attr *ma, struct thandle *handle)
-{
-        int rc;
-
-        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
-        mdd_write_unlock(env, mdd_obj);
-        return rc;
-}
-
 /* Precedence for choosing record type when multiple
  * attributes change: setattr > mtime > ctime > atime
  * (ctime changes when mtime does, plus chmod/chown.
@@ -1379,40 +1061,15 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
 static int mdd_declare_attr_set(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
-                                const struct md_attr *ma,
-                                struct lov_mds_md *lmm,
+                               const struct lu_attr *attr,
                                 struct thandle *handle)
 {
-        struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
-       struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr;
-        int             rc, i;
+       int rc;
 
        rc = mdo_declare_attr_set(env, obj, attr, handle);
         if (rc)
                 return rc;
 
-        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-        if (rc)
-                return rc;
-
-        if (ma->ma_valid & MA_LOV) {
-                buf->lb_buf = NULL;
-                buf->lb_len = ma->ma_lmm_size;
-                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
-                                           0, handle);
-                if (rc)
-                        return rc;
-        }
-
-        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
-                buf->lb_buf = NULL;
-                buf->lb_len = sizeof(struct lustre_mdt_attrs);
-                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
-                                           0, handle);
-                if (rc)
-                        return rc;
-        }
-
 #ifdef CONFIG_FS_POSIX_ACL
        if (attr->la_valid & LA_MODE) {
                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
@@ -1435,38 +1092,8 @@ static int mdd_declare_attr_set(const struct lu_env *env,
         }
 #endif
 
-        /* basically the log is the same as in unlink case */
-        if (lmm) {
-                __u16 stripe;
-
-                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
-                                le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
-                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
-                               mdd->mdd_obd_dev->obd_name,
-                               le32_to_cpu(lmm->lmm_magic),
-                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
-                        return -EINVAL;
-                }
-
-                stripe = le16_to_cpu(lmm->lmm_stripe_count);
-                if (stripe == LOV_ALL_STRIPES) {
-                        struct lov_desc *ldesc;
-
-                        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
-                        LASSERT(ldesc != NULL);
-                        stripe = ldesc->ld_tgt_count;
-                }
-
-                for (i = 0; i < stripe; i++) {
-                        rc = mdd_declare_llog_record(env, mdd,
-                                        sizeof(struct llog_unlink_rec),
-                                        handle);
-                        if (rc)
-                                return rc;
-                }
-        }
-
-        return rc;
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
 }
 
 /* set attr and LOV EA at once, return updated attr */
@@ -1476,13 +1103,16 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
-        struct lov_mds_md *lmm = NULL;
-        struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        const struct lu_attr *la = &ma->ma_attr;
+       int rc;
         ENTRY;
 
+       /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
+       LASSERT((ma->ma_valid & MA_LOV) == 0);
+       LASSERT((ma->ma_valid & MA_HSM) == 0);
+       LASSERT((ma->ma_valid & MA_SOM) == 0);
+
         *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
        if (rc)
@@ -1492,26 +1122,11 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
        if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        if (S_ISREG(mdd_object_type(mdd_obj)) &&
-            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
-                lmm_size = mdd_lov_mdsize(env, mdd);
-                lmm = mdd_max_lmm_get(env, mdd);
-                if (lmm == NULL)
-                        RETURN(-ENOMEM);
-
-                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
-                                XATTR_NAME_LOV);
-
-                if (rc < 0)
-                        RETURN(rc);
-        }
-
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
-                                  lmm_size > 0 ? lmm : NULL, handle);
+       rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
         if (rc)
                 GOTO(stop, rc);
 
@@ -1533,109 +1148,57 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
         } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-                /* journal chown/chgrp in llog, just like unlink */
-                if (rc == 0 && lmm_size){
-                        cookie_size = mdd_lov_cookiesize(env, mdd);
-                        logcookies = mdd_max_cookie_get(env, mdd);
-                        if (logcookies == NULL)
-                                GOTO(cleanup, rc = -ENOMEM);
-
-                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
-                                            logcookies, cookie_size) <= 0)
-                                logcookies = NULL;
-                }
-        }
-
-        if (rc == 0 && ma->ma_valid & MA_LOV) {
-                cfs_umode_t mode;
-
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode) || S_ISDIR(mode)) {
-                        rc = mdd_lsm_sanity_check(env, mdd_obj);
-                        if (rc)
-                                GOTO(cleanup, rc);
-
-                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
-                                            ma->ma_lmm_size, handle, 1);
-                }
-
         }
-        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
-                cfs_umode_t mode;
-
-                mode = mdd_object_type(mdd_obj);
-                if (S_ISREG(mode))
-                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
 
-        }
-cleanup:
         if (rc == 0)
                 rc = mdd_attr_set_changelog(env, obj, handle,
-                                            ma->ma_attr.la_valid);
+                                           la->la_valid);
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
-        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
-                /*set obd attr, if needed*/
-                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
-                                           logcookies);
-        }
-        RETURN(rc);
-}
-
-int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
-                      const struct lu_buf *buf, const char *name, int fl,
-                      struct thandle *handle)
-{
-        int  rc;
-        ENTRY;
-
-        mdd_write_lock(env, obj, MOR_TGT_CHILD);
-        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
-        mdd_write_unlock(env, obj);
-
         RETURN(rc);
 }
 
 static int mdd_xattr_sanity_check(const struct lu_env *env,
-                                  struct mdd_object *obj)
+                                 struct mdd_object *obj)
 {
-        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
-        struct md_ucred *uc     = md_ucred(env);
-        int rc;
-        ENTRY;
+       struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
+       struct lu_ucred *uc     = lu_ucred_assert(env);
+       int rc;
+       ENTRY;
 
-        if (mdd_is_immutable(obj) || mdd_is_append(obj))
-                RETURN(-EPERM);
+       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+               RETURN(-EPERM);
 
-        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
-        if (rc)
-                RETURN(rc);
+       rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-        if ((uc->mu_fsuid != tmp_la->la_uid) &&
-            !mdd_capable(uc, CFS_CAP_FOWNER))
-                RETURN(-EPERM);
+       if ((uc->uc_fsuid != tmp_la->la_uid) &&
+           !mdd_capable(uc, CFS_CAP_FOWNER))
+               RETURN(-EPERM);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_set(const struct lu_env *env,
-                                 struct mdd_device *mdd,
-                                 struct mdd_object *obj,
-                                 const struct lu_buf *buf,
-                                 const char *name,
-                                 struct thandle *handle)
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct lu_buf *buf,
+                                const char *name,
+                                int fl, struct thandle *handle)
 {
-        int rc;
+       int     rc;
 
-        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
-        if (rc)
-                return rc;
+       rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
+       if (rc)
+               return rc;
 
-        /* Only record user xattr changes */
-        if ((strncmp("user.", name, 5) == 0))
-                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       /* Only record user xattr changes */
+       if ((strncmp("user.", name, 5) == 0))
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
 
-        return rc;
+       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       return rc;
 }
 
 /**
@@ -1643,13 +1206,13 @@ static int mdd_declare_xattr_set(const struct lu_env *env,
  * after xattr_set if needed.
  */
 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
+                        const struct lu_buf *buf, const char *name,
+                        int fl)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int  rc;
+       struct mdd_object       *mdd_obj = md2mdd_obj(obj);
+       struct mdd_device       *mdd = mdo2mdd(obj);
+       struct thandle          *handle;
+       int                      rc;
         ENTRY;
 
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
@@ -1657,25 +1220,25 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                RETURN(rc);
        }
 
-        rc = mdd_xattr_sanity_check(env, mdd_obj);
-        if (rc)
-                RETURN(rc);
+       rc = mdd_xattr_sanity_check(env, mdd_obj);
+       if (rc)
+               RETURN(rc);
 
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
-                handle->th_sync |= !!mdd->mdd_sync_permission;
+       /* security-replated changes may require sync */
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+               handle->th_sync |= !!mdd->mdd_sync_permission;
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
@@ -1684,20 +1247,20 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        if (rc)
                GOTO(stop, rc);
 
-        /* Only record system & user xattr changes */
+       /* Only record system & user xattr changes */
        if (strncmp(XATTR_USER_PREFIX, name,
-                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
-                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
-                                 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
-                                              handle);
+                       sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                       sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                       sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+               rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
+                                             handle);
 
 stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       mdd_trans_stop(env, mdd, rc, handle);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_del(const struct lu_env *env,
@@ -1771,6 +1334,302 @@ stop:
         RETURN(rc);
 }
 
+/*
+ * read lov EA of an object
+ * return the lov EA in an allocated lu_buf
+ */
+static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
+                                    struct mdd_object *obj)
+{
+       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf   *lmm_buf = NULL;
+       int              rc, sz;
+       ENTRY;
+
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
+                          mdd_object_capa(env, obj));
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (rc == 0)
+               GOTO(out, rc = -ENODATA);
+
+       sz = rc;
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               /* mti_big_buf was not allocated, so we have to
+                * allocate it based on the ea size */
+               buf = mdd_buf_alloc(env, sz);
+               if (buf->lb_buf == NULL)
+                       GOTO(out, rc = -ENOMEM);
+               goto repeat;
+       }
+
+       OBD_ALLOC_PTR(lmm_buf);
+       if (!lmm_buf)
+               GOTO(out, rc = -ENOMEM);
+
+       OBD_ALLOC(lmm_buf->lb_buf, sz);
+       if (!lmm_buf->lb_buf)
+               GOTO(free, rc = -ENOMEM);
+
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       lmm_buf->lb_len = sz;
+
+       GOTO(out, rc = 0);
+
+free:
+       if (lmm_buf)
+               OBD_FREE_PTR(lmm_buf);
+out:
+       if (rc)
+               return ERR_PTR(rc);
+       return lmm_buf;
+}
+
+
+/*
+ *  check if layout swapping between 2 objects is allowed
+ *  the rules are:
+ *  - same type of objects
+ *  - same owner/group (so quotas are still valid)
+ */
+static int mdd_layout_swap_allowed(const struct lu_env *env,
+                                  struct mdd_object *o1,
+                                  struct mdd_object *o2)
+{
+       const struct lu_fid     *fid1, *fid2;
+       __u32                    uid, gid;
+       struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
+       int                      rc;
+       ENTRY;
+
+       fid1 = mdo2fid(o1);
+       fid2 = mdo2fid(o2);
+
+       if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
+           (mdd_object_type(o1) != mdd_object_type(o2)))
+               RETURN(-EPERM);
+
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+       uid = tmp_la->la_uid;
+       gid = tmp_la->la_gid;
+
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+
+       if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
+               RETURN(-EPERM);
+
+       RETURN(0);
+}
+
+/**
+ * swap layouts between 2 lustre objects
+ */
+static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
+                           struct md_object *obj2, __u64 flags)
+{
+       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
+       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
+       struct lu_buf           *fst_buf, *snd_buf;
+       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
+       struct thandle          *handle;
+       struct mdd_device       *mdd = mdo2mdd(obj1);
+       int                      rc;
+       __u16                    fst_gen, snd_gen;
+       ENTRY;
+
+       /* we have to sort the 2 obj, so locking will always
+        * be in the same order, even in case of 2 concurrent swaps */
+       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
+                      mdo2fid(md2mdd_obj(obj2)));
+       /* same fid ? */
+       if (rc == 0)
+               RETURN(-EPERM);
+
+       if (rc > 0) {
+               o1 = md2mdd_obj(obj1);
+               o2 = md2mdd_obj(obj2);
+       } else {
+               o1 = md2mdd_obj(obj2);
+               o2 = md2mdd_obj(obj1);
+       }
+
+       /* check if layout swapping is allowed */
+       rc = mdd_layout_swap_allowed(env, o1, o2);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       /* objects are already sorted */
+       mdd_write_lock(env, o1, MOR_TGT_CHILD);
+       mdd_write_lock(env, o2, MOR_TGT_CHILD);
+
+       lmm1_buf = mdd_get_lov_ea(env, o1);
+       if (IS_ERR(lmm1_buf)) {
+               rc = PTR_ERR(lmm1_buf);
+               lmm1_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       lmm2_buf = mdd_get_lov_ea(env, o2);
+       if (IS_ERR(lmm2_buf)) {
+               rc = PTR_ERR(lmm2_buf);
+               lmm2_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       /* swapping 2 non existant layouts is a success */
+       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+               GOTO(unlock, rc = 0);
+
+       /* to help inode migration between MDT, it is better to
+        * start by the no layout file (if one), so we order the swap */
+       if (lmm1_buf == NULL) {
+               fst_o = o1;
+               fst_buf = lmm1_buf;
+               snd_o = o2;
+               snd_buf = lmm2_buf;
+       } else {
+               fst_o = o2;
+               fst_buf = lmm2_buf;
+               snd_o = o1;
+               snd_buf = lmm1_buf;
+       }
+
+       /* lmm and generation layout initialization */
+       if (fst_buf) {
+               fst_lmm = fst_buf->lb_buf;
+               fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
+       } else {
+               fst_lmm = NULL;
+               fst_gen = 0;
+       }
+
+       if (snd_buf) {
+               snd_lmm = snd_buf->lb_buf;
+               snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
+       } else {
+               snd_lmm = NULL;
+               snd_gen = 0;
+       }
+
+       /* save the orignal lmm common header of first file
+        * to be able to roll back */
+       OBD_ALLOC_PTR(old_fst_lmm);
+       if (old_fst_lmm == NULL)
+               GOTO(unlock, rc = -ENOMEM);
+
+       memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
+
+       /* increase the generation layout numbers */
+       snd_gen++;
+       fst_gen++;
+
+       /* set the file specific informations in lmm */
+       if (fst_lmm) {
+               fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
+               fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
+               fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
+       }
+
+       if (snd_lmm) {
+               snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+               snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+               snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+       }
+
+       /* prepare transaction */
+       rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, fst_o));
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, snd_o));
+       if (rc) {
+               int     rc2;
+
+               /* failure on second file, but first was done, so we have
+                * to roll back first */
+               /* restore object_id, object_seq and generation number
+                * on first file */
+               if (fst_lmm) {
+                       fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+                       fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+               }
+
+               rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
+                                   LU_XATTR_REPLACE, handle,
+                                   mdd_object_capa(env, fst_o));
+               if (rc2) {
+                       /* very bad day */
+                       CERROR("%s: unable to roll back after swap layouts"
+                              " failure between "DFID" and "DFID
+                              " rc2 = %d rc = %d)\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
+                              rc2, rc);
+                       /* a solution to avoid journal commit is to panic,
+                        * but it has strong consequences so we use LBUG to
+                        * allow sysdamin to choose to panic or not
+                        */
+                       LBUG();
+               }
+               GOTO(stop, rc);
+       }
+       EXIT;
+
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+unlock:
+       mdd_write_unlock(env, o2);
+       mdd_write_unlock(env, o1);
+
+       if (lmm1_buf && lmm1_buf->lb_buf)
+               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
+       if (lmm1_buf)
+               OBD_FREE_PTR(lmm1_buf);
+
+       if (lmm2_buf && lmm2_buf->lb_buf)
+               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
+       if (lmm2_buf)
+               OBD_FREE_PTR(lmm2_buf);
+
+       if (old_fst_lmm)
+               OBD_FREE_PTR(old_fst_lmm);
+
+       return rc;
+}
+
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                struct mdd_object *child, struct lu_attr *attr)
 {
@@ -1787,28 +1646,27 @@ void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
  */
 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
 {
-        int res = 0;
-
-        /* Sadly, NFSD reopens a file repeatedly during operation, so the
-         * "acc_mode = 0" allowance for newly-created files isn't honoured.
-         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
-         * owner can write to a file even if it is marked readonly to hide
-         * its brokenness. (bug 5781) */
-        if (flags & MDS_OPEN_OWNEROVERRIDE) {
-                struct md_ucred *uc = md_ucred(env);
-
-                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
-                    (la->la_uid == uc->mu_fsuid))
-                        return 0;
-        }
+       int res = 0;
+
+       /* Sadly, NFSD reopens a file repeatedly during operation, so the
+        * "acc_mode = 0" allowance for newly-created files isn't honoured.
+        * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
+        * owner can write to a file even if it is marked readonly to hide
+        * its brokenness. (bug 5781) */
+       if (flags & MDS_OPEN_OWNEROVERRIDE) {
+               struct lu_ucred *uc = lu_ucred_check(env);
+
+               if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
+                       return 0;
+       }
 
-        if (flags & FMODE_READ)
-                res |= MAY_READ;
-        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
-                res |= MAY_WRITE;
-        if (flags & MDS_FMODE_EXEC)
-                res = MAY_EXEC;
-        return res;
+       if (flags & FMODE_READ)
+               res |= MAY_READ;
+       if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
+               res |= MAY_WRITE;
+       if (flags & MDS_FMODE_EXEC)
+               res = MAY_EXEC;
+       return res;
 }
 
 static int mdd_open_sanity_check(const struct lu_env *env,
@@ -1857,13 +1715,13 @@ static int mdd_open_sanity_check(const struct lu_env *env,
          * Now, flag -- O_NOATIME does not be packed by client.
          */
         if (flag & O_NOATIME) {
-                struct md_ucred *uc = md_ucred(env);
+               struct lu_ucred *uc = lu_ucred(env);
 
-                if (uc && ((uc->mu_valid == UCRED_OLD) ||
-                    (uc->mu_valid == UCRED_NEW)) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (uc && ((uc->uc_valid == UCRED_OLD) ||
+                          (uc->uc_valid == UCRED_NEW)) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
         }
 #endif
 
@@ -1889,12 +1747,6 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
                             struct md_attr *ma, struct thandle *handle)
 {
-        int rc;
-
-        rc = mdd_declare_unlink_log(env, obj, ma, handle);
-        if (rc)
-                return rc;
-
         return mdo_declare_destroy(env, obj, handle);
 }
 
@@ -1903,20 +1755,10 @@ int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                     struct md_attr *ma, struct thandle *handle)
 {
-        int rc = 0;
+       int rc;
         ENTRY;
 
-        if (S_ISREG(mdd_object_type(obj))) {
-                /* Return LOV & COOKIES unconditionally here. We clean evth up.
-                 * Caller must be ready for that. */
-                rc = __mdd_lmm_get(env, obj, ma);
-                if ((ma->ma_valid & MA_LOV))
-                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
-                                            obj, ma);
-        }
-
-        if (rc == 0)
-                rc = mdo_destroy(env, obj, handle);
+       rc = mdo_destroy(env, obj, handle);
 
         RETURN(rc);
 }
@@ -1932,7 +1774,7 @@ static int mdd_declare_close(const struct lu_env *env,
         if (rc)
                 return rc;
 
-        return mdd_declare_object_kill(env, obj, ma, handle);
+       return mdo_declare_destroy(env, obj, handle);
 }
 
 /*
@@ -1944,12 +1786,13 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle    *handle = NULL;
-        int rc;
-        int is_orphan = 0, reset = 1;
+       int rc, is_orphan = 0;
         ENTRY;
 
         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
-                mdd_obj->mod_count--;
+               mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+               mdd_obj->mod_count--;
+               mdd_write_unlock(env, mdd_obj);
 
                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
@@ -1957,10 +1800,14 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                 RETURN(0);
         }
 
-        /* check without any lock */
-        if (mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
- again:
+       /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
+        * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
+       /* check without any lock */
+       is_orphan = mdd_obj->mod_count == 1 &&
+                   (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
+
+again:
+       if (is_orphan) {
                 handle = mdd_trans_create(env, mdo2mdd(obj));
                 if (IS_ERR(handle))
                         RETURN(PTR_ERR(handle));
@@ -1979,82 +1826,62 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        if (handle == NULL && mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
-                mdd_write_unlock(env, mdd_obj);
-                goto again;
-        }
-
-        /* release open count */
-        mdd_obj->mod_count --;
-
-        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
-                /* remove link to object from orphan index */
-                LASSERT(handle != NULL);
-                rc = __mdd_orphan_del(env, mdd_obj, handle);
-                if (rc == 0) {
-                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
-                               "list, OSS objects to be destroyed.\n",
-                               PFID(mdd_object_fid(mdd_obj)));
-                        is_orphan = 1;
-                } else {
-                        CERROR("Object "DFID" can not be deleted from orphan "
-                                "list, maybe cause OST objects can not be "
-                                "destroyed (err: %d).\n",
-                                PFID(mdd_object_fid(mdd_obj)), rc);
-                        /* If object was not deleted from orphan list, do not
-                         * destroy OSS objects, which will be done when next
-                         * recovery. */
-                        GOTO(out, rc);
-                }
-        }
-
        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
-        /* Object maybe not in orphan list originally, it is rare case for
-         * mdd_finish_unlink() failure. */
-        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
-                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
-                if (ma->ma_valid & MA_FLAGS &&
-                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
-                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
-                } else {
-                        if (handle == NULL) {
-                                handle = mdd_trans_create(env, mdo2mdd(obj));
-                                if (IS_ERR(handle))
-                                        GOTO(out, rc = PTR_ERR(handle));
+       if (rc != 0) {
+               CERROR("Failed to get lu_attr of "DFID": %d\n",
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               GOTO(out, rc);
+       }
 
-                                rc = mdd_declare_object_kill(env, mdd_obj, ma,
-                                                             handle);
-                                if (rc)
-                                        GOTO(out, rc);
+       /* check again with lock */
+       is_orphan = (mdd_obj->mod_count == 1) &&
+                   ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
+                    ma->ma_attr.la_nlink == 0);
 
-                                rc = mdd_declare_changelog_store(env, mdd,
-                                                                 NULL, handle);
-                                if (rc)
-                                        GOTO(stop, rc);
+       if (is_orphan && handle == NULL) {
+               mdd_write_unlock(env, mdd_obj);
+               goto again;
+       }
 
-                                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
-                                if (rc)
-                                        GOTO(out, rc);
-                        }
+       mdd_obj->mod_count--; /*release open count */
+
+       if (!is_orphan)
+               GOTO(out, rc = 0);
+
+       /* Orphan object */
+       /* NB: Object maybe not in orphan list originally, it is rare case for
+        * mdd_finish_unlink() failure, in that case, the object doesn't have
+        * ORPHAN_OBJ flag */
+       if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+               /* remove link to object from orphan index */
+               LASSERT(handle != NULL);
+               rc = __mdd_orphan_del(env, mdd_obj, handle);
+               if (rc != 0) {
+                       CERROR("%s: unable to delete "DFID" from orphan list: "
+                              "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                              PFID(mdd_object_fid(mdd_obj)), rc);
+                       /* If object was not deleted from orphan list, do not
+                        * destroy OSS objects, which will be done when next
+                        * recovery. */
+                       GOTO(out, rc);
+               }
 
-                        rc = mdd_object_kill(env, mdd_obj, ma, handle);
-                        if (rc == 0)
-                                reset = 0;
-                }
+               CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                      "list, OSS objects to be destroyed.\n",
+                      PFID(mdd_object_fid(mdd_obj)));
+       }
 
-                if (rc != 0)
-                        CERROR("Error when prepare to delete Object "DFID" , "
-                               "which will cause OST objects can not be "
-                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
-        }
-        EXIT;
+       rc = mdo_destroy(env, mdd_obj, handle);
 
-out:
-        if (reset)
-                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+       if (rc != 0) {
+               CERROR("%s: unable to delete "DFID" from orphan list: "
+                      "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+       }
+       EXIT;
 
+out:
         mdd_write_unlock(env, mdd_obj);
 
         if (rc == 0 &&
@@ -2259,19 +2086,20 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 }
 
 const struct md_object_operations mdd_obj_ops = {
-        .moo_permission    = mdd_permission,
-        .moo_attr_get      = mdd_attr_get,
-        .moo_attr_set      = mdd_attr_set,
-        .moo_xattr_get     = mdd_xattr_get,
-        .moo_xattr_set     = mdd_xattr_set,
-        .moo_xattr_list    = mdd_xattr_list,
-        .moo_xattr_del     = mdd_xattr_del,
-        .moo_open          = mdd_open,
-        .moo_close         = mdd_close,
-        .moo_readpage      = mdd_readpage,
-        .moo_readlink      = mdd_readlink,
-        .moo_changelog     = mdd_changelog,
-        .moo_capa_get      = mdd_capa_get,
-        .moo_object_sync   = mdd_object_sync,
-        .moo_path          = mdd_path,
+       .moo_permission         = mdd_permission,
+       .moo_attr_get           = mdd_attr_get,
+       .moo_attr_set           = mdd_attr_set,
+       .moo_xattr_get          = mdd_xattr_get,
+       .moo_xattr_set          = mdd_xattr_set,
+       .moo_xattr_list         = mdd_xattr_list,
+       .moo_xattr_del          = mdd_xattr_del,
+       .moo_swap_layouts       = mdd_swap_layouts,
+       .moo_open               = mdd_open,
+       .moo_close              = mdd_close,
+       .moo_readpage           = mdd_readpage,
+       .moo_readlink           = mdd_readlink,
+       .moo_changelog          = mdd_changelog,
+       .moo_capa_get           = mdd_capa_get,
+       .moo_object_sync        = mdd_object_sync,
+       .moo_path               = mdd_path,
 };