Whamcloud - gitweb
LU-6698 kernel: kernel update RHEL 6.6 [2.6.32-504.23.4.el6]
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 462b2b5..b6c8126 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -49,7 +49,6 @@
 #include <lprocfs_status.h>
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
-#include <obd_lov.h>
 #include <lustre_idmap.h>
 #include <lustre_param.h>
 #include <lustre_mds.h>
 #include "mdd_internal.h"
 
 static const struct lu_object_operations mdd_lu_obj_ops;
-extern struct kmem_cache *mdd_object_kmem;
 
 static int mdd_xattr_get(const struct lu_env *env,
                          struct md_object *obj, struct lu_buf *buf,
                          const char *name);
 
-int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
-                 void **data)
-{
-        if (mdd_object_exists(obj) == 0) {
-                CERROR("%s: object "DFID" not found: rc = -2\n",
-                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
-                return -ENOENT;
-        }
-        mdo_data_get(env, obj, data);
-        return 0;
-}
-
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
-               struct lu_attr *la, struct lustre_capa *capa)
+              struct lu_attr *la)
 {
         if (mdd_object_exists(obj) == 0) {
                 CERROR("%s: object "DFID" not found: rc = -2\n",
@@ -85,18 +71,7 @@ int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                 return -ENOENT;
         }
 
-        return mdo_attr_get(env, obj, la, capa);
-}
-
-void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
-{
-        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
-
-        if (flags & LUSTRE_APPEND_FL)
-                obj->mod_flags |= APPEND_OBJ;
-
-        if (flags & LUSTRE_IMMUTABLE_FL)
-                obj->mod_flags |= IMMUTE_OBJ;
+       return mdo_attr_get(env, obj, la);
 }
 
 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
@@ -109,17 +84,6 @@ struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
         return info;
 }
 
-const struct lu_name *mdd_name_get_const(const struct lu_env *env,
-                                        const void *area, ssize_t len)
-{
-       struct lu_name *lname;
-
-       lname = &mdd_env_info(env)->mti_name;
-       lname->ln_name = area;
-       lname->ln_namelen = len;
-       return lname;
-}
-
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
        struct lu_buf *buf;
@@ -191,9 +155,7 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
                struct mdd_object *mdd_obj = lu2mdd_obj(o);
                struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
 
-               rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, attr->la_flags);
+               rc = mdd_la_get(env, mdd_obj, attr);
        }
 
        return rc;
@@ -242,8 +204,7 @@ int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
 
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                       mdd_object_capa(env, md2mdd_obj(obj)));
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
        if ((ma->ma_need & MA_INODE) != 0 && mdd_is_dead_obj(mdd_obj))
                ma->ma_attr.la_nlink = 0;
 
@@ -275,8 +236,7 @@ static int mdd_xattr_get(const struct lu_env *env,
                RETURN(-ENOENT);
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_get(env, mdd_obj, buf, name,
-                           mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_get(env, mdd_obj, buf, name);
         mdd_read_unlock(env, mdd_obj);
 
         RETURN(rc);
@@ -306,8 +266,7 @@ int mdd_readlink(const struct lu_env *env, struct md_object *obj,
        LASSERT(next->do_body_ops != NULL);
        LASSERT(next->do_body_ops->dbo_read != NULL);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
-                                         mdd_object_capa(env, mdd_obj));
+       rc = dt_read(env, next, buf, &pos);
         mdd_read_unlock(env, mdd_obj);
         RETURN(rc);
 }
@@ -324,10 +283,38 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
         ENTRY;
 
         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_list(env, mdd_obj, buf);
         mdd_read_unlock(env, mdd_obj);
 
-        RETURN(rc);
+       if (rc < 0)
+               RETURN(rc);
+
+       /*
+        * Filter out XATTR_NAME_LINK if this is an orphan object.  See
+        * mdd_xattr_get().
+        */
+       if (unlikely(mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ))) {
+               char   *end = (char *)buf->lb_buf + rc;
+               char   *p = buf->lb_buf;
+
+               while (p < end) {
+                       char   *next = p + strlen(p) + 1;
+
+                       if (strcmp(p, XATTR_NAME_LINK) == 0) {
+                               if (end - next > 0)
+                                       memmove(p, next, end - next);
+                               rc -= next - p;
+                               CDEBUG(D_INFO, "Filtered out "XATTR_NAME_LINK
+                                      " of orphan "DFID"\n",
+                                      PFID(mdd_object_fid(mdd_obj)));
+                               break;
+                       }
+
+                       p = next;
+               }
+       }
+
+       RETURN(rc);
 }
 
 int mdd_declare_object_create_internal(const struct lu_env *env,
@@ -379,8 +366,6 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
 
        rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-
        RETURN(rc);
 }
 
