#include <asm/system.h>
#include <asm/uaccess.h>
+
#include <linux/fs.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
#include <linux/buffer_head.h>
list_del(&page->list);
list_add(&page->list, &mapping->clean_pages);
+ /* XXX doesn't inode_lock protect i_state ? */
inode = mapping->host;
if (list_empty(&mapping->dirty_pages)) {
CDEBUG(D_INODE, "inode clean\n");
EXIT;
}
-inline void set_page_clean(struct page *page)
+void set_page_clean(struct page *page)
{
if (PageDirty(page)) {
ClearPageDirty(page);
}
/* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
+static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags)
{
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
pg.count = PAGE_SIZE;
CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
- cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
- pg.off, pg.off);
+ cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
+ pg.off, pg.off);
if (pg.count == 0) {
CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
LPU64"\n",
page->mapping->host->i_size, page->index, pg.off);
}
- pg.flag = create ? OBD_BRW_CREATE : 0;
+ pg.flag = flags;
set->brw_callback = ll_brw_sync_wait;
rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL);
if (rc)
CERROR("error from callback: rc = %d\n", rc);
}
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
RETURN(rc);
}
-/* returns the page unlocked, but with a reference */
-static int ll_readpage(struct file *file, struct page *page)
+/*
+ * We were asked to read a single page, but we're going to try to read a
+ * batch of pages all at once.  This vaguely simulates 2.5's readpages.
+ */
+static int ll_readpage(struct file *file, struct page *first_page)
{
- struct inode *inode = page->mapping->host;
- obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
- int rc = 0;
+ struct inode *inode = first_page->mapping->host;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct page *page = first_page;
+ struct list_head *pos;
+ struct brw_page *pgs;
+ struct obd_brw_set *set;
+ unsigned long end_index, extent_end = 0;
+ int npgs = 0, rc = 0;
ENTRY;
- if (!PageLocked(page))
- LBUG();
+ LASSERT(PageLocked(page));
+ LASSERT(!PageUptodate(page));
+ CDEBUG(D_VFSTRACE, "VFS Op\n");
- if (inode->i_size <= offset) {
+ if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
CERROR("reading beyond EOF\n");
memset(kmap(page), 0, PAGE_SIZE);
kunmap(page);
- GOTO(readpage_out, rc);
+ SetPageUptodate(page);
+ unlock_page(page);
+ RETURN(rc);
}
- /* XXX Workaround for BA OSTs returning short reads at EOF. The linux
- * OST will return the full page, zero-filled at the end, which
- * will just overwrite the data we set here.
- * Bug 593 relates to fixing this properly.
+ pgs = kmalloc(PTL_MD_MAX_IOV * sizeof(*pgs), GFP_USER);
+ if ( pgs == NULL )
+ RETURN(-ENOMEM);
+ set = obd_brw_set_new();
+ if ( set == NULL )
+ GOTO(out_pgs, rc = -ENOMEM);
+
+ /* Arbitrarily try to read ahead 8 times what we can pass on
+ * the wire at once, clamped to the file size. */
+ end_index = first_page->index +
+ 8 * ((PTL_MD_MAX_IOV * PAGE_SIZE)>>PAGE_CACHE_SHIFT);
+ if ( end_index > inode->i_size >> PAGE_CACHE_SHIFT )
+ end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+ /*
+ * Find how far we're allowed to read under the extent that
+ * ll_file_read is passing us.
*/
- if (inode->i_size < offset + PAGE_SIZE) {
- int count = inode->i_size - offset;
- void *addr = kmap(page);
- //POISON(addr, 0x7c, count);
- memset(addr + count, 0, PAGE_SIZE - count);
- kunmap(page);
+ spin_lock(&lli->lli_read_extent_lock);
+ list_for_each(pos, &lli->lli_read_extents) {
+ struct ll_read_extent *rextent;
+ rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
+ if ( rextent->re_task != current )
+ continue;
+
+ if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
+ /* extent wrapping */
+ extent_end = ~0;
+ else {
+ extent_end = ( rextent->re_extent.end + PAGE_SIZE )
+ << PAGE_CACHE_SHIFT;
+ /* 32bit indexes, 64bit extents.. */
+ if ( ((u64)extent_end >> PAGE_CACHE_SHIFT ) <
+ rextent->re_extent.end )
+ extent_end = ~0;
+ }
+ break;
}
+ spin_unlock(&lli->lli_read_extent_lock);
+
+ if ( extent_end == 0 ) {
+ CERROR("readpage outside ll_file_read, no lock held?\n");
+ end_index = page->index + 1;
+ } else if ( extent_end < end_index )
+ end_index = extent_end;
+
+ /* take a reference on the first page to balance the find_get_page
+ * reference the later pages get; it is dropped on teardown below. */
+ page_cache_get(page);
+ do {
+ unsigned long index ;
+
+ pgs[npgs].pg = page;
+ pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
+ pgs[npgs].flag = 0;
+ pgs[npgs].count = PAGE_SIZE;
+ /* XXX Workaround for BA OSTs returning short reads at EOF.
+ * The linux OST will return the full page, zero-filled at the
+ * end, which will just overwrite the data we set here. Bug
+ * 593 relates to fixing this properly.
+ */
+ if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
+ int count = inode->i_size - pgs[npgs].off;
+ void *addr = kmap(page);
+ pgs[npgs].count = count;
+ //POISON(addr, 0x7c, count);
+ memset(addr + count, 0, PAGE_SIZE - count);
+ kunmap(page);
+ }
+
+ npgs++;
+ if ( npgs == PTL_MD_MAX_IOV )
+ break;
+
+ /*
+ * Find pages ahead of us that we can read in.
+ * grab_cache_page waits on pages that are locked, so
+ * we first try find_get_page, which doesn't.  This reduces
+ * the worst-case behaviour of racing threads waiting on
+ * each other, but doesn't remove it entirely.
+ */
+ for ( index = page->index + 1, page = NULL ;
+ page == NULL && index < end_index ; index++ ) {
+
+ /* see if the page already exists and needs updating */
+ page = find_get_page(inode->i_mapping, index);
+ if ( page ) {
+ if ( Page_Uptodate(page) || TryLockPage(page) )
+ goto out_release;
+ if ( !page->mapping || Page_Uptodate(page))
+ goto out_unlock;
+ } else {
+ /* ok, we have to create it.. */
+ page = grab_cache_page(inode->i_mapping, index);
+ if ( page == NULL )
+ continue;
+ if ( Page_Uptodate(page) )
+ goto out_unlock;
+ }
+
+ break;
+
+ out_unlock:
+ unlock_page(page);
+ out_release:
+ page_cache_release(page);
+ page = NULL;
+ }
- if (PageUptodate(page)) {
- CERROR("Explain this please?\n");
- GOTO(readpage_out, rc);
+ } while (page);
+
+ set->brw_callback = ll_brw_sync_wait;
+ rc = obd_brw(OBD_BRW_READ, ll_i2obdconn(inode),
+ ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+ if (rc) {
+ CERROR("error from obd_brw: rc = %d\n", rc);
+ } else {
+ rc = ll_brw_sync_wait(set, CB_PHASE_START);
+ if (rc)
+ CERROR("error from callback: rc = %d\n", rc);
}
+ obd_brw_set_decref(set);
- CDEBUG(D_VFSTRACE, "VFS Op\n");
- rc = ll_brw(OBD_BRW_READ, inode, page, 0);
- EXIT;
+ while ( --npgs > -1 ) {
+ page = pgs[npgs].pg;
- readpage_out:
- if (!rc)
- SetPageUptodate(page);
- unlock_page(page);
- return 0;
+ if ( rc == 0 )
+ SetPageUptodate(page);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+out_pgs:
+ kfree(pgs);
+ RETURN(rc);
} /* ll_readpage */
void ll_truncate(struct inode *inode)
struct obdo oa = {0};
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
struct lustre_handle lockh = { 0, 0 };
+ struct ldlm_extent extent = {inode->i_size, OBD_OBJECT_EOF};
int err;
ENTRY;
if (!lsm) {
/* object not yet allocated */
inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+ EXIT;
return;
}
CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
oa.o_id, inode->i_size);
- err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
- if (err) {
- CERROR("ll_size_lock failed: %d\n", err);
+ /* i_size has already been set to the new size */
+ err = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
+ &extent, &lockh);
+ if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+ EXIT;
return;
}
else
obdo_to_inode(inode, &oa, oa.o_valid);
- err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
+ err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
if (err)
- CERROR("ll_size_unlock failed: %d\n", err);
+ CERROR("ll_extent_unlock failed: %d\n", err);
EXIT;
return;
struct inode *inode = page->mapping->host;
obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
int rc = 0;
- char *addr;
ENTRY;
- addr = kmap(page);
- LASSERT(PageLocked(page));
+ ll_check_dirty(inode->i_sb);
+
+ if (!PageLocked(page))
+ LBUG();
if (PageUptodate(page))
RETURN(0);
RETURN(0);
CDEBUG(D_VFSTRACE, "VFS Op\n");
- /* If are writing to a new page, no need to read old data. If we
- * haven't already gotten the file size in ll_file_write() since
- * we got our extent lock, we need to verify it here before we
- * overwrite some other node's write (bug 445).
- */
+ /* If we are writing to a new page, there is no need to read old data.
+ * The extent locking and getattr procedures in ll_file_write have
+ * guaranteed that i_size is stable enough for our zeroing needs. */
if (inode->i_size <= offset) {
- if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) {
- struct ll_file_data *fd = file->private_data;
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-
- rc = ll_file_size(inode, lsm, fd->fd_ostdata);
- if (rc)
- GOTO(prepare_done, rc);
- }
- if (inode->i_size <= offset) {
- memset(addr, 0, PAGE_SIZE);
- GOTO(prepare_done, rc=0);
- }
+ memset(kmap(page), 0, PAGE_SIZE);
+ kunmap(page);
+ GOTO(prepare_done, rc = 0);
}
rc = ll_brw(OBD_BRW_READ, inode, page, 0);
EXIT;
prepare_done:
- if (!rc)
+ if (rc == 0)
SetPageUptodate(page);
- else
- kunmap (page);
return rc;
}
-/* Write a page from kupdated or kswapd.
+/*
+ * Background file writeback.  This is called regularly from kupdated to write
+ * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
+ * super blocks or inodes are synced.
*
- * We unlock the page even in the face of an error, otherwise dirty
- * pages could OOM the system if they cannot be written. Also, there
- * is nobody to return an error code to from here - the application
- * may not even be running anymore.
+ * obd_brw errors down in _batch_writepage are ignored, so pages are always
+ * unlocked. Also, there is nobody to return an error code to from here - the
+ * application may not even be running anymore.
*
- * Returns the page unlocked, but with a reference.
+ * This should be async so that things like kswapd can have a chance to
+ * free some more pages that our allocating writeback may need, but it isn't
+ * yet.
*/
-static int ll_writepage(struct page *page) {
+static int ll_writepage(struct page *page)
+{
struct inode *inode = page->mapping->host;
- int err;
ENTRY;
- LASSERT(PageLocked(page));
-
- /* XXX need to make sure we have LDLM lock on this page */
+ CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
+ PageLaunder(page), inode);
CDEBUG(D_VFSTRACE, "VFS Op\n");
- err = ll_brw(OBD_BRW_WRITE, inode, page, 1);
- if (err)
- CERROR("ll_brw failure %d\n", err);
- else
- set_page_clean(page);
+ LASSERT(PageLocked(page));
- unlock_page(page);
- RETURN(err);
+ /* XXX should obd_brw errors trickle up? */
+ ll_batch_writepage(inode, page);
+ RETURN(0);
}
-
-/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
- * too */
+/*
+ * We really don't want to start writeback here; we want to give callers some
+ * time to further dirty the pages before we write them out.
+ */
static int ll_commit_write(struct file *file, struct page *page,
unsigned from, unsigned to)
{
struct inode *inode = page->mapping->host;
- struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *md = lli->lli_smd;
- struct brw_page pg;
- struct obd_brw_set *set;
- int rc, create = 1;
loff_t size;
ENTRY;
- pg.pg = page;
- pg.count = to;
- /* XXX make the starting offset "from" */
- pg.off = (((obd_off)page->index) << PAGE_SHIFT);
- pg.flag = create ? OBD_BRW_CREATE : 0;
-
- set = obd_brw_set_new();
- if (set == NULL)
- RETURN(-ENOMEM);
-
- SetPageUptodate(page);
-
- if (!PageLocked(page))
- LBUG();
+ LASSERT(inode == file->f_dentry->d_inode);
+ LASSERT(PageLocked(page));
CDEBUG(D_VFSTRACE, "VFS Op\n");
- CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
- pg.off, pg.count);
+ CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
+ inode, page, from, to, page->index);
- set->brw_callback = ll_brw_sync_wait;
- rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL);
- if (rc)
- CERROR("error from obd_brw: rc = %d\n", rc);
- else {
- rc = ll_brw_sync_wait(set, CB_PHASE_START);
- if (rc)
- CERROR("error from callback: rc = %d\n", rc);
- }
- obd_brw_set_free(set);
- kunmap(page);
+ /* to match full page case in prepare_write */
+ SetPageUptodate(page);
+ /* mark the page dirty, put it on mapping->dirty,
+ * mark the inode PAGES_DIRTY, put it on sb->dirty */
+ set_page_dirty(page);
- size = pg.off + pg.count;
- /* do NOT truncate when writing in the middle of a file */
+ /* this is matched by a hack in obdo_to_inode at the moment */
+ size = (((obd_off)page->index) << PAGE_SHIFT) + to;
if (size > inode->i_size)
inode->i_size = size;
- RETURN(rc);
+ RETURN(0);
} /* ll_commit_write */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
if (!lsm || !lsm->lsm_object_id)
RETURN(-ENOMEM);
+ if ((iobuf->offset & (blocksize - 1)) ||
+ (iobuf->length & (blocksize - 1)))
+ RETURN(-EINVAL);
+
+#if 0
/* XXX Keep here until we find ia64 problem, it crashes otherwise */
if (blocksize != PAGE_SIZE) {
CERROR("direct_IO blocksize != PAGE_SIZE\n");
RETURN(-EINVAL);
}
+#endif
set = obd_brw_set_new();
if (set == NULL)
OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
if (!pga) {
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
RETURN(-ENOMEM);
}
- CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, "
- "array_len %u, offset %u, length %u\n",
- blocksize, blocknr, iobuf, iobuf->nr_pages,
- iobuf->array_len, iobuf->offset, iobuf->length);
-
flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
- offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */;
+ offset = (blocknr << inode->i_blkbits);
length = iobuf->length;
for (i = 0, length = iobuf->length; length > 0;
pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
length);
pga[i].flag = flags;
- CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n",
- i, pga[i].pg, pga[i].off, pga[i].count);
if (rw == READ) {
//POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
//kunmap(iobuf->maplist[i]);
if (rc)
CERROR("error from callback: rc = %d\n", rc);
}
- obd_brw_set_free(set);
+ obd_brw_set_decref(set);
if (rc == 0)
rc = iobuf->length;
}
#endif
-int ll_flush_inode_pages(struct inode * inode)
-{
- obd_count bufs_per_obdo = 0;
- obd_size *count = NULL;
- obd_off *offset = NULL;
- obd_flag *flags = NULL;
- int err = 0;
-
- ENTRY;
-
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
- spin_lock(&pagecache_lock);
-
- spin_unlock(&pagecache_lock);
-#endif
-
-
- OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
- OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
- OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
- if (!count || !offset || !flags)
- GOTO(out, err=-ENOMEM);
-
-#if 0
- for (i = 0 ; i < bufs_per_obdo ; i++) {
- count[i] = PAGE_SIZE;
- offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
- flags[i] = OBD_BRW_CREATE;
- }
-
- err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
- ll_i2info(inode)->lli_smd, bufs_per_obdo,
- iobuf->maplist, count, offset, flags, NULL, NULL);
- if (err == 0)
- err = bufs_per_obdo * 4096;
-#endif
- out:
- OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
- OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
- OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
- RETURN(err);
-}
-
//#endif
-
struct address_space_operations ll_aops = {
readpage: ll_readpage,
#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))