Whamcloud - gitweb
LU-3534 osp: move RPC pack from declare to execution phase
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 9e86b9e..0bfd37d 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -49,7 +49,6 @@
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
-#include <obd_lov.h>
 #include <lustre_idmap.h>
 #include <lustre_param.h>
 #include <lustre_mds.h>
 #include "mdd_internal.h"
 
 static const struct lu_object_operations mdd_lu_obj_ops;
-extern struct kmem_cache *mdd_object_kmem;
 
 static int mdd_xattr_get(const struct lu_env *env,
                          struct md_object *obj, struct lu_buf *buf,
                          const char *name);
 
-int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
-                 void **data)
-{
-        if (mdd_object_exists(obj) == 0) {
-                CERROR("%s: object "DFID" not found: rc = -2\n",
-                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
-                return -ENOENT;
-        }
-        mdo_data_get(env, obj, data);
-        return 0;
-}
-
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
-               struct lu_attr *la, struct lustre_capa *capa)
+              struct lu_attr *la)
 {
         if (mdd_object_exists(obj) == 0) {
                 CERROR("%s: object "DFID" not found: rc = -2\n",
@@ -85,18 +71,7 @@ int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                 return -ENOENT;
         }
 
-        return mdo_attr_get(env, obj, la, capa);
-}
-
-static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
-{
-        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
-
-        if (flags & LUSTRE_APPEND_FL)
-                obj->mod_flags |= APPEND_OBJ;
-
-        if (flags & LUSTRE_IMMUTABLE_FL)
-                obj->mod_flags |= IMMUTE_OBJ;
+       return mdo_attr_get(env, obj, la);
 }
 
 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
@@ -109,17 +84,6 @@ struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
         return info;
 }
 
-const struct lu_name *mdd_name_get_const(const struct lu_env *env,
-                                        const void *area, ssize_t len)
-{
-       struct lu_name *lname;
-
-       lname = &mdd_env_info(env)->mti_name;
-       lname->ln_name = area;
-       lname->ln_namelen = len;
-       return lname;
-}
-
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
        struct lu_buf *buf;
@@ -142,25 +106,25 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
 }
 
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
-                                   const struct lu_object_header *hdr,
-                                   struct lu_device *d)
+                                  const struct lu_object_header *hdr,
+                                  struct lu_device *d)
 {
-        struct mdd_object *mdd_obj;
-
-       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, __GFP_IO);
-        if (mdd_obj != NULL) {
-                struct lu_object *o;
-
-                o = mdd2lu_obj(mdd_obj);
-                lu_object_init(o, NULL, d);
-                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
-                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
-                mdd_obj->mod_count = 0;
-                o->lo_ops = &mdd_lu_obj_ops;
-                return o;
-        } else {
-                return NULL;
-        }
+       struct mdd_object *mdd_obj;
+
+       OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, GFP_NOFS);
+       if (mdd_obj != NULL) {
+               struct lu_object *o;
+
+               o = mdd2lu_obj(mdd_obj);
+               lu_object_init(o, NULL, d);
+               mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
+               mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
+               mdd_obj->mod_count = 0;
+               o->lo_ops = &mdd_lu_obj_ops;
+               return o;
+       } else {
+               return NULL;
+       }
 }
 
 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
@@ -191,9 +155,7 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
                struct mdd_object *mdd_obj = lu2mdd_obj(o);
                struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
 
-               rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, attr->la_flags);
+               rc = mdd_la_get(env, mdd_obj, attr);
        }
 
        return rc;
@@ -242,8 +204,7 @@ int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
 
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                       mdd_object_capa(env, md2mdd_obj(obj)));
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
        if ((ma->ma_need & MA_INODE) != 0 && mdd_is_dead_obj(mdd_obj))
                ma->ma_attr.la_nlink = 0;
 
