X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fllite%2Frw26.c;h=4852d437abed18bd598010912cb3abaa2f80795d;hb=9b6f9d17a35188f5f4dbfae840164b999a7a78a2;hp=ad4d86f67567d638438891c34b3c2e83ccdb9345;hpb=2bb880b053bcf93be537d24c5588c9d6e323d8f0;p=fs%2Flustre-release.git

diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index ad4d86f..4852d43 100644
--- a/lustre/llite/rw26.c
+++ b/lustre/llite/rw26.c
@@ -20,8 +20,9 @@
  * along with Lustre; if not, write to the Free Software
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
-
+#ifdef HAVE_KERNEL_CONFIG_H
 #include 
+#endif
 #include 
 #include 
 #include 
@@ -43,12 +44,10 @@
 #include 
 #include 
 #include 
-#include 
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
-#include 
-#include 
+#include 
 #include "llite_internal.h"
 #include 
 
@@ -60,53 +59,231 @@ static int ll_writepage_26(struct page *page, struct writeback_control *wbc)
 /* It is safe to not check anything in invalidatepage/releasepage below
    because they are run with page locked and all our io is happening with
    locked page too */
+#ifdef HAVE_INVALIDATEPAGE_RETURN_INT
 static int ll_invalidatepage(struct page *page, unsigned long offset)
 {
-        if (offset)
-                return 0;
+        if (offset)
+                return 0;
         if (PagePrivate(page))
                 ll_removepage(page);
         return 1;
 }
+#else
+static void ll_invalidatepage(struct page *page, unsigned long offset)
+{
+        if (offset)
+                return;
+        if (PagePrivate(page))
+                ll_removepage(page);
+}
+#endif
 
-static int ll_releasepage(struct page *page, int gfp_mask)
+static int ll_releasepage(struct page *page, gfp_t gfp_mask)
 {
         if (PagePrivate(page))
                 ll_removepage(page);
         return 1;
 }
 
-static int ll_writepages(struct address_space *mapping,
-                         struct writeback_control *wbc)
+#define MAX_DIRECTIO_SIZE 2*1024*1024*1024UL
+
+static inline int ll_get_user_pages(int rw, unsigned long user_addr,
+                                    size_t size, struct page ***pages)
+{
+        int result = -ENOMEM;
+        int page_count;
+
+        /* set an arbitrary limit to prevent arithmetic overflow */
+        if (size > MAX_DIRECTIO_SIZE) {
+                *pages = NULL;
+                return -EFBIG;
+        }
+
+        page_count = ((user_addr + size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT)-
+                     (user_addr >> CFS_PAGE_SHIFT);
+
+        OBD_ALLOC_GFP(*pages, page_count * sizeof(**pages), GFP_KERNEL);
+        if (*pages) {
+                down_read(&current->mm->mmap_sem);
+                result = get_user_pages(current, current->mm, user_addr,
+                                        page_count, (rw == READ), 0, *pages,
+                                        NULL);
+                up_read(&current->mm->mmap_sem);
+                if (result < 0)
+                        OBD_FREE(*pages, page_count * sizeof(**pages));
+        }
+
+        return result;
+}
+
+/* ll_free_user_pages - tear down page struct array
+ * @pages: array of page struct pointers underlying target buffer */
+static void ll_free_user_pages(struct page **pages, int npages, int do_dirty)
+{
+        int i;
+
+        for (i = 0; i < npages; i++) {
+                if (do_dirty)
+                        set_page_dirty_lock(pages[i]);
+                page_cache_release(pages[i]);
+        }
+
+        OBD_FREE(pages, npages * sizeof(*pages));
+}
+
+static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
+                                   struct address_space *mapping,
+                                   struct lov_stripe_md *lsm,
+                                   size_t size, loff_t file_offset,
+                                   struct page **pages, int page_count)
 {
-        struct timeval tstart, now;
-        int rc;
-        do_gettimeofday(&tstart);
-        rc = generic_writepages(mapping, wbc);
-        if (rc == 0 && wbc->sync_mode == WB_SYNC_ALL) {
-                /* as we don't use Writeback bit to track pages
-                 * under I/O, filemap_fdatawait() doesn't work
-                 * for us. let's wait for I/O completion here */
-                struct ll_inode_info *lli = ll_i2info(mapping->host);
-                wait_event(lli->lli_dirty_wait,
-                           ll_is_inode_dirty(mapping->host) == 0);
-                do_gettimeofday(&now);
-                if (now.tv_sec - tstart.tv_sec > obd_timeout) {
-                        CDEBUG(D_ERROR, "synching inode 0x%p "DLID4" took %ds\n",
-                               mapping->host, OLID4(&lli->lli_id),
-                               (int) (now.tv_sec - tstart.tv_sec));
-                        portals_debug_dumplog();
+        struct brw_page *pga;
+        struct obdo oa;
+        int i, rc = 0;
+        size_t length;
+        ENTRY;
+
+        OBD_ALLOC(pga, sizeof(*pga) * page_count);
+        if (!pga) {
+                CDEBUG(D_VFSTRACE, "sizeof(*pga) = %u page_count = %u\n",
+                       (int)sizeof(*pga), page_count);
+                RETURN(-ENOMEM);
+        }
+
+        for (i = 0, length = size; length > 0;
+             length -=pga[i].count, file_offset +=pga[i].count,i++) {/*i last!*/
+                pga[i].pg = pages[i];
+                pga[i].off = file_offset;
+                /* To the end of the page, or the length, whatever is less */
+                pga[i].count = min_t(int, CFS_PAGE_SIZE -(file_offset & ~CFS_PAGE_MASK),
+                                     length);
+                pga[i].flag = 0;
+                if (rw == READ)
+                        POISON_PAGE(pages[i], 0x0d);
+        }
+
+        ll_inode_fill_obdo(inode, rw, &oa);
+
+        rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+                           ll_i2obdexp(inode), &oa, lsm, page_count, pga, NULL);
+        if (rc == 0) {
+                rc = size;
+                if (rw == WRITE) {
+                        lov_stripe_lock(lsm);
+                        obd_adjust_kms(ll_i2obdexp(inode), lsm, file_offset, 0);
+                        lov_stripe_unlock(lsm);
+                }
+        }
+
+        OBD_FREE(pga, sizeof(*pga) * page_count);
+        RETURN(rc);
+}
+
+/* This is the maximum size of a single O_DIRECT request, based on a 128kB
+ * kmalloc limit.  We need to fit all of the brw_page structs, each one
+ * representing PAGE_SIZE worth of user data, into a single buffer, and
+ * then truncate this to be a full-sized RPC.  This is 22MB for 4kB pages. */
+#define MAX_DIO_SIZE ((128 * 1024 / sizeof(struct brw_page) * CFS_PAGE_SIZE) & \
+                      ~(PTLRPC_MAX_BRW_SIZE - 1))
+static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
+                               const struct iovec *iov, loff_t file_offset,
+                               unsigned long nr_segs)
+{
+        struct file *file = iocb->ki_filp;
+        struct inode *inode = file->f_mapping->host;
+        ssize_t count = iov_length(iov, nr_segs), tot_bytes = 0;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        unsigned long seg;
+        size_t size = MAX_DIO_SIZE;
+        ENTRY;
+
+        if (!lli->lli_smd || !lli->lli_smd->lsm_object_id)
+                RETURN(-EBADF);
+
+        /* FIXME: io smaller than CFS_PAGE_SIZE is broken on ia64 ??? */
+        if ((file_offset & (~CFS_PAGE_MASK)) || (count & ~CFS_PAGE_MASK))
+                RETURN(-EINVAL);
+
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), size="LPSZ" (max "LPSZ
+               "), offset=%lld=%llx, pages "LPSZ" (max "LPSZ")\n",
+               inode->i_ino, inode->i_generation, inode, count, MAX_DIO_SIZE,
+               file_offset, file_offset, count >> CFS_PAGE_SHIFT,
+               MAX_DIO_SIZE >> CFS_PAGE_SHIFT);
+
+        if (rw == WRITE)
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_WRITE, count);
+        else
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_READ, count);
+
+        /* Check that all user buffers are aligned as well */
+        for (seg = 0; seg < nr_segs; seg++) {
+                if (((unsigned long)iov[seg].iov_base & ~CFS_PAGE_MASK) ||
+                    (iov[seg].iov_len & ~CFS_PAGE_MASK))
+                        RETURN(-EINVAL);
+        }
+
+        for (seg = 0; seg < nr_segs; seg++) {
+                size_t iov_left = iov[seg].iov_len;
+                unsigned long user_addr = (unsigned long)iov[seg].iov_base;
+
+                while (iov_left > 0) {
+                        struct page **pages;
+                        int page_count;
+                        ssize_t result;
+
+                        page_count = ll_get_user_pages(rw, user_addr,
+                                                       min(size, iov_left),
+                                                       &pages);
+                        LASSERT(page_count != 0);
+                        if (page_count > 0) {
+                                result = ll_direct_IO_26_seg(rw, inode,
+                                                             file->f_mapping,
+                                                             lli->lli_smd,
+                                                             min(size,iov_left),
+                                                             file_offset, pages,
+                                                             page_count);
+                                ll_free_user_pages(pages, page_count, rw==READ);
+                        } else {
+                                result = 0;
+                        }
+                        if (page_count < 0 || result <= 0) {
+                                /* If we can't allocate a large enough buffer
+                                 * for the request, shrink it to a smaller
+                                 * PAGE_SIZE multiple and try again.
+                                 * We should always be able to kmalloc for a
+                                 * page worth of page pointers = 4MB on i386. */
+                                if ((page_count == -ENOMEM||result == -ENOMEM)&&
+                                    size > (CFS_PAGE_SIZE / sizeof(*pages)) *
+                                           CFS_PAGE_SIZE) {
+                                        size = ((((size / 2) - 1) |
+                                                 ~CFS_PAGE_MASK) + 1) &
+                                               CFS_PAGE_MASK;
+                                        CDEBUG(D_VFSTRACE, "DIO size now %u\n",
+                                               (int)size);
+                                        continue;
+                                }
+                                if (tot_bytes > 0)
+                                        RETURN(tot_bytes);
+                                RETURN(page_count < 0 ? page_count : result);
+                        }
+
+                        tot_bytes += result;
+                        file_offset += result;
+                        iov_left -= result;
+                        user_addr += result;
                 }
         }
-        return rc;
+        RETURN(tot_bytes);
 }
 
 struct address_space_operations ll_aops = {
         .readpage       = ll_readpage,
 //        .readpages      = ll_readpages,
-//        .direct_IO      = ll_direct_IO_26,
+        .direct_IO      = ll_direct_IO_26,
         .writepage      = ll_writepage_26,
-        .writepages     = ll_writepages,
+        .writepages     = generic_writepages,
         .set_page_dirty = __set_page_dirty_nobuffers,
         .sync_page      = NULL,
         .prepare_write  = ll_prepare_write,
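
Two details of the new direct I/O path are worth spelling out. First, ll_get_user_pages() sizes its page array by counting every page the user buffer [user_addr, user_addr + size) touches, including the partial pages at either end. A minimal user-space sketch of that arithmetic is shown below; it assumes 4kB pages, and span_page_count and the test values are illustrative names, not part of the Lustre tree.

#include <assert.h>
#include <stdio.h>

#define PAGE_SHIFT 12
#define PAGE_SIZE  (1UL << PAGE_SHIFT)

/* Same formula as in ll_get_user_pages(): number of pages spanned by
 * the byte range [user_addr, user_addr + size), counting the partial
 * first and last pages. */
static unsigned long span_page_count(unsigned long user_addr, unsigned long size)
{
        return ((user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT) -
               (user_addr >> PAGE_SHIFT);
}

int main(void)
{
        /* one full, aligned page */
        assert(span_page_count(0x1000, 0x1000) == 1);
        /* 8kB starting halfway into a page touches three pages */
        assert(span_page_count(0x1800, 0x2000) == 3);
        /* a single byte still pins one page */
        assert(span_page_count(0x1fff, 1) == 1);
        printf("page span checks passed\n");
        return 0;
}

Rounding the start address down and the end address up is what makes an unaligned or page-straddling buffer come out with the right number of pages to pin.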
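Second, when the page-array allocation or the bulk transfer fails with -ENOMEM, ll_direct_IO_26() does not give up: it halves the per-iteration transfer size, rounded up to a whole page, and retries. The sketch below reproduces only that shrink step in user space; again 4kB pages are assumed, shrink_dio_size is an illustrative name, and the 22MB starting point is just the figure quoted in the MAX_DIO_SIZE comment, not a value computed from the real struct brw_page size.

#include <stdio.h>

#define PAGE_SIZE  4096UL
#define PAGE_MASK  (~(PAGE_SIZE - 1))

/* The retry path in ll_direct_IO_26() shrinks the request with
 *   size = ((((size / 2) - 1) | ~CFS_PAGE_MASK) + 1) & CFS_PAGE_MASK;
 * which is "half the size, rounded up to a whole page". */
static unsigned long shrink_dio_size(unsigned long size)
{
        return ((((size / 2) - 1) | ~PAGE_MASK) + 1) & PAGE_MASK;
}

int main(void)
{
        /* Start from an illustrative 22MB request and watch it shrink on
         * repeated allocation failures, never dropping below one page. */
        unsigned long size = 22 * 1024 * 1024;

        while (size > PAGE_SIZE) {
                printf("DIO size now %lu\n", size);
                size = shrink_dio_size(size);
        }
        printf("DIO size now %lu\n", size);
        return 0;
}

Because each result is a page multiple and strictly smaller than the previous size, the retry loop bottoms out at a single page instead of spinning forever.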