From ad856da8d0e3508c57ffc5a9ad3398195ec09316 Mon Sep 17 00:00:00 2001
From: zab
Date: Thu, 31 Jul 2003 17:15:41 +0000
Subject: [PATCH]

- introduce tunables for the aggregate amount of cached data that llite
  will allow on a machine.  When more than 'dirty_high_water_MB' MB of
  data is dirty we start async ocp IO on the amount of data between the
  high water mark and 'dirty_low_water_MB'.  If we ever have more than
  'max_dirty_MB' we write synchronously in commit_write, batching with
  other dirty data in the inode.  These watermarks are checked every
  time we pass through commit_write, and we account separately for
  dirty pages and pages that are under IO.

- add ocp_count to the ocp so that we can write partial pages at the
  end of the file.  Make the callback function a proper type.

- remove iod.c; the few code paths we cared about in there are moved to
  rw{,24,26}.c

- turn off generic file read-ahead by setting filp->f_ramax to 0.

- have writepage and commit_write, when writing synchronously, use
  plugging to batch their IO with IO started from other dirty pages in
  the inode's dirty list.

- whoops, missed a hardcoded '16'; move to max_pages_per_rpc.

There's still much to do.  The 2.6 side needs work on its many-page
variants, kiovec->bio, etc.  We need to actually start watermark IO
from the dirty inode list.
---
 lustre/llite/rw24.c       | 324 +++++++++++++++++++++++-----------------
 lustre/llite/rw26.c       |  26 ++--
 lustre/osc/osc_internal.h |   7 -
 3 files changed, 175 insertions(+), 182 deletions(-)

diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c
index feddd3b..1ea2d48 100644
--- a/lustre/llite/rw24.c
+++ b/lustre/llite/rw24.c
@@ -49,22 +49,123 @@
 #include "llite_internal.h"
 #include
+void ll_complete_readpage_24(struct obd_client_page *ocp, int rc)
+{
+        struct page *page = ocp->ocp_page;
+
+        LASSERT(page->private == (unsigned long)ocp);
+        LASSERT(PageLocked(page));
+
+        if (rc == 0)
+                SetPageUptodate(page);
+
+        ocp_free(page);
+        unlock_page(page);
+        page_cache_release(page);
+}
+
+int ll_start_readpage_24(struct ll_file_data *fd, struct obd_export *exp,
+                         struct inode *inode, struct page *page)
+{
+        struct obdo oa;
+        struct obd_client_page *ocp;
+        int rc;
+        ENTRY;
+
+        ocp = ocp_alloc(page);
+        if (IS_ERR(ocp))
+                RETURN(PTR_ERR(ocp));
+
+        ocp->ocp_callback = ll_complete_readpage_24;
+        ocp->ocp_count = PAGE_CACHE_SIZE;
+        ocp->ocp_flag = 0;
+
+        oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
+        memcpy(obdo_handle(&oa), &fd->fd_ost_och.och_fh,
+               sizeof(fd->fd_ost_och.och_fh));
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME);
+
+        rc = obd_brw_async_ocp(OBD_BRW_READ, exp, &oa,
+                               ll_i2info(inode)->lli_smd, NULL, ocp, NULL);
+        RETURN(rc);
+}
+
+void ll_start_readahead(struct ll_file_data *fd, struct obd_export *exp,
+                        struct inode *inode, unsigned long first_index)
+{
+        struct lustre_handle match_lockh = {0};
+        struct ldlm_extent page_extent;
+        unsigned long index, end_index;
+        struct page *page;
+        int flags, matched, rc;
+
+        /* try to read the file's preferred block size in a one-er */
+        end_index = first_index + (inode->i_blksize >> PAGE_CACHE_SHIFT);
+        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
+                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+        for (index = first_index + 1; index < end_index; index++) {
+                /* see if the page already exists and needs updating */
+                page = find_get_page(inode->i_mapping, index);
+                if (page) {
+                        if (Page_Uptodate(page) || TryLockPage(page)) {
+                                page_cache_release(page);
+                                continue;
+                        }
+                } else {
+                        /* ok, we have to create it.. */
+                        page = grab_cache_page(inode->i_mapping, index);
+                        if (page == NULL)
+                                break;
+                }
+                /* make sure we didn't race with other teardown/readers */
+                if (!page->mapping || Page_Uptodate(page)) {
+                        page_cache_release(page);
+                        continue;
+                }
+
+                /* our lock matching is presumed to be more expensive
+                 * than the pagecache lookups */
+                page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+                page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+                flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+                matched = obd_match(&ll_i2sbi(inode)->ll_osc_conn,
+                                    ll_i2info(inode)->lli_smd, LDLM_EXTENT,
+                                    &page_extent, sizeof(page_extent), LCK_PR,
+                                    &flags, inode, &match_lockh);
+                if (!matched) {
+                        page_cache_release(page);
+                        break;
+                }
+                /* interestingly, we don't need to hold the lock across the
+                 * IO.  As long as we match the lock while the page is in
+                 * the page cache we know that the lock's cancelation will
+                 * invalidate the page.  XXX this should transition to
+                 * proper association of pages and locks in the future */
+                obd_cancel(&ll_i2sbi(inode)->ll_osc_conn,
+                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+
+                rc = ll_start_readpage_24(fd, exp, inode, page);
+                if (rc != 0) {
+                        page_cache_release(page);
+                        break;
+                }
+        }
+}
 
 /*
  * we were asked to read a single page but we're going to try and read a batch
  * of pages all at once. this vaguely simulates client-side read-ahead that
  * is done via ->readpages in 2.5.
  */
-static int ll_readpage_24(struct file *file, struct page *first_page)
+static int ll_readpage_24(struct file *file, struct page *page)
 {
-        struct inode *inode = first_page->mapping->host;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct page *page = first_page;
-        struct list_head *pos;
-        struct brw_page *pgs;
-        struct obdo *oa;
-        unsigned long end_index, extent_end = 0;
-        struct ptlrpc_request_set *set;
-        int npgs = 0, rc = 0, max_pages;
+        struct inode *inode = page->mapping->host;
+        struct lustre_handle match_lockh = {0};
+        struct obd_export *exp;
+        struct ldlm_extent page_extent;
+        int flags, rc = 0, matched;
+        struct ll_file_data *fd = file->private_data;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -83,211 +184,104 @@ static int ll_readpage_24(struct file *file, struct page *first_page)
                 RETURN(rc);
         }
 
-        /* try to read the file's preferred block size in a one-er */
-        end_index = first_page->index +
-                (inode->i_blksize >> PAGE_CACHE_SHIFT);
-        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
-                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
-
-        max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >>
-                PAGE_SHIFT;
-        pgs = kmalloc(max_pages * sizeof(*pgs), GFP_USER);
-        if (pgs == NULL)
-                RETURN(-ENOMEM);
-
-        /*
-         * find how far we're allowed to read under the extent ll_file_read
-         * is passing us..
-         */
-        spin_lock(&lli->lli_read_extent_lock);
-        list_for_each(pos, &lli->lli_read_extents) {
-                struct ll_read_extent *rextent;
-                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
-                if (rextent->re_task != current)
-                        continue;
-
-                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
-                        /* extent wrapping */
-                        extent_end = ~0;
-                else {
-                        extent_end = (rextent->re_extent.end + PAGE_SIZE)
-                                        << PAGE_CACHE_SHIFT;
-                        /* 32bit indexes, 64bit extents.. */
-                        if (((u64)extent_end >> PAGE_CACHE_SHIFT) <
-                            rextent->re_extent.end)
-                                extent_end = ~0;
-                }
-                break;
-        }
-        spin_unlock(&lli->lli_read_extent_lock);
+        exp = ll_i2obdexp(inode);
+        if (exp == NULL)
+                RETURN(-EINVAL);
 
-        if (extent_end == 0) {
+        page_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
+        page_extent.end = page_extent.start + PAGE_CACHE_SIZE - 1;
+        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        matched = obd_match(&ll_i2sbi(inode)->ll_osc_conn,
+                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
+                            &page_extent, sizeof(page_extent), LCK_PR, &flags,
+                            inode, &match_lockh);
+
+        /* if we wanted to do read-ahead here we could ldlm_handle2lock
+         * on the lock and issue reads up to the end of the lock */
+        if (!matched) {
                 static unsigned long next_print;
+                CDEBUG(D_INODE, "didn't match a lock");
                 if (time_after(jiffies, next_print)) {
                         next_print = jiffies + 30 * HZ;
-                        CDEBUG(D_INODE, "mmap readpage - check locks\n");
-                }
-                end_index = page->index + 1;
-        } else if (extent_end < end_index)
-                end_index = extent_end;
-
-        /* to balance the find_get_page ref the other pages get that is
-         * decrefed on teardown.. */
-        page_cache_get(page);
-        do {
-                unsigned long index ;
-
-                pgs[npgs].pg = page;
-                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
-                pgs[npgs].flag = 0;
-                pgs[npgs].count = PAGE_SIZE;
-                /* XXX Workaround for BA OSTs returning short reads at EOF.
-                 * The linux OST will return the full page, zero-filled at the
-                 * end, which will just overwrite the data we set here.  Bug
-                 * 593 relates to fixing this properly.
-                 */
-                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
-                        int count = inode->i_size - pgs[npgs].off;
-                        void *addr = kmap(page);
-                        pgs[npgs].count = count;
-                        //POISON(addr, 0x7c, count);
-                        memset(addr + count, 0, PAGE_SIZE - count);
-                        kunmap(page);
-                }
-
-                npgs++;
-                if (npgs == max_pages)
-                        break;
-
-                /*
-                 * find pages ahead of us that we can read in.
-                 * grab_cache_page waits on pages that are locked so
-                 * we first try find_get_page, which doesn't.  this stops
-                 * the worst case behaviour of racing threads waiting on
-                 * each other, but doesn't remove it entirely.
-                 */
-                for (index = page->index + 1, page = NULL;
-                     page == NULL && index < end_index; index++) {
-
-                        /* see if the page already exists and needs updating */
-                        page = find_get_page(inode->i_mapping, index);
-                        if (page) {
-                                if (Page_Uptodate(page) || TryLockPage(page))
-                                        goto out_release;
-                                if (!page->mapping || Page_Uptodate(page))
-                                        goto out_unlock;
-                        } else {
-                                /* ok, we have to create it.. */
-                                page = grab_cache_page(inode->i_mapping, index);
-                                if (page == NULL)
-                                        continue;
-                                if (Page_Uptodate(page))
-                                        goto out_unlock;
-                        }
-
-                        break;
-
-                out_unlock:
-                        unlock_page(page);
-                out_release:
-                        page_cache_release(page);
-                        page = NULL;
+                        CERROR("not covered by a lock (mmap?). check debug "
+                               "logs.\n");
                 }
-
-        } while (page);
-
-        if ((oa = obdo_alloc()) == NULL) {
-                CERROR("ENOMEM allocing obdo\n");
-                rc = -ENOMEM;
-        } else if ((set = ptlrpc_prep_set()) == NULL) {
-                CERROR("ENOMEM allocing request set\n");
-                obdo_free(oa);
-                rc = -ENOMEM;
-        } else {
-                struct ll_file_data *fd = file->private_data;
-
-                oa->o_id = lli->lli_smd->lsm_object_id;
-                memcpy(obdo_handle(oa), &fd->fd_ost_och.och_fh,
-                       sizeof(fd->fd_ost_och.och_fh));
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
-                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME);
-
-                rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode), oa,
-                                   ll_i2info(inode)->lli_smd, npgs, pgs,
-                                   set, NULL);
-                if (rc == 0)
-                        rc = ptlrpc_set_wait(set);
-                ptlrpc_set_destroy(set);
-                if (rc == 0)
-                        obdo_refresh_inode(inode, oa, oa->o_valid);
-                if (rc && rc != -EIO)
-                        CERROR("error from obd_brw_async: rc = %d\n", rc);
-                obdo_free(oa);
         }
 
-        while (npgs-- > 0) {
-                page = pgs[npgs].pg;
+        obd_brw_plug(OBD_BRW_READ, exp, ll_i2info(inode)->lli_smd, NULL);
 
-                if (rc == 0)
-                        SetPageUptodate(page);
-                unlock_page(page);
+        page_cache_get(page);
+        rc = ll_start_readpage_24(fd, exp, inode, page);
+        if (rc == 0) {
+                ll_start_readahead(fd, exp, inode, page->index);
+        } else {
                 page_cache_release(page);
         }
-        kfree(pgs);
+        obd_brw_unplug(OBD_BRW_READ, exp, ll_i2info(inode)->lli_smd, NULL);
+
+        if (matched)
+                obd_cancel(&ll_i2sbi(inode)->ll_osc_conn,
+                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
+        class_export_put(exp);
 
         RETURN(rc);
 }
 
 void ll_complete_writepage_24(struct obd_client_page *ocp, int rc)
 {
         struct page *page = ocp->ocp_page;
+        struct inode *inode = page->mapping->host;
 
         LASSERT(page->private == (unsigned long)ocp);
         LASSERT(PageLocked(page));
 
-#if 0
         rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
                                   ll_i2info(inode)->lli_smd,
                                   page->index, page->index);
         LASSERT(rc == 0);
-#endif
 
         ocp_free(page);
         unlock_page(page);
 }
 
+int ll_ocp_write_count(struct inode *inode, struct page *page)
+{
+        if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
+                return inode->i_size % PAGE_SIZE;
+        else
+                return PAGE_SIZE;
+}
+
 static int ll_writepage_24(struct page *page)
 {
         struct inode *inode = page->mapping->host;
-        struct obdo oa;
         struct obd_export *exp;
         struct obd_client_page *ocp;
         int rc;
         ENTRY;
 
-        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
-               PageLaunder(page), inode);
-
         LASSERT(PageLocked(page));
-
         exp = ll_i2obdexp(inode);
         if (exp == NULL)
                 RETURN(-EINVAL);
 
-        oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
         ocp = ocp_alloc(page);
         if (IS_ERR(ocp))
                 GOTO(out, rc = PTR_ERR(ocp));
 
         ocp->ocp_callback = ll_complete_writepage_24;
+        ocp->ocp_count = ll_ocp_write_count(inode, page);
         ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT;
 
-        rc = obd_brw_async_ocp(OBD_BRW_WRITE, exp, &oa,
-                               ll_i2info(inode)->lli_smd, NULL, ocp, NULL);
+        obd_brw_plug(OBD_BRW_WRITE, exp, ll_i2info(inode)->lli_smd, NULL);
+        rc = ll_start_ocp_io(page);
+        if (rc == 0) {
+                ll_page_acct(-1, 1);
+                ll_start_io_from_dirty(inode, ll_complete_writepage_24);
+        } else {
+                ocp_free(page);
+        }
+        obd_brw_unplug(OBD_BRW_WRITE, exp, ll_i2info(inode)->lli_smd, NULL);
 out:
         class_export_put(exp);
         RETURN(rc);
diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index 20d3c52..c4d11fb 100644
--- a/lustre/llite/rw26.c
+++ b/lustre/llite/rw26.c
@@ -90,27 +90,33 @@ static int ll_writepage_26(struct page *page, struct writeback_control *wbc)
 {
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lli = ll_i2info(inode);
+        struct obdo oa;
+        struct obd_export *exp;
+        struct obd_client_page *ocp;
+        int rc;
         ENTRY;
 
         LASSERT(PageLocked(page));
         LASSERT(!PageWriteback(page));
-        LASSERT(page->private == 0);
 
-        CDEBUG(D_CACHE, "page %p [wb %d] inode %p\n", page,
-               PageWriteback(page), inode);
+        ocp = ocp_alloc(page);
+        if (IS_ERR(ocp))
+                GOTO(out, rc = PTR_ERR(ocp));
+
+        ocp->ocp_callback = ll_complete_writepage_26;
+        ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT;
 
         /* tell the vm that we're busy with the page */
         SetPageWriteback(page);
         unlock_page(page);
 
-        /* put it in the list that lliod will use */
-        page_cache_get(page);
+        /* XXX clean up the ocp? */
+        rc = ll_writepage_common(page);
+        if (rc)
+                RETURN(rc);
+        ll_local_cache_started_io(1);
 
-        lliod_give_page(inode, page, OBD_BRW_WRITE);
-
-        if ((atomic_read(&lli->lli_in_writepages) == 0) ||
-            ((lli->lli_pl_write.pl_num << PAGE_SHIFT) > inode->i_blksize) )
-                lliod_wakeup(inode);
+        ll_writeback_from_dirty(inode);
 
         RETURN(0);
 }
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 58c3d4c..04b54b1 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -15,13 +15,6 @@ int osc_rpcd_addref(void);
 int osc_rpcd_decref(void);
 void osc_rpcd_add_req(struct ptlrpc_request *req);
 
-int osc_read_tunable(char *page, char **start, off_t off, int count,
-                     int *eof, void *data, atomic_t *tunable);
-
-int osc_write_tunable(struct file *file, const char *buffer,
-                      unsigned long count, void *data,
-                      atomic_t *tunable, unsigned int max);
-
 /* osc/lproc_osc.c */
 extern atomic_t osc_max_pages_per_rpc;
 extern atomic_t osc_max_rpcs_in_flight;
-- 
1.8.3.1
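
Note (not part of the patch): as a rough illustration of the commit_write
watermark policy described in the commit message, the sketch below shows one
way the three tunables could interact.  The helper names (ll_dirty_mb,
ll_write_page_sync, ll_start_async_writeback) and the plain-integer tunables
are hypothetical stand-ins; only the decision structure mirrors the message,
none of this is code from llite or this patch.

/*
 * Illustrative sketch only -- every identifier below is an assumed stand-in.
 */
extern unsigned long dirty_low_water_MB;   /* write back down toward this */
extern unsigned long dirty_high_water_MB;  /* start async writeback above this */
extern unsigned long max_dirty_MB;         /* hard limit: write synchronously */

extern unsigned long ll_dirty_mb(void);    /* MB dirty but not yet under IO */
extern int ll_write_page_sync(struct inode *inode, struct page *page);
extern void ll_start_async_writeback(struct inode *inode, unsigned long mb);

static int ll_check_dirty_watermarks(struct inode *inode, struct page *page)
{
        unsigned long dirty_mb = ll_dirty_mb();

        if (dirty_mb > max_dirty_MB)
                /* over the hard limit: write this page synchronously in
                 * commit_write, batching with other dirty data in the inode */
                return ll_write_page_sync(inode, page);

        if (dirty_mb > dirty_high_water_MB)
                /* over the high water mark: start async ocp IO on the data
                 * between here and the low water mark */
                ll_start_async_writeback(inode,
                                         dirty_mb - dirty_low_water_MB);

        return 0;
}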