From: zab
Date: Sat, 15 Feb 2003 21:45:46 +0000 (+0000)
Subject: bring b_io up to the latest write caching code. fsx and rundbench 1 pass in a
X-Git-Tag: v1_7_100~1^94~104
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=e0ea44807b627cbe64acb9ce6b4f534f3295d276;p=fs%2Flustre-release.git

bring b_io up to the latest write caching code. fsx and rundbench 1 pass in a
96M all-in-one UML.

- prepare_write is throttled by finding dirty pages on the super block's
  dirty inodes and writing them to the network
- commit_write marks the page dirty and updates i_size
- writepage blocks, writing the page and other dirty pages to the network
- sort the pages within an obd_brw batch so that block allocation isn't
  hosed on the OST (a standalone sketch of the sort follows this list)
- don't change s_dirty's position on the list during writeback; that seems
  to be the job of writepage's callers
- don't try to mess with a page's list membership after obd_brw completes;
  filemap_fdata{sync,wait} take care of that
- put a hack in obdo_to_inode that tricks ll_file_size into preferring the
  local i_size when there are cached pages on the inode
- add license blurb and editor instructions
- get rid of the management of vm lru pages and the liod thread;
  prepare_write throttling and kupdate serve the same task (hopefully)
- remove unused ll_flush_inode_pages
- throw in an OSC-side "count > 0" assert to match a similar OST assert
  that I couldn't reproduce
- writeback will try to batch PTL_MD_MAX_IOV pages on the wire
---
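[editor's note] The sort mentioned above is a plain shellsort over brw_page offsets (see
sort_brw_pages in the diff below). For readers who want it outside the patch
context, here is a hedged, standalone sketch of the same gap-sequence insertion
sort; the function name demo_shellsort and the bare offset array are illustrative
stand-ins, not identifiers from the patch.

/*
 * shellsort: an insertion sort over a shrinking stride, using the
 * 3*h+1 gap sequence.  Mirrors the structure of sort_brw_pages below,
 * but sorts bare offsets instead of struct brw_page.
 */
static void demo_shellsort(unsigned long long *off, int num)
{
        int stride, i, j;
        unsigned long long tmp;

        /* grow the stride past num, then shrink it back down */
        for (stride = 1; stride < num; stride = stride * 3 + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = off[i];
                        for (j = i; j >= stride && off[j - stride] > tmp;
                             j -= stride)
                                off[j] = off[j - stride];
                        off[j] = tmp;
                }
        } while (stride > 1);
}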
diff --git a/lustre/llite/iod.c b/lustre/llite/iod.c
index 0bc11669..f3c5373 100644
--- a/lustre/llite/iod.c
+++ b/lustre/llite/iod.c
@@ -1,4 +1,31 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc
+ *
+ * this started as an implementation of an io daemon that woke regularly
+ * to force writeback.. the throttling in prepare_write and kupdate's usual
+ * writeback pressure got rid of our thread, but the file name remains.
+ */
 #include
+#include
 #include
 #include
 #include
@@ -6,566 +33,272 @@
 #include
 #include
 #include
-#include
-#include
-#include
 #define DEBUG_SUBSYSTEM S_LLITE
 #include
-/* wakeup every 30s */
-#define LIOD_WAKEUP_CYCLE (30)
-
-/* FIXME tempororily copy from mm_inline.h */
-static inline void __add_page_to_inactive_clean_list(struct page * page)
-{
-        struct zone_struct * zone = page_zone(page);
-        DEBUG_LRU_PAGE(page);
-        SetPageInactiveClean(page);
-        list_add(&page->lru, &zone->inactive_clean_list);
-        zone->inactive_clean_pages++;
-//      nr_inactive_clean_pages++;
-}
+#ifndef list_for_each_prev_safe
+#define list_for_each_prev_safe(pos, n, head) \
+        for (pos = (head)->prev, n = pos->prev; pos != (head); \
+             pos = n, n = pos->prev )
+#endif
-static inline void __del_page_from_active_list(struct page * page)
-{
-        struct zone_struct * zone = page_zone(page);
-        list_del(&page->lru);
-        ClearPageActive(page);
-//      nr_active_pages--;
-        zone->active_pages--;
-        DEBUG_LRU_PAGE(page);
-}
+extern spinlock_t inode_lock;
-static inline void __del_page_from_inactive_dirty_list(struct page * page)
+/*
+ * ugh, we want disk allocation on the target to happen in offset order. we'll
+ * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
+ * fine for our small page arrays and doesn't require allocation. it's an
+ * insertion sort that swaps elements that are strides apart, shrinking the
+ * stride down until it's '1' and the array is sorted.
+ */
+void sort_brw_pages(struct brw_page *array, int num)
 {
-        struct zone_struct * zone = page_zone(page);
-        list_del(&page->lru);
-        ClearPageInactiveDirty(page);
-//      nr_inactive_dirty_pages--;
-        zone->inactive_dirty_pages--;
-        DEBUG_LRU_PAGE(page);
+        int stride, i, j;
+        struct brw_page tmp;
+
+        for( stride = 1; stride < num ; stride = (stride*3) +1 )
+                ;
+
+        do {
+                stride /= 3;
+                for ( i = stride ; i < num ; i++ ) {
+                        tmp = array[i];
+                        j = i;
+                        while ( j >= stride &&
+                                array[j - stride].off > tmp.off ) {
+                                array[j] = array[j - stride];
+                                j -= stride;
+                        }
+                        array[j] = tmp;
+                }
+        } while ( stride > 1 );
 }
-/* move page into inactive_clean list.
- *
- * caller need to make sure that this page is not used
- * by anyothers
- */
-void refile_clean_page(struct page *page)
+static inline void fill_brw_page(struct brw_page *pg,
+                                 struct inode *inode,
+                                 struct page *page)
 {
-        LASSERT(PageLocked(page));
-        LASSERT(!PageDirty(page));
-
-        ClearPageReferenced(page);
-        page->age = 0;
-
-        spin_lock(&pagemap_lru_lock);
-        if (PageActive(page)) {
-                __del_page_from_active_list(page);
-                __add_page_to_inactive_clean_list(page);
-        } else if (PageInactiveDirty(page)) {
-                __del_page_from_inactive_dirty_list(page);
-                __add_page_to_inactive_clean_list(page);
-        }
-        spin_unlock(&pagemap_lru_lock);
+        page_cache_get(page);
+
+        pg->pg = page;
+        pg->off = ((obd_off)page->index) << PAGE_SHIFT;
+        pg->flag = OBD_BRW_CREATE;
+        pg->count = PAGE_SIZE;
+
+        /* catch partial writes for files that end mid-page */
+        if ( pg->off + pg->count > inode->i_size )
+                pg->count = inode->i_size & ~PAGE_MASK;
+
+        /*
+         * matches ptlrpc_bulk_get assert that trickles down
+         * from a 0 page length going through niobuf and into
+         * the buffer regions being posted
+         */
+        LASSERT(pg->count >= 0);
+
+        CDEBUG(D_CACHE, "brw_page %p: off %lld cnt %d, "
+               "page %p: ind %ld\n",
+               pg, pg->off, pg->count,
+               page, page->index);
 }
-
-/* return value:
- * -1: no need to flush
- * 0: need async flush
- * 1: need sync flush
+/*
+ * returns the number of pages that it added to the pgs array
 *
- * Note: here we are more sensitive than kswapd, hope we could
- * do more flush work by ourselves, not resort to kswapd
+ * this duplicates filemap_fdatasync and gives us an opportunity to grab lots
+ * of dirty pages..
 */
-#if 0
-static inline int balance_dirty_state(void)
+static int ll_get_dirty_pages(struct inode *inode, struct brw_page *pgs,
+                              int nrmax)
 {
-        if (free_high(ALL_ZONES) > 0) {
-                printk("memory low, sync flush\n");
-                return 1;
-        }
-        if (free_plenty(ALL_ZONES) > 0) {
-                printk("memory high, async flush\n");
-                return 0;
-        }
-        else
-                return -1;
-}
-#else
-/* FIXME need verify the parameters later */
-static inline int balance_dirty_state(void)
-{
-        if (free_plenty(ALL_ZONES) > -2048) {
-                return 1;
-        }
-        if (free_plenty(ALL_ZONES) > -4096) {
-                return 0;
-        }
-
-        return -1;
-}
-#endif
-extern spinlock_t inode_lock;
-extern void wakeup_kswapd(unsigned int gfp_mask);
-
-static int flush_some_pages(struct super_block *sb);
-
-/* the main liod loop */
-static int liod_main(void *arg)
-{
-        struct super_block *sb = (struct super_block *)arg;
-        struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        struct list_head *pos, *n;
+        int ret = 0;
         ENTRY;
-        lock_kernel();
-        daemonize();
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        spin_lock_irq(&current->sigmask_lock);
-        sigfillset(&current->blocked);
-        our_recalc_sigpending(current);
-        spin_unlock_irq(&current->sigmask_lock);
-#else
-        sigfillset(&current->blocked);
-        our_recalc_sigpending(current);
-#endif
-
-        sprintf(current->comm, "liod");
-        unlock_kernel();
-
-        /* declare we are ready */
-        set_bit(LIOD_FLAG_ALIVE, &iod->io_flag);
-        wake_up(&iod->io_waitq);
-
-#if 0
-        current->flags |= PF_KERNTHREAD;
-#endif
-
-#if 0
-        pgdat_list->node_zones[0].pages_min *= 2;
-        pgdat_list->node_zones[0].pages_low *= 2;
-        pgdat_list->node_zones[0].pages_high *= 2;
-        pgdat_list->node_zones[0].pages_plenty *= 2;
-#endif
-
-        CDEBUG(D_CACHE, "liod(%d) started\n", current->pid);
-        while (1) {
-                int flushed;
-                int t;
-
-                /* check the stop command */
-                if (test_bit(LIOD_FLAG_STOP, &iod->io_flag)) {
-                        /* at umount time, should not be anyone
-                         * trying to flushing pages */
-                        LASSERT(!waitqueue_active(&iod->io_sem.wait));
-                        break;
-                }
-
-                t = interruptible_sleep_on_timeout(&iod->io_sleepq,
-                                                   LIOD_WAKEUP_CYCLE*HZ);
-                CDEBUG(D_NET, "liod(%d) active due to %s\n", current->pid,
-                       (t ? "wakeup" : "timeout"));
-
-                /* try to flush */
-                down(&iod->io_sem);
-                do {
-                        flushed = flush_some_pages(sb);
-                        conditional_schedule();
-                } while (flushed && (balance_dirty_state() >= 0));
-                up(&iod->io_sem);
-
-                /* if still out of balance, it shows all dirty
-                 * pages generate by this llite are flushing or
-                 * flushed, so inbalance must be caused by other
-                 * part of the kernel. here we wakeup kswapd
-                 * immediately, it probably too earliar (because
-                 * we are more sensitive than kswapd), but could
-                 * gurantee the the amount of free+inactive_clean
-                 * pages, at least could accelerate aging of pages
-                 *
-                 * Note: it start kswapd and return immediately
-                 */
-                if (balance_dirty_state() >= 0)
-                        wakeup_kswapd(GFP_ATOMIC);
-        }
-
-        clear_bit(LIOD_FLAG_ALIVE, &iod->io_flag);
-        wake_up(&iod->io_waitq);
-
-        CDEBUG(D_NET, "liod(%d) exit\n", current->pid);
-        RETURN(0);
-}
+        spin_lock(&pagecache_lock);
-int liod_start(struct super_block *sb)
-{
-        struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-        int rc;
+        list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
+                if ( ret == nrmax )
+                        break;
+                page = list_entry(pos, struct page, list);
-        /* initialize */
-        iod->io_flag = 0;
-        init_waitqueue_head(&iod->io_sleepq);
-        init_waitqueue_head(&iod->io_waitq);
-        init_MUTEX(&iod->io_sem);
+                if (TryLockPage(page))
+                        continue;
-        rc = kernel_thread(liod_main, (void *) sb,
-                           CLONE_VM | CLONE_FS | CLONE_FILES);
+                list_del(&page->list);
+                list_add(&page->list, &mapping->locked_pages);
-        if (rc < 0) {
-                CERROR("fail to start liod, error %d\n", rc);
-                return rc;
+                if (PageDirty(page)) {
+                        ClearPageDirty(page);
+                        fill_brw_page(&pgs[ret], inode, page);
+                        ret++;
+                } else
+                        UnlockPage(page);
         }
-        /* wait liod start */
-        wait_event(iod->io_waitq, test_bit(LIOD_FLAG_ALIVE, &iod->io_flag));
-
-        return 0;
+        spin_unlock(&pagecache_lock);
+        RETURN(ret);
 }
-static inline void liod_wakeup(struct ll_io_daemon *iod)
+static void ll_brw_pages_unlock( struct inode *inode, struct brw_page *pgs,
+                                 int npgs, struct obd_brw_set *set)
 {
-        wake_up(&iod->io_sleepq);
-}
-
-static inline void select_one_page(struct brw_page *pg,
-                                   struct inode *inode,
-                                   struct page *page)
-{
-        obd_off off;
-
-        pg->pg = page;
-        pg->off = ((obd_off)page->index) << PAGE_SHIFT;
-        pg->flag = OBD_BRW_CREATE;
-
-        off = ((obd_off)(page->index + 1)) << PAGE_SHIFT;
-        if (off > inode->i_size)
-                pg->count = inode->i_size & ~PAGE_MASK;
-        else
-                pg->count = PAGE_SIZE;
-}
-
-/* select candidate dirty pages within an inode
- * return:
- * - npgs contains number of pages selected
- * - 0: all pages in dirty list are searched
- *   1: probably still have dirty pages
- *
- * don't sleep in this functions
- * */
-static int select_inode_pages(struct inode *inode, struct brw_page *pgs, int *npgs)
-{
-        int nrmax = *npgs, nr = 0;
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        struct list_head *list, *end;
-
-        LASSERT(nrmax <= LIOD_FLUSH_NR);
-
-        *npgs = 0;
-
-        spin_lock(&pagecache_lock);
-
-        /* if no dirty pages, just return */
-        if (list_empty(&mapping->dirty_pages)) {
-                spin_unlock(&pagecache_lock);
-                return 0;
-        }
-
-        list = mapping->dirty_pages.prev;
-        end = &mapping->dirty_pages;
-        while (nr < nrmax) {
-                /* no more dirty pages on this inode */
-                if (list == end)
-                        break;
-
-                page = list_entry(list, struct page, list);
-                list = list->prev;
-
-                /* flush pages only if we could gain the lock */
-                if (!TryLockPage(page)) {
-                        /* remove from dirty list */
-                        list_del(&page->list);
-
-                        if (PageDirty(page)) {
-                                page_cache_get(page);
-                                /* add to locked list */
-                                list_add(&page->list, &mapping->locked_pages);
-
-                                select_one_page(&pgs[nr++], inode, page);
-
-                                if (nr >= nrmax)
-                                        break;
-                        } else {
-                                /* it's quite possible. add to clean list */
-                                list_add(&page->list, &mapping->clean_pages);
-                                UnlockPage(page);
-                        }
-                } else {
-                        if (list == &mapping->dirty_pages)
-                                break;
-
-                        /* move to head */
-                        list_del(&page->list);
-                        list_add(&page->list, &mapping->dirty_pages);
-                        if (end == &mapping->dirty_pages)
-                                end = &page->list;
-                }
-        }
-        spin_unlock(&pagecache_lock);
-
-        *npgs = nr;
-
-        if (list == end)
-                return 0;
-        else
-                return 1;
-}
-
-static int bulk_flush_pages(
-                struct inode *inode,
-                int npgs,
-                struct brw_page *pgs,
-                struct obd_brw_set *set)
-{
-        struct page *page;
-        int rc;
-
-        set->brw_callback = ll_brw_sync_wait;
-        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
-                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
-        if (rc) {
-                CERROR("error from obd_brw: rc = %d\n", rc);
-        } else {
-                rc = ll_brw_sync_wait(set, CB_PHASE_START);
-                if (rc)
-                        CERROR("error from callback: rc = %d\n", rc);
-        }
-
-        rc = 0;
-
-        while (--npgs >= 0) {
-                page = pgs[npgs].pg;
-
-                LASSERT(PageLocked(page));
-
-                if (!rc) {
-                        ClearPageDirty(page);
-
-                        /* move pages to clean list */
-                        spin_lock(&pagecache_lock);
-                        list_del(&page->list);
-                        list_add(&page->list, &inode->i_mapping->clean_pages);
-                        spin_unlock(&pagecache_lock);
-
-                        refile_clean_page(page);
-                        rc++;
-                } else {
-                        SetPageDirty(page);
-
-                        /* add back to dirty list */
-                        spin_lock(&pagecache_lock);
-                        list_del(&page->list);
-                        list_add(&page->list, &inode->i_mapping->dirty_pages);
-                        spin_unlock(&pagecache_lock);
-                }
-                UnlockPage(page);
-
-                page_cache_release(page);
-        }
-
-        spin_lock(&pagecache_lock);
-        if (list_empty(&inode->i_mapping->dirty_pages))
-                inode->i_state &= ~I_DIRTY_PAGES;
-        spin_unlock(&pagecache_lock);
+        int rc, i;
         ENTRY;
-        return rc;
-}
+        sort_brw_pages(pgs, npgs);
-/* synchronously flush certain amount of dirty pages right away
- * don't simply call fdatasync(), we need a more efficient way
- * to do flush in bunch mode.
- *
- * return the number of pages were flushed
- *
- * caller should gain the sbi->io_sem lock
- *
- * now we simply flush pages on at most one inode, probably
- * need add multiple inode flush later.
- */
-static int flush_some_pages(struct super_block *sb)
-{
-        struct ll_io_daemon *iod;
-        struct brw_page *pgs;
-        struct obd_brw_set *set;
-        struct list_head *list, *end;
-        struct inode *inode;
-        int npgs;
-
-        iod = &ll_s2sbi(sb)->ll_iod;
-        set = &iod->io_set;
-        pgs = iod->io_pgs;
-
-        /* init set */
+        memset(set, 0, sizeof(struct obd_brw_set));
         init_waitqueue_head(&set->brw_waitq);
         INIT_LIST_HEAD(&set->brw_desc_head);
         atomic_set(&set->brw_refcount, 0);
+        set->brw_callback = ll_brw_sync_wait;
-        spin_lock(&inode_lock);
-
-        /* sync dirty inodes from tail, since we try to sync
-         * from the oldest one */
-        npgs = 0;
-        list = sb->s_dirty.prev;
-        end = &sb->s_dirty;
-        while (1) {
-                int ret;
-
-                /* no dirty inodes left */
-                if (list == end)
-                        break;
-
-                inode = list_entry(list, struct inode, i_list);
-                list = list->next;
-
-                /* if inode is locked, it should have been moved away
-                 * from dirty list */
-                LASSERT(!(inode->i_state & I_LOCK));
-
-                npgs = LIOD_FLUSH_NR;
-                ret = select_inode_pages(inode, pgs, &npgs);
-
-                /* quit if found some pages */
-                if (npgs) {
-                        /* if all pages are searched on this inode,
-                         * we could move it to the list head */
-                        if (!ret) {
-                                list_del(&inode->i_list);
-                                list_add(&inode->i_list, &sb->s_dirty);
-                        }
-                        break;
-                } else {
-                        /* no page found */
-                        if (list == &sb->s_dirty)
-                                break;
-                        /* move inode to the end of list */
-                        list_del(&inode->i_list);
-                        list_add(&inode->i_list, &sb->s_dirty);
-                        if (end == &sb->s_dirty)
-                                end = &inode->i_list;
-                }
-        }
-        spin_unlock(&inode_lock);
-
-        if (!npgs)
-                return 0;
-
-        LASSERT(inode);
-
-        CDEBUG(D_CACHE, "got %d pages of inode %lu to flush\n",
-               npgs, inode->i_ino);
-
-        return bulk_flush_pages(inode, npgs, pgs, set);
-}
-
-void ll_balance_dirty_pages(struct super_block *sb)
-{
-        int flush;
-        struct ll_sb_info *sbi = ll_s2sbi(sb);
-
-        flush = balance_dirty_state();
-        if (flush < 0)
-                return;
-
-        if (flush > 0) {
-                int flush;
+        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
-                if (!down_trylock(&sbi->ll_iod.io_sem)) {
-                        do {
-                                flush = flush_some_pages(sb);
-                        } while (flush && (balance_dirty_state() > 0));
+        /* XXX this doesn't make sense to me */
+        rc = 0;
-                        up(&sbi->ll_iod.io_sem);
+        for ( i = 0 ; i < npgs ; i++) {
+                struct page *page = pgs[i].pg;
-                        /* this will sleep until kswapd wakeup us.
-                         * it maybe low efficient but hope could
-                         * slow down the memory-allocation a bit */
-                        if (balance_dirty_state() >= 0)
-                                wakeup_kswapd(GFP_KSWAPD);
-                }
-        }
+                CDEBUG(D_CACHE, "cleaning page %p\n", page);
+                LASSERT(PageLocked(page));
+                unlock_page(page);
+                page_cache_release(page);
+        }
-        /* FIXME we need a way to wake up liods on *all* llite fs */
-        liod_wakeup(&sbi->ll_iod);
+        EXIT;
 }
-/* called by ll_writepage()
- * return 0: we'v gained the lock and do the flushing once
- *        1: can't gain lock, do nothing
+/*
+ * this is called by prepare_write when we're low on memory, it wants
+ * to write back as much dirty data as it can. we'd rather just
+ * call fsync_dev and let the kernel call writepage on all our dirty
+ * pages, but i_sem makes that hard. prepare_write holds i_sem from
+ * generic_file_write, but other writepage callers don't. so we have
+ * this separate code path that writes back all the inodes it can get
+ * i_sem on.
 */
-int ll_bulk_write_pages(struct inode *inode, struct page *page)
+int ll_sb_sync( struct super_block *sb, struct inode *callers_inode )
 {
-        struct super_block *sb = inode->i_sb;
-        struct ll_sb_info *sbi = ll_s2sbi(sb);
-        struct ll_io_daemon *iod;
-        struct obd_brw_set *set;
-        struct brw_page *pgs;
-        int npgs, ret;
-
-        /* if can't got the lock, somebody must be doing bulk
-         * flushing. so just return */
-        if (down_trylock(&sbi->ll_iod.io_sem))
-                return 1;
-
-        iod = &ll_s2sbi(sb)->ll_iod;
-        set = &iod->io_set;
-        pgs = iod->io_pgs;
-
-        /* init set */
-        init_waitqueue_head(&set->brw_waitq);
-        INIT_LIST_HEAD(&set->brw_desc_head);
-        atomic_set(&set->brw_refcount, 0);
-
-        /* set the page passed in as the first selected page */
-        LASSERT(PageLocked(page));
-        page_cache_get(page);
-        select_one_page(pgs, inode, page);
-
-        /* select other pages */
-        npgs = LIOD_FLUSH_NR - 1;
-        ret = select_inode_pages(inode, &pgs[1], &npgs);
-        if (!ret) {
-                /* move inode to the end of list */
-                spin_lock(&inode_lock);
-                list_del(&inode->i_list);
-                list_add(&inode->i_list, &sb->s_dirty);
-                spin_unlock(&inode_lock);
-        }
-
-        bulk_flush_pages(inode, npgs+1, pgs, set);
-
-        up(&sbi->ll_iod.io_sem);
-        return 0;
+        struct obd_brw_set *set = NULL;
+        struct brw_page *pgs = NULL;
+        unsigned long old_flags; /* hack? */
+        int making_progress;
+        int rc = 0;
+        ENTRY;
+
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        set = obd_brw_set_new();
+        pgs = kmalloc(LIOD_FLUSH_NR * sizeof(struct brw_page), GFP_ATOMIC);
+        if ( pgs == NULL || set == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+
+        spin_lock(&inode_lock);
+
+        do {
+                struct list_head *pos;
+                int npgs;
+                struct inode *inode = NULL;
+
+                making_progress = 0;
+                list_for_each_prev(pos, &sb->s_dirty) {
+                        inode = list_entry(pos, struct inode, i_list);
+
+                        if ( ! (inode->i_state & I_DIRTY_PAGES) ) {
+                                inode = NULL;
+                                continue;
+                        }
+                        break;
+                }
+
+                if ( inode == NULL )
+                        break;
+
+                /* duplicate __sync_one, *sigh* */
+                list_del(&inode->i_list);
+                list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
+                inode->i_state |= I_LOCK;
+                inode->i_state &= ~I_DIRTY_PAGES;
+
+                spin_unlock(&inode_lock);
+
+                do {
+                        npgs = ll_get_dirty_pages(inode, pgs, LIOD_FLUSH_NR);
+                        if ( npgs ) {
+                                ll_brw_pages_unlock(inode, pgs, npgs, set);
+                                rc += npgs;
+                                making_progress = 1;
+                        }
+                } while (npgs);
+
+                spin_lock(&inode_lock);
+
+                inode->i_state &= ~I_LOCK;
+                /*
+                 * we are sneaky and leave the inode on the dirty list,
+                 * even though it might not still be..
+                 */
+                if (!(inode->i_state & I_FREEING)) {
+                        list_del(&inode->i_list);
+                        list_add(&inode->i_list, &inode->i_sb->s_dirty);
+                }
+                wake_up(&inode->i_wait);
+
+        } while ( making_progress );
+
+        spin_unlock(&inode_lock);
+
+cleanup:
+        if ( set != NULL )
+                obd_brw_set_free(set);
+        if ( pgs != NULL )
+                kfree(pgs);
+        current->flags = old_flags;
+
+        RETURN(rc);
 }
-void liod_stop(struct super_block *sb)
+int ll_batch_writepage( struct inode *inode, struct page *page )
 {
-        struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-
-        if (!test_bit(LIOD_FLAG_ALIVE, &iod->io_flag)) {
-                CERROR("liod died unexpectedly!\n");
-                return;
-        }
-
-        /* send the kill command */
-        set_bit(LIOD_FLAG_STOP, &iod->io_flag);
-
-        /* if wakeup daemon */
-        wake_up(&iod->io_sleepq);
-
-        /* wait liod exit */
-        wait_event(iod->io_waitq, !test_bit(LIOD_FLAG_ALIVE, &iod->io_flag));
+        struct obd_brw_set *set = NULL;
+        struct brw_page *pgs = NULL;
+        unsigned long old_flags; /* hack? */
+        int npgs;
+        int rc = 0;
         ENTRY;
-        return;
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        set = obd_brw_set_new();
+        pgs = kmalloc(LIOD_FLUSH_NR * sizeof(struct brw_page), GFP_ATOMIC);
+        if ( pgs == NULL || set == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+
+        fill_brw_page(pgs, inode, page);
+        npgs = 1;
+
+        npgs += ll_get_dirty_pages(inode, &pgs[npgs], LIOD_FLUSH_NR - npgs);
+        ll_brw_pages_unlock(inode, pgs, npgs, set);
+
+cleanup:
+        if ( set != NULL )
+                obd_brw_set_free(set);
+        if ( pgs != NULL )
+                kfree(pgs);
+        current->flags = old_flags;
+        RETURN(rc);
 }
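
[editor's note] For context on how the new entry points are meant to be driven:
ll_batch_writepage takes the locked page handed to writepage, batches it with
whatever other dirty pages ll_get_dirty_pages can grab, and unlocks everything
once the obd_brw bulk write completes. A minimal, hypothetical sketch of that
glue follows; the wrapper name ll_writepage_sketch is made up here, and the
real llite writepage hook lives outside this file.

static int ll_writepage_sketch(struct page *page)
{
        /* hypothetical 2.4-style writepage hook, for illustration only */
        struct inode *inode = page->mapping->host;

        LASSERT(PageLocked(page));

        /* blocks until the batched bulk write to the OSTs completes */
        return ll_batch_writepage(inode, page);
}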