@@ -391,7 +376,7 @@ int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
         int rc;
         ENTRY;
 
-        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
+       rc = mdo_attr_set(env, obj, attr, handle);
 #ifdef CONFIG_FS_POSIX_ACL
         if (!rc && (attr->la_valid & LA_MODE) && needacl)
                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
@@ -452,8 +437,6 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
        LASSERT(oattr != NULL);
 
-       /* export destroy does not have ->le_ses, but we may want
-        * to drop LUSTRE_SOM_FL. */
        uc = lu_ucred_check(env);
        if (uc == NULL)
                RETURN(0);
@@ -480,7 +463,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
-               unsigned int oldflags = 0;
+               unsigned int oldflags = oattr->la_flags &
+                               (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
@@ -488,12 +472,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                    !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
-               /* XXX: the IMMUTABLE and APPEND_ONLY flags can
+               /* The IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
-               if (mdd_is_immutable(obj))
-                       oldflags |= LUSTRE_IMMUTABLE_FL;
-               if (mdd_is_append(obj))
-                       oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);
@@ -502,7 +482,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }
 
-       if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
+       if (oattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);
@@ -612,44 +592,24 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                }
        }
 
-       /* For both Size-on-MDS case and truncate case,
-        * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
-        * We distinguish them by "flags & MDS_SOM".
-        * For SOM case, it is true, the MAY_WRITE perm has been checked
-        * when open, no need check again. For truncate case, it is false,
-        * the MAY_WRITE perm should be checked here. */
-       if (flags & MDS_SOM) {
-               /* For the "Size-on-MDS" setattr update, merge coming
-                * attributes with the set in the inode. BUG 10641 */
-               if ((la->la_valid & LA_ATIME) &&
-                   (la->la_atime <= oattr->la_atime))
-                       la->la_valid &= ~LA_ATIME;
-
-               /* OST attributes do not have a priority over MDS attributes,
-                * so drop times if ctime is equal. */
-               if ((la->la_valid & LA_CTIME) &&
-                   (la->la_ctime <= oattr->la_ctime))
-                       la->la_valid &= ~(LA_MTIME | LA_CTIME);
-       } else {
-               if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
-                       if (!((flags & MDS_OWNEROVERRIDE) &&
-                             (uc->uc_fsuid == oattr->la_uid)) &&
-                           !(flags & MDS_PERM_BYPASS)) {
-                               rc = mdd_permission_internal(env, obj,
-                                                            oattr, MAY_WRITE);
-                               if (rc != 0)
-                                       RETURN(rc);
-                       }
-               }
-               if (la->la_valid & LA_CTIME) {
-                       /* The pure setattr, it has the priority over what is
-                        * already set, do not drop it if ctime is equal. */
-                       if (la->la_ctime < oattr->la_ctime)
-                               la->la_valid &= ~(LA_ATIME | LA_MTIME |
-                                                       LA_CTIME);
+       if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
+               if (!((flags & MDS_OWNEROVERRIDE) &&
+                     (uc->uc_fsuid == oattr->la_uid)) &&
+                   !(flags & MDS_PERM_BYPASS)) {
+                       rc = mdd_permission_internal(env, obj, oattr,
+                                                    MAY_WRITE);
+                       if (rc != 0)
+                               RETURN(rc);
                }
        }
 
+       if (la->la_valid & LA_CTIME) {
+               /* The pure setattr, it has the priority over what is
+                * already set, do not drop it if ctime is equal. */
+               if (la->la_ctime < oattr->la_ctime)
+                       la->la_valid &= ~(LA_ATIME | LA_MTIME | LA_CTIME);
+       }
+
        RETURN(0);
 }
 
