Whamcloud - gitweb
LU-2509 quota: various code cleanups
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index e1b75af..cb742d5 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -418,9 +418,9 @@ static int mdd_path_current(const struct lu_env *env,
         /* Verify that our path hasn't changed since we started the lookup.
            Record the current index, and verify the path resolves to the
            same fid. If it does, then the path is correct as of this index. */
-        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
-        pli->pli_currec = mdd->mdd_cl.mc_index;
-        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       pli->pli_currec = mdd->mdd_cl.mc_index;
+       spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -720,7 +720,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const unsigned long flags)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc;
+       struct lu_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -735,12 +735,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
-        /* export destroy does not have ->le_ses, but we may want
-         * to drop LUSTRE_SOM_FL. */
-        if (!env->le_ses)
-                RETURN(0);
-
-        uc = md_ucred(env);
+       /* export destroy does not have ->le_ses, but we may want
+        * to drop LUSTRE_SOM_FL. */
+       uc = lu_ucred_check(env);
+       if (uc == NULL)
+               RETURN(0);
 
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
@@ -771,9 +770,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 unsigned int newflags = la->la_flags &
                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                  * only be changed by the relevant capability. */
@@ -794,17 +793,17 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
            !(flags & MDS_PERM_BYPASS))
                 RETURN(-EPERM);
 
-        /* Check for setting the obj time. */
-        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
-            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
-                if ((uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
+       /* Check for setting the obj time. */
+       if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
+           !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
+               if ((uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal(env, obj, tmp_la,
                                                     MAY_WRITE);
-                        if (rc)
-                                RETURN(rc);
-                }
-        }
+                       if (rc)
+                               RETURN(rc);
+               }
+       }
 
         if (la->la_valid & LA_KILL_SUID) {
                 la->la_valid &= ~LA_KILL_SUID;
@@ -830,9 +829,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
                if (!(flags & MDS_PERM_BYPASS) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
 
                 if (la->la_mode == (cfs_umode_t) -1)
                         la->la_mode = tmp_la->la_mode;
@@ -853,10 +852,10 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_UID) {
                 if (la->la_uid == (uid_t) -1)
                         la->la_uid = tmp_la->la_uid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    (la->la_uid != tmp_la->la_uid)) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    (la->la_uid != tmp_la->la_uid)) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* If the user or group of a non-directory has been
                  * changed by a non-root user, remove the setuid bit.
