Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / llite / rw.c
index 8e9aaf2..df7ab9b 100644 (file)
@@ -65,22 +65,21 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
         ENTRY;
 
         pg.pg = page;
-        pg.disk_offset = pg.page_offset = ((obd_off)page->index) << PAGE_SHIFT;
+        pg.off = ((obd_off)page->index) << PAGE_SHIFT;
 
-        if (cmd == OBD_BRW_WRITE &&
-            (pg.disk_offset + PAGE_SIZE > inode->i_size))
+        if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
                 pg.count = inode->i_size % PAGE_SIZE;
         else
                 pg.count = PAGE_SIZE;
 
         CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
                cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
-               pg.disk_offset, pg.disk_offset);
+               pg.off, pg.off);
         if (pg.count == 0) {
                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
-                       LPU64"\n", inode->i_ino, inode, inode->i_size,
-                       page->mapping->host, page->mapping->host->i_size,
-                       page->index, pg.disk_offset);
+                       LPU64"\n",
+                       inode->i_ino, inode, inode->i_size, page->mapping->host,
+                       page->mapping->host->i_size, page->index, pg.off);
         }
 
         pg.flag = flags;
@@ -119,7 +118,8 @@ void ll_truncate(struct inode *inode)
         }
 
         oa.o_id = lsm->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
+        oa.o_gr = lsm->lsm_object_gr;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE|OBD_MD_FLMODE|OBD_MD_FLATIME|
                                     OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
@@ -155,18 +155,20 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         int rc = 0;
         ENTRY;
 
-        LASSERT(PageLocked(page));
-        (void)llap_cast_private(page); /* assertion */
+        if (!PageLocked(page))
+                LBUG();
 
         /* Check to see if we should return -EIO right away */
         pga.pg = page;
-        pga.disk_offset = pga.page_offset = offset;
+        pga.off = offset;
         pga.count = PAGE_SIZE;
         pga.flag = 0;
 
         oa.o_id = lsm->lsm_object_id;
+        oa.o_gr = lsm->lsm_object_gr;
         oa.o_mode = inode->i_mode;
-        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE
+                                | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
 
         rc = obd_brw(OBD_BRW_CHECK, ll_i2obdexp(inode), &oa, lsm, 1, &pga,
                      NULL);
@@ -209,6 +211,21 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         return rc;
 }
 
+int ll_write_count(struct page *page)
+{
+        struct inode *inode = page->mapping->host;
+
+        /* catch race with truncate */
+        if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
+                return 0;
+
+        /* catch sub-page write at end of file */
+        if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
+                return inode->i_size % PAGE_SIZE;
+
+        return PAGE_SIZE;
+}
+
 struct ll_async_page *llap_from_cookie(void *cookie)
 {
         struct ll_async_page *llap = cookie;
@@ -229,7 +246,8 @@ static int ll_ap_make_ready(void *data, int cmd)
 
         page = llap->llap_page;
 
-        LASSERT(cmd != OBD_BRW_READ);
+        if (cmd == OBD_BRW_READ)
+                RETURN(0);
 
         /* we're trying to write, but the page is locked.. come back later */
         if (TryLockPage(page))
@@ -249,26 +267,9 @@ static int ll_ap_make_ready(void *data, int cmd)
         RETURN(0);
 }
 
