struct page *page;
ENTRY;
- /* reads are always locked between queueing and completion,
- * llite should never queue pages without _READY */
- LASSERT(cmd != OBD_BRW_READ);
-
llap = llap_from_cookie(data);
if (IS_ERR(llap))
RETURN(-EINVAL);
page = llap->llap_page;
+ if (cmd == OBD_BRW_READ) {
+ /* paths that want to cancel a read-ahead clear page-private
+ * before locking the page */
+ if (test_and_clear_bit(PG_private, &page->flags))
+ RETURN(0);
+ RETURN(-EINTR);
+ }
+
+ /* we're trying to write, but the page is locked.. come back later */
if (TryLockPage(page))
- RETURN(-EBUSY);
+ RETURN(-EAGAIN);
LL_CDEBUG_PAGE(page, "made ready\n");
page_cache_get(page);
if (exp == NULL)
RETURN(-EINVAL);
+ /* _make_ready only sees llap once we've unlocked the page */
+ llap->llap_write_queued = 1;
rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
OBD_BRW_WRITE, 0, 0, 0, 0);
if (rc != 0) { /* async failed, try sync.. */
struct obd_sync_io_container *osic;
osic_init(&osic);
+ llap->llap_write_queued = 0;
rc = obd_queue_sync_io(exp, lsm, NULL, osic,
llap->llap_cookie,
OBD_BRW_WRITE, 0, to, 0);
GOTO(out, rc);
}
LL_CDEBUG_PAGE(page, "write queued\n");
- llap->llap_queued = 1;
//llap_write_pending(inode, llap);
} else {
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
EXIT;
}
-static int ll_start_readpage(struct obd_export *exp, struct inode *inode,
- struct page *page)
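+/* a summary of the convention below, for reference: returns < 0 on error,
+ * 0 when no granted PR extent lock covers the page and > 0 when one does;
+ * the matched reference is dropped again right away, since matching under
+ * the page lock is enough -- lock cancellation waits for locked pages */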
+static int ll_page_matches(struct page *page)
{
- struct ll_async_page *llap;
- int rc;
+ struct lustre_handle match_lockh = {0};
+ struct inode *inode = page->mapping->host;
+ struct ldlm_extent page_extent;
+ int flags, matches;
ENTRY;
- llap = llap_from_page(page);
- if (IS_ERR(llap))
- RETURN(PTR_ERR(llap));
-
+ page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+ page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
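+ /* e.g. with 4096-byte pages, page index 3 covers bytes [12288, 16383] */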
+ flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+ matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+ ll_i2info(inode)->lli_smd, LDLM_EXTENT,
+ &page_extent, sizeof(page_extent),
+ LCK_PR, &flags, inode, &match_lockh);
+ if (matches < 0) {
+ LL_CDEBUG_PAGE(page, "lock match failed\n");
+ RETURN(matches);
+ }
+ if (matches) {
+ obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
+ ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+ }
+ RETURN(matches);
+}
+
+static int ll_issue_page_read(struct obd_export *exp,
+ struct ll_async_page *llap,
+ int defer_uptodate)
+{
+ struct page *page = llap->llap_page;
+ int rc;
+
+ /* we don't issue this page as URGENT so that it can be batched
+ * with other pages by the kernel's read-ahead. We have a strong
+ * requirement that readpage() callers must call wait_on_page()
+ * or lock_page() to get into ->sync_page() to trigger the IO */
+ llap->llap_defer_uptodate = defer_uptodate;
page_cache_get(page);
-
- rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
- llap->llap_cookie, OBD_BRW_READ, 0, PAGE_SIZE,
- 0, ASYNC_READY | ASYNC_URGENT |
- ASYNC_COUNT_STABLE);
- /* XXX verify that failed pages here will make their way
- * through ->removepage.. I suspect they will. */
- if (rc)
+ SetPagePrivate(page);
+ rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd,
+ NULL, llap->llap_cookie, OBD_BRW_READ, 0,
+ PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
+ if (rc) {
+ LL_CDEBUG_PAGE(page, "read queueing failed\n");
+ ClearPagePrivate(page);
page_cache_release(page);
- else {
- llap->llap_queued = 1;
- LL_CDEBUG_PAGE(page, "read queued\n");
}
RETURN(rc);
}
-static void ll_start_readahead(struct obd_export *exp, struct inode *inode,
- unsigned long first_index)
+static void ll_readahead(struct ll_readahead_state *ras,
+ struct obd_export *exp, struct address_space *mapping)
{
- struct lustre_handle match_lockh = {0};
- struct ldlm_extent page_extent;
- unsigned long index, end_index;
+ unsigned long i, start, end;
+ struct ll_async_page *llap;
struct page *page;
- int flags, matched, rc;
-
- /* for good throughput we need to have many 'blksize' rpcs in
- * flight per stripe, so we try to read-ahead a ridiculous amount
- * of data. "- 3" for 8 rpcs */
- end_index = first_index + (inode->i_blksize >> (PAGE_CACHE_SHIFT - 3));
- if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
- end_index = inode->i_size >> PAGE_CACHE_SHIFT;
-
- for (index = first_index + 1; index < end_index; index++) {
- /* try to get a ref on an existing page or create a new
- * one. if we find a locked page or lose the race
- * with another reader we stop trying */
- page = grab_cache_page_nowait(inode->i_mapping, index);
- if (page == NULL)
- break;
- /* make sure we didn't race with other teardown/readers */
- if (!page->mapping || Page_Uptodate(page)) {
- unlock_page(page);
- page_cache_release(page);
- continue;
- }
+ int rc;
- /* make sure the page we're about to read is covered
- * by a lock, stop when we go past the end of the lock */
- page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
- page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
- flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
- matched = obd_match(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LDLM_EXTENT,
- &page_extent, sizeof(page_extent), LCK_PR,
- &flags, inode, &match_lockh);
- if (matched < 0) {
- LL_CDEBUG_PAGE(page, "lock match failed\n");
- unlock_page(page);
- page_cache_release(page);
- break;
- }
- if (matched == 0) {
- LL_CDEBUG_PAGE(page, "didn't match a lock\n");
- unlock_page(page);
- page_cache_release(page);
- break;
- }
+ if (mapping->host->i_size == 0)
+ return;
- /* interestingly, we don't need to hold the lock across the IO.
- * As long as we match the lock while the page is locked in the
- * page cache we know that the lock's cancelation will wait for
- * the page to be unlocked. XXX this should transition to
- * proper association of pages and locks in the future */
- obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+ spin_lock(&ras->ras_lock);
+
+ /* make sure to issue a window's worth of read-ahead pages */
+ end = ras->ras_last;
+ start = end - ras->ras_window;
+ if (start > end)
+ start = 0;
+
+ /* but don't iterate over pages that we've already issued. this
+ * will set start to end + 1 if we've already read ahead up to
+ * ras_last, so the for() won't be entered */
+ if (ras->ras_next_index > start)
+ start = ras->ras_next_index;
+ if (end != ~0UL)
+ ras->ras_next_index = end + 1;
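+
+ /* e.g. with last == 64 and window == 32 we'd start at 32, but if
+ * next_index is already 40 we only issue 40..64 and next_index
+ * becomes 65, so the next pass starts beyond this window */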
+
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ start, end);
+
+ spin_unlock(&ras->ras_lock);
+
+ /* clamp to filesize */
+ i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+ end = min(end, i);
+
+ for (i = start; i <= end; i++) {
+ /* grab_cache_page_nowait returns null if this races with
+ * truncating the page (page->mapping == NULL) */
+ page = grab_cache_page_nowait(mapping, i);
+ if (page == NULL)
+ continue;
+
+ /* the book-keeping above promises that we've tried
+ * all the indices from start to end, so we don't
+ * stop when any one page fails. This may not be good. */
+ if (Page_Uptodate(page) || ll_page_matches(page) <= 0)
+ goto next_page;
+
+ llap = llap_from_page(page);
+ if (IS_ERR(llap) || llap->llap_defer_uptodate)
+ goto next_page;
+
+ rc = ll_issue_page_read(exp, llap, 1);
+ if (rc == 0)
+ LL_CDEBUG_PAGE(page, "started read-ahead\n");
+ if (rc) {
+ next_page:
+ LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
- rc = ll_start_readpage(exp, inode, page);
- if (rc != 0) {
unlock_page(page);
- page_cache_release(page);
- break;
}
page_cache_release(page);
}
}
+
+/* XXX this should really bubble up somehow. */
+#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
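+/* e.g. if PTL_MD_MAX_PAGES were 64, the window could range from 32 pages
+ * (128k of 4k pages) up to 2048 pages (8M) */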
+
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ll_readahead_set(struct ll_readahead_state *ras,
+ unsigned long index)
+{
+ ras->ras_next_index = index;
+ if (ras->ras_next_index != ~0UL)
+ ras->ras_next_index++;
+ ras->ras_window = LL_RA_MIN;
+ ras->ras_last = ras->ras_next_index + ras->ras_window;
+ if (ras->ras_last < ras->ras_next_index)
+ ras->ras_last = ~0UL;
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ index);
+}
+
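+/* a worked example: ll_readahead_init() calls this with index == 0,
+ * leaving ras_next_index == 1, ras_window == LL_RA_MIN and
+ * ras_last == 1 + LL_RA_MIN */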
+void ll_readahead_init(struct ll_readahead_state *ras)
+{
+ spin_lock_init(&ras->ras_lock);
+ ll_readahead_set(ras, 0);
+}
+
+static void ll_readahead_update(struct ll_readahead_state *ras,
+ unsigned long index, int hit)
+{
+ unsigned long issued_start, new_last;
+
+ spin_lock(&ras->ras_lock);
+
+ /* we're interested in noticing the index's relation to the
+ * previously issued read-ahead pages */
+ issued_start = ras->ras_next_index - ras->ras_window - 1;
+ if (issued_start > ras->ras_next_index)
+ issued_start = 0;
+
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ hit ? "hit" : "miss", index, issued_start);
+ if (!hit &&
+ index == ras->ras_next_index && index == ras->ras_last + 1) {
+ /* special case the kernel's read-ahead running into the
+ * page just beyond our read-ahead window as an extension
+ * of our read-ahead. sigh. wishing it were easier to
+ * turn off 2.4's read-ahead. */
+ ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+ if (index != ~0UL)
+ ras->ras_next_index = index + 1;
+ ras->ras_last = index;
+ } else if (!hit &&
+ (index < issued_start || index >= ras->ras_next_index)) {
+ /* deal with a miss way out of the window. we interpret
+ * this as a seek and restart the window */
+ ll_readahead_set(ras, index);
+
+ } else if (!hit &&
+ issued_start <= index && index < ras->ras_next_index) {
+ /* a miss inside the window? surely it's memory pressure
+ * evicting our read pages before the app can see them.
+ * we shrink the window aggressively */
+ unsigned long old_window = ras->ras_window;
+
+ ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+ ras->ras_last -= old_window - ras->ras_window;
+ if (ras->ras_next_index > ras->ras_last)
+ ras->ras_next_index = ras->ras_last + 1;
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+ } else if (hit &&
+ issued_start <= index && index < ras->ras_next_index) {
+ /* a hit inside the window. grow the window by twice the
+ * number of pages that are satisfied within the window. */
+ ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+
+ /* we want the next readahead pass to issue a window's worth
+ * beyond where the app currently is */
+ new_last = index + ras->ras_window;
+ if (new_last > ras->ras_last)
+ ras->ras_last = new_last;
+
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window);
+ }
+
+ spin_unlock(&ras->ras_lock);
+}
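+
+/* a worked example of the cases above, with illustrative values: given
+ * ras_next_index == 100, ras_window == 32 and ras_last == 131,
+ * issued_start is 67. a hit at index 80 grows the window to 34 and
+ * leaves ras_last at 131 (80 + 34 == 114 doesn't extend it); a miss at
+ * 80 instead halves the window, bounded below by LL_RA_MIN, and pulls
+ * ras_last back by the difference */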
+
/*
* for now we do our readpage the same on both 2.4 and 2.5. The kernel's
* read-ahead assumes it is valid to issue readpage all the way up to
* 2.6 is how read-ahead gets batched and issued, but we're using our own,
* so they look the same.
*/
-int ll_readpage(struct file *file, struct page *page)
+int ll_readpage(struct file *filp, struct page *page)
{
+ struct ll_file_data *fd = filp->private_data;
struct inode *inode = page->mapping->host;
- struct lustre_handle match_lockh = {0};
struct obd_export *exp;
- struct ldlm_extent page_extent;
- int flags, rc = 0, matched;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
+ int rc;
+ struct ll_async_page *llap;
ENTRY;
LASSERT(PageLocked(page));
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
inode->i_ino, inode->i_generation, inode,
(((obd_off)page->index) << PAGE_SHIFT));
- LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
-
- if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
- CERROR("reading beyond EOF\n");
- memset(kmap(page), 0, PAGE_SIZE);
- kunmap(page);
- SetPageUptodate(page);
- GOTO(out, rc = 0);
- }
+ LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
exp = ll_i2obdexp(inode);
if (exp == NULL)
GOTO(out, rc = -EINVAL);
- page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
- page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
- flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
- matched = obd_match(sbi->ll_osc_exp, ll_i2info(inode)->lli_smd,
- LDLM_EXTENT, &page_extent, sizeof(page_extent),
- LCK_PR, &flags, inode, &match_lockh);
- if (matched < 0)
- GOTO(out, rc = matched);
+ llap = llap_from_page(page);
+ if (IS_ERR(llap))
+ GOTO(out, rc = PTR_ERR(llap));
+
+ if (llap->llap_defer_uptodate) {
+ ll_readahead_update(&fd->fd_ras, page->index, 1);
+ LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+ SetPageUptodate(page);
+ ll_readahead(&fd->fd_ras, exp, page->mapping);
+ unlock_page(page);
+ RETURN(0);
+ }
+
+ ll_readahead_update(&fd->fd_ras, page->index, 0);
- if (matched == 0) {
+ rc = ll_page_matches(page);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc == 0) {
static unsigned long next_print;
CDEBUG(D_INODE, "didn't match a lock");
if (time_after(jiffies, next_print)) {
}
}
- rc = ll_start_readpage(exp, inode, page);
- if (rc == 0 && (sbi->ll_flags & LL_SBI_READAHEAD))
- ll_start_readahead(exp, inode, page->index);
-
- if (matched == 1)
- obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+ rc = ll_issue_page_read(exp, llap, 0);
+ if (rc == 0) {
+ LL_CDEBUG_PAGE(page, "queued readpage\n");
+ if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+ ll_readahead(&fd->fd_ras, exp, page->mapping);
+ }
out:
- if (rc)
+ if (rc)
unlock_page(page);
RETURN(rc);
}
+
+/* this is for read pages. we queue them as neither ready nor urgent. when
+ * someone waits on them we mark them ready and urgent so they fire off,
+ * hopefully merged with adjacent reads that were queued by the kernel's
+ * read-ahead. */
+int ll_sync_page(struct page *page)
+{
+ struct obd_export *exp;
+ struct ll_async_page *llap;
+ int rc;
+ ENTRY;
+
+ /* we're abusing PagePrivate to signify that a queued read should
+ * be issued once someone goes to lock it. it is cleared by
+ * canceling the read-ahead page before discarding and by issuing
+ * the read rpc */
+ if (!PagePrivate(page))
+ RETURN(0);
+ ClearPagePrivate(page);
+
+ /* careful to only deref page->mapping after checking PagePrivate */
+ exp = ll_i2obdexp(page->mapping->host);
+ if (exp == NULL)
+ RETURN(-EINVAL);
+
+ llap = llap_from_page(page);
+ if (IS_ERR(llap))
+ RETURN(PTR_ERR(llap));
+
+ LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+
+ rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+ NULL, llap->llap_cookie,
+ ASYNC_READY|ASYNC_URGENT);
+ return rc;
+}
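+
+/* a sketch of the intended read path, as implemented above: ll_readpage()
+ * queues the page without ASYNC_URGENT and tags it with PG_private via
+ * ll_issue_page_read(); when a reader blocks in wait_on_page() or
+ * lock_page() the kernel calls ->sync_page(), which lands here and flips
+ * the queued io to ASYNC_READY|ASYNC_URGENT so the rpc goes out, ideally
+ * merged with neighbouring read-ahead pages */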