b=2252
author     zab <zab>  Thu, 18 Dec 2003 04:13:42 +0000 (04:13 +0000)
committer  zab <zab>  Thu, 18 Dec 2003 04:13:42 +0000 (04:13 +0000)
r=adilger
(didn't see regressions in buffalo, confirmed read throughput increases
with sf and fpp multi-node IOR)

This cleans up llite's readpage path and implements our own read-ahead window
that hangs off of ll_file_data.  The broad goal is to keep a fair number of
read-ahead pages issued and queued, which can be fired off in read rpcs as
earlier read-ahead rpcs complete.
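
To illustrate the window bookkeeping this adds (see ll_readahead() and
ll_readahead_update() in the lustre/llite/rw.c diff below), here is a minimal
standalone sketch in C.  The struct and the clamping mirror struct
ll_readahead_state; the constants and the main() trace are hypothetical, not
part of the patch.

#include <stdio.h>

/* simplified model of the per-file-descriptor read-ahead state that
 * this patch hangs off of ll_file_data */
struct ras {
        unsigned long last;        /* highest index the window extends to */
        unsigned long window;      /* window size, in pages */
        unsigned long next_index;  /* first index not yet issued */
};

/* compute the [start, end] page range one read-ahead pass would issue,
 * mirroring the clamps in ll_readahead(): underflow pins start to 0 and
 * already-issued indices (below next_index) are not revisited */
static void ra_pass(struct ras *r, unsigned long size_pages,
                    unsigned long *startp, unsigned long *endp)
{
        unsigned long end = r->last, start = end - r->window;

        if (start > end)                /* wrapped below zero */
                start = 0;
        if (r->next_index > start)      /* don't re-issue pages */
                start = r->next_index;
        if (end != ~0UL)
                r->next_index = end + 1;
        if (end > size_pages - 1)       /* clamp to a non-empty file */
                end = size_pages - 1;
        *startp = start;
        *endp = end;
}

int main(void)
{
        /* hypothetical numbers: an 8 page window ending at page 16 */
        struct ras r = { .last = 16, .window = 8, .next_index = 0 };
        unsigned long s, e;

        ra_pass(&r, 1024, &s, &e);
        printf("pass 1 issues %lu..%lu\n", s, e);       /* 8..16 */

        /* the app then hits page 17: ll_readahead_update() grows the
         * window by 2 and slides last out to index + window */
        r.window += 2;
        r.last = 17 + r.window;
        ra_pass(&r, 1024, &s, &e);
        printf("pass 2 issues %lu..%lu\n", s, e);       /* 17..27 */
        return 0;
}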

lnet/include/linux/kp30.h
lnet/utils/debug.c
lustre/include/linux/lustre_compat25.h
lustre/include/linux/lustre_lite.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/rw.c
lustre/llite/rw24.c
lustre/osc/osc_request.c
lustre/portals/include/linux/kp30.h
lustre/portals/utils/debug.c

diff --git a/lnet/include/linux/kp30.h b/lnet/include/linux/kp30.h
index 2cd7b06..b0889a2 100644
@@ -71,6 +71,7 @@ extern unsigned int portal_cerror;
 #define D_HA        (1 << 19) /* recovery and failover */
 #define D_RPCTRACE  (1 << 20) /* for distributed debugging */
 #define D_VFSTRACE  (1 << 21)
+#define D_READA     (1 << 22) /* read-ahead */
 
 #ifdef __KERNEL__
 # include <linux/sched.h> /* THREAD_SIZE */
diff --git a/lnet/utils/debug.c b/lnet/utils/debug.c
index 3f3e69c..14750d8 100644
@@ -69,7 +69,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         NULL};
+         "reada", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;
diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h
index fdd1abf..62bde7c 100644
@@ -145,6 +145,13 @@ static inline void lustre_daemonize_helper(void)
 #define conditional_schedule() if (unlikely(need_resched())) schedule()
 #endif
 
+/* 2.6 has the lovely PagePrivate bit for indicating that a filesystem
+ * has hung state off of page->private.  We use it. */
+#define PG_private 9 /* unused in 2.4, apparently. */
+#define SetPagePrivate(page)    set_bit(PG_private, &(page)->flags)
+#define ClearPagePrivate(page)  clear_bit(PG_private, &(page)->flags)
+#define PagePrivate(page)       test_bit(PG_private, &(page)->flags)
+
 #endif /* end of 2.4 compat macros */
 
 #endif /* __KERNEL__ */
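
As a small, hypothetical usage sketch of the compat macros above (the helpers
here are illustrative; the real consumers are ll_issue_page_read(),
ll_sync_page() and ll_ap_make_ready() in the lustre/llite/rw.c diff below),
the same source now works on 2.4, where the macros poke bit 9 of page->flags
directly, and on 2.6, where the kernel's native definitions apply:

/* illustrative only: flag a page as having a queued-but-unsent read rpc,
 * and consume that flag later, the way the rw.c code below uses the
 * PagePrivate bit */
static void mark_read_queued(struct page *page)
{
        SetPagePrivate(page);
}

static int consume_read_queued(struct page *page)
{
        if (!PagePrivate(page))
                return 0;       /* nothing queued, or already consumed */
        ClearPagePrivate(page);
        return 1;
}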
diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h
index 6c02f81..c496b42 100644
 /* careful, this is easy to screw up */
 #define PAGE_CACHE_MAXBYTES ((__u64)(~0UL) << PAGE_CACHE_SHIFT)
 
-extern kmem_cache_t *ll_file_data_slab;
-struct ll_file_data {
-        struct obd_client_handle fd_mds_och;
-        __u32 fd_flags;
-};
 
 /*
 struct lustre_intent_data {
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 1a535f1..c7bb5a6 100644
@@ -159,6 +159,7 @@ static int ll_local_open(struct file *file, struct lookup_intent *it)
         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
         file->private_data = fd;
+        ll_readahead_init(&fd->fd_ras);
 
         lli->lli_io_epoch = body->io_epoch;
 
@@ -598,10 +599,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         if (err != ELDLM_OK)
                 RETURN(err);
 
+
         CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
+
+        /* turn off the kernel's read-ahead */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        filp->f_ramax = 0; /* turn off generic_file_readahead() */
+        filp->f_ramax = 0;
 #else
         filp->f_ra.ra_pages = 0;
 #endif
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 8adc915..9818c43 100644
@@ -43,6 +43,18 @@ struct ll_sb_info {
         struct list_head          ll_pglist;
 };
 
+struct ll_readahead_state {
+        spinlock_t      ras_lock;
+        unsigned long   ras_last, ras_window, ras_next_index;
+};
+
+extern kmem_cache_t *ll_file_data_slab;
+struct ll_file_data {
+        struct obd_client_handle fd_mds_och;
+        struct ll_readahead_state fd_ras;
+        __u32 fd_flags;
+};
+
 struct lustre_handle;
 struct lov_stripe_md;
 
@@ -95,9 +107,11 @@ struct it_cb_data {
 struct ll_async_page {
         int             llap_magic;
         void            *llap_cookie;
-        int             llap_queued;
         struct page     *llap_page;
         struct list_head llap_pending_write;
+        /* only trust these if the page lock is providing exclusion */
+        int             llap_write_queued:1,
+                        llap_defer_uptodate:1;
         struct list_head llap_proc_item;
 };
 
@@ -142,9 +156,11 @@ int ll_ocp_update_obdo(struct obd_client_page *ocp, int cmd, struct obdo *oa);
 int ll_ocp_set_io_ready(struct obd_client_page *ocp, int cmd);
 int ll_ocp_update_io_args(struct obd_client_page *ocp, int cmd);
 void ll_removepage(struct page *page);
+int ll_sync_page(struct page *page);
 int ll_readpage(struct file *file, struct page *page);
 struct ll_async_page *llap_from_cookie(void *cookie);
 struct ll_async_page *llap_from_page(struct page *page);
+void ll_readahead_init(struct ll_readahead_state *ras);
 
 void ll_truncate(struct inode *inode);
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index e11eae1..0475d13 100644
@@ -232,18 +232,23 @@ static int ll_ap_make_ready(void *data, int cmd)
         struct page *page;
         ENTRY;
         
-        /* reads are always locked between queueing and completion, 
-         * llite should never queue pages without _READY */
-        LASSERT(cmd != OBD_BRW_READ);
-
         llap = llap_from_cookie(data);
         if (IS_ERR(llap)) 
                 RETURN(-EINVAL);
 
         page = llap->llap_page;
 
+        if (cmd == OBD_BRW_READ) {
+                /* paths that want to cancel a read-ahead clear page-private
+                 * before locking the page */ 
+                if (test_and_clear_bit(PG_private, &page->flags))
+                        RETURN(0);
+                RETURN(-EINTR);
+        }
+
+        /* we're trying to write, but the page is locked.. come back later */
         if (TryLockPage(page))
-                RETURN(-EBUSY);
+                RETURN(-EAGAIN);
 
         LL_CDEBUG_PAGE(page, "made ready\n");
         page_cache_get(page);
@@ -400,12 +405,15 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
                 if (exp == NULL)
                         RETURN(-EINVAL);
 
+                /* _make_ready only sees llap once we've unlocked the page */
+                llap->llap_write_queued = 1;
                 rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie, 
                                         OBD_BRW_WRITE, 0, 0, 0, 0);
                 if (rc != 0) { /* async failed, try sync.. */
                         struct obd_sync_io_container *osic;
                         osic_init(&osic);
 
+                        llap->llap_write_queued = 0;
                         rc = obd_queue_sync_io(exp, lsm, NULL, osic, 
                                                llap->llap_cookie, 
                                                OBD_BRW_WRITE, 0, to, 0);
@@ -422,7 +430,6 @@ free_osic:
                         GOTO(out, rc);
                 }
                 LL_CDEBUG_PAGE(page, "write queued\n");
-                llap->llap_queued = 1;
                 //llap_write_pending(inode, llap);
         } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
@@ -506,103 +513,215 @@ void ll_removepage(struct page *page)
         EXIT;
 }
 
-static int ll_start_readpage(struct obd_export *exp, struct inode *inode, 
-                             struct page *page)
+static int ll_page_matches(struct page *page)
 {
-        struct ll_async_page *llap;
-        int rc;
+        struct lustre_handle match_lockh = {0};
+        struct inode *inode = page->mapping->host;
+        struct ldlm_extent page_extent;
+        int flags, matches;
         ENTRY;
 
-        llap = llap_from_page(page);
-        if (IS_ERR(llap))
-                RETURN(PTR_ERR(llap));
-
+        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp, 
+                            ll_i2info(inode)->lli_smd, LDLM_EXTENT, 
+                            &page_extent, sizeof(page_extent), 
+                            LCK_PR, &flags, inode, &match_lockh);
+        if (matches < 0) {
+                LL_CDEBUG_PAGE(page, "lock match failed\n");
+                RETURN(matches);
+        } 
+        if (matches) {
+                obd_cancel(ll_i2sbi(inode)->ll_osc_exp, 
+                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        }
+        RETURN(matches);
+}
+  
+static int ll_issue_page_read(struct obd_export *exp, 
+                              struct ll_async_page *llap, 
+                              int defer_uptodate)
+{ 
+        struct page *page = llap->llap_page;
+        int rc;
+  
+        /* we don't issue this page as URGENT so that it can be batched
+         * with other pages by the kernel's read-ahead.  We have a strong
+         * requirement that readpage() callers must call wait_on_page()
+         * or lock_page() to get into ->sync_page() to trigger the IO */
+        llap->llap_defer_uptodate = defer_uptodate;
         page_cache_get(page);
-
-        rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, 
-                                llap->llap_cookie, OBD_BRW_READ, 0, PAGE_SIZE, 
-                                0, ASYNC_READY | ASYNC_URGENT | 
-                                   ASYNC_COUNT_STABLE);
-        /* XXX verify that failed pages here will make their way
-         * through ->removepage.. I suspect they will. */
-        if (rc)
+        SetPagePrivate(page);
+        rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd, 
+                                NULL, llap->llap_cookie, OBD_BRW_READ, 0, 
+                                PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
+        if (rc) {
+                LL_CDEBUG_PAGE(page, "read queueing failed\n");
+                ClearPagePrivate(page);
                 page_cache_release(page);
-        else  {
-                llap->llap_queued = 1;
-                LL_CDEBUG_PAGE(page, "read queued\n");
         }
         RETURN(rc);
 }
 
-static void ll_start_readahead(struct obd_export *exp, struct inode *inode
-                               unsigned long first_index)
+static void ll_readahead(struct ll_readahead_state *ras
+                         struct obd_export *exp, struct address_space *mapping)
 {
-        struct lustre_handle match_lockh = {0};
-        struct ldlm_extent page_extent;
-        unsigned long index, end_index;
+        unsigned long i, start, end;
+        struct ll_async_page *llap;
         struct page *page;
-        int flags, matched, rc;
-
-        /* for good throughput we need to have many 'blksize' rpcs in
-         * flight per stripe, so we try to read-ahead a ridiculous amount
-         * of data. "- 3" for 8 rpcs */
-        end_index = first_index + (inode->i_blksize >> (PAGE_CACHE_SHIFT - 3));
-        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
-                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
-
-        for (index = first_index + 1; index < end_index; index++) {
-                /* try to get a ref on an existing page or create a new
-                 * one.  if we find a locked page or lose the race
-                 * with another reader we stop trying */
-                page = grab_cache_page_nowait(inode->i_mapping, index);
-                if (page == NULL)
-                        break;
-                /* make sure we didn't race with other teardown/readers */
-                if (!page->mapping || Page_Uptodate(page)) {
-                        unlock_page(page);
-                        page_cache_release(page);
-                        continue;
-                }
+        int rc;
 
-                /* make sure the page we're about to read is covered
-                 * by a lock, stop when we go past the end of the lock */
-                page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-                page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-                flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-                matched = obd_match(ll_i2sbi(inode)->ll_osc_exp, 
-                                    ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                                    &page_extent, sizeof(page_extent), LCK_PR, 
-                                    &flags, inode, &match_lockh);
-                if (matched < 0) {
-                        LL_CDEBUG_PAGE(page, "lock match failed\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
-                if (matched == 0) {
-                        LL_CDEBUG_PAGE(page, "didn't match a lock\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
+        if (mapping->host->i_size == 0)
+                return;
 
-                /* interestingly, we don't need to hold the lock across the IO.
-                 * As long as we match the lock while the page is locked in the
-                 * page cache we know that the lock's cancelation will wait for
-                 * the page to be unlocked.  XXX this should transition to
-                 * proper association of pages and locks in the future */
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        spin_lock(&ras->ras_lock);
+
+        /* make sure to issue a window's worth of read-ahead pages */
+        end = ras->ras_last;
+        start = end - ras->ras_window;
+        if (start > end)
+                start = 0;
+
+        /* but don't iterate over pages that we've already issued.  this
+         * will set start to end + 1 if we've already read ahead up to
+         * ras_last so the for() won't be entered */
+        if (ras->ras_next_index > start)
+                start = ras->ras_next_index;
+        if (end != ~0UL)
+                ras->ras_next_index = end + 1;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               start, end); 
+
+        spin_unlock(&ras->ras_lock);
+
+        /* clamp to filesize */
+        i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+        end = min(end, i);
+
+        for (i = start; i <= end; i++) {
+                /* grab_cache_page_nowait returns NULL if this races with
+                 * truncating the page (page->mapping == NULL) */
+                page = grab_cache_page_nowait(mapping, i);
+                if (page == NULL)
+                        continue;
+  
+                /* the book-keeping above promises that we've tried
+                 * all the indices from start to end, so we don't
+                 * stop if anyone returns an error. This may not be good. */
+                if (Page_Uptodate(page) || ll_page_matches(page) <= 0)
+                        goto next_page;
+
+                llap = llap_from_page(page);
+                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                        goto next_page;
+
+                rc = ll_issue_page_read(exp, llap, 1);
+                if (rc == 0)
+                        LL_CDEBUG_PAGE(page, "started read-ahead\n");
+                if (rc) {
+        next_page:
+                        LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
 
-                rc = ll_start_readpage(exp, inode, page);
-                if (rc != 0) {
                         unlock_page(page);
-                        page_cache_release(page);
-                        break;
                 }
                 page_cache_release(page);
         }
 }
+
+/* XXX this should really bubble up somehow.  */
+#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
+
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ll_readahead_set(struct ll_readahead_state *ras, 
+                             unsigned long index)
+{
+        ras->ras_next_index = index;
+        if (ras->ras_next_index != ~0UL)
+                ras->ras_next_index++;
+        ras->ras_window = LL_RA_MIN;
+        ras->ras_last = ras->ras_next_index + ras->ras_window;
+        if (ras->ras_last < ras->ras_next_index)
+                ras->ras_last = ~0UL;
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               index);
+}
+
+void ll_readahead_init(struct ll_readahead_state *ras)
+{
+        spin_lock_init(&ras->ras_lock);
+        ll_readahead_set(ras, 0);
+}
+
+static void ll_readahead_update(struct ll_readahead_state *ras, 
+                                unsigned long index, int hit)
+{
+        unsigned long issued_start, new_last;
+
+        spin_lock(&ras->ras_lock);
+
+        /* we're interested in noticing the index's relation to the 
+         * previously issued read-ahead pages */
+        issued_start = ras->ras_next_index - ras->ras_window - 1;
+        if (issued_start > ras->ras_next_index)
+                issued_start = 0;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n", 
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               hit ? "hit" : "miss", index, issued_start);
+        if (!hit && 
+            index == ras->ras_next_index && index == ras->ras_last + 1) {
+                /* special case the kernel's read-ahead running into the
+                 * page just beyond our read-ahead window as an extension
+                 * of our read-ahead.  sigh.  wishing it was easier to
+                 * turn off 2.4's read-ahead. */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+                if (index != ~0UL)
+                        ras->ras_next_index = index + 1;
+                ras->ras_last = index;
+        } else if (!hit && 
+                   (index > issued_start || ras->ras_next_index >= index)) {
+                /* deal with a miss way out of the window.  we interpret
+                 * this as a seek and restart the window */
+                ll_readahead_set(ras, index);
+
+        } else if (!hit && 
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a miss inside the window?  surely it's memory pressure
+                 * evicting our read pages before the app can see them.
+                 * we shrink the window aggressively */
+                unsigned long old_window = ras->ras_window;
+
+                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+                ras->ras_last -= old_window - ras->ras_window;
+                if (ras->ras_next_index > ras->ras_last)
+                        ras->ras_next_index = ras->ras_last + 1;
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+        } else if (hit && 
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a hit inside the window.  grow the window by twice the 
+                 * number of pages that are satisfied within the window.  */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+
+                /* we want the next readahead pass to issue a window's worth
+                 * beyond where the app currently is */
+                new_last = index + ras->ras_window;
+                if (new_last > ras->ras_last)
+                        ras->ras_last = new_last;
+
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+        }
+
+        spin_unlock(&ras->ras_lock);
+}
+
 /*
  * for now we do our readpage the same on both 2.4 and 2.5.  The kernel's
  * read-ahead assumes it is valid to issue readpage all the way up to
@@ -612,14 +731,13 @@ static void ll_start_readahead(struct obd_export *exp, struct inode *inode,
  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
  * so they look the same.
  */
-int ll_readpage(struct file *file, struct page *page)
+int ll_readpage(struct file *filp, struct page *page)
 {
+        struct ll_file_data *fd = filp->private_data;
         struct inode *inode = page->mapping->host;
-        struct lustre_handle match_lockh = {0};
         struct obd_export *exp;
-        struct ldlm_extent page_extent;
-        int flags, rc = 0, matched;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        int rc;
+        struct ll_async_page *llap;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -627,30 +745,32 @@ int ll_readpage(struct file *file, struct page *page)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
                inode->i_ino, inode->i_generation, inode,
                (((obd_off)page->index) << PAGE_SHIFT));
-        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
-
-        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
-                CERROR("reading beyond EOF\n");
-                memset(kmap(page), 0, PAGE_SIZE);
-                kunmap(page);
-                SetPageUptodate(page);
-                GOTO(out, rc = 0);
-        }
+        LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
 
         exp = ll_i2obdexp(inode);
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-        matched = obd_match(sbi->ll_osc_exp, ll_i2info(inode)->lli_smd, 
-                            LDLM_EXTENT, &page_extent, sizeof(page_extent), 
-                            LCK_PR, &flags, inode, &match_lockh);
-        if (matched < 0)
-                GOTO(out, rc = matched);
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                GOTO(out, rc = PTR_ERR(llap));
+
+        if (llap->llap_defer_uptodate) {
+                ll_readahead_update(&fd->fd_ras, page->index, 1);
+                LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+                SetPageUptodate(page);
+                ll_readahead(&fd->fd_ras, exp, page->mapping);
+                unlock_page(page);
+                RETURN(0);
+        }
+
+        ll_readahead_update(&fd->fd_ras, page->index, 0);
 
-        if (matched == 0) {
+        rc = ll_page_matches(page);
+        if (rc < 0)
+                GOTO(out, rc);
+
+        if (rc == 0) {
                 static unsigned long next_print;
                 CDEBUG(D_INODE, "didn't match a lock");
                 if (time_after(jiffies, next_print)) {
@@ -660,15 +780,49 @@ int ll_readpage(struct file *file, struct page *page)
                 }
         }
 
-        rc = ll_start_readpage(exp, inode, page);
-        if (rc == 0 && (sbi->ll_flags & LL_SBI_READAHEAD))
-                ll_start_readahead(exp, inode, page->index);
-
-        if (matched == 1)
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp, 
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        rc = ll_issue_page_read(exp, llap, 0);
+        if (rc == 0) {
+                LL_CDEBUG_PAGE(page, "queued readpage\n");
+                if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+                        ll_readahead(&fd->fd_ras, exp, page->mapping);
+        }
 out:
-        if (rc)
+        if (rc) 
                 unlock_page(page);
         RETURN(rc);
 }
+
+/* this is for read pages.  we issue them as ready but not urgent.  when
+ * someone waits on them we fire them off, hopefully merged with adjacent
+ * reads that were queued by the kernel's read-ahead.  */
+int ll_sync_page(struct page *page)
+{
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int rc;
+        ENTRY;
+
+        /* we're abusing PagePrivate to signify that a queued read should
+         * be issued once someone goes to lock it.  it is cleared by 
+         * canceling the read-ahead page before discarding and by issuing
+         * the read rpc */
+        if (!PagePrivate(page))
+                RETURN(0);
+        ClearPagePrivate(page);
+
+        /* careful to only deref page->mapping after checking PagePrivate */
+        exp = ll_i2obdexp(page->mapping->host);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+  
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                RETURN(PTR_ERR(llap));
+
+        LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+
+        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd, 
+                                 NULL, llap->llap_cookie, 
+                                 ASYNC_READY|ASYNC_URGENT);
+        return rc;
+}
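
Tying the rw.c pieces together, here is a condensed sketch of the deferred-read
protocol implemented above (a summary of the hunks in this file, not new code):

/*
 * lifecycle of a deferred read page:
 *
 * 1. ll_readahead() grabs a page and calls ll_issue_page_read(exp, llap, 1):
 *        SetPagePrivate(page);   // read queued, rpc not yet sent
 *        obd_queue_async_io(..., OBD_BRW_READ, ..., ASYNC_COUNT_STABLE);
 *    neither ASYNC_READY nor ASYNC_URGENT is set, so the osc batches it.
 *
 * 2a. the osc later builds a read rpc and calls ll_ap_make_ready():
 *        if (test_and_clear_bit(PG_private, &page->flags))
 *                return 0;       // still wanted, include in the rpc
 *        return -EINTR;          // the read-ahead was cancelled
 *
 * 2b. or a reader blocks in wait_on_page()/lock_page() first and the
 *     kernel calls ->sync_page(), i.e. ll_sync_page():
 *        ClearPagePrivate(page);
 *        obd_set_async_flags(..., ASYNC_READY | ASYNC_URGENT);  // fire now
 *
 * 3. on completion, ll_ap_completion_24() skips SetPageUptodate() for
 *    deferred pages; ll_readpage() later sees llap_defer_uptodate, marks
 *    the page uptodate, counts a read-ahead hit and extends the window.
 */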
diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c
index fe11b27..0adc60d 100644
@@ -61,14 +61,16 @@ void ll_ap_completion_24(void *data, int cmd, int rc)
                 return;
         }
 
-        llap->llap_queued = 0;
         page = llap->llap_page;
-
         LASSERT(PageLocked(page));
 
         if (rc == 0)  {
-                if (cmd == OBD_BRW_READ)
-                        SetPageUptodate(page);
+                if (cmd == OBD_BRW_READ) {
+                        if (!llap->llap_defer_uptodate)
+                                SetPageUptodate(page);
+                } else {
+                        llap->llap_write_queued = 0;
+                }
         } else { 
                 SetPageError(page);
         }
@@ -105,20 +107,21 @@ static int ll_writepage_24(struct page *page)
                 GOTO(out, rc = PTR_ERR(llap));
 
         page_cache_get(page);
-        if (llap->llap_queued) {
+        if (llap->llap_write_queued) {
                 LL_CDEBUG_PAGE(page, "marking urgent\n");
                 rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL, 
                                          llap->llap_cookie, ASYNC_READY | 
                                          ASYNC_URGENT);
         } else {
+                llap->llap_write_queued = 1;
                 rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, 
                                         llap->llap_cookie, OBD_BRW_WRITE, 0, 0, 
                                         OBD_BRW_CREATE, ASYNC_READY | 
                                         ASYNC_URGENT);
-                if (rc == 0) {
+                if (rc == 0)
                         LL_CDEBUG_PAGE(page, "mmap write queued\n");
-                        llap->llap_queued = 1;
-                }
+                else 
+                        llap->llap_write_queued = 0;
         }
         if (rc)
                 page_cache_release(page);
@@ -210,5 +213,6 @@ struct address_space_operations ll_aops = {
         prepare_write: ll_prepare_write,
         commit_write: ll_commit_write,
         removepage: ll_removepage,
+        sync_page: ll_sync_page,
         bmap: NULL
 };
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 6e5a9e0..a23975a 100644
@@ -1266,13 +1266,39 @@ static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
                  * will still be on the dirty list).  we could call in
                  * at the end of ll_file_write to process the queue again. */
                 if (!(oap->oap_async_flags & ASYNC_READY)) {
-                        if (ops->ap_make_ready(oap->oap_caller_data, cmd)) {
-                                CDEBUG(D_INODE, "oap at page_count %d not "
-                                                "ready\n", page_count);
+                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
+                        if (rc < 0)
+                                CDEBUG(D_INODE, "oap %p page %p returned %d "
+                                                "instead of ready\n", oap, 
+                                                oap->oap_page, rc);
+                        switch (rc) {
+                        case -EAGAIN:
+                                /* llite is telling us that the page is still
+                                 * in commit_write and that we should try
+                                 * and put it in an rpc again later.  we 
+                                 * break out of the loop so we don't create
+                                 * a hole in the sequence of pages in
+                                 * the rpc stream. */
+                                pos = NULL;
+                                break;
+                        case -EINTR:
+                                /* the io isn't needed.. tell the checks
+                                 * below to complete the rpc with EINTR */
+                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+                                oap->oap_count = -EINTR;
+                                break;
+                        case 0:
+                                oap->oap_async_flags |= ASYNC_READY;
+                                break;
+                        default:
+                                LASSERTF(0, "oap %p page %p returned %d "
+                                            "from make_ready\n", oap, 
+                                            oap->oap_page, rc);
                                 break;
                         }
-                        oap->oap_async_flags |= ASYNC_READY;
                 }
+                if (pos == NULL)
+                        break;
 
                 /* take the page out of our book-keeping */
                 list_del_init(&oap->oap_pending_item);
@@ -1723,7 +1749,10 @@ static int osc_set_async_flags(struct obd_export *exp,
 
         spin_lock(&cli->cl_loi_list_lock);
 
-        if (oap->oap_async_flags == async_flags)
+        if (list_empty(&oap->oap_pending_item))
+                GOTO(out, rc = -EINVAL);
+
+        if ((oap->oap_async_flags & async_flags) == async_flags)
                 GOTO(out, rc = 0);
 
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
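
For reference, a hedged sketch of a caller honoring the ap_make_ready()
return-value contract the hunk above establishes (handle_make_ready() is
hypothetical; osc_send_oap_rpc() above is the real consumer):

/* returns 0 to keep filling the rpc, 1 to stop at this page */
static int handle_make_ready(struct osc_async_page *oap, int rc)
{
        switch (rc) {
        case 0:                 /* page is ready, put it in the rpc */
                oap->oap_async_flags |= ASYNC_READY;
                return 0;
        case -EAGAIN:           /* page busy in commit_write: stop here so
                                 * the rpc's page sequence has no hole */
                return 1;
        case -EINTR:            /* io no longer wanted (cancelled read-ahead):
                                 * complete the page with -EINTR */
                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                oap->oap_count = -EINTR;
                return 0;
        default:
                LBUG();         /* unexpected return from make_ready */
                return 1;
        }
}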
diff --git a/lustre/portals/include/linux/kp30.h b/lustre/portals/include/linux/kp30.h
index 2cd7b06..b0889a2 100644
@@ -71,6 +71,7 @@ extern unsigned int portal_cerror;
 #define D_HA        (1 << 19) /* recovery and failover */
 #define D_RPCTRACE  (1 << 20) /* for distributed debugging */
 #define D_VFSTRACE  (1 << 21)
+#define D_READA     (1 << 22) /* read-ahead */
 
 #ifdef __KERNEL__
 # include <linux/sched.h> /* THREAD_SIZE */
diff --git a/lustre/portals/utils/debug.c b/lustre/portals/utils/debug.c
index 3f3e69c..14750d8 100644
@@ -69,7 +69,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         NULL};
+         "reada", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;