@@ -657,12 +617,13 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
  * If this fails, we must fail the whole transaction; we don't
  * want the change to commit without the log entry.
  * \param mdd_obj - mdd_object of change
- * \param handle - transacion handle
+ * \param handle - transaction handle
  */
 int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
                             enum changelog_rec_type type, int flags,
                             struct mdd_object *mdd_obj, struct thandle *handle)
 {
+       const struct lu_ucred           *uc = lu_ucred(env);
        const struct lu_fid             *tfid;
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
@@ -688,25 +649,32 @@ int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
                 RETURN(0);
         }
 
-       reclen = llog_data_len(sizeof(*rec));
+       flags = (flags & CLF_FLAGMASK) | CLF_VERSION;
+       if (uc != NULL && uc->uc_jobid[0] != '\0')
+               flags |= CLF_JOBID;
+
+       reclen = llog_data_len(changelog_rec_offset(flags & CLF_SUPPORTED));
        buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
 
-        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
-        rec->cr.cr_type = (__u32)type;
-        rec->cr.cr_tfid = *tfid;
-        rec->cr.cr_namelen = 0;
-        mdd_obj->mod_cltime = cfs_time_current_64();
+       rec->cr.cr_flags = flags;
+       rec->cr.cr_type = (__u32)type;
+       rec->cr.cr_tfid = *tfid;
+       rec->cr.cr_namelen = 0;
+       mdd_obj->mod_cltime = cfs_time_current_64();
+
+       if (flags & CLF_JOBID)
+               mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
 
        rc = mdd_changelog_store(env, mdd, rec, handle);
 
        RETURN(rc);
 }
 
