Whamcloud - gitweb
LU-998 acl: declare acl operation for setattr
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index ecfe82b..cf17146 100644 (file)
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs_jbd2.h>
-#else
-#include <linux/jbd.h>
-#endif
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
+#include <obd_lov.h>
 
 #include <lustre_param.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs.h>
-#else
-#include <linux/ldiskfs_fs.h>
-#endif
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
@@ -635,7 +626,8 @@ static int __mdd_lmm_get(const struct lu_env *env,
                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
         if (rc > 0) {
                 ma->ma_lmm_size = rc;
-                ma->ma_valid |= MA_LOV;
+                ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
+                ma->ma_valid |= MA_LOV | MA_LAY_GEN;
                 rc = 0;
         }
         RETURN(rc);
@@ -880,6 +872,30 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         RETURN(rc);
 }
 
+int mdd_declare_object_create_internal(const struct lu_env *env,
+                                       struct mdd_object *p,
+                                       struct mdd_object *c,
+                                       struct md_attr *ma,
+                                       struct thandle *handle,
+                                       const struct md_op_spec *spec)
+{
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
+        int rc;
+        ENTRY;
+
+        if (feat != &dt_directory_features && feat != NULL)
+                dof->dof_type = DFT_INDEX;
+        else
+                dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
+
+        dof->u.dof_idx.di_feat = feat;
+
+        rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+
+        RETURN(rc);
+}
+
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
                                struct thandle *handle,
@@ -1277,6 +1293,7 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
 {
         const struct lu_fid *tfid = mdo2fid(mdd_obj);
         struct llog_changelog_rec *rec;
+        struct thandle *th = NULL;
         struct lu_buf *buf;
         int reclen;
         int rc;
@@ -1287,8 +1304,8 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
                 RETURN(0);
 
-        LASSERT(handle != NULL);
         LASSERT(mdd_obj != NULL);
+        LASSERT(handle != NULL);
 
         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
@@ -1310,7 +1327,11 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
-        rc = mdd_changelog_llog_write(mdd, rec, handle);
+        rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
+
+        if (th)
+                mdd_trans_stop(env, mdd, rc, th);
+
         if (rc < 0) {
                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                        rc, type, PFID(tfid));
@@ -1329,14 +1350,22 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
         int rc;
         ENTRY;
 
-        handle = mdd_trans_start(env, mdd);
-
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 return(PTR_ERR(handle));
 
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                       handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1449,6 +1478,99 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
                                         md2mdd_obj(obj), handle);
 }
 
+static int mdd_declare_attr_set(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                struct mdd_object *obj,
+                                const struct md_attr *ma,
+                                struct lov_mds_md *lmm,
+                                struct thandle *handle)
+{
+        struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
+        int             rc, i;
+
+        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
+        if (rc)
+                return rc;
+
+        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+        if (rc)
+                return rc;
+
+        if (ma->ma_valid & MA_LOV) {
+                buf->lb_buf = NULL;
+                buf->lb_len = ma->ma_lmm_size;
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
+                buf->lb_buf = NULL;
+                buf->lb_len = sizeof(struct lustre_mdt_attrs);
+                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
+                                           0, handle);
+                if (rc)
+                        return rc;
+        }
+
+#ifdef CONFIG_FS_POSIX_ACL
+        if (ma->ma_attr.la_valid & LA_MODE) {
+                mdd_read_lock(env, obj, MOR_TGT_CHILD);
+                rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
+                                   BYPASS_CAPA);
+                mdd_read_unlock(env, obj);
+                if (rc == -EOPNOTSUPP || rc == -ENODATA)
+                        rc = 0;
+                else if (rc < 0)
+                        return rc;
+
+                if (rc != 0) {
+                        buf->lb_buf = NULL;
+                        buf->lb_len = rc;
+                        rc = mdo_declare_xattr_set(env, obj, buf,
+                                                   XATTR_NAME_ACL_ACCESS, 0,
+                                                   handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+#endif
+
+        /* basically the log is the same as in unlink case */
+        if (lmm) {
+                __u16 stripe;
+
+                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
+                                le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
+                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+                               mdd->mdd_obd_dev->obd_name,
+                               le32_to_cpu(lmm->lmm_magic),
+                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+                        return -EINVAL;
+                }
+
+                stripe = le16_to_cpu(lmm->lmm_stripe_count);
+                if (stripe == LOV_ALL_STRIPES) {
+                        struct lov_desc *ldesc;
+
+                        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
+                        LASSERT(ldesc != NULL);
+                        stripe = ldesc->ld_tgt_count;
+                }
+
+                for (i = 0; i < stripe; i++) {
+                        rc = mdd_declare_llog_record(env, mdd,
+                                        sizeof(struct llog_unlink_rec),
+                                        handle);
+                        if (rc)
+                                return rc;
+                }
+        }
+
+        return rc;
+}
+
 /* set attr and LOV EA at once, return updated attr */
 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         const struct md_attr *ma)