@@ -275,8 +236,7 @@ static int mdd_xattr_get(const struct lu_env *env,
                RETURN(-ENOENT);
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_get(env, mdd_obj, buf, name,
-                           mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_get(env, mdd_obj, buf, name);
         mdd_read_unlock(env, mdd_obj);
 
         RETURN(rc);
@@ -286,8 +246,8 @@ static int mdd_xattr_get(const struct lu_env *env,
  * Permission check is done when open,
  * no need check again.
  */
-static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
-                        struct lu_buf *buf)
+int mdd_readlink(const struct lu_env *env, struct md_object *obj,
+                struct lu_buf *buf)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct dt_object  *next;
@@ -302,9 +262,11 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
         }
 
         next = mdd_object_child(mdd_obj);
-        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
-                                         mdd_object_capa(env, mdd_obj));
+       LASSERT(next != NULL);
+       LASSERT(next->do_body_ops != NULL);
+       LASSERT(next->do_body_ops->dbo_read != NULL);
+       mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
+       rc = dt_read(env, next, buf, &pos);
         mdd_read_unlock(env, mdd_obj);
         RETURN(rc);
 }
@@ -321,10 +283,38 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         ENTRY;
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_list(env, mdd_obj, buf);
         mdd_read_unlock(env, mdd_obj);
 
-        RETURN(rc);
+       if (rc < 0)
+               RETURN(rc);
+
+       /*
+        * Filter out XATTR_NAME_LINK if this is an orphan object.  See
+        * mdd_xattr_get().
+        */
+       if (unlikely(mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ))) {
+               char   *end = (char *)buf->lb_buf + rc;
+               char   *p = buf->lb_buf;
+
+               while (p < end) {
+                       char   *next = p + strlen(p) + 1;
+
+                       if (strcmp(p, XATTR_NAME_LINK) == 0) {
+                               if (end - next > 0)
+                                       memmove(p, next, end - next);
+                               rc -= next - p;
+                               CDEBUG(D_INFO, "Filtered out "XATTR_NAME_LINK
+                                      " of orphan "DFID"\n",
+                                      PFID(mdd_object_fid(mdd_obj)));
+                               break;
+                       }
+
+                       p = next;
+               }
+       }
+
+       RETURN(rc);
 }
 
 int mdd_declare_object_create_internal(const struct lu_env *env,
@@ -332,10 +322,10 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *c,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
-                                      const struct md_op_spec *spec)
+                                      const struct md_op_spec *spec,
+                                      struct dt_allocation_hint *hint)
 {
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
@@ -364,10 +354,10 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
 
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct lu_attr *attr,
-                               struct thandle *handle,
-                               const struct md_op_spec *spec)
+                              struct thandle *handle,
+                              const struct md_op_spec *spec,
+                              struct dt_allocation_hint *hint)
 {
-        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
         int rc;
         ENTRY;
@@ -376,8 +366,6 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
 
        rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-
        RETURN(rc);
 }
 
@@ -388,7 +376,7 @@ int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
         int rc;
         ENTRY;
 
-        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
+       rc = mdo_attr_set(env, obj, attr, handle);
 #ifdef CONFIG_FS_POSIX_ACL
         if (!rc && (attr->la_valid & LA_MODE) && needacl)
                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
