#include <linux/lustre_fsfilt.h>
#include "filter_internal.h"
-#warning "implement writeback mode -bzzz"
-
/* 512byte block min */
#define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
struct dio_request {
- atomic_t numreqs; /* number of reqs being processed */
- struct bio *bio_current;/* bio currently being constructed */
- struct bio *bio_list; /* list of completed bios */
+ atomic_t dr_numreqs; /* number of reqs being processed */
+ struct bio *dr_bios; /* list of completed bios */
wait_queue_head_t dr_wait;
- int dr_num_pages;
- int dr_rw;
- int dr_error;
- int dr_created[MAX_BLOCKS_PER_PAGE];
- unsigned long dr_blocks[MAX_BLOCKS_PER_PAGE];
- spinlock_t dr_lock;
-
+ int dr_max_pages;
+ int dr_npages;
+ int dr_error;
+ struct page **dr_pages;
+ unsigned long *dr_blocks;
+ spinlock_t dr_lock;
};
static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
struct dio_request *dreq = bio->bi_private;
unsigned long flags;
+ if (bio->bi_size) {
+ CWARN("gets called against non-complete bio 0x%p: %d/%d/%d\n",
+ bio, bio->bi_size, done, error);
+ return 1;
+ }
+
+ if (dreq == NULL) {
+ CERROR("***** bio->bi_private is NULL! This should never "
+ "happen. Normally, I would crash here, but instead I "
+ "will dump the bio contents to the console. Please "
+ "report this to CFS, along with any interesting messages "
+ "leading up to this point (like SCSI errors, perhaps). "
+ "Because bi_private is NULL, I can't wake up the thread "
+ "that initiated this I/O -- so you will probably have to "
+ "reboot this node.");
+ CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
+ "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
+ "bi_private: %p\n", bio->bi_next, bio->bi_flags,
+ bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
+ bio->bi_end_io, atomic_read(&bio->bi_cnt),
+ bio->bi_private);
+ return 0;
+ }
+
spin_lock_irqsave(&dreq->dr_lock, flags);
- bio->bi_private = dreq->bio_list;
- dreq->bio_list = bio;
- spin_unlock_irqrestore(&dreq->dr_lock, flags);
- if (atomic_dec_and_test(&dreq->numreqs))
- wake_up(&dreq->dr_wait);
+ bio->bi_private = dreq->dr_bios;
+ dreq->dr_bios = bio;
if (dreq->dr_error == 0)
dreq->dr_error = error;
+ spin_unlock_irqrestore(&dreq->dr_lock, flags);
+
+ if (atomic_dec_and_test(&dreq->dr_numreqs))
+ wake_up(&dreq->dr_wait);
+
return 0;
}
size = bio->bi_size >> 9;
return bio->bi_sector + size == sector ? 1 : 0;
}
+
+
/* Allocate a dio_request capable of holding up to num_pages pages and
 * their block numbers.  On success stores the new iobuf in *ret and
 * returns 0; returns -ENOMEM on allocation failure.  Release with
 * filter_free_iobuf().  The rw argument is currently unused here. */
