Whamcloud - gitweb
LU-64 Modify the behavior of ioctl on directories without EA set.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 1b1ae53..10fea55 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs_jbd2.h>
+#else
 #include <linux/jbd.h>
+#endif
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lustre_fid.h>
 
 #include <lustre_param.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs.h>
+#else
 #include <linux/ldiskfs_fs.h>
+#endif
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
@@ -118,11 +126,9 @@ void mdd_buf_put(struct lu_buf *buf)
 {
         if (buf == NULL || buf->lb_buf == NULL)
                 return;
-        if (buf->lb_vmalloc)
-                OBD_VFREE(buf->lb_buf, buf->lb_len);
-        else
-                OBD_FREE(buf->lb_buf, buf->lb_len);
+        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
         buf->lb_buf = NULL;
+        buf->lb_len = 0;
 }
 
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
@@ -136,28 +142,17 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
         return buf;
 }
 
-#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
 {
         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
 
         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-                if (buf->lb_vmalloc)
-                        OBD_VFREE(buf->lb_buf, buf->lb_len);
-                else
-                        OBD_FREE(buf->lb_buf, buf->lb_len);
+                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
                 buf->lb_buf = NULL;
         }
         if (buf->lb_buf == NULL) {
                 buf->lb_len = len;
-                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
-                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 0;
-                }
-                if (buf->lb_buf == NULL) {
-                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
-                        buf->lb_vmalloc = 1;
-                }
+                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
                 if (buf->lb_buf == NULL)
                         buf->lb_len = 0;
         }
@@ -175,23 +170,15 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len)
         struct lu_buf buf;
 
         LASSERT(len >= oldbuf->lb_len);
-        if (len > BUF_VMALLOC_SIZE) {
-                OBD_VMALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 1;
-        } else {
-                OBD_ALLOC(buf.lb_buf, len);
-                buf.lb_vmalloc = 0;
-        }
+        OBD_ALLOC_LARGE(buf.lb_buf, len);
+
         if (buf.lb_buf == NULL)
                 return -ENOMEM;
 
         buf.lb_len = len;
         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
 
-        if (oldbuf->lb_vmalloc)
-                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
-        else
-                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
 
         memcpy(oldbuf, &buf, sizeof(buf));
 
@@ -207,12 +194,13 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
         max_cookie_size = mdd_lov_cookiesize(env, mdd);
         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                 if (mti->mti_max_cookie)
-                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
+                        OBD_FREE_LARGE(mti->mti_max_cookie,
+                                       mti->mti_max_cookie_size);
                 mti->mti_max_cookie = NULL;
                 mti->mti_max_cookie_size = 0;
         }
         if (unlikely(mti->mti_max_cookie == NULL)) {
-                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
+                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
                 if (likely(mti->mti_max_cookie != NULL))
                         mti->mti_max_cookie_size = max_cookie_size;
         }
@@ -230,13 +218,13 @@ struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
         max_lmm_size = mdd_lov_mdsize(env, mdd);
         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
                 if (mti->mti_max_lmm)
-                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
+                        OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
                 mti->mti_max_lmm = NULL;
                 mti->mti_max_lmm_size = 0;
         }
         if (unlikely(mti->mti_max_lmm == NULL)) {
-                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
-                if (unlikely(mti->mti_max_lmm != NULL))
+                OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
+                if (likely(mti->mti_max_lmm != NULL))
                         mti->mti_max_lmm_size = max_lmm_size;
         }
         return mti->mti_max_lmm;
@@ -265,7 +253,7 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env,
 }
 
 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