@@ -878,11 +877,11 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_GID) {
                 if (la->la_gid == (gid_t) -1)
                         la->la_gid = tmp_la->la_gid;
-                if (((uc->mu_fsuid != tmp_la->la_uid) ||
-                    ((la->la_gid != tmp_la->la_gid) &&
-                    !lustre_in_group_p(uc, la->la_gid))) &&
-                    !mdd_capable(uc, CFS_CAP_CHOWN))
-                        RETURN(-EPERM);
+               if (((uc->uc_fsuid != tmp_la->la_uid) ||
+                    ((la->la_gid != tmp_la->la_gid) &&
+                     !lustre_in_group_p(uc, la->la_gid))) &&
+                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                       RETURN(-EPERM);
 
                 /* Likewise, if the user or group of a non-directory
                  * has been changed by a non-root user, remove the
@@ -920,7 +919,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         } else {
                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
-                              (uc->mu_fsuid == tmp_la->la_uid)) &&
+                             (uc->uc_fsuid == tmp_la->la_uid)) &&
                            !(flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal(env, obj,
                                                             tmp_la, MAY_WRITE);
@@ -1160,60 +1159,131 @@ stop:
 }
 
 static int mdd_xattr_sanity_check(const struct lu_env *env,
-                                  struct mdd_object *obj)
+                                 struct mdd_object *obj)
 {
-        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
-        struct md_ucred *uc     = md_ucred(env);
-        int rc;
-        ENTRY;
+       struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
+       struct lu_ucred *uc     = lu_ucred_assert(env);
+       int rc;
+       ENTRY;
 
-        if (mdd_is_immutable(obj) || mdd_is_append(obj))
-                RETURN(-EPERM);
+       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+               RETURN(-EPERM);
 
-        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
-        if (rc)
-                RETURN(rc);
+       rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
 
-        if ((uc->mu_fsuid != tmp_la->la_uid) &&
-            !mdd_capable(uc, CFS_CAP_FOWNER))
-                RETURN(-EPERM);
+       if ((uc->uc_fsuid != tmp_la->la_uid) &&
+           !mdd_capable(uc, CFS_CAP_FOWNER))
+               RETURN(-EPERM);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_set(const struct lu_env *env,
-                                 struct mdd_device *mdd,
-                                 struct mdd_object *obj,
-                                 const struct lu_buf *buf,
-                                 const char *name,
-                                 struct thandle *handle)
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct lu_buf *buf,
+                                const char *name,
+                                int fl, struct thandle *handle)
 {
-        int rc;
+       int     rc;
 
-        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
-        if (rc)
-                return rc;
+       rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
+       if (rc)
+               return rc;
 
-        /* Only record user xattr changes */
-        if ((strncmp("user.", name, 5) == 0))
-                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       /* Only record user xattr changes */
+       if ((strncmp("user.", name, 5) == 0)) {
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               if (rc)
+                       return rc;
+       }
+
+       /* If HSM data is modified, this could add a changelog */
+       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
+               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
 
        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-        return rc;
+       return rc;
 }
 
+/*
+ * Compare current and future data of HSM EA and add a changelog if needed.
+ *
+ * Caller should have write-locked \param obj.
+ *
+ * \param buf - Future HSM EA content.
+ * \retval 0 if no changelog is needed or changelog was added properly.
+ * \retval -ve errno if there was a problem
+ */
+static int mdd_hsm_update_locked(const struct lu_env *env,
+                                struct md_object *obj,
+                                const struct lu_buf *buf,
+                                struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device      *mdd = mdo2mdd(obj);
+       struct mdd_object      *mdd_obj = md2mdd_obj(obj);
+       struct lu_buf          *current_buf = &info->mti_buf;
+       struct md_hsm          *current_mh;
+       struct md_hsm          *new_mh;
+       int                     rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(current_mh);
+       if (current_mh == NULL)
+               RETURN(-ENOMEM);
+
+       /* Read HSM attrs from disk */
+       current_buf->lb_buf = info->mti_xattr_buf;
+       current_buf->lb_len = sizeof(info->mti_xattr_buf);
+       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
+                          mdd_object_capa(env, mdd_obj));
+       rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
+       if (rc < 0 && rc != -ENODATA)
+               GOTO(free, rc);
+       else if (rc == -ENODATA)
+               current_mh->mh_flags = 0;
+
+       /* Map future HSM xattr */
+       OBD_ALLOC_PTR(new_mh);
+       if (new_mh == NULL)
+               GOTO(free, rc = -ENOMEM);
+       lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
+
+       /* If HSM flags are different, add a changelog */
+       rc = 0;
+       if (current_mh->mh_flags != new_mh->mh_flags) {
+               int flags = 0;
+               hsm_set_cl_event(&flags, HE_STATE);
+               if (new_mh->mh_flags & HS_DIRTY)
+                       hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
+
+               rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
+                                             handle);
+       }
+
+       OBD_FREE_PTR(new_mh);
+free:
+       OBD_FREE_PTR(current_mh);
+       return(rc);
+}
+
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
  */
 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
-                         const struct lu_buf *buf, const char *name,
-                         int fl)
+                        const struct lu_buf *buf, const char *name,
+                        int fl)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct thandle *handle;
-        int  rc;
+       struct mdd_object       *mdd_obj = md2mdd_obj(obj);
+       struct mdd_device       *mdd = mdo2mdd(obj);
+       struct thandle          *handle;
+       int                      rc;
         ENTRY;
 
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
@@ -1221,47 +1291,56 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                RETURN(rc);
        }
 
-        rc = mdd_xattr_sanity_check(env, mdd_obj);
-        if (rc)
-                RETURN(rc);
+       rc = mdd_xattr_sanity_check(env, mdd_obj);
+       if (rc)
+               RETURN(rc);
 
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
-                handle->th_sync |= !!mdd->mdd_sync_permission;
+       /* security-replated changes may require sync */
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+               handle->th_sync |= !!mdd->mdd_sync_permission;
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+
+       if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
+               rc = mdd_hsm_update_locked(env, obj, buf, handle);
+               if (rc) {
+                       mdd_write_unlock(env, mdd_obj);
+                       GOTO(stop, rc);
+               }
+       }
+
        rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);
        if (rc)
                GOTO(stop, rc);
 
