From 19d3759d8152c365d91b7e84d1bc6518511e9c30 Mon Sep 17 00:00:00 2001 From: zab Date: Tue, 22 Jul 2003 06:38:59 +0000 Subject: [PATCH] Work-in-progress commit of the IO path refactoring. - initiate brw on pages by pushing down through the obd api - add some data structures for passing and collecting the pages - batch the pages into rpcs in the osc - refactor ptlrpc a little to export some ptlrpc_set_wait internals - temporarily build a - add a kernel patch that adds page->private to 2.4 - split up some page cache stuff into 2.4 and 2.6 files - make some progress in tearing out the 2.5 thread brw stuff --- .../kernel_patches/patches/add_page_private.patch | 15 + lustre/kernel_patches/pc/add_page_private.pc | 1 + lustre/llite/rw24.c | 384 +++++++++++++++++++++ lustre/llite/rw26.c | 226 ++++++++++++ 4 files changed, 626 insertions(+) create mode 100644 lustre/kernel_patches/patches/add_page_private.patch create mode 100644 lustre/kernel_patches/pc/add_page_private.pc create mode 100644 lustre/llite/rw24.c create mode 100644 lustre/llite/rw26.c diff --git a/lustre/kernel_patches/patches/add_page_private.patch b/lustre/kernel_patches/patches/add_page_private.patch new file mode 100644 index 0000000..f82fb92 --- /dev/null +++ b/lustre/kernel_patches/patches/add_page_private.patch @@ -0,0 +1,15 @@ + include/linux/mm.h | 1 + + 1 files changed, 1 insertion(+) + +--- linux-2.4.20-b_llpio-l21/include/linux/mm.h~add_page_private 2003-07-21 21:42:50.000000000 -0700 ++++ linux-2.4.20-b_llpio-l21-zab/include/linux/mm.h 2003-07-21 21:44:16.000000000 -0700 +@@ -162,6 +162,7 @@ typedef struct page { + protected by pagemap_lru_lock !! */ + struct page **pprev_hash; /* Complement to *next_hash. */ + struct buffer_head * buffers; /* Buffer maps us to a disk block. 
*/ ++ unsigned long private; + + /* + * On machines where all RAM is mapped into kernel address space, + +_ diff --git a/lustre/kernel_patches/pc/add_page_private.pc b/lustre/kernel_patches/pc/add_page_private.pc new file mode 100644 index 0000000..476581c --- /dev/null +++ b/lustre/kernel_patches/pc/add_page_private.pc @@ -0,0 +1 @@ +include/linux/mm.h diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c new file mode 100644 index 0000000..924b8d2 --- /dev/null +++ b/lustre/llite/rw24.c @@ -0,0 +1,384 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite I/O page cache for the 2.4 kernel generation + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include "llite_internal.h" +#include + +/* + * we were asked to read a single page but we're going to try and read a batch + * of pages all at once. this vaguely simulates client-side read-ahead that + * is done via ->readpages in 2.5. 
+ */ +static int ll_readpage_24(struct file *file, struct page *first_page) +{ + struct inode *inode = first_page->mapping->host; + struct ll_inode_info *lli = ll_i2info(inode); + struct page *page = first_page; + struct list_head *pos; + struct brw_page *pgs; + struct obdo *oa; + unsigned long end_index, extent_end = 0; + struct ptlrpc_request_set *set; + int npgs = 0, rc = 0, max_pages; + ENTRY; + + LASSERT(PageLocked(page)); + LASSERT(!PageUptodate(page)); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n", + inode->i_ino, inode->i_generation, inode, + (((obd_off)page->index) << PAGE_SHIFT)); + LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0); + + if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) { + CERROR("reading beyond EOF\n"); + memset(kmap(page), 0, PAGE_SIZE); + kunmap(page); + SetPageUptodate(page); + unlock_page(page); + RETURN(rc); + } + + /* try to read the file's preferred block size in a one-er */ + end_index = first_page->index + + (inode->i_blksize >> PAGE_CACHE_SHIFT); + if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT)) + end_index = inode->i_size >> PAGE_CACHE_SHIFT; + + max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >> + PAGE_SHIFT; + pgs = kmalloc(max_pages * sizeof(*pgs), GFP_USER); + if (pgs == NULL) + RETURN(-ENOMEM); + + /* + * find how far we're allowed to read under the extent ll_file_read + * is passing us.. + */ + spin_lock(&lli->lli_read_extent_lock); + list_for_each(pos, &lli->lli_read_extents) { + struct ll_read_extent *rextent; + rextent = list_entry(pos, struct ll_read_extent, re_lli_item); + if (rextent->re_task != current) + continue; + + if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end) + /* extent wrapping */ + extent_end = ~0; + else { + extent_end = (rextent->re_extent.end + PAGE_SIZE) + << PAGE_CACHE_SHIFT; + /* 32bit indexes, 64bit extents.. 
*/ + if (((u64)extent_end >> PAGE_CACHE_SHIFT) < + rextent->re_extent.end) + extent_end = ~0; + } + break; + } + spin_unlock(&lli->lli_read_extent_lock); + + if (extent_end == 0) { + static unsigned long next_print; + if (time_after(jiffies, next_print)) { + next_print = jiffies + 30 * HZ; + CDEBUG(D_INODE, "mmap readpage - check locks\n"); + } + end_index = page->index + 1; + } else if (extent_end < end_index) + end_index = extent_end; + + /* to balance the find_get_page ref the other pages get that is + * decrefed on teardown.. */ + page_cache_get(page); + do { + unsigned long index ; + + pgs[npgs].pg = page; + pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT; + pgs[npgs].flag = 0; + pgs[npgs].count = PAGE_SIZE; + /* XXX Workaround for BA OSTs returning short reads at EOF. + * The linux OST will return the full page, zero-filled at the + * end, which will just overwrite the data we set here. Bug + * 593 relates to fixing this properly. + */ + if (inode->i_size < pgs[npgs].off + PAGE_SIZE) { + int count = inode->i_size - pgs[npgs].off; + void *addr = kmap(page); + pgs[npgs].count = count; + //POISON(addr, 0x7c, count); + memset(addr + count, 0, PAGE_SIZE - count); + kunmap(page); + } + + npgs++; + if (npgs == max_pages) + break; + + /* + * find pages ahead of us that we can read in. + * grab_cache_page waits on pages that are locked so + * we first try find_get_page, which doesn't. this stops + * the worst case behaviour of racing threads waiting on + * each other, but doesn't remove it entirely. + */ + for (index = page->index + 1, page = NULL; + page == NULL && index < end_index; index++) { + + /* see if the page already exists and needs updating */ + page = find_get_page(inode->i_mapping, index); + if (page) { + if (Page_Uptodate(page) || TryLockPage(page)) + goto out_release; + if (!page->mapping || Page_Uptodate(page)) + goto out_unlock; + } else { + /* ok, we have to create it.. 
*/ + page = grab_cache_page(inode->i_mapping, index); + if (page == NULL) + continue; + if (Page_Uptodate(page)) + goto out_unlock; + } + + break; + + out_unlock: + unlock_page(page); + out_release: + page_cache_release(page); + page = NULL; + } + + } while (page); + + if ((oa = obdo_alloc()) == NULL) { + CERROR("ENOMEM allocing obdo\n"); + rc = -ENOMEM; + } else if ((set = ptlrpc_prep_set()) == NULL) { + CERROR("ENOMEM allocing request set\n"); + obdo_free(oa); + rc = -ENOMEM; + } else { + struct ll_file_data *fd = file->private_data; + + oa->o_id = lli->lli_smd->lsm_object_id; + memcpy(obdo_handle(oa), &fd->fd_ost_och.och_fh, + sizeof(fd->fd_ost_och.och_fh)); + oa->o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE; + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME); + + rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode), oa, + ll_i2info(inode)->lli_smd, npgs, pgs, + set, NULL); + if (rc == 0) + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + if (rc == 0) + obdo_refresh_inode(inode, oa, oa->o_valid); + if (rc && rc != -EIO) + CERROR("error from obd_brw_async: rc = %d\n", rc); + obdo_free(oa); + } + + while (npgs-- > 0) { + page = pgs[npgs].pg; + + if (rc == 0) + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); + } + + kfree(pgs); + RETURN(rc); +} + +void ll_complete_writepage_24(struct obd_client_page *ocp, int rc) +{ + struct page *page = ocp->ocp_page; + + LASSERT(page->private == (unsigned long)ocp); + LASSERT(PageLocked(page)); + +#if 0 + rc = ll_clear_dirty_pages(ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, + page->index, page->index); + LASSERT(rc == 0); +#endif + ll_ocp_free(page); + + unlock_page(page); +} + +static int ll_writepage_24(struct page *page) +{ + struct inode *inode = page->mapping->host; + struct obdo oa; + struct obd_export *exp; + struct obd_client_page *ocp; + int rc; + ENTRY; + + CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page, + PageLaunder(page), inode); + LASSERT(PageLocked(page)); + + exp = 
ll_i2obdexp(inode); + if (exp == NULL) + RETURN(-EINVAL); + + oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + ocp = ll_ocp_alloc(page); + if (IS_ERR(ocp)) + GOTO(out, rc = PTR_ERR(ocp)); + + ocp->ocp_callback = ll_complete_writepage_24; + ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT; + + rc = obd_brw_async_ocp(OBD_BRW_WRITE, exp, &oa, + ll_i2info(inode)->lli_smd, ocp, + ll_i2sbi(inode)->ll_lc.lc_set, NULL); + if (rc == 0) + rc = obd_brw_async_barrier(OBD_BRW_WRITE, exp, + ll_i2info(inode)->lli_smd, + ll_i2sbi(inode)->ll_lc.lc_set); +out: + RETURN(rc); +} + +static int ll_direct_IO_24(int rw, struct inode *inode, struct kiobuf *iobuf, + unsigned long blocknr, int blocksize) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct brw_page *pga; + struct ptlrpc_request_set *set; + struct obdo oa; + int length, i, flags, rc = 0; + loff_t offset; + ENTRY; + + if (!lsm || !lsm->lsm_object_id) + RETURN(-EBADF); + + if ((iobuf->offset & (blocksize - 1)) || + (iobuf->length & (blocksize - 1))) + RETURN(-EINVAL); + + set = ptlrpc_prep_set(); + if (set == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages); + if (!pga) { + ptlrpc_set_destroy(set); + RETURN(-ENOMEM); + } + + flags = (rw == WRITE ? 
OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */; + offset = ((obd_off)blocknr << inode->i_blkbits); + length = iobuf->length; + + for (i = 0, length = iobuf->length; length > 0; + length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/ + pga[i].pg = iobuf->maplist[i]; + pga[i].off = offset; + /* To the end of the page, or the length, whatever is less */ + pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK), + length); + pga[i].flag = flags; + if (rw == READ) { + //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE); + //kunmap(iobuf->maplist[i]); + } + } + + oa.o_id = lsm->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + if (rw == WRITE) + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_WRITE, iobuf->length); + else + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_READ, iobuf->length); + rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga, + set, NULL); + if (rc) { + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, + "error from obd_brw_async: rc = %d\n", rc); + } else { + rc = ptlrpc_set_wait(set); + if (rc) + CERROR("error from callback: rc = %d\n", rc); + } + ptlrpc_set_destroy(set); + if (rc == 0) + rc = iobuf->length; + + OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages); + RETURN(rc); +} + +struct address_space_operations ll_aops = { + readpage: ll_readpage_24, + direct_IO: ll_direct_IO_24, + writepage: ll_writepage_24, + sync_page: block_sync_page, /* XXX what's this? 
*/ + prepare_write: ll_prepare_write, + commit_write: ll_commit_write, + bmap: NULL +}; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c new file mode 100644 index 0000000..20d3c52 --- /dev/null +++ b/lustre/llite/rw26.c @@ -0,0 +1,226 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite I/O page cache routines for the 2.5/2.6 kernel generation + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include "llite_internal.h" +#include + +/* in 2.5 we hope that significant read traffic will come through + * readpages and will be nicely batched by read-ahead, this is just + * to pick up the rest. 
*/ +static int ll_readpage_26(struct file *file, struct page *page) +{ + ENTRY; + + CDEBUG(D_CACHE, "page %p ind %lu inode %p\n", page, page->index, + page->mapping->host); + + LASSERT(PageLocked(page)); + LASSERT(!PageUptodate(page)); + LASSERT(page->private == 0); + + /* put it in the list that lliod will use */ + page_cache_get(page); + lliod_give_page(page->mapping->host, page, OBD_BRW_READ); + lliod_wakeup(page->mapping->host); + + RETURN(0); +} + +void ll_end_writeback_26(struct inode *inode, struct page *page) +{ + int rc; + ENTRY; + LASSERT(PageWriteback(page)); + rc = ll_clear_dirty_pages(ll_i2obdconn(inode), + ll_i2info(inode)->lli_smd, + page->index, page->index); + LASSERT(rc == 0); + end_page_writeback(page); + EXIT; +} + +static int ll_writepage_26(struct page *page, struct writeback_control *wbc) +{ + struct inode *inode = page->mapping->host; + struct ll_inode_info *lli = ll_i2info(inode); + ENTRY; + + LASSERT(PageLocked(page)); + LASSERT(!PageWriteback(page)); + LASSERT(page->private == 0); + + CDEBUG(D_CACHE, "page %p [wb %d] inode %p\n", page, + PageWriteback(page), inode); + + /* tell the vm that we're busy with the page */ + SetPageWriteback(page); + unlock_page(page); + + /* put it in the list that lliod will use */ + page_cache_get(page); + + lliod_give_page(inode, page, OBD_BRW_WRITE); + + if ((atomic_read(&lli->lli_in_writepages) == 0) || + ((lli->lli_pl_write.pl_num << PAGE_SHIFT) > inode->i_blksize) ) + lliod_wakeup(inode); + + RETURN(0); +} + +static int ll_writepages(struct address_space *mapping, + struct writeback_control *wbc) +{ + struct ll_inode_info *lli = ll_i2info(mapping->host); + int rc; + ENTRY; + + atomic_inc(&lli->lli_in_writepages); + + rc = mpage_writepages(mapping, wbc, NULL); + + if (atomic_dec_and_test(&lli->lli_in_writepages)) + lliod_wakeup(mapping->host); + + RETURN(rc); +} + +#if 0 /* XXX need to complete this */ +static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, + unsigned long blocknr, 
int blocksize) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct brw_page *pga; + struct ptlrpc_request_set *set; + struct obdo oa; + int length, i, flags, rc = 0; + loff_t offset; + ENTRY; + + if (!lsm || !lsm->lsm_object_id) + RETURN(-EBADF); + + if ((iobuf->offset & (blocksize - 1)) || + (iobuf->length & (blocksize - 1))) + RETURN(-EINVAL); + + set = ptlrpc_prep_set(); + if (set == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages); + if (!pga) { + ptlrpc_set_destroy(set); + RETURN(-ENOMEM); + } + + flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */; + offset = ((obd_off)blocknr << inode->i_blkbits); + length = iobuf->length; + + for (i = 0, length = iobuf->length; length > 0; + length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/ + pga[i].pg = iobuf->maplist[i]; + pga[i].off = offset; + /* To the end of the page, or the length, whatever is less */ + pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK), + length); + pga[i].flag = flags; + if (rw == READ) { + //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE); + //kunmap(iobuf->maplist[i]); + } + } + + oa.o_id = lsm->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + if (rw == WRITE) + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_WRITE, iobuf->length); + else + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_READ, iobuf->length); + rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga, + set, NULL); + if (rc) { + CDEBUG(rc == -ENOSPC ? 
D_INODE : D_ERROR, + "error from obd_brw_async: rc = %d\n", rc); + } else { + rc = ptlrpc_set_wait(set); + if (rc) + CERROR("error from callback: rc = %d\n", rc); + } + ptlrpc_set_destroy(set); + if (rc == 0) + rc = iobuf->length; + + OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages); + RETURN(rc); +} +#endif + +struct address_space_operations ll_aops = { + readpage: ll_readpage_26, +#if 0 + direct_IO: ll_direct_IO_26, +#endif + writepage: ll_writepage_26, + writepages: ll_writepages, + set_page_dirty: __set_page_dirty_nobuffers, + sync_page: block_sync_page, + prepare_write: ll_prepare_write, + commit_write: ll_commit_write, + bmap: NULL +}; -- 1.8.3.1