-                           const struct lu_object_conf *_)
+                           const struct lu_object_conf *unused)
 {
         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
         struct mdd_object *mdd_obj = lu2mdd_obj(o);
@@ -306,7 +294,7 @@ static int mdd_object_print(const struct lu_env *env, void *cookie,
 {
         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
-                    "valid=%x, cltime=%llu, flags=%lx)",
+                    "valid=%x, cltime="LPU64", flags=%lx)",
                     mdd, mdd->mod_count, mdd->mod_valid,
                     mdd->mod_cltime, mdd->mod_flags);
 }
@@ -363,7 +351,7 @@ static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                 if (obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(obj))
-                        GOTO(out, rc = -PTR_ERR(obj));
+                        GOTO(out, rc = PTR_ERR(obj));
                 /* get child fid from parent and name */
                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                 mdd_object_put(env, obj);
@@ -426,7 +414,7 @@ static int mdd_path_current(const struct lu_env *env,
                 if (mdd_obj == NULL)
                         GOTO(out, rc = -EREMOTE);
                 if (IS_ERR(mdd_obj))
-                        GOTO(out, rc = -PTR_ERR(mdd_obj));
+                        GOTO(out, rc = PTR_ERR(mdd_obj));
                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                 if (rc <= 0) {
                         mdd_object_put(env, mdd_obj);
@@ -481,9 +469,9 @@ static int mdd_path_current(const struct lu_env *env,
         /* Verify that our path hasn't changed since we started the lookup.
            Record the current index, and verify the path resolves to the
            same fid. If it does, then the path is correct as of this index. */
-        spin_lock(&mdd->mdd_cl.mc_lock);
+        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
         pli->pli_currec = mdd->mdd_cl.mc_index;
-        spin_unlock(&mdd->mdd_cl.mc_lock);
+        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -495,12 +483,12 @@ static int mdd_path_current(const struct lu_env *env,
                        PFID(&pli->pli_fid));
                 GOTO(out, rc = -EAGAIN);
         }
-
+        ptr++; /* skip leading / */
         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
 
         EXIT;
 out:
-        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
                 /* if we vmalloced a large buffer drop it */
                 mdd_buf_put(buf);
 
@@ -526,8 +514,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj,
                 RETURN(-EOVERFLOW);
 
         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
-                path[0] = '/';
-                path[1] = '\0';
+                path[0] = '\0';
                 RETURN(0);
         }
 
@@ -598,27 +585,35 @@ int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
         RETURN(rc);
 }
 
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
-                       int *size)
+int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
 {
         struct lov_desc *ldesc;
         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
+        struct lov_user_md *lum = (struct lov_user_md*)lmm;
         ENTRY;
 
+        if (!lum)
+                RETURN(0);
+
         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
         LASSERT(ldesc != NULL);
 
-        if (!lmm)
-                RETURN(0);
+        lum->lmm_magic = LOV_MAGIC_V1;
+        lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
+        lum->lmm_pattern = ldesc->ld_pattern;
+        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
+        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
+        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
+
+        RETURN(sizeof(*lum));
+}
 
-        lmm->lmm_magic = LOV_MAGIC_V1;
-        lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
-        lmm->lmm_pattern = ldesc->ld_pattern;
-        lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
-        lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
-        *size = sizeof(struct lov_mds_md);
+static int is_rootdir(struct mdd_object *mdd_obj)
+{
+        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
+        const struct lu_fid *fid = mdo2fid(mdd_obj);
 
-        RETURN(sizeof(struct lov_mds_md));
+        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
 }
 
 /* get lov EA only */
@@ -633,13 +628,10 @@ static int __mdd_lmm_get(const struct lu_env *env,
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                         XATTR_NAME_LOV);
-
-        if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
-                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
-                                &ma->ma_lmm_size);
-        }
-
+        if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
+                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
         if (rc > 0) {
+                ma->ma_lmm_size = rc;
                 ma->ma_valid |= MA_LOV;
                 rc = 0;
         }