int filter_alloc_iobuf(int rw, int num_pages, void **ret)
{
        struct dio_request *dreq;

        OBD_ALLOC(dreq, sizeof(*dreq));
        if (dreq == NULL)
                goto failed_0;

        OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
        if (dreq->dr_pages == NULL)
                goto failed_1;

        OBD_ALLOC(dreq->dr_blocks,
                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
        if (dreq->dr_blocks == NULL)
                goto failed_2;

        dreq->dr_bios = NULL;
        init_waitqueue_head(&dreq->dr_wait);
        atomic_set(&dreq->dr_numreqs, 0);
        spin_lock_init(&dreq->dr_lock);
        dreq->dr_max_pages = num_pages;
        dreq->dr_npages = 0;

        *ret = dreq;
        RETURN(0);

 failed_2:
        OBD_FREE(dreq->dr_pages,
                 num_pages * sizeof(*dreq->dr_pages));
 failed_1:
        OBD_FREE(dreq, sizeof(*dreq));
 failed_0:
        RETURN(-ENOMEM);
}
void filter_free_iobuf(void *iobuf)
{
struct dio_request *dreq = iobuf;
+ int num_pages = dreq->dr_max_pages;
/* free all bios */
- while (dreq->bio_list) {
- struct bio *bio = dreq->bio_list;
- dreq->bio_list = bio->bi_private;
+ while (dreq->dr_bios) {
+ struct bio *bio = dreq->dr_bios;
+ dreq->dr_bios = bio->bi_private;
bio_put(bio);
}
+ OBD_FREE(dreq->dr_blocks,
+ MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
+ OBD_FREE(dreq->dr_pages,
+ num_pages * sizeof(*dreq->dr_pages));
OBD_FREE(dreq, sizeof(*dreq));
}
struct inode *inode, struct page *page)
{
struct dio_request *dreq = iobuf;
- int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
- unsigned int len = inode->i_sb->s_blocksize, offs;
- struct bio *bio = dreq->bio_current;
- sector_t sector;
- int k, rc;
+
+ LASSERT (dreq->dr_npages < dreq->dr_max_pages);
+ dreq->dr_pages[dreq->dr_npages++] = page;
+
+ return 0;
+}
+
+int filter_do_bio(struct obd_device *obd, struct inode *inode,
+ struct dio_request *dreq, int rw)
+{
+ int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+ struct page **pages = dreq->dr_pages;
+ int npages = dreq->dr_npages;
+ unsigned long *blocks = dreq->dr_blocks;
+ int total_blocks = npages * blocks_per_page;
+ int sector_bits = inode->i_sb->s_blocksize_bits - 9;
+ unsigned int blocksize = inode->i_sb->s_blocksize;
+ struct bio *bio = NULL;
+ struct page *page;
+ unsigned int page_offset;
+ sector_t sector;
+ int nblocks;
+ int block_idx;
+ int page_idx;
+ int i;
+ int rc = 0;
ENTRY;
- /* get block number for next page */
- rc = fsfilt_map_inode_pages(obd, inode, &page, 1, dreq->dr_blocks,
- dreq->dr_created,
- dreq->dr_rw == OBD_BRW_WRITE, NULL);
- if (rc)
- RETURN(rc);
+ LASSERT(dreq->dr_npages == npages);
+ LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
- for (k = 0, offs = 0; k < blocks_per_page; k++, offs += len) {
- if (dreq->dr_created[k] == -1) {
- memset(kmap(page) + offs, 0, len);
- kunmap(page);
- continue;
- }
+ for (page_idx = 0, block_idx = 0;
+ page_idx < npages;
+ page_idx++, block_idx += blocks_per_page) {
+
+ page = pages[page_idx];
+ LASSERT (block_idx + blocks_per_page <= total_blocks);
+
+ for (i = 0, page_offset = 0;
+ i < blocks_per_page;
+ i += nblocks, page_offset += blocksize * nblocks) {
+
+ nblocks = 1;
+
+ if (blocks[block_idx + i] == 0) { /* hole */
+ LASSERT(rw == OBD_BRW_READ);
+ memset(kmap(page) + page_offset, 0, blocksize);
+ kunmap(page);
+ continue;
+ }
- sector = dreq->dr_blocks[k] <<(inode->i_sb->s_blocksize_bits-9);
-
- if (!bio || !can_be_merged(bio, sector) ||
- !bio_add_page(bio, page, len, offs)) {
- if (bio) {
- atomic_inc(&dreq->numreqs);
- /* FIXME
- filter_tally_write(&obd->u.filter,dreq->maplist,
- dreq->nr_pages,dreq->blocks,
- blocks_per_page);
- */
- fsfilt_send_bio(dreq->dr_rw, obd, inode, bio);
- dreq->bio_current = bio = NULL;
+ sector = blocks[block_idx + i] << sector_bits;
+
+ /* Additional contiguous file blocks? */
+ while (i + nblocks < blocks_per_page &&
+ (sector + nblocks*(blocksize>>9)) ==
+ (blocks[block_idx + i + nblocks] << sector_bits))
+ nblocks++;
+
+ if (bio != NULL &&
+ can_be_merged(bio, sector) &&
+ bio_add_page(bio, page,
+ blocksize * nblocks, page_offset) != 0)
+ continue; /* added this frag OK */
+
+ if (bio != NULL) {
+ request_queue_t *q = bdev_get_queue(bio->bi_bdev);
+
+ /* Dang! I have to fragment this I/O */
+ CDEBUG(D_INODE|D_ERROR, "bio++ sz %d vcnt %d(%d) "
+ "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
+ bio->bi_size,
+ bio->bi_vcnt, bio->bi_max_vecs,
+ bio->bi_size >> 9, q->max_sectors,
+ bio_phys_segments(q, bio),
+ q->max_phys_segments,
+ bio_hw_segments(q, bio),
+ q->max_hw_segments);
+
+ atomic_inc(&dreq->dr_numreqs);
+ rc = fsfilt_send_bio(rw, obd, inode, bio);
+ if (rc < 0) {
+ CERROR("Can't send bio: %d\n", rc);
+ /* OK do dec; we do the waiting */
+ atomic_dec(&dreq->dr_numreqs);
+ goto out;
+ }
+ rc = 0;
+
+ bio = NULL;
}
+
/* allocate new bio */
- dreq->bio_current = bio =
- bio_alloc(GFP_NOIO, dreq->dr_num_pages *
- blocks_per_page);
+ bio = bio_alloc(GFP_NOIO,
+ (npages - page_idx) * blocks_per_page);
+ if (bio == NULL) {
+ CERROR ("Can't allocate bio\n");
+ rc = -ENOMEM;
+ goto out;
+ }
+
bio->bi_bdev = inode->i_sb->s_bdev;
bio->bi_sector = sector;
bio->bi_end_io = dio_complete_routine;
bio->bi_private = dreq;
- if (!bio_add_page(bio, page, len, offs))
- LBUG();
+ rc = bio_add_page(bio, page,
+ blocksize * nblocks, page_offset);
+ LASSERT (rc != 0);
}
}
- dreq->dr_num_pages--;
- RETURN(0);
-}
+ if (bio != NULL) {
+ atomic_inc(&dreq->dr_numreqs);
+ rc = fsfilt_send_bio(rw, obd, inode, bio);
+ if (rc >= 0) {
+ rc = 0;
+ } else {
+ CERROR("Can't send bio: %d\n", rc);
+ /* OK do dec; we do the waiting */
+ atomic_dec(&dreq->dr_numreqs);
+ }
+ }
+
+ out:
+ wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);
-static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+ if (rc == 0)
+ rc = dreq->dr_error;
+ RETURN(rc);
+}
+
+/* These are our hacks to keep our directio/bh IO coherent with ext3's
+ * page cache use. Most notably ext3 reads file data into the page
+ * cache when it is zeroing the tail of partial-block truncates and
+ * leaves it there, sometimes generating io from it at later truncates.
+ * This removes the partial page and its buffers from the page cache,
+ * so it should only ever cause a wait in rare cases, as otherwise we
+ * always do full-page IO to the OST.
+ *
+ * The call to truncate_complete_page() will call journal_invalidatepage()
+ * to free the buffers and drop the page from cache. The buffers should
+ * not be dirty, because we already called fdatasync/fdatawait on them.
+ */
+static int filter_clear_page_cache(struct inode *inode,
+ struct dio_request *iobuf)
{
-#if 0
struct page *page;
- int i;
-
- for (i = 0; i < iobuf->nr_pages ; i++) {
+ int i, rc, rc2;
+
+ /* This is nearly generic_osync_inode, without the waiting on the inode
+ rc = generic_osync_inode(inode, inode->i_mapping,
+ OSYNC_DATA|OSYNC_METADATA);
+ */
+ rc = filemap_fdatawrite(inode->i_mapping);
+ rc2 = sync_mapping_buffers(inode->i_mapping);
+ if (rc == 0)
+ rc = rc2;
+ rc2 = filemap_fdatawait(inode->i_mapping);
+ if (rc == 0)
+ rc = rc2;
+ if (rc != 0)
+ RETURN(rc);
+
+ /* be careful to call this after fsync_inode_data_buffers has waited
+ * for IO to complete before we evict it from the cache */
+ for (i = 0; i < iobuf->dr_npages; i++) {
page = find_lock_page(inode->i_mapping,
- iobuf->maplist[i]->index);
+ iobuf->dr_pages[i]->index);
if (page == NULL)
- continue;
+ continue;
if (page->mapping != NULL) {
- block_invalidatepage(page, 0);
- truncate_complete_page(page);
+ wait_on_page_writeback(page);
+ ll_truncate_complete_page(page);
}
+
unlock_page(page);
page_cache_release(page);
}
-#endif
+ return 0;
}
-
/* Must be called with i_sem taken for writes; this will drop it */
int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
struct obd_export *exp, struct iattr *attr,
struct obd_trans_info *oti, void **wait_handle)
{
- struct dio_request *dreq = iobuf;
+ struct obd_device *obd = exp->exp_obd;
struct inode *inode = dchild->d_inode;
- int rc;
+ struct dio_request *dreq = iobuf;
+ struct semaphore *sem = NULL;
+ int rc, rc2, create = 0;
ENTRY;
LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+ LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
+ dreq->dr_npages, dreq->dr_max_pages);
+
+ if (dreq->dr_npages == 0)
+ RETURN(0);
+
+ if (dreq->dr_npages > OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
+ RETURN(-EINVAL);
+
+ if (rw == OBD_BRW_WRITE) {
+ create = 1;
+ //sem = &obd->u.filter.fo_alloc_lock;
+ }
- /* This is nearly osync_inode, without the waiting
- rc = generic_osync_inode(inode, inode->i_mapping,
- OSYNC_DATA|OSYNC_METADATA); */
- rc = filemap_fdatawrite(inode->i_mapping);
- if (rc == 0)
- rc = sync_mapping_buffers(inode->i_mapping);
- if (rc == 0)
- rc = filemap_fdatawait(inode->i_mapping);
- if (rc < 0)
- GOTO(cleanup, rc);
-
- if (rw == OBD_BRW_WRITE)
+ rc = fsfilt_map_inode_pages(obd, inode,
+ dreq->dr_pages, dreq->dr_npages,
+ dreq->dr_blocks,
+ obdfilter_created_scratchpad,
+ create, sem);
+
+ if (rw == OBD_BRW_WRITE) {
+ if (rc == 0) {
+ int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+ filter_tally_write(&obd->u.filter, dreq->dr_pages,
+ dreq->dr_npages, dreq->dr_blocks,
+ blocks_per_page);
+ if (attr->ia_size > inode->i_size)
+ attr->ia_valid |= ATTR_SIZE;
+ rc = fsfilt_setattr(obd, dchild,
+ oti->oti_handle, attr, 0);
+ }
+
up(&inode->i_sem);
- /* be careful to call this after fsync_inode_data_buffers has waited
- * for IO to complete before we evict it from the cache */
- filter_clear_page_cache(inode, iobuf);
+ rc2 = filter_finish_transno(exp, oti, 0);
+ if (rc2 != 0)
+ CERROR("can't close transaction: %d\n", rc);
+ rc = (rc == 0) ? rc2 : rc;
- if (dreq->bio_current != NULL) {
- atomic_inc(&dreq->numreqs);
- fsfilt_send_bio(rw, exp->exp_obd, inode, dreq->bio_current);
- dreq->bio_current = NULL;
- }
+ rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
+ rc = (rc == 0) ? rc2 : rc;
- /* time to wait for I/O completion */
- wait_event(dreq->dr_wait, atomic_read(&dreq->numreqs) == 0);
-
- rc = dreq->dr_error;
- if (rw == OBD_BRW_WRITE && rc == 0) {
- /* FIXME:
- filter_tally_write(&obd->u.filter, dreq->maplist,
- dreq->nr_pages, dreq->blocks,
- blocks_per_page);
- */
-
- if (attr->ia_size > inode->i_size) {
- CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
- attr->ia_size);
-
- attr->ia_valid |= ATTR_SIZE;
- down(&inode->i_sem);
- fsfilt_setattr(exp->exp_obd, dchild, oti->oti_handle,
- attr, 0);
- up(&inode->i_sem);
- }
+ if (rc != 0)
+ RETURN(rc);
}
-cleanup:
- RETURN(rc);
-}
-
+ rc = filter_clear_page_cache(inode, dreq);
+ if (rc != 0)
+ RETURN(rc);
+ RETURN(filter_do_bio(obd, inode, dreq, rw));
+}
/* See if there are unallocated parts in given file region */
static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
unsigned long now = jiffies;
int i, err, cleanup_phase = 0;
struct obd_device *obd = exp->exp_obd;
-
+ void *wait_handle = NULL;
+ int total_size = 0;
+ loff_t old_size;
ENTRY;
LASSERT(oti != NULL);
if (rc != 0)
GOTO(cleanup, rc);
-
- inode = res->dentry->d_inode;
-
+
rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
if (rc)
GOTO(cleanup, rc);
-
cleanup_phase = 1;
+
fso.fso_dentry = res->dentry;
fso.fso_bufcnt = obj->ioo_bufcnt;
+ inode = res->dentry->d_inode;
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- cleanup_phase = 2;
-
- generic_osync_inode(inode, inode->i_mapping, OSYNC_DATA|OSYNC_METADATA);
-
- oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
- oti);
- if (IS_ERR(oti->oti_handle)) {
- rc = PTR_ERR(oti->oti_handle);
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "error starting transaction: rc = %d\n", rc);
- oti->oti_handle = NULL;
- GOTO(cleanup, rc);
- }
-
- /* have to call fsfilt_commit() from this point on */
-
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
-
- down(&inode->i_sem);
for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
loff_t this_size;
filter_range_is_mapped(inode, lnb->offset, lnb->len))
lnb->rc = 0;
- if (lnb->rc) /* ENOSPC, network RPC error, etc. */
+ if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
+ CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
continue;
+ }
err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
- if (err != 0) {
- lnb->rc = err;
- continue;
- }
+ LASSERT (err == 0);
+
+ total_size += lnb->len;
/* we expect these pages to be in offset order, but we'll
* be forgiving */
if (this_size > iattr.ia_size)
iattr.ia_size = this_size;
}
+#if 0
+ /* I use this when I'm checking our lovely 1M I/Os reach the disk -eeb */
+ if (total_size != (1<<20))
+ CWARN("total size %d (%d pages)\n",
+ total_size, total_size/PAGE_SIZE);
+#endif
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ cleanup_phase = 2;
+
+ down(&inode->i_sem);
+ old_size = inode->i_size;
+ oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
+ oti);
+ if (IS_ERR(oti->oti_handle)) {
+ up(&inode->i_sem);
+ rc = PTR_ERR(oti->oti_handle);
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "error starting transaction: rc = %d\n", rc);
+ oti->oti_handle = NULL;
+ GOTO(cleanup, rc);
+ }
+ /* have to call fsfilt_commit() from this point on */
+
+ fsfilt_check_slow(now, obd_timeout, "brw_start");
iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+ /* filter_direct_io drops i_sem */
rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
- oti, NULL);
- rc = filter_finish_transno(exp, oti, rc);
+ oti, &wait_handle);
+
+#if 0
+ if (inode->i_size != old_size) {
+ struct llog_cookie *cookie = obdo_logcookie(oa);
+ struct lustre_id *id = obdo_id(oa);
+ filter_log_sz_change(obd, id, oa->o_easize, cookie, inode);
+ }
+#endif
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
+ if (rc == 0)
+ obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
+ fsfilt_check_slow(now, obd_timeout, "direct_io");
- err = fsfilt_commit(obd, obd->u.filter.fo_sb, inode, oti->oti_handle,
- obd_sync_filter);
- if (err)
+ err = fsfilt_commit_wait(obd, inode, wait_handle);
+ if (rc == 0)
rc = err;
- if (obd_sync_filter)
- LASSERT(oti->oti_transno <= obd->obd_last_committed);
-
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
+ fsfilt_check_slow(now, obd_timeout, "commitrw commit");
cleanup:
filter_grant_commit(exp, niocount, res);