Whamcloud - gitweb
LU-542 Fix mdt xattr handler logic error
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 17089c7..2ae3f53 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -599,7 +602,7 @@ int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
         LASSERT(ldesc != NULL);
 
         lum->lmm_magic = LOV_MAGIC_V1;
-        lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
+        lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
         lum->lmm_pattern = ldesc->ld_pattern;
         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
@@ -608,6 +611,14 @@ int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
         RETURN(sizeof(*lum));
 }
 
+static int is_rootdir(struct mdd_object *mdd_obj)
+{
+        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
+        const struct lu_fid *fid = mdo2fid(mdd_obj);
+
+        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
+}
+
 /* get lov EA only */
 static int __mdd_lmm_get(const struct lu_env *env,
                          struct mdd_object *mdd_obj, struct md_attr *ma)
@@ -620,7 +631,7 @@ static int __mdd_lmm_get(const struct lu_env *env,
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                         XATTR_NAME_LOV);
-        if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
+        if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
         if (rc > 0) {
                 ma->ma_lmm_size = rc;
@@ -630,6 +641,34 @@ static int __mdd_lmm_get(const struct lu_env *env,
         RETURN(rc);
 }
 
+/* get the first parent fid from link EA */
+static int mdd_pfid_get(const struct lu_env *env,
+                        struct mdd_object *mdd_obj, struct md_attr *ma)
+{
+        struct lu_buf *buf;
+        struct link_ea_header *leh;
+        struct link_ea_entry *lee;
+        struct lu_fid *pfid = &ma->ma_pfid;
+        ENTRY;
+
+        if (ma->ma_valid & MA_PFID)
+                RETURN(0);
+
+        buf = mdd_links_get(env, mdd_obj);
+        if (IS_ERR(buf))
+                RETURN(PTR_ERR(buf));
+
+        leh = buf->lb_buf;
+        lee = (struct link_ea_entry *)(leh + 1);
+        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
+        fid_be_to_cpu(pfid, pfid);
+        ma->ma_valid |= MA_PFID;
+        if (buf->lb_len > OBD_ALLOC_BIG)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+        RETURN(0);
+}
+
 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                        struct md_attr *ma)
 {
@@ -726,6 +765,10 @@ int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                     S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmm_get(env, mdd_obj, ma);
         }
