Whamcloud - gitweb
LU-3467 mdt: call MDT handlers via unified request handler
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 8ab38ac..12a8166 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -58,7 +58,7 @@
 #include "mdd_internal.h"
 
 static const struct lu_object_operations mdd_lu_obj_ops;
-extern cfs_mem_cache_t *mdd_object_kmem;
+extern struct kmem_cache *mdd_object_kmem;
 
 static int mdd_xattr_get(const struct lu_env *env,
                          struct md_object *obj, struct lu_buf *buf,
@@ -102,6 +102,7 @@ struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
 {
         struct mdd_thread_info *info;
 
+       lu_env_refill((struct lu_env *)env);
         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
         LASSERT(info != NULL);
         return info;
@@ -120,23 +121,23 @@ const struct lu_name *mdd_name_get_const(const struct lu_env *env,
 
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
-        struct lu_buf *buf;
+       struct lu_buf *buf;
 
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
+       buf = &mdd_env_info(env)->mti_buf[0];
+       buf->lb_buf = area;
+       buf->lb_len = len;
+       return buf;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                        const void *area, ssize_t len)
 {
-        struct lu_buf *buf;
+       struct lu_buf *buf;
 
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = (void *)area;
-        buf->lb_len = len;
-        return buf;
+       buf = &mdd_env_info(env)->mti_buf[0];
+       buf->lb_buf = (void *)area;
+       buf->lb_len = len;
+       return buf;
 }
 
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
@@ -145,7 +146,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env,
 {
         struct mdd_object *mdd_obj;
 
-       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
+       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, __GFP_IO);
         if (mdd_obj != NULL) {
                 struct lu_object *o;
 
@@ -173,7 +174,6 @@ static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
         mdd_obj->mod_cltime = 0;
         under = &d->mdd_child->dd_lu_dev;
         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
-        mdd_pdlock_init(mdd_obj);
        if (IS_ERR(below))
                RETURN(PTR_ERR(below));
 
@@ -273,6 +273,12 @@ static int mdd_xattr_get(const struct lu_env *env,
                 return -ENOENT;
         }
 
+       /* If the object has been deleted from the namespace, then
+        * getting the linkEA should return -ENOENT as well. */
+       if (unlikely((mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ)) &&
+                     strcmp(name, XATTR_NAME_LINK) == 0))
+               RETURN(-ENOENT);
+
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_get(env, mdd_obj, buf, name,
                            mdd_object_capa(env, mdd_obj));
@@ -560,11 +566,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                    !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
-                if (la->la_mode == (cfs_umode_t) -1)
-                        la->la_mode = tmp_la->la_mode;
-                else
-                        la->la_mode = (la->la_mode & S_IALLUGO) |
-                                      (tmp_la->la_mode & ~S_IALLUGO);
+               if (la->la_mode == (umode_t) -1)
+                       la->la_mode = tmp_la->la_mode;
+               else
+                       la->la_mode = (la->la_mode & S_IALLUGO) |
+                                     (tmp_la->la_mode & ~S_IALLUGO);
 
                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
@@ -672,11 +678,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
  * \param mdd_obj - mdd_object of change
  * \param handle - transacion handle
  */
-static int mdd_changelog_data_store(const struct lu_env *env,
-                                   struct mdd_device *mdd,
-                                   enum changelog_rec_type type,
-                                   int flags, struct mdd_object *mdd_obj,
-                                   struct thandle *handle)
+int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
+                            enum changelog_rec_type type, int flags,
+                            struct mdd_object *mdd_obj, struct thandle *handle)
 {
        const struct lu_fid             *tfid;
        struct llog_changelog_rec       *rec;
@@ -825,6 +829,13 @@ static int mdd_declare_attr_set(const struct lu_env *env,
        return rc;
 }
 
+static inline bool permission_is_reduced(const struct lu_attr *old,
+                                        const struct lu_attr *new)
+{
+       return ((new->la_mode & old->la_mode) & S_IRWXUGO) !=
+              (old->la_mode & S_IRWXUGO);
+}
+
 /* set attr and LOV EA at once, return updated attr */
 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 const struct md_attr *ma)
@@ -834,6 +845,7 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct thandle *handle;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        const struct lu_attr *la = &ma->ma_attr;
+       bool sync_perm = false;
        int rc;
         ENTRY;
 
@@ -863,9 +875,38 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(stop, rc);
 
-        /* permission changes may require sync operation */
-       if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
-                handle->th_sync |= !!mdd->mdd_sync_permission;
+       /*
+        * LU-3671
+        *
+        * permission changes may require sync operation, to mitigate
+        * performance impact, only do this for dir and when permission is
+        * reduced.
+        *
+        * For regular files, version is updated with permission change
+        * (see VBR), async permission won't cause any issue, while missing
+        * permission change on directory may affect accessibility of other
+        * objects.
+        */
+       if (S_ISDIR(mdd_object_type(mdd_obj))) {
+               if (la->la_valid & (LA_UID | LA_GID)) {
+                       sync_perm = true;
+               } else if (la->la_valid & LA_MODE &&
+                          la->la_mode & (S_ISUID | S_ISGID | S_ISVTX)) {
+                       sync_perm = true;
+               } else if (la->la_valid & LA_MODE) {
+                       struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+
+                       rc = mdd_la_get(env, mdd_obj, tmp_la, BYPASS_CAPA);
+                       if (rc)
+                               GOTO(stop, rc);
+
+                       if (permission_is_reduced(tmp_la, la))
+                               sync_perm = true;
+               }
+       }
+
+       if (sync_perm)
+               handle->th_sync |= !!mdd->mdd_sync_permission;
 
        if (la->la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
@@ -922,16 +963,22 @@ static int mdd_declare_xattr_set(const struct lu_env *env,
        if (rc)
                return rc;
 
-       /* Only record user xattr changes */
-       if ((strncmp("user.", name, 5) == 0)) {
+       /* Only record user and layout xattr changes */
+       if (strncmp(XATTR_USER_PREFIX, name,
+                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+           strncmp(XATTR_NAME_LOV, name,
+                   sizeof(XATTR_NAME_LOV) - 1) == 0) {
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
                if (rc)
                        return rc;
        }
 
        /* If HSM data is modified, this could add a changelog */
-       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
+       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               if (rc)
+                       return rc;
+       }
 
        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
        return rc;
@@ -954,7 +1001,7 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
        struct mdd_thread_info *info = mdd_env_info(env);
        struct mdd_device      *mdd = mdo2mdd(obj);
        struct mdd_object      *mdd_obj = md2mdd_obj(obj);
-       struct lu_buf          *current_buf = &info->mti_buf;
+       struct lu_buf          *current_buf;
        struct md_hsm          *current_mh;
        struct md_hsm          *new_mh;
        int                     rc;
@@ -965,12 +1012,12 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
                RETURN(-ENOMEM);
 
        /* Read HSM attrs from disk */
-       current_buf->lb_buf = info->mti_xattr_buf;
-       current_buf->lb_len = sizeof(info->mti_xattr_buf);
        CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       current_buf = mdd_buf_get(env, info->mti_xattr_buf,
+                                 sizeof(info->mti_xattr_buf));
        rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
                           mdd_object_capa(env, mdd_obj));
-       rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
+       rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
        if (rc < 0 && rc != -ENODATA)
                GOTO(free, rc);
        else if (rc == -ENODATA)
@@ -1000,7 +1047,6 @@ free:
        return(rc);
 }
 
-
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1036,10 +1082,6 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        if (rc)
                GOTO(stop, rc);
 
-       /* security-replated changes may require sync */
-       if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
-               handle->th_sync |= !!mdd->mdd_sync_permission;
-
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
        if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
@@ -1056,8 +1098,10 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        if (rc)
                GOTO(stop, rc);
 
-       /* Only record system & user xattr changes */
-       if (strncmp(XATTR_USER_PREFIX, name,
+       if (strncmp(XATTR_NAME_LOV, name, sizeof(XATTR_NAME_LOV) - 1) == 0)
+               rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mdd_obj,
+                                             handle);
+       else if (strncmp(XATTR_USER_PREFIX, name,
                        sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
            strncmp(POSIX_ACL_XATTR_ACCESS, name,
                        sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
@@ -1147,11 +1191,11 @@ stop:
  * read lov EA of an object
  * return the lov EA in an allocated lu_buf
  */
-static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
-                                    struct mdd_object *obj)
+static int mdd_get_lov_ea(const struct lu_env *env,
+                         struct mdd_object *obj,
+                         struct lu_buf *lmm_buf)
 {
        struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       struct lu_buf   *lmm_buf = NULL;
        int              rc, sz;
        ENTRY;
 
@@ -1186,28 +1230,46 @@ repeat:
                goto repeat;
        }
 
-       OBD_ALLOC_PTR(lmm_buf);
-       if (!lmm_buf)
+       lu_buf_alloc(lmm_buf, sz);
+       if (lmm_buf->lb_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       OBD_ALLOC(lmm_buf->lb_buf, sz);
-       if (!lmm_buf->lb_buf)
-               GOTO(free, rc = -ENOMEM);
-
        memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
-       lmm_buf->lb_len = sz;
-
-       GOTO(out, rc = 0);
+       rc = 0;
+       EXIT;
 
-free:
-       if (lmm_buf)
-               OBD_FREE_PTR(lmm_buf);
 out:
-       if (rc)
-               return ERR_PTR(rc);
-       return lmm_buf;
+       if (rc < 0)
+               lu_buf_free(lmm_buf);
+       return rc;
 }
 
+static int mdd_xattr_hsm_replace(const struct lu_env *env,
+                                struct mdd_object *o, struct lu_buf *buf,
+                                struct thandle *handle)
+{
+       struct hsm_attrs *attrs;
+       __u32 hsm_flags;
+       int flags = 0;
+       int rc;
+       ENTRY;
+
+       rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                          handle, mdd_object_capa(env, o));
+       if (rc != 0)
+               RETURN(rc);
+
+       attrs = buf->lb_buf;
+       hsm_flags = le32_to_cpu(attrs->hsm_flags);
+       if (!(hsm_flags & HS_RELEASED) || mdd_is_dead_obj(o))
+               RETURN(0);
+
+       /* Add a changelog record for release. */
+       hsm_set_cl_event(&flags, HE_RELEASE);
+       rc = mdd_changelog_data_store(env, mdo2mdd(&o->mod_obj), CL_HSM,
+                                     flags, o, handle);
+       RETURN(rc);
+}
 
 /*
  *  check if layout swapping between 2 objects is allowed
@@ -1256,35 +1318,38 @@ static int mdd_layout_swap_allowed(const struct lu_env *env,
 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                            struct md_object *obj2, __u64 flags)
 {
-       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
-       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
-       struct lu_buf           *fst_buf, *snd_buf;
-       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
-       struct thandle          *handle;
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct mdd_object       *fst_o = md2mdd_obj(obj1);
+       struct mdd_object       *snd_o = md2mdd_obj(obj2);
        struct mdd_device       *mdd = mdo2mdd(obj1);
-       int                      rc;
+       struct lov_mds_md       *fst_lmm, *snd_lmm;
+       struct lu_buf           *fst_buf = &info->mti_buf[0];
+       struct lu_buf           *snd_buf = &info->mti_buf[1];
+       struct lu_buf           *fst_hsm_buf = &info->mti_buf[2];
+       struct lu_buf           *snd_hsm_buf = &info->mti_buf[3];
+       struct ost_id           *saved_oi = NULL;
+       struct thandle          *handle;
        __u16                    fst_gen, snd_gen;
+       int                      fst_fl;
+       int                      rc;
+       int                      rc2;
        ENTRY;
 
+       CLASSERT(ARRAY_SIZE(info->mti_buf) >= 4);
+       memset(info->mti_buf, 0, sizeof(info->mti_buf));
+
        /* we have to sort the 2 obj, so locking will always
         * be in the same order, even in case of 2 concurrent swaps */
-       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
-                       mdo2fid(md2mdd_obj(obj2)));
-       /* same fid ? */
-       if (rc == 0)
+       rc = lu_fid_cmp(mdo2fid(fst_o), mdo2fid(snd_o));
+       if (rc == 0) /* same fid ? */
                RETURN(-EPERM);
 
-       if (rc > 0) {
-               o1 = md2mdd_obj(obj1);
-               o2 = md2mdd_obj(obj2);
-       } else {
-               o1 = md2mdd_obj(obj2);
-               o2 = md2mdd_obj(obj1);
-       }
+       if (rc < 0)
+               swap(fst_o, snd_o);
 
        /* check if layout swapping is allowed */
-       rc = mdd_layout_swap_allowed(env, o1, o2);
-       if (rc)
+       rc = mdd_layout_swap_allowed(env, fst_o, snd_o);
+       if (rc != 0)
                RETURN(rc);
 
        handle = mdd_trans_create(env, mdd);
@@ -1292,133 +1357,180 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                RETURN(PTR_ERR(handle));
 
        /* objects are already sorted */
-       mdd_write_lock(env, o1, MOR_TGT_CHILD);
-       mdd_write_lock(env, o2, MOR_TGT_CHILD);
-
-       lmm1_buf = mdd_get_lov_ea(env, o1);
-       if (IS_ERR(lmm1_buf)) {
-               rc = PTR_ERR(lmm1_buf);
-               lmm1_buf = NULL;
-               if (rc != -ENODATA)
-                       GOTO(unlock, rc);
-       }
+       mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
+       mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
 
-       lmm2_buf = mdd_get_lov_ea(env, o2);
-       if (IS_ERR(lmm2_buf)) {
-               rc = PTR_ERR(lmm2_buf);
-               lmm2_buf = NULL;
-               if (rc != -ENODATA)
-                       GOTO(unlock, rc);
-       }
+       rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(stop, rc);
+
+       rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(stop, rc);
 
        /* swapping 2 non existant layouts is a success */
-       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
-               GOTO(unlock, rc = 0);
+       if (fst_buf->lb_buf == NULL && snd_buf->lb_buf == NULL)
+               GOTO(stop, rc = 0);
 
        /* to help inode migration between MDT, it is better to
         * start by the no layout file (if one), so we order the swap */
-       if (lmm1_buf == NULL) {
-               fst_o = o1;
-               fst_buf = lmm1_buf;
-               snd_o = o2;
-               snd_buf = lmm2_buf;
-       } else {
-               fst_o = o2;
-               fst_buf = lmm2_buf;
-               snd_o = o1;
-               snd_buf = lmm1_buf;
+       if (snd_buf->lb_buf == NULL) {
+               swap(fst_o, snd_o);
+               swap(fst_buf, snd_buf);
        }
 
        /* lmm and generation layout initialization */
-       if (fst_buf) {
+       if (fst_buf->lb_buf != NULL) {
                fst_lmm = fst_buf->lb_buf;
                fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
+               fst_fl  = LU_XATTR_REPLACE;
        } else {
                fst_lmm = NULL;
                fst_gen = 0;
+               fst_fl  = LU_XATTR_CREATE;
        }
 
-       if (snd_buf) {
-               snd_lmm = snd_buf->lb_buf;
-               snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
-       } else {
-               snd_lmm = NULL;
-               snd_gen = 0;
-       }
-
-       /* save the orignal lmm common header of first file
-        * to be able to roll back */
-       OBD_ALLOC_PTR(old_fst_lmm);
-       if (old_fst_lmm == NULL)
-               GOTO(unlock, rc = -ENOMEM);
-
-       memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
+       snd_lmm = snd_buf->lb_buf;
+       snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
 
        /* increase the generation layout numbers */
        snd_gen++;
        fst_gen++;
 
        /* set the file specific informations in lmm */
-       if (fst_lmm) {
+       if (fst_lmm != NULL) {
+               saved_oi = &info->mti_oa.o_oi;
+
+               *saved_oi = fst_lmm->lmm_oi;
                fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
-               fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
-               fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
+               fst_lmm->lmm_oi = snd_lmm->lmm_oi;
+               snd_lmm->lmm_oi = *saved_oi;
+       } else {
+               if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1))
+                       snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               else if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3))
+                       snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
+               else
+                       GOTO(stop, rc = -EPROTO);
        }
+       snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+
+       /* Prepare HSM attribute if it's required */
+       if (flags & SWAP_LAYOUTS_MDS_HSM) {
+               const int buflen = sizeof(struct hsm_attrs);
+
+               lu_buf_alloc(fst_hsm_buf, buflen);
+               lu_buf_alloc(snd_hsm_buf, buflen);
+               if (fst_hsm_buf->lb_buf == NULL || snd_hsm_buf->lb_buf == NULL)
+                       GOTO(stop, rc = -ENOMEM);
+
+               /* Read HSM attribute */
+               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM,
+                                  BYPASS_CAPA);
+               if (rc < 0)
+                       GOTO(stop, rc);
 
-       if (snd_lmm) {
-               snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
-               snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
-               snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM,
+                                  BYPASS_CAPA);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_hsm_buf,
+                                          XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                                          handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_hsm_buf,
+                                          XATTR_NAME_HSM, LU_XATTR_REPLACE,
+                                          handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
        }
 
        /* prepare transaction */
        rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
-                                  LU_XATTR_REPLACE, handle);
-       if (rc)
+                                  fst_fl, handle);
+       if (rc != 0)
                GOTO(stop, rc);
 
