X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Frw26.c;h=6fb6d537fe4ca5b669b6b24f5cb1593f33f8d22d;hp=18098dc5860d38ba682903ff98a597e634aa88d6;hb=1e4d10af3909452b0eee1f99010d80aeb01d42a7;hpb=110d8d4952a9de607cf21f648d75e0b05ef0cee1 diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 18098dc..6fb6d53 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,44 +23,32 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lustre/lustre/llite/rw26.c * * Lustre Lite I/O page cache routines for the 2.5/2.6 kernel version */ +#include +#include +#include #include #include +#include +#include #include -#include -#include #include -#include - -#ifdef HAVE_MIGRATE_H -#include -#elif defined(HAVE_MIGRATE_MODE_H) -#include -#endif -#include -#include -#include #include -#include -#include -#include -#include +#include #define DEBUG_SUBSYSTEM S_LLITE -#include #include "llite_internal.h" -#include +#include /** * Implements Linux VM address_space::invalidatepage() method. This method is @@ -76,38 +60,48 @@ * aligned truncate). Lustre leaves partially truncated page in the cache, * relying on struct inode::i_size to limit further accesses. 
*/ -static void ll_invalidatepage(struct page *vmpage, unsigned long offset) +static void ll_invalidatepage(struct page *vmpage, +#ifdef HAVE_INVALIDATE_RANGE + unsigned int offset, unsigned int length +#else + unsigned long offset +#endif + ) { struct inode *inode; struct lu_env *env; struct cl_page *page; struct cl_object *obj; - int refcheck; - LASSERT(PageLocked(vmpage)); LASSERT(!PageWriteback(vmpage)); - /* - * It is safe to not check anything in invalidatepage/releasepage - * below because they are run with page locked and all our io is - * happening with locked page too - */ - if (offset == 0) { - env = cl_env_get(&refcheck); - if (!IS_ERR(env)) { - inode = vmpage->mapping->host; - obj = ll_i2info(inode)->lli_clob; - if (obj != NULL) { - page = cl_vmpage_page(vmpage, obj); - if (page != NULL) { - cl_page_delete(env, page); - cl_page_put(env, page); - } - } else - LASSERT(vmpage->private == 0); - cl_env_put(env, &refcheck); - } + /* + * It is safe to not check anything in invalidatepage/releasepage + * below because they are run with page locked and all our io is + * happening with locked page too + */ +#ifdef HAVE_INVALIDATE_RANGE + if (offset == 0 && length == PAGE_SIZE) { +#else + if (offset == 0) { +#endif + /* See the comment in ll_releasepage() */ + env = cl_env_percpu_get(); + LASSERT(!IS_ERR(env)); + + inode = vmpage->mapping->host; + obj = ll_i2info(inode)->lli_clob; + if (obj != NULL) { + page = cl_vmpage_page(vmpage, obj); + if (page != NULL) { + cl_page_delete(env, page); + cl_page_put(env, page); + } + } else + LASSERT(vmpage->private == 0); + + cl_env_percpu_put(env); } } @@ -119,7 +113,6 @@ static void ll_invalidatepage(struct page *vmpage, unsigned long offset) static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask) { struct lu_env *env; - void *cookie; struct cl_object *obj; struct cl_page *page; struct address_space *mapping; @@ -137,15 +130,10 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask) if (obj == NULL) return 1; - /* 1 for caller, 1 for cl_page and 1 for page cache */ - if (page_count(vmpage) > 3) - return 0; - page = cl_vmpage_page(vmpage, obj); if (page == NULL) return 1; - cookie = cl_env_reenter(); env = cl_env_percpu_get(); LASSERT(!IS_ERR(env)); @@ -161,8 +149,7 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask) * If this page holds the last refc of cl_object, the following * call path may cause reschedule: * cl_page_put -> cl_page_free -> cl_object_put -> - * lu_object_put -> lu_object_free -> lov_delete_raid0 -> - * cl_locks_prune. + * lu_object_put -> lu_object_free -> lov_delete_raid0. * * However, the kernel can't get rid of this inode until all pages have * been cleaned up. 
Now that we hold page lock here, it's pretty safe @@ -172,201 +159,262 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask) cl_page_put(env, page); cl_env_percpu_put(env); - cl_env_reexit(cookie); return result; } -static int ll_set_page_dirty(struct page *vmpage) +#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) || \ + defined(HAVE_DIRECTIO_2ARGS) +#define HAVE_DIO_ITER 1 +#endif + +/* + * ll_free_user_pages - tear down page struct array + * @pages: array of page struct pointers underlying target buffer + */ +static void ll_free_user_pages(struct page **pages, int npages) { -#if 0 - struct cl_page *page = vvp_vmpage_page_transient(vmpage); - struct vvp_object *obj = cl_inode2vvp(vmpage->mapping->host); - struct vvp_page *cpg; - - /* - * XXX should page method be called here? - */ - LASSERT(&obj->co_cl == page->cp_obj); - cpg = cl2vvp_page(cl_page_at(page, &vvp_device_type)); - /* - * XXX cannot do much here, because page is possibly not locked: - * sys_munmap()->... - * ->unmap_page_range()->zap_pte_range()->set_page_dirty(). - */ - vvp_write_pending(obj, cpg); + int i; + + for (i = 0; i < npages; i++) { + if (!pages[i]) + break; + put_page(pages[i]); + } + +#if defined(HAVE_DIO_ITER) + kvfree(pages); +#else + OBD_FREE_PTR_ARRAY_LARGE(pages, npages); #endif - RETURN(__set_page_dirty_nobuffers(vmpage)); } -#define MAX_DIRECTIO_SIZE 2*1024*1024*1024UL - -static inline int ll_get_user_pages(int rw, unsigned long user_addr, - size_t size, struct page ***pages, - int *max_pages) +static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter, + struct page ***pages, ssize_t *npages, + size_t maxsize) { - int result = -ENOMEM; +#if defined(HAVE_DIO_ITER) + size_t start; + size_t result; - /* set an arbitrary limit to prevent arithmetic overflow */ - if (size > MAX_DIRECTIO_SIZE) { - *pages = NULL; - return -EFBIG; - } + /* + * iov_iter_get_pages_alloc() is introduced in 3.16 similar + * to HAVE_DIO_ITER. 
+ */ + result = iov_iter_get_pages_alloc(iter, pages, maxsize, &start); + if (result > 0) + *npages = DIV_ROUND_UP(result + start, PAGE_SIZE); - *max_pages = (user_addr + size + PAGE_CACHE_SIZE - 1) >> - PAGE_CACHE_SHIFT; - *max_pages -= user_addr >> PAGE_CACHE_SHIFT; - - OBD_ALLOC_LARGE(*pages, *max_pages * sizeof(**pages)); - if (*pages) { - down_read(¤t->mm->mmap_sem); - result = get_user_pages(current, current->mm, user_addr, - *max_pages, (rw == READ), 0, *pages, - NULL); - up_read(¤t->mm->mmap_sem); - if (unlikely(result <= 0)) - OBD_FREE_LARGE(*pages, *max_pages * sizeof(**pages)); - } + return result; +#else + unsigned long addr; + size_t page_count; + size_t size; + long result; + + if (!maxsize) + return 0; + + if (!iter->nr_segs) + return 0; + + addr = (unsigned long)iter->iov->iov_base + iter->iov_offset; + if (addr & ~PAGE_MASK) + return -EINVAL; + + size = min_t(size_t, maxsize, iter->iov->iov_len); + page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT; + OBD_ALLOC_PTR_ARRAY_LARGE(*pages, page_count); + if (*pages == NULL) + return -ENOMEM; + + mmap_read_lock(current->mm); + result = get_user_pages(current, current->mm, addr, page_count, + rw == READ, 0, *pages, NULL); + mmap_read_unlock(current->mm); + + if (unlikely(result != page_count)) { + ll_free_user_pages(*pages, page_count); + *pages = NULL; + + if (result >= 0) + return -EFAULT; + + return result; + } + *npages = page_count; - return result; + return size; +#endif } -/* ll_free_user_pages - tear down page struct array - * @pages: array of page struct pointers underlying target buffer */ -static void ll_free_user_pages(struct page **pages, int npages, int do_dirty) +/* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */ +#if defined(HAVE_DIO_ITER) +static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i) { - int i; - - for (i = 0; i < npages; i++) { - if (pages[i] == NULL) - break; - if (do_dirty) - set_page_dirty_lock(pages[i]); - page_cache_release(pages[i]); - } + return iov_iter_alignment(i); +} +#else /* copied from alignment_iovec() */ +static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i) +{ + const struct iovec *iov = i->iov; + unsigned long res; + size_t size = i->count; + size_t n; + + if (!size) + return 0; + + res = (unsigned long)iov->iov_base + i->iov_offset; + n = iov->iov_len - i->iov_offset; + if (n >= size) + return res | size; - OBD_FREE_LARGE(pages, npages * sizeof(*pages)); + size -= n; + res |= n; + while (size > (++iov)->iov_len) { + res |= (unsigned long)iov->iov_base | iov->iov_len; + size -= iov->iov_len; + } + res |= (unsigned long)iov->iov_base | size; + + return res; } +#endif -ssize_t ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, - int rw, struct inode *inode, - struct ll_dio_pages *pv) +/* + * Lustre could relax a bit for alignment, io count is not + * necessary page alignment. 
+ */ +static unsigned long ll_iov_iter_alignment(struct iov_iter *i) { - struct cl_page *clp; - struct cl_2queue *queue; - struct cl_object *obj = io->ci_obj; - int i; - ssize_t rc = 0; - loff_t file_offset = pv->ldp_start_offset; - long size = pv->ldp_size; - int page_count = pv->ldp_nr; - struct page **pages = pv->ldp_pages; - long page_size = cl_page_size(obj); - bool do_io; - int io_pages = 0; - ENTRY; - - queue = &io->ci_queue; - cl_2queue_init(queue); - for (i = 0; i < page_count; i++) { - if (pv->ldp_offsets) - file_offset = pv->ldp_offsets[i]; - - LASSERT(!(file_offset & (page_size - 1))); - clp = cl_page_find(env, obj, cl_index(obj, file_offset), - pv->ldp_pages[i], CPT_TRANSIENT); - if (IS_ERR(clp)) { - rc = PTR_ERR(clp); - break; - } - - rc = cl_page_own(env, io, clp); - if (rc) { - LASSERT(clp->cp_state == CPS_FREEING); - cl_page_put(env, clp); - break; - } - - do_io = true; - - /* check the page type: if the page is a host page, then do - * write directly */ - if (clp->cp_type == CPT_CACHEABLE) { - struct page *vmpage = cl_page_vmpage(env, clp); - struct page *src_page; - struct page *dst_page; - void *src; - void *dst; - - src_page = (rw == WRITE) ? pages[i] : vmpage; - dst_page = (rw == WRITE) ? vmpage : pages[i]; - - src = ll_kmap_atomic(src_page, KM_USER0); - dst = ll_kmap_atomic(dst_page, KM_USER1); - memcpy(dst, src, min(page_size, size)); - ll_kunmap_atomic(dst, KM_USER1); - ll_kunmap_atomic(src, KM_USER0); - - /* make sure page will be added to the transfer by - * cl_io_submit()->...->vvp_page_prep_write(). */ - if (rw == WRITE) - set_page_dirty(vmpage); - - if (rw == READ) { - /* do not issue the page for read, since it - * may reread a ra page which has NOT uptodate - * bit set. */ - cl_page_disown(env, io, clp); - do_io = false; - } - } - - if (likely(do_io)) { - cl_2queue_add(queue, clp); - - /* - * Set page clip to tell transfer formation engine - * that page has to be sent even if it is beyond KMS. - */ - cl_page_clip(env, clp, 0, min(size, page_size)); - - ++io_pages; - } - - /* drop the reference count for cl_page_find */ - cl_page_put(env, clp); - size -= page_size; - file_offset += page_size; - } + size_t orig_size = i->count; + size_t count = orig_size & ~PAGE_MASK; + unsigned long res; - if (rc == 0 && io_pages) { - rc = cl_io_submit_sync(env, io, - rw == READ ? CRT_READ : CRT_WRITE, - queue, 0); - } - if (rc == 0) - rc = pv->ldp_size; + if (!count) + return iov_iter_alignment_vfs(i); + + if (orig_size > PAGE_SIZE) { + iov_iter_truncate(i, orig_size - count); + res = iov_iter_alignment_vfs(i); + iov_iter_reexpand(i, orig_size); + + return res; + } + + res = iov_iter_alignment_vfs(i); + /* start address is page aligned */ + if ((res & ~PAGE_MASK) == orig_size) + return PAGE_SIZE; - cl_2queue_discard(env, io, queue); - cl_2queue_disown(env, io, queue); - cl_2queue_fini(env, queue); - RETURN(rc); + return res; } -EXPORT_SYMBOL(ll_direct_rw_pages); -static ssize_t ll_direct_IO_26_seg(const struct lu_env *env, struct cl_io *io, - int rw, struct inode *inode, - struct address_space *mapping, - size_t size, loff_t file_offset, - struct page **pages, int page_count) +/** direct IO pages */ +struct ll_dio_pages { + struct cl_dio_aio *ldp_aio; + /* + * page array to be written. we don't support + * partial pages except the last one. + */ + struct page **ldp_pages; + /** # of pages in the array. */ + size_t ldp_count; + /* the file offset of the first page. 
*/ + loff_t ldp_file_offset; +}; + +static int +ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, + int rw, struct inode *inode, struct ll_dio_pages *pv) { - struct ll_dio_pages pvec = { .ldp_pages = pages, - .ldp_nr = page_count, - .ldp_size = size, - .ldp_offsets = NULL, - .ldp_start_offset = file_offset - }; - - return ll_direct_rw_pages(env, io, rw, inode, &pvec); + struct cl_page *page; + struct cl_2queue *queue = &io->ci_queue; + struct cl_object *obj = io->ci_obj; + struct cl_sync_io *anchor = &pv->ldp_aio->cda_sync; + loff_t offset = pv->ldp_file_offset; + int io_pages = 0; + size_t page_size = cl_page_size(obj); + int i; + ssize_t rc = 0; + + ENTRY; + + cl_2queue_init(queue); + for (i = 0; i < pv->ldp_count; i++) { + LASSERT(!(offset & (PAGE_SIZE - 1))); + page = cl_page_find(env, obj, cl_index(obj, offset), + pv->ldp_pages[i], CPT_TRANSIENT); + if (IS_ERR(page)) { + rc = PTR_ERR(page); + break; + } + LASSERT(page->cp_type == CPT_TRANSIENT); + rc = cl_page_own(env, io, page); + if (rc) { + cl_page_put(env, page); + break; + } + + page->cp_sync_io = anchor; + if (inode && IS_ENCRYPTED(inode)) { + /* In case of Direct IO on encrypted file, we need to + * add a reference to the inode on the cl_page. + * This info is required by llcrypt to proceed + * to encryption/decryption. + * This is safe because we know these pages are private + * to the thread doing the Direct IO. + */ + page->cp_inode = inode; + } + /* We keep the refcount from cl_page_find, so we don't need + * another one here + */ + cl_2queue_add(queue, page, false); + /* + * Set page clip to tell transfer formation engine + * that page has to be sent even if it is beyond KMS. + */ + if (size < page_size) + cl_page_clip(env, page, 0, size); + ++io_pages; + + offset += page_size; + size -= page_size; + } + if (rc == 0 && io_pages > 0) { + int iot = rw == READ ? CRT_READ : CRT_WRITE; + + atomic_add(io_pages, &anchor->csi_sync_nr); + /* + * Avoid out-of-order execution of adding inflight + * modifications count and io submit. + */ + smp_mb(); + rc = cl_io_submit_rw(env, io, iot, queue); + if (rc == 0) { + cl_page_list_splice(&queue->c2_qout, + &pv->ldp_aio->cda_pages); + } else { + atomic_add(-queue->c2_qin.pl_nr, + &anchor->csi_sync_nr); + cl_page_list_for_each(page, &queue->c2_qin) + page->cp_sync_io = NULL; + } + /* handle partially submitted reqs */ + if (queue->c2_qin.pl_nr > 0) { + CERROR(DFID " failed to submit %d dio pages: %zd\n", + PFID(lu_object_fid(&obj->co_lu)), + queue->c2_qin.pl_nr, rc); + if (rc == 0) + rc = -EIO; + } + } + + cl_2queue_discard(env, io, queue); + cl_2queue_disown(env, io, queue); + cl_2queue_fini(env, queue); + RETURN(rc); } #ifdef KMALLOC_MAX_SIZE @@ -380,223 +428,512 @@ static ssize_t ll_direct_IO_26_seg(const struct lu_env *env, struct cl_io *io, * representing PAGE_SIZE worth of user data, into a single buffer, and * then truncate this to be a full-sized RPC. For 4kB PAGE_SIZE this is * up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. 
*/ -#define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_CACHE_SIZE) & \ - ~(DT_MAX_BRW_SIZE - 1)) -static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, - const struct iovec *iov, loff_t file_offset, - unsigned long nr_segs) +#define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \ + ~((size_t)DT_MAX_BRW_SIZE - 1)) + +static ssize_t +ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) { - struct lu_env *env; - struct cl_io *io; - struct file *file = iocb->ki_filp; - struct inode *inode = file->f_mapping->host; - struct ccc_object *obj = cl_inode2ccc(inode); - long count = iov_length(iov, nr_segs); - long tot_bytes = 0, result = 0; - struct ll_inode_info *lli = ll_i2info(inode); - unsigned long seg = 0; - long size = MAX_DIO_SIZE; - int refcheck; - ENTRY; - - if (!lli->lli_has_smd) - RETURN(-EBADF); - - /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */ - if ((file_offset & ~CFS_PAGE_MASK) || (count & ~CFS_PAGE_MASK)) - RETURN(-EINVAL); - - CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%lu (max %lu), " - "offset=%lld=%llx, pages %lu (max %lu)\n", + struct ll_cl_context *lcc; + const struct lu_env *env; + struct cl_io *io; + struct file *file = iocb->ki_filp; + struct inode *inode = file->f_mapping->host; + struct cl_dio_aio *aio; + size_t count = iov_iter_count(iter); + ssize_t tot_bytes = 0, result = 0; + loff_t file_offset = iocb->ki_pos; + struct vvp_io *vio; + + /* Check EOF by ourselves */ + if (rw == READ && file_offset >= i_size_read(inode)) + return 0; + + /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */ + if (file_offset & ~PAGE_MASK) + RETURN(-EINVAL); + + CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), " + "offset=%lld=%llx, pages %zd (max %lu)\n", PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE, - file_offset, file_offset, count >> PAGE_CACHE_SHIFT, - MAX_DIO_SIZE >> PAGE_CACHE_SHIFT); - - /* Check that all user buffers are aligned as well */ - for (seg = 0; seg < nr_segs; seg++) { - if (((unsigned long)iov[seg].iov_base & ~CFS_PAGE_MASK) || - (iov[seg].iov_len & ~CFS_PAGE_MASK)) - RETURN(-EINVAL); - } + file_offset, file_offset, count >> PAGE_SHIFT, + MAX_DIO_SIZE >> PAGE_SHIFT); - env = cl_env_get(&refcheck); - LASSERT(!IS_ERR(env)); - io = ccc_env_io(env)->cui_cl.cis_io; - LASSERT(io != NULL); + /* Check that all user buffers are aligned as well */ + if (ll_iov_iter_alignment(iter) & ~PAGE_MASK) + RETURN(-EINVAL); + + lcc = ll_cl_find(file); + if (lcc == NULL) + RETURN(-EIO); + + env = lcc->lcc_env; + LASSERT(!IS_ERR(env)); + vio = vvp_env_io(env); + io = lcc->lcc_io; + LASSERT(io != NULL); + + aio = io->ci_aio; + LASSERT(aio); + LASSERT(aio->cda_iocb == iocb); + + while (iov_iter_count(iter)) { + struct ll_dio_pages pvec = { .ldp_aio = aio }; + struct page **pages; + + count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE); + if (rw == READ) { + if (file_offset >= i_size_read(inode)) + break; + + if (file_offset + count > i_size_read(inode)) + count = i_size_read(inode) - file_offset; + } + + result = ll_get_user_pages(rw, iter, &pages, + &pvec.ldp_count, count); + if (unlikely(result <= 0)) + GOTO(out, result); + + count = result; + pvec.ldp_file_offset = file_offset; + pvec.ldp_pages = pages; + + result = ll_direct_rw_pages(env, io, count, + rw, inode, &pvec); + ll_free_user_pages(pages, pvec.ldp_count); + + if (unlikely(result < 0)) + GOTO(out, result); + + iov_iter_advance(iter, count); + tot_bytes += count; + file_offset += count; + } - /* 0. 
Need locking between buffered and direct access. and race with - * size changing by concurrent truncates and writes. - * 1. Need inode mutex to operate transient pages. - */ - if (rw == READ) - mutex_lock(&inode->i_mutex); - - LASSERT(obj->cob_transient_pages == 0); - for (seg = 0; seg < nr_segs; seg++) { - long iov_left = iov[seg].iov_len; - unsigned long user_addr = (unsigned long)iov[seg].iov_base; - - if (rw == READ) { - if (file_offset >= i_size_read(inode)) - break; - if (file_offset + iov_left > i_size_read(inode)) - iov_left = i_size_read(inode) - file_offset; - } - - while (iov_left > 0) { - struct page **pages; - int page_count, max_pages = 0; - long bytes; - - bytes = min(size, iov_left); - page_count = ll_get_user_pages(rw, user_addr, bytes, - &pages, &max_pages); - if (likely(page_count > 0)) { - if (unlikely(page_count < max_pages)) - bytes = page_count << PAGE_CACHE_SHIFT; - result = ll_direct_IO_26_seg(env, io, rw, inode, - file->f_mapping, - bytes, file_offset, - pages, page_count); - ll_free_user_pages(pages, max_pages, rw==READ); - } else if (page_count == 0) { - GOTO(out, result = -EFAULT); - } else { - result = page_count; - } - if (unlikely(result <= 0)) { - /* If we can't allocate a large enough buffer - * for the request, shrink it to a smaller - * PAGE_SIZE multiple and try again. - * We should always be able to kmalloc for a - * page worth of page pointers = 4MB on i386. */ - if (result == -ENOMEM && - size > (PAGE_CACHE_SIZE / sizeof(*pages)) * - PAGE_CACHE_SIZE) { - size = ((((size / 2) - 1) | - ~CFS_PAGE_MASK) + 1) & - CFS_PAGE_MASK; - CDEBUG(D_VFSTRACE,"DIO size now %lu\n", - size); - continue; - } - - GOTO(out, result); - } - - tot_bytes += result; - file_offset += result; - iov_left -= result; - user_addr += result; - } - } out: - LASSERT(obj->cob_transient_pages == 0); - if (rw == READ) - mutex_unlock(&inode->i_mutex); - - if (tot_bytes > 0) { - if (rw == WRITE) { - struct lov_stripe_md *lsm; - - lsm = ccc_inode_lsm_get(inode); - LASSERT(lsm != NULL); - lov_stripe_lock(lsm); - obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0); - lov_stripe_unlock(lsm); - ccc_inode_lsm_put(inode, lsm); + aio->cda_bytes += tot_bytes; + + if (rw == WRITE) + vio->u.readwrite.vui_written += tot_bytes; + else + vio->u.readwrite.vui_read += tot_bytes; + + /* We cannot do async submission - for AIO or regular DIO - unless + * lockless because it causes us to release the lock early. + * + * There are also several circumstances in which we must disable + * parallel DIO, so we check if it is enabled. + * + * The check for "is_sync_kiocb" excludes AIO, which does not need to + * be disabled in these situations. 
+ */ + if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) { + ssize_t rc2; + + /* Wait here rather than doing async submission */ + rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0); + if (result == 0 && rc2) + result = rc2; + + if (result == 0) + result = tot_bytes; + } else if (result == 0) { + result = -EIOCBQUEUED; + } + + return result; +} + +#if defined(HAVE_DIO_ITER) +static ssize_t ll_direct_IO( +#ifndef HAVE_IOV_ITER_RW + int rw, +#endif + struct kiocb *iocb, struct iov_iter *iter +#ifndef HAVE_DIRECTIO_2ARGS + , loff_t file_offset +#endif + ) +{ + int nrw; + +#ifndef HAVE_IOV_ITER_RW + nrw = rw; +#else + nrw = iov_iter_rw(iter); +#endif + + return ll_direct_IO_impl(iocb, iter, nrw); +} + +#else /* !defined(HAVE_DIO_ITER) */ + +static ssize_t +ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, + loff_t file_offset, unsigned long nr_segs) +{ + struct iov_iter iter; + + iov_iter_init(&iter, iov, nr_segs, iov_length(iov, nr_segs), 0); + return ll_direct_IO_impl(iocb, &iter, rw); +} + +#endif /* !defined(HAVE_DIO_ITER) */ + +/** + * Prepare partially written-to page for a write. + * @pg is owned when passed in and disowned when it returns non-zero result to + * the caller. + */ +static int ll_prepare_partial_page(const struct lu_env *env, struct cl_io *io, + struct cl_page *pg, struct file *file) +{ + struct cl_attr *attr = vvp_env_thread_attr(env); + struct cl_object *obj = io->ci_obj; + struct vvp_page *vpg = cl_object_page_slice(obj, pg); + loff_t offset = cl_offset(obj, vvp_index(vpg)); + int result; + ENTRY; + + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + cl_object_attr_unlock(obj); + if (result) { + cl_page_disown(env, io, pg); + GOTO(out, result); + } + + /* + * If are writing to a new page, no need to read old data. + * The extent locking will have updated the KMS, and for our + * purposes here we can treat it like i_size. + */ + if (attr->cat_kms <= offset) { + char *kaddr = kmap_atomic(vpg->vpg_page); + + memset(kaddr, 0, cl_page_size(obj)); + kunmap_atomic(kaddr); + GOTO(out, result = 0); + } + + if (vpg->vpg_defer_uptodate) { + vpg->vpg_ra_used = 1; + GOTO(out, result = 0); + } + + result = ll_io_read_page(env, io, pg, file); + if (result) + GOTO(out, result); + + /* ll_io_read_page() disowns the page */ + result = cl_page_own(env, io, pg); + if (!result) { + if (!PageUptodate(cl_page_vmpage(pg))) { + cl_page_disown(env, io, pg); + result = -EIO; } + } else if (result == -ENOENT) { + /* page was truncated */ + result = -EAGAIN; } + EXIT; - cl_env_put(env, &refcheck); - RETURN(tot_bytes ? : result); +out: + return result; +} + +static int ll_tiny_write_begin(struct page *vmpage, struct address_space *mapping) +{ + /* Page must be present, up to date, dirty, and not in writeback. 
*/ + if (!vmpage || !PageUptodate(vmpage) || !PageDirty(vmpage) || + PageWriteback(vmpage) || vmpage->mapping != mapping) + return -ENODATA; + + return 0; } static int ll_write_begin(struct file *file, struct address_space *mapping, - loff_t pos, unsigned len, unsigned flags, - struct page **pagep, void **fsdata) + loff_t pos, unsigned len, unsigned flags, + struct page **pagep, void **fsdata) { - pgoff_t index = pos >> PAGE_CACHE_SHIFT; - struct page *page; - int rc; - unsigned from = pos & (PAGE_CACHE_SIZE - 1); - ENTRY; - - page = grab_cache_page_write_begin(mapping, index, flags); - if (!page) - RETURN(-ENOMEM); - - *pagep = page; - - rc = ll_prepare_write(file, page, from, from + len); - if (rc) { - unlock_page(page); - page_cache_release(page); - } - RETURN(rc); + struct ll_cl_context *lcc = NULL; + const struct lu_env *env = NULL; + struct cl_io *io = NULL; + struct cl_page *page = NULL; + + struct cl_object *clob = ll_i2info(mapping->host)->lli_clob; + pgoff_t index = pos >> PAGE_SHIFT; + struct page *vmpage = NULL; + unsigned from = pos & (PAGE_SIZE - 1); + unsigned to = from + len; + int result = 0; + ENTRY; + + CDEBUG(D_VFSTRACE, "Writing %lu of %d to %d bytes\n", index, from, len); + + lcc = ll_cl_find(file); + if (lcc == NULL) { + vmpage = grab_cache_page_nowait(mapping, index); + result = ll_tiny_write_begin(vmpage, mapping); + GOTO(out, result); + } + + env = lcc->lcc_env; + io = lcc->lcc_io; + + if (file->f_flags & O_DIRECT) { + /* direct IO failed because it couldn't clean up cached pages, + * this causes a problem for mirror write because the cached + * page may belong to another mirror, which will result in + * problem submitting the I/O. */ + if (io->ci_designated_mirror > 0) + GOTO(out, result = -EBUSY); + + /** + * Direct write can fall back to buffered read, but DIO is done + * with lockless i/o, and buffered requires LDLM locking, so + * in this case we must restart without lockless. + */ + if (!io->ci_dio_lock) { + io->ci_dio_lock = 1; + io->ci_need_restart = 1; + GOTO(out, result = -ENOLCK); + } + } +again: + /* To avoid deadlock, try to lock page first. */ + vmpage = grab_cache_page_nowait(mapping, index); + + if (unlikely(vmpage == NULL || + PageDirty(vmpage) || PageWriteback(vmpage))) { + struct vvp_io *vio = vvp_env_io(env); + struct cl_page_list *plist = &vio->u.readwrite.vui_queue; + + /* if the page is already in dirty cache, we have to commit + * the pages right now; otherwise, it may cause deadlock + * because it holds page lock of a dirty page and request for + * more grants. It's okay for the dirty page to be the first + * one in commit page list, though. 
*/ + if (vmpage != NULL && plist->pl_nr > 0) { + unlock_page(vmpage); + put_page(vmpage); + vmpage = NULL; + } + + /* commit pages and then wait for page lock */ + result = vvp_io_write_commit(env, io); + if (result < 0) + GOTO(out, result); + + if (vmpage == NULL) { + vmpage = grab_cache_page_write_begin(mapping, index, + flags); + if (vmpage == NULL) + GOTO(out, result = -ENOMEM); + } + } + + /* page was truncated */ + if (mapping != vmpage->mapping) { + CDEBUG(D_VFSTRACE, "page: %lu was truncated\n", index); + unlock_page(vmpage); + put_page(vmpage); + vmpage = NULL; + goto again; + } + + page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE); + if (IS_ERR(page)) + GOTO(out, result = PTR_ERR(page)); + + lcc->lcc_page = page; + lu_ref_add(&page->cp_reference, "cl_io", io); + + cl_page_assume(env, io, page); + if (!PageUptodate(vmpage)) { + /* + * We're completely overwriting an existing page, + * so _don't_ set it up to date until commit_write + */ + if (from == 0 && to == PAGE_SIZE) { + CL_PAGE_HEADER(D_PAGE, env, page, "full page write\n"); + POISON_PAGE(vmpage, 0x11); + } else { + /* TODO: can be optimized at OSC layer to check if it + * is a lockless IO. In that case, it's not necessary + * to read the data. */ + result = ll_prepare_partial_page(env, io, page, file); + if (result) { + /* vmpage should have been unlocked */ + put_page(vmpage); + vmpage = NULL; + + if (result == -EAGAIN) + goto again; + GOTO(out, result); + } + } + } + EXIT; +out: + if (result < 0) { + if (vmpage != NULL) { + unlock_page(vmpage); + put_page(vmpage); + } + /* On tiny_write failure, page and io are always null. */ + if (!IS_ERR_OR_NULL(page)) { + lu_ref_del(&page->cp_reference, "cl_io", io); + cl_page_put(env, page); + } + if (io) + io->ci_result = result; + } else { + *pagep = vmpage; + *fsdata = lcc; + } + RETURN(result); +} + +static int ll_tiny_write_end(struct file *file, struct address_space *mapping, + loff_t pos, unsigned int len, unsigned int copied, + struct page *vmpage) +{ + struct cl_page *clpage = (struct cl_page *) vmpage->private; + loff_t kms = pos+copied; + loff_t to = kms & (PAGE_SIZE-1) ? kms & (PAGE_SIZE-1) : PAGE_SIZE; + __u16 refcheck; + struct lu_env *env = cl_env_get(&refcheck); + int rc = 0; + + ENTRY; + + if (IS_ERR(env)) { + rc = PTR_ERR(env); + goto out; + } + + /* This page is dirty in cache, so it should have a cl_page pointer + * set in vmpage->private. + */ + LASSERT(clpage != NULL); + + if (copied == 0) + goto out_env; + + /* Update the underlying size information in the OSC/LOV objects this + * page is part of. + */ + cl_page_touch(env, clpage, to); + +out_env: + cl_env_put(env, &refcheck); + +out: + /* Must return page unlocked. 
*/ + unlock_page(vmpage); + + RETURN(rc); } static int ll_write_end(struct file *file, struct address_space *mapping, - loff_t pos, unsigned len, unsigned copied, - struct page *page, void *fsdata) + loff_t pos, unsigned len, unsigned copied, + struct page *vmpage, void *fsdata) { - unsigned from = pos & (PAGE_CACHE_SIZE - 1); - int rc; + struct ll_cl_context *lcc = fsdata; + const struct lu_env *env; + struct cl_io *io; + struct vvp_io *vio; + struct cl_page *page; + unsigned from = pos & (PAGE_SIZE - 1); + bool unplug = false; + int result = 0; + ENTRY; + + put_page(vmpage); + + CDEBUG(D_VFSTRACE, "pos %llu, len %u, copied %u\n", pos, len, copied); - rc = ll_commit_write(file, page, from, from + copied); - unlock_page(page); - page_cache_release(page); + if (lcc == NULL) { + result = ll_tiny_write_end(file, mapping, pos, len, copied, + vmpage); + GOTO(out, result); + } + + LASSERT(lcc != NULL); + env = lcc->lcc_env; + page = lcc->lcc_page; + io = lcc->lcc_io; + vio = vvp_env_io(env); + + LASSERT(cl_page_is_owned(page, io)); + if (copied > 0) { + struct cl_page_list *plist = &vio->u.readwrite.vui_queue; + + lcc->lcc_page = NULL; /* page will be queued */ + + /* Add it into write queue */ + cl_page_list_add(plist, page, true); + if (plist->pl_nr == 1) /* first page */ + vio->u.readwrite.vui_from = from; + else + LASSERT(from == 0); + vio->u.readwrite.vui_to = from + copied; + + /* To address the deadlock in balance_dirty_pages() where + * this dirty page may be written back in the same thread. */ + if (PageDirty(vmpage)) + unplug = true; + + /* We may have one full RPC, commit it soon */ + if (plist->pl_nr >= PTLRPC_MAX_BRW_PAGES) + unplug = true; + + CL_PAGE_DEBUG(D_VFSTRACE, env, page, + "queued page: %d.\n", plist->pl_nr); + } else { + cl_page_disown(env, io, page); + + lcc->lcc_page = NULL; + lu_ref_del(&page->cp_reference, "cl_io", io); + cl_page_put(env, page); + + /* page list is not contiguous now, commit it now */ + unplug = true; + } + if (unplug || io->u.ci_wr.wr_sync) + result = vvp_io_write_commit(env, io); - return rc ?: copied; + if (result < 0) + io->ci_result = result; + + +out: + RETURN(result >= 0 ? 
copied : result); } #ifdef CONFIG_MIGRATION -int ll_migratepage(struct address_space *mapping, - struct page *newpage, struct page *page -#ifdef HAVE_MIGRATEPAGE_4ARGS - , enum migrate_mode mode -#endif - ) +static int ll_migratepage(struct address_space *mapping, + struct page *newpage, struct page *page, + enum migrate_mode mode) { /* Always fail page migration until we have a proper implementation */ return -EIO; } #endif -#ifndef MS_HAS_NEW_AOPS -struct address_space_operations ll_aops = { - .readpage = ll_readpage, -// .readpages = ll_readpages, - .direct_IO = ll_direct_IO_26, - .writepage = ll_writepage, - .writepages = ll_writepages, - .set_page_dirty = ll_set_page_dirty, - .write_begin = ll_write_begin, - .write_end = ll_write_end, - .invalidatepage = ll_invalidatepage, - .releasepage = (void *)ll_releasepage, -#ifdef CONFIG_MIGRATION - .migratepage = ll_migratepage, -#endif - .bmap = NULL -}; -#else -struct address_space_operations_ext ll_aops = { - .orig_aops.readpage = ll_readpage, -// .orig_aops.readpages = ll_readpages, - .orig_aops.direct_IO = ll_direct_IO_26, - .orig_aops.writepage = ll_writepage, - .orig_aops.writepages = ll_writepages, - .orig_aops.set_page_dirty = ll_set_page_dirty, - .orig_aops.prepare_write = ll_prepare_write, - .orig_aops.commit_write = ll_commit_write, - .orig_aops.invalidatepage = ll_invalidatepage, - .orig_aops.releasepage = ll_releasepage, +const struct address_space_operations ll_aops = { + .readpage = ll_readpage, + .direct_IO = ll_direct_IO, + .writepage = ll_writepage, + .writepages = ll_writepages, + .set_page_dirty = __set_page_dirty_nobuffers, + .write_begin = ll_write_begin, + .write_end = ll_write_end, + .invalidatepage = ll_invalidatepage, + .releasepage = (void *)ll_releasepage, #ifdef CONFIG_MIGRATION - .orig_aops.migratepage = ll_migratepage, + .migratepage = ll_migratepage, #endif - .orig_aops.bmap = NULL, - .write_begin = ll_write_begin, - .write_end = ll_write_end }; -#endif
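For readers following the direct-I/O changes above: the new ll_direct_IO_impl() rejects buffers that are not page aligned by OR-ing every iovec base address and length together and testing the low bits (see iov_iter_alignment_vfs() and ll_iov_iter_alignment() in the hunk). The userspace sketch below only illustrates that OR-accumulation trick outside the kernel; demo_iov_alignment() is an illustrative name, and the sketch deliberately ignores iov_offset and partial trailing segments that the real helpers handle.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/uio.h>

/* Simplified analogue of alignment_iovec()/iov_iter_alignment():
 * any misaligned base or length leaves low bits set in the result. */
static unsigned long demo_iov_alignment(const struct iovec *iov, int nr_segs)
{
	unsigned long res = 0;
	int i;

	for (i = 0; i < nr_segs; i++)
		res |= (unsigned long)iov[i].iov_base | iov[i].iov_len;

	return res;
}

int main(void)
{
	long psz = sysconf(_SC_PAGESIZE);
	void *buf;
	struct iovec iov[2];

	if (posix_memalign(&buf, psz, 2 * psz))
		return 1;

	iov[0].iov_base = buf;
	iov[0].iov_len = psz;
	iov[1].iov_base = (char *)buf + psz;
	iov[1].iov_len = psz;

	/* Mirrors the "ll_iov_iter_alignment(iter) & ~PAGE_MASK" test above:
	 * a zero result in the low bits means every segment starts and ends
	 * on a page boundary, so the DIO fast path may proceed. */
	printf("page aligned: %s\n",
	       (demo_iov_alignment(iov, 2) & (psz - 1)) ? "no" : "yes");

	free(buf);
	return 0;
}

With page-aligned memory from posix_memalign() and whole-page lengths the demo prints "yes"; shifting either a base pointer or a length by a few bytes sets low bits in the accumulated value, which is exactly the condition that makes ll_direct_IO_impl() return -EINVAL.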