+        if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
+                if (S_ISREG(mdd_object_type(mdd_obj)))
+                        rc = mdd_pfid_get(env, mdd_obj, ma);
+        }
         if (rc == 0 && ma->ma_need & MA_LMV) {
                 if (S_ISDIR(mdd_object_type(mdd_obj)))
                         rc = __mdd_lmv_get(env, mdd_obj, ma);
@@ -750,7 +793,7 @@ int mdd_attr_get_internal_locked(const struct lu_env *env,
 {
         int rc;
         int needlock = ma->ma_need &
-                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
+                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
 
         if (needlock)
                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
@@ -1035,9 +1078,9 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
         if (la->la_valid == LA_ATIME) {
                 /* This is atime only set for read atime update on close. */
-                if (la->la_atime > tmp_la->la_atime &&
-                    la->la_atime <= (tmp_la->la_atime +
-                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
+                if (la->la_atime >= tmp_la->la_atime &&
+                    la->la_atime < (tmp_la->la_atime +
+                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                         la->la_valid &= ~LA_ATIME;
                 RETURN(0);
         }
@@ -1415,11 +1458,11 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct thandle *handle;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0;
+        int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-#ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
+#ifdef HAVE_QUOTA_SUPPORT
         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0, block_count = 0;
@@ -1438,11 +1481,6 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
         /*TODO: add lock here*/
         /* start a log jounal handle if needed */
         if (S_ISREG(mdd_object_type(mdd_obj)) &&
@@ -1450,15 +1488,31 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 lmm_size = mdd_lov_mdsize(env, mdd);
                 lmm = mdd_max_lmm_get(env, mdd);
                 if (lmm == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
+                        GOTO(no_trans, rc = -ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                 XATTR_NAME_LOV);
 
                 if (rc < 0)
-                        GOTO(cleanup, rc);
+                        GOTO(no_trans, rc);
+        }
+
+        chlog_cnt = 1;
+        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
+                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
+                         lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
         }
 
+        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
+                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
+        handle = mdd_trans_start(env, mdd);
+        if (IS_ERR(handle))
+                GOTO(no_trans, rc = PTR_ERR(handle));
+
+        /* permission changes may require sync operation */
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync |= mdd->mdd_sync_permission;
+
         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
@@ -1539,6 +1593,7 @@ cleanup:
                 rc = mdd_attr_set_changelog(env, obj, handle,
                                             ma->ma_attr.la_valid);
         mdd_trans_stop(env, mdd, rc, handle);
+no_trans:
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
@@ -1614,20 +1669,24 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
-            mdd->mdd_sync_permission == 1)
-                txn_param_sync(&mdd_env_info(env)->mti_param);
-
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+                handle->th_sync |= mdd->mdd_sync_permission;
+
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) == 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
         mdd_trans_stop(env, mdd, rc, handle);
@@ -1652,7 +1711,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
@@ -1662,8 +1721,13 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
 
-        /* Only record user xattr changes */
-        if ((rc == 0) && (strncmp("user.", name, 5) != 0))
+        /* Only record system & user xattr changes */
+        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                               handle);
 
@@ -1698,7 +1762,7 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
 
         LASSERT(mdd_object_exists(mdd_obj) > 0);
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
+        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
         if (rc)
                 RETURN(rc);
 
@@ -1823,7 +1887,7 @@ static int mdd_object_create(const struct lu_env *env,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1910,7 +1974,7 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
@@ -2077,11 +2141,20 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
 #endif
         ENTRY;
 
+        if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
+                mdd_obj->mod_count--;
+
+                if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
+                        CDEBUG(D_HA, "Object "DFID" is retained in orphan "
+                               "list\n", PFID(mdd_object_fid(mdd_obj)));
+                RETURN(0);
+        }
+
         /* check without any lock */
         if (mdd_obj->mod_count == 1 &&
             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
  again:
-                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
+                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
                 if (rc)
                         RETURN(rc);
                 handle = mdd_trans_start(env, mdo2mdd(obj));
@@ -2090,8 +2163,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         }
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-        if (handle == NULL &&
-            mdd_obj->mod_count == 1 &&
+        if (handle == NULL && mdd_obj->mod_count == 1 &&
             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
                 mdd_write_unlock(env, mdd_obj);
                 goto again;
@@ -2135,8 +2207,8 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                 } else {
                         rc = mdd_object_kill(env, mdd_obj, ma);
-                                if (rc == 0)
-                                        reset = 0;
+                        if (rc == 0)
+                                reset = 0;
                 }
 
                 if (rc != 0)
@@ -2183,20 +2255,20 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
 }
 
 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              int first, void *area, int nob,
+                              struct lu_dirpage *dp, int nob,
                               const struct dt_it_ops *iops, struct dt_it *it,
-                              __u64 *start, __u64 *end,
-                              struct lu_dirent **last, __u32 attr)
+                              __u32 attr)
 {
+        void                   *area = dp;
         int                     result;
         __u64                   hash = 0;
         struct lu_dirent       *ent;
+        struct lu_dirent       *last = NULL;
+        int                     first = 1;
 
-        if (first) {
-                memset(area, 0, sizeof (struct lu_dirpage));
-                area += sizeof (struct lu_dirpage);
-                nob  -= sizeof (struct lu_dirpage);
-        }
+        memset(area, 0, sizeof (*dp));
+        area += sizeof (*dp);
+        nob  -= sizeof (*dp);
 
         ent  = area;
         do {
@@ -2212,7 +2284,7 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                 hash = iops->store(env, it);
                 if (unlikely(first)) {
                         first = 0;
-                        *start = hash;
+                        dp->ldp_hash_start = cpu_to_le64(hash);
                 }
 
                 /* calculate max space required for lu_dirent */
@@ -2229,20 +2301,10 @@ static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                          * so recheck rec length */
                         recsize = le16_to_cpu(ent->lde_reclen);
                 } else {
-                        /*
-                         * record doesn't fit into page, enlarge previous one.
-                         */
-                        if (*last) {
-                                (*last)->lde_reclen =
-                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
-                                                        nob);
-                                result = 0;
-                        } else
-                                result = -EINVAL;
-
+                        result = (last != NULL) ? 0 :-EINVAL;
                         goto out;
                 }
-                *last = ent;
+                last = ent;
                 ent = (void *)ent + recsize;
                 nob -= recsize;
 
@@ -2253,7 +2315,12 @@ next:
         } while (result == 0);
 
 out:
-        *end = hash;
+        dp->ldp_hash_end = cpu_to_le64(hash);
+        if (last != NULL) {
+                if (last->lde_hash == dp->ldp_hash_end)
+                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+                last->lde_reclen = 0; /* end mark */
+        }
         return result;
 }
 