-        /* Only record system & user xattr changes */
+       /* Only record system & user xattr changes */
        if (strncmp(XATTR_USER_PREFIX, name,
-                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
-                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
-                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
-                                 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
-                                              handle);
+                       sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                       sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                       sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+               rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
+                                             handle);
 
 stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       mdd_trans_stop(env, mdd, rc, handle);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_xattr_del(const struct lu_env *env,
@@ -1335,6 +1414,302 @@ stop:
         RETURN(rc);
 }
 
+/*
+ * read lov EA of an object
+ * return the lov EA in an allocated lu_buf
+ */
+static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
+                                    struct mdd_object *obj)
+{
+       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
+       struct lu_buf   *lmm_buf = NULL;
+       int              rc, sz;
+       ENTRY;
+
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
+                          mdd_object_capa(env, obj));
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (rc == 0)
+               GOTO(out, rc = -ENODATA);
+
+       sz = rc;
+       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
+               /* mti_big_buf was not allocated, so we have to
+                * allocate it based on the ea size */
+               buf = mdd_buf_alloc(env, sz);
+               if (buf->lb_buf == NULL)
+                       GOTO(out, rc = -ENOMEM);
+               goto repeat;
+       }
+
+       OBD_ALLOC_PTR(lmm_buf);
+       if (!lmm_buf)
+               GOTO(out, rc = -ENOMEM);
+
+       OBD_ALLOC(lmm_buf->lb_buf, sz);
+       if (!lmm_buf->lb_buf)
+               GOTO(free, rc = -ENOMEM);
+
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       lmm_buf->lb_len = sz;
+
+       GOTO(out, rc = 0);
+
+free:
+       if (lmm_buf)
+               OBD_FREE_PTR(lmm_buf);
+out:
+       if (rc)
+               return ERR_PTR(rc);
+       return lmm_buf;
+}
+
+
+/*
+ *  check if layout swapping between 2 objects is allowed
+ *  the rules are:
+ *  - same type of objects
+ *  - same owner/group (so quotas are still valid)
+ */
+static int mdd_layout_swap_allowed(const struct lu_env *env,
+                                  struct mdd_object *o1,
+                                  struct mdd_object *o2)
+{
+       const struct lu_fid     *fid1, *fid2;
+       __u32                    uid, gid;
+       struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
+       int                      rc;
+       ENTRY;
+
+       fid1 = mdo2fid(o1);
+       fid2 = mdo2fid(o2);
+
+       if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
+           (mdd_object_type(o1) != mdd_object_type(o2)))
+               RETURN(-EPERM);
+
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+       uid = tmp_la->la_uid;
+       gid = tmp_la->la_gid;
+
+       tmp_la->la_valid = 0;
+       rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+
+       if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
+               RETURN(-EPERM);
+
+       RETURN(0);
+}
+
+/**
+ * swap layouts between 2 lustre objects
+ */
+static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
+                           struct md_object *obj2, __u64 flags)
+{
+       struct mdd_object       *o1, *o2, *fst_o, *snd_o;
+       struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
+       struct lu_buf           *fst_buf, *snd_buf;
+       struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
+       struct thandle          *handle;
+       struct mdd_device       *mdd = mdo2mdd(obj1);
+       int                      rc;
+       __u16                    fst_gen, snd_gen;
+       ENTRY;
+
+       /* we have to sort the 2 obj, so locking will always
+        * be in the same order, even in case of 2 concurrent swaps */
+       rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
+                      mdo2fid(md2mdd_obj(obj2)));
+       /* same fid ? */
+       if (rc == 0)
+               RETURN(-EPERM);
+
+       if (rc > 0) {
+               o1 = md2mdd_obj(obj1);
+               o2 = md2mdd_obj(obj2);
+       } else {
+               o1 = md2mdd_obj(obj2);
+               o2 = md2mdd_obj(obj1);
+       }
+
+       /* check if layout swapping is allowed */
+       rc = mdd_layout_swap_allowed(env, o1, o2);
+       if (rc)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       /* objects are already sorted */
+       mdd_write_lock(env, o1, MOR_TGT_CHILD);
+       mdd_write_lock(env, o2, MOR_TGT_CHILD);
+
+       lmm1_buf = mdd_get_lov_ea(env, o1);
+       if (IS_ERR(lmm1_buf)) {
+               rc = PTR_ERR(lmm1_buf);
+               lmm1_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       lmm2_buf = mdd_get_lov_ea(env, o2);
+       if (IS_ERR(lmm2_buf)) {
+               rc = PTR_ERR(lmm2_buf);
+               lmm2_buf = NULL;
+               if (rc != -ENODATA)
+                       GOTO(unlock, rc);
+       }
+
+       /* swapping 2 non existant layouts is a success */
+       if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
+               GOTO(unlock, rc = 0);
+
+       /* to help inode migration between MDT, it is better to
+        * start by the no layout file (if one), so we order the swap */
+       if (lmm1_buf == NULL) {
+               fst_o = o1;
+               fst_buf = lmm1_buf;
+               snd_o = o2;
+               snd_buf = lmm2_buf;
+       } else {
+               fst_o = o2;
+               fst_buf = lmm2_buf;
+               snd_o = o1;
+               snd_buf = lmm1_buf;
+       }
+
+       /* lmm and generation layout initialization */
+       if (fst_buf) {
+               fst_lmm = fst_buf->lb_buf;
+               fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
+       } else {
+               fst_lmm = NULL;
+               fst_gen = 0;
+       }
+
+       if (snd_buf) {
+               snd_lmm = snd_buf->lb_buf;
+               snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
+       } else {
+               snd_lmm = NULL;
+               snd_gen = 0;
+       }
+
+       /* save the orignal lmm common header of first file
+        * to be able to roll back */
+       OBD_ALLOC_PTR(old_fst_lmm);
+       if (old_fst_lmm == NULL)
+               GOTO(unlock, rc = -ENOMEM);
+
+       memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
+
+       /* increase the generation layout numbers */
+       snd_gen++;
+       fst_gen++;
+
+       /* set the file specific informations in lmm */
+       if (fst_lmm) {
+               fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
+               fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
+               fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
+       }
+
+       if (snd_lmm) {
+               snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
+               snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+               snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+       }
+
+       /* prepare transaction */
+       rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
+                                  LU_XATTR_REPLACE, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, fst_o));
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
+                          LU_XATTR_REPLACE, handle,
+                          mdd_object_capa(env, snd_o));
+       if (rc) {
+               int     rc2;
+
+               /* failure on second file, but first was done, so we have
+                * to roll back first */
+               /* restore object_id, object_seq and generation number
+                * on first file */
+               if (fst_lmm) {
+                       fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
+                       fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
+                       fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
+               }
+
+               rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
+                                   LU_XATTR_REPLACE, handle,
+                                   mdd_object_capa(env, fst_o));
+               if (rc2) {
+                       /* very bad day */
+                       CERROR("%s: unable to roll back after swap layouts"
+                              " failure between "DFID" and "DFID
+                              " rc2 = %d rc = %d)\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
+                              rc2, rc);
+                       /* a solution to avoid journal commit is to panic,
+                        * but it has strong consequences so we use LBUG to
+                        * allow sysdamin to choose to panic or not
+                        */
+                       LBUG();
+               }
+               GOTO(stop, rc);
+       }
+       EXIT;
+
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+unlock:
+       mdd_write_unlock(env, o2);
+       mdd_write_unlock(env, o1);
+
+       if (lmm1_buf && lmm1_buf->lb_buf)
+               OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
+       if (lmm1_buf)
+               OBD_FREE_PTR(lmm1_buf);
+
+       if (lmm2_buf && lmm2_buf->lb_buf)
+               OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
+       if (lmm2_buf)
+               OBD_FREE_PTR(lmm2_buf);
+
+       if (old_fst_lmm)
+               OBD_FREE_PTR(old_fst_lmm);
+
+       return rc;
+}
+
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                struct mdd_object *child, struct lu_attr *attr)
 {
@@ -1351,28 +1726,27 @@ void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
  */
 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
 {
-        int res = 0;
-
-        /* Sadly, NFSD reopens a file repeatedly during operation, so the
-         * "acc_mode = 0" allowance for newly-created files isn't honoured.
-         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
-         * owner can write to a file even if it is marked readonly to hide
-         * its brokenness. (bug 5781) */
-        if (flags & MDS_OPEN_OWNEROVERRIDE) {
-                struct md_ucred *uc = md_ucred(env);
-
-                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
-                    (la->la_uid == uc->mu_fsuid))
-                        return 0;
-        }
+       int res = 0;
+
+       /* Sadly, NFSD reopens a file repeatedly during operation, so the
+        * "acc_mode = 0" allowance for newly-created files isn't honoured.
+        * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
+        * owner can write to a file even if it is marked readonly to hide
+        * its brokenness. (bug 5781) */
+       if (flags & MDS_OPEN_OWNEROVERRIDE) {
+               struct lu_ucred *uc = lu_ucred_check(env);
+
+               if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
+                       return 0;
+       }
 
-        if (flags & FMODE_READ)
-                res |= MAY_READ;
-        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
-                res |= MAY_WRITE;
-        if (flags & MDS_FMODE_EXEC)
-                res = MAY_EXEC;
-        return res;
+       if (flags & FMODE_READ)
+               res |= MAY_READ;
+       if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
+               res |= MAY_WRITE;
+       if (flags & MDS_FMODE_EXEC)
+               res = MAY_EXEC;
+       return res;
 }
 
 static int mdd_open_sanity_check(const struct lu_env *env,
@@ -1421,13 +1795,13 @@ static int mdd_open_sanity_check(const struct lu_env *env,
          * Now, flag -- O_NOATIME does not be packed by client.
          */
         if (flag & O_NOATIME) {
-                struct md_ucred *uc = md_ucred(env);
+               struct lu_ucred *uc = lu_ucred(env);
 
-                if (uc && ((uc->mu_valid == UCRED_OLD) ||
-                    (uc->mu_valid == UCRED_NEW)) &&
-                    (uc->mu_fsuid != tmp_la->la_uid) &&
-                    !mdd_capable(uc, CFS_CAP_FOWNER))
-                        RETURN(-EPERM);
+               if (uc && ((uc->uc_valid == UCRED_OLD) ||
+                          (uc->uc_valid == UCRED_NEW)) &&
+                   (uc->uc_fsuid != tmp_la->la_uid) &&
+                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                       RETURN(-EPERM);
         }
 #endif
 
@@ -1496,7 +1870,9 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         ENTRY;
 
         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
-                mdd_obj->mod_count--;
+               mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+               mdd_obj->mod_count--;
+               mdd_write_unlock(env, mdd_obj);
 
                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
@@ -1504,10 +1880,14 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                 RETURN(0);
         }
 
-        /* check without any lock */
-        if (mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
- again:
+       /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
+        * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
+       /* check without any lock */
+       is_orphan = mdd_obj->mod_count == 1 &&
+                   (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
+
+again:
+       if (is_orphan) {
                 handle = mdd_trans_create(env, mdo2mdd(obj));
                 if (IS_ERR(handle))
                         RETURN(PTR_ERR(handle));
@@ -1526,71 +1906,62 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        if (handle == NULL && mdd_obj->mod_count == 1 &&
-            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
-                mdd_write_unlock(env, mdd_obj);
-                goto again;
-        }
-
-        /* release open count */
-        mdd_obj->mod_count --;
-
-        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
-                /* remove link to object from orphan index */
-                LASSERT(handle != NULL);
-                rc = __mdd_orphan_del(env, mdd_obj, handle);
-                if (rc == 0) {
-                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
-                               "list, OSS objects to be destroyed.\n",
-                               PFID(mdd_object_fid(mdd_obj)));
-                        is_orphan = 1;
-                } else {
-                        CERROR("Object "DFID" can not be deleted from orphan "
-                                "list, maybe cause OST objects can not be "
-                                "destroyed (err: %d).\n",
-                                PFID(mdd_object_fid(mdd_obj)), rc);
-                        /* If object was not deleted from orphan list, do not
-                         * destroy OSS objects, which will be done when next
-                         * recovery. */
-                        GOTO(out, rc);
-                }
-        }
-
        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
-        /* Object maybe not in orphan list originally, it is rare case for
-         * mdd_finish_unlink() failure. */
-        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
-               if (handle == NULL) {
-                       handle = mdd_trans_create(env, mdo2mdd(obj));
-                       if (IS_ERR(handle))
-                               GOTO(out, rc = PTR_ERR(handle));
-
-                       rc = mdo_declare_destroy(env, mdd_obj, handle);
-                       if (rc)
-                               GOTO(out, rc);
+       if (rc != 0) {
+               CERROR("Failed to get lu_attr of "DFID": %d\n",
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               GOTO(out, rc);
+       }
 
-                       rc = mdd_declare_changelog_store(env, mdd,
-                                       NULL, handle);
-                       if (rc)
-                               GOTO(stop, rc);
+       /* check again with lock */
+       is_orphan = (mdd_obj->mod_count == 1) &&
+                   ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
+                    ma->ma_attr.la_nlink == 0);
 
-                       rc = mdd_trans_start(env, mdo2mdd(obj), handle);
-                       if (rc)
-                               GOTO(out, rc);
+       if (is_orphan && handle == NULL) {
+               mdd_write_unlock(env, mdd_obj);
+               goto again;
+       }
+
+       mdd_obj->mod_count--; /*release open count */
+
+       if (!is_orphan)
+               GOTO(out, rc = 0);
+
+       /* Orphan object */
+       /* NB: Object maybe not in orphan list originally, it is rare case for
+        * mdd_finish_unlink() failure, in that case, the object doesn't have
+        * ORPHAN_OBJ flag */
+       if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+               /* remove link to object from orphan index */
+               LASSERT(handle != NULL);
+               rc = __mdd_orphan_del(env, mdd_obj, handle);
+               if (rc != 0) {
+                       CERROR("%s: unable to delete "DFID" from orphan list: "
+                              "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                              PFID(mdd_object_fid(mdd_obj)), rc);
+                       /* If object was not deleted from orphan list, do not
+                        * destroy OSS objects, which will be done when next
+                        * recovery. */
+                       GOTO(out, rc);
                }
 
-               rc = mdo_destroy(env, mdd_obj, handle);
+               CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                      "list, OSS objects to be destroyed.\n",
+                      PFID(mdd_object_fid(mdd_obj)));
+       }
+
+       rc = mdo_destroy(env, mdd_obj, handle);
 
-                if (rc != 0)
-                        CERROR("Error when prepare to delete Object "DFID" , "
-                               "which will cause OST objects can not be "
-                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
-        }
-        EXIT;
+       if (rc != 0) {
+               CERROR("%s: unable to delete "DFID" from orphan list: "
+                      "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+       }
+       EXIT;
 
 out:
-
         mdd_write_unlock(env, mdd_obj);
 
         if (rc == 0 &&
@@ -1710,7 +2081,7 @@ out:
        if (result > 0)
                /* end of directory */
                dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
-       if (result < 0)
+       else if (result < 0)
                CWARN("build page failed: %d!\n", result);
         return result;
 }
@@ -1795,19 +2166,20 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 }
 
 const struct md_object_operations mdd_obj_ops = {
-        .moo_permission    = mdd_permission,
-        .moo_attr_get      = mdd_attr_get,
-        .moo_attr_set      = mdd_attr_set,
-        .moo_xattr_get     = mdd_xattr_get,
-        .moo_xattr_set     = mdd_xattr_set,
-        .moo_xattr_list    = mdd_xattr_list,
-        .moo_xattr_del     = mdd_xattr_del,
-        .moo_open          = mdd_open,
-        .moo_close         = mdd_close,
-        .moo_readpage      = mdd_readpage,
-        .moo_readlink      = mdd_readlink,
-        .moo_changelog     = mdd_changelog,
-        .moo_capa_get      = mdd_capa_get,
-        .moo_object_sync   = mdd_object_sync,
-        .moo_path          = mdd_path,
+       .moo_permission         = mdd_permission,
+       .moo_attr_get           = mdd_attr_get,
+       .moo_attr_set           = mdd_attr_set,
+       .moo_xattr_get          = mdd_xattr_get,
+       .moo_xattr_set          = mdd_xattr_set,
+       .moo_xattr_list         = mdd_xattr_list,
+       .moo_xattr_del          = mdd_xattr_del,
+       .moo_swap_layouts       = mdd_swap_layouts,
+       .moo_open               = mdd_open,
+       .moo_close              = mdd_close,
+       .moo_readpage           = mdd_readpage,
+       .moo_readlink           = mdd_readlink,
+       .moo_changelog          = mdd_changelog,
+       .moo_capa_get           = mdd_capa_get,
+       .moo_object_sync        = mdd_object_sync,
+       .moo_path               = mdd_path,
 };