/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * linux/fs/obdfilter/filter_io.c * * Copyright (c) 2001-2003 Cluster File Systems, Inc. * Author: Peter Braam * Author: Andreas Dilger * Author: Phil Schwan * * This file is part of Lustre, http://www.lustre.org. * * Lustre is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public * License as published by the Free Software Foundation. * * Lustre is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Lustre; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include #include #include // XXX kill me soon #include #define DEBUG_SUBSYSTEM S_FILTER #include #include #include #include #include "filter_internal.h" /* We should only change the file mtime (and not the ctime, like * update_inode_times() in generic_file_write()) when we only change data. 
*/
/* Update the inode mtime (and, when @ctime_too is set, the ctime) to the
 * current time, dirtying the inode only if a timestamp actually changes. */
void inode_update_time(struct inode *inode, int ctime_too)
{
        time_t now = CURRENT_TIME;

        /* Avoid a needless mark_inode_dirty_sync() when nothing changes. */
        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
                return;
        inode->i_mtime = now;
        if (ctime_too)
                inode->i_ctime = now;
        mark_inode_dirty_sync(inode);
}

/* Bug 2254 -- this is better done in ext3_map_inode_page, but this
 * workaround will suffice until everyone has upgraded their kernels.
 *
 * For each block we are about to do direct I/O on, look for a stale
 * buffer-head in the 2.4 buffer cache and neutralize it (clean it, wait
 * for in-flight I/O, drop the BH_Req bit) so the buffer cache cannot
 * later write stale data over our direct I/O.  Compiled out on newer
 * kernels (LUSTRE_KERNEL_VERSION >= 32) where this race is fixed. */
static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
                              int size)
{
#if (LUSTRE_KERNEL_VERSION < 32)
        struct buffer_head *bh;
        int i;

        for (i = 0; i < nr_pages; i++) {
                bh = get_hash_table(dev, blocks[i], size);
                if (bh == NULL)
                        continue;
                if (!buffer_dirty(bh)) {
                        put_bh(bh);
                        continue;
                }
                mark_buffer_clean(bh);
                wait_on_buffer(bh);
                clear_bit(BH_Req, &bh->b_state);
                __brelse(bh);
        }
#endif
}

/* when brw_kiovec() is asked to read from block -1UL it just zeros
 * the page.  this gives us a chance to verify the write mappings
 * as well.
 *
 * Walk every block slot of the kiobuf: unmapped slots (<= 0) are an
 * error for writes (the allocator should have mapped them all); for
 * reads they are rewritten to -1UL so brw_kiovec() zero-fills the
 * corresponding page (a hole). */
static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
                                   struct inode *inode)
{
        int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits;
        ENTRY;

        for (i = 0; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
                if (iobuf->blocks[i] > 0)
                        continue;

                if (rw == OBD_BRW_WRITE)
                        RETURN(-EINVAL);
                iobuf->blocks[i] = -1UL;
        }
        RETURN(0);
}

#if 0
/* Debug helper: dump the first four bytes of a page (disabled). */
static void dump_page(int rw, unsigned long block, struct page *page)
{
        char *blah = kmap(page);
        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
               blah[0], blah[1], blah[2], blah[3]);
        kunmap(page);
}
#endif

/* Evict from the page cache any cached pages covering the kiobuf's file
 * offsets, flushing buffers first, so that cached data cannot shadow the
 * direct I/O we are about to perform.  Pages not present or not locked
 * successfully are simply skipped. */
static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
{
        struct page *page;
        int i;

        for (i = 0; i < iobuf->nr_pages; i++) {
                page = find_lock_page(inode->i_mapping,
                                      iobuf->maplist[i]->index);
                if (page == NULL)
                        continue;
                if (page->mapping != NULL) {
                        block_flushpage(page, 0);
                        truncate_complete_page(page);
                }

                unlock_page(page);
                page_cache_release(page);
        }
}

