From cb47b974d0f14608f429985ef908b8fc015de2e2 Mon Sep 17 00:00:00 2001
From: zab
Date: Wed, 2 Apr 2003 00:03:16 +0000
Subject: [PATCH] fix up file size and cache mechanisms to account for the
 disconnect between acquiring a file region extent and getting 0..N callbacks
 for obd locks that the acquisition generated.

- truncate_inode_pages(,0) could race with dirtying and lose data.  when a
  lock is canceled we now purge/invalidate the (striped) pages that were
  covered by the (lov->obd) lock

- remove the getattr flag business and instead do getattrs around every lock
  acquisition.  this brings us closer to how things will behave when we
  return file size with locks, but it will slow things down until then.

- maintain an rbtree of extents which record the page offsets that are dirty
  in the page cache.  for now it helps us decide if we should trust getattr.
  in the future it will help purging and the write space reservation.  (a
  simplified standalone sketch of this extent tracking follows the patch.)

- always round extents to page size in osc_enqueue (also sketched after the
  patch)

- while I'm in here, get rid of the ISBLK stuff in _file_write and let
  generic_file_write set ppos to i_size under the i_sem

this has passed tests under a large lov with /tmp backing, but do keep an eye
out for data/size inconsistency problems.
---
 lustre/llite/iod.c | 226 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)

diff --git a/lustre/llite/iod.c b/lustre/llite/iod.c
index 99bfb8b..bfbbd87 100644
--- a/lustre/llite/iod.c
+++ b/lustre/llite/iod.c
@@ -34,6 +34,7 @@
 #include
 #include
 #include
+#include <linux/rbtree.h>
 
 /* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
 #ifdef PG_inactive_clean
@@ -107,6 +108,8 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp,
 
         /* we raced with truncate? */
         if ( off >= inode->i_size ) {
+                ll_remove_dirty(&ll_i2info(inode)->lli_dirty, page->index,
+                                page->index);
                 unlock_page(page);
                 goto out;
         }
@@ -226,6 +229,8 @@ static void ll_brw_pages_unlock( struct inode *inode,
 
                 CDEBUG(D_CACHE, "cleaning page %p\n", page);
                 LASSERT(PageLocked(page));
+                ll_remove_dirty(&ll_i2info(inode)->lli_dirty, page->index,
+                                page->index);
                 unlock_page(page);
                 page_cache_release(page);
         }
@@ -413,3 +418,224 @@ cleanup:
         current->flags = old_flags;
         RETURN(rc);
 }
+
+/*
+ * we aggressively track offsets of pages that have been dirtied. we need this
+ * to make file size decisions around lock acquisition and cancelation. all
+ * extents include the offsets at their endpoints.
+ */
+struct offset_extent {
+        rb_node_t       oe_node;
+        unsigned long   oe_start, oe_end;
+};
+
+static struct offset_extent * ll_find_oe(rb_root_t *root,
+                                         struct offset_extent *needle)
+{
+        struct rb_node_s *node = root->rb_node;
+        struct offset_extent *oe;
+        ENTRY;
+
+        CDEBUG(D_INODE, "searching [%lu -> %lu]\n", needle->oe_start,
+               needle->oe_end);
+
+        while (node) {
+                oe = rb_entry(node, struct offset_extent, oe_node);
+                if ( needle->oe_end < oe->oe_start )
+                        node = node->rb_left;
+                else if ( needle->oe_start > oe->oe_end )
+                        node = node->rb_right;
+                else {
+                        CDEBUG(D_INODE, "returning [%lu -> %lu]\n",
+                               oe->oe_start, oe->oe_end);
+                        RETURN(oe);
+                }
+        }
+        RETURN(NULL);
+}
+
+/* do the rbtree mechanics to insert a node, callers are responsible
+ * for making sure that this new node doesn't overlap with existing
+ * nodes */
+static void ll_insert_oe(rb_root_t *root, struct offset_extent *new_oe)
+{
+        rb_node_t ** p = &root->rb_node;
+        rb_node_t * parent = NULL;
+        struct offset_extent *oe;
+        ENTRY;
+
+        LASSERT(new_oe->oe_start <= new_oe->oe_end);
+
+        while (*p) {
+                parent = *p;
+                oe = rb_entry(parent, struct offset_extent, oe_node);
+                if ( new_oe->oe_end < oe->oe_start )
+                        p = &(*p)->rb_left;
+                else if ( new_oe->oe_start > oe->oe_end )
+                        p = &(*p)->rb_right;
+                else
+                        LBUG();
+        }
+        rb_link_node(&new_oe->oe_node, parent, p);
+        rb_insert_color(&new_oe->oe_node, root);
+        EXIT;
+}
+
+void ll_record_dirty(struct ll_dirty_offsets *lldo, unsigned long offset)
+{
+        struct offset_extent needle, *oe, *new_oe;
+        int rc;
+        ENTRY;
+
+        /* will allocate more intelligently later */
+        OBD_ALLOC(new_oe, sizeof(*new_oe));
+        LASSERT(new_oe); /* will have to do for now :/ */
+
+        spin_lock(&lldo->do_lock);
+
+        /* find neighbours that we might glom on to */
+        needle.oe_start = (offset > 0) ? offset - 1 : offset;
+        needle.oe_end = (offset < ~0) ? offset + 1 : offset;
+        oe = ll_find_oe(&lldo->do_root, &needle);
+        if ( oe == NULL ) {
+                new_oe->oe_start = offset;
+                new_oe->oe_end = offset;
+                ll_insert_oe(&lldo->do_root, new_oe);
+                lldo->do_num_dirty++;
+                new_oe = NULL;
+                GOTO(out, rc = 1);
+        }
+
+        /* already recorded */
+        if ( offset >= oe->oe_start && offset <= oe->oe_end )
+                GOTO(out, rc = 2);
+
+        /* ok, need to check for adjacent neighbours */
+        needle.oe_start = offset;
+        needle.oe_end = offset;
+        if ( ll_find_oe(&lldo->do_root, &needle) )
+                GOTO(out, rc = 3);
+
+        /* ok, it's safe to extend the oe we found */
+        if ( offset == oe->oe_start - 1 )
+                oe->oe_start--;
+        else if ( offset == oe->oe_end + 1 )
+                oe->oe_end++;
+        else
+                LBUG();
+        lldo->do_num_dirty++;
+
+out:
+        CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty);
+        spin_unlock(&lldo->do_lock);
+        if ( new_oe )
+                OBD_FREE(new_oe, sizeof(*new_oe));
+        EXIT;
+        return;
+}
+
+void ll_remove_dirty(struct ll_dirty_offsets *lldo, unsigned long start,
+                     unsigned long end)
+{
+        struct offset_extent needle, *oe, *new_oe;
+        ENTRY;
+
+        /* will allocate more intelligently later */
+        OBD_ALLOC(new_oe, sizeof(*new_oe));
+        LASSERT(new_oe); /* will have to do for now :/ */
+
+        needle.oe_start = start;
+        needle.oe_end = end;
+
+        spin_lock(&lldo->do_lock);
+        for ( ; (oe = ll_find_oe(&lldo->do_root, &needle)) ; ) {
+
+                /* see if we're punching a hole and need to create a node */
+                if (oe->oe_start < start && oe->oe_end > end) {
+                        new_oe->oe_start = end + 1;
+                        new_oe->oe_end = oe->oe_end;
+                        oe->oe_end = start - 1;
+                        ll_insert_oe(&lldo->do_root, new_oe);
+                        new_oe = NULL;
+                        lldo->do_num_dirty -= end - start + 1;
+                        break;
+                }
+
+                /* overlapping edges */
+                if (oe->oe_start < start && oe->oe_end <= end) {
+                        lldo->do_num_dirty -= oe->oe_end - start + 1;
+                        oe->oe_end = start - 1;
+                        oe = NULL;
+                        continue;
+                }
+                if (oe->oe_end > end && oe->oe_start >= start) {
+                        lldo->do_num_dirty -= end - oe->oe_start + 1;
+                        oe->oe_start = end + 1;
+                        oe = NULL;
+                        continue;
+                }
+
+                /* an extent entirely within the one we're clearing */
+                rb_erase(&oe->oe_node, &lldo->do_root);
+                lldo->do_num_dirty -= oe->oe_end - oe->oe_start + 1;
+                spin_unlock(&lldo->do_lock);
+                OBD_FREE(oe, sizeof(*oe));
+                spin_lock(&lldo->do_lock);
+        }
+        CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty);
+        spin_unlock(&lldo->do_lock);
+        if (new_oe)
+                OBD_FREE(new_oe, sizeof(*new_oe));
+        EXIT;
+}
+
+int ll_find_dirty(struct ll_dirty_offsets *lldo, unsigned long *start,
+                  unsigned long *end)
+{
+        struct offset_extent needle, *oe;
+        int rc = -ENOENT;
+        ENTRY;
+
+        needle.oe_start = *start;
+        needle.oe_end = *end;
+
+        spin_lock(&lldo->do_lock);
+        oe = ll_find_oe(&lldo->do_root, &needle);
+        if (oe) {
+                *start = oe->oe_start;
+                *end = oe->oe_end;
+                rc = 0;
+        }
+        spin_unlock(&lldo->do_lock);
+
+        RETURN(rc);
+}
+
+int ll_farthest_dirty(struct ll_dirty_offsets *lldo, unsigned long *farthest)
+{
+        struct rb_node_s *last, *node;
+        struct offset_extent *oe;
+        int rc = -1;
+        ENTRY;
+
+        spin_lock(&lldo->do_lock);
+        for (node = lldo->do_root.rb_node, last = NULL;
+             node;
+             last = node, node = node->rb_right)
+                ;
+
+        if (last) {
+                oe = rb_entry(last, struct offset_extent, oe_node);
+                *farthest = oe->oe_end;
+                rc = 0;
+        }
+        spin_unlock(&lldo->do_lock);
+        RETURN(rc);
+}
+
+void ll_lldo_init(struct ll_dirty_offsets *lldo)
+{
+        spin_lock_init(&lldo->do_lock);
+        lldo->do_num_dirty = 0;
+        lldo->do_root.rb_node = NULL;
+}
-- 
1.8.3.1
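
The dirty-extent bookkeeping above is easiest to see outside the kernel.  What
follows is a minimal userspace sketch of the same semantics, assuming
inclusive [start, end] page offsets: recording an offset either lands inside
an existing extent, extends an adjacent one, or creates a new single-offset
extent, and clearing a range trims, splits, or drops whatever it overlaps.  A
sorted linked list stands in for the rbtree and the spinlock is omitted; all
of the names here (struct extent, record_dirty, remove_dirty) are
illustrative, not Lustre's.

#include <stdio.h>
#include <stdlib.h>

struct extent {
        unsigned long start, end;       /* inclusive, like oe_start/oe_end */
        struct extent *next;
};

static struct extent *dirty;            /* sorted by start, non-overlapping */

/* mark one offset dirty: extend or merge with neighbours, else insert */
static void record_dirty(unsigned long off)
{
        struct extent **p = &dirty, *e, *n;

        while ((e = *p) != NULL && e->end + 1 < off)
                p = &e->next;

        if (e != NULL && e->start <= off + 1) {
                /* off is inside e or immediately adjacent to it */
                if (off < e->start)
                        e->start = off;
                if (off > e->end) {
                        e->end = off;
                        /* extending up may make e touch its right neighbour */
                        n = e->next;
                        if (n != NULL && n->start <= e->end + 1) {
                                e->end = n->end;
                                e->next = n->next;
                                free(n);
                        }
                }
                return;
        }

        n = malloc(sizeof(*n));         /* new single-offset extent */
        n->start = n->end = off;
        n->next = e;
        *p = n;
}

/* clear [start, end]: trim, split, or erase every overlapping extent */
static void remove_dirty(unsigned long start, unsigned long end)
{
        struct extent **p = &dirty, *e, *n;

        while ((e = *p) != NULL) {
                if (e->end < start) {                   /* entirely below */
                        p = &e->next;
                        continue;
                }
                if (e->start > end)                     /* entirely above */
                        break;
                if (e->start < start && e->end > end) { /* punching a hole */
                        n = malloc(sizeof(*n));
                        n->start = end + 1;
                        n->end = e->end;
                        n->next = e->next;
                        e->end = start - 1;
                        e->next = n;
                        break;
                }
                if (e->start < start) {                 /* trim the top of e */
                        e->end = start - 1;
                        p = &e->next;
                        continue;
                }
                if (e->end > end) {                     /* trim the bottom of e */
                        e->start = end + 1;
                        break;
                }
                *p = e->next;                           /* wholly covered: drop */
                free(e);
        }
}

int main(void)
{
        struct extent *e;

        record_dirty(3);
        record_dirty(4);
        record_dirty(5);        /* -> [3,5] */
        record_dirty(9);        /* -> [3,5] [9,9] */
        remove_dirty(4, 4);     /* -> [3,3] [5,5] [9,9] */

        for (e = dirty; e != NULL; e = e->next)
                printf("[%lu,%lu] ", e->start, e->end);
        printf("\n");
        return 0;
}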
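
The "always round extents to page size in osc_enqueue" item is just outward
rounding of a byte extent to page boundaries.  A tiny sketch under assumed
conventions (inclusive byte offsets, 4096-byte pages); the helper name and
constants are illustrative and not taken from the patch:

#include <stdio.h>

#define EX_PAGE_SIZE 4096UL
#define EX_PAGE_MASK (~(EX_PAGE_SIZE - 1))

/* widen an inclusive [start, end] byte extent to cover whole pages */
static void round_extent_to_pages(unsigned long *start, unsigned long *end)
{
        *start &= EX_PAGE_MASK;         /* down to the start of its page */
        *end |= EX_PAGE_SIZE - 1;       /* up to the last byte of its page */
}

int main(void)
{
        unsigned long start = 5000, end = 9000;

        round_extent_to_pages(&start, &end);
        printf("[%lu, %lu]\n", start, end);     /* prints [4096, 12287] */
        return 0;
}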