@@ -477,7 +465,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
-               unsigned int oldflags = 0;
+               unsigned int oldflags = oattr->la_flags &
+                               (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
@@ -485,12 +474,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                    !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
-               /* XXX: the IMMUTABLE and APPEND_ONLY flags can
+               /* The IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
-               if (mdd_is_immutable(obj))
-                       oldflags |= LUSTRE_IMMUTABLE_FL;
-               if (mdd_is_append(obj))
-                       oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);
@@ -499,7 +484,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }
 
-       if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
+       if (oattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);
@@ -660,6 +645,7 @@ int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
                             enum changelog_rec_type type, int flags,
                             struct mdd_object *mdd_obj, struct thandle *handle)
 {
+       const struct lu_ucred           *uc = lu_ucred(env);
        const struct lu_fid             *tfid;
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
@@ -685,25 +671,32 @@ int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
                 RETURN(0);
         }
 
-       reclen = llog_data_len(sizeof(*rec));
+       flags = (flags & CLF_FLAGMASK) | CLF_VERSION;
+       if (uc != NULL && uc->uc_jobid[0] != '\0')
+               flags |= CLF_JOBID;
+
+       reclen = llog_data_len(changelog_rec_offset(flags & CLF_SUPPORTED));
        buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
 
-        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
-        rec->cr.cr_type = (__u32)type;
-        rec->cr.cr_tfid = *tfid;
-        rec->cr.cr_namelen = 0;
-        mdd_obj->mod_cltime = cfs_time_current_64();
+       rec->cr.cr_flags = flags;
+       rec->cr.cr_type = (__u32)type;
+       rec->cr.cr_tfid = *tfid;
+       rec->cr.cr_namelen = 0;
+       mdd_obj->mod_cltime = cfs_time_current_64();
+
+       if (flags & CLF_JOBID)
+               mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
 
        rc = mdd_changelog_store(env, mdd, rec, handle);
 
        RETURN(rc);
 }
 
-int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
-                  int flags, struct md_object *obj)
+static int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
+                        int flags, struct md_object *obj)
 {
         struct thandle *handle;
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
@@ -715,9 +708,9 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
         if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+       if (rc)
+               GOTO(stop, rc);
 
         rc = mdd_trans_start(env, mdd, handle);
         if (rc)
@@ -750,19 +743,19 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
         struct mdd_device *mdd = mdo2mdd(obj);
         int bits, type = 0;
 
-        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
+       bits =  (valid & LA_SIZE)  ? 1 << CL_TRUNC : 0;
+       bits |= (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
         bits = bits & mdd->mdd_cl.mc_mask;
+       /* This is an implementation limit rather than a protocol limit */
+       CLASSERT(CL_LAST <= sizeof(int) * 8);
         if (bits == 0)
                 return 0;
 
         /* The record type is the lowest non-masked set bit */
-        while (bits && ((bits & 1) == 0)) {
-                bits = bits >> 1;
-                type++;
-        }
+       type = __ffs(bits);
 
         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
         return mdd_changelog_data_store(env, mdd, type, (int)valid,
@@ -785,7 +778,7 @@ static int mdd_declare_attr_set(const struct lu_env *env,
        if (attr->la_valid & LA_MODE) {
                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
                rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
-                                  XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
+                                  XATTR_NAME_ACL_ACCESS);
                 mdd_read_unlock(env, obj);
                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
                         rc = 0;
@@ -803,7 +796,7 @@ static int mdd_declare_attr_set(const struct lu_env *env,
         }
 #endif
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
        return rc;
 }
 
@@ -856,7 +849,7 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
        LASSERT((ma->ma_valid & MA_HSM) == 0);
        LASSERT((ma->ma_valid & MA_SOM) == 0);
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
@@ -889,13 +882,10 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                       la->la_mtime, la->la_ctime);
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       if (la_copy->la_valid & LA_FLAGS) {
+       if (la_copy->la_valid & LA_FLAGS)
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, la_copy->la_flags);
-       } else if (la_copy->la_valid) { /* setattr */
+       else if (la_copy->la_valid) /* setattr */
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-       }
        mdd_write_unlock(env, mdd_obj);
 
        if (rc == 0)
@@ -915,7 +905,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
        struct lu_ucred *uc     = lu_ucred_assert(env);
        ENTRY;
 
-       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+       if (attr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
                RETURN(-EPERM);
 
        if ((uc->uc_fsuid != attr->la_uid) && !md_capable(uc, CFS_CAP_FOWNER))
@@ -941,19 +931,19 @@ static int mdd_declare_xattr_set(const struct lu_env *env,
        if (strncmp(XATTR_USER_PREFIX, name,
                    sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
            strcmp(XATTR_NAME_LOV, name) == 0) {
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
                if (rc)
                        return rc;
        }
 
        /* If HSM data is modified, this could add a changelog */
        if (strcmp(XATTR_NAME_HSM, name) == 0) {
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
                if (rc)
                        return rc;
        }
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
        return rc;
 }
 
@@ -988,8 +978,7 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
        CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
        current_buf = mdd_buf_get(env, info->mti_xattr_buf,
                                  sizeof(info->mti_xattr_buf));
-       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
-                          mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
        rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
        if (rc < 0 && rc != -ENODATA)
                GOTO(free, rc);
@@ -1020,6 +1009,9 @@ free:
        return(rc);
 }
 
+static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
+                        const char *name);
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1035,18 +1027,33 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        int                      rc;
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
-       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
-               rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
+       rc = mdd_xattr_sanity_check(env, mdd_obj, attr);
+       if (rc)
                RETURN(rc);