/* Perform the actual direct block I/O for a bulk read or write.
 *
 * Must be called with i_sem taken for writes; this will drop it.
 *
 * @rw           OBD_BRW_READ or OBD_BRW_WRITE
 * @dchild       dentry of the backing object
 * @buf          a struct kiobuf (passed as void * to hide the 2.4 type)
 * @exp          client export doing the I/O
 * @attr         attributes to apply on write (size/timestamps)
 * @oti          transaction info; oti_handle is the open journal handle
 * @wait_handle  out: async-commit handle for fsfilt_commit_wait()
 *
 * cleanup_phase: 0 = nothing, 1 = sized checks done, 2 = kiovec locked,
 * 3 = (write) i_sem already dropped.  On the write path the journal is
 * committed asynchronously here; on any earlier error the cleanup path
 * still closes the transaction so the handle is never leaked. */
int filter_direct_io(int rw, struct dentry *dchild, void *buf,
                     struct obd_export *exp, struct iattr *attr,
                     struct obd_trans_info *oti, void **wait_handle)
{
        struct obd_device *obd = exp->exp_obd;
        struct inode *inode = dchild->d_inode;
        struct kiobuf *iobuf = buf;
        /* NOTE(review): "created" is declared but never used here. */
        int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
        int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
        struct semaphore *sem = NULL;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        if (iobuf->nr_pages == 0)
                GOTO(cleanup, rc = 0);

        /* Guard both the kiovec sector limit and the size of the shared
         * "created" scratchpad used by fsfilt_map_inode_pages(). */
        if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
                GOTO(cleanup, rc = -EINVAL);

        if (iobuf->nr_pages * blocks_per_page >
            OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
                GOTO(cleanup, rc = -EINVAL);

        cleanup_phase = 1;

        rc = lock_kiovec(1, &iobuf, 1);
        if (rc < 0)
                GOTO(cleanup, rc);
        cleanup_phase = 2;

        if (rw == OBD_BRW_WRITE) {
                create = 1;
                /* Serialize block allocation across writers. */
                sem = &obd->u.filter.fo_alloc_lock;
        }
        rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
                                    iobuf->nr_pages, iobuf->blocks,
                                    obdfilter_created_scratchpad, create, sem);
        if (rc)
                GOTO(cleanup, rc);

        rc = filter_cleanup_mappings(rw, iobuf, inode);
        if (rc)
                GOTO(cleanup, rc);

        if (rw == OBD_BRW_WRITE) {
                filter_tally_write(&obd->u.filter, iobuf->maplist,
                                   iobuf->nr_pages, iobuf->blocks,
                                   blocks_per_page);

                if (attr->ia_size > inode->i_size)
                        attr->ia_valid |= ATTR_SIZE;
                rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
                if (rc)
                        GOTO(cleanup, rc);

                /* Metadata is now journaled; drop i_sem before the (slow)
                 * data I/O below.  From here on, phase 3. */
                up(&inode->i_sem);
                cleanup_phase = 3;

                rc = filter_finish_transno(exp, oti, 0);
                if (rc)
                        GOTO(cleanup, rc);

                rc = fsfilt_commit_async(obd, inode, oti->oti_handle,
                                         wait_handle);
                committed = 1;
                if (rc)
                        GOTO(cleanup, rc);
        }

        /* these are our hacks to keep our directio/bh IO coherent with ext3's
         * page cache use.  Most notably ext3 reads file data into the page
         * cache when it is zeroing the tail of partial-block truncates and
         * leaves it there, sometimes generating io from it at later truncates.
         * Someday very soon we'll be performing our brw_kiovec() IO to and
         * from the page cache. */
        check_pending_bhs(iobuf->blocks, iobuf->nr_pages, inode->i_dev,
                          1 << inode->i_blkbits);

