#endif
}
-/* Must be called with i_sem taken; this will drop it */
-static int filter_direct_io(int rw, struct dentry *dchild, struct kiobuf *iobuf,
- struct obd_export *exp, struct iattr *attr,
- struct obd_trans_info *oti, void **wait_handle)
+/* Post-process the block numbers in iobuf->blocks[] before the IO is issued.
+ *
+ * Every entry that is not greater than zero (presumably a block that has no
+ * mapping -- confirm against fsfilt_map_inode_pages()) is handled according
+ * to the direction of the IO:
+ *  - for OBD_BRW_WRITE it is treated as an error and -EINVAL is returned;
+ *  - for any other rw the entry is replaced with -1UL.
+ *
+ * when brw_kiovec() is asked to read from block -1UL it just zeros
+ * the page. this gives us a chance to verify the write mappings
+ * as well.
+ *
+ * The loop covers nr_pages << (PAGE_SHIFT - i_blkbits) entries.
+ * Returns 0 when no offending entry was found. */
+static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
+ struct inode *inode)
+{
+ int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits; /* shift, not a count */
+ ENTRY;
+
+ for (i = 0 ; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
+ if (iobuf->blocks[i] > 0) /* entry already holds a block number */
+ continue;
+
+ if (rw == OBD_BRW_WRITE) /* writes must not hit such an entry */
+ RETURN(-EINVAL);
+
+ iobuf->blocks[i] = -1UL; /* marker value, see comment above */
+ }
+ RETURN(0);
+}
+
+#if 0 /* compiled out: debugging aid only */
+static void dump_page(int rw, unsigned long block, struct page *page)
+{ /* logs rw, the block number and the first four bytes of the page */
+ char *blah = kmap(page);
+ CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
+ blah[0], blah[1], blah[2], blah[3]);
+ kunmap(page);
+}
+#endif /* 0 */
+/*
+ * For every page in the iobuf, look up and lock the page that sits at the
+ * same index in the inode's own mapping.  Indexes for which the lookup
+ * returns NULL are skipped.  A page that is found and still has a mapping is
+ * handed to block_flushpage() and truncate_complete_page(); in either case
+ * the page is then unlocked and released with page_cache_release().
+ */
+static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+{
+ struct page *page;
+ int i;
+
+ for (i = 0; i < iobuf->nr_pages ; i++) {
+ page = find_lock_page(inode->i_mapping,
+ iobuf->maplist[i]->index);
+ if (page == NULL) /* nothing at this index in the mapping */
+ continue;
+ if (page->mapping != NULL) { /* only touch pages still attached */
+ block_flushpage(page, 0);
+ truncate_complete_page(page);
+ }
+ unlock_page(page);
+ page_cache_release(page);
+ }
+}
+
+/* Must be called with i_sem taken for writes; this will drop it */
+int filter_direct_io(int rw, struct dentry *dchild, void *buf,
+ struct obd_export *exp, struct iattr *attr,
+ struct obd_trans_info *oti, void **wait_handle)
{
struct obd_device *obd = exp->exp_obd;
struct inode *inode = dchild->d_inode;
- int rc, create = (rw == OBD_BRW_WRITE), blocks_per_page;
- int cleanup_phase = 0, *created = NULL;
- int committed = 0;
+ struct kiobuf *iobuf = buf;
+ int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
+ int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
+ struct semaphore *sem = NULL;
ENTRY;
- blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+ LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+ if (iobuf->nr_pages == 0)
+ GOTO(cleanup, rc = 0);
+
if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
GOTO(cleanup, rc = -EINVAL);
GOTO(cleanup, rc);
cleanup_phase = 2;
+ if (rw == OBD_BRW_WRITE) {
+ create = 1;
+ sem = &obd->u.filter.fo_alloc_lock;
+ }
+
rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
iobuf->nr_pages, iobuf->blocks, created,
- create, &obd->u.filter.fo_alloc_lock);
+ create, sem);
if (rc)
GOTO(cleanup, rc);
- filter_tally_write(&obd->u.filter, iobuf->maplist, iobuf->nr_pages,
- iobuf->blocks, blocks_per_page);
-
- if (attr->ia_size > inode->i_size)
- attr->ia_valid |= ATTR_SIZE;
- rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
+ rc = filter_cleanup_mappings(rw, iobuf, inode);
if (rc)
GOTO(cleanup, rc);
- up(&inode->i_sem);
- cleanup_phase = 3;
+ if (rw == OBD_BRW_WRITE) {
+ filter_tally_write(&obd->u.filter, iobuf->maplist,
+ iobuf->nr_pages, iobuf->blocks,
+ blocks_per_page);
- rc = filter_finish_transno(exp, oti, 0);
- if (rc)
- GOTO(cleanup, rc);
+ if (attr->ia_size > inode->i_size)
+ attr->ia_valid |= ATTR_SIZE;
+ rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
+ if (rc)
+ GOTO(cleanup, rc);
+ up(&inode->i_sem);
+ cleanup_phase = 3;
+ rc = filter_finish_transno(exp, oti, 0);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ rc = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
+ committed = 1;
+ if (rc)
+ GOTO(cleanup, rc);
+ }
- rc = fsfilt_commit_async(obd, inode, oti->oti_handle, wait_handle);
- oti->oti_handle = NULL;
- committed = 1;
- if (rc)
- GOTO(cleanup, rc);
+ /* these are our hacks to keep our directio/bh IO coherent with ext3's
+ * page cache use. Most notably ext3 reads file data into the page
+ * cache when it is zeroing the tail of partial-block truncates and
+ * leaves it there, sometimes generating io from it at later truncates.
+ * Someday very soon we'll be performing our brw_kiovec() IO to and
+ * from the page cache. */
check_pending_bhs(iobuf->blocks, iobuf->nr_pages, inode->i_dev,
1 << inode->i_blkbits);
if (rc < 0)
GOTO(cleanup, rc);
- rc = fsfilt_send_bio(obd, inode, iobuf);
+ /* be careful to call this after fsync_inode_data_buffers has waited
+ * for IO to complete before we evict it from the cache */
+ filter_clear_page_cache(inode, iobuf);
+
+ rc = fsfilt_send_bio(rw, obd, inode, iobuf);
CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
iobuf->nr_pages, rc);
EXIT;
cleanup:
- if (!committed) {
+ if (!committed && (rw == OBD_BRW_WRITE)) {
int err = fsfilt_commit_async(obd, inode,
oti->oti_handle, wait_handle);
oti->oti_handle = NULL;
OBD_FREE(created, sizeof(*created) *
iobuf->nr_pages*blocks_per_page);
case 0:
- if (cleanup_phase == 3)
- break;
- up(&inode->i_sem);
+ if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)
+ up(&inode->i_sem);
break;
default:
CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
}
/* See if there are unallocated parts in given file region */
-static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
+int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
{
int (*fs_bmap)(struct address_space *, long) =
inode->i_mapping->a_ops->bmap;
offset >>= inode->i_blkbits;
len >>= inode->i_blkbits;
- for (j = 0; j <= len; j++)
+ for (j = 0; j < len; j++)
if (fs_bmap(inode->i_mapping, offset + j) == 0)
return 0;
return 1;
}
+
+/* some kernels require alloc_kiovec callers to zero members through the use of
+ * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
+ * that makes sure we don't break the rules.
+ *
+ * Sets all array_len entries of maplist[] to NULL and zeroes nr_pages,
+ * offset and length.  No other member of the kiobuf is touched. */
+static void clear_kiobuf(struct kiobuf *iobuf)
+{
+ int i;
+
+ for (i = 0; i < iobuf->array_len; i++)
+ iobuf->maplist[i] = NULL;
+
+ iobuf->nr_pages = 0;
+ iobuf->offset = 0;
+ iobuf->length = 0;
+}
+/*
+ * Allocate one kiobuf with alloc_kiovec(), expand it with
+ * expand_kiobuf(iobuf, num_pages), reset it with clear_kiobuf() and hand it
+ * back through *ret as an opaque pointer.
+ *
+ * rw is only asserted to be OBD_BRW_WRITE or OBD_BRW_READ; it is not used
+ * for anything else in this function.
+ *
+ * Returns 0 on success.  If alloc_kiovec() or expand_kiobuf() fails, that
+ * return code is passed back and *ret is not written; a kiobuf that could
+ * not be expanded is freed before returning.
+ */
+int filter_alloc_iobuf(int rw, int num_pages, void **ret)
+{
+ int rc;
+ struct kiobuf *iobuf;
+ ENTRY;
+
+ LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+ rc = alloc_kiovec(1, &iobuf);
+ if (rc)
+ RETURN(rc);
+
+ rc = expand_kiobuf(iobuf, num_pages);
+ if (rc) {
+ free_kiovec(1, &iobuf); /* do not leak the unexpanded kiobuf */
+ RETURN(rc);
+ }
+
+#ifdef HAVE_KIOBUF_DOVARY
+ iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
+#endif
+ clear_kiobuf(iobuf);
+ *ret = iobuf;
+ RETURN(0);
+}
+/*
+ * Free an iobuf.  buf is treated as a struct kiobuf (the type that
+ * filter_alloc_iobuf() stores in *ret): its members are reset with
+ * clear_kiobuf() and it is then passed to free_kiovec().
+ * NOTE(review): buf is dereferenced unconditionally, so NULL is not
+ * accepted -- confirm no caller relies on that.
+ */
+void filter_free_iobuf(void *buf)
+{
+ struct kiobuf *iobuf = buf;
+
+ clear_kiobuf(iobuf);
+ free_kiovec(1, &iobuf);
+}
+/*
+ * Append page to the iobuf: it is stored in maplist[nr_pages], nr_pages is
+ * incremented and length grows by PAGE_SIZE.  obd and inode are not used
+ * by this implementation.  Always returns 0.
+ *
+ * NOTE(review): nr_pages is not checked against array_len here, so the
+ * caller must not add more pages than the iobuf was expanded to hold --
+ * confirm against the callers.
+ */
+int filter_iobuf_add_page(struct obd_device *obd, void *buf,
+ struct inode *inode, struct page *page)
+{
+ struct kiobuf *iobuf = buf;
+
+ iobuf->maplist[iobuf->nr_pages++] = page;
+ iobuf->length += PAGE_SIZE;
+
+ return 0;
+}
+
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, int niocount,
struct niobuf_local *res, struct obd_trans_info *oti,
struct niobuf_local *lnb;
struct fsfilt_objinfo fso;
struct iattr iattr = { 0 };
- struct kiobuf *iobuf;
+ void *iobuf = NULL;
struct inode *inode = NULL;
int i, n, cleanup_phase = 0, err;
unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
if (rc != 0)
GOTO(cleanup, rc);
- rc = alloc_kiovec(1, &iobuf);
+ rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
if (rc)
GOTO(cleanup, rc);
cleanup_phase = 1;
-#ifdef HAVE_KIOBUF_DOVARY
- iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
-#endif
- rc = expand_kiobuf(iobuf, obj->ioo_bufcnt);
- if (rc)
- GOTO(cleanup, rc);
-
- iobuf->offset = 0;
- iobuf->length = 0;
- iobuf->nr_pages = 0;
-
- cleanup_phase = 1;
fso.fso_dentry = res->dentry;
fso.fso_bufcnt = obj->ioo_bufcnt;
inode = res->dentry->d_inode;
/* If overwriting an existing block, we don't need a grant */
if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
- filter_range_is_mapped(inode, lnb->offset, lnb->len))
+ filter_range_is_mapped(inode, lnb->offset, lnb->len))
lnb->rc = 0;
if (lnb->rc) /* ENOSPC, network RPC error */
continue;
- iobuf->maplist[n++] = lnb->page;
- iobuf->length += PAGE_SIZE;
- iobuf->nr_pages++;
-
+ filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
/* We expect these pages to be in offset order, but we'll
* be forgiving */
this_size = lnb->offset + lnb->len;
oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
oti);
if (IS_ERR(oti->oti_handle)) {
+ up(&inode->i_sem);
rc = PTR_ERR(oti->oti_handle);
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
"error starting transaction: rc = %d\n", rc);
CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+ /* filter_direct_io drops i_sem */
rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
oti, &wait_handle);
if (rc == 0)
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
LASSERT(current->journal_info == NULL);
case 1:
- free_kiovec(1, &iobuf);
+ filter_free_iobuf(iobuf);
case 0:
- for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
- filter_release_write_page(&obd->u.filter,
- res->dentry->d_inode, lnb,
- rc);
- }
-
+ filter_free_dio_pages(objcount, obj, niocount, res);
f_dput(res->dentry);
}
RETURN(rc);
}
+