-       rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
-                                  LU_XATTR_REPLACE, handle);
-       if (rc)
+       if (fst_buf->lb_buf != NULL)
+               rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf,
+                                          XATTR_NAME_LOV, LU_XATTR_REPLACE,
+                                          handle);
+       else
+               rc = mdd_declare_xattr_del(env, mdd, snd_o, XATTR_NAME_LOV,
+                                          handle);
+       if (rc != 0)
                GOTO(stop, rc);
 
        rc = mdd_trans_start(env, mdd, handle);
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
-       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
-                          LU_XATTR_REPLACE, handle,
+       if (flags & SWAP_LAYOUTS_MDS_HSM) {
+               rc = mdd_xattr_hsm_replace(env, fst_o, snd_hsm_buf, handle);
+               if (rc < 0)
+                       GOTO(stop, rc);
+
+               rc = mdd_xattr_hsm_replace(env, snd_o, fst_hsm_buf, handle);
+               if (rc < 0) {
+                       rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
+                                                   handle);
+                       if (rc2 < 0)
+                               CERROR("%s: restore "DFID" HSM error: %d/%d\n",
+                                      mdd_obj_dev_name(fst_o),
+                                      PFID(mdo2fid(fst_o)), rc, rc2);
+                       GOTO(stop, rc);
+               }
+       }
+
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle,
                           mdd_object_capa(env, fst_o));