@@ -1458,7 +1580,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct thandle *handle;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
+        int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
@@ -1481,33 +1603,36 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        /*TODO: add lock here*/
-        /* start a log jounal handle if needed */
         if (S_ISREG(mdd_object_type(mdd_obj)) &&
             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                 lmm_size = mdd_lov_mdsize(env, mdd);
                 lmm = mdd_max_lmm_get(env, mdd);
                 if (lmm == NULL)
-                        GOTO(no_trans, rc = -ENOMEM);
+                        RETURN(-ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                 XATTR_NAME_LOV);
 
                 if (rc < 0)
-                        GOTO(no_trans, rc);
-        }
-
-        chlog_cnt = 1;
-        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
-                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
-                         lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
+                        RETURN(rc);
         }
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
-                GOTO(no_trans, rc = PTR_ERR(handle));
+                RETURN(PTR_ERR(handle));
+
+        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
+                                  lmm_size > 0 ? lmm : NULL, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* permission changes may require sync operation */
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync = !!mdd->mdd_sync_permission;
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
 
         /* permission changes may require sync operation */
         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
@@ -1592,8 +1717,8 @@ cleanup:
         if (rc == 0)
                 rc = mdd_attr_set_changelog(env, obj, handle,
                                             ma->ma_attr.la_valid);
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
-no_trans:
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
@@ -1651,6 +1776,27 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_set(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const struct lu_buf *buf,
+                                 const char *name,
+                                 struct thandle *handle)
+
+{
+        int rc;
+
+        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1669,26 +1815,64 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
         /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+            mdd->mdd_sync_permission == 1)
+                handle->th_sync = 1;
+
+        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        /* security-replated changes may require sync */
         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
                 handle->th_sync |= mdd->mdd_sync_permission;
 
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
+
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
 }
 
+static int mdd_declare_xattr_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_xattr_del(env, obj, name, handle);
+        if (rc)
+                return rc;
+
+        /* Only record user xattr changes */
+        if ((strncmp("user.", name, 5) == 0))
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1706,21 +1890,34 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+        if (rc)
+                GOTO(stop, rc);
+
+        rc = mdd_trans_start(env, mdd, handle);
+        if (rc)
+                GOTO(stop, rc);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) != 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
+stop:
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1743,6 +1940,10 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
         /*
          * Check -ENOENT early here because we need to get object type
          * to calculate credits before transaction start
@@ -1752,14 +1953,12 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
 
         LASSERT(mdd_object_exists(mdd_obj) > 0);
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
-        if (rc)
-                RETURN(rc);
-
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
@@ -1852,6 +2051,10 @@ static int mdd_object_create(const struct lu_env *env,
         int rc = 0;
         ENTRY;
 
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota) {
                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
@@ -1877,11 +2080,12 @@ static int mdd_object_create(const struct lu_env *env,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
         if (rc)
@@ -1964,11 +2168,16 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
-        handle = mdd_trans_start(env, mdd);
+        /* XXX: this code won't be used ever:
+         * DNE uses slightly different approach */
+        LBUG();
+
+        handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
 
+        rc = mdd_trans_start(env, mdd, handle);
+
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
@@ -2091,10 +2300,22 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
         return rc;
 }
 
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+                            struct md_attr *ma, struct thandle *handle)
+{
+        int rc;
+
+        rc = mdd_declare_unlink_log(env, obj, ma, handle);
+        if (rc)
+                return rc;
+
+        return mdo_declare_destroy(env, obj, handle);
+}
+
 /* return md_attr back,
  * if it is last unlink then return lov ea + llog cookie*/
 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma)
+                    struct md_attr *ma, struct thandle *handle)
 {
         int rc = 0;
         ENTRY;
@@ -2108,14 +2329,32 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                             obj, ma);
         }
+
+        if (rc == 0)
+                rc = mdo_destroy(env, obj, handle);
+
         RETURN(rc);
 }
 
