bring b_io up to the latest write caching code.  fsx and rundbench 1 pass in a
96M all-in-one UML.

author    zab <zab>  Sat, 15 Feb 2003 21:45:46 +0000 (21:45 +0000)
committer zab <zab>  Sat, 15 Feb 2003 21:45:46 +0000 (21:45 +0000)

- prepare_write is throttled by finding dirty pages on the super block's
  dirty inodes and writing them to the network (see the sketch after this list)
- commit_write marks the page dirty and updates i_size
- writepage blocks while writing the page and other dirty pages to the
  network (a sketch of a writepage caller follows the diff)
- sort the pages within an obd_brw batch so that block allocation isn't hosed
  on the OST
- don't change s_dirty's position on the list during writeback; that seems to
  be the job of writepage's callers
- don't try to mess with a page's list membership after obd_brw completes;
  filemap_fdata{sync,wait} take care of that
- put a hack in obdo_to_inode that tricks ll_file_size into preferring the
  local i_size when there are cached pages on the inode
- add license blurb and editor instructions
- get rid of the management of VM LRU pages and the liod thread; prepare_write
  throttling and kupdate serve the same purpose (hopefully)
- remove unused ll_flush_inode_pages
- throw in an OSC-side "count > 0" assert to match a similar OST assert that
  I couldn't reproduce
- writeback will try to batch PTL_MD_MAX_IOV pages on the wire
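
The prepare_write throttling in the first bullet lands in llite's rw.c, which
this diff doesn't touch.  As a rough sketch only -- ll_should_throttle() is a
made-up stand-in for whatever memory-pressure test the real code uses -- the
call into the writeback path added below looks something like:

        /* hypothetical sketch: throttle a writer by pushing the super
         * block's dirty pages to the network before letting it dirty more */
        static int ll_prepare_write(struct file *file, struct page *page,
                                    unsigned from, unsigned to)
        {
                struct inode *inode = page->mapping->host;

                if (ll_should_throttle())       /* made-up predicate */
                        ll_sb_sync(inode->i_sb, inode);

                /* ... the usual prepare_write work follows ... */
                return 0;
        }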

lustre/llite/iod.c

index 0bc1166..f3c5373 100644
--- a/lustre/llite/iod.c
+++ b/lustre/llite/iod.c
@@ -1,4 +1,31 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Copyright (C) 2002, 2003  Cluster File Systems, Inc
+ *
+ *  this started as an implementation of an io daemon that woke regularly
+ *  to force writeback.  the throttling in prepare_write and kupdate's usual
+ *  writeback pressure got rid of our thread, but the file name remains.
+ */
 #include <linux/version.h>
+#include <linux/config.h>
 #include <linux/module.h>
 #include <linux/fs.h>
 #include <linux/stat.h>
 #include <linux/smp_lock.h>
 #include <linux/kmod.h>
 #include <linux/pagemap.h>
-#include <linux/low-latency.h>
-#include <linux/mm_inline.h>
-#include <asm/unistd.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 #include <linux/lustre_lite.h>
 
-/* wakeup every 30s */
-#define LIOD_WAKEUP_CYCLE      (30)
-
-/* FIXME temporarily copied from mm_inline.h */
-static inline void __add_page_to_inactive_clean_list(struct page * page)
-{
-       struct zone_struct * zone = page_zone(page);
-       DEBUG_LRU_PAGE(page);
-       SetPageInactiveClean(page);
-       list_add(&page->lru, &zone->inactive_clean_list);
-       zone->inactive_clean_pages++;
-//     nr_inactive_clean_pages++;
-}
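+/*
+ * walk a list backwards while the current entry may be moved to another
+ * list, as ll_get_dirty_pages does below; guarded since some kernel
+ * trees already define this macro
+ */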
+#ifndef list_for_each_prev_safe
+#define list_for_each_prev_safe(pos, n, head) \
+        for (pos = (head)->prev, n = pos->prev; pos != (head); \
+                pos = n, n = pos->prev )
+#endif
 
-static inline void __del_page_from_active_list(struct page * page)
-{
-       struct zone_struct * zone = page_zone(page);
-       list_del(&page->lru);
-       ClearPageActive(page);
-//     nr_active_pages--;
-       zone->active_pages--;
-       DEBUG_LRU_PAGE(page);
-}
+extern spinlock_t inode_lock;
 
-static inline void __del_page_from_inactive_dirty_list(struct page * page)
+/*
+ * ugh, we want disk allocation on the target to happen in offset order.  we'll
+ * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
+ * fine for our small page arrays and doesn't require allocation.  it's an
+ * insertion sort that swaps elements that are strides apart, shrinking the
+ * stride down until it's 1 and the array is sorted.
+ */
+void sort_brw_pages(struct brw_page *array, int num)
 {
-       struct zone_struct * zone = page_zone(page);
-       list_del(&page->lru);
-       ClearPageInactiveDirty(page);
-//     nr_inactive_dirty_pages--;
-       zone->inactive_dirty_pages--;
-       DEBUG_LRU_PAGE(page);
+        int stride, i, j;
+        struct brw_page tmp;
+
+        for (stride = 1; stride < num; stride = (stride * 3) + 1)
+                ;
+
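+        /*
+         * stride has now grown through Knuth's 3h+1 sequence (1, 4, 13,
+         * 40, ...) past num; the first /3 below brings it back in range
+         */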
+        do {
+                stride /= 3;
+                for (i = stride; i < num; i++) {
+                        tmp = array[i];
+                        j = i;
+                        while (j >= stride &&
+                               array[j - stride].off > tmp.off) {
+                                array[j] = array[j - stride];
+                                j -= stride;
+                        }
+                        array[j] = tmp;
+                }
+        } while (stride > 1);
 }
 
-/* move page into inactive_clean list.
- *
- * the caller needs to make sure that this page is not used
- * by anyone else
- */
-void refile_clean_page(struct page *page)
+static inline void fill_brw_page(struct brw_page *pg,
+                                   struct inode *inode,
+                                   struct page *page)
 {
-        LASSERT(PageLocked(page));
-       LASSERT(!PageDirty(page));
-
-        ClearPageReferenced(page);
-       page->age = 0;
-
-        spin_lock(&pagemap_lru_lock);
-        if (PageActive(page)) {
-                __del_page_from_active_list(page);
-                __add_page_to_inactive_clean_list(page);
-        } else if (PageInactiveDirty(page)) {
-                __del_page_from_inactive_dirty_list(page);
-                __add_page_to_inactive_clean_list(page);
-        }
-        spin_unlock(&pagemap_lru_lock);
+        page_cache_get(page);
+
+        pg->pg = page;
+        pg->off = ((obd_off)page->index) << PAGE_SHIFT;
+        pg->flag = OBD_BRW_CREATE;
+        pg->count = PAGE_SIZE;
+
+        /* catch partial writes for files that end mid-page */
+        if ( pg->off + pg->count > inode->i_size )
+                pg->count = inode->i_size & ~PAGE_MASK;
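+        /* e.g. with 4096 byte pages and i_size = 5000, the page at index 1
+         * covers bytes [4096,5000) and count becomes 5000 & ~PAGE_MASK = 904 */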
+
+        /*
+         * matches ptlrpc_bulk_get assert that trickles down
+         * from a 0 page length going through niobuf and into
+         * the buffer regions being posted
+         */
+        LASSERT(pg->count >= 0);
+
+        CDEBUG(D_CACHE, "brw_page %p: off %lld cnt %d, "
+                        "page %p: ind %ld\n",
+                        pg, pg->off, pg->count,
+                        page, page->index);
 }
 
-
-/* return value:
- * -1: no need to flush
- * 0:  need async flush
- * 1:  need sync flush
+/*
+ * returns the number of pages that it added to the pgs array
  *
- * Note: here we are more sensitive than kswapd; we hoped to do
- * more flush work ourselves rather than resort to kswapd
+ * this duplicates filemap_fdatasync and gives us an opportunity to grab lots
+ * of dirty pages.
  */
-#if 0
-static inline int balance_dirty_state(void)
+static int ll_get_dirty_pages(struct inode *inode, struct brw_page *pgs, 
+                                int nrmax)
 {
-       if (free_high(ALL_ZONES) > 0) {
-               printk("memory low, sync flush\n");
-               return 1;
-       }
-       if (free_plenty(ALL_ZONES) > 0) {
-               printk("memory high, async flush\n");
-               return 0;
-       }
-       else
-               return -1;
-}
-#else
-/* FIXME need to verify the parameters later */
-static inline int balance_dirty_state(void)
-{
-       if (free_plenty(ALL_ZONES) > -2048) {
-               return 1;
-       }
-       if (free_plenty(ALL_ZONES) > -4096) {
-               return 0;
-       }
-
-       return -1;
-}
-#endif
-extern spinlock_t inode_lock;
-extern void wakeup_kswapd(unsigned int gfp_mask);
-
-static int flush_some_pages(struct super_block *sb);
-
-/* the main liod loop */
-static int liod_main(void *arg)
-{
-       struct super_block *sb = (struct super_block *)arg;
-        struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        struct list_head *pos, *n;
+        int ret = 0;
         ENTRY;
 
-        lock_kernel();
-        daemonize();
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        spin_lock_irq(&current->sigmask_lock);
-        sigfillset(&current->blocked);
-        our_recalc_sigpending(current);
-        spin_unlock_irq(&current->sigmask_lock);
-#else
-        sigfillset(&current->blocked);
-        our_recalc_sigpending(current);
-#endif
-
-        sprintf(current->comm, "liod");
-        unlock_kernel();
-
-        /* declare we are ready */
-       set_bit(LIOD_FLAG_ALIVE, &iod->io_flag);
-        wake_up(&iod->io_waitq);
-
-#if 0
-       current->flags |= PF_KERNTHREAD;
-#endif
-
-#if 0
-       pgdat_list->node_zones[0].pages_min *= 2;
-       pgdat_list->node_zones[0].pages_low *= 2;
-       pgdat_list->node_zones[0].pages_high *= 2;
-       pgdat_list->node_zones[0].pages_plenty *= 2;
-#endif
-
-        CDEBUG(D_CACHE, "liod(%d) started\n", current->pid);
-        while (1) {
-               int flushed;
-               int t;
-
-               /* check the stop command */
-               if (test_bit(LIOD_FLAG_STOP, &iod->io_flag)) {
-                       /* at umount time there should not be anyone
-                        * trying to flush pages */
-                       LASSERT(!waitqueue_active(&iod->io_sem.wait));
-                       break;
-               }
-
-               t = interruptible_sleep_on_timeout(&iod->io_sleepq,
-                                              LIOD_WAKEUP_CYCLE*HZ);
-               CDEBUG(D_NET, "liod(%d) active due to %s\n", current->pid,
-                               (t ? "wakeup" : "timeout"));
-
-               /* try to flush */
-               down(&iod->io_sem);
-               do {
-                       flushed = flush_some_pages(sb);
-                       conditional_schedule();
-               } while (flushed && (balance_dirty_state() >= 0));
-               up(&iod->io_sem);
-
-               /* if still out of balance, it means all dirty
-                * pages generated by this llite are being flushed or
-                * already flushed, so the imbalance must be caused by
-                * other parts of the kernel. here we wake up kswapd
-                * immediately; it's probably too early (because
-                * we are more sensitive than kswapd), but it could
-                * guarantee the amount of free+inactive_clean
-                * pages, or at least accelerate the aging of pages
-                *
-                * Note: it starts kswapd and returns immediately
-                */
-               if (balance_dirty_state() >= 0)
-                       wakeup_kswapd(GFP_ATOMIC);
-       }
-
-       clear_bit(LIOD_FLAG_ALIVE, &iod->io_flag);
-        wake_up(&iod->io_waitq);
-
-        CDEBUG(D_NET, "liod(%d) exit\n", current->pid);
-        RETURN(0);
-}
+        spin_lock(&pagecache_lock);
 
-int liod_start(struct super_block *sb)
-{
-       struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-        int rc;
+        list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
+                if ( ret == nrmax )
+                        break;
+                page = list_entry(pos, struct page, list);
 
-        /* initialize */
-        iod->io_flag = 0;
-        init_waitqueue_head(&iod->io_sleepq);
-        init_waitqueue_head(&iod->io_waitq);
-       init_MUTEX(&iod->io_sem);
+                if (TryLockPage(page))
+                        continue;
 
-        rc = kernel_thread(liod_main, (void *) sb,
-                           CLONE_VM | CLONE_FS | CLONE_FILES);
+                list_del(&page->list);
+                list_add(&page->list, &mapping->locked_pages);
 
-        if (rc < 0) {
-               CERROR("fail to start liod, error %d\n", rc);
-                return rc;
+                if (PageDirty(page)) {
+                        ClearPageDirty(page);
+                        fill_brw_page(&pgs[ret], inode, page);
+                        ret++;
+                } else 
+                        UnlockPage(page);
         }
 
-       /* wait liod start */
-       wait_event(iod->io_waitq, test_bit(LIOD_FLAG_ALIVE, &iod->io_flag));
-
-        return 0;
+        spin_unlock(&pagecache_lock);
+        RETURN(ret);
 }
 
-static inline void liod_wakeup(struct ll_io_daemon *iod)
+static void ll_brw_pages_unlock( struct inode *inode, struct brw_page *pgs, 
+                int npgs, struct obd_brw_set *set)
 {
-        wake_up(&iod->io_sleepq);
-}
-
-static inline void select_one_page(struct brw_page *pg,
-                                   struct inode *inode,
-                                   struct page *page)
-{
-       obd_off off;
-
-       pg->pg = page;
-       pg->off = ((obd_off)page->index) << PAGE_SHIFT;
-       pg->flag = OBD_BRW_CREATE;
-
-       off = ((obd_off)(page->index + 1)) << PAGE_SHIFT;
-       if (off > inode->i_size)
-               pg->count = inode->i_size & ~PAGE_MASK;
-       else
-               pg->count = PAGE_SIZE;
-}
-
-/* select candidate dirty pages within an inode
- * return:
- * - npgs contains number of pages selected
- * - 0: all pages in dirty list are searched
- *   1: probably still have dirty pages
- *
- * don't sleep in this function
- */
-static int select_inode_pages(struct inode *inode, struct brw_page *pgs, int *npgs)
-{
-       int nrmax = *npgs, nr = 0;
-       struct address_space *mapping = inode->i_mapping;
-       struct page *page;
-       struct list_head *list, *end;
-
-       LASSERT(nrmax <= LIOD_FLUSH_NR);
-
-       *npgs = 0;
-
-       spin_lock(&pagecache_lock);
-
-       /* if no dirty pages, just return */
-       if (list_empty(&mapping->dirty_pages)) {
-               spin_unlock(&pagecache_lock);
-               return 0;
-       }
-
-       list = mapping->dirty_pages.prev;
-       end = &mapping->dirty_pages;
-       while (nr < nrmax) {
-               /* no more dirty pages on this inode */
-               if (list == end)
-                       break;
-
-               page = list_entry(list, struct page, list);
-               list = list->prev;
-
-               /* flush pages only if we could gain the lock */
-               if (!TryLockPage(page)) {
-                       /* remove from dirty list */
-                       list_del(&page->list);
-
-                       if (PageDirty(page)) {
-                               page_cache_get(page);
-                               /* add to locked list */
-                               list_add(&page->list, &mapping->locked_pages);
-
-                               select_one_page(&pgs[nr++], inode, page);
-                               
-                               if (nr >= nrmax)
-                                       break;
-                       } else {
-                               /* it's quite possible. add to clean list */
-                               list_add(&page->list, &mapping->clean_pages);
-                               UnlockPage(page);
-                       }
-               } else {
-                       if (list == &mapping->dirty_pages)
-                               break;
-
-                       /* move to head */
-                       list_del(&page->list);
-                       list_add(&page->list, &mapping->dirty_pages);
-                       if (end == &mapping->dirty_pages)
-                               end = &page->list;
-               }
-       }
-       spin_unlock(&pagecache_lock);
-
-       *npgs = nr;
-
-       if (list == end)
-               return 0;
-       else
-               return 1;
-}
-
-static int bulk_flush_pages(
-               struct inode *inode,
-               int npgs,
-               struct brw_page *pgs,
-               struct obd_brw_set *set)
-{
-       struct page *page;
-       int rc;
-       
-       set->brw_callback = ll_brw_sync_wait;
-       rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
-                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
-       if (rc) {
-               CERROR("error from obd_brw: rc = %d\n", rc);
-       } else {
-               rc = ll_brw_sync_wait(set, CB_PHASE_START);
-               if (rc)
-                       CERROR("error from callback: rc = %d\n", rc);
-       }
-
-       rc = 0;
-
-       while (--npgs >= 0) {
-               page = pgs[npgs].pg;
-
-               LASSERT(PageLocked(page));
-
-               if (!rc) {
-                       ClearPageDirty(page);
-
-                       /* move pages to clean list */
-                       spin_lock(&pagecache_lock);
-                       list_del(&page->list);
-                       list_add(&page->list, &inode->i_mapping->clean_pages);
-                       spin_unlock(&pagecache_lock);
-
-                       refile_clean_page(page);
-                       rc++;
-               } else {
-                       SetPageDirty(page);
-
-                       /* add back to dirty list */
-                       spin_lock(&pagecache_lock);
-                       list_del(&page->list);
-                       list_add(&page->list, &inode->i_mapping->dirty_pages);
-                       spin_unlock(&pagecache_lock);
-               }
-               UnlockPage(page);
-
-               page_cache_release(page);
-       }
-
-       spin_lock(&pagecache_lock);
-       if (list_empty(&inode->i_mapping->dirty_pages))
-               inode->i_state &= ~I_DIRTY_PAGES;
-       spin_unlock(&pagecache_lock);
+        int rc, i;
+        ENTRY;
 
-       return rc;
-}
+        sort_brw_pages(pgs, npgs);
 
-/* synchronously flush a certain amount of dirty pages right away.
- * don't simply call fdatasync(); we need a more efficient way
- * to flush in batch mode.
- *
- * returns the number of pages that were flushed
- *
- * the caller should hold the sbi->io_sem lock
- *
- * for now we simply flush pages on at most one inode; we probably
- * need to add multi-inode flushing later.
- */
-static int flush_some_pages(struct super_block *sb)
-{
-       struct ll_io_daemon *iod;
-       struct brw_page *pgs;
-       struct obd_brw_set *set;
-       struct list_head *list, *end;
-       struct inode *inode;
-       int npgs;
-
-       iod = &ll_s2sbi(sb)->ll_iod;
-       set = &iod->io_set;
-       pgs = iod->io_pgs;
-
-       /* init set */
+        memset(set, 0, sizeof(struct obd_brw_set));
         init_waitqueue_head(&set->brw_waitq);
         INIT_LIST_HEAD(&set->brw_desc_head);
         atomic_set(&set->brw_refcount, 0);
+        set->brw_callback = ll_brw_sync_wait;
 
-       spin_lock(&inode_lock);
-
-       /* sync dirty inodes from tail, since we try to sync
-        * from the oldest one */
-       npgs = 0;
-       list = sb->s_dirty.prev;
-       end = &sb->s_dirty;
-       while (1) {
-               int ret;
-                       
-               /* no dirty inodes left */
-               if (list == end)
-                       break;
-
-               inode = list_entry(list, struct inode, i_list);
-               list = list->next;
-
-               /* if inode is locked, it should have been moved away
-                * from dirty list */
-               LASSERT(!(inode->i_state & I_LOCK));
-
-               npgs = LIOD_FLUSH_NR;
-               ret = select_inode_pages(inode, pgs, &npgs);
-
-               /* quit if found some pages */
-               if (npgs) {
-                       /* if all pages are searched on this inode,
-                        * we could move it to the list head */
-                       if (!ret) {
-                               list_del(&inode->i_list);
-                               list_add(&inode->i_list, &sb->s_dirty);
-                       }
-                       break;
-               } else {
-                       /* no page found */
-                       if (list == &sb->s_dirty)
-                               break;
-                       /* move inode to the end of list */
-                       list_del(&inode->i_list);
-                       list_add(&inode->i_list, &sb->s_dirty);
-                       if (end == &sb->s_dirty)
-                               end = &inode->i_list;
-               }
-       }
-       spin_unlock(&inode_lock);
-
-       if (!npgs)
-               return 0;
-
-       LASSERT(inode);
-
-       CDEBUG(D_CACHE, "got %d pages of inode %lu to flush\n",
-                       npgs, inode->i_ino);
-
-       return bulk_flush_pages(inode, npgs, pgs, set);
-}
-
-void ll_balance_dirty_pages(struct super_block *sb)
-{
-       int flush;
-       struct ll_sb_info *sbi = ll_s2sbi(sb);
-
-       flush = balance_dirty_state();
-       if (flush < 0)
-               return;
-
-       if (flush > 0) {
-               int flush;
+        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
 
-               if (!down_trylock(&sbi->ll_iod.io_sem)) {
-                       do {
-                               flush = flush_some_pages(sb);
-                       } while (flush && (balance_dirty_state() > 0));
+        /* XXX this doesn't make sense to me */
+        rc = 0;
 
-                       up(&sbi->ll_iod.io_sem);
+        for ( i = 0 ; i < npgs ; i++) {
+                struct page *page = pgs[i].pg;
 
-                       /* this will sleep until kswapd wakes us up.
-                        * it may be inefficient, but hopefully it could
-                        * slow down memory allocation a bit */
-                       if (balance_dirty_state() >= 0)
-                               wakeup_kswapd(GFP_KSWAPD);
-               }
-       }
+                CDEBUG(D_CACHE, "cleaning page %p\n", page);
+                LASSERT(PageLocked(page));
+                unlock_page(page);
+                page_cache_release(page);
+        }
 
-       /* FIXME we need a way to wake up liods on *all* llite fs */
-       liod_wakeup(&sbi->ll_iod);
+        EXIT;
 }
 
-/* called by ll_writepage()
- * return 0: we've gained the lock and did the flushing once
- *        1: couldn't gain the lock, did nothing
+/*
+ * this is called by prepare_write when we're low on memory; it wants
+ * to write back as much dirty data as it can.  we'd rather just
+ * call fsync_dev and let the kernel call writepage on all our dirty
+ * pages, but i_sem makes that hard.  prepare_write holds i_sem from
+ * generic_file_write, but other writepage callers don't.  so we have
+ * this separate code path that writes back all the inodes it can get
+ * i_sem on.
  */
-int ll_bulk_write_pages(struct inode *inode, struct page *page)
+int ll_sb_sync( struct super_block *sb, struct inode *callers_inode )
 {
-       struct super_block *sb = inode->i_sb;
-       struct ll_sb_info *sbi = ll_s2sbi(sb);
-       struct ll_io_daemon *iod;
-       struct obd_brw_set *set;
-       struct brw_page *pgs;
-       int npgs, ret;
-
-       /* if we can't get the lock, somebody must be doing bulk
-        * flushing, so just return */
-       if (down_trylock(&sbi->ll_iod.io_sem))
-               return 1;
-
-       iod = &ll_s2sbi(sb)->ll_iod;
-       set = &iod->io_set;
-       pgs = iod->io_pgs;
-
-       /* init set */
-       init_waitqueue_head(&set->brw_waitq);
-       INIT_LIST_HEAD(&set->brw_desc_head);
-       atomic_set(&set->brw_refcount, 0);
-
-       /* set the page passed in as the first selected page */
-       LASSERT(PageLocked(page));
-       page_cache_get(page);
-       select_one_page(pgs, inode, page);
-
-       /* select other pages */
-       npgs = LIOD_FLUSH_NR - 1;
-       ret = select_inode_pages(inode, &pgs[1], &npgs);
-       if (!ret) {
-               /* move inode to the end of list */
-               spin_lock(&inode_lock);
-               list_del(&inode->i_list);
-               list_add(&inode->i_list, &sb->s_dirty);
-               spin_unlock(&inode_lock);
-       }
-
-       bulk_flush_pages(inode, npgs+1, pgs, set);
-
-       up(&sbi->ll_iod.io_sem);
-       return 0;
+        struct obd_brw_set *set = NULL;
+        struct brw_page *pgs = NULL;
+        unsigned long old_flags; /* hack? */
+        int making_progress;
+        int rc = 0;
+        ENTRY;
+
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
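+        /* we may be called when the VM is already short on memory;
+         * PF_MEMALLOC keeps our allocations from recursing into reclaim */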
+        set = obd_brw_set_new();
+        pgs = kmalloc(LIOD_FLUSH_NR * sizeof(struct brw_page), GFP_ATOMIC);
+        if ( pgs == NULL || set == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+
+        spin_lock(&inode_lock);
+
+        do {
+                struct list_head *pos;
+                int npgs;
+                struct inode *inode = NULL;
+
+                making_progress = 0;
+                list_for_each_prev(pos, &sb->s_dirty) {
+                        inode = list_entry(pos, struct inode, i_list);
+
+                        if ( ! (inode->i_state & I_DIRTY_PAGES) ) {
+                                inode = NULL;
+                                continue; 
+                        }
+                        break;
+                }
+
+                if ( inode == NULL )
+                        break;
+
+                /* duplicate __sync_one, *sigh* */
+                list_del(&inode->i_list);
+                list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
+                inode->i_state |= I_LOCK;
+                inode->i_state &= ~I_DIRTY_PAGES;
+
+                spin_unlock(&inode_lock);
+
+                do { 
+                        npgs = ll_get_dirty_pages(inode, pgs, LIOD_FLUSH_NR);
+                        if ( npgs ) {
+                                ll_brw_pages_unlock(inode, pgs, npgs, set);
+                                rc += npgs;
+                                making_progress = 1;
+                        }
+                } while (npgs);
+
+                spin_lock(&inode_lock);
+
+                inode->i_state &= ~I_LOCK;
+                /*
+                 * we are sneaky and leave the inode on the dirty list,
+                 * even though it might not still be dirty.
+                 */
+                if (!(inode->i_state & I_FREEING)) {
+                        list_del(&inode->i_list);
+                        list_add(&inode->i_list, &inode->i_sb->s_dirty);
+                }
+                wake_up(&inode->i_wait);
+
+        } while ( making_progress );
+
+        spin_unlock(&inode_lock);
+
+cleanup:
+        if ( set != NULL )
+                obd_brw_set_free(set);
+        if ( pgs != NULL )
+                kfree(pgs);
+        current->flags = old_flags;
+
+        RETURN(rc);
 }
 
-void liod_stop(struct super_block *sb)
+int ll_batch_writepage( struct inode *inode, struct page *page )
 {
-       struct ll_io_daemon *iod = &ll_s2sbi(sb)->ll_iod;
-
-       if (!test_bit(LIOD_FLAG_ALIVE, &iod->io_flag)) {
-               CERROR("liod died unexpectedly!\n");
-               return;
-       }
-
-        /* send the kill command */
-       set_bit(LIOD_FLAG_STOP, &iod->io_flag);
-
-        /* if wakeup daemon */
-        wake_up(&iod->io_sleepq);
-
-       /* wait liod exit */
-       wait_event(iod->io_waitq, !test_bit(LIOD_FLAG_ALIVE, &iod->io_flag));
+        struct obd_brw_set *set = NULL;
+        struct brw_page *pgs = NULL;
+        unsigned long old_flags; /* hack? */
+        int npgs;
+        int rc = 0;
+        ENTRY;
 
-       return;
+        old_flags = current->flags;
+        current->flags |= PF_MEMALLOC;
+        set = obd_brw_set_new();
+        pgs = kmalloc(LIOD_FLUSH_NR * sizeof(struct brw_page), GFP_ATOMIC);
+        if ( pgs == NULL || set == NULL )
+                GOTO(cleanup, rc = -ENOMEM);
+
+        fill_brw_page(pgs, inode, page);
+        npgs = 1;
+
+        npgs += ll_get_dirty_pages(inode, &pgs[npgs], LIOD_FLUSH_NR - npgs);
+        ll_brw_pages_unlock(inode, pgs, npgs, set);
+
+cleanup:
+        if ( set != NULL )
+                obd_brw_set_free(set);
+        if ( pgs != NULL )
+                kfree(pgs);
+        current->flags = old_flags;
+        RETURN(rc);
 }
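
For reference, the writepage path in the commit message enters this file
through ll_batch_writepage().  A hedged sketch of such a caller -- the actual
ll_writepage lives in llite's rw.c and is not part of this diff:

        /* hypothetical sketch: writepage hands its locked page to
         * ll_batch_writepage, which writes it to the network synchronously
         * along with any other dirty pages it can collect from the inode */
        static int ll_writepage(struct page *page)
        {
                struct inode *inode = page->mapping->host;

                LASSERT(PageLocked(page));
                return ll_batch_writepage(inode, page);
        }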