-/* We have two reasons for giving llite the opportunity to change the 
- * write length of a given queued page as it builds the RPC containing
- * the page: 
- *
- * 1) Further extending writes may have landed in the page cache
- *    since a partial write first queued this page requiring us
- *    to write more from the page cache.
- * 2) We might have raced with truncate and want to avoid performing
- *    write RPCs that are just going to be thrown away by the 
- *    truncate's punch on the storage targets.
- *
- * The kms serves these purposes as it is set at both truncate and extending
- * writes.
- */
 static int ll_ap_refresh_count(void *data, int cmd)
 {
         struct ll_async_page *llap;
-        struct lov_stripe_md *lsm;
-        struct page *page;
-        __u64 kms;
         ENTRY;
 
         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
@@ -278,19 +279,7 @@ static int ll_ap_refresh_count(void *data, int cmd)
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
-        page = llap->llap_page;
-        lsm = ll_i2info(page->mapping->host)->lli_smd;
-        kms = lov_merge_size(lsm, 1);
-
-        /* catch race with truncate */
-        if (((__u64)page->index << PAGE_SHIFT) >= kms)
-                return 0;
-
-        /* catch sub-page write at end of file */
-        if (((__u64)page->index << PAGE_SHIFT) + PAGE_SIZE > kms)
-                return kms % PAGE_SIZE;
-
-        return PAGE_SIZE;
+        return ll_write_count(llap->llap_page);
 }
 
 void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
@@ -301,11 +290,13 @@ void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
         lsm = ll_i2info(inode)->lli_smd;
 
         oa->o_id = lsm->lsm_object_id;
-        oa->o_valid = OBD_MD_FLID;
+        oa->o_gr = lsm->lsm_object_gr;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
         if (cmd == OBD_BRW_WRITE) {
                 oa->o_valid |= OBD_MD_FLIFID | OBD_MD_FLEPOCH;
                 mdc_pack_fid(obdo_fid(oa), inode->i_ino, 0, inode->i_mode);
+                obdo_fid(oa)->mds = ll_i2info(inode)->lli_mds;
                 oa->o_easize = ll_i2info(inode)->lli_io_epoch;
 
                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
@@ -336,17 +327,6 @@ static struct obd_async_page_ops ll_async_page_ops = {
         .ap_completion =        ll_ap_completion,
 };
 
-struct ll_async_page *llap_cast_private(struct page *page)
-{
-        struct ll_async_page *llap = (struct ll_async_page *)page->private;
-
-        LASSERTF(llap == NULL || llap->llap_magic == LLAP_MAGIC,
-                 "page %p private %lu gave magic %d which != %d\n",
-                 page, page->private, llap->llap_magic, LLAP_MAGIC);
-
-        return llap;
-}
-
 /* XXX have the exp be an argument? */
 struct ll_async_page *llap_from_page(struct page *page)
 {
@@ -357,9 +337,12 @@ struct ll_async_page *llap_from_page(struct page *page)
         int rc;
         ENTRY;
 
-        llap = llap_cast_private(page);
-        if (llap != NULL)
+        llap = (struct ll_async_page *)page->private;
+        if (llap != NULL) {
+                if (llap->llap_magic != LLAP_MAGIC)
+                        RETURN(ERR_PTR(-EINVAL));
                 RETURN(llap);
+        }
 
         exp = ll_i2obdexp(page->mapping->host);
         if (exp == NULL)
@@ -383,10 +366,10 @@ struct ll_async_page *llap_from_page(struct page *page)
         page->private = (unsigned long)llap;
         llap->llap_page = page;
 
-        spin_lock(&sbi->ll_lock);
+        spin_lock(&sbi->ll_pglist_lock);
         sbi->ll_pglist_gen++;
         list_add_tail(&llap->llap_proc_item, &sbi->ll_pglist);
-        spin_unlock(&sbi->ll_lock);
+        spin_unlock(&sbi->ll_pglist_lock);
 
         RETURN(llap);
 }
@@ -483,75 +466,6 @@ out:
         RETURN(rc);
 }
 