@@ -677,8 +669,58 @@ static int __mdd_lmv_get(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_attr_get_internal(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
+static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
+                         struct md_attr *ma)
+{
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lustre_mdt_attrs *lma =
+                                 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
+        int lma_size;
+        int rc;
+        ENTRY;
+
+        /* If all needed data are already valid, nothing to do */
+        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
+            (ma->ma_need & (MA_HSM | MA_SOM)))
+                RETURN(0);
+
+        /* Read LMA from disk EA */
+        lma_size = sizeof(info->mti_xattr_buf);
+        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
+        if (rc <= 0)
+                RETURN(rc);
+
+        /* Useless to check LMA incompatibility because this is already done in
+         * osd_ea_fid_get(), and this will fail long before this code is
+         * called.
+         * So, if we are here, LMA is compatible.
+         */
+
+        lustre_lma_swab(lma);
+
+        /* Swab and copy LMA */
+        if (ma->ma_need & MA_HSM) {
+                if (lma->lma_compat & LMAC_HSM)
+                        ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
+                else
+                        ma->ma_hsm.mh_flags = 0;
+                ma->ma_valid |= MA_HSM;
+        }
+
+        /* Copy SOM */
+        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
+                LASSERT(ma->ma_som != NULL);
+                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
+                ma->ma_som->msd_size    = lma->lma_som_size;
+                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
+                ma->ma_som->msd_mountid = lma->lma_som_mountid;
+                ma->ma_valid |= MA_SOM;
+        }
+
+        RETURN(0);
+}
+
+int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                                  struct md_attr *ma)
 {
         int rc = 0;
@@ -696,14 +738,18 @@ static int mdd_attr_get_internal(const struct lu_env *env,
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmv_get(env, mdd_obj, ma);
         }
+        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
+                if (S_ISREG(mdd_object_type(mdd_obj)))
+                        rc = __mdd_lma_get(env, mdd_obj, ma);
+        }
 #ifdef CONFIG_FS_POSIX_ACL
         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = mdd_def_acl_get(env, mdd_obj, ma);
         }
 #endif
-        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
-               rc, ma->ma_valid);
+        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+               rc, ma->ma_valid, ma->ma_lmm);
         RETURN(rc);
 }
 