-int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
-                  int flags, struct md_object *obj)
+static int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
+                        int flags, struct md_object *obj)
 {
         struct thandle *handle;
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
@@ -718,9 +686,9 @@ int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
         if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+       if (rc)
+               GOTO(stop, rc);
 
         rc = mdd_trans_start(env, mdd, handle);
         if (rc)
@@ -753,19 +721,19 @@ static int mdd_attr_set_changelog(const struct lu_env *env,
         struct mdd_device *mdd = mdo2mdd(obj);
         int bits, type = 0;
 
-        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
+       bits =  (valid & LA_SIZE)  ? 1 << CL_TRUNC : 0;
+       bits |= (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
         bits = bits & mdd->mdd_cl.mc_mask;
+       /* This is an implementation limit rather than a protocol limit */
+       CLASSERT(CL_LAST <= sizeof(int) * 8);
         if (bits == 0)
                 return 0;
 
         /* The record type is the lowest non-masked set bit */
-        while (bits && ((bits & 1) == 0)) {
-                bits = bits >> 1;
-                type++;
-        }
+       type = __ffs(bits);
 
         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
         return mdd_changelog_data_store(env, mdd, type, (int)valid,
@@ -788,7 +756,7 @@ static int mdd_declare_attr_set(const struct lu_env *env,
        if (attr->la_valid & LA_MODE) {
                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
                rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
-                                  XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
+                                  XATTR_NAME_ACL_ACCESS);
                 mdd_read_unlock(env, obj);
                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
                         rc = 0;
@@ -806,7 +774,7 @@ static int mdd_declare_attr_set(const struct lu_env *env,
         }
 #endif
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
        return rc;
 }
 
@@ -854,12 +822,11 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
        int rc;
        ENTRY;
 
-       /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
+       /* we do not use ->attr_set() for LOV/HSM EA any more */
        LASSERT((ma->ma_valid & MA_LOV) == 0);
        LASSERT((ma->ma_valid & MA_HSM) == 0);
-       LASSERT((ma->ma_valid & MA_SOM) == 0);
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
@@ -892,13 +859,10 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                       la->la_mtime, la->la_ctime);
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       if (la_copy->la_valid & LA_FLAGS) {
+       if (la_copy->la_valid & LA_FLAGS)
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, la_copy->la_flags);
-       } else if (la_copy->la_valid) { /* setattr */
+       else if (la_copy->la_valid) /* setattr */
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-       }
        mdd_write_unlock(env, mdd_obj);
 
        if (rc == 0)
@@ -918,7 +882,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
        struct lu_ucred *uc     = lu_ucred_assert(env);
        ENTRY;
 
-       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+       if (attr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
                RETURN(-EPERM);
 
        if ((uc->uc_fsuid != attr->la_uid) && !md_capable(uc, CFS_CAP_FOWNER))
@@ -944,19 +908,19 @@ static int mdd_declare_xattr_set(const struct lu_env *env,
        if (strncmp(XATTR_USER_PREFIX, name,
                    sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
            strcmp(XATTR_NAME_LOV, name) == 0) {
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
                if (rc)
                        return rc;
        }
 
        /* If HSM data is modified, this could add a changelog */
        if (strcmp(XATTR_NAME_HSM, name) == 0) {
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
                if (rc)
                        return rc;
        }
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
        return rc;
 }
 
@@ -991,8 +955,7 @@ static int mdd_hsm_update_locked(const struct lu_env *env,
        CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
        current_buf = mdd_buf_get(env, info->mti_xattr_buf,
                                  sizeof(info->mti_xattr_buf));
-       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
-                          mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
        rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
        if (rc < 0 && rc != -ENODATA)
                GOTO(free, rc);
@@ -1023,6 +986,9 @@ free:
        return(rc);
 }
 
+static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
+                        const char *name);
+
 /**
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
@@ -1038,18 +1004,33 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        int                      rc;
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
-       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
-               rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
+       rc = mdd_xattr_sanity_check(env, mdd_obj, attr);
+       if (rc)
                RETURN(rc);
+
+       if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
+           strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
+               struct posix_acl *acl;
+
+               /* user may set empty ACL, which should be treated as removing
+                * ACL. */
+               acl = posix_acl_from_xattr(&init_user_ns, buf->lb_buf,
+                                          buf->lb_len);
+               if (acl == NULL) {
+                       rc = mdd_xattr_del(env, obj, name);
+                       RETURN(rc);
+               }
+               posix_acl_release(acl);
        }
 
-       rc = mdd_xattr_sanity_check(env, mdd_obj, attr);
-       if (rc)
+       if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
+               rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
                RETURN(rc);
+       }
 
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
@@ -1073,8 +1054,7 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                }
        }
 
-       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
-                          mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc)
                GOTO(stop, rc);
@@ -1112,7 +1092,7 @@ static int mdd_declare_xattr_del(const struct lu_env *env,
        /* Only record user xattr changes */
        if ((strncmp(XATTR_USER_PREFIX, name,
                     sizeof(XATTR_USER_PREFIX) - 1) == 0))
-               rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
 
        return rc;
 }