-       if (rc)
+       if (rc != 0)
                GOTO(stop, rc);
 
-       rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
-                          LU_XATTR_REPLACE, handle,
-                          mdd_object_capa(env, snd_o));
-       if (rc) {
-               int     rc2;
+       if (fst_buf->lb_buf != NULL)
+               rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle,
+                                  mdd_object_capa(env, snd_o));
+       else
+               rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle,
+                                  mdd_object_capa(env, snd_o));
+       if (rc != 0) {
+               int steps = 0;
 
                /* failure on second file, but first was done, so we have
-                * to roll back first */
-               /* restore object_id, object_seq and generation number
-                * on first file */
-               if (fst_lmm) {
-                       fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
-                       fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
-                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+                * to roll back first. */
+               if (fst_buf->lb_buf != NULL) {
+                       fst_lmm->lmm_oi = *saved_oi;
+                       fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1);
+                       rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
+                                           LU_XATTR_REPLACE, handle,
+                                           mdd_object_capa(env, fst_o));
+               } else {
+                       rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle,
+                                           mdd_object_capa(env, fst_o));
                }
+               if (rc2 < 0)
+                       goto do_lbug;
+
+               ++steps;
+               rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf, handle);
+               if (rc2 < 0)
+                       goto do_lbug;
+
+               ++steps;
+               rc2 = mdd_xattr_hsm_replace(env, snd_o, snd_hsm_buf, handle);
 