@@ -711,7 +757,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env,
                                  struct mdd_object *mdd_obj, struct md_attr *ma)
 {
         int rc;
-        int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
+        int needlock = ma->ma_need &
+                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
 
         if (needlock)
                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -957,7 +1004,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         struct lu_attr *la, const struct md_attr *ma)
 {
         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
-        struct md_ucred  *uc         = md_ucred(env);
+        struct md_ucred  *uc;
         int               rc;
         ENTRY;
 
@@ -972,6 +1019,13 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
+        /* export destroy does not have ->le_ses, but we may want
+         * to drop LUSTRE_SOM_FL. */
+        if (!env->le_ses)
+                RETURN(0);
+
+        uc = md_ucred(env);
+
         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
@@ -989,8 +1043,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if (la->la_valid == LA_ATIME) {
                 /* This is atime only set for read atime update on close. */
-                if (la->la_atime <= tmp_la->la_atime +
-                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff)
+                if (la->la_atime >= tmp_la->la_atime &&
+                    la->la_atime < (tmp_la->la_atime +
+                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                         la->la_valid &= ~LA_ATIME;
                 RETURN(0);
         }
@@ -1037,17 +1092,35 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 }
         }
 
+        if (la->la_valid & LA_KILL_SUID) {
+                la->la_valid &= ~LA_KILL_SUID;
+                if ((tmp_la->la_mode & S_ISUID) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
+                }
+                la->la_mode &= ~S_ISUID;
+        }
+
+        if (la->la_valid & LA_KILL_SGID) {
+                la->la_valid &= ~LA_KILL_SGID;
+                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
+                                        (S_ISGID | S_IXGRP)) &&
+                    !(la->la_valid & LA_MODE)) {
+                        la->la_mode = tmp_la->la_mode;
+                        la->la_valid |= LA_MODE;
+                }
+                la->la_mode &= ~S_ISGID;
+        }
+
         /* Make sure a caller can chmod. */
         if (la->la_valid & LA_MODE) {
-                /* Bypass la_vaild == LA_MODE,
-                 * this is for changing file with SUID or SGID. */
-                if ((la->la_valid & ~LA_MODE) &&
-                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
+                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                     (uc->mu_fsuid != tmp_la->la_uid) &&
                     !mdd_capable(uc, CFS_CAP_FOWNER))
                         RETURN(-EPERM);
 
-                if (la->la_mode == (umode_t) -1)
+                if (la->la_mode == (cfs_umode_t) -1)
                         la->la_mode = tmp_la->la_mode;
                 else
                         la->la_mode = (la->la_mode & S_IALLUGO) |
@@ -1163,6 +1236,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 static int mdd_changelog_data_store(const struct lu_env     *env,
                                     struct mdd_device       *mdd,
                                     enum changelog_rec_type type,
+                                    int                     flags,
                                     struct mdd_object       *mdd_obj,
                                     struct thandle          *handle)
 {
@@ -1172,13 +1246,16 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         int reclen;
         int rc;
 
+        /* Not recording */
         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                 RETURN(0);
+        if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+                RETURN(0);
 
         LASSERT(handle != NULL);
         LASSERT(mdd_obj != NULL);
 
-        if ((type == CL_SETATTR) &&
+        if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                 /* Don't need multiple updates in this log */
                 /* Don't check under lock - no big deal if we get an extra
@@ -1192,10 +1269,10 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
                 RETURN(-ENOMEM);
         rec = (struct llog_changelog_rec *)buf->lb_buf;
 
-        rec->cr_flags = CLF_VERSION;
-        rec->cr_type = (__u32)type;
-        rec->cr_tfid = *tfid;
-        rec->cr_namelen = 0;
+        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
+        rec->cr.cr_type = (__u32)type;
+        rec->cr.cr_tfid = *tfid;
+        rec->cr.cr_namelen = 0;
         mdd_obj->mod_cltime = cfs_time_current_64();
 
         rc = mdd_changelog_llog_write(mdd, rec, handle);
@@ -1208,6 +1285,135 @@ static int mdd_changelog_data_store(const struct lu_env     *env,
         return 0;
 }
 
+int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
+                  int flags, struct md_object *obj)
+{
+        struct thandle *handle;
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+        struct mdd_device *mdd = mdo2mdd(obj);
+        int rc;
+        ENTRY;
+
+        handle = mdd_trans_start(env, mdd);
+
+        if (IS_ERR(handle))
+                return(PTR_ERR(handle));
+
+        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
+                                      handle);
+
+        mdd_trans_stop(env, mdd, rc, handle);
+
+        RETURN(rc);
+}
+
+/**
+ * Should be called with write lock held.
+ *
+ * \see mdd_lma_set_locked().
+ */
+static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
+                       const struct md_attr *ma, struct thandle *handle)
+{
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lu_buf *buf;
+        struct lustre_mdt_attrs *lma =
+                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
+        int lmasize = sizeof(struct lustre_mdt_attrs);
+        int rc = 0;
+
+        ENTRY;
+
+        /* Either HSM or SOM part is not valid, we need to read it before */
+        if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
+                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
+                if (rc <= 0)
+                        RETURN(rc);
+
+                lustre_lma_swab(lma);
+        } else {
+                memset(lma, 0, lmasize);
+        }
+
+        /* Copy HSM data */
+        if (ma->ma_valid & MA_HSM) {
+                lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
+                lma->lma_compat |= LMAC_HSM;
+        }
+
+        /* Copy SOM data */
+        if (ma->ma_valid & MA_SOM) {
+                LASSERT(ma->ma_som != NULL);
+                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
+                        lma->lma_compat     &= ~LMAC_SOM;
+                } else {
+                        lma->lma_compat     |= LMAC_SOM;
+                        lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
+                        lma->lma_som_size    = ma->ma_som->msd_size;
+                        lma->lma_som_blocks  = ma->ma_som->msd_blocks;
+                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
+                }
+        }
+
+        /* Copy FID */
+        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
+
+        lustre_lma_swab(lma);
+        buf = mdd_buf_get(env, lma, lmasize);
+        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
+
+        RETURN(rc);
+}
+
+/**
+ * Save LMA extended attributes with data from \a ma.
+ *
+ * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
+ * not, LMA EA will be first read from disk, modified and write back.
+ *
+ */
+static int mdd_lma_set_locked(const struct lu_env *env,
+                              struct mdd_object *mdd_obj,
+                              const struct md_attr *ma, struct thandle *handle)
+{
+        int rc;
+
+        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
+        mdd_write_unlock(env, mdd_obj);
+        return rc;
+}
+
+/* Precedence for choosing record type when multiple
+ * attributes change: setattr > mtime > ctime > atime
+ * (ctime changes when mtime does, plus chmod/chown.
+ * atime and ctime are independent.) */
+static int mdd_attr_set_changelog(const struct lu_env *env,
+                                  struct md_object *obj, struct thandle *handle,
+                                  __u64 valid)
+{
+        struct mdd_device *mdd = mdo2mdd(obj);
+        int bits, type = 0;
+
+        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
+        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
+        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
+        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
+        bits = bits & mdd->mdd_cl.mc_mask;
+        if (bits == 0)
+                return 0;
+
+        /* The record type is the lowest non-masked set bit */
+        while (bits && ((bits & 1) == 0)) {
+                bits = bits >> 1;
+                type++;
+        }
+
+        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
+        return mdd_changelog_data_store(env, mdd, type, (int)valid,
+                                        md2mdd_obj(obj), handle);
+}
+
 /* set attr and LOV EA at once, return updated attr */
 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         const struct md_attr *ma)
