Whamcloud - gitweb
LU-1830 quota: fix ldiskfs quota accouning iterator
authorNiu Yawei <niu@whamcloud.com>
Wed, 5 Sep 2012 08:58:33 +0000 (16:58 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 10 Sep 2012 02:35:14 +0000 (22:35 -0400)
There were defects in the ldiskfs quota accounting iterator:
- leaf blocks could be re-processed;
- entry index was updated incorrectly, so some entries could be
  skipped;

Fix the defect in osd_diskfs_read(), the 32bit 'size' could be
overflowed.

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Icfd96fdaf38c57bc63a5fcfd1f96c02d0ddeb74d
Reviewed-on: http://review.whamcloud.com/3877
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_io.c
lustre/osd-ldiskfs/osd_quota.c
lustre/osd-ldiskfs/osd_quota_fmt.c

index 879343c..3c5406b 100644 (file)
@@ -474,17 +474,26 @@ struct osd_it_iam {
         struct iam_iterator    oi_it;
 };
 
+struct osd_quota_leaf {
+       cfs_list_t      oql_link;
+       uint            oql_blk;
+};
+
 /**
  * Iterator's in-memory data structure for quota file.
  */
 struct osd_it_quota {
        struct osd_object       *oiq_obj;
        /** tree blocks path to where the entry is stored */
-       uint                     oiq_blk[LUSTRE_DQTREEDEPTH];
+       uint                     oiq_blk[LUSTRE_DQTREEDEPTH + 1];
        /** on-disk offset for current key where quota record can be found */
        loff_t                   oiq_offset;
        /** identifier for current quota record */
        __u64                    oiq_id;
+       /** the record index in the leaf/index block */
+       uint                     oiq_index[LUSTRE_DQTREEDEPTH + 1];
+       /** list of already processed leaf blocks */
+       cfs_list_t               oiq_list;
 };
 
 #define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512)
index 35e08f8..af3d6af 100644 (file)
@@ -864,15 +864,17 @@ int osd_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs)
         /* prevent reading after eof */
         cfs_spin_lock(&inode->i_lock);
         if (i_size_read(inode) < *offs + size) {
-                size = i_size_read(inode) - *offs;
-                cfs_spin_unlock(&inode->i_lock);
-                if (size < 0) {
-                        CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
-                               i_size_read(inode), *offs);
-                        return -EBADR;
-                } else if (size == 0) {
-                        return 0;
-                }
+               loff_t diff = i_size_read(inode) - *offs;
+               cfs_spin_unlock(&inode->i_lock);
+               if (diff < 0) {
+                       CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
+                              i_size_read(inode), *offs);
+                       return -EBADR;
+               } else if (diff == 0) {
+                       return 0;
+               } else {
+                       size = diff;
+               }
         } else {
                 cfs_spin_unlock(&inode->i_lock);
         }
index e636c6d..c39f829 100644 (file)
@@ -149,6 +149,7 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
        memset(it, 0, sizeof(*it));
        lu_object_get(lo);
        it->oiq_obj = obj;
+       CFS_INIT_LIST_HEAD(&it->oiq_list);
 
        /* LUSTRE_DQTREEOFF is the initial offset where the tree can be found */
        it->oiq_blk[0] = LUSTRE_DQTREEOFF;
@@ -167,9 +168,15 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
 {
        struct osd_it_quota *it = (struct osd_it_quota *)di;
-
+       struct osd_quota_leaf *leaf, *tmp;
        ENTRY;
+
        lu_object_put(env, &it->oiq_obj->oo_dt.do_lu);
+
+       cfs_list_for_each_entry_safe(leaf, tmp, &it->oiq_list, oql_link) {
+               cfs_list_del_init(&leaf->oql_link);
+               OBD_FREE_PTR(leaf);
+       }
        EXIT;
 }
 
@@ -227,6 +234,19 @@ static void osd_it_acct_put(const struct lu_env *env, struct dt_it *di)
        return;
 }
 
+static int osd_it_add_processed(struct osd_it_quota *it, int depth)
+{
+       struct osd_quota_leaf *leaf;
+
+       OBD_ALLOC_PTR(leaf);
+       if (leaf == NULL)
+               RETURN(-ENOMEM);
+       CFS_INIT_LIST_HEAD(&leaf->oql_link);
+       leaf->oql_blk = it->oiq_blk[depth];
+       cfs_list_add_tail(&leaf->oql_link, &it->oiq_list);
+       RETURN(0);
+}
+
 /**
  * Move on to the next valid entry.
  *
@@ -250,8 +270,8 @@ static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
        /* Let's first check if there are any remaining valid entry in the
         * current leaf block. Start with the next entry after the current one.
         */