-               rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
-                                   LU_XATTR_REPLACE, handle,
-                                   mdd_object_capa(env, fst_o));
-               if (rc2) {
+       do_lbug:
+               if (rc2 < 0) {
                        /* very bad day */
-                       CERROR("%s: unable to roll back after swap layouts"
-                              " failure between "DFID" and "DFID
-                              " rc2 = %d rc = %d)\n",
-                              mdd2obd_dev(mdd)->obd_name,
+                       CERROR("%s: unable to roll back layout swap. FIDs: "
+                              DFID" and "DFID "error: %d/%d, steps: %d\n",
+                              mdd_obj_dev_name(fst_o),
                               PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
-                              rc2, rc);
+                              rc, rc2, steps);
                        /* a solution to avoid journal commit is to panic,
                         * but it has strong consequences so we use LBUG to
                         * allow sysdamin to choose to panic or not
@@ -1427,27 +1539,26 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                }
                GOTO(stop, rc);
        }
+
+       /* Issue one changelog record per file */
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, fst_o, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, snd_o, handle);
+       if (rc)
+               GOTO(stop, rc);
        EXIT;
 
 stop:
        mdd_trans_stop(env, mdd, rc, handle);
-unlock:
-       mdd_write_unlock(env, o2);
-       mdd_write_unlock(env, o1);
-
-       if (lmm1_buf && lmm1_buf->lb_buf)
-               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
-       if (lmm1_buf)
-               OBD_FREE_PTR(lmm1_buf);
-
-       if (lmm2_buf && lmm2_buf->lb_buf)
-               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
-       if (lmm2_buf)
-               OBD_FREE_PTR(lmm2_buf);
-
-       if (old_fst_lmm)
-               OBD_FREE_PTR(old_fst_lmm);
+       mdd_write_unlock(env, snd_o);
+       mdd_write_unlock(env, fst_o);
 
+       lu_buf_free(fst_buf);
+       lu_buf_free(snd_buf);
+       lu_buf_free(fst_hsm_buf);
+       lu_buf_free(snd_hsm_buf);
        return rc;
 }
 