@@ -1225,10 +1431,21 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0, block_count = 0;
-        int inode_pending = 0, block_pending = 0;
+        int inode_pending[MAXQUOTAS] = { 0, 0 };
+        int block_pending[MAXQUOTAS] = { 0, 0 };
 #endif
         ENTRY;
 
+        *la_copy = ma->ma_attr;
+        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
+        if (rc != 0)
+                RETURN(rc);
+
+        /* setattr on "close" only change atime, or do nothing */
+        if (ma->ma_valid == MA_INODE &&
+            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
+                RETURN(0);
+
         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                     MDD_TXN_ATTR_SET_OP);
         handle = mdd_trans_start(env, mdd);
@@ -1254,13 +1471,9 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
 
-        *la_copy = ma->ma_attr;
-        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
-        if (rc)
-                GOTO(cleanup, rc);
-
 #ifdef HAVE_QUOTA_SUPPORT
         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
+                struct obd_export *exp = md_quota(env)->mq_exp;
                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
 
                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
@@ -1269,20 +1482,18 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         mdd_quota_wrapper(la_copy, qnids);
                         mdd_quota_wrapper(la_tmp, qoids);
                         /* get file quota for new owner */
-                        lquota_chkquota(mds_quota_interface_ref, obd,
-                                        qnids[USRQUOTA], qnids[GRPQUOTA], 1,
-                                        &inode_pending, NULL, 0, NULL, 0);
+                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
+                                        qnids, inode_pending, 1, NULL, 0,
+                                        NULL, 0);
                         block_count = (la_tmp->la_blocks + 7) >> 3;
                         if (block_count) {
                                 void *data = NULL;
                                 mdd_data_get(env, mdd_obj, &data);
                                 /* get block quota for new owner */
                                 lquota_chkquota(mds_quota_interface_ref, obd,
-                                                qnids[USRQUOTA],
-                                                qnids[GRPQUOTA],
-                                                block_count, &block_pending,
-                                                NULL, LQUOTA_FLAGS_BLK,
-                                                data, 1);
+                                                exp, qnids, block_pending,
+                                                block_count, NULL,
+                                                LQUOTA_FLAGS_BLK, data, 1);
                         }
                 }
         }
@@ -1310,7 +1521,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         }
 
         if (rc == 0 && ma->ma_valid & MA_LOV) {
-                umode_t mode;
+                cfs_umode_t mode;
 
                 mode = mdd_object_type(mdd_obj);
                 if (S_ISREG(mode) || S_ISDIR(mode)) {
@@ -1323,10 +1534,18 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 }
 
         }
+        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
+                cfs_umode_t mode;
+
+                mode = mdd_object_type(mdd_obj);
+                if (S_ISREG(mode))
+                        rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
+
+        }
 cleanup:
-        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
-                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
-                                              handle);
+        if (rc == 0)
+                rc = mdd_attr_set_changelog(env, obj, handle,
+                                            ma->ma_attr.la_valid);
         mdd_trans_stop(env, mdd, rc, handle);
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
@@ -1335,14 +1554,10 @@ cleanup:
         }
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc) {
-                if (inode_pending)
-                        lquota_pending_commit(mds_quota_interface_ref, obd,
-                                              qnids[USRQUOTA], qnids[GRPQUOTA],
-                                              inode_pending, 0);
-                if (block_pending)
-                        lquota_pending_commit(mds_quota_interface_ref, obd,
-                                              qnids[USRQUOTA], qnids[GRPQUOTA],
-                                              block_pending, 1);
+                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+                                      inode_pending, 0);
+                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+                                      block_pending, 1);
                 /* Trigger dqrel/dqacq for original owner and new owner.
                  * If failed, the next call for lquota_chkquota will
                  * process it. */