@@ -2264,13 +2331,11 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
         struct dt_object  *next = mdd_object_child(obj);
         const struct dt_it_ops  *iops;
         struct page       *pg;
-        struct lu_dirent  *last = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
         int i;
+        int nlupgs = 0;
         int rc;
         int nob;
-        __u64 hash_start;
-        __u64 hash_end = 0;
 
         LASSERT(rdpg->rp_pages != NULL);
         LASSERT(next->do_index_ops != NULL);
@@ -2282,13 +2347,13 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          * iterate through directory and fill pages from @rdpg
          */
         iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, mdd_object_capa(env, obj));
+        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
         if (IS_ERR(it))
                 return PTR_ERR(it);
 
         rc = iops->load(env, it, rdpg->rp_hash);
 
-        if (rc == 0){
+        if (rc == 0) {
                 /*
                  * Iterator didn't find record with exactly the key requested.
                  *
@@ -2313,39 +2378,51 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          */
         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
              i++, nob -= CFS_PAGE_SIZE) {
+                struct lu_dirpage *dp;
+
                 LASSERT(i < rdpg->rp_npages);
                 pg = rdpg->rp_pages[i];
-                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
-                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
-                                        it, &hash_start, &hash_end, &last,
-                                        rdpg->rp_attrs);
-                if (rc != 0 || i == rdpg->rp_npages - 1) {
-                        if (last)
-                                last->lde_reclen = 0;
+                dp = cfs_kmap(pg);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+repeat:
+#endif
+                rc = mdd_dir_page_build(env, mdd, dp,
+                                        min_t(int, nob, LU_PAGE_SIZE),
+                                        iops, it, rdpg->rp_attrs);
+                if (rc > 0) {
+                        /*
+                         * end of directory.
+                         */
+                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+                        nlupgs++;
+                } else if (rc < 0) {
+                        CWARN("build page failed: %d!\n", rc);
+                } else {
+                        nlupgs++;
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
+                                goto repeat;
+#endif
                 }
                 cfs_kunmap(pg);
         }
-        if (rc > 0) {
-                /*
-                 * end of directory.
-                 */
-                hash_end = DIR_END_OFF;
-                rc = 0;
-        }
-        if (rc == 0) {
+        if (rc >= 0) {
                 struct lu_dirpage *dp;
 
                 dp = cfs_kmap(rdpg->rp_pages[0]);
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(hash_end);
-                if (i == 0)
+                if (nlupgs == 0) {
                         /*
-                         * No pages were processed, mark this.
+                         * No pages were processed, mark this for first page
+                         * and send back.
                          */
-                        dp->ldp_flags |= LDF_EMPTY;
-
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                        dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
+                        nlupgs = 1;
+                }
                 cfs_kunmap(rdpg->rp_pages[0]);
+
+                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
         }
         iops->put(env, it);
         iops->fini(env, it);
@@ -2386,11 +2463,10 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 dp = (struct lu_dirpage*)cfs_kmap(pg);
                 memset(dp, 0 , sizeof(struct lu_dirpage));
                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
-                dp->ldp_flags |= LDF_EMPTY;
-                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
+                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                 cfs_kunmap(pg);
-                GOTO(out_unlock, rc = 0);
+                GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
         rc = __mdd_readpage(env, mdd_obj, rdpg);