#include <linux/rbtree.h>
#include <linux/seq_file.h>
#include <linux/time.h>
+#include <linux/wait.h>
/* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
#ifdef PG_inactive_clean
/* we raced with truncate? */
if ( off >= inode->i_size ) {
- int rc;
- rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
- ll_i2info(inode)->lli_smd,
- page->index, page->index);
-
- LASSERT(rc == 0);
- CDEBUG(D_CACHE, "offset "LPU64" (index %lu) > i_size %llu\n",
- off, page->index, inode->i_size);
- unlock_page(page);
+ ll_end_writeback(inode, page);
return 0;
}
struct list_head *pos, *n;
ENTRY;
- PGCACHE_WRLOCK(mapping);
+ ll_pgcache_lock(mapping);
list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
page = list_entry(pos, struct page, list);
break;
}
- PGCACHE_WRUNLOCK(mapping);
+ ll_pgcache_unlock(mapping);
EXIT;
}
if (rc == 0)
obdo_refresh_inode(inode, oa,
oa->o_valid & ~OBD_MD_FLSIZE);
- ptlrpc_set_destroy (set);
+ ptlrpc_set_destroy(set);
}
/*
* b=1038, we need to pass _brw errors up so that writeback
for (i = 0 ; i < llwp->npgs ; i++) {
struct page *page = llwp->pga[i].pg;
- CDEBUG(D_CACHE, "finished page %p at index %lu\n", page,
- page->index);
- LASSERT(PageLocked(page));
-
- rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
- ll_i2info(inode)->lli_smd,
- page->index, page->index);
- LASSERT(rc == 0);
- unlock_page(page);
- page_cache_release(page);
+ ll_end_writeback(inode, page);
+ page_cache_release(page); /* to match llwp_consume_page */
}
EXIT;
}
+static struct ll_writeback_pages *llwp_alloc(struct inode *inode)
+{
+ struct ll_writeback_pages *llwp;
+ int size, max = (inode->i_blksize >> PAGE_CACHE_SHIFT);
+
+ if (max == 0) {
+ CERROR("forcing llwp->max to 1. blksize: %lu\n",
+ inode->i_blksize);
+ max = 1;
+ }
+ size = sizeof(*llwp) + (max * sizeof(struct brw_page));
+
+ OBD_ALLOC(llwp, size);
+ if (llwp == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ llwp->max = max;
+/* XXX don't worry, this will be gone before you know it.. */
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+ llwp->inode = inode;
+#endif
+
+ RETURN(llwp);
+}
+
+void llwp_free(struct ll_writeback_pages *llwp)
+{
+ int size = sizeof(*llwp) + (llwp->max * sizeof(struct brw_page));
+ OBD_FREE(llwp, size);
+}
+
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#ifndef PG_inactive_clean
+/* 2.4 doesn't give us a way to find out how many pages we have cached 'cause
+ * we're not using buffer_heads. we are very conservative here and flush the
+ * superblock of all dirty data when the vm (rmap or stock) thinks that it is
+ * running low and kswapd would have done work. kupdated isn't good enough
+ * because writers (dbench) can dirty _very quickly_, and we allocate under
+ * writepage..
+ */
+#ifdef PG_inactive_clean /* 2.4 rmap */
+
+static int should_writeback(void)
+{
+ if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0)
+ return 1;
+ return 0;
+}
+
+#else /* stock 2.4 -aa zone vm */
+
#ifdef CONFIG_DISCONTIGMEM
-#error "sorry, we don't support DISCONTIGMEM yet"
+#error "sorry, we don't support DISCONTIGMEM"
#endif
-
/*
* __alloc_pages marks a zone as needing balancing if an allocation is
* performed when the zone has fewer free pages than its 'low' water
* mark. it's cleared when try_to_free_pages makes progress.
*/
-static int zones_need_balancing(void)
+static int should_writeback(void) /* aka zones_need_balancing */
{
pg_data_t * pgdat;
zone_t *zone;
}
return 0;
}
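The middle of that function is elided by the hunk above. As a hedged reconstruction (assuming the stock 2.4 pgdat_list/node_zones layout and the need_balance flag that __alloc_pages sets), the walk presumably looks something like the sketch below; treat it as an illustration, not the exact code:

static int should_writeback_sketch(void)	/* reconstruction, not the exact code */
{
	pg_data_t *pgdat;
	zone_t *zone;
	int i;

	/* check every zone in every node for the need_balance flag that
	 * __alloc_pages sets when a zone dips below its 'low' water mark */
	for (pgdat = pgdat_list; pgdat != NULL; pgdat = pgdat->node_next)
		for (i = 0; i < MAX_NR_ZONES; i++) {
			zone = &pgdat->node_zones[i];
			if (zone->need_balance)
				return 1;
		}
	return 0;
}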
-#endif
-/* 2.4 doesn't give us a way to find out how many pages we have
- * cached 'cause we're not using buffer_heads. we are very
- * conservative here and flush the superblock of all dirty data
- * when the vm (rmap or stock) thinks that it is running low
- * and kswapd would have done work. kupdated isn't good enough
- * because writers (dbench) can dirty _very quickly_, and we
- * allocate under writepage..
- *
- * 2.5 gets this right, see the {inc,dec}_page_state(nr_dirty, )
- */
-static int should_writeback(void)
-{
-#ifdef PG_inactive_clean
- if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0)
-#else
- if (zones_need_balancing())
-#endif
- return 1;
- return 0;
-}
-
-static int ll_alloc_brw(struct inode *inode, struct ll_writeback_pages *llwp)
-{
- memset(llwp, 0, sizeof(struct ll_writeback_pages));
-
- llwp->max = inode->i_blksize >> PAGE_CACHE_SHIFT;
- if (llwp->max == 0) {
- CERROR("forcing llwp->max to 1. blksize: %lu\n",
- inode->i_blksize);
- llwp->max = 1;
- }
- llwp->pga = kmalloc(llwp->max * sizeof(*llwp->pga), GFP_ATOMIC);
- if (llwp->pga == NULL)
- RETURN(-ENOMEM);
- RETURN(0);
-}
+#endif /* PG_inactive_clean detection of rmap vs stock */
int ll_check_dirty(struct super_block *sb)
{
current->flags |= PF_MEMALLOC;
spin_lock(&inode_lock);
-
/*
- * first we try and write back dirty pages from dirty inodes
- * until the VM thinkgs we're ok again..
+ * we're trying to use a very awkward hammer to throttle lustre's
+ * dirty data here. as long as the vm thinks we're "low" we're
+ * finding dirty inodes and writing out all their data. the
+ * second while loop is waiting for other threads who are doing
+ * the same thing.. we ran into livelocks if one thread was able
+ * to blow through here not finding dirty inodes because another
+ * thread was busy writing them back..
+ *
+ * XXX this is all goofy because low memory can stop it from
+ * working properly. someday we'll be pre-allocating io context
+ * in prepare_write/commit_write.
*/
do {
- struct ll_writeback_pages llwp;
+ struct ll_writeback_pages *llwp;
struct list_head *pos;
inode = NULL;
making_progress = 0;
if (inode == NULL)
break;
- /* duplicate __sync_one, *sigh* */
+ /* lock the inode while we work on it, which duplicates
+ * __sync_one */
list_del(&inode->i_list);
list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
inode->i_state |= I_LOCK;
inode->i_state &= ~I_DIRTY_PAGES;
spin_unlock(&inode_lock);
+ llwp = llwp_alloc(inode);
+ spin_lock(&inode_lock);
- rc = ll_alloc_brw(inode, &llwp);
- if (rc != 0)
- GOTO(cleanup, rc);
+ if (IS_ERR(llwp)) /* making_progress == 0 will break the loop */
+ goto unlock_inode;
+
+ spin_unlock(&inode_lock);
do {
- llwp.npgs = 0;
- ll_get_dirty_pages(inode, &llwp);
- if (llwp.npgs) {
+ llwp->npgs = 0;
+ ll_get_dirty_pages(inode, llwp);
+ if (llwp->npgs) {
oa.o_id =
ll_i2info(inode)->lli_smd->lsm_object_id;
oa.o_valid = OBD_MD_FLID;
obdo_from_inode(&oa, inode,
- OBD_MD_FLTYPE | OBD_MD_FLATIME|
- OBD_MD_FLMTIME| OBD_MD_FLCTIME);
+ OBD_MD_FLTYPE|OBD_MD_FLATIME|
+ OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+
lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
LPROC_LL_WB_PRESSURE,
- llwp.npgs);
- ll_writeback(inode, &oa, &llwp);
- rc += llwp.npgs;
+ llwp->npgs);
+ ll_writeback(inode, &oa, llwp);
+ rc += llwp->npgs;
making_progress = 1;
}
- } while (llwp.npgs && should_writeback());
+ } while (llwp->npgs && should_writeback());
+
+ llwp_free(llwp);
spin_lock(&inode_lock);
+unlock_inode:
if (!list_empty(&inode->i_mapping->dirty_pages))
inode->i_state |= I_DIRTY_PAGES;
list_add(&inode->i_list, &inode->i_sb->s_dirty);
}
wake_up(&inode->i_wait);
- kfree(llwp.pga);
} while (making_progress && should_writeback());
/*
spin_unlock(&inode_lock);
-cleanup:
current->flags = old_flags;
-
RETURN(rc);
}
-#endif /* linux 2.5 */
+/* called from writepage and allows us to also try and write out other
+ * pages. only called from 2.4 because 2.5 has ->writepages() */
int ll_batch_writepage(struct inode *inode, struct obdo *oa, struct page *page)
{
unsigned long old_flags; /* hack? */
- struct ll_writeback_pages llwp;
+ struct ll_writeback_pages *llwp;
int rc = 0;
ENTRY;
SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
old_flags = current->flags;
current->flags |= PF_MEMALLOC;
- rc = ll_alloc_brw(inode, &llwp);
- if (rc != 0)
- GOTO(restore_flags, rc);
+ llwp = llwp_alloc(inode);
+ if (IS_ERR(llwp))
+ GOTO(restore_flags, PTR_ERR(llwp));
- if (llwp_consume_page(&llwp, inode, page) == 0)
- ll_get_dirty_pages(inode, &llwp);
+ if (llwp_consume_page(llwp, inode, page) == 0)
+ ll_get_dirty_pages(inode, llwp);
- if (llwp.npgs) {
+ if (llwp->npgs) {
lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
- LPROC_LL_WB_WRITEPAGE, llwp.npgs);
- ll_writeback(inode, oa, &llwp);
+ LPROC_LL_WB_WRITEPAGE, llwp->npgs);
+ ll_writeback(inode, oa, llwp);
}
- kfree(llwp.pga);
+ llwp_free(llwp);
restore_flags:
current->flags = old_flags;
RETURN(rc);
}
+#endif
+
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+/* we use a singly linked list of page->private to pass pages between
+ * readpage/writepage and our worker threads without allocating things
+ * and while maintaining fifo order.. */
+void plist_init(struct plist *plist)
+{
+ plist->pl_head = NULL;
+ plist->pl_tail = NULL;
+ plist->pl_num = 0;
+}
+struct page *plist_get_page(struct plist *plist)
+{
+ struct page *page = plist->pl_head;
+
+ if (page == NULL)
+ return NULL;
+
+ plist->pl_head = (struct page *)page->private;
+ if (page == plist->pl_tail)
+ plist->pl_tail = NULL;
+ plist->pl_num--;
+ page->private = 0;
+
+ return page;
+}
+void plist_move(struct plist *to, struct plist *from)
+{
+ if (to->pl_head == NULL)
+ *to = *from;
+ else {
+ to->pl_tail->private = (unsigned long)from->pl_head;
+ to->pl_tail = from->pl_tail;
+ to->pl_num += from->pl_num;
+ }
+ plist_init(from);
+}
+void plist_add_page(struct plist *plist, struct page *page)
+{
+ LASSERT(page->private == 0);
+ if (plist->pl_tail) {
+ plist->pl_tail->private = (unsigned long)page;
+ plist->pl_tail = page;
+ } else {
+ plist->pl_head = page;
+ plist->pl_tail = page;
+ }
+ plist->pl_num++;
+}
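Taken together these helpers keep strict FIFO order through page->private. A minimal usage sketch (page_a and page_b are placeholders for locked pages whose ->private is 0):

	struct plist pl;

	plist_init(&pl);
	plist_add_page(&pl, page_a);		/* head == tail == page_a */
	plist_add_page(&pl, page_b);		/* chained via page_a->private */
	LASSERT(plist_get_page(&pl) == page_a);	/* oldest page comes out first */
	LASSERT(plist_get_page(&pl) == page_b);
	LASSERT(plist_get_page(&pl) == NULL);	/* list is empty again */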
+
+void lliod_wakeup(struct inode *inode)
+{
+ struct lliod_ctl *lc = &ll_i2sbi(inode)->ll_lc;
+ /* set the flag before waking so a waiter can't miss the new arrival */
+ lc->lc_new_arrival = 1;
+ wake_up(&lc->lc_waitq);
+}
+
+/* lliod_wakeup can be skipped if the caller knows that more lliod_give_*
+ * calls will be coming before it waits on the pages.. it must be called
+ * before waiting so that new_arrival is set and lliod comes out of its
+ * l_wait */
+void lliod_give_plist(struct inode *inode, struct plist *plist, int rw)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct lliod_ctl *lc = &ll_i2sbi(inode)->ll_lc;
+
+ CDEBUG(D_CACHE, "rw: %d plist %p num %d\n", rw, plist,
+ plist ? plist->pl_num : 0);
+
+ if (plist)
+ LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
+
+ spin_lock(&lc->lc_lock);
+ if (list_empty(&lli->lli_lc_item))
+ list_add_tail(&lli->lli_lc_item, &lc->lc_lli_list);
+
+ if (plist) {
+ if (rw == OBD_BRW_WRITE)
+ plist_move(&lli->lli_pl_write, plist);
+ else
+ plist_move(&lli->lli_pl_read, plist);
+ }
+ spin_unlock(&lc->lc_lock);
+}
+
+void lliod_give_page(struct inode *inode, struct page *page, int rw)
+{
+ struct plist plist;
+
+ plist_init(&plist);
+ plist_add_page(&plist, page);
+ lliod_give_plist(inode, &plist, rw);
+}
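For context, a hypothetical caller of the hand-off above (the helper name is invented for illustration): take a page reference, queue the locked page, then wake lliod before anything waits on it:

/* hypothetical writepage-side helper: lliod_brw drops the reference taken
 * here and, for writes, unlocks the page via ll_end_writeback when the
 * brw completes */
static void give_one_dirty_page(struct inode *inode, struct page *page)
{
	page_cache_get(page);
	lliod_give_page(inode, page, OBD_BRW_WRITE);
	lliod_wakeup(inode);	/* must precede any wait on the page */
}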
+
+/* XXX should do something smart with the 'rc' depending on the failover
+ * configuration */
+void lliod_complete_llwp(struct inode *inode, struct ll_writeback_pages *llwp,
+ int rc)
+{
+ struct page *page;
+ int i;
+
+ CDEBUG(D_CACHE, "inode: %p rw: %d rc: %d\n", inode, llwp->rw, rc);
+
+ for (i = 0 ; i < llwp->npgs ; i++) {
+ page = llwp->pga[i].pg;
+
+ CDEBUG(D_CACHE, "page: %p index: %lu\n", page, page->index);
+ if (llwp->rw == OBD_BRW_WRITE)
+ ll_end_writeback(inode, page);
+ else {
+ SetPageUptodate(page);
+ unlock_page(page);
+ }
+
+ page_cache_release(page); /* to match llwp_consume_page */
+ }
+}
+
+/* ok, the clump thing wasn't so hot, let's just do brws as writepage hands
+ * us pages. to avoid inter-inode or read/write starvation we take the
+ * pages off the lli and then consume them all, first reads then writes */
+int lliod_brw(struct lliod_ctl *lc)
+{
+ struct inode *inode = NULL;
+ struct ll_inode_info *lli = NULL;
+ struct ll_writeback_pages *llwp;
+ struct ptlrpc_request_set *set = NULL;
+ struct page *page;
+ struct plist plist_read, plist_write, *plist;
+ int rc = 0, rw, tmp;
+ ENTRY;
+
+ plist_init(&plist_read);
+ plist_init(&plist_write);
+
+ spin_lock(&lc->lc_lock);
+ if (list_empty(&lc->lc_lli_list)) {
+ spin_unlock(&lc->lc_lock);
+ RETURN(0);
+ }
+
+ lli = list_entry(lc->lc_lli_list.next, struct ll_inode_info,
+ lli_lc_item);
+ inode = ll_info2i(lli);
+ list_del_init(&lli->lli_lc_item);
+
+ plist_move(&plist_read, &lli->lli_pl_read);
+ plist_move(&plist_write, &lli->lli_pl_write);
+
+ spin_unlock(&lc->lc_lock);
+
+ llwp = llwp_alloc(inode);
+ if (IS_ERR(llwp)) {
+ rc = PTR_ERR(llwp);
+ goto out;
+ }
+
+ if (plist_read.pl_num) {
+ plist = &plist_read;
+ rw = OBD_BRW_READ;
+ } else {
+ plist = &plist_write;
+ rw = OBD_BRW_WRITE;
+ }
+
+ CDEBUG(D_CACHE, "inode %p #r: %d #w: %d\n", inode, plist_read.pl_num,
+ plist_write.pl_num);
+
+ while (plist->pl_num > 0) {
+ struct obdo oa;
+
+ set = ptlrpc_prep_set();
+ if (set == NULL) {
+ rc = -ENOMEM;
+ break;
+ }
+
+ llwp->npgs = 0;
+ llwp->rw = rw;
+ llwp->inode = inode;
+ while ((page = plist_get_page(plist))) {
+ tmp = llwp_consume_page(llwp, inode, page);
+ page_cache_release(page); /* from writepage */
+ if (tmp)
+ break;
+ }
+ oa.o_id = lli->lli_smd->lsm_object_id;
+ oa.o_valid = OBD_MD_FLID;
+ obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ tmp = obd_brw_async(rw, ll_i2obdconn(inode), &oa,
+ ll_i2info(inode)->lli_smd,
+ llwp->npgs, llwp->pga, set, NULL);
+ if (tmp == 0)
+ tmp = ptlrpc_set_wait(set);
+
+ ptlrpc_set_destroy(set);
+ lliod_complete_llwp(inode, llwp, tmp);
+
+ if (plist->pl_num == 0 && rw == OBD_BRW_READ) {
+ plist = &plist_write;
+ rw = OBD_BRW_WRITE;
+ }
+ }
+
+ llwp_free(llwp);
+out:
+ if (rc) {
+ lliod_give_plist(inode, &plist_read, OBD_BRW_READ);
+ lliod_give_plist(inode, &plist_write, OBD_BRW_WRITE);
+ }
+
+ RETURN(rc);
+}
+
+static int lliod(void *arg)
+{
+ struct lliod_ctl *lc = arg;
+ ENTRY;
+
+ kportal_daemonize("liod_writeback");
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
+ sigfillset(&current->blocked);
+ recalc_sigpending();
+#else
+ {
+ unsigned long flags;
+
+ spin_lock_irqsave(&current->sigmask_lock, flags);
+ sigfillset(&current->blocked);
+ recalc_sigpending(current);
+ spin_unlock_irqrestore(&current->sigmask_lock, flags);
+ }
+#endif
+
+ complete(&lc->lc_starting);
+
+ /* like kswapd */
+ current->flags |= PF_MEMALLOC;
+
+ while (1) {
+
+ /* XXX re-using the clu waitq for now; it's harmless..
+ * we'll update the path depending on clu's fate */
+ wait_event_interruptible(lc->lc_waitq,
+ test_bit(LIOD_STOP, &lc->lc_flags) ||
+ !list_empty(&lc->lc_lli_list));
+
+ if (test_bit(LIOD_STOP, &lc->lc_flags))
+ break;
+
+ /* sleep for a short amount of time if we get -ENOMEM,
+ * maybe giving the world a chance to free some memory
+ * for us */
+ if (lliod_brw(lc)) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(HZ/100);
+ }
+
+ }
+ /* XXX should be making sure we don't have inodes/
+ * pages still in flight */
+ complete(&lc->lc_finishing);
+ return 0;
+}
+
+int lliod_start(struct ll_sb_info *sbi, struct inode *inode)
+{
+ struct lliod_ctl *lc = &sbi->ll_lc;
+ ENTRY;
+
+ init_completion(&lc->lc_starting);
+ init_completion(&lc->lc_finishing);
+ INIT_LIST_HEAD(&lc->lc_lli_list);
+ init_waitqueue_head(&lc->lc_waitq);
+ lc->lc_flags = 0;
+ lc->lc_new_arrival = 0;
+ spin_lock_init(&lc->lc_lock);
+
+ if (kernel_thread(lliod, &sbi->ll_lc, 0) < 0)
+ RETURN(-ECHILD);
+
+ wait_for_completion(&lc->lc_starting);
+ RETURN(0);
+}
+
+void lliod_stop(struct ll_sb_info *sbi)
+{
+ struct lliod_ctl *lc = &sbi->ll_lc;
+
+ set_bit(LIOD_STOP, &lc->lc_flags);
+ wake_up(&lc->lc_waitq);
+ wait_for_completion(&lc->lc_finishing);
+}
+#endif /* 2.5 check.. */