-static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
-{
-        unsigned long ret;
-        ENTRY;
-
-        spin_lock(&sbi->ll_lock);
-        ret = min(sbi->ll_max_read_ahead_pages - sbi->ll_read_ahead_pages,
-                  len);
-        sbi->ll_read_ahead_pages += ret;
-        spin_unlock(&sbi->ll_lock);
-
-        RETURN(ret);
-}
-
-static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
-{
-        spin_lock(&sbi->ll_lock);
-        LASSERTF(sbi->ll_read_ahead_pages >= len, "r_a_p %lu len %lu\n",
-                 sbi->ll_read_ahead_pages, len);
-        sbi->ll_read_ahead_pages -= len;
-        spin_unlock(&sbi->ll_lock);
-}
-
-/* called for each page in a completed rpc.*/
-void ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
-{
-        struct ll_async_page *llap;
-        struct page *page;
-        ENTRY;
-
-        llap = llap_from_cookie(data);
-        if (IS_ERR(llap)) {
-                EXIT;
-                return;
-        }
-
-        page = llap->llap_page;
-        LASSERT(PageLocked(page));
-
-        LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
-
-        if (cmd == OBD_BRW_READ && llap->llap_defer_uptodate)
-                ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);
-
-        if (rc == 0)  {
-                if (cmd == OBD_BRW_READ) {
-                        if (!llap->llap_defer_uptodate)
-                                SetPageUptodate(page);
-                } else {
-                        llap->llap_write_queued = 0;
-                }
-                ClearPageError(page);
-        } else {
-                if (cmd == OBD_BRW_READ)
-                        llap->llap_defer_uptodate = 0;
-                SetPageError(page);
-        }
-
-        unlock_page(page);
-
-        if (0 && cmd == OBD_BRW_WRITE) {
-                llap_write_complete(page->mapping->host, llap);
-                ll_try_done_writing(page->mapping->host);
-        }
-
-        page_cache_release(page);
-        EXIT;
-}
-
 /* the kernel calls us here when a page is unhashed from the page cache.
  * the page will be locked and the kernel is holding a spinlock, so
  * we need to be careful.  we're just tearing down our book-keeping
@@ -601,11 +515,11 @@ void ll_removepage(struct page *page)
          * is providing exclusivity to memory pressure/truncate/writeback..*/
         page->private = 0;
 
-        spin_lock(&sbi->ll_lock);
+        spin_lock(&sbi->ll_pglist_lock);
         if (!list_empty(&llap->llap_proc_item))
                 list_del_init(&llap->llap_proc_item);
         sbi->ll_pglist_gen++;
-        spin_unlock(&sbi->ll_lock);
+        spin_unlock(&sbi->ll_pglist_lock);
         OBD_FREE(llap, sizeof(*llap));
         EXIT;
 }
@@ -618,7 +532,7 @@ static int ll_page_matches(struct page *page, int fd_flags)
         int flags, matches;
         ENTRY;
 
-        if (fd_flags & LL_FILE_GROUP_LOCKED)
+        if (fd_flags & LL_FILE_CW_LOCKED)
                 RETURN(1);
 
         page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
@@ -643,8 +557,7 @@ static int ll_issue_page_read(struct obd_export *exp,
         llap->llap_defer_uptodate = defer;
         rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
                                 NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
-                                PAGE_SIZE, 0, ASYNC_COUNT_STABLE | ASYNC_READY
-                                              | ASYNC_URGENT);
+                                PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
         if (rc) {
                 LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
                 page_cache_release(page);
@@ -652,155 +565,171 @@ static int ll_issue_page_read(struct obd_export *exp,
         RETURN(rc);
 }
 
-#define RAS_CDEBUG(ras) \
-        CDEBUG(D_READA, "lrp %lu c %lu ws %lu wl %lu nra %lu\n",        \
-               ras->ras_last_readpage, ras->ras_consecutive,            \
-               ras->ras_window_start, ras->ras_window_len,              \
-               ras->ras_next_readahead);
+#define LL_RA_MIN(inode) ((unsigned long)PTLRPC_MAX_BRW_PAGES / 2)
+#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \
+                          PAGE_CACHE_SHIFT)
 