-       depth = LUSTRE_DQTREEDEPTH - 1;
-       index = GETIDINDEX(it->oiq_id, depth);
+       depth = LUSTRE_DQTREEDEPTH;
+       index = it->oiq_index[depth];
        if (++index < LUSTRE_DQSTRINBLK) {
                /* Search for the next valid entry from current index */
                rc = walk_block_dqentry(env, it->oiq_obj, type,
@@ -263,7 +283,15 @@ static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
                        /* Found on entry, @it is already updated to the
                         * new position in walk_block_dqentry(). */
                        RETURN(0);
+               } else {
+                       rc = osd_it_add_processed(it, depth);
+                       if (rc)
+                               RETURN(rc);
                }
+       } else {
+               rc = osd_it_add_processed(it, depth);
+               if (rc)
+                       RETURN(rc);
        }
        rc = 1;
 
@@ -274,7 +302,7 @@ static int osd_it_acct_next(const struct lu_env *env, struct dt_it *di)
        /* We keep searching as long as walk_tree_dqentry() returns +1
         * (= no valid entry found). */
        for (; depth >= 0 && rc > 0; depth--) {
-               index = GETIDINDEX(it->oiq_id, depth);
+               index = it->oiq_index[depth];
                if (++index > 0xff)
                        continue;
                rc = walk_tree_dqentry(env, it->oiq_obj, type,
index 4dbfa07..25db1ce 100644 (file)
@@ -53,8 +53,6 @@ static inline void freedqbuf(dqbuf_t buf)
 
 /**
  * Read the \a blk into \a buf.
- *
- * TODO Will support enforcement quota later.
  */
 static ssize_t quota_read_blk(const struct lu_env *env,
                              struct osd_object *obj,
@@ -126,9 +124,10 @@ static loff_t find_block_dqentry(const struct lu_env *env,
                      sizeof(struct lustre_disk_dqdbheader) + i * dqblk_sz;
 
                if (it) {
-                       it->oiq_blk[LUSTRE_DQTREEDEPTH - 1] = blk;
+                       it->oiq_blk[LUSTRE_DQTREEDEPTH] = blk;
                        it->oiq_offset = ret;
                        it->oiq_id = dqid;
+                       it->oiq_index[LUSTRE_DQTREEDEPTH] = i;
                } else {
                        ret = 0;
                }
@@ -174,8 +173,11 @@ loff_t find_tree_dqentry(const struct lu_env *env,
        else
                ret = find_block_dqentry(env, obj, type, dqid, blk, it);
 
-       if (it && ret > 0) /* Entry found */
-               it->oiq_blk[depth] = blk;
+       if (it && ret > 0) {
+               it->oiq_blk[depth + 1] = blk;
+               it->oiq_index[depth] = GETIDINDEX(dqid, depth);
+       }
+
 out_buf:
        freedqbuf(buf);
        RETURN(ret);
@@ -198,9 +200,15 @@ int walk_block_dqentry(const struct lu_env *env, struct osd_object *obj,
        struct lustre_disk_dqdbheader   *dqhead;
        int                              i, dqblk_sz;
        struct lustre_disk_dqblk_v2     *ddquot;
-
+       struct osd_quota_leaf           *leaf;
        ENTRY;
 
+       /* check if the leaf block has been processed before */
+       cfs_list_for_each_entry(leaf, &it->oiq_list, oql_link) {
+               if (leaf->oql_blk == blk)
+                       RETURN(1);
+       }
+
        dqhead = (struct lustre_disk_dqdbheader *)buf;
        dqblk_sz = sizeof(struct lustre_disk_dqblk_v2);
        if (!buf)
@@ -223,11 +231,12 @@ int walk_block_dqentry(const struct lu_env *env, struct osd_object *obj,
                            (char *)&ddquot[i], dqblk_sz))
                        continue;
 
-               it->oiq_blk[LUSTRE_DQTREEDEPTH - 1] = blk;
+               it->oiq_blk[LUSTRE_DQTREEDEPTH] = blk;
                it->oiq_id = le32_to_cpu(ddquot[i].dqb_id);
                it->oiq_offset = (blk << LUSTRE_DQBLKSIZE_BITS) +
                                  sizeof(struct lustre_disk_dqdbheader) +
                                  i * dqblk_sz;
+               it->oiq_index[LUSTRE_DQTREEDEPTH] = i;
                ret = 0;
                break;
        }
@@ -276,8 +285,11 @@ int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
                        ret = walk_block_dqentry(env, obj, type, blk, 0, it);
        }
 
-       if (ret == 0) /* Entry found */
-               it->oiq_blk[depth] = blk;
+       if (ret == 0) { /* Entry found */
+               it->oiq_blk[depth + 1] = blk;
+               it->oiq_index[depth] = index;
+       }
+
 out_buf:
        freedqbuf(buf);
        RETURN(ret);