Whamcloud - gitweb
b=2252
[fs/lustre-release.git] / lustre / llite / rw.c
index e11eae1..0475d13 100644 (file)
@@ -232,18 +232,23 @@ static int ll_ap_make_ready(void *data, int cmd)
         struct page *page;
         ENTRY;
         
-        /* reads are always locked between queueing and completion, 
-         * llite should never queue pages without _READY */
-        LASSERT(cmd != OBD_BRW_READ);
-
         llap = llap_from_cookie(data);
         if (IS_ERR(llap)) 
                 RETURN(-EINVAL);
 
         page = llap->llap_page;
 
+        if (cmd == OBD_BRW_READ) {
+                /* paths that want to cancel a read-ahead clear page-private
+                 * before locking the page */ 
+               if (test_and_clear_bit(PG_private, &page->flags))
+                        RETURN(0);
+                RETURN(-EINTR);
+        }
+
+        /* we're trying to write, but the page is locked.. come back later */
         if (TryLockPage(page))
-                RETURN(-EBUSY);
+                RETURN(-EAGAIN);
 
         LL_CDEBUG_PAGE(page, "made ready\n");
         page_cache_get(page);
@@ -400,12 +405,15 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
                 if (exp == NULL)
                         RETURN(-EINVAL);
 
+                /* _make_ready only sees llap once we've unlocked the page */
+                llap->llap_write_queued = 1;
                 rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie, 
                                         OBD_BRW_WRITE, 0, 0, 0, 0);
                 if (rc != 0) { /* async failed, try sync.. */
                         struct obd_sync_io_container *osic;
                         osic_init(&osic);
 
+                        llap->llap_write_queued = 0;
                         rc = obd_queue_sync_io(exp, lsm, NULL, osic, 
                                                llap->llap_cookie, 
                                                OBD_BRW_WRITE, 0, to, 0);
@@ -422,7 +430,6 @@ free_osic:
                         GOTO(out, rc);
                 }
                 LL_CDEBUG_PAGE(page, "write queued\n");
-                llap->llap_queued = 1;
                 //llap_write_pending(inode, llap);
         } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
@@ -506,103 +513,215 @@ void ll_removepage(struct page *page)
         EXIT;
 }
 
-static int ll_start_readpage(struct obd_export *exp, struct inode *inode, 
-                             struct page *page)
+static int ll_page_matches(struct page *page)
 {
-        struct ll_async_page *llap;
-        int rc;
+        struct lustre_handle match_lockh = {0};
+        struct inode *inode = page->mapping->host;
+        struct ldlm_extent page_extent;
+        int flags, matches;
         ENTRY;
 
-        llap = llap_from_page(page);
-        if (IS_ERR(llap))
-                RETURN(PTR_ERR(llap));
-
+        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp, 
+                            ll_i2info(inode)->lli_smd, LDLM_EXTENT, 
+                            &page_extent, sizeof(page_extent), 
+                            LCK_PR, &flags, inode, &match_lockh);
+        if (matches < 0) {
+                LL_CDEBUG_PAGE(page, "lock match failed\n");
+                RETURN(matches);
+        } 
+        if (matches) {
+                obd_cancel(ll_i2sbi(inode)->ll_osc_exp, 
+                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        }
+        RETURN(matches);
+}
+  
+static int ll_issue_page_read(struct obd_export *exp, 
+                              struct ll_async_page *llap, 
+                              int defer_uptodate)
+{ 
+        struct page *page = llap->llap_page;
+        int rc;
+  
+        /* we don't issue this page as URGENT so that it can be batched
+         * with other pages by the kernel's read-ahead.  We have a strong
+         * requirement that readpage() callers must call wait_on_page()
+         * or lock_page() to get into ->sync_page() to trigger the IO */
+        llap->llap_defer_uptodate = defer_uptodate;
         page_cache_get(page);
-
-        rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, 
-                                llap->llap_cookie, OBD_BRW_READ, 0, PAGE_SIZE, 
-                                0, ASYNC_READY | ASYNC_URGENT | 
-                                   ASYNC_COUNT_STABLE);
-        /* XXX verify that failed pages here will make their way
-         * through ->removepage.. I suspect they will. */
-        if (rc)
+        SetPagePrivate(page);
+        rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd, 
+                                NULL, llap->llap_cookie, OBD_BRW_READ, 0, 
+                                PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
+        if (rc) {
+                LL_CDEBUG_PAGE(page, "read queueing failed\n");
+                ClearPagePrivate(page);
                 page_cache_release(page);
-        else  {
-                llap->llap_queued = 1;
-                LL_CDEBUG_PAGE(page, "read queued\n");
         }
         RETURN(rc);
 }
 