+
+       if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
+           strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
+               struct posix_acl *acl;
+
+               /* user may set empty ACL, which should be treated as removing
+                * ACL. */
+               acl = posix_acl_from_xattr(&init_user_ns, buf->lb_buf,
+                                          buf->lb_len);
+               if (acl == NULL) {
+                       rc = mdd_xattr_del(env, obj, name);
+                       RETURN(rc);
+               }
+               posix_acl_release(acl);
        }
 
-       rc = mdd_xattr_sanity_check(env, mdd_obj, attr);
-       if (rc)
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
+               rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
                RETURN(rc);
+       }
 
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
@@ -1070,8 +1077,7 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                }
        }
 
-       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
-                          mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc)
                GOTO(stop, rc);
@@ -1109,7 +1115,7 @@ static int mdd_declare_xattr_del(const struct lu_env *env,
        /* Only record user xattr changes */
        if ((strncmp(XATTR_USER_PREFIX, name,
                     sizeof(XATTR_USER_PREFIX) - 1) == 0))
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
 
        return rc;
 }
@@ -1118,8 +1124,8 @@ static int mdd_declare_xattr_del(const struct lu_env *env,
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
  */
-int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
-                  const char *name)
+static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
+                        const char *name)
 {
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
@@ -1128,7 +1134,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
        int  rc;
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
@@ -1149,8 +1155,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                 GOTO(stop, rc);
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_del(env, mdd_obj, name, handle,
-                           mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_del(env, mdd_obj, name, handle);
         mdd_write_unlock(env, mdd_obj);
        if (rc)
                GOTO(stop, rc);
@@ -1175,17 +1180,15 @@ stop:
  * read lov EA of an object
  * return the lov EA in an allocated lu_buf
  */
-static int mdd_get_lov_ea(const struct lu_env *env,
-                         struct mdd_object *obj,
-                         struct lu_buf *lmm_buf)
+int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf)
 {
        struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       int              rc, sz;
+       int              rc, bufsize;
        ENTRY;
 
 repeat:
-       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
-                          mdd_object_capa(env, obj));
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
 
        if (rc == -ERANGE) {
                /* mti_big_buf is allocated but is too small
@@ -1198,27 +1201,27 @@ repeat:
        }
 
        if (rc < 0)
-               GOTO(out, rc);
+               RETURN(rc);
 
        if (rc == 0)
-               GOTO(out, rc = -ENODATA);
+               RETURN(-ENODATA);
 
-       sz = rc;
+       bufsize = rc;
        if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
                /* mti_big_buf was not allocated, so we have to
                 * allocate it based on the ea size */
                buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
-                                            sz);
+                                            bufsize);
                if (buf->lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
                goto repeat;
        }
 
-       lu_buf_alloc(lmm_buf, sz);
+       lu_buf_alloc(lmm_buf, bufsize);
        if (lmm_buf->lb_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
        rc = 0;
        EXIT;
 
@@ -1239,7 +1242,7 @@ static int mdd_xattr_hsm_replace(const struct lu_env *env,
        ENTRY;
 
        rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
-                          handle, mdd_object_capa(env, o));
+                          handle);
        if (rc != 0)
                RETURN(rc);
 
@@ -1334,11 +1337,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        if (rc < 0)
                swap(fst_o, snd_o);
 
-       rc = mdd_la_get(env, fst_o, fst_la, BYPASS_CAPA);
+       rc = mdd_la_get(env, fst_o, fst_la);
        if (rc != 0)
                RETURN(rc);
 
-       rc = mdd_la_get(env, snd_o, snd_la, BYPASS_CAPA);
+       rc = mdd_la_get(env, snd_o, snd_la);
        if (rc != 0)
                RETURN(rc);
 
