From 8376853f237908f2d02a0a9780ef780883d6f658 Mon Sep 17 00:00:00 2001 From: shaver Date: Wed, 2 Jul 2003 13:32:08 +0000 Subject: [PATCH] Client-side portions of bug 974: push cache and dirty-page management down into OSC, in preparation for pre-allocation grants. Also fixes 975 by providing /proc/fs/lustre/llite/fs0/max_dirty_pages to limit client-side cache. --- lustre/llite/iod.c | 259 +++++------------------------------------------------ 1 file changed, 20 insertions(+), 239 deletions(-) diff --git a/lustre/llite/iod.c b/lustre/llite/iod.c index 836a9aa..e3fabe6 100644 --- a/lustre/llite/iod.c +++ b/lustre/llite/iod.c @@ -38,6 +38,7 @@ #include #include #include +#include "llite_internal.h" /* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */ #ifdef PG_inactive_clean @@ -73,7 +74,14 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp, /* we raced with truncate? */ if ( off >= inode->i_size ) { - ll_remove_dirty(inode, page->index, page->index); + int rc; + rc = ll_clear_dirty_pages(ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, + page->index, page->index); + + LASSERT(rc == 0); + CDEBUG(D_CACHE, "offset "LPU64" (index %lu) > i_size %llu\n", + off, page->index, inode->i_size); unlock_page(page); return 0; } @@ -85,7 +93,7 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp, pg->pg = page; pg->off = off; - pg->flag = OBD_BRW_CREATE; + pg->flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT; pg->count = PAGE_CACHE_SIZE; /* catch partial writes for files that end mid-page */ @@ -176,9 +184,10 @@ static void ll_writeback(struct inode *inode, struct ll_writeback_pages *llwp) CERROR("error from obd_brw_async: rc = %d\n", rc); lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WB_FAIL, llwp->npgs); - } else + } else { lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WB_OK, (llwp->npgs)); + } for (i = 0 ; i < llwp->npgs ; i++) { struct page *page = llwp->pga[i].pg; @@ -186,7 +195,11 @@ static void ll_writeback(struct inode *inode, struct ll_writeback_pages *llwp) CDEBUG(D_CACHE, "finished page %p at index %lu\n", page, page->index); LASSERT(PageLocked(page)); - ll_remove_dirty(inode, page->index, page->index); + + rc = ll_clear_dirty_pages(ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, + page->index, page->index); + LASSERT(rc == 0); unlock_page(page); page_cache_release(page); } @@ -380,7 +393,7 @@ int ll_batch_writepage(struct inode *inode, struct page *page) current->flags |= PF_MEMALLOC; rc = ll_alloc_brw(inode, &llwp); if (rc != 0) - GOTO(cleanup, rc); + GOTO(restore_flags, rc); if (llwp_consume_page(&llwp, inode, page) == 0) ll_get_dirty_pages(inode, &llwp); @@ -390,241 +403,9 @@ int ll_batch_writepage(struct inode *inode, struct page *page) LPROC_LL_WB_WRITEPAGE, llwp.npgs); ll_writeback(inode, &llwp); } - kfree(llwp.pga); -cleanup: - current->flags = old_flags; - RETURN(rc); -} - -/* - * we aggressively track offsets of pages that have been dirtied. we need this - * to make file size decisions around lock acquisition and cancelation. all - * extents include the offsets at their endpoints. - */ -struct offset_extent { - rb_node_t oe_node; - unsigned long oe_start, oe_end; -}; - -static struct offset_extent *ll_find_oe(rb_root_t *root, - struct offset_extent *needle) -{ - struct rb_node_s *node = root->rb_node; - struct offset_extent *oe; - ENTRY; - - CDEBUG(D_INODE, "searching [%lu -> %lu]\n", needle->oe_start, - needle->oe_end); - - while (node) { - oe = rb_entry(node, struct offset_extent, oe_node); - if (needle->oe_end < oe->oe_start) - node = node->rb_left; - else if (needle->oe_start > oe->oe_end) - node = node->rb_right; - else { - CDEBUG(D_INODE, "returning [%lu -> %lu]\n", - oe->oe_start, oe->oe_end); - RETURN(oe); - } - } - RETURN(NULL); -} - -/* do the rbtree mechanics to insert a node, callers are responsible - * for making sure that this new node doesn't overlap with existing - * nodes */ -static void ll_insert_oe(rb_root_t *root, struct offset_extent *new_oe) -{ - rb_node_t ** p = &root->rb_node; - rb_node_t * parent = NULL; - struct offset_extent *oe; - ENTRY; - - LASSERT(new_oe->oe_start <= new_oe->oe_end); - - while (*p) { - parent = *p; - oe = rb_entry(parent, struct offset_extent, oe_node); - if ( new_oe->oe_end < oe->oe_start ) - p = &(*p)->rb_left; - else if ( new_oe->oe_start > oe->oe_end ) - p = &(*p)->rb_right; - else - LBUG(); - } - rb_link_node(&new_oe->oe_node, parent, p); - rb_insert_color(&new_oe->oe_node, root); - EXIT; -} - -static inline void lldo_dirty_add(struct inode *inode, - struct ll_dirty_offsets *lldo, - long val) -{ - lldo->do_num_dirty += val; - lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_DIRTY_PAGES, - val); -} - -void ll_record_dirty(struct inode *inode, unsigned long offset) -{ - struct ll_dirty_offsets *lldo = &ll_i2info(inode)->lli_dirty; - struct offset_extent needle, *oe, *new_oe; - int rc; - ENTRY; - - /* will allocate more intelligently later */ - OBD_ALLOC(new_oe, sizeof(*new_oe)); - LASSERT(new_oe); /* will have to do for now :/ */ - - spin_lock(&lldo->do_lock); - - /* find neighbours that we might glom on to */ - needle.oe_start = (offset > 0) ? offset - 1 : offset; - needle.oe_end = (offset < ~0) ? offset + 1 : offset; - oe = ll_find_oe(&lldo->do_root, &needle); - if ( oe == NULL ) { - new_oe->oe_start = offset; - new_oe->oe_end = offset; - ll_insert_oe(&lldo->do_root, new_oe); - lldo_dirty_add(inode, lldo, 1); - new_oe = NULL; - GOTO(out, rc = 1); - } - - /* already recorded */ - if ( offset >= oe->oe_start && offset <= oe->oe_end ) - GOTO(out, rc = 2); - - /* ok, need to check for adjacent neighbours */ - needle.oe_start = offset; - needle.oe_end = offset; - if (ll_find_oe(&lldo->do_root, &needle)) - GOTO(out, rc = 3); - - /* ok, its safe to extend the oe we found */ - if ( offset == oe->oe_start - 1 ) - oe->oe_start--; - else if ( offset == oe->oe_end + 1 ) - oe->oe_end++; - else - LBUG(); - lldo_dirty_add(inode, lldo, 1); - -out: - CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty); - spin_unlock(&lldo->do_lock); - if ( new_oe ) - OBD_FREE(new_oe, sizeof(*new_oe)); - EXIT; - return; -} -void ll_remove_dirty(struct inode *inode, unsigned long start, - unsigned long end) -{ - struct ll_dirty_offsets *lldo = &ll_i2info(inode)->lli_dirty; - struct offset_extent needle, *oe, *new_oe; - ENTRY; - - /* will allocate more intelligently later */ - OBD_ALLOC(new_oe, sizeof(*new_oe)); - LASSERT(new_oe); /* will have to do for now :/ */ - - needle.oe_start = start; - needle.oe_end = end; - - spin_lock(&lldo->do_lock); - for ( ; (oe = ll_find_oe(&lldo->do_root, &needle)) ; ) { - - /* see if we're punching a hole and need to create a node */ - if (oe->oe_start < start && oe->oe_end > end) { - new_oe->oe_start = end + 1; - new_oe->oe_end = oe->oe_end; - oe->oe_end = start - 1; - ll_insert_oe(&lldo->do_root, new_oe); - new_oe = NULL; - lldo_dirty_add(inode, lldo, -(end - start + 1)); - break; - } - - /* overlapping edges */ - if (oe->oe_start < start && oe->oe_end <= end) { - lldo_dirty_add(inode, lldo, -(oe->oe_end - start + 1)); - oe->oe_end = start - 1; - oe = NULL; - continue; - } - if (oe->oe_end > end && oe->oe_start >= start) { - lldo_dirty_add(inode, lldo, -(end - oe->oe_start + 1)); - oe->oe_start = end + 1; - oe = NULL; - continue; - } - - /* an extent entirely within the one we're clearing */ - rb_erase(&oe->oe_node, &lldo->do_root); - lldo_dirty_add(inode, lldo, -(oe->oe_end - oe->oe_start + 1)); - spin_unlock(&lldo->do_lock); - OBD_FREE(oe, sizeof(*oe)); - spin_lock(&lldo->do_lock); - } - CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty); - spin_unlock(&lldo->do_lock); - if (new_oe) - OBD_FREE(new_oe, sizeof(*new_oe)); - EXIT; -} - -int ll_find_dirty(struct ll_dirty_offsets *lldo, unsigned long *start, - unsigned long *end) -{ - struct offset_extent needle, *oe; - int rc = -ENOENT; - ENTRY; - - needle.oe_start = *start; - needle.oe_end = *end; - - spin_lock(&lldo->do_lock); - oe = ll_find_oe(&lldo->do_root, &needle); - if (oe) { - *start = oe->oe_start; - *end = oe->oe_end; - rc = 0; - } - spin_unlock(&lldo->do_lock); - - RETURN(rc); -} - -int ll_farthest_dirty(struct ll_dirty_offsets *lldo, unsigned long *farthest) -{ - struct rb_node_s *last, *node; - struct offset_extent *oe; - int rc = -1; - ENTRY; - - spin_lock(&lldo->do_lock); - for (node = lldo->do_root.rb_node, last = NULL; - node; - last = node, node = node->rb_right) - ; - - if (last) { - oe = rb_entry(last, struct offset_extent, oe_node); - *farthest = oe->oe_end; - rc = 0; - } - spin_unlock(&lldo->do_lock); +restore_flags: + current->flags = old_flags; RETURN(rc); } - -void ll_lldo_init(struct ll_dirty_offsets *lldo) -{ - spin_lock_init(&lldo->do_lock); - lldo->do_num_dirty = 0; - lldo->do_root.rb_node = NULL; -} -- 1.8.3.1