-static void ll_start_readahead(struct obd_export *exp, struct inode *inode
-                               unsigned long first_index)
+static void ll_readahead(struct ll_readahead_state *ras
+                         struct obd_export *exp, struct address_space *mapping)
 {
-        struct lustre_handle match_lockh = {0};
-        struct ldlm_extent page_extent;
-        unsigned long index, end_index;
+        unsigned long i, start, end;
+        struct ll_async_page *llap;
         struct page *page;
-        int flags, matched, rc;
-
-        /* for good throughput we need to have many 'blksize' rpcs in
-         * flight per stripe, so we try to read-ahead a ridiculous amount
-         * of data. "- 3" for 8 rpcs */
-        end_index = first_index + (inode->i_blksize >> (PAGE_CACHE_SHIFT - 3));
-        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
-                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
-
-        for (index = first_index + 1; index < end_index; index++) {
-                /* try to get a ref on an existing page or create a new
-                 * one.  if we find a locked page or lose the race
-                 * with another reader we stop trying */
-                page = grab_cache_page_nowait(inode->i_mapping, index);
-                if (page == NULL)
-                        break;
-                /* make sure we didn't race with other teardown/readers */
-                if (!page->mapping || Page_Uptodate(page)) {
-                        unlock_page(page);
-                        page_cache_release(page);
-                        continue;
-                }
+        int rc;
 
-                /* make sure the page we're about to read is covered
-                 * by a lock, stop when we go past the end of the lock */
-                page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-                page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-                flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-                matched = obd_match(ll_i2sbi(inode)->ll_osc_exp, 
-                                    ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                                    &page_extent, sizeof(page_extent), LCK_PR, 
-                                    &flags, inode, &match_lockh);
-                if (matched < 0) {
-                        LL_CDEBUG_PAGE(page, "lock match failed\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
-                if (matched == 0) {
-                        LL_CDEBUG_PAGE(page, "didn't match a lock\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
+        if (mapping->host->i_size == 0)
+                return;
 
-                /* interestingly, we don't need to hold the lock across the IO.
-                 * As long as we match the lock while the page is locked in the
-                 * page cache we know that the lock's cancelation will wait for
-                 * the page to be unlocked.  XXX this should transition to
-                 * proper association of pages and locks in the future */
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        spin_lock(&ras->ras_lock);
+
+        /* make sure to issue a window's worth of read-ahead pages */
+        end = ras->ras_last;
+        start = end - ras->ras_window;
+        if (start > end)
+                start = 0;
+
+        /* but don't iterate over pages that we've already issued.  this
+         * will set start to end + 1 if we've already read-ahead up to
+         * ras_last sothe for() won't be entered */
+        if (ras->ras_next_index > start)
+                start = ras->ras_next_index;
+        if (end != ~0UL)
+                ras->ras_next_index = end + 1;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               start, end); 
+
+        spin_unlock(&ras->ras_lock);
+
+        /* clamp to filesize */
+        i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+        end = min(end, i);
+
+        for (i = start; i <= end; i++) {
+                /* grab_cache_page_nowait returns null if this races with
+                 * truncating the page (page->mapping == NULL) */
+                page = grab_cache_page_nowait(mapping, i);
+                if (page == NULL)
+                       continue;
+  
+                /* the book-keeping above promises that we've tried
+                 * all the indices from start to end, so we don't
+                 * stop if anyone returns an error. This may not be good. */
+                if (Page_Uptodate(page) || ll_page_matches(page) <= 0)
+                        goto next_page;
+
+                llap = llap_from_page(page);
+                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                        goto next_page;
+
+                rc = ll_issue_page_read(exp, llap, 1);
+                if (rc == 0)
+                        LL_CDEBUG_PAGE(page, "started read-ahead\n");
+                if (rc) {
+        next_page:
+                        LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
 
-                rc = ll_start_readpage(exp, inode, page);
-                if (rc != 0) {
                         unlock_page(page);
-                        page_cache_release(page);
-                        break;
                 }
                 page_cache_release(page);
         }
 }
+
+/* XXX this should really bubble up somehow.  */
+#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
+
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ll_readahead_set(struct ll_readahead_state *ras, 
+                             unsigned long index)
+{
+        ras->ras_next_index = index;
+        if (ras->ras_next_index != ~0UL)
+                ras->ras_next_index++;
+        ras->ras_window = LL_RA_MIN;
+        ras->ras_last = ras->ras_next_index + ras->ras_window;
+        if (ras->ras_last < ras->ras_next_index)
+                ras->ras_last = ~0UL;
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               index);
+}
+
+void ll_readahead_init(struct ll_readahead_state *ras)
+{
+        spin_lock_init(&ras->ras_lock);
+        ll_readahead_set(ras, 0);
+}
+
+static void ll_readahead_update(struct ll_readahead_state *ras, 
+                                unsigned long index, int hit)
+{
+        unsigned long issued_start, new_last;
+
+        spin_lock(&ras->ras_lock);
+
+        /* we're interested in noticing the index's relation to the 
+         * previously issued read-ahead pages */
+        issued_start = ras->ras_next_index - ras->ras_window - 1;
+        if (issued_start > ras->ras_next_index)
+                issued_start = 0;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n", 
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               hit ? "hit" : "miss", index, issued_start);
+        if (!hit && 
+            index == ras->ras_next_index && index == ras->ras_last + 1) {
+                /* special case the kernel's read-ahead running into the
+                 * page just beyond our read-ahead window as an extension
+                 * of our read-ahead.  sigh.  wishing it was easier to
+                 * turn off 2.4's read-ahead. */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+                if (index != ~0UL)
+                        ras->ras_next_index = index + 1;
+                ras->ras_last = index;
+        } else if (!hit && 
+                   (index > issued_start || ras->ras_next_index >= index)) {
+                /* deal with a miss way out of the window.  we interpret
+                 * this as a seek and restart the window */
+                ll_readahead_set(ras, index);
+
+        } else if (!hit && 
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a miss inside the window?  surely its memory pressure
+                 * evicting our read pages before the app can see them.
+                 * we shrink the window aggressively */
+                unsigned long old_window = ras->ras_window;
+
+                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+                ras->ras_last -= old_window - ras->ras_window;
+                if (ras->ras_next_index > ras->ras_last)
+                        ras->ras_next_index = ras->ras_last + 1;
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+        } else if (hit && 
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a hit inside the window.  grow the window by twice the 
+                 * number of pages that are satisified within the window.  */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+
+                /* we want the next readahead pass to issue a windows worth
+                 * beyond where the app currently is */
+                new_last = index + ras->ras_window;
+                if (new_last > ras->ras_last)
+                        ras->ras_last = new_last;
+
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+        }
+
+        spin_unlock(&ras->ras_lock);
+}
+
 /*
  * for now we do our readpage the same on both 2.4 and 2.5.  The kernel's
  * read-ahead assumes it is valid to issue readpage all the way up to
@@ -612,14 +731,13 @@ static void ll_start_readahead(struct obd_export *exp, struct inode *inode,
  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
  * so they look the same.
  */
-int ll_readpage(struct file *file, struct page *page)
+int ll_readpage(struct file *filp, struct page *page)
 {
+        struct ll_file_data *fd = filp->private_data;
         struct inode *inode = page->mapping->host;
-        struct lustre_handle match_lockh = {0};
         struct obd_export *exp;
-        struct ldlm_extent page_extent;
-        int flags, rc = 0, matched;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        int rc;
+        struct ll_async_page *llap;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -627,30 +745,32 @@ int ll_readpage(struct file *file, struct page *page)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
                inode->i_ino, inode->i_generation, inode,
                (((obd_off)page->index) << PAGE_SHIFT));
-        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
-
-        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
-                CERROR("reading beyond EOF\n");
-                memset(kmap(page), 0, PAGE_SIZE);
-                kunmap(page);
-                SetPageUptodate(page);
-                GOTO(out, rc = 0);
-        }
+        LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
 
         exp = ll_i2obdexp(inode);
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-        matched = obd_match(sbi->ll_osc_exp, ll_i2info(inode)->lli_smd, 
-                            LDLM_EXTENT, &page_extent, sizeof(page_extent), 
-                            LCK_PR, &flags, inode, &match_lockh);
-        if (matched < 0)
-                GOTO(out, rc = matched);
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                GOTO(out, rc = PTR_ERR(llap));
+
+        if (llap->llap_defer_uptodate) {
+                ll_readahead_update(&fd->fd_ras, page->index, 1);
+                LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+                SetPageUptodate(page);
+                ll_readahead(&fd->fd_ras, exp, page->mapping);
+                unlock_page(page);
+                RETURN(0);
+        }
+
+        ll_readahead_update(&fd->fd_ras, page->index, 0);
 
-        if (matched == 0) {
+        rc = ll_page_matches(page);
+        if (rc < 0)
+                GOTO(out, rc);
+
+        if (rc == 0) {
                 static unsigned long next_print;
                 CDEBUG(D_INODE, "didn't match a lock");
                 if (time_after(jiffies, next_print)) {
@@ -660,15 +780,49 @@ int ll_readpage(struct file *file, struct page *page)
                 }
         }
 
-        rc = ll_start_readpage(exp, inode, page);
-        if (rc == 0 && (sbi->ll_flags & LL_SBI_READAHEAD))
-                ll_start_readahead(exp, inode, page->index);
-
-        if (matched == 1)
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp, 
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        rc = ll_issue_page_read(exp, llap, 0);
+        if (rc == 0) {
+                LL_CDEBUG_PAGE(page, "queued readpage\n");
+                if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+                        ll_readahead(&fd->fd_ras, exp, page->mapping);
+        }
 out:
-        if (rc)
+        if (rc) 
                 unlock_page(page);
         RETURN(rc);
 }
+
+/* this is for read pages.  we issue them as ready but not urgent.  when
+ * someone waits on them we fire them off, hopefully merged with adjacent
+ * reads that were queued by the kernel's read-ahead.  */
+int ll_sync_page(struct page *page)
+{
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int rc;
+        ENTRY;
+
+        /* we're abusing PagePrivate to signify that a queued read should
+         * be issued once someone goes to lock it.  it is cleared by 
+         * canceling the read-ahead page before discarding and by issuing
+         * the read rpc */
+        if (!PagePrivate(page))
+                RETURN(0);
+        ClearPagePrivate(page);
+
+        /* careful to only deref page->mapping after checking PagePrivate */
+        exp = ll_i2obdexp(page->mapping->host);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+  
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                RETURN(PTR_ERR(llap));
+
+        LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+
+        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd, 
+                                 NULL, llap->llap_cookie, 
+                                 ASYNC_READY|ASYNC_URGENT);
+        return rc;
+}