+static int mdd_declare_close(const struct lu_env *env,
+                             struct mdd_object *obj,
+                             struct md_attr *ma,
+                             struct thandle *handle)
+{
+        int rc;
+
+        rc = orph_declare_index_delete(env, obj, handle);
+        if (rc)
+                return rc;
+
+        return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
 /*
  * No permission check is needed.
  */
 static int mdd_close(const struct lu_env *env, struct md_object *obj,
-                     struct md_attr *ma)
+                     struct md_attr *ma, int mode)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
@@ -2144,12 +2383,21 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         if (mdd_obj->mod_count == 1 &&
             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
  again:
-                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
-                if (rc)
-                        RETURN(rc);
-                handle = mdd_trans_start(env, mdo2mdd(obj));
+                handle = mdd_trans_create(env, mdo2mdd(obj));
                 if (IS_ERR(handle))
                         RETURN(PTR_ERR(handle));
+
+                rc = mdd_declare_close(env, mdd_obj, ma, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+                if (rc)
+                        GOTO(stop, rc);
+
+                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                if (rc)
+                        GOTO(stop, rc);
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -2164,6 +2412,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
 
         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                 /* remove link to object from orphan index */
+                LASSERT(handle != NULL);
                 rc = __mdd_orphan_del(env, mdd_obj, handle);
                 if (rc == 0) {
                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
@@ -2196,7 +2445,27 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                 } else {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
+                        if (handle == NULL) {
+                                handle = mdd_trans_create(env, mdo2mdd(obj));
+                                if (IS_ERR(handle))
+                                        GOTO(out, rc = PTR_ERR(handle));
+
+                                rc = mdd_declare_object_kill(env, mdd_obj, ma,
+                                                             handle);
+                                if (rc)
+                                        GOTO(out, rc);
+
+                                rc = mdd_declare_changelog_store(env, mdd,
+                                                                 NULL, handle);
+                                if (rc)
+                                        GOTO(stop, rc);
+
+                                rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                                if (rc)
+                                        GOTO(out, rc);
+                        }
+
+                        rc = mdd_object_kill(env, mdd_obj, ma, handle);
                         if (rc == 0)
                                 reset = 0;
                 }
@@ -2213,8 +2482,32 @@ out:
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         mdd_write_unlock(env, mdd_obj);
+
+        if (rc == 0 &&
+            (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
+            !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
+                if (handle == NULL) {
+                        handle = mdd_trans_create(env, mdo2mdd(obj));
+                        if (IS_ERR(handle))
+                                GOTO(stop, rc = IS_ERR(handle));
+
+                        rc = mdd_declare_changelog_store(env, mdd, NULL,
+                                                         handle);
+                        if (rc)
+                                GOTO(stop, rc);
+
+                        rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                        if (rc)
+                                GOTO(stop, rc);
+                }
+
+                mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
+                                         mdd_obj, handle);
+        }
+
+stop:
         if (handle != NULL)
-                mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
+                mdd_trans_stop(env, mdd, rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
                 /* Trigger dqrel on the owner of child. If failed,
@@ -2265,7 +2558,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 int    len;
                 int    recsize;
 
-                len  = iops->key_size(env, it);
+                len = iops->key_size(env, it);
 
                 /* IAM iterator can return record with zero len. */
                 if (len == 0)
@@ -2281,7 +2574,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 recsize = lu_dirent_calc_size(len, attr);
 
                 if (nob >= recsize) {
-                        result = iops->rec(env, it, ent, attr);
+                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                         if (result == -ESTALE)
                                 goto next;
                         if (result != 0)
@@ -2477,24 +2770,6 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
         return next->do_ops->do_object_sync(env, next);
 }
 
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
-                                        struct md_object *obj)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_get(env, mdd_object_child(mdd_obj));
-}
-
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
-                            dt_obj_version_t version)
-{
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
-        LASSERT(mdd_object_exists(mdd_obj));
-        do_version_set(env, mdd_object_child(mdd_obj), version);
-}
-
 const struct md_object_operations mdd_obj_ops = {
         .moo_permission    = mdd_permission,
         .moo_attr_get      = mdd_attr_get,
@@ -2513,8 +2788,6 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_changelog     = mdd_changelog,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
-        .moo_version_get   = mdd_version_get,
-        .moo_version_set   = mdd_version_set,
         .moo_path          = mdd_path,
         .moo_file_lock     = mdd_file_lock,
         .moo_file_unlock   = mdd_file_unlock,