struct iam_iterator oi_it;
};
+struct osd_quota_leaf {
+ cfs_list_t oql_link;
+ uint oql_blk;
+};
+
/**
* Iterator's in-memory data structure for quota file.
*/
struct osd_it_quota {
struct osd_object *oiq_obj;
/** tree blocks path to where the entry is stored */
- uint oiq_blk[LUSTRE_DQTREEDEPTH];
+ uint oiq_blk[LUSTRE_DQTREEDEPTH + 1];
/** on-disk offset for current key where quota record can be found */
loff_t oiq_offset;
/** identifier for current quota record */
__u64 oiq_id;
+ /** the record index in the leaf/index block */
+ uint oiq_index[LUSTRE_DQTREEDEPTH + 1];
+ /** list of already processed leaf blocks */
+ cfs_list_t oiq_list;
};
#define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512)
/* prevent reading after eof */
cfs_spin_lock(&inode->i_lock);
if (i_size_read(inode) < *offs + size) {
- size = i_size_read(inode) - *offs;
- cfs_spin_unlock(&inode->i_lock);
- if (size < 0) {
- CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
- i_size_read(inode), *offs);
- return -EBADR;
- } else if (size == 0) {
- return 0;
- }
+ loff_t diff = i_size_read(inode) - *offs;
+ cfs_spin_unlock(&inode->i_lock);
+ if (diff < 0) {
+ CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
+ i_size_read(inode), *offs);
+ return -EBADR;
+ } else if (diff == 0) {
+ return 0;
+ } else {
+ size = diff;
+ }
} else {
cfs_spin_unlock(&inode->i_lock);
}
memset(it, 0, sizeof(*it));
lu_object_get(lo);
it->oiq_obj = obj;
+ CFS_INIT_LIST_HEAD(&it->oiq_list);
/* LUSTRE_DQTREEOFF is the initial offset where the tree can be found */
it->oiq_blk[0] = LUSTRE_DQTREEOFF;
static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
{
struct osd_it_quota *it = (struct osd_it_quota *)di;
-
+ struct osd_quota_leaf *leaf, *tmp;
ENTRY;
+
lu_object_put(env, &it->oiq_obj->oo_dt.do_lu);
+
+ cfs_list_for_each_entry_safe(leaf, tmp, &it->oiq_list, oql_link) {
+ cfs_list_del_init(&leaf->oql_link);
+ OBD_FREE_PTR(leaf);
+ }
EXIT;
}
return;
}
+static int osd_it_add_processed(struct osd_it_quota *it, int depth)
+{
+ struct osd_quota_leaf *leaf;
+
+ OBD_ALLOC_PTR(leaf);
+ if (leaf == NULL)
+ RETURN(-ENOMEM);
+ CFS_INIT_LIST_HEAD(&leaf->oql_link);
+ leaf->oql_blk = it->oiq_blk[depth];
+ cfs_list_add_tail(&leaf->oql_link, &it->oiq_list);
+ RETURN(0);
+}
+
/**
* Move on to the next valid entry.
*
/* Let's first check if there are any remaining valid entry in the
* current leaf block. Start with the next entry after the current one.
*/
- depth = LUSTRE_DQTREEDEPTH - 1;
- index = GETIDINDEX(it->oiq_id, depth);
+ depth = LUSTRE_DQTREEDEPTH;
+ index = it->oiq_index[depth];
if (++index < LUSTRE_DQSTRINBLK) {
/* Search for the next valid entry from current index */
rc = walk_block_dqentry(env, it->oiq_obj, type,
/* Found on entry, @it is already updated to the
* new position in walk_block_dqentry(). */
RETURN(0);
+ } else {
+ rc = osd_it_add_processed(it, depth);
+ if (rc)
+ RETURN(rc);
}
+ } else {
+ rc = osd_it_add_processed(it, depth);
+ if (rc)
+ RETURN(rc);
}
rc = 1;
/* We keep searching as long as walk_tree_dqentry() returns +1
* (= no valid entry found). */
for (; depth >= 0 && rc > 0; depth--) {
- index = GETIDINDEX(it->oiq_id, depth);
+ index = it->oiq_index[depth];
if (++index > 0xff)
continue;
rc = walk_tree_dqentry(env, it->oiq_obj, type,
/**
* Read the \a blk into \a buf.
- *
- * TODO Will support enforcement quota later.
*/
static ssize_t quota_read_blk(const struct lu_env *env,
struct osd_object *obj,
sizeof(struct lustre_disk_dqdbheader) + i * dqblk_sz;
if (it) {
- it->oiq_blk[LUSTRE_DQTREEDEPTH - 1] = blk;
+ it->oiq_blk[LUSTRE_DQTREEDEPTH] = blk;
it->oiq_offset = ret;
it->oiq_id = dqid;
+ it->oiq_index[LUSTRE_DQTREEDEPTH] = i;
} else {
ret = 0;
}
else
ret = find_block_dqentry(env, obj, type, dqid, blk, it);
- if (it && ret > 0) /* Entry found */
- it->oiq_blk[depth] = blk;
+ if (it && ret > 0) {
+ it->oiq_blk[depth + 1] = blk;
+ it->oiq_index[depth] = GETIDINDEX(dqid, depth);
+ }
+
out_buf:
freedqbuf(buf);
RETURN(ret);
struct lustre_disk_dqdbheader *dqhead;
int i, dqblk_sz;
struct lustre_disk_dqblk_v2 *ddquot;
-
+ struct osd_quota_leaf *leaf;
ENTRY;
+ /* check if the leaf block has been processed before */
+ cfs_list_for_each_entry(leaf, &it->oiq_list, oql_link) {
+ if (leaf->oql_blk == blk)
+ RETURN(1);
+ }
+
dqhead = (struct lustre_disk_dqdbheader *)buf;
dqblk_sz = sizeof(struct lustre_disk_dqblk_v2);
if (!buf)
(char *)&ddquot[i], dqblk_sz))
continue;
- it->oiq_blk[LUSTRE_DQTREEDEPTH - 1] = blk;
+ it->oiq_blk[LUSTRE_DQTREEDEPTH] = blk;
it->oiq_id = le32_to_cpu(ddquot[i].dqb_id);
it->oiq_offset = (blk << LUSTRE_DQBLKSIZE_BITS) +
sizeof(struct lustre_disk_dqdbheader) +
i * dqblk_sz;
+ it->oiq_index[LUSTRE_DQTREEDEPTH] = i;
ret = 0;
break;
}
ret = walk_block_dqentry(env, obj, type, blk, 0, it);
}
- if (ret == 0) /* Entry found */
- it->oiq_blk[depth] = blk;
+ if (ret == 0) { /* Entry found */
+ it->oiq_blk[depth + 1] = blk;
+ it->oiq_index[depth] = index;
+ }
+
out_buf:
freedqbuf(buf);
RETURN(ret);