        rc = filemap_fdatasync(inode->i_mapping);
        if (rc == 0)
                rc = fsync_inode_data_buffers(inode);
        if (rc == 0)
                rc = filemap_fdatawait(inode->i_mapping);
        if (rc < 0)
                GOTO(cleanup, rc);

        /* be careful to call this after fsync_inode_data_buffers has waited
         * for IO to complete before we evict it from the cache */
        filter_clear_page_cache(inode, iobuf);

        rc = fsfilt_send_bio(rw, obd, inode, iobuf);

        CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
               iobuf->nr_pages, rc);

        /* fsfilt_send_bio() returns bytes/pages transferred on success. */
        if (rc > 0)
                rc = 0;

        EXIT;
cleanup:
        /* If the write path bailed out before committing, close the open
         * transaction now so the journal handle is not leaked. */
        if (!committed && (rw == OBD_BRW_WRITE)) {
                int err = fsfilt_commit_async(obd, inode,
                                              oti->oti_handle, wait_handle);
                oti->oti_handle = NULL;
                if (err)
                        CERROR("can't close transaction: %d\n", err);
                /*
                 * this is error path, so we prefer to return
                 * original error, not this one
                 */
        }

        switch (cleanup_phase) {
        case 3:
                /* fallthrough -- kiovec still locked in phase 3 */
        case 2:
                unlock_kiovec(1, &iobuf);
                /* fallthrough */
        case 1:
                /* fallthrough */
        case 0:
                /* i_sem was already dropped in phase 3; release it on the
                 * earlier write-path phases. */
                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)
                        up(&inode->i_sem);
                break;
        default:
                CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
                LBUG();
                break;
        }
        return rc;
}

/* See if there are unallocated parts in given file region.
 * Returns 1 when every block in [offset, offset+len) is already mapped,
 * 0 when any block is unmapped or when the fs provides no bmap method
 * (in which case we conservatively assume unmapped). */
int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
{
        int (*fs_bmap)(struct address_space *, long) =
                inode->i_mapping->a_ops->bmap;
        int j;

        /* We can't know if the range is mapped already or not */
        if (fs_bmap == NULL)
                return 0;

        offset >>= inode->i_blkbits;
        len >>= inode->i_blkbits;

        for (j = 0; j < len; j++)
                if (fs_bmap(inode->i_mapping, offset + j) == 0)
                        return 0;

        return 1;
}

/* some kernels require alloc_kiovec callers to zero members through the use of
 * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
 * that makes sure we don't break the rules.
*/
/* Reset a kiobuf to a pristine state: clear the page map and zero the
 * bookkeeping fields, as map_user_kiobuf()/unmap_kiobuf() would. */
static void clear_kiobuf(struct kiobuf *iobuf)
{
        int i;

        for (i = 0; i < iobuf->array_len; i++)
                iobuf->maplist[i] = NULL;

        iobuf->nr_pages = 0;
        iobuf->offset = 0;
        iobuf->length = 0;
}

/* Allocate and initialize a kiobuf sized for @num_pages pages.
 * On success *@ret holds the new kiobuf (freed by filter_free_iobuf());
 * returns 0 or a negative errno from the kiovec allocators. */
int filter_alloc_iobuf(int rw, int num_pages, void **ret)
{
        int rc;
        struct kiobuf *iobuf;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        rc = alloc_kiovec(1, &iobuf);
        if (rc)
                RETURN(rc);

        rc = expand_kiobuf(iobuf, num_pages);
        if (rc) {
                /* don't leak the kiovec on expansion failure */
                free_kiovec(1, &iobuf);
                RETURN(rc);
        }

#ifdef HAVE_KIOBUF_DOVARY
        iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
#endif
        clear_kiobuf(iobuf);
        *ret = iobuf;
        RETURN(0);
}

/* Release a kiobuf obtained from filter_alloc_iobuf(). */
void filter_free_iobuf(void *buf)
{
        struct kiobuf *iobuf = buf;

        clear_kiobuf(iobuf);
        free_kiovec(1, &iobuf);
}