@@ -1420,13 +1423,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                        GOTO(stop, rc = -ENOMEM);
 
                /* Read HSM attribute */
-               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM,
-                                  BYPASS_CAPA);
+               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM);
                if (rc < 0)
                        GOTO(stop, rc);
 
-               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM,
-                                  BYPASS_CAPA);
+               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM);
                if (rc < 0)
                        GOTO(stop, rc);
 
@@ -1480,8 +1481,7 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                }
        }
 
-       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle,
-                          mdd_object_capa(env, fst_o));
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle);
        if (rc != 0)
                GOTO(stop, rc);
 
@@ -1490,11 +1490,9 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        } else {
                if (fst_buf->lb_buf != NULL)
                        rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
-                                          LU_XATTR_REPLACE, handle,
-                                          mdd_object_capa(env, snd_o));
+                                          LU_XATTR_REPLACE, handle);
                else
-                       rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle,
-                                          mdd_object_capa(env, snd_o));
+                       rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle);
        }
 
        if (rc != 0) {
@@ -1506,11 +1504,9 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                        fst_lmm->lmm_oi = *saved_oi;
                        fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1);
                        rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
-                                           LU_XATTR_REPLACE, handle,
-                                           mdd_object_capa(env, fst_o));
+                                           LU_XATTR_REPLACE, handle);
                } else {
-                       rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle,
-                                           mdd_object_capa(env, fst_o));
+                       rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle);
                }
                if (rc2 < 0)
                        goto do_lbug;
@@ -1564,9 +1560,9 @@ stop:
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                          struct mdd_object *child, const struct lu_attr *attr,
-                         const struct md_op_spec *spec)
+                         const struct md_op_spec *spec,
+                         struct dt_allocation_hint *hint)
 {
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object *np = parent ?  mdd_object_child(parent) : NULL;
        struct dt_object *nc = mdd_object_child(child);
 
@@ -1583,7 +1579,7 @@ void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                hint->dah_eadata_len = 0;
        }
 
-       CDEBUG(D_INFO, DFID" eadata %p, len %d\n", PFID(mdd_object_fid(child)),
+       CDEBUG(D_INFO, DFID" eadata %p len %d\n", PFID(mdd_object_fid(child)),
               hint->dah_eadata, hint->dah_eadata_len);
        /* @hint will be initialized by underlying device. */
        nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
@@ -1647,7 +1643,7 @@ static int mdd_open_sanity_check(const struct lu_env *env,
                flag &= ~MDS_OPEN_TRUNC;
 
        /* For writing append-only file must open it with append mode. */
-       if (mdd_is_append(obj)) {
+       if (attr->la_flags & LUSTRE_APPEND_FL) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
@@ -1681,37 +1677,21 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
-       if (rc)
-               RETURN(rc);
+       rc = mdd_la_get(env, mdd_obj, attr);
+       if (rc != 0)
+               GOTO(out, rc);
 
        rc = mdd_open_sanity_check(env, mdd_obj, attr, flags);
-       if (rc == 0)
-               mdd_obj->mod_count++;
+       if (rc != 0)
+               GOTO(out, rc);
 
+       mdd_obj->mod_count++;
+       EXIT;
+out:
        mdd_write_unlock(env, mdd_obj);
        return rc;
 }
 
-int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                            struct md_attr *ma, struct thandle *handle)
-{
-        return mdo_declare_destroy(env, obj, handle);
-}
-
-/* return md_attr back,
- * if it is last unlink then return lov ea + llog cookie*/
-int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma, struct thandle *handle)
-{
-       int rc;
-        ENTRY;
-
-       rc = mdo_destroy(env, obj, handle);
-
-        RETURN(rc);
-}
-
 static int mdd_declare_close(const struct lu_env *env,
                              struct mdd_object *obj,
                              struct md_attr *ma,
@@ -1765,9 +1745,9 @@ again:
                 if (rc)
                         GOTO(stop, rc);
 
-                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-                if (rc)
-                        GOTO(stop, rc);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+               if (rc)
+                       GOTO(stop, rc);
 
                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
                 if (rc)
@@ -1775,8 +1755,7 @@ again:
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                       mdd_object_capa(env, mdd_obj));
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
        if (rc != 0) {
                CERROR("Failed to get lu_attr of "DFID": %d\n",
                       PFID(mdd_object_fid(mdd_obj)), rc);
@@ -1841,10 +1820,10 @@ out:
                         if (IS_ERR(handle))
                                GOTO(stop, rc = PTR_ERR(handle));
 
-                        rc = mdd_declare_changelog_store(env, mdd, NULL,
-                                                         handle);
-                        if (rc)
-                                GOTO(stop, rc);
+                       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL,
+                                                        handle);
+                       if (rc)
+                               GOTO(stop, rc);
 
                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
                         if (rc)