@@ -1408,6 +1623,11 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                 RETURN(rc);
 
         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+            mdd->mdd_sync_permission == 1)
+                txn_param_sync(&mdd_env_info(env)->mti_param);
+
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
@@ -1415,9 +1635,8 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
 
         /* Only record user xattr changes */
-        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
-            (strncmp("user.", name, 5) == 0))
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
         mdd_trans_stop(env, mdd, rc, handle);
 
@@ -1452,9 +1671,8 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         mdd_write_unlock(env, mdd_obj);
 
         /* Only record user xattr changes */
-        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
-            (strncmp("user.", name, 5) != 0))
-                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+        if ((rc == 0) && (strncmp("user.", name, 5) != 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
         mdd_trans_stop(env, mdd, rc, handle);
@@ -1578,10 +1796,12 @@ static int mdd_object_create(const struct lu_env *env,
         struct thandle *handle;
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdd->mdd_obd_dev;
+        struct obd_export *exp = md_quota(env)->mq_exp;
         struct mds_obd *mds = &obd->u.mds;
         unsigned int qids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0, block_count = 0;
-        int inode_pending = 0, block_pending = 0;
+        int inode_pending[MAXQUOTAS] = { 0, 0 };
+        int block_pending[MAXQUOTAS] = { 0, 0 };
 #endif
         int rc = 0;
         ENTRY;
@@ -1591,8 +1811,8 @@ static int mdd_object_create(const struct lu_env *env,
                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                 mdd_quota_wrapper(&ma->ma_attr, qids);
                 /* get file quota for child */
-                lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
-                                qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
+                lquota_chkquota(mds_quota_interface_ref, obd, exp,
+                                qids, inode_pending, 1, NULL, 0,
                                 NULL, 0);
                 switch (ma->ma_attr.la_mode & S_IFMT) {
                 case S_IFLNK:
@@ -1605,10 +1825,9 @@ static int mdd_object_create(const struct lu_env *env,
                 }
                 /* get block quota for child */
                 if (block_count)
-                        lquota_chkquota(mds_quota_interface_ref, obd,
-                                        qids[USRQUOTA], qids[GRPQUOTA],
-                                        block_count, &block_pending, NULL,
-                                        LQUOTA_FLAGS_BLK, NULL, 0);
+                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
+                                        qids, block_pending, block_count,
+                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
         }
 #endif
 
@@ -1675,18 +1894,14 @@ unlock:
 out_pending:
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc) {
-                if (inode_pending)
-                        lquota_pending_commit(mds_quota_interface_ref, obd,
-                                              qids[USRQUOTA], qids[GRPQUOTA],
-                                              inode_pending, 0);
-                if (block_pending)
-                        lquota_pending_commit(mds_quota_interface_ref, obd,
-                                              qids[USRQUOTA], qids[GRPQUOTA],
-                                              block_pending, 1);
+                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+                                      inode_pending, 0);
+                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+                                      block_pending, 1);
                 /* Trigger dqacq on the owner of child. If failed,
                  * the next call for lquota_chkquota will process it. */
                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
-                              FSFILT_OP_CREATE_PARTIAL_CHILD);
+                              quota_opc);
         }
 #endif
         return rc;
@@ -1751,7 +1966,7 @@ int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                 res |= MAY_WRITE;
         if (flags & MDS_FMODE_EXEC)
-                res |= MAY_EXEC;
+                res = MAY_EXEC;
         return res;
 }
 
@@ -1857,7 +2072,8 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                      struct md_attr *ma)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct thandle    *handle;
+        struct mdd_device *mdd = mdo2mdd(obj);
+        struct thandle    *handle = NULL;
         int rc;
         int reset = 1;
 