/* Append @page to the kiobuf's page map and grow its logical length.
 * Always succeeds (returns 0); the caller guarantees capacity via the
 * expand_kiobuf() done at allocation time. */
int filter_iobuf_add_page(struct obd_device *obd, void *buf,
                          struct inode *inode, struct page *page)
{
        struct kiobuf *iobuf = buf;

        iobuf->maplist[iobuf->nr_pages++] = page;
        iobuf->length += PAGE_SIZE;

        return 0;
}

/* Commit phase of a bulk write: gather the prepared pages into a kiobuf,
 * start a journal transaction, perform the direct I/O (which drops i_sem),
 * wait for the async commit, and release grants/pages/dentry.
 *
 * @rc carries any error from the earlier prepare phase; on error we skip
 * straight to cleanup.  cleanup_phase: 1 = iobuf allocated, 2 = lvfs ctxt
 * pushed.  The switch cases below intentionally fall through so each phase
 * also runs the teardown of the phases before it. */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                          int objcount, struct obd_ioobj *obj, int niocount,
                          struct niobuf_local *res, struct obd_trans_info *oti,
                          int rc)
{
        struct obd_device *obd = exp->exp_obd;
        struct lvfs_run_ctxt saved;
        struct niobuf_local *lnb;
        struct fsfilt_objinfo fso;
        struct iattr iattr = { 0 };
        void *iobuf = NULL;
        struct inode *inode = NULL;
        int i, n, cleanup_phase = 0, err;
        unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
        void *wait_handle;
        ENTRY;

        LASSERT(oti != NULL);
        LASSERT(objcount == 1);
        LASSERT(current->journal_info == NULL);

        if (rc != 0)
                GOTO(cleanup, rc);

        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
        if (rc)
                GOTO(cleanup, rc);
        cleanup_phase = 1;

        fso.fso_dentry = res->dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;
        inode = res->dentry->d_inode;

        for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
                loff_t this_size;

                /* If overwriting an existing block, we don't need a grant */
                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
                        lnb->rc = 0;

                if (lnb->rc) /* ENOSPC, network RPC error */
                        continue;

                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);

                /* We expect these pages to be in offset order, but we'll
                 * be forgiving */
                this_size = lnb->offset + lnb->len;
                if (this_size > iattr.ia_size)
                        iattr.ia_size = this_size;
        }

        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        cleanup_phase = 2;

        down(&inode->i_sem);
        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                           oti);
        if (IS_ERR(oti->oti_handle)) {
                up(&inode->i_sem);
                rc = PTR_ERR(oti->oti_handle);
                /* ENOSPC is expected under load; don't log it as an error. */
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error starting transaction: rc = %d\n", rc);
                oti->oti_handle = NULL;
                GOTO(cleanup, rc);
        }

        fsfilt_check_slow(now, obd_timeout, "brw_start");

        iattr_from_obdo(&iattr, oa,
                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
        /* filter_direct_io drops i_sem */
        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                              oti, &wait_handle);
        if (rc == 0)
                obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);

        fsfilt_check_slow(now, obd_timeout, "direct_io");

        err = fsfilt_commit_wait(obd, inode, wait_handle);
        if (err)
                rc = err;
        /* in sync mode the transno must already be on stable storage */
        if (obd_sync_filter && !err)
                LASSERT(oti->oti_transno <= obd->obd_last_committed);
        fsfilt_check_slow(now, obd_timeout, "commitrw commit");

cleanup:
        filter_grant_commit(exp, niocount, res);

        switch (cleanup_phase) {
        case 2:
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                LASSERT(current->journal_info == NULL);
                /* fallthrough */
        case 1:
                filter_free_iobuf(iobuf);
                /* fallthrough */
        case 0:
                filter_free_dio_pages(objcount, obj, niocount, res);
                f_dput(res->dentry);
        }

        RETURN(rc);
}