From f2800b1ad4754aee1f7fed720c835f91ac1fea28 Mon Sep 17 00:00:00 2001
From: zab
Date: Thu, 18 Dec 2003 04:13:42 +0000
Subject: [PATCH] b=2252 r=adilger (didn't see regressions in buffalo,
 confirmed read throughput increases with sf and fpp multi-node IOR)

This cleans up llite's readpage path and implements our own read-ahead
window that hangs off of ll_file_data.  The broad goal is to keep a fair
number of read-ahead pages issued and queued so that they can be fired
off in read rpcs as previous read-ahead rpcs complete.
---
 lnet/include/linux/kp30.h              |   1 +
 lnet/utils/debug.c                     |   2 +-
 lustre/include/linux/lustre_compat25.h |   7 +
 lustre/include/linux/lustre_lite.h     |   5 -
 lustre/llite/file.c                    |   6 +-
 lustre/llite/llite_internal.h          |  18 +-
 lustre/llite/rw.c                      | 384 +++++++++++++++++++++++----------
 lustre/llite/rw24.c                    |  20 +-
 lustre/osc/osc_request.c               |  39 +++-
 lustre/portals/include/linux/kp30.h    |   1 +
 lustre/portals/utils/debug.c           |   2 +-
 11 files changed, 348 insertions(+), 137 deletions(-)

diff --git a/lnet/include/linux/kp30.h b/lnet/include/linux/kp30.h
index 2cd7b06..b0889a2 100644
--- a/lnet/include/linux/kp30.h
+++ b/lnet/include/linux/kp30.h
@@ -71,6 +71,7 @@ extern unsigned int portal_cerror;
 #define D_HA (1 << 19) /* recovery and failover */
 #define D_RPCTRACE (1 << 20) /* for distributed debugging */
 #define D_VFSTRACE (1 << 21)
+#define D_READA (1 << 22) /* read-ahead */
 
 #ifdef __KERNEL__
 # include /* THREAD_SIZE */
diff --git a/lnet/utils/debug.c b/lnet/utils/debug.c
index 3f3e69c..14750d8 100644
--- a/lnet/utils/debug.c
+++ b/lnet/utils/debug.c
@@ -69,7 +69,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         NULL};
+         "reada", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;
diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h
index fdd1abf..62bde7c 100644
--- a/lustre/include/linux/lustre_compat25.h
+++ b/lustre/include/linux/lustre_compat25.h
@@ -145,6 +145,13 @@ static inline void lustre_daemonize_helper(void)
 #define conditional_schedule() if (unlikely(need_resched())) schedule()
 #endif
 
+/* 2.6 has the lovely PagePrivate bit for indicating that a filesystem
+ * has hung state off of page->private.  We use it. */
+#define PG_private 9 /* unused in 2.4, apparently. */
+#define SetPagePrivate(page) set_bit(PG_private, &(page)->flags)
+#define ClearPagePrivate(page) clear_bit(PG_private, &(page)->flags)
+#define PagePrivate(page) test_bit(PG_private, &(page)->flags)
+
 #endif /* end of 2.4 compat macros */
 
 #endif /* __KERNEL__ */
diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h
index 6c02f81..c496b42 100644
--- a/lustre/include/linux/lustre_lite.h
+++ b/lustre/include/linux/lustre_lite.h
@@ -40,11 +40,6 @@
 /* careful, this is easy to screw up */
 #define PAGE_CACHE_MAXBYTES ((__u64)(~0UL) << PAGE_CACHE_SHIFT)
 
-extern kmem_cache_t *ll_file_data_slab;
-struct ll_file_data {
-        struct obd_client_handle fd_mds_och;
-        __u32 fd_flags;
-};
 
 /*
 struct lustre_intent_data {
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 1a535f1..c7bb5a6 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -159,6 +159,7 @@ static int ll_local_open(struct file *file, struct lookup_intent *it)
         memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
         fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
         file->private_data = fd;
+        ll_readahead_init(&fd->fd_ras);
 
         lli->lli_io_epoch = body->io_epoch;
 
@@ -598,10 +599,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         if (err != ELDLM_OK)
                 RETURN(err);
 
+        CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
+               inode->i_ino, count, *ppos);
+
+        /* turn off the kernel's read-ahead */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        filp->f_ramax = 0; /* turn off generic_file_readahead() */
+        filp->f_ramax = 0;
 #else
         filp->f_ra.ra_pages = 0;
 #endif
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 8adc915..9818c43 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -43,6 +43,18 @@ struct ll_sb_info {
         struct list_head ll_pglist;
 };
 
+struct ll_readahead_state {
+        spinlock_t ras_lock;
+        unsigned long ras_last, ras_window, ras_next_index;
+};
+
+extern kmem_cache_t *ll_file_data_slab;
+struct ll_file_data {
+        struct obd_client_handle fd_mds_och;
+        struct ll_readahead_state fd_ras;
+        __u32 fd_flags;
+};
+
 struct lustre_handle;
 struct lov_stripe_md;
 
@@ -95,9 +107,11 @@ struct it_cb_data {
 struct ll_async_page {
         int llap_magic;
         void *llap_cookie;
-        int llap_queued;
         struct page *llap_page;
         struct list_head llap_pending_write;
+        /* only trust these if the page lock is providing exclusion */
+        int llap_write_queued:1,
+            llap_defer_uptodate:1;
         struct list_head llap_proc_item;
 };
 
@@ -142,9 +156,11 @@ int ll_ocp_update_obdo(struct obd_client_page *ocp, int cmd, struct obdo *oa);
 int ll_ocp_set_io_ready(struct obd_client_page *ocp, int cmd);
 int ll_ocp_update_io_args(struct obd_client_page *ocp, int cmd);
 void ll_removepage(struct page *page);
+int ll_sync_page(struct page *page);
 int ll_readpage(struct file *file, struct page *page);
 struct ll_async_page *llap_from_cookie(void *cookie);
 struct ll_async_page *llap_from_page(struct page *page);
+void ll_readahead_init(struct ll_readahead_state *ras);
 
 void ll_truncate(struct inode *inode);
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index e11eae1..0475d13 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -232,18 +232,23 @@ static int ll_ap_make_ready(void *data, int cmd)
         struct page *page;
         ENTRY;
 
-        /* reads are always locked between queueing and completion,
-         * llite should never queue pages without _READY */
-        LASSERT(cmd != OBD_BRW_READ);
-
         llap = llap_from_cookie(data);
         if (IS_ERR(llap))
                 RETURN(-EINVAL);
 
         page = llap->llap_page;
 
+        if (cmd == OBD_BRW_READ) {
+                /* paths that want to cancel a read-ahead clear page-private
+                 * before locking the page */
+                if (test_and_clear_bit(PG_private, &page->flags))
+                        RETURN(0);
+                RETURN(-EINTR);
+        }
+
+        /* we're trying to write, but the page is locked.. come back later */
         if (TryLockPage(page))
-                RETURN(-EBUSY);
+                RETURN(-EAGAIN);
 
         LL_CDEBUG_PAGE(page, "made ready\n");
         page_cache_get(page);
@@ -400,12 +405,15 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
         if (exp == NULL)
                 RETURN(-EINVAL);
 
+        /* _make_ready only sees llap once we've unlocked the page */
+        llap->llap_write_queued = 1;
         rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
                                 OBD_BRW_WRITE, 0, 0, 0, 0);
         if (rc != 0) { /* async failed, try sync.. */
                 struct obd_sync_io_container *osic;
                 osic_init(&osic);
 
+                llap->llap_write_queued = 0;
                 rc = obd_queue_sync_io(exp, lsm, NULL, osic,
                                        llap->llap_cookie, OBD_BRW_WRITE, 0,
                                        to, 0);
@@ -422,7 +430,6 @@ free_osic:
                         GOTO(out, rc);
                 }
                 LL_CDEBUG_PAGE(page, "write queued\n");
-                llap->llap_queued = 1;
                 //llap_write_pending(inode, llap);
         } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
@@ -506,103 +513,215 @@ void ll_removepage(struct page *page)
         EXIT;
 }
 
-static int ll_start_readpage(struct obd_export *exp, struct inode *inode,
-                             struct page *page)
+static int ll_page_matches(struct page *page)
 {
-        struct ll_async_page *llap;
-        int rc;
+        struct lustre_handle match_lockh = {0};
+        struct inode *inode = page->mapping->host;
+        struct ldlm_extent page_extent;
+        int flags, matches;
         ENTRY;
 
-        llap = llap_from_page(page);
-        if (IS_ERR(llap))
-                RETURN(PTR_ERR(llap));
-
+        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
+                            &page_extent, sizeof(page_extent),
+                            LCK_PR, &flags, inode, &match_lockh);
+        if (matches < 0) {
+                LL_CDEBUG_PAGE(page, "lock match failed\n");
+                RETURN(matches);
+        }
+        if (matches) {
+                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
+                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        }
+        RETURN(matches);
+}
+
+static int ll_issue_page_read(struct obd_export *exp,
+                              struct ll_async_page *llap,
+                              int defer_uptodate)
+{
+        struct page *page = llap->llap_page;
+        int rc;
+
+        /* we don't issue this page as URGENT so that it can be batched
+         * with other pages by the kernel's read-ahead.  We have a strong
+         * requirement that readpage() callers must call wait_on_page()
+         * or lock_page() to get into ->sync_page() to trigger the IO */
+        llap->llap_defer_uptodate = defer_uptodate;
         page_cache_get(page);
-
-        rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
-                                llap->llap_cookie, OBD_BRW_READ, 0, PAGE_SIZE,
-                                0, ASYNC_READY | ASYNC_URGENT |
-                                ASYNC_COUNT_STABLE);
-        /* XXX verify that failed pages here will make their way
-         * through ->removepage.. I suspect they will. */
-        if (rc)
+        SetPagePrivate(page);
+        rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd,
+                                NULL, llap->llap_cookie, OBD_BRW_READ, 0,
+                                PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
+        if (rc) {
+                LL_CDEBUG_PAGE(page, "read queueing failed\n");
+                ClearPagePrivate(page);
                 page_cache_release(page);
-        else {
-                llap->llap_queued = 1;
-                LL_CDEBUG_PAGE(page, "read queued\n");
         }
 
         RETURN(rc);
 }
 
-static void ll_start_readahead(struct obd_export *exp, struct inode *inode,
-                               unsigned long first_index)
+static void ll_readahead(struct ll_readahead_state *ras,
+                         struct obd_export *exp, struct address_space *mapping)
 {
-        struct lustre_handle match_lockh = {0};
-        struct ldlm_extent page_extent;
-        unsigned long index, end_index;
+        unsigned long i, start, end;
+        struct ll_async_page *llap;
         struct page *page;
-        int flags, matched, rc;
-
-        /* for good throughput we need to have many 'blksize' rpcs in
-         * flight per stripe, so we try to read-ahead a ridiculous amount
-         * of data. "- 3" for 8 rpcs */
-        end_index = first_index + (inode->i_blksize >> (PAGE_CACHE_SHIFT - 3));
-        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
-                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
-
-        for (index = first_index + 1; index < end_index; index++) {
-                /* try to get a ref on an existing page or create a new
-                 * one.  if we find a locked page or lose the race
-                 * with another reader we stop trying */
-                page = grab_cache_page_nowait(inode->i_mapping, index);
-                if (page == NULL)
-                        break;
-                /* make sure we didn't race with other teardown/readers */
-                if (!page->mapping || Page_Uptodate(page)) {
-                        unlock_page(page);
-                        page_cache_release(page);
-                        continue;
-                }
+        int rc;
 
-                /* make sure the page we're about to read is covered
-                 * by a lock, stop when we go past the end of the lock */
-                page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-                page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-                flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-                matched = obd_match(ll_i2sbi(inode)->ll_osc_exp,
-                                    ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                                    &page_extent, sizeof(page_extent), LCK_PR,
-                                    &flags, inode, &match_lockh);
-                if (matched < 0) {
-                        LL_CDEBUG_PAGE(page, "lock match failed\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
-                if (matched == 0) {
-                        LL_CDEBUG_PAGE(page, "didn't match a lock\n");
-                        unlock_page(page);
-                        page_cache_release(page);
-                        break;
-                }
+        if (mapping->host->i_size == 0)
+                return;
 
-                /* interestingly, we don't need to hold the lock across the IO.
-                 * As long as we match the lock while the page is locked in the
-                 * page cache we know that the lock's cancelation will wait for
-                 * the page to be unlocked.  XXX this should transition to
-                 * proper association of pages and locks in the future */
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        spin_lock(&ras->ras_lock);
+
+        /* make sure to issue a window's worth of read-ahead pages */
+        end = ras->ras_last;
+        start = end - ras->ras_window;
+        if (start > end)
+                start = 0;
+
+        /* but don't iterate over pages that we've already issued.  this
+         * will set start to end + 1 if we've already read ahead up to
+         * ras_last so the for() won't be entered */
+        if (ras->ras_next_index > start)
+                start = ras->ras_next_index;
+        if (end != ~0UL)
+                ras->ras_next_index = end + 1;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               start, end);
+
+        spin_unlock(&ras->ras_lock);
+
+        /* clamp to filesize */
+        i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+        end = min(end, i);
+
+        for (i = start; i <= end; i++) {
+                /* grab_cache_page_nowait returns null if this races with
+                 * truncating the page (page->mapping == NULL) */
+                page = grab_cache_page_nowait(mapping, i);
+                if (page == NULL)
+                        continue;
+
+                /* the book-keeping above promises that we've tried
+                 * all the indices from start to end, so we don't
+                 * stop if anyone returns an error.  This may not be good. */
+                if (Page_Uptodate(page) || ll_page_matches(page) <= 0)
+                        goto next_page;
+
+                llap = llap_from_page(page);
+                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                        goto next_page;
+
+                rc = ll_issue_page_read(exp, llap, 1);
+                if (rc == 0)
+                        LL_CDEBUG_PAGE(page, "started read-ahead\n");
+                if (rc) {
+        next_page:
+                        LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
-                rc = ll_start_readpage(exp, inode, page);
-                if (rc != 0) {
                         unlock_page(page);
-                        page_cache_release(page);
-                        break;
                 }
                 page_cache_release(page);
         }
 }
+
+/* XXX this should really bubble up somehow. */
+#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
+
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ll_readahead_set(struct ll_readahead_state *ras,
+                             unsigned long index)
+{
+        ras->ras_next_index = index;
+        if (ras->ras_next_index != ~0UL)
+                ras->ras_next_index++;
+        ras->ras_window = LL_RA_MIN;
+        ras->ras_last = ras->ras_next_index + ras->ras_window;
+        if (ras->ras_last < ras->ras_next_index)
+                ras->ras_last = ~0UL;
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               index);
+}
+
+void ll_readahead_init(struct ll_readahead_state *ras)
+{
+        spin_lock_init(&ras->ras_lock);
+        ll_readahead_set(ras, 0);
+}
+
+static void ll_readahead_update(struct ll_readahead_state *ras,
+                                unsigned long index, int hit)
+{
+        unsigned long issued_start, new_last;
+
+        spin_lock(&ras->ras_lock);
+
+        /* we're interested in noticing the index's relation to the
+         * previously issued read-ahead pages */
+        issued_start = ras->ras_next_index - ras->ras_window - 1;
+        if (issued_start > ras->ras_next_index)
+                issued_start = 0;
+
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
+               ras->ras_next_index, ras->ras_last, ras->ras_window,
+               hit ? "hit" : "miss", index, issued_start);
+        if (!hit &&
+            index == ras->ras_next_index && index == ras->ras_last + 1) {
+                /* special case the kernel's read-ahead running into the
+                 * page just beyond our read-ahead window as an extension
+                 * of our read-ahead.  sigh.  wishing it was easier to
+                 * turn off 2.4's read-ahead. */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+                if (index != ~0UL)
+                        ras->ras_next_index = index + 1;
+                ras->ras_last = index;
+        } else if (!hit &&
+                   (index > issued_start || ras->ras_next_index >= index)) {
+                /* deal with a miss way out of the window.  we interpret
+                 * this as a seek and restart the window */
+                ll_readahead_set(ras, index);
+
+        } else if (!hit &&
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a miss inside the window?  surely it's memory pressure
+                 * evicting our read pages before the app can see them.
+                 * we shrink the window aggressively */
+                unsigned long old_window = ras->ras_window;
+
+                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+                ras->ras_last -= old_window - ras->ras_window;
+                if (ras->ras_next_index > ras->ras_last)
+                        ras->ras_next_index = ras->ras_last + 1;
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+        } else if (hit &&
+                   issued_start <= index && index < ras->ras_next_index) {
+                /* a hit inside the window.  grow the window by twice the
+                 * number of pages that are satisfied within the window. */
+                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+
+                /* we want the next read-ahead pass to issue a window's worth
+                 * beyond where the app currently is */
+                new_last = index + ras->ras_window;
+                if (new_last > ras->ras_last)
+                        ras->ras_last = new_last;
+
+                CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+                       ras->ras_next_index, ras->ras_last, ras->ras_window);
+        }
+
+        spin_unlock(&ras->ras_lock);
+}
+
 /*
  * for now we do our readpage the same on both 2.4 and 2.5.  The kernel's
  * read-ahead assumes it is valid to issue readpage all the way up to
@@ -612,14 +731,13 @@ static void ll_start_readahead(struct obd_export *exp, struct inode *inode,
  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
  * so they look the same.
  */
-int ll_readpage(struct file *file, struct page *page)
+int ll_readpage(struct file *filp, struct page *page)
 {
+        struct ll_file_data *fd = filp->private_data;
         struct inode *inode = page->mapping->host;
-        struct lustre_handle match_lockh = {0};
         struct obd_export *exp;
-        struct ldlm_extent page_extent;
-        int flags, rc = 0, matched;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        int rc;
+        struct ll_async_page *llap;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -627,30 +745,32 @@ int ll_readpage(struct file *file, struct page *page)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
                inode->i_ino, inode->i_generation, inode,
                (((obd_off)page->index) << PAGE_SHIFT));
-        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
-
-        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
-                CERROR("reading beyond EOF\n");
-                memset(kmap(page), 0, PAGE_SIZE);
-                kunmap(page);
-                SetPageUptodate(page);
-                GOTO(out, rc = 0);
-        }
+        LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
 
         exp = ll_i2obdexp(inode);
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
-        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
-        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-        matched = obd_match(sbi->ll_osc_exp, ll_i2info(inode)->lli_smd,
-                            LDLM_EXTENT, &page_extent, sizeof(page_extent),
-                            LCK_PR, &flags, inode, &match_lockh);
-        if (matched < 0)
-                GOTO(out, rc = matched);
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                GOTO(out, rc = PTR_ERR(llap));
+
+        if (llap->llap_defer_uptodate) {
+                ll_readahead_update(&fd->fd_ras, page->index, 1);
+                LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+                SetPageUptodate(page);
+                ll_readahead(&fd->fd_ras, exp, page->mapping);
+                unlock_page(page);
+                RETURN(0);
+        }
+
+        ll_readahead_update(&fd->fd_ras, page->index, 0);
 
-        if (matched == 0) {
+        rc = ll_page_matches(page);
+        if (rc < 0)
+                GOTO(out, rc);
+
+        if (rc == 0) {
                 static unsigned long next_print;
                 CDEBUG(D_INODE, "didn't match a lock");
                 if (time_after(jiffies, next_print)) {
@@ -660,15 +780,49 @@ int ll_readpage(struct file *file, struct page *page)
                 }
         }
 
-        rc = ll_start_readpage(exp, inode, page);
-        if (rc == 0 && (sbi->ll_flags & LL_SBI_READAHEAD))
-                ll_start_readahead(exp, inode, page->index);
-
-        if (matched == 1)
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        rc = ll_issue_page_read(exp, llap, 0);
+        if (rc == 0) {
+                LL_CDEBUG_PAGE(page, "queued readpage\n");
+                if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+                        ll_readahead(&fd->fd_ras, exp, page->mapping);
+        }
 out:
-        if (rc)
+        if (rc)
                 unlock_page(page);
         RETURN(rc);
 }
+
+/* this is for read pages.  we issue them as ready but not urgent.  when
+ * someone waits on them we fire them off, hopefully merged with adjacent
+ * reads that were queued by the kernel's read-ahead. */
+int ll_sync_page(struct page *page)
+{
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int rc;
+        ENTRY;
+
+        /* we're abusing PagePrivate to signify that a queued read should
+         * be issued once someone goes to lock it.  it is cleared by
+         * canceling the read-ahead page before discarding and by issuing
+         * the read rpc */
+        if (!PagePrivate(page))
+                RETURN(0);
+        ClearPagePrivate(page);
+
+        /* careful to only deref page->mapping after checking PagePrivate */
+        exp = ll_i2obdexp(page->mapping->host);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                RETURN(PTR_ERR(llap));
+
+        LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+
+        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+                                 NULL, llap->llap_cookie,
+                                 ASYNC_READY|ASYNC_URGENT);
+        return rc;
+}
diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c
index fe11b27..0adc60d 100644
--- a/lustre/llite/rw24.c
+++ b/lustre/llite/rw24.c
@@ -61,14 +61,16 @@ void ll_ap_completion_24(void *data, int cmd, int rc)
                 return;
         }
 
-        llap->llap_queued = 0;
         page = llap->llap_page;
-
         LASSERT(PageLocked(page));
 
         if (rc == 0) {
-                if (cmd == OBD_BRW_READ)
-                        SetPageUptodate(page);
+                if (cmd == OBD_BRW_READ) {
+                        if (!llap->llap_defer_uptodate)
+                                SetPageUptodate(page);
+                } else {
+                        llap->llap_write_queued = 0;
+                }
         } else {
                 SetPageError(page);
         }
@@ -105,20 +107,21 @@ static int ll_writepage_24(struct page *page)
                 GOTO(out, rc = PTR_ERR(llap));
 
         page_cache_get(page);
-        if (llap->llap_queued) {
+        if (llap->llap_write_queued) {
                 LL_CDEBUG_PAGE(page, "marking urgent\n");
                 rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
                                          llap->llap_cookie,
                                          ASYNC_READY | ASYNC_URGENT);
         } else {
+                llap->llap_write_queued = 1;
                 rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
                                         llap->llap_cookie, OBD_BRW_WRITE, 0,
                                         0, OBD_BRW_CREATE,
                                         ASYNC_READY | ASYNC_URGENT);
-                if (rc == 0) {
+                if (rc == 0)
                         LL_CDEBUG_PAGE(page, "mmap write queued\n");
-                        llap->llap_queued = 1;
-                }
+                else
+                        llap->llap_write_queued = 0;
         }
         if (rc)
                 page_cache_release(page);
@@ -210,5 +213,6 @@ struct address_space_operations ll_aops = {
         prepare_write: ll_prepare_write,
         commit_write: ll_commit_write,
         removepage: ll_removepage,
+        sync_page: ll_sync_page,
         bmap: NULL
 };
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 6e5a9e0..a23975a 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -1266,13 +1266,39 @@ static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
                  * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
-                        if (ops->ap_make_ready(oap->oap_caller_data, cmd)) {
-                                CDEBUG(D_INODE, "oap at page_count %d not "
-                                       "ready\n", page_count);
+                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
+                        if (rc < 0)
+                                CDEBUG(D_INODE, "oap %p page %p returned %d "
+                                       "instead of ready\n", oap,
+                                       oap->oap_page, rc);
+                        switch (rc) {
+                        case -EAGAIN:
+                                /* llite is telling us that the page is still
+                                 * in commit_write and that we should try
+                                 * and put it in an rpc again later.  we
+                                 * break out of the loop so we don't create
+                                 * a hole in the sequence of pages in
+                                 * the rpc stream. */
+                                pos = NULL;
+                                break;
+                        case -EINTR:
+                                /* the io isn't needed.. tell the checks
+                                 * below to complete the rpc with EINTR */
+                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+                                oap->oap_count = -EINTR;
+                                break;
+                        case 0:
+                                oap->oap_async_flags |= ASYNC_READY;
+                                break;
+                        default:
+                                LASSERTF(0, "oap %p page %p returned %d "
+                                         "from make_ready\n", oap,
+                                         oap->oap_page, rc);
                                 break;
                         }
-                        oap->oap_async_flags |= ASYNC_READY;
                 }
+                if (pos == NULL)
+                        break;
 
                 /* take the page out of our book-keeping */
                 list_del_init(&oap->oap_pending_item);
@@ -1723,7 +1749,10 @@ static int osc_set_async_flags(struct obd_export *exp,
 
         spin_lock(&cli->cl_loi_list_lock);
 
-        if (oap->oap_async_flags == async_flags)
+        if (list_empty(&oap->oap_pending_item))
+                GOTO(out, rc = -EINVAL);
+
+        if ((oap->oap_async_flags & async_flags) == async_flags)
                 GOTO(out, rc = 0);
 
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
diff --git a/lustre/portals/include/linux/kp30.h b/lustre/portals/include/linux/kp30.h
index 2cd7b06..b0889a2 100644
--- a/lustre/portals/include/linux/kp30.h
+++ b/lustre/portals/include/linux/kp30.h
@@ -71,6 +71,7 @@ extern unsigned int portal_cerror;
 #define D_HA (1 << 19) /* recovery and failover */
 #define D_RPCTRACE (1 << 20) /* for distributed debugging */
 #define D_VFSTRACE (1 << 21)
+#define D_READA (1 << 22) /* read-ahead */
 
 #ifdef __KERNEL__
 # include /* THREAD_SIZE */
diff --git a/lustre/portals/utils/debug.c b/lustre/portals/utils/debug.c
index 3f3e69c..14750d8 100644
--- a/lustre/portals/utils/debug.c
+++ b/lustre/portals/utils/debug.c
@@ -69,7 +69,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         NULL};
+         "reada", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;
-- 
1.8.3.1
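
For readers following the window arithmetic in the rw.c hunks, below is a
minimal standalone model of the ras_next_index/ras_last/ras_window
bookkeeping that ll_readahead_set(), ll_readahead_update(), and the top of
ll_readahead() implement.  It is an illustrative sketch, not lustre code:
the locking and CDEBUGs are dropped, LL_RA_MIN/LL_RA_MAX get arbitrary
stand-in values (the patch derives them from PTL_MD_MAX_PAGES), and
ras_issue()/main() are hypothetical harness names.  One hedged observation:
as the conditions are written in the patch, any miss not caught by the
first test appears to satisfy the seek test (one of its two disjuncts
always holds), which would leave the shrink-on-miss branch unreachable; the
model mirrors the patch rather than second-guessing it.

/* ras-model.c: userspace model of the llite read-ahead window.
 * build with `cc -o ras-model ras-model.c` and watch the window
 * react to hits, misses, and seeks.  values below are stand-ins. */
#include <stdio.h>

#define LL_RA_MIN 32UL
#define LL_RA_MAX 2048UL

struct ras {
        unsigned long last, window, next_index;
};

static unsigned long min_ul(unsigned long a, unsigned long b) { return a < b ? a : b; }
static unsigned long max_ul(unsigned long a, unsigned long b) { return a > b ? a : b; }

/* mirrors ll_readahead_set(): restart the window just past 'index' */
static void ras_set(struct ras *ras, unsigned long index)
{
        ras->next_index = index;
        if (ras->next_index != ~0UL)
                ras->next_index++;
        ras->window = LL_RA_MIN;
        ras->last = ras->next_index + ras->window;
        if (ras->last < ras->next_index)
                ras->last = ~0UL;
}

/* mirrors the book-keeping at the top of ll_readahead(): after a pass,
 * everything up to ras_last has been issued */
static void ras_issue(struct ras *ras)
{
        if (ras->last != ~0UL)
                ras->next_index = ras->last + 1;
}

/* mirrors ll_readahead_update() */
static void ras_update(struct ras *ras, unsigned long index, int hit)
{
        unsigned long issued_start = ras->next_index - ras->window - 1;

        if (issued_start > ras->next_index)     /* underflowed */
                issued_start = 0;

        if (!hit && index == ras->next_index && index == ras->last + 1) {
                /* kernel read-ahead hit the page just past our window:
                 * treat it as an extension of our read-ahead */
                ras->window = min_ul(LL_RA_MAX, ras->window + 1);
                if (index != ~0UL)
                        ras->next_index = index + 1;
                ras->last = index;
        } else if (!hit &&
                   (index > issued_start || ras->next_index >= index)) {
                /* a miss read as a seek: restart the window there */
                ras_set(ras, index);
        } else if (!hit &&
                   issued_start <= index && index < ras->next_index) {
                /* miss inside the window: shrink aggressively (see the
                 * lead-in; this branch appears unreachable as written) */
                unsigned long old_window = ras->window;

                ras->window = max_ul(ras->window / 2, LL_RA_MIN);
                ras->last -= old_window - ras->window;
                if (ras->next_index > ras->last)
                        ras->next_index = ras->last + 1;
        } else if (hit &&
                   issued_start <= index && index < ras->next_index) {
                /* hit inside the window: grow it and keep ras_last a
                 * window's worth beyond the app */
                ras->window = min_ul(LL_RA_MAX, ras->window + 2);
                if (index + ras->window > ras->last)
                        ras->last = index + ras->window;
        }
}

int main(void)
{
        struct ras ras;
        unsigned long i;

        ras_set(&ras, 0);
        ras_issue(&ras);
        printf("issued: ni %lu last %lu win %lu\n",
               ras.next_index, ras.last, ras.window);

        for (i = 1; i <= 16; i++)       /* sequential hits grow the window */
                ras_update(&ras, i, 1);
        printf("hits:   ni %lu last %lu win %lu\n",
               ras.next_index, ras.last, ras.window);

        ras_update(&ras, 1000000, 0);   /* a far miss restarts the window */
        printf("seek:   ni %lu last %lu win %lu\n",
               ras.next_index, ras.last, ras.window);
        return 0;
}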
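
The osc_send_oap_rpc() hunk also pins down a small return-code contract for
ap_make_ready(): 0 means the page joins the rpc, -EAGAIN means llite still
has it locked in commit_write so rpc assembly must stop to avoid a hole in
the page sequence, and -EINTR means the read-ahead was cancelled and the
page should be completed with -EINTR.  A condensed restatement follows; the
stub and harness names (make_ready_stub, add_page_to_rpc) and the flag
values are hypothetical, and only the switch mirrors the patch.

/* make-ready-model.c: the ap_make_ready() contract, condensed.
 * build with `cc -o make-ready-model make-ready-model.c`. */
#include <stdio.h>
#include <errno.h>

#define ASYNC_READY        0x1         /* stand-in flag values */
#define ASYNC_COUNT_STABLE 0x2

struct oap {
        int async_flags;
        int count;
};

/* stand-in for ops->ap_make_ready(); returns whatever scenario we feed it */
static int make_ready_stub(int scenario)
{
        return scenario;
}

/* returns 1 if the page was consumed into the rpc being built, 0 if the
 * caller should stop building the rpc (the -EAGAIN case), negative on a
 * protocol violation (where the patch LASSERTs instead) */
static int add_page_to_rpc(struct oap *oap, int scenario)
{
        int rc = make_ready_stub(scenario);

        switch (rc) {
        case -EAGAIN:
                /* page still busy in commit_write; retry in a later rpc */
                return 0;
        case -EINTR:
                /* read-ahead was cancelled: complete the page with EINTR */
                oap->async_flags |= ASYNC_COUNT_STABLE;
                oap->count = -EINTR;
                break;
        case 0:
                oap->async_flags |= ASYNC_READY;
                break;
        default:
                return rc;
        }
        return 1;
}

int main(void)
{
        struct oap oap = { 0, 0 };
        int consumed;

        consumed = add_page_to_rpc(&oap, 0);
        printf("ready:   consumed=%d flags=%#x\n", consumed, oap.async_flags);

        consumed = add_page_to_rpc(&oap, -EAGAIN);
        printf("-EAGAIN: consumed=%d\n", consumed);

        consumed = add_page_to_rpc(&oap, -EINTR);
        printf("-EINTR:  consumed=%d count=%d\n", consumed, oap.count);
        return 0;
}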