@@ -1882,7 +1861,7 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
-                             int nob, const struct dt_it_ops *iops,
+                             size_t nob, const struct dt_it_ops *iops,
                              struct dt_it *it, __u32 attr, void *arg)
 {
        struct lu_dirpage       *dp = &lp->lp_dir;
@@ -1891,8 +1870,12 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
        __u64                    hash = 0;
        struct lu_dirent        *ent;
        struct lu_dirent        *last = NULL;
+       struct lu_fid            fid;
        int                      first = 1;
 
+       if (nob < sizeof(*dp))
+               return -EINVAL;
+
         memset(area, 0, sizeof (*dp));
         area += sizeof (*dp);
         nob  -= sizeof (*dp);
@@ -1900,7 +1883,7 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
         ent  = area;
         do {
                 int    len;
-                int    recsize;
+               size_t recsize;
 
                 len = iops->key_size(env, it);
 
@@ -1927,6 +1910,12 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
                         /* osd might not able to pack all attributes,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
+
+                       if (le32_to_cpu(ent->lde_attrs) & LUDA_FID) {
+                               fid_le_to_cpu(&fid, &ent->lde_fid);
+                               if (fid_is_dot_lustre(&fid))
+                                       goto next;
+                       }
                 } else {
                         result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
@@ -2011,6 +2000,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                         * No pages were processed, mark this for first page
                         * and send back.
                         */
+                       dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                        dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                        rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
                }
@@ -2025,27 +2015,40 @@ out_unlock:
 
 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+       struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
-        if (mdd_object_exists(mdd_obj) == 0) {
-                CERROR("%s: object "DFID" not found: rc = -2\n",
-                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
-                return -ENOENT;
-        }
-        return dt_object_sync(env, mdd_object_child(mdd_obj));
+       if (mdd_object_exists(mdd_obj) == 0) {
+               int rc = -ENOENT;
+
+               CERROR("%s: object "DFID" not found: rc = %d\n",
+                      mdd_obj_dev_name(mdd_obj),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               return rc;
+       }
+       return dt_object_sync(env, mdd_object_child(mdd_obj),
+                             0, OBD_OBJECT_EOF);
 }
 
 static int mdd_object_lock(const struct lu_env *env,
                           struct md_object *obj,
                           struct lustre_handle *lh,
                           struct ldlm_enqueue_info *einfo,
-                          void *policy)
+                          ldlm_policy_data_t *policy)
 {
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
                              einfo, policy);
 }
 
+static int mdd_object_unlock(const struct lu_env *env,
+                            struct md_object *obj,
+                            struct ldlm_enqueue_info *einfo,
+                            ldlm_policy_data_t *policy)
+{
+       struct mdd_object *mdd_obj = md2mdd_obj(obj);
+       return dt_object_unlock(env, mdd_object_child(mdd_obj), einfo, policy);
+}
+
 const struct md_object_operations mdd_obj_ops = {
        .moo_permission         = mdd_permission,
        .moo_attr_get           = mdd_attr_get,
@@ -2060,7 +2063,7 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_readpage           = mdd_readpage,
        .moo_readlink           = mdd_readlink,
        .moo_changelog          = mdd_changelog,
-       .moo_capa_get           = mdd_capa_get,
        .moo_object_sync        = mdd_object_sync,
        .moo_object_lock        = mdd_object_lock,
+       .moo_object_unlock      = mdd_object_unlock,
 };