land b_unify on b_devel
author phil <phil>
Fri, 25 Jul 2003 21:18:26 +0000 (21:18 +0000)
committer phil <phil>
Fri, 25 Jul 2003 21:18:26 +0000 (21:18 +0000)
 - patches for 2.6
 - Linus-approved APIs
 - lots of little reorg and fixes for 2.4/2.6 compatibility
 - better code to manage MDC lock cancellation and dentry memory pressure

lustre/llite/iod.c

index c30ef8a..eeb307a 100644
--- a/lustre/llite/iod.c
+++ b/lustre/llite/iod.c
@@ -38,6 +38,7 @@
 #include <linux/rbtree.h>
 #include <linux/seq_file.h>
 #include <linux/time.h>
+#include <linux/wait.h>
 
 /* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
 #ifdef PG_inactive_clean
@@ -69,15 +70,7 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp,
 
         /* we raced with truncate? */
         if ( off >= inode->i_size ) {
-                int rc;
-                rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
-                                          ll_i2info(inode)->lli_smd,
-                                          page->index, page->index);
-
-                LASSERT(rc == 0);
-                CDEBUG(D_CACHE, "offset "LPU64" (index %lu) > i_size %llu\n",
-                       off, page->index, inode->i_size);
-                unlock_page(page);
+                ll_end_writeback(inode, page);
                 return 0;
         }
 
@@ -123,7 +116,7 @@ static void ll_get_dirty_pages(struct inode *inode,
         struct list_head *pos, *n;
         ENTRY;
 
-        PGCACHE_WRLOCK(mapping);
+        ll_pgcache_lock(mapping);
 
         list_for_each_prev_safe(pos, n, &mapping->dirty_pages) {
                 page = list_entry(pos, struct page, list);
@@ -144,7 +137,7 @@ static void ll_get_dirty_pages(struct inode *inode,
                         break;
         }
 
-        PGCACHE_WRUNLOCK(mapping);
+        ll_pgcache_unlock(mapping);
         EXIT;
 }
 
@@ -173,7 +166,7 @@ static void ll_writeback(struct inode *inode, struct obdo *oa,
                 if (rc == 0)
                         obdo_refresh_inode(inode, oa,
                                            oa->o_valid & ~OBD_MD_FLSIZE);
-                ptlrpc_set_destroy (set);
+                ptlrpc_set_destroy(set);
         }
         /*
          * b=1038, we need to pass _brw errors up so that writeback
@@ -192,34 +185,72 @@ static void ll_writeback(struct inode *inode, struct obdo *oa,
         for (i = 0 ; i < llwp->npgs ; i++) {
                 struct page *page = llwp->pga[i].pg;
 
-                CDEBUG(D_CACHE, "finished page %p at index %lu\n", page,
-                       page->index);
-                LASSERT(PageLocked(page));
-
-                rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
-                                          ll_i2info(inode)->lli_smd,
-                                          page->index, page->index);
-                LASSERT(rc == 0);
-                unlock_page(page);
-                page_cache_release(page);
+                ll_end_writeback(inode, page);
+                page_cache_release(page); /* to match llwp_consume_page */
         }
 
         EXIT;
 }
 
+static struct ll_writeback_pages *llwp_alloc(struct inode *inode)
+{
+        struct ll_writeback_pages *llwp;
+        int size, max = (inode->i_blksize >> PAGE_CACHE_SHIFT);
+
+        if (max == 0) {
+                CERROR("forcing llwp->max to 1.  blksize: %lu\n",
+                       inode->i_blksize);
+                max = 1;
+        }
+        size = sizeof(*llwp) + (max * sizeof(struct brw_page));
+
+        OBD_ALLOC(llwp, size);
+        if (llwp == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+        llwp->max = max;
+/* XXX don't worry, this will be gone before you know it.. */
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+        llwp->inode = inode;
+#endif
+
+        RETURN(llwp);
+}
+
+void llwp_free(struct ll_writeback_pages *llwp)
+{
+        int size = sizeof(*llwp) + (llwp->max * sizeof(struct brw_page));
+        OBD_FREE(llwp, size);
+}
+
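For scale, the sizing arithmetic above works out like this (illustrative
numbers only; the actual value depends on what llite stores in i_blksize):

    /* with PAGE_CACHE_SHIFT == 12 (4KB pages) and i_blksize == 1MB:
     *   max  = 1MB >> 12 = 256 brw_pages per llwp
     *   size = sizeof(*llwp) + 256 * sizeof(struct brw_page)
     * the max == 0 fixup covers inodes whose i_blksize is smaller than
     * a page */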
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 
-#ifndef PG_inactive_clean
+/* 2.4 doesn't give us a way to find out how many pages we have cached 'cause
+ * we're not using buffer_heads.  we are very conservative here and flush the
+ * superblock of all dirty data when the vm (rmap or stock) thinks that it is
+ * running low and kswapd would have done work.  kupdated isn't good enough
+ * because writers (dbench) can dirty _very quickly_, and we allocate under
+ * writepage..
+ */
+#ifdef PG_inactive_clean  /* 2.4 rmap */
+
+static int should_writeback(void)
+{
+        if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0)
+                return 1;
+        return 0;
+}
+
+# else  /* stock 2.4 -aa zone vm */
+
 #ifdef CONFIG_DISCONTIGMEM
-#error "sorry, we don't support DISCONTIGMEM yet"
+#error "sorry, we don't support DISCONTIGMEM"
 #endif
-
 /*
  * __alloc_pages marks a zone as needing balancing if an allocation is
  * performed when the zone has fewer free pages than its 'low' water
  * mark.  its cleared when try_to_free_pages makes progress.
  */
-static int zones_need_balancing(void)
+static int should_writeback(void) /* aka zones_need_balancing */
 {
         pg_data_t * pgdat;
         zone_t *zone;
@@ -235,43 +266,7 @@ static int zones_need_balancing(void)
         }
         return 0;
 }
-#endif
-/* 2.4 doesn't give us a way to find out how many pages we have
- * cached 'cause we're not using buffer_heads.  we are very
- * conservative here and flush the superblock of all dirty data
- * when the vm (rmap or stock) thinks that it is running low
- * and kswapd would have done work.  kupdated isn't good enough
- * because writers (dbench) can dirty _very quickly_, and we
- * allocate under writepage..
- *
- * 2.5 gets this right, see the {inc,dec}_page_state(nr_dirty, )
- */
-static int should_writeback(void)
-{
-#ifdef PG_inactive_clean
-        if (free_high(ALL_ZONES) > 0 || free_low(ANY_ZONE) > 0)
-#else
-        if (zones_need_balancing())
-#endif
-                return 1;
-        return 0;
-}
-
-static int ll_alloc_brw(struct inode *inode, struct ll_writeback_pages *llwp)
-{
-        memset(llwp, 0, sizeof(struct ll_writeback_pages));
-
-        llwp->max = inode->i_blksize >> PAGE_CACHE_SHIFT;
-        if (llwp->max == 0) {
-                CERROR("forcing llwp->max to 1.  blksize: %lu\n",
-                       inode->i_blksize);
-                llwp->max = 1;
-        }
-        llwp->pga = kmalloc(llwp->max * sizeof(*llwp->pga), GFP_ATOMIC);
-        if (llwp->pga == NULL)
-                RETURN(-ENOMEM);
-        RETURN(0);
-}
+#endif /* PG_inactive_clean detection of rmap vs stock */
 
 int ll_check_dirty(struct super_block *sb)
 {
@@ -289,13 +284,21 @@ int ll_check_dirty(struct super_block *sb)
         current->flags |= PF_MEMALLOC;
 
         spin_lock(&inode_lock);
-
         /*
-         * first we try and write back dirty pages from dirty inodes
-         * until the VM thinks we're ok again..
+         * we're trying to use a very awkward hammer to throttle lustre's
+         * dirty data here.  as long as the vm thinks we're "low" we're
+         * finding dirty inodes and writing out all their data.  the
+         * second while loop is waiting for other threads who are doing
+         * the same thing.. we ran into livelocks if one thread was able
+         * to blow through here not finding dirty inodes because another
+         * thread was busy writing them back..
+         *
+         * XXX this is all goofy because low memory can stop it from 
+         * working properly.  someday we'll be pre-allocating io context
+         * in prepare_write/commit_write.
          */
         do {
-                struct ll_writeback_pages llwp;
+                struct ll_writeback_pages *llwp;
                 struct list_head *pos;
                 inode = NULL;
                 making_progress = 0;
@@ -313,39 +316,47 @@ int ll_check_dirty(struct super_block *sb)
                 if (inode == NULL)
                         break;
 
-                /* duplicate __sync_one, *sigh* */
+                /* lock the inode while we work on it, which duplicates
+                 * __sync_one */
                 list_del(&inode->i_list);
                 list_add(&inode->i_list, &inode->i_sb->s_locked_inodes);
                 inode->i_state |= I_LOCK;
                 inode->i_state &= ~I_DIRTY_PAGES;
 
                 spin_unlock(&inode_lock);
+                llwp = llwp_alloc(inode);
+                spin_lock(&inode_lock);
 
-                rc = ll_alloc_brw(inode, &llwp);
-                if (rc != 0)
-                        GOTO(cleanup, rc);
+                if (IS_ERR(llwp)) /* making_progress == 0 will break the loop */
+                        goto unlock_inode;
+
+                spin_unlock(&inode_lock);
 
                 do {
-                        llwp.npgs = 0;
-                        ll_get_dirty_pages(inode, &llwp);
-                        if (llwp.npgs) {
+                        llwp->npgs = 0;
+                        ll_get_dirty_pages(inode, llwp);
+                        if (llwp->npgs) {
                                 oa.o_id =
                                       ll_i2info(inode)->lli_smd->lsm_object_id;
                                 oa.o_valid = OBD_MD_FLID;
                                 obdo_from_inode(&oa, inode,
-                                                OBD_MD_FLTYPE | OBD_MD_FLATIME|
-                                                OBD_MD_FLMTIME| OBD_MD_FLCTIME);
+                                                OBD_MD_FLTYPE|OBD_MD_FLATIME|
+                                                OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+
                                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                                     LPROC_LL_WB_PRESSURE,
-                                                    llwp.npgs);
-                                ll_writeback(inode, &oa, &llwp);
-                                rc += llwp.npgs;
+                                                    llwp->npgs);
+                                ll_writeback(inode, &oa, llwp);
+                                rc += llwp->npgs;
                                 making_progress = 1;
                         }
-                } while (llwp.npgs && should_writeback());
+                } while (llwp->npgs && should_writeback());
+
+                llwp_free(llwp);
 
                 spin_lock(&inode_lock);
 
+unlock_inode:
                 if (!list_empty(&inode->i_mapping->dirty_pages))
                         inode->i_state |= I_DIRTY_PAGES;
 
@@ -359,7 +370,6 @@ int ll_check_dirty(struct super_block *sb)
                         list_add(&inode->i_list, &inode->i_sb->s_dirty);
                 }
                 wake_up(&inode->i_wait);
-                kfree(llwp.pga);
         } while (making_progress && should_writeback());
 
         /*
@@ -382,38 +392,327 @@ int ll_check_dirty(struct super_block *sb)
 
         spin_unlock(&inode_lock);
 
-cleanup:
         current->flags = old_flags;
-
         RETURN(rc);
 }
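Condensed, the throttle described in the comment above has this shape
(an illustrative skeleton only; the second loop's body falls outside this
hunk):

    do {
            /* phase 1: while the vm looks low, find a dirty inode and
             * write all of its dirty pages back ourselves */
    } while (making_progress && should_writeback());

    /* phase 2 (elided in this hunk): wait for other threads that are
     * busy writing the same inodes back, so a thread that finds no
     * dirty inodes can't blow through and livelock */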
-#endif /* linux 2.5 */
 
+/* called from writepage and allows us to also try and write out other
+ * pages.  only called from 2.4 because 2.5 has ->writepages() */
 int ll_batch_writepage(struct inode *inode, struct obdo *oa, struct page *page)
 {
         unsigned long old_flags; /* hack? */
-        struct ll_writeback_pages llwp;
+        struct ll_writeback_pages *llwp;
         int rc = 0;
         ENTRY;
 
         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
         old_flags = current->flags;
         current->flags |= PF_MEMALLOC;
-        rc = ll_alloc_brw(inode, &llwp);
-        if (rc != 0)
-                GOTO(restore_flags, rc);
+        llwp = llwp_alloc(inode);
+        if (IS_ERR(llwp))
+                GOTO(restore_flags, rc = PTR_ERR(llwp));
 
-        if (llwp_consume_page(&llwp, inode, page) == 0)
-                ll_get_dirty_pages(inode, &llwp);
+        if (llwp_consume_page(llwp, inode, page) == 0)
+                ll_get_dirty_pages(inode, llwp);
 
-        if (llwp.npgs) {
+        if (llwp->npgs) {
                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_WB_WRITEPAGE, llwp.npgs);
-                ll_writeback(inode, oa, &llwp);
+                                    LPROC_LL_WB_WRITEPAGE, llwp->npgs);
+                ll_writeback(inode, oa, llwp);
         }
-        kfree(llwp.pga);
+        llwp_free(llwp);
 
 restore_flags:
         current->flags = old_flags;
         RETURN(rc);
 }
+#endif
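A hypothetical 2.4 ->writepage wrapper shows how ll_batch_writepage is
meant to be fed; llite's real entry point lives elsewhere and may fill
the obdo differently, so treat this as a sketch only:

    static int ll_writepage_sketch(struct page *page)
    {
            struct inode *inode = page->mapping->host;
            struct obdo oa;

            oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
            oa.o_valid = OBD_MD_FLID;
            obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
            /* the page arrives locked; llwp_consume_page takes its own
             * reference, and ll_batch_writepage may pull in more dirty
             * pages from the same inode */
            return ll_batch_writepage(inode, &oa, page);
    }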
+
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+/* we use a singly linked list of page->private to pass pages between
+ * readpage/writepage and our worker threads without allocating things
+ * and while maintaining fifo order.. */
+void plist_init(struct plist *plist)
+{
+        plist->pl_head = NULL;
+        plist->pl_tail = NULL;
+        plist->pl_num = 0;
+}
+struct page *plist_get_page(struct plist *plist)
+{
+        struct page *page = plist->pl_head;
+
+        if (page == NULL)
+                return NULL;
+
+        plist->pl_head = (struct page *)page->private;
+        if (page == plist->pl_tail)
+                plist->pl_tail = NULL;
+        plist->pl_num--;
+        page->private = 0;
+
+        return page;
+}
+void plist_move(struct plist *to, struct plist *from)
+{
+        if (to->pl_head == NULL) {
+                *to = *from;
+        } else {
+                to->pl_tail->private = (unsigned long)from->pl_head;
+                to->pl_tail = from->pl_tail;
+                to->pl_num += from->pl_num;
+        }
+        plist_init(from);
+}
+void plist_add_page(struct plist *plist, struct page *page)
+{
+        LASSERT(page->private == 0);
+        if (plist->pl_tail) {
+                plist->pl_tail->private = (unsigned long)page;
+                plist->pl_tail = page;
+        } else {
+                plist->pl_head = page;
+                plist->pl_tail = page;
+        }
+        plist->pl_num++;
+}
+
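A minimal sketch of the fifo behaviour these helpers provide; page_a and
page_b stand in for locked pages whose ->private is zero, as the
writepage paths are assumed to guarantee:

    struct plist pl;

    plist_init(&pl);
    plist_add_page(&pl, page_a);            /* head == tail == page_a */
    plist_add_page(&pl, page_b);            /* page_a->private chains to page_b */
    LASSERT(plist_get_page(&pl) == page_a); /* fifo: first in, first out */
    LASSERT(plist_get_page(&pl) == page_b); /* ->private cleared on removal */
    LASSERT(plist_get_page(&pl) == NULL);   /* empty list yields NULL */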
+void lliod_wakeup(struct inode *inode)
+{
+        struct lliod_ctl *lc = &ll_i2sbi(inode)->ll_lc;
+        lc->lc_new_arrival = 1; /* set before waking so the waiter sees it */
+        wake_up(&lc->lc_waitq);
+}
+
+/* lliod_wakeup can be skipped if the path knows that more lliod_give_*
+ * calls will be coming before the path waits on the pages.. it must be
+ * called before waiting so that new_arrival is set and lliod comes out
+ * of its wait */
+void lliod_give_plist(struct inode *inode, struct plist *plist, int rw)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lliod_ctl *lc = &ll_i2sbi(inode)->ll_lc;
+
+        CDEBUG(D_CACHE, "rw: %d plist %p num %d\n", rw, plist, 
+                        plist ? plist->pl_num : 0);
+
+        if (plist)
+                LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
+
+        spin_lock(&lc->lc_lock);
+        if (list_empty(&lli->lli_lc_item)) 
+                list_add_tail(&lli->lli_lc_item, &lc->lc_lli_list);
+
+        if (plist) {
+                if (rw == OBD_BRW_WRITE) 
+                        plist_move(&lli->lli_pl_write, plist);
+                else
+                        plist_move(&lli->lli_pl_read, plist);
+        }
+        spin_unlock(&lc->lc_lock);
+}
+
+void lliod_give_page(struct inode *inode, struct page *page, int rw)
+{
+        struct plist plist;
+
+        plist_init(&plist);
+        plist_add_page(&plist, page);
+        lliod_give_plist(inode, &plist, rw);
+}
+
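A hypothetical caller batching several pages for lliod; next_dirty_page()
is made up for illustration, but the give-then-wake ordering follows the
comment above lliod_give_plist:

    struct plist pl;
    struct page *page;

    plist_init(&pl);
    while ((page = next_dirty_page(inode)) != NULL)
            plist_add_page(&pl, page);
    lliod_give_plist(inode, &pl, OBD_BRW_WRITE);
    lliod_wakeup(inode);    /* must run before we wait on the pages */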
+/* XXX should do something smart with the 'rc' depending on the failover
+ * configuration */
+void lliod_complete_llwp(struct inode *inode, struct ll_writeback_pages *llwp,
+                         int rc)
+{
+        struct page *page;
+        int i;
+
+        CDEBUG(D_CACHE, "inode: %p rw: %d rc: %d\n", inode, llwp->rw, rc);
+
+        for (i = 0 ; i < llwp->npgs ; i++) {
+                page = llwp->pga[i].pg;
+
+                CDEBUG(D_CACHE, "page: %p index: %lu\n", page, page->index);
+                if (llwp->rw == OBD_BRW_WRITE)
+                        ll_end_writeback(inode, page);
+                else {
+                        SetPageUptodate(page);
+                        unlock_page(page);
+                }
+
+                page_cache_release(page); /* to match llwp_consume_page */
+        }
+}
+
+/* ok, the clump thing wasn't so hot, let's just do brws as writepage hands
+ * us pages.  to avoid inter-inode or read/write starvation we take the
+ * pages off the lli and then consume them all, first reads then writes */
+int lliod_brw(struct lliod_ctl *lc)
+{
+        struct inode *inode = NULL;
+        struct ll_inode_info *lli = NULL;
+        struct ll_writeback_pages *llwp;
+        struct ptlrpc_request_set *set = NULL;
+        struct page *page;
+        struct plist plist_read, plist_write, *plist;
+        int rc = 0, rw, tmp;
+        ENTRY;
+
+        plist_init(&plist_read);
+        plist_init(&plist_write);
+
+        spin_lock(&lc->lc_lock);
+        if (list_empty(&lc->lc_lli_list)) {
+                spin_unlock(&lc->lc_lock);
+                RETURN(0);
+        }
+
+        lli = list_entry(lc->lc_lli_list.next, struct ll_inode_info, 
+                         lli_lc_item);
+        inode = ll_info2i(lli);
+        list_del_init(&lli->lli_lc_item);
+
+        plist_move(&plist_read, &lli->lli_pl_read);
+        plist_move(&plist_write, &lli->lli_pl_write);
+
+        spin_unlock(&lc->lc_lock);
+
+        llwp = llwp_alloc(inode);
+        if (IS_ERR(llwp)) {
+                rc = -ENOMEM;
+                goto out;
+        }
+
+        if (plist_read.pl_num) {
+                plist = &plist_read;
+                rw = OBD_BRW_READ;
+        } else {
+                plist = &plist_write;
+                rw = OBD_BRW_WRITE;
+        }
+
+        CDEBUG(D_CACHE, "inode %p #r: %d #w: %d\n", inode, plist_read.pl_num,
+               plist_write.pl_num);
+
+        while (plist->pl_num > 0) {
+                struct obdo oa;
+
+                set = ptlrpc_prep_set();
+                if (set == NULL) {
+                        rc = -ENOMEM;
+                        break;
+                }
+
+                llwp->npgs = 0;
+                llwp->rw = rw;
+                llwp->inode = inode;
+                while ((page = plist_get_page(plist))) {
+                        tmp = llwp_consume_page(llwp, inode, page);
+                        page_cache_release(page); /* from writepage */
+                        if (tmp)
+                                break;
+                }
+                oa.o_id = lli->lli_smd->lsm_object_id;
+                oa.o_valid = OBD_MD_FLID;
+                obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+                tmp = obd_brw_async(rw, ll_i2obdconn(inode), &oa,
+                                   ll_i2info(inode)->lli_smd, 
+                                   llwp->npgs, llwp->pga, set, NULL);
+                if (tmp == 0)
+                        tmp = ptlrpc_set_wait(set);
+
+                ptlrpc_set_destroy(set);
+                lliod_complete_llwp(inode, llwp, tmp);
+
+                if (plist->pl_num == 0 && rw == OBD_BRW_READ) {
+                        plist = &plist_write;
+                        rw = OBD_BRW_WRITE;
+                }
+        }
+
+        llwp_free(llwp);
+out:
+        if (rc) {
+                lliod_give_plist(inode, &plist_read, OBD_BRW_READ);
+                lliod_give_plist(inode, &plist_write, OBD_BRW_WRITE);
+        }
+
+        RETURN(rc);
+}
+
+static int lliod(void *arg)
+{
+        struct lliod_ctl *lc = arg;
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        unsigned long flags;    /* for the sigmask_lock path below */
+#endif
+        ENTRY;
+
+        kportal_daemonize("liod_writeback");
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
+        sigfillset(&current->blocked);
+        recalc_sigpending();
+#else
+        spin_lock_irqsave(&current->sigmask_lock, flags);
+        sigfillset(&current->blocked);
+        recalc_sigpending(current);
+        spin_unlock_irqrestore(&current->sigmask_lock, flags);
+#endif
+
+        complete(&lc->lc_starting);
+
+        /* like kswapd */
+        current->flags |= PF_MEMALLOC;
+
+        while (1) {
+
+                /* XXX re-using the clu waitq for now; it's harmless..
+                 * we'll update the path depending on clu's fate */
+                wait_event_interruptible(lc->lc_waitq,
+                                ( test_bit(LIOD_STOP, &lc->lc_flags) ||
+                                  (!list_empty(&lc->lc_lli_list)) ) );
+
+                if (test_bit(LIOD_STOP, &lc->lc_flags))
+                        break;
+
+                /* sleep for a short amount of time if we get -ENOMEM, 
+                 * maybe giving the world a chance to free some memory
+                 * for us */
+                if (lliod_brw(lc)) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+                        schedule_timeout(HZ/100);
+                }
+
+        }
+        /* XXX should be making sure we don't have inodes/
+         * pages still in flight */
+        complete(&lc->lc_finishing);
+        return 0;
+}
+
+int lliod_start(struct ll_sb_info *sbi, struct inode *inode)
+{
+        struct lliod_ctl *lc = &sbi->ll_lc;
+        ENTRY;
+
+        init_completion(&lc->lc_starting);
+        init_completion(&lc->lc_finishing);
+        INIT_LIST_HEAD(&lc->lc_lli_list);
+        init_waitqueue_head(&lc->lc_waitq);
+        lc->lc_flags = 0;
+        lc->lc_new_arrival = 0;
+        spin_lock_init(&lc->lc_lock);
+
+        if (kernel_thread(lliod, &sbi->ll_lc, 0) < 0) 
+                RETURN(-ECHILD);
+
+        wait_for_completion(&lc->lc_starting);
+        RETURN(0);
+}
+
+void lliod_stop(struct ll_sb_info *sbi)
+{
+        struct lliod_ctl *lc = &sbi->ll_lc;
+
+        set_bit(LIOD_STOP, &lc->lc_flags);
+        wake_up(&lc->lc_waitq);
+        wait_for_completion(&lc->lc_finishing);
+}
+#endif /* 2.5 check.. */
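A hypothetical lifecycle sketch; presumably the mount path starts one
lliod per superblock and unmount tears it down, with the completions
making both calls synchronous:

    static int ll_iod_lifecycle_sketch(struct ll_sb_info *sbi,
                                       struct inode *inode)
    {
            int rc = lliod_start(sbi, inode); /* waits on lc_starting */
            if (rc)
                    return rc;                /* -ECHILD: thread never ran */
            /* ... readpage/writepage feed pages via lliod_give_* ... */
            lliod_stop(sbi); /* sets LIOD_STOP, waits on lc_finishing */
            return 0;
    }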