@@ -1869,43 +2085,90 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
 #endif
         ENTRY;
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
-        if (rc)
-                RETURN(rc);
-        handle = mdd_trans_start(env, mdo2mdd(obj));
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
+                mdd_obj->mod_count--;
+
+                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
+                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
+                               "list\n", PFID(mdd_object_fid(mdd_obj)));
+                RETURN(0);
+        }
+
+        /* check without any lock */
+        if (mdd_obj->mod_count == 1 &&
+            (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
+ again:
+                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
+                if (rc)
+                        RETURN(rc);
+                handle = mdd_trans_start(env, mdo2mdd(obj));
+                if (IS_ERR(handle))
+                        RETURN(PTR_ERR(handle));
+        }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+        if (handle == NULL && mdd_obj->mod_count == 1 &&
+            (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+                mdd_write_unlock(env, mdd_obj);
+                goto again;
+        }
+
         /* release open count */
         mdd_obj->mod_count --;
 
-        if (mdd_obj->mod_count == 0) {
+        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                 /* remove link to object from orphan index */
-                if (mdd_obj->mod_flags & ORPHAN_OBJ)
-                        __mdd_orphan_del(env, mdd_obj, handle);
+                rc = __mdd_orphan_del(env, mdd_obj, handle);
+                if (rc == 0) {
+                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+                               "list, OSS objects to be destroyed.\n",
+                               PFID(mdd_object_fid(mdd_obj)));
+                } else {
+                        CERROR("Object "DFID" can not be deleted from orphan "
+                                "list, maybe cause OST objects can not be "
+                                "destroyed (err: %d).\n",
+                                PFID(mdd_object_fid(mdd_obj)), rc);
+                        /* If object was not deleted from orphan list, do not
+                         * destroy OSS objects, which will be done when next
+                         * recovery. */
+                        GOTO(out, rc);
+                }
         }
 
         rc = mdd_iattr_get(env, mdd_obj, ma);
-        if (rc == 0) {
-                if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
-                        rc = mdd_object_kill(env, mdd_obj, ma);
+        /* Object maybe not in orphan list originally, it is rare case for
+         * mdd_finish_unlink() failure. */
+        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
 #ifdef HAVE_QUOTA_SUPPORT
-                        if (mds->mds_quota) {
-                                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
-                                mdd_quota_wrapper(&ma->ma_attr, qids);
-                        }
+                if (mds->mds_quota) {
+                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                        mdd_quota_wrapper(&ma->ma_attr, qids);
+                }
 #endif
+                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+                if (ma->ma_valid & MA_FLAGS &&
+                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+                } else {
+                        rc = mdd_object_kill(env, mdd_obj, ma);
                         if (rc == 0)
                                 reset = 0;
                 }
+
+                if (rc != 0)
+                        CERROR("Error when prepare to delete Object "DFID" , "
+                               "which will cause OST objects can not be "
+                               "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
         }
+        EXIT;
 
+out:
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         mdd_write_unlock(env, mdd_obj);
-        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
+        if (handle != NULL)
+                mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
                 /* Trigger dqrel on the owner of child. If failed,
@@ -1913,7 +2176,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                               quota_opc);
 #endif
-        RETURN(rc);
+        return rc;
 }
 
 /*
@@ -1935,71 +2198,6 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_append_attrs(const struct lu_env *env,
-                             struct mdd_device *mdd,
-                             __u32 attr,
-                             const struct dt_it_ops *iops,
-                             struct dt_it *it,
-                             struct lu_dirent*ent)
-{
-        struct mdd_thread_info  *info = mdd_env_info(env);
-        struct lu_fid           *fid  = &info->mti_fid2;
-        int                      len = cpu_to_le16(ent->lde_namelen);
-        const unsigned           align = sizeof(struct luda_type) - 1;
-        struct lu_fid_pack      *pack;
-        struct mdd_object       *obj;
-        struct luda_type        *lt;
-        int rc = 0;
-
-        if (attr & LUDA_FID) {
-                pack = (struct lu_fid_pack *)iops->rec(env, it);
-                if (IS_ERR(pack)) {
-                        rc = PTR_ERR(pack);
-                        ent->lde_attrs = 0;
-                        goto out;
-                }
-                rc = fid_unpack(pack, fid);
-                if (rc != 0) {
-                        ent->lde_attrs = 0;
-                        goto out;
-                }
-
-                fid_cpu_to_le(&ent->lde_fid, fid);
-                ent->lde_attrs = LUDA_FID;
-        }
-
-        /* check if file type is required */
-        if (attr & LUDA_TYPE) {
-                if (!(attr & LUDA_FID)) {
-                        CERROR("wrong attr : [%x]\n",attr);
-                        rc = -EINVAL;
-                        goto out;
-                }
-
-                obj = mdd_object_find(env, mdd, fid);
-                if (obj == NULL) /* remote object */
-                        goto out;
-
-                if (IS_ERR(obj)) {
-                        rc = PTR_ERR(obj);
-                        goto out;
-                }
-
-                if (mdd_object_exists(obj) == +1) {
-                        len = (len + align) & ~align;
-
-                        lt = (void *) ent->lde_name + len;
-                        lt->lt_type = cpu_to_le16(mdd_object_type(obj));
-
-                        ent->lde_attrs |= LUDA_TYPE;
-                }
-                mdd_object_put(env, obj);
-        }
-out:
-        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
-        return rc;
-}
-
 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                               int first, void *area, int nob,
                               const struct dt_it_ops *iops, struct dt_it *it,
@@ -2007,8 +2205,8 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                               struct lu_dirent **last, __u32 attr)
 {
         int                     result;
+        __u64                   hash = 0;
         struct lu_dirent       *ent;
-        __u64  hash = 0;
 
         if (first) {
                 memset(area, 0, sizeof (struct lu_dirpage));
@@ -2018,7 +2216,6 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
 
         ent  = area;
         do {
-                char  *name;
                 int    len;
                 int    recsize;
 
@@ -2028,30 +2225,25 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 if (len == 0)
                         goto next;
 
-                name = (char *)iops->key(env, it);
                 hash = iops->store(env, it);
-
                 if (unlikely(first)) {
                         first = 0;
                         *start = hash;
                 }
 
+                /* calculate max space required for lu_dirent */
                 recsize = lu_dirent_calc_size(len, attr);
 
-                CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
-                                name, ent, nob, hash, len, len, len, name);
-
                 if (nob >= recsize) {
-                        ent->lde_hash    = cpu_to_le64(hash);
-                        ent->lde_namelen = cpu_to_le16(len);
-                        ent->lde_reclen  = cpu_to_le16(recsize);
-                        memcpy(ent->lde_name, name, len);
-
-                        result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
+                        result = iops->rec(env, it, ent, attr);
                         if (result == -ESTALE)
                                 goto next;
                         if (result != 0)
                                 goto out;
+
+                        /* osd might not able to pack all attributes,
+                         * so recheck rec length */
+                        recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
                         /*
                          * record doesn't fit into page, enlarge previous one.
@@ -2106,13 +2298,13 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          * iterate through directory and fill pages from @rdpg
          */
         iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, mdd_object_capa(env, obj));
+        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
         if (IS_ERR(it))
                 return PTR_ERR(it);
 
         rc = iops->load(env, it, rdpg->rp_hash);
 
-        if (rc == 0)
+        if (rc == 0){
                 /*
                  * Iterator didn't find record with exactly the key requested.
                  *
@@ -2125,7 +2317,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                  *     state)---position it on the next item.
                  */
                 rc = iops->next(env, it);
-        else if (rc > 0)
+        else if (rc > 0)
                 rc = 0;
 
         /*
@@ -2250,7 +2442,7 @@ static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
         LASSERT(mdd_object_exists(mdd_obj));
-        return do_version_set(env, mdd_object_child(mdd_obj), version);
+        do_version_set(env, mdd_object_child(mdd_obj), version);
 }
 
 const struct md_object_operations mdd_obj_ops = {
@@ -2268,9 +2460,12 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_close         = mdd_close,
         .moo_readpage      = mdd_readpage,
         .moo_readlink      = mdd_readlink,
+        .moo_changelog     = mdd_changelog,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
         .moo_version_get   = mdd_version_get,
         .moo_version_set   = mdd_version_set,
         .moo_path          = mdd_path,
+        .moo_file_lock     = mdd_file_lock,
+        .moo_file_unlock   = mdd_file_unlock,
 };