#endif
/* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
+static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
struct page *page, int flags)
{
struct ll_inode_info *lli = ll_i2info(inode);
page = llap->llap_page;
- if (cmd == OBD_BRW_READ) {
- /* _sync_page beat us to it and is about to call
- * _set_async_flags which will fire off rpcs again */
- if (!test_and_clear_bit(LL_PRIVBITS_READ, &page->private))
- RETURN(-EAGAIN);
+ if (cmd == OBD_BRW_READ)
RETURN(0);
- }
/* we're trying to write, but the page is locked.. come back later */
if (TryLockPage(page))
.ap_completion = ll_ap_completion,
};
-#define page_llap(page) \
- ((struct ll_async_page *)((page)->private & ~LL_PRIVBITS_MASK))
-
/* XXX have the exp be an argument? */
struct ll_async_page *llap_from_page(struct page *page)
{
int rc;
ENTRY;
- llap = page_llap(page);
+ llap = (struct ll_async_page *)page->private;
if (llap != NULL) {
if (llap->llap_magic != LLAP_MAGIC)
RETURN(ERR_PTR(-EINVAL));
RETURN(llap);
- }
+ }
exp = ll_i2obdexp(page->mapping->host);
if (exp == NULL)
if (llap == NULL)
RETURN(ERR_PTR(-ENOMEM));
llap->llap_magic = LLAP_MAGIC;
- rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd,
- NULL, page,
+ rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
(obd_off)page->index << PAGE_SHIFT,
&ll_async_page_ops, llap, &llap->llap_cookie);
if (rc) {
RETURN(ERR_PTR(rc));
}
- CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
+ CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
page, llap->llap_cookie, (obd_off)page->index << PAGE_SHIFT);
- /* also zeroing the PRIVBITS low order bitflags */
+ /* also zeroing the PRIVBITS low order bitflags */
page->private = (unsigned long)llap;
llap->llap_page = page;
rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
OBD_BRW_WRITE, 0, 0, 0, 0);
if (rc != 0) { /* async failed, try sync.. */
- struct obd_sync_io_container *osic;
- osic_init(&osic);
+ struct obd_io_group *oig;
+ rc = oig_init(&oig);
+ if (rc)
+ GOTO(out, rc);
llap->llap_write_queued = 0;
- rc = obd_queue_sync_io(exp, lsm, NULL, osic,
- llap->llap_cookie,
- OBD_BRW_WRITE, 0, to, 0);
+ rc = obd_queue_group_io(exp, lsm, NULL, oig,
+ llap->llap_cookie,
+ OBD_BRW_WRITE, 0, to, 0,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE |
+ ASYNC_GROUP_SYNC);
+
if (rc)
- GOTO(free_osic, rc);
+ GOTO(free_oig, rc);
- rc = obd_trigger_sync_io(exp, lsm, NULL, osic);
+ rc = obd_trigger_group_io(exp, lsm, NULL, oig);
if (rc)
- GOTO(free_osic, rc);
+ GOTO(free_oig, rc);
- rc = osic_wait(osic);
-free_osic:
- osic_release(osic);
+ rc = oig_wait(oig);
+free_oig:
+ oig_release(oig);
GOTO(out, rc);
}
LL_CDEBUG_PAGE(page, "write queued\n");
LPROC_LL_DIRTY_HITS);
}
- /* put the page in the page cache, from now on ll_removepage is
+ /* put the page in the page cache, from now on ll_removepage is
* responsible for cleaning up the llap */
set_page_dirty(page);
exp = ll_i2obdexp(inode);
if (exp == NULL) {
- CERROR("page %p ind %lu gave null export\n", page,
- page->index);
+ CERROR("page %p ind %lu gave null export\n", page, page->index);
EXIT;
return;
}
llap = llap_from_page(page);
if (IS_ERR(llap)) {
- CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
+ CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
page->index, PTR_ERR(llap));
EXIT;
return;
}
//llap_write_complete(inode, llap);
- rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
+ rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
llap->llap_cookie);
if (rc != 0)
CERROR("page %p ind %lu failed: %d\n", page, page->index, rc);
page_extent.l_extent.end =
page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
- matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
- ll_i2info(inode)->lli_smd, LDLM_EXTENT,
+ matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+ ll_i2info(inode)->lli_smd, LDLM_EXTENT,
&page_extent, LCK_PR, &flags, inode, &match_lockh);
if (matches < 0) {
LL_CDEBUG_PAGE(page, "lock match failed\n");
RETURN(matches);
- }
+ }
if (matches) {
- obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
+ obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
}
RETURN(matches);
}
static int ll_issue_page_read(struct obd_export *exp,
- struct ll_async_page *llap, int defer_uptodate)
+ struct ll_async_page *llap,
+ struct obd_io_group *oig, int defer)
{
struct page *page = llap->llap_page;
int rc;
- /* we don't issue this page as URGENT so that it can be batched
- * with other pages by the kernel's read-ahead. We have a strong
- * requirement that readpage() callers must call wait_on_page()
- * or lock_page() to get into ->sync_page() to trigger the IO */
- llap->llap_defer_uptodate = defer_uptodate;
page_cache_get(page);
- set_bit(LL_PRIVBITS_READ, &page->private); /* see ll_sync_page() */
- rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd,
- NULL, llap->llap_cookie, OBD_BRW_READ, 0,
+ llap->llap_defer_uptodate = defer;
+ rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
+ NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
if (rc) {
LL_CDEBUG_PAGE(page, "read queueing failed\n");
- clear_bit(LL_PRIVBITS_READ, &page->private);
page_cache_release(page);
}
RETURN(rc);
}
-static void ll_readahead(struct ll_readahead_state *ras,
- struct obd_export *exp, struct address_space *mapping)
+#define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX(inode) (inode->i_blksize * 3)
+
+static void ll_readahead(struct ll_readahead_state *ras,
+ struct obd_export *exp, struct address_space *mapping,
+ struct obd_io_group *oig)
{
unsigned long i, start, end;
struct ll_async_page *llap;
CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
ras->ras_next_index, ras->ras_last, ras->ras_window,
- start, end);
+ start, end);
spin_unlock(&ras->ras_lock);
* truncating the page (page->mapping == NULL) */
page = grab_cache_page_nowait(mapping, i);
if (page == NULL)
- continue;
-
+ break;
+
/* the book-keeping above promises that we've tried
* all the indices from start to end, so we don't
* stop if anyone returns an error. This may not be good. */
if (IS_ERR(llap) || llap->llap_defer_uptodate)
goto next_page;
- rc = ll_issue_page_read(exp, llap, 1);
+ rc = ll_issue_page_read(exp, llap, oig, 1);
if (rc == 0)
LL_CDEBUG_PAGE(page, "started read-ahead\n");
if (rc) {
}
}
-/* XXX this should really bubble up somehow. */
-#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
-#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
-
/* called with the ras_lock held or from places where it doesn't matter */
-static void ll_readahead_set(struct ll_readahead_state *ras,
+static void ll_readahead_set(struct inode *inode,
+ struct ll_readahead_state *ras,
unsigned long index)
{
ras->ras_next_index = index;
if (ras->ras_next_index != ~0UL)
ras->ras_next_index++;
- ras->ras_window = LL_RA_MIN;
+ ras->ras_window = LL_RA_MIN(inode);
ras->ras_last = ras->ras_next_index + ras->ras_window;
if (ras->ras_last < ras->ras_next_index)
ras->ras_last = ~0UL;
index);
}
-void ll_readahead_init(struct ll_readahead_state *ras)
+void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
spin_lock_init(&ras->ras_lock);
- ll_readahead_set(ras, 0);
+ ll_readahead_set(inode, ras, 0);
}
-static void ll_readahead_update(struct ll_readahead_state *ras,
+static void ll_readahead_update(struct inode *inode,
+ struct ll_readahead_state *ras,
unsigned long index, int hit)
{
unsigned long issued_start, new_last;
spin_lock(&ras->ras_lock);
- /* we're interested in noticing the index's relation to the
+ /* we're interested in noticing the index's relation to the
* previously issued read-ahead pages */
issued_start = ras->ras_next_index - ras->ras_window - 1;
if (issued_start > ras->ras_next_index)
issued_start = 0;
- CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
ras->ras_next_index, ras->ras_last, ras->ras_window,
hit ? "hit" : "miss", index, issued_start);
- if (!hit &&
+ if (!hit &&
index == ras->ras_next_index && index == ras->ras_last + 1) {
/* special case the kernel's read-ahead running into the
* page just beyond our read-ahead window as an extension
* of our read-ahead. sigh. wishing it was easier to
* turn off 2.4's read-ahead. */
- ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+ ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 1);
if (index != ~0UL)
ras->ras_next_index = index + 1;
ras->ras_last = index;
- } else if (!hit &&
+ } else if (!hit &&
(index > issued_start || ras->ras_next_index >= index)) {
/* deal with a miss way out of the window. we interpret
* this as a seek and restart the window */
- ll_readahead_set(ras, index);
+ ll_readahead_set(inode, ras, index);
- } else if (!hit &&
+ } else if (!hit &&
issued_start <= index && index < ras->ras_next_index) {
/* a miss inside the window? surely its memory pressure
* evicting our read pages before the app can see them.
* we shrink the window aggressively */
unsigned long old_window = ras->ras_window;
- ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+ ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN(inode));
ras->ras_last -= old_window - ras->ras_window;
if (ras->ras_next_index > ras->ras_last)
ras->ras_next_index = ras->ras_last + 1;
CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
ras->ras_next_index, ras->ras_last, ras->ras_window);
- } else if (hit &&
+ } else if (hit &&
issued_start <= index && index < ras->ras_next_index) {
- /* a hit inside the window. grow the window by twice the
+ /* a hit inside the window. grow the window by twice the
* number of pages that are satisified within the window. */
- ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+ ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 2);
/* we want the next readahead pass to issue a windows worth
* beyond where the app currently is */
struct ll_file_data *fd = filp->private_data;
struct inode *inode = page->mapping->host;
struct obd_export *exp;
- int rc;
struct ll_async_page *llap;
+ struct obd_io_group *oig = NULL;
+ int rc;
ENTRY;
LASSERT(PageLocked(page));
(((obd_off)page->index) << PAGE_SHIFT));
LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
+ rc = oig_init(&oig);
+ if (rc < 0)
+ GOTO(out, rc);
+
exp = ll_i2obdexp(inode);
if (exp == NULL)
GOTO(out, rc = -EINVAL);
GOTO(out, rc = PTR_ERR(llap));
if (llap->llap_defer_uptodate) {
- ll_readahead_update(&fd->fd_ras, page->index, 1);
+ ll_readahead_update(inode, &fd->fd_ras, page->index, 1);
+ ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
+ obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
+ oig);
LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
SetPageUptodate(page);
- ll_readahead(&fd->fd_ras, exp, page->mapping);
unlock_page(page);
- RETURN(0);
+ GOTO(out_oig, rc = 0);
}
- ll_readahead_update(&fd->fd_ras, page->index, 0);
+ ll_readahead_update(inode, &fd->fd_ras, page->index, 0);
rc = ll_page_matches(page);
if (rc < 0)
}
}
- rc = ll_issue_page_read(exp, llap, 0);
- if (rc == 0) {
- LL_CDEBUG_PAGE(page, "queued readpage\n");
- if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
- ll_readahead(&fd->fd_ras, exp, page->mapping);
- }
+ rc = ll_issue_page_read(exp, llap, oig, 0);
+ if (rc)
+ GOTO(out, rc);
+
+ LL_CDEBUG_PAGE(page, "queued readpage\n");
+ if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+ ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
+
+ rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
+
out:
- if (rc)
+ if (rc)
unlock_page(page);
+out_oig:
+ if (oig != NULL)
+ oig_release(oig);
RETURN(rc);
}
+#if 0
/* this is for read pages. we issue them as ready but not urgent. when
* someone waits on them we fire them off, hopefully merged with adjacent
* reads that were queued by read-ahead. */
exp = ll_i2obdexp(page->mapping->host);
if (exp == NULL)
RETURN(-EINVAL);
-
+
llap = llap_from_page(page);
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
- rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
- NULL, llap->llap_cookie,
+ rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+ NULL, llap->llap_cookie,
ASYNC_READY|ASYNC_URGENT);
return rc;
}
+#endif