@@ -1711,7 +1822,7 @@ out:
                 if (handle == NULL) {
                         handle = mdd_trans_create(env, mdo2mdd(obj));
                         if (IS_ERR(handle))
-                                GOTO(stop, rc = IS_ERR(handle));
+                               GOTO(stop, rc = PTR_ERR(handle));
 
                         rc = mdd_declare_changelog_store(env, mdd, NULL,
                                                          handle);
@@ -1728,9 +1839,10 @@ out:
         }
 
 stop:
-        if (handle != NULL)
-                mdd_trans_stop(env, mdd, rc, handle);
-        return rc;
+       if (handle != NULL && !IS_ERR(handle))
+               mdd_trans_stop(env, mdd, rc, handle);
+
+       return rc;
 }
 
 /*
@@ -1861,12 +1973,12 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 LASSERT(rdpg->rp_pages != NULL);
 
                 pg = rdpg->rp_pages[0];
-                dp = (struct lu_dirpage*)cfs_kmap(pg);
+               dp = (struct lu_dirpage *)kmap(pg);
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
-                cfs_kunmap(pg);
+               kunmap(pg);
                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
@@ -1875,7 +1987,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
        if (rc >= 0) {
                struct lu_dirpage       *dp;
 
-               dp = cfs_kmap(rdpg->rp_pages[0]);
+               dp = kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                if (rc == 0) {
                        /*
@@ -1885,7 +1997,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                        dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                        rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
                }
-               cfs_kunmap(rdpg->rp_pages[0]);
+               kunmap(rdpg->rp_pages[0]);
        }
 
        GOTO(out_unlock, rc);