-static int ll_readahead(struct ll_readahead_state *ras,
+static void ll_readahead(struct ll_readahead_state *ras,
                          struct obd_export *exp, struct address_space *mapping,
                          struct obd_io_group *oig, int flags)
 {
-        unsigned long i, start = 0, end = 0, reserved;
+        unsigned long i, start, end;
         struct ll_async_page *llap;
         struct page *page;
-        int rc, ret = 0;
-        __u64 kms;
-        ENTRY;
+        int rc;
 
-        kms = lov_merge_size(ll_i2info(mapping->host)->lli_smd, 1);
-        if (kms == 0)
-                RETURN(0);
+        if (mapping->host->i_size == 0)
+                return;
 
         spin_lock(&ras->ras_lock);
 
-        if (ras->ras_window_len) {
-                start = ras->ras_next_readahead;
-                end = ras->ras_window_start + ras->ras_window_len - 1;
-                end = min(end, (unsigned long)(kms >> PAGE_CACHE_SHIFT));
-                ras->ras_next_readahead = max(end, end + 1);
+        /* make sure to issue a window's worth of read-ahead pages */
+        end = ras->ras_last;
+        start = end - ras->ras_window;
+        if (start > end)
+                start = 0;
 
-                RAS_CDEBUG(ras);
-        }
+        /* but don't iterate over pages that we've already issued.  this
+         * will set start to end + 1 if we've already read-ahead up to
+         * ras_last sothe for() won't be entered */
+        if (ras->ras_next_index > start)
+                start = ras->ras_next_index;
+        if (end != ~0UL)
+                ras->ras_next_index = end + 1;
 
-        spin_unlock(&ras->ras_lock);
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               start, end);
 
-        if (end == 0)
-                RETURN(0);
+        spin_unlock(&ras->ras_lock);
 
-        reserved = ll_ra_count_get(ll_i2sbi(mapping->host), end - start + 1);
+        /* clamp to filesize */
+        i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+        end = min(end, i);
 
-        for (i = start; reserved > 0 && i <= end; i++) {
-                /* skip locked pages from previous readpage calls */
+        for (i = start; i <= end; i++) {
+                /* grab_cache_page_nowait returns null if this races with
+                 * truncating the page (page->mapping == NULL) */
                 page = grab_cache_page_nowait(mapping, i);
-                if (page == NULL) {
-                        CDEBUG(D_READA, "g_c_p_n failed\n");
-                        continue;
-                }
-                
-                /* we do this first so that we can see the page in the /proc
-                 * accounting */
-                llap = llap_from_page(page);
-                if (IS_ERR(llap) || llap->llap_defer_uptodate)
-                        goto next_page;
+                if (page == NULL)
+                       break;
 
-                /* skip completed pages */
+                /* the book-keeping above promises that we've tried
+                 * all the indices from start to end, so we don't
+                 * stop if anyone returns an error. This may not be good. */
                 if (Page_Uptodate(page))
                         goto next_page;
 
-                /* bail when we hit the end of the lock. */
                 if ((rc = ll_page_matches(page, flags)) <= 0) {
                         LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
                                        "lock match failed: rc %d\n", rc);
-                        i = end;
                         goto next_page;
                 }
 
+                llap = llap_from_page(page);
+                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                        goto next_page;
+
                 rc = ll_issue_page_read(exp, llap, oig, 1);
-                if (rc == 0) {
-                        reserved--;
-                        ret++;
-                        LL_CDEBUG_PAGE(D_READA| D_PAGE, page, 
-                                       "started read-ahead\n");
-                }
+                if (rc == 0)
+                        LL_CDEBUG_PAGE(D_PAGE, page, "started read-ahead\n");
                 if (rc) {
         next_page:
-                        LL_CDEBUG_PAGE(D_READA | D_PAGE, page, 
-                                       "skipping read-ahead\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "skipping read-ahead\n");
 
                         unlock_page(page);
                 }
                 page_cache_release(page);
         }
-
-        LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
-        if (reserved != 0)
-                ll_ra_count_put(ll_i2sbi(mapping->host), reserved);
-        RETURN(ret);
-}
-
-static void ras_set_start(struct ll_readahead_state *ras,
-                               unsigned long index)
-{
-        ras->ras_window_start = index & (~(PTLRPC_MAX_BRW_PAGES - 1));
-        ras->ras_next_readahead = max(ras->ras_window_start,
-                                      ras->ras_next_readahead);
 }
 
 /* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras,
-                      unsigned long index)
+static void ll_readahead_set(struct inode *inode,
+                             struct ll_readahead_state *ras,
+                             unsigned long index)
 {
-        ras->ras_last_readpage = index;
-        ras->ras_consecutive = 1;
-        ras->ras_window_len = 0;
-        ras_set_start(ras, index);
-        ras->ras_next_readahead = ras->ras_window_start;
-
-        RAS_CDEBUG(ras);
+        ras->ras_next_index = index;
+        if (ras->ras_next_index != ~0UL)
+                ras->ras_next_index++;
+        ras->ras_window = LL_RA_MIN(inode);
+        ras->ras_last = ras->ras_next_index + ras->ras_window;
+        if (ras->ras_last < ras->ras_next_index)
+                ras->ras_last = ~0UL;
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               index);
 }
 
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
 {
         spin_lock_init(&ras->ras_lock);
-        ras_reset(ras, 0);
+        ll_readahead_set(inode, ras, 0);
 }
 
-static void ras_update(struct ll_readahead_state *ras,
-                       unsigned long index, unsigned long max)
+static void ll_readahead_update(struct inode *inode,
+                                struct ll_readahead_state *ras,
+                                unsigned long index, int hit)
 {
-        ENTRY;
+        unsigned long issued_start, new_last;
 
         spin_lock(&ras->ras_lock);
 
-        if (index != ras->ras_last_readpage + 1) {
-                ras_reset(ras, index);
-                GOTO(out_unlock, 0);
-        }
-
-        ras->ras_last_readpage = index;
-        ras->ras_consecutive++;
-        ras_set_start(ras, index);
-
-        if (ras->ras_consecutive == 2) {
-                ras->ras_window_len = PTLRPC_MAX_BRW_PAGES;
-                GOTO(out_unlock, 0);
+        /* we're interested in noticing the index's relation to the
+         * previously issued read-ahead pages */
+        issued_start = ras->ras_next_index - ras->ras_window - 1;
+        if (issued_start > ras->ras_next_index)
+                issued_start = 0;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               hit ? "hit" : "miss", index, issued_start);
+        if (!hit &&
+            index == ras->ras_next_index && index == ras->ras_last + 1) {
+                /* special case the kernel's read-ahead running into the
+                 * page just beyond our read-ahead window as an extension
+                 * of our read-ahead.  sigh.  wishing it was easier to
+                 * turn off 2.4's read-ahead. */
+                ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 1);
+                if (index != ~0UL)
+                        ras->ras_next_index = index + 1;
+                ras->ras_last = index;
+        } else if (!hit &&
+                   (index > issued_start || ras->ras_next_index >= index)) {
+                /* deal with a miss way out of the window.  we interpret
+                 * this as a seek and restart the window */
+                ll_readahead_set(inode, ras, index);
+
+        } else if (!hit &&
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a miss inside the window?  surely its memory pressure
+                 * evicting our read pages before the app can see them.
+                 * we shrink the window aggressively */
+                unsigned long old_window = ras->ras_window;
+
+                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN(inode));
+                ras->ras_last -= old_window - ras->ras_window;
+                if (ras->ras_next_index > ras->ras_last)
+                        ras->ras_next_index = ras->ras_last + 1;
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+        } else if (hit &&
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a hit inside the window.  grow the window by twice the
+                 * number of pages that are satisified within the window.  */
+                ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 2);
+
+                /* we want the next readahead pass to issue a windows worth
+                 * beyond where the app currently is */
+                new_last = index + ras->ras_window;
+                if (new_last > ras->ras_last)
+                        ras->ras_last = new_last;
+
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
         }
 