@@ -1121,8 +1101,8 @@ static int mdd_declare_xattr_del(const struct lu_env *env,
  * The caller should guarantee to update the object ctime
  * after xattr_set if needed.
  */
-int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
-                  const char *name)
+static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
+                        const char *name)
 {
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
@@ -1131,7 +1111,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
        int  rc;
        ENTRY;
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
+       rc = mdd_la_get(env, mdd_obj, attr);
        if (rc)
                RETURN(rc);
 
@@ -1152,8 +1132,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                 GOTO(stop, rc);
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        rc = mdo_xattr_del(env, mdd_obj, name, handle,
-                           mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_del(env, mdd_obj, name, handle);
         mdd_write_unlock(env, mdd_obj);
        if (rc)
                GOTO(stop, rc);
@@ -1186,8 +1165,7 @@ int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
        ENTRY;
 
 repeat:
-       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
-                          mdd_object_capa(env, obj));
+       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
 
        if (rc == -ERANGE) {
                /* mti_big_buf is allocated but is too small
@@ -1241,7 +1219,7 @@ static int mdd_xattr_hsm_replace(const struct lu_env *env,
        ENTRY;
 
        rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
-                          handle, mdd_object_capa(env, o));
+                          handle);
        if (rc != 0)
                RETURN(rc);
 
@@ -1336,11 +1314,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        if (rc < 0)
                swap(fst_o, snd_o);
 
-       rc = mdd_la_get(env, fst_o, fst_la, BYPASS_CAPA);
+       rc = mdd_la_get(env, fst_o, fst_la);
        if (rc != 0)
                RETURN(rc);
 
-       rc = mdd_la_get(env, snd_o, snd_la, BYPASS_CAPA);
+       rc = mdd_la_get(env, snd_o, snd_la);
        if (rc != 0)
                RETURN(rc);
 
@@ -1422,13 +1400,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                        GOTO(stop, rc = -ENOMEM);
 
                /* Read HSM attribute */
-               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM,
-                                  BYPASS_CAPA);
+               rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM);
                if (rc < 0)
                        GOTO(stop, rc);
 
-               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM,
-                                  BYPASS_CAPA);
+               rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM);
                if (rc < 0)
                        GOTO(stop, rc);
 
@@ -1482,8 +1458,7 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                }
        }
 
-       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle,
-                          mdd_object_capa(env, fst_o));
+       rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle);
        if (rc != 0)
                GOTO(stop, rc);
 
@@ -1492,11 +1467,9 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        } else {
                if (fst_buf->lb_buf != NULL)
                        rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
-                                          LU_XATTR_REPLACE, handle,
-                                          mdd_object_capa(env, snd_o));
+                                          LU_XATTR_REPLACE, handle);
                else
-                       rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle,
-                                          mdd_object_capa(env, snd_o));
+                       rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle);
        }
 
        if (rc != 0) {
@@ -1508,11 +1481,9 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
                        fst_lmm->lmm_oi = *saved_oi;
                        fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1);
                        rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
-                                           LU_XATTR_REPLACE, handle,
-                                           mdd_object_capa(env, fst_o));
+                                           LU_XATTR_REPLACE, handle);
                } else {
-                       rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle,
-                                           mdd_object_capa(env, fst_o));
+                       rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle);
                }
                if (rc2 < 0)
                        goto do_lbug;
@@ -1649,7 +1620,7 @@ static int mdd_open_sanity_check(const struct lu_env *env,
                flag &= ~MDS_OPEN_TRUNC;
 
        /* For writing append-only file must open it with append mode. */
-       if (mdd_is_append(obj)) {
+       if (attr->la_flags & LUSTRE_APPEND_FL) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
@@ -1683,37 +1654,21 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
 
-       rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
-       if (rc)
-               RETURN(rc);
+       rc = mdd_la_get(env, mdd_obj, attr);
+       if (rc != 0)
+               GOTO(out, rc);
 
        rc = mdd_open_sanity_check(env, mdd_obj, attr, flags);
-       if (rc == 0)
-               mdd_obj->mod_count++;
+       if (rc != 0)
+               GOTO(out, rc);
 
+       mdd_obj->mod_count++;
+       EXIT;
+out:
        mdd_write_unlock(env, mdd_obj);
        return rc;
 }
 
-int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                            struct md_attr *ma, struct thandle *handle)
-{
-        return mdo_declare_destroy(env, obj, handle);
-}
-
-/* return md_attr back,
- * if it is last unlink then return lov ea + llog cookie*/
-int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
-                    struct md_attr *ma, struct thandle *handle)
-{
-       int rc;
-        ENTRY;
-
-       rc = mdo_destroy(env, obj, handle);
-
-        RETURN(rc);
-}
-
 static int mdd_declare_close(const struct lu_env *env,
                              struct mdd_object *obj,
                              struct md_attr *ma,
@@ -1767,9 +1722,9 @@ again:
                 if (rc)
                         GOTO(stop, rc);
 
-                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
-                if (rc)
-                        GOTO(stop, rc);
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+               if (rc)
+                       GOTO(stop, rc);
 
                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
                 if (rc)
@@ -1777,8 +1732,7 @@ again:
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
-                       mdd_object_capa(env, mdd_obj));
+       rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
        if (rc != 0) {
                CERROR("Failed to get lu_attr of "DFID": %d\n",
                       PFID(mdd_object_fid(mdd_obj)), rc);
@@ -1843,10 +1797,10 @@ out:
                         if (IS_ERR(handle))
                                GOTO(stop, rc = PTR_ERR(handle));
 
-                        rc = mdd_declare_changelog_store(env, mdd, NULL,
-                                                         handle);
-                        if (rc)
-                                GOTO(stop, rc);
+                       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL,
+                                                        handle);
+                       if (rc)
+                               GOTO(stop, rc);
 
                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
                         if (rc)
@@ -1884,7 +1838,7 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
-                             int nob, const struct dt_it_ops *iops,
+                             size_t nob, const struct dt_it_ops *iops,
                              struct dt_it *it, __u32 attr, void *arg)
 {
        struct lu_dirpage       *dp = &lp->lp_dir;
@@ -1893,8 +1847,12 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
        __u64                    hash = 0;
        struct lu_dirent        *ent;
        struct lu_dirent        *last = NULL;
+       struct lu_fid            fid;
        int                      first = 1;
 
+       if (nob < sizeof(*dp))
+               return -EINVAL;
+
         memset(area, 0, sizeof (*dp));
         area += sizeof (*dp);
         nob  -= sizeof (*dp);
@@ -1902,7 +1860,7 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
         ent  = area;
         do {
                 int    len;
-                int    recsize;
+               size_t recsize;
 
                 len = iops->key_size(env, it);
 
@@ -1929,6 +1887,12 @@ static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
                         /* osd might not able to pack all attributes,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
+
+                       if (le32_to_cpu(ent->lde_attrs) & LUDA_FID) {
+                               fid_le_to_cpu(&fid, &ent->lde_fid);
+                               if (fid_is_dot_lustre(&fid))
+                                       goto next;
+                       }
                 } else {
                         result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
@@ -2013,6 +1977,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                         * No pages were processed, mark this for first page
                         * and send back.
                         */
+                       dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                        dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                        rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
                }
@@ -2027,14 +1992,18 @@ out_unlock:
 
 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+       struct mdd_object *mdd_obj = md2mdd_obj(obj);
 
-        if (mdd_object_exists(mdd_obj) == 0) {
-                CERROR("%s: object "DFID" not found: rc = -2\n",
-                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
-                return -ENOENT;
-        }
-        return dt_object_sync(env, mdd_object_child(mdd_obj));
+       if (mdd_object_exists(mdd_obj) == 0) {
+               int rc = -ENOENT;
+
+               CERROR("%s: object "DFID" not found: rc = %d\n",
+                      mdd_obj_dev_name(mdd_obj),
+                      PFID(mdd_object_fid(mdd_obj)), rc);
+               return rc;
+       }
+       return dt_object_sync(env, mdd_object_child(mdd_obj),
+                             0, OBD_OBJECT_EOF);
 }
 
 static int mdd_object_lock(const struct lu_env *env,
@@ -2071,7 +2040,6 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_readpage           = mdd_readpage,
        .moo_readlink           = mdd_readlink,
        .moo_changelog          = mdd_changelog,
-       .moo_capa_get           = mdd_capa_get,
        .moo_object_sync        = mdd_object_sync,
        .moo_object_lock        = mdd_object_lock,
        .moo_object_unlock      = mdd_object_unlock,