-        /* we need to increase the window sometimes.  we'll arbitrarily
-         * do it half-way through the pages in an rpc */
-        if ((index & (PTLRPC_MAX_BRW_PAGES - 1)) == 
-            (PTLRPC_MAX_BRW_PAGES >> 1)) {
-                ras->ras_window_len += PTLRPC_MAX_BRW_PAGES;
-                ras->ras_window_len = min(ras->ras_window_len, max);
-        }
-
-        EXIT;
-out_unlock:
-        RAS_CDEBUG(ras);
         spin_unlock(&ras->ras_lock);
-        return;
 }
 
 /*
@@ -841,22 +770,19 @@ int ll_readpage(struct file *filp, struct page *page)
         if (IS_ERR(llap))
                 GOTO(out, rc = PTR_ERR(llap));
 
-        if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
-                ras_update(&fd->fd_ras, page->index, 
-                           ll_i2sbi(inode)->ll_max_read_ahead_pages);
-
         if (llap->llap_defer_uptodate) {
-                rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
-                                  fd->fd_flags);
-                if (rc > 0)
-                        obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, 
-                                             NULL, oig);
+                ll_readahead_update(inode, &fd->fd_ras, page->index, 1);
+                ll_readahead(&fd->fd_ras, exp, page->mapping, oig,fd->fd_flags);
+                obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
+                                     oig);
                 LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
                 SetPageUptodate(page);
                 unlock_page(page);
                 GOTO(out_oig, rc = 0);
         }
 
+        ll_readahead_update(inode, &fd->fd_ras, page->index, 0);
+
         rc = ll_page_matches(page, fd->fd_flags);
         if (rc < 0) {
                 LL_CDEBUG_PAGE(D_ERROR, page, "lock match failed: rc %d\n", rc);
@@ -869,10 +795,15 @@ int ll_readpage(struct file *filp, struct page *page)
                        inode->i_ino, page->index,
                        (long long)page->index << PAGE_CACHE_SHIFT);
                 if (time_after(jiffies, next_print)) {
-                        CWARN("ino %lu page %lu (%llu) not covered by "
+                        CERROR("ino %lu page %lu (%llu) not covered by "
                                "a lock (mmap?).  check debug logs.\n",
                                inode->i_ino, page->index,
                                (long long)page->index << PAGE_CACHE_SHIFT);
+                        ldlm_dump_all_namespaces();
+                        if (next_print == 0) {
+                                CERROR("%s\n", portals_debug_dumpstack());
+                                portals_debug_dumplog();
+                        }
                         next_print = jiffies + 30 * HZ;
                 }
         }
@@ -882,9 +813,8 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out, rc);
 
         LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
-        if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
-                ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
-                             fd->fd_flags);
+        if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+                ll_readahead(&fd->fd_ras, exp, page->mapping, oig,fd->fd_flags);
 
         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
 
@@ -896,3 +826,38 @@ out_oig:
                 oig_release(oig);
         RETURN(rc);
 }
+
+#if 0
+/* this is for read pages.  we issue them as ready but not urgent.  when
+ * someone waits on them we fire them off, hopefully merged with adjacent
+ * reads that were queued by read-ahead.  */
+int ll_sync_page(struct page *page)
+{
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int rc;
+        ENTRY;
+
+        /* we're using a low bit flag to signify that a queued read should
+         * be issued once someone goes to lock it.  it is also cleared
+         * as the page is built into an RPC */
+        if (!test_and_clear_bit(LL_PRIVBITS_READ, &page->private))
+                RETURN(0);
+
+        /* careful to only deref page->mapping after checking the bit */
+        exp = ll_i2obdexp(page->mapping->host);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                RETURN(PTR_ERR(llap));
+
+        LL_CDEBUG_PAGE(D_PAGE, page, "setting ready|urgent\n");
+
+        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+                                 NULL, llap->llap_cookie,
+                                 ASYNC_READY|ASYNC_URGENT);
+        return rc;
+}
+#endif