From d1dded6e28473d889a9b24b47cbc804f90dd2956 Mon Sep 17 00:00:00 2001
From: Jinshan Xiong
Date: Mon, 29 Apr 2019 16:30:05 +0800
Subject: [PATCH] LU-4198 clio: AIO support for direct IO

This patch adds AIO support to Lustre. AIO issues IO the same way DIO
does, but does not wait for the IO to finish before returning: instead
EIOCBQUEUED is returned to the VFS to indicate that the IO has been
issued, and aio_complete() is called from the IO callback once the IO
is done.

(A toy model of the completion accounting follows the rw26.c hunks
below, and a userspace libaio example follows the patch.)

fio AIO/DIO bandwidth results:

# numjob=4, bs=512k
  MB/s      write     read
  master      832     1806
  patched    6591    11800

fio AIO/DIO IOPS results:

# 32 clients, 8192 threads
# ioengine=libaio rw=randread blocksize=4096 iodepth=128 direct=1
# size=1g runtime=300 group_reporting numjobs=256 create_serialize=0
  IOPS      write     read
  master      99K    1239K
  patched    265K    3498K

Test-Parameters: testgroup=review-ldiskfs-arm
Signed-off-by: Jinshan Xiong
Signed-off-by: Patrick Farrell
Signed-off-by: Wang Shilong
Change-Id: If2ac9283612514e10fe342fc43e95b4081347168
Reviewed-on: https://review.whamcloud.com/32416
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Bobi Jam
Tested-by: Shuichi Ihara
Reviewed-by: Andreas Dilger
Reviewed-by: Jinshan Xiong
Reviewed-by: Oleg Drokin
---
 lustre/autoconf/lustre-core.m4 |  17 ++
 lustre/include/cl_object.h     |  15 +-
 lustre/llite/file.c            |   2 -
 lustre/llite/rw26.c            | 597 +++++++++++++++++++++--------------------
 lustre/llite/vvp_io.c          |   7 +-
 lustre/llite/vvp_page.c        |   8 +-
 lustre/obdclass/cl_io.c        |  15 +-
 lustre/osc/osc_cache.c         |   3 -
 lustre/tests/sanity.sh         |  51 ++++
 9 files changed, 413 insertions(+), 302 deletions(-)

diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4
index a39489a..24c1f1b 100644
--- a/lustre/autoconf/lustre-core.m4
+++ b/lustre/autoconf/lustre-core.m4
@@ -1726,6 +1726,22 @@ EXTRA_KCFLAGS="$tmp_flags"
 ]) # LC_HAVE_DQUOT_QC_DQBLK

 #
+# LC_HAVE_AIO_COMPLETE
+#
+# 3.19 kernel makes aio_complete() static
+#
+AC_DEFUN([LC_HAVE_AIO_COMPLETE], [
+LB_CHECK_COMPILE([if kernel has exported aio_complete() ],
+aio_complete, [
+	#include <linux/aio.h>
+],[
+	aio_complete(NULL, 0, 0);
+],[
+	AC_DEFINE(HAVE_AIO_COMPLETE, 1, [aio_complete defined])
+])
+]) # LC_HAVE_AIO_COMPLETE
+
+#
 # LC_BACKING_DEV_INFO_REMOVAL
 #
 # 3.20 kernel removed backing_dev_info from address_space
@@ -3071,6 +3087,7 @@ AC_DEFUN([LC_PROG_LINUX], [
 	# 3.19
 	LC_KIOCB_HAS_NBYTES
 	LC_HAVE_DQUOT_QC_DQBLK
+	LC_HAVE_AIO_COMPLETE

 	# 3.20
 	LC_BACKING_DEV_INFO_REMOVAL
diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index a855df3..7979b14 100644
--- a/lustre/include/cl_object.h
+++ b/lustre/include/cl_object.h
@@ -2484,11 +2484,12 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
  * @{ */

 struct cl_sync_io;
+struct cl_dio_aio;

 typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *);

 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
-			    cl_sync_io_end_t *end);
+			    struct cl_dio_aio *aio, cl_sync_io_end_t *end);

 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
 		    long timeout);
@@ -2496,7 +2497,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
 		     int ioret);
 static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
 {
-	cl_sync_io_init_notify(anchor, nr, NULL);
+	cl_sync_io_init_notify(anchor, nr, NULL, NULL);
 }

 /**
@@ -2514,6 +2515,16 @@ struct cl_sync_io {
 	wait_queue_head_t	csi_waitq;
 	/** callback to invoke when this IO is finished */
 	cl_sync_io_end_t	*csi_end_io;
+	/** aio private data */
+	struct cl_dio_aio	*csi_aio;
+};
+
+/** To support Direct AIO */
+struct cl_dio_aio {
+	struct cl_sync_io	cda_sync;
+	struct cl_page_list	cda_pages;
+	struct kiocb		*cda_iocb;
+	ssize_t			cda_bytes;
 };

 /** @} cl_sync_io */
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 9ddbf22..d3cdc6f 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -1478,7 +1478,6 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
 	ssize_t result = 0;
 	int rc = 0;
 	unsigned retried = 0;
-	bool restarted = false;
 	unsigned ignore_lockless = 0;

 	ENTRY;
@@ -1573,7 +1572,6 @@ out:
 		/* preserve the tried count for FLR */
 		retried = io->ci_ndelay_tried;
 		ignore_lockless = io->ci_ignore_lockless;
-		restarted = true;
 		goto restart;
 	}

diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index ab6bec9..8ea28c1 100644
--- a/lustre/llite/rw26.c
+++ b/lustre/llite/rw26.c
@@ -168,130 +168,262 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask)
 	return result;
 }

-#define MAX_DIRECTIO_SIZE 2*1024*1024*1024UL
+#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) || \
+	defined(HAVE_DIRECTIO_2ARGS)
+#define HAVE_DIO_ITER 1
+#endif

-static ssize_t
-ll_direct_IO_seg(const struct lu_env *env, struct cl_io *io, int rw,
-		 struct inode *inode, size_t size, loff_t file_offset,
-		 struct page **pages, int page_count)
+/*
+ * ll_free_user_pages - tear down page struct array
+ * @pages: array of page struct pointers underlying target buffer
+ */
+static void ll_free_user_pages(struct page **pages, int npages)
 {
-	struct cl_page *clp;
-	struct cl_2queue *queue;
-	struct cl_object *obj = io->ci_obj;
 	int i;
-	ssize_t rc = 0;
-	size_t page_size = cl_page_size(obj);
-	size_t orig_size = size;
-	bool do_io;
-	int io_pages = 0;
-	ENTRY;

-	queue = &io->ci_queue;
-	cl_2queue_init(queue);
-	for (i = 0; i < page_count; i++) {
-		LASSERT(!(file_offset & (page_size - 1)));
-		clp = cl_page_find(env, obj, cl_index(obj, file_offset),
-				   pages[i], CPT_TRANSIENT);
-		if (IS_ERR(clp)) {
-			rc = PTR_ERR(clp);
+	for (i = 0; i < npages; i++) {
+		if (!pages[i])
 			break;
-		}
+		put_page(pages[i]);
+	}

-		rc = cl_page_own(env, io, clp);
-		if (rc) {
-			LASSERT(clp->cp_state == CPS_FREEING);
-			cl_page_put(env, clp);
-			break;
-		}
+#if defined(HAVE_DIO_ITER)
+	kvfree(pages);
+#else
+	OBD_FREE_LARGE(pages, npages * sizeof(*pages));
+#endif
+}

-		do_io = true;
+static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
+				 struct page ***pages, ssize_t *npages,
+				 size_t maxsize)
+{
+#if defined(HAVE_DIO_ITER)
+	size_t start;
+	size_t result;

-		/* check the page type: if the page is a host page, then do
-		 * write directly
-		 */
-		if (clp->cp_type == CPT_CACHEABLE) {
-			struct page *vmpage = cl_page_vmpage(clp);
-			struct page *src_page;
-			struct page *dst_page;
-			void *src;
-			void *dst;
-
-			src_page = (rw == WRITE) ? pages[i] : vmpage;
-			dst_page = (rw == WRITE) ? vmpage : pages[i];
-
-			src = ll_kmap_atomic(src_page, KM_USER0);
-			dst = ll_kmap_atomic(dst_page, KM_USER1);
-			memcpy(dst, src, min(page_size, size));
-			ll_kunmap_atomic(dst, KM_USER1);
-			ll_kunmap_atomic(src, KM_USER0);
-
-			/* make sure page will be added to the transfer by
-			 * cl_io_submit()->...->vvp_page_prep_write().
-			 */
-			if (rw == WRITE)
-				set_page_dirty(vmpage);
-
-			if (rw == READ) {
-				/* do not issue the page for read, since it
-				 * may reread a ra page which has NOT uptodate
-				 * bit set.
-				 */
-				cl_page_disown(env, io, clp);
-				do_io = false;
-			}
-		}
+	/*
+	 * iov_iter_get_pages_alloc() is introduced in 3.16 similar
+	 * to HAVE_DIO_ITER.
+	 */
+	result = iov_iter_get_pages_alloc(iter, pages, maxsize, &start);
+	if (result > 0)
+		*npages = DIV_ROUND_UP(result + start, PAGE_SIZE);

-		if (likely(do_io)) {
-			cl_2queue_add(queue, clp);
+	return result;
+#else
+	unsigned long addr;
+	size_t page_count;
+	size_t size;
+	long result;

-			/*
-			 * Set page clip to tell transfer formation engine
-			 * that page has to be sent even if it is beyond KMS.
-			 */
-			cl_page_clip(env, clp, 0, min(size, page_size));
+	if (!maxsize)
+		return 0;

-			++io_pages;
-		}
+	if (!iter->nr_segs)
+		return 0;

-		/* drop the reference count for cl_page_find */
-		cl_page_put(env, clp);
-		size -= page_size;
-		file_offset += page_size;
+	addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
+	if (addr & ~PAGE_MASK)
+		return -EINVAL;
+
+	size = min_t(size_t, maxsize, iter->iov->iov_len);
+	page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	OBD_ALLOC_LARGE(*pages, page_count * sizeof(**pages));
+	if (*pages == NULL)
+		return -ENOMEM;
+
+	down_read(&current->mm->mmap_sem);
+	result = get_user_pages(current, current->mm, addr, page_count,
+				rw == READ, 0, *pages, NULL);
+	up_read(&current->mm->mmap_sem);
+
+	if (unlikely(result != page_count)) {
+		ll_free_user_pages(*pages, page_count);
+		*pages = NULL;
+
+		if (result >= 0)
+			return -EFAULT;
+
+		return result;
+	}
+	*npages = page_count;
+
+	return size;
+#endif
+}
+
+/* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
+#if defined(HAVE_DIO_ITER)
+static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+{
+	return iov_iter_alignment(i);
+}
+#else /* copied from alignment_iovec() */
+static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+{
+	const struct iovec *iov = i->iov;
+	unsigned long res;
+	size_t size = i->count;
+	size_t n;
+
+	if (!size)
+		return 0;
+
+	res = (unsigned long)iov->iov_base + i->iov_offset;
+	n = iov->iov_len - i->iov_offset;
+	if (n >= size)
+		return res | size;
+
+	size -= n;
+	res |= n;
+	while (size > (++iov)->iov_len) {
+		res |= (unsigned long)iov->iov_base | iov->iov_len;
+		size -= iov->iov_len;
+	}
+	res |= (unsigned long)iov->iov_base | size;
+
+	return res;
+}
+#endif
+
+#ifndef HAVE_AIO_COMPLETE
+static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
+{
+	if (iocb->ki_complete)
+		iocb->ki_complete(iocb, res, res2);
+}
+#endif
+
+/** direct IO pages */
+struct ll_dio_pages {
+	struct cl_dio_aio	*ldp_aio;
+	/*
+	 * page array to be written. we don't support
+	 * partial pages except the last one.
+	 */
+	struct page		**ldp_pages;
+	/** # of pages in the array. */
+	size_t			ldp_count;
+	/* the file offset of the first page. */
+	loff_t			ldp_file_offset;
+};
+
+static void ll_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+	struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
+	ssize_t ret = anchor->csi_sync_rc;
+
+	ENTRY;
+
+	/* release pages */
+	while (aio->cda_pages.pl_nr > 0) {
+		struct cl_page *page = cl_page_list_first(&aio->cda_pages);

-	if (rc == 0 && io_pages) {
-		rc = cl_io_submit_sync(env, io,
-				       rw == READ ? CRT_READ : CRT_WRITE,
-				       queue, 0);
+		cl_page_get(page);
+		cl_page_list_del(env, &aio->cda_pages, page);
+		cl_page_delete(env, page);
+		cl_page_put(env, page);
 	}

-	if (rc == 0)
-		rc = orig_size;
-	cl_2queue_discard(env, io, queue);
-	cl_2queue_disown(env, io, queue);
-	cl_2queue_fini(env, queue);
-	RETURN(rc);
+	if (!is_sync_kiocb(aio->cda_iocb))
+		aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+	EXIT;
+}
+
+static struct cl_dio_aio *ll_aio_alloc(struct kiocb *iocb)
+{
+	struct cl_dio_aio *aio;
+
+	OBD_ALLOC_PTR(aio);
+	if (aio != NULL) {
+		/*
+		 * Hold one ref so that it won't be released until
+		 * every page is added.
+		 */
+		cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
+				       NULL : aio, ll_aio_end);
+		cl_page_list_init(&aio->cda_pages);
+		aio->cda_iocb = iocb;
+	}
+	return aio;
 }

-/* ll_free_user_pages - tear down page struct array
- * @pages: array of page struct pointers underlying target buffer */
-static void ll_free_user_pages(struct page **pages, int npages, int do_dirty)
+static int
+ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
+		   int rw, struct inode *inode, struct ll_dio_pages *pv)
 {
+	struct cl_page *page;
+	struct cl_2queue *queue = &io->ci_queue;
+	struct cl_object *obj = io->ci_obj;
+	struct cl_sync_io *anchor = &pv->ldp_aio->cda_sync;
+	loff_t offset = pv->ldp_file_offset;
+	int io_pages = 0;
+	size_t page_size = cl_page_size(obj);
 	int i;
+	ssize_t rc = 0;

-	for (i = 0; i < npages; i++) {
-		if (pages[i] == NULL)
+	ENTRY;
+
+	cl_2queue_init(queue);
+	for (i = 0; i < pv->ldp_count; i++) {
+		LASSERT(!(offset & (PAGE_SIZE - 1)));
+		page = cl_page_find(env, obj, cl_index(obj, offset),
+				    pv->ldp_pages[i], CPT_TRANSIENT);
+		if (IS_ERR(page)) {
+			rc = PTR_ERR(page);
 			break;
-		if (do_dirty)
-			set_page_dirty_lock(pages[i]);
-		put_page(pages[i]);
+		}
+		LASSERT(page->cp_type == CPT_TRANSIENT);
+		rc = cl_page_own(env, io, page);
+		if (rc) {
+			cl_page_put(env, page);
+			break;
+		}
+
+		page->cp_sync_io = anchor;
+		cl_2queue_add(queue, page);
+		/*
+		 * Set page clip to tell transfer formation engine
+		 * that page has to be sent even if it is beyond KMS.
+		 */
+		cl_page_clip(env, page, 0, min(size, page_size));
+		++io_pages;
+
+		/* drop the reference count for cl_page_find */
+		cl_page_put(env, page);
+		offset += page_size;
+		size -= page_size;
+	}
+	if (rc == 0 && io_pages > 0) {
+		int iot = rw == READ ? CRT_READ : CRT_WRITE;
+
+		atomic_add(io_pages, &anchor->csi_sync_nr);
+		rc = cl_io_submit_rw(env, io, iot, queue);
+		if (rc == 0) {
+			cl_page_list_splice(&queue->c2_qout,
+					    &pv->ldp_aio->cda_pages);
+		} else {
+			atomic_add(-queue->c2_qin.pl_nr,
+				   &anchor->csi_sync_nr);
+			cl_page_list_for_each(page, &queue->c2_qin)
+				page->cp_sync_io = NULL;
+		}
+		/* handle partially submitted reqs */
+		if (queue->c2_qin.pl_nr > 0) {
+			CERROR(DFID " failed to submit %d dio pages: %zd\n",
+			       PFID(lu_object_fid(&obj->co_lu)),
+			       queue->c2_qin.pl_nr, rc);
+			if (rc == 0)
+				rc = -EIO;
+		}
 	}

-#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW)
-	kvfree(pages);
-#else
-	OBD_FREE_LARGE(pages, npages * sizeof(*pages));
-#endif
+	cl_2queue_discard(env, io, queue);
+	cl_2queue_disown(env, io, queue);
+	cl_2queue_fini(env, queue);
+	RETURN(rc);
 }

 #ifdef KMALLOC_MAX_SIZE
@@ -308,37 +440,23 @@ static void ll_free_user_pages(struct page **pages, int npages, int do_dirty)
 #define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
 		      ~(DT_MAX_BRW_SIZE - 1))

-#ifndef HAVE_IOV_ITER_RW
-# define iov_iter_rw(iter) rw
-#endif
-
-#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW)
 static ssize_t
-ll_direct_IO(
-# ifndef HAVE_IOV_ITER_RW
-	     int rw,
-# endif
-	     struct kiocb *iocb, struct iov_iter *iter
-# ifndef HAVE_DIRECTIO_2ARGS
-	     , loff_t file_offset
-# endif
-	     )
+ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 {
-#ifdef HAVE_DIRECTIO_2ARGS
-	loff_t file_offset = iocb->ki_pos;
-#endif
 	struct ll_cl_context *lcc;
 	const struct lu_env *env;
 	struct cl_io *io;
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
-	ssize_t count = iov_iter_count(iter);
+	struct cl_dio_aio *aio;
+	size_t count = iov_iter_count(iter);
 	ssize_t tot_bytes = 0, result = 0;
-	size_t size = MAX_DIO_SIZE;
+	loff_t file_offset = iocb->ki_pos;

 	/* Check EOF by ourselves */
-	if (iov_iter_rw(iter) == READ && file_offset >= i_size_read(inode))
+	if (rw == READ && file_offset >= i_size_read(inode))
 		return 0;
+
 	/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
 	if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
 		return -EINVAL;
@@ -350,7 +468,7 @@ ll_direct_IO(
 	       MAX_DIO_SIZE >> PAGE_SHIFT);

 	/* Check that all user buffers are aligned as well */
-	if (iov_iter_alignment(iter) & ~PAGE_MASK)
+	if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
 		return -EINVAL;

 	lcc = ll_cl_find(file);
@@ -362,19 +480,23 @@
 	io = lcc->lcc_io;
 	LASSERT(io != NULL);

+	aio = ll_aio_alloc(iocb);
+	if (!aio)
+		RETURN(-ENOMEM);
+
 	/* 0. Need locking between buffered and direct access. and race with
 	 *    size changing by concurrent truncates and writes.
 	 * 1. Need inode mutex to operate transient pages.
 	 */
-	if (iov_iter_rw(iter) == READ)
+	if (rw == READ)
 		inode_lock(inode);

 	while (iov_iter_count(iter)) {
+		struct ll_dio_pages pvec = { .ldp_aio = aio };
 		struct page **pages;
-		size_t offs;

-		count = min_t(size_t, iov_iter_count(iter), size);
-		if (iov_iter_rw(iter) == READ) {
+		count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
+		if (rw == READ) {
 			if (file_offset >= i_size_read(inode))
 				break;

@@ -382,194 +504,91 @@
 				count = i_size_read(inode) - file_offset;
 		}

-		result = iov_iter_get_pages_alloc(iter, &pages, count, &offs);
-		if (likely(result > 0)) {
-			int n = DIV_ROUND_UP(result + offs, PAGE_SIZE);
+		result = ll_get_user_pages(rw, iter, &pages,
+					   &pvec.ldp_count, count);
+		if (unlikely(result <= 0))
+			GOTO(out, result);

-			result = ll_direct_IO_seg(env, io, iov_iter_rw(iter),
-						  inode, result, file_offset,
-						  pages, n);
-			ll_free_user_pages(pages, n,
-					   iov_iter_rw(iter) == READ);
+		count = result;
+		pvec.ldp_file_offset = file_offset;
+		pvec.ldp_pages = pages;

-		}
-		if (unlikely(result <= 0)) {
-			/* If we can't allocate a large enough buffer
-			 * for the request, shrink it to a smaller
-			 * PAGE_SIZE multiple and try again.
-			 * We should always be able to kmalloc for a
-			 * page worth of page pointers = 4MB on i386. */
-			if (result == -ENOMEM &&
-			    size > (PAGE_SIZE / sizeof(*pages)) *
-				   PAGE_SIZE) {
-				size = ((((size / 2) - 1) |
-					~PAGE_MASK) + 1) & PAGE_MASK;
-				CDEBUG(D_VFSTRACE, "DIO size now %zu\n",
-				       size);
-				continue;
-			}
+		result = ll_direct_rw_pages(env, io, count,
+					    rw, inode, &pvec);
+		ll_free_user_pages(pages, pvec.ldp_count);

+		if (unlikely(result < 0))
 			GOTO(out, result);
-		}

-		iov_iter_advance(iter, result);
-		tot_bytes += result;
-		file_offset += result;
+		iov_iter_advance(iter, count);
+		tot_bytes += count;
+		file_offset += count;
 	}
-out:
-	if (iov_iter_rw(iter) == READ)
-		inode_unlock(inode);

-	if (tot_bytes > 0) {
-		struct vvp_io *vio = vvp_env_io(env);
+out:
+	aio->cda_bytes = tot_bytes;
+	cl_sync_io_note(env, &aio->cda_sync, result);

-		/* no commit async for direct IO */
-		vio->u.write.vui_written += tot_bytes;
-	}
+	if (is_sync_kiocb(iocb)) {
+		ssize_t rc2;
-	return tot_bytes ? : result;
-}
-#else /* !HAVE_DIRECTIO_ITER && !HAVE_IOV_ITER_RW */
+		rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+		if (result == 0 && rc2)
+			result = rc2;

-static inline int ll_get_user_pages(int rw, unsigned long user_addr,
-				    size_t size, struct page ***pages,
-				    int *max_pages)
-{
-	int result = -ENOMEM;
+		if (result == 0) {
+			struct vvp_io *vio = vvp_env_io(env);
+			/* no commit async for direct IO */
+			vio->u.write.vui_written += tot_bytes;
+			result = tot_bytes;
+		}
+		OBD_FREE_PTR(aio);

-	/* set an arbitrary limit to prevent arithmetic overflow */
-	if (size > MAX_DIRECTIO_SIZE) {
-		*pages = NULL;
-		return -EFBIG;
+	} else {
+		result = -EIOCBQUEUED;
 	}

-	*max_pages = (user_addr + size + PAGE_SIZE - 1) >>
-		     PAGE_SHIFT;
-	*max_pages -= user_addr >> PAGE_SHIFT;
-
-	OBD_ALLOC_LARGE(*pages, *max_pages * sizeof(**pages));
-	if (*pages) {
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					*max_pages, (rw == READ), 0, *pages,
-					NULL);
-		up_read(&current->mm->mmap_sem);
-		if (unlikely(result <= 0))
-			OBD_FREE_LARGE(*pages, *max_pages * sizeof(**pages));
-	}
+	if (rw == READ)
+		inode_unlock(inode);

 	return result;
 }

-static ssize_t
-ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-	     loff_t file_offset, unsigned long nr_segs)
+#if defined(HAVE_DIO_ITER)
+static ssize_t ll_direct_IO(
+#ifndef HAVE_IOV_ITER_RW
+		int rw,
+#endif
+		struct kiocb *iocb, struct iov_iter *iter
+#ifndef HAVE_DIRECTIO_2ARGS
+		, loff_t file_offset
+#endif
+		)
 {
-	struct ll_cl_context *lcc;
-	const struct lu_env *env;
-	struct cl_io *io;
-	struct file *file = iocb->ki_filp;
-	struct inode *inode = file->f_mapping->host;
-	ssize_t count = iov_length(iov, nr_segs);
-	ssize_t tot_bytes = 0, result = 0;
-	unsigned long seg = 0;
-	size_t size = MAX_DIO_SIZE;
-	ENTRY;
-
-	/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
-	if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
-		RETURN(-EINVAL);
+	int nrw;

-	CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
-	       "offset=%lld=%llx, pages %zd (max %lu)\n",
-	       PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
-	       file_offset, file_offset, count >> PAGE_SHIFT,
-	       MAX_DIO_SIZE >> PAGE_SHIFT);
-
-	/* Check that all user buffers are aligned as well */
-	for (seg = 0; seg < nr_segs; seg++) {
-		if (((unsigned long)iov[seg].iov_base & ~PAGE_MASK) ||
-		    (iov[seg].iov_len & ~PAGE_MASK))
-			RETURN(-EINVAL);
-	}
-
-	lcc = ll_cl_find(file);
-	if (lcc == NULL)
-		RETURN(-EIO);
+#ifndef HAVE_IOV_ITER_RW
+	nrw = rw;
+#else
+	nrw = iov_iter_rw(iter);
+#endif

-	env = lcc->lcc_env;
-	LASSERT(!IS_ERR(env));
-	io = lcc->lcc_io;
-	LASSERT(io != NULL);
+	return ll_direct_IO_impl(iocb, iter, nrw);
+}

-	for (seg = 0; seg < nr_segs; seg++) {
-		size_t iov_left = iov[seg].iov_len;
-		unsigned long user_addr = (unsigned long)iov[seg].iov_base;
-
-		if (rw == READ) {
-			if (file_offset >= i_size_read(inode))
-				break;
-			if (file_offset + iov_left > i_size_read(inode))
-				iov_left = i_size_read(inode) - file_offset;
-		}
-
-		while (iov_left > 0) {
-			struct page **pages;
-			int page_count, max_pages = 0;
-			size_t bytes;
-
-			bytes = min(size, iov_left);
-			page_count = ll_get_user_pages(rw, user_addr, bytes,
-						       &pages, &max_pages);
-			if (likely(page_count > 0)) {
-				if (unlikely(page_count < max_pages))
-					bytes = page_count << PAGE_SHIFT;
-				result = ll_direct_IO_seg(env, io, rw, inode,
-							  bytes, file_offset,
-							  pages, page_count);
-				ll_free_user_pages(pages, max_pages, rw==READ);
-			} else if (page_count == 0) {
-				GOTO(out, result = -EFAULT);
-			} else {
-				result = page_count;
-			}
-			if (unlikely(result <= 0)) {
-				/* If we can't allocate a large enough buffer
-				 * for the request, shrink it to a smaller
-				 * PAGE_SIZE multiple and try again.
-				 * We should always be able to kmalloc for a
-				 * page worth of page pointers = 4MB on i386. */
-				if (result == -ENOMEM &&
-				    size > (PAGE_SIZE / sizeof(*pages)) *
-					   PAGE_SIZE) {
-					size = ((((size / 2) - 1) |
-						 ~PAGE_MASK) + 1) &
-						PAGE_MASK;
-					CDEBUG(D_VFSTRACE, "DIO size now %zu\n",
-					       size);
-					continue;
-				}
-
-				GOTO(out, result);
-			}
-
-			tot_bytes += result;
-			file_offset += result;
-			iov_left -= result;
-			user_addr += result;
-		}
-	}
-out:
-	if (tot_bytes > 0) {
-		struct vvp_io *vio = vvp_env_io(env);
+#else /* !defined(HAVE_DIO_ITER) */

-		/* no commit async for direct IO */
-		vio->u.write.vui_written += tot_bytes;
-	}
+static ssize_t
+ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
+	     loff_t file_offset, unsigned long nr_segs)
+{
+	struct iov_iter iter;

-	RETURN(tot_bytes ? tot_bytes : result);
+	iov_iter_init(&iter, iov, nr_segs, iov_length(iov, nr_segs), 0);
+	return ll_direct_IO_impl(iocb, &iter, rw);
 }
-#endif /* HAVE_DIRECTIO_ITER || HAVE_IOV_ITER_RW */
+
+#endif /* !defined(HAVE_DIO_ITER) */

 /**
  * Prepare partially written-to page for a write.
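
The heart of the rw26.c changes above is a reference-counted completion
anchor: ll_aio_alloc() initializes cda_sync with one reference so the
anchor cannot fire while pages are still being queued, ll_direct_rw_pages()
adds one reference per submitted page, every page completion drops one
through cl_sync_io_note(), and the final drop runs ll_aio_end(), which
releases the transient pages and calls aio_complete(). Below is a minimal
standalone model of that accounting, using plain C11 atomics and pthreads
(all names are illustrative only; none of them are Lustre APIs):

    /* toy model; build with: cc -pthread sync_anchor.c */
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    struct sync_anchor {
    	atomic_int refs;        /* models csi_sync_nr */
    	void (*end)(void);      /* models csi_end_io (ll_aio_end) */
    };

    static void anchor_note(struct sync_anchor *a)
    {
    	/* models cl_sync_io_note(): the last drop fires the callback */
    	if (atomic_fetch_sub(&a->refs, 1) == 1)
    		a->end();
    }

    static void aio_done(void)
    {
    	printf("all pages done: aio_complete() would run here\n");
    }

    /* starts at 1, like cl_sync_io_init_notify(&aio->cda_sync, 1, ...),
     * so the callback cannot fire while pages are still being queued */
    static struct sync_anchor anchor = { 1, aio_done };

    static void *page_io(void *arg)
    {
    	/* one page's server reply lands: signal its completion */
    	anchor_note(&anchor);
    	return NULL;
    }

    int main(void)
    {
    	enum { NPAGES = 8 };
    	pthread_t tid[NPAGES];
    	int i;

    	for (i = 0; i < NPAGES; i++) {
    		/* like atomic_add(io_pages, &anchor->csi_sync_nr) */
    		atomic_fetch_add(&anchor.refs, 1);
    		pthread_create(&tid[i], NULL, page_io, NULL);
    	}
    	/* everything queued: drop the initial reference */
    	anchor_note(&anchor);

    	for (i = 0; i < NPAGES; i++)
    		pthread_join(tid[i], NULL);
    	return 0;
    }

The initial reference is what makes the scheme race-free: even if every
page completes before submission finishes, the count cannot reach zero
until the submitter drops its own reference.
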
diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c
index ac3965f..007a8b3 100644
--- a/lustre/llite/vvp_io.c
+++ b/lustre/llite/vvp_io.c
@@ -1262,7 +1262,7 @@ static int vvp_io_write_start(const struct lu_env *env,
 	inode_unlock(inode);

 	written = result;
-	if (result > 0 || result == -EIOCBQUEUED)
+	if (result > 0)
 #ifdef HAVE_GENERIC_WRITE_SYNC_2ARGS
 		result = generic_write_sync(vio->vui_iocb, result);
 #else
@@ -1297,12 +1297,13 @@ static int vvp_io_write_start(const struct lu_env *env,
 		/* rewind ki_pos to where it has successfully committed */
 		vio->vui_iocb->ki_pos = pos + io->ci_nob - nob;
 	}
-	if (result > 0) {
+	if (result > 0 || result == -EIOCBQUEUED) {
 		ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);

 		if (result < cnt)
 			io->ci_continue = 0;
-		result = 0;
+		if (result > 0)
+			result = 0;
 	}

 	RETURN(result);
diff --git a/lustre/llite/vvp_page.c b/lustre/llite/vvp_page.c
index 7fb019c..0c9d552 100644
--- a/lustre/llite/vvp_page.c
+++ b/lustre/llite/vvp_page.c
@@ -89,6 +89,8 @@ static int vvp_page_own(const struct lu_env *env,
 	struct vvp_page *vpg = cl2vvp_page(slice);
 	struct page *vmpage = vpg->vpg_page;

+	ENTRY;
+
 	LASSERT(vmpage != NULL);
 	if (nonblock) {
 		if (!trylock_page(vmpage))
@@ -105,7 +107,7 @@ static int vvp_page_own(const struct lu_env *env,
 	lock_page(vmpage);
 	wait_on_page_writeback(vmpage);

-	return 0;
+	RETURN(0);
 }

 static void vvp_page_assume(const struct lu_env *env,
@@ -134,10 +136,14 @@ static void vvp_page_disown(const struct lu_env *env,
 {
 	struct page *vmpage = cl2vm_page(slice);

+	ENTRY;
+
 	LASSERT(vmpage != NULL);
 	LASSERT(PageLocked(vmpage));

 	unlock_page(cl2vm_page(slice));
+
+	EXIT;
 }

 static void vvp_page_discard(const struct lu_env *env,
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index 824aa6e..93a2657 100644
--- a/lustre/obdclass/cl_io.c
+++ b/lustre/obdclass/cl_io.c
@@ -1124,7 +1124,7 @@ EXPORT_SYMBOL(cl_req_attr_set);
  */
 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
-			    cl_sync_io_end_t *end)
+			    struct cl_dio_aio *aio, cl_sync_io_end_t *end)
 {
 	ENTRY;
 	memset(anchor, 0, sizeof(*anchor));
@@ -1132,6 +1132,7 @@ void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
 	atomic_set(&anchor->csi_sync_nr, nr);
 	anchor->csi_sync_rc = 0;
 	anchor->csi_end_io = end;
+	anchor->csi_aio = aio;
 	EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_init_notify);
@@ -1188,6 +1189,8 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
 	LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
 	if (atomic_dec_and_lock(&anchor->csi_sync_nr,
 				&anchor->csi_waitq.lock)) {
+		struct cl_dio_aio *aio = NULL;
+
 		cl_sync_io_end_t *end_io = anchor->csi_end_io;

 		/*
@@ -1201,9 +1204,17 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
 		wake_up_all_locked(&anchor->csi_waitq);
 		if (end_io)
 			end_io(env, anchor);
+		if (anchor->csi_aio)
+			aio = anchor->csi_aio;
+
 		spin_unlock(&anchor->csi_waitq.lock);

-		/* Can't access anchor any more */
+		/**
+		 * If anchor->csi_aio is set, we are responsible for freeing
+		 * memory here rather than when cl_sync_io_wait() completes.
+		 */
+		if (aio)
+			OBD_FREE_PTR(aio);
 	}
 	EXIT;
 }
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 2481e51..8122fd1 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -1987,9 +1987,6 @@ static int try_to_add_extent_for_io(struct client_obd *cli,

 		if (!can_merge(ext, tmp))
 			RETURN(0);
-
-		/* remove break for strict check */
-		break;
 	}

 	data->erd_max_extents--;
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index faa02a9..5ad991c 100755
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -20592,6 +20592,57 @@ test_398b() { # LU-4198
 }
 run_test 398b "DIO and buffer IO race"

+test_398c() { # LU-4198
+	which fio || skip_env "no fio installed"
+
+	saved_debug=$($LCTL get_param -n debug)
+	$LCTL set_param debug=0
+
+	local size=$(lctl get_param -n osc.$FSNAME-OST0000*.kbytesavail | head -1)
+	((size /= 1024)) # by megabytes
+	((size /= 2)) # write half of the OST at most
+	[ $size -gt 40 ] && size=40 # reduce test time anyway
+
+	$LFS setstripe -c 1 $DIR/$tfile
+
+	# it seems like ldiskfs reserves more space than necessary if the
+	# writing blocks are not mapped, so it extends the file first
+	dd if=/dev/zero of=$DIR/$tfile bs=1M count=$size && sync
+	cancel_lru_locks osc
+
+	# clear and verify rpc_stats later
+	$LCTL set_param osc.${FSNAME}-OST0000-osc-ffff*.rpc_stats=clear
+
+	local njobs=4
+	echo "writing ${size}M to OST0 by fio with $njobs jobs..."
+	fio --name=rand-write --rw=randwrite --bs=$PAGE_SIZE --direct=1 \
+		--numjobs=$njobs --fallocate=none --ioengine=libaio \
+		--iodepth=16 --allow_file_create=0 --size=$((size/njobs))M \
+		--filename=$DIR/$tfile
+	[ $? -eq 0 ] || error "fio write error"
+
+	[ $($LCTL get_param -n \
+		ldlm.namespaces.${FSNAME}-OST0000-osc-ffff*.lock_count) -eq 0 ] ||
+		error "Locks were requested while doing AIO"
+
+	# get the percentage of 1-page I/O
+	pct=$($LCTL get_param osc.${FSNAME}-OST0000-osc-ffff*.rpc_stats |
+		grep -A 1 'pages per rpc' | grep -v 'pages per rpc' |
+		awk '{print $7}')
+	[ $pct -le 50 ] || error "$pct% of I/O are 1-page"
+
+	echo "mix rw ${size}M to OST0 by fio with $njobs jobs..."
+	fio --name=rand-rw --rw=randrw --bs=$PAGE_SIZE --direct=1 \
+		--numjobs=$njobs --fallocate=none --ioengine=libaio \
+		--iodepth=16 --allow_file_create=0 --size=$((size/njobs))M \
+		--filename=$DIR/$tfile
+	[ $? -eq 0 ] || error "fio mixed read write error"
+
+	rm -rf $DIR/$tfile
+	$LCTL set_param debug="$saved_debug"
+}
+run_test 398c "run fio to test AIO"
+
 test_fake_rw() {
 	local read_write=$1
 	if [ "$read_write" = "write" ]; then
--
1.8.3.1
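
From userspace, the path this patch enables is ordinary libaio against an
O_DIRECT file descriptor: io_submit() returns as soon as the IO is queued
(the -EIOCBQUEUED case handled in vvp_io.c above), and the completion that
ll_aio_end() signals is what io_getevents() later reaps. This is exactly
how the fio runs quoted in the commit message (ioengine=libaio, direct=1)
drive the client. A minimal sketch, assuming the libaio development
package is installed and that /mnt/lustre is a mounted client (the path
and sizes below are illustrative only):

    /* build with: cc lustre_aio.c -laio */
    #define _GNU_SOURCE             /* for O_DIRECT */
    #include <libaio.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>

    int main(void)
    {
    	const char *path = "/mnt/lustre/aio_test";  /* illustrative path */
    	struct iocb cb, *cbs[1] = { &cb };
    	struct io_event ev;
    	io_context_t ctx = 0;
    	void *buf = NULL;
    	int fd;

    	fd = open(path, O_WRONLY | O_CREAT | O_DIRECT, 0644);
    	if (fd < 0 || io_setup(16, &ctx) < 0)
    		return 1;

    	/* O_DIRECT needs page-aligned buffer, length and offset */
    	if (posix_memalign(&buf, 4096, 4096))
    		return 1;
    	memset(buf, 'x', 4096);

    	/* queue one 4KiB write at offset 0; this returns as soon as
    	 * the IO is issued, without waiting for the server reply */
    	io_prep_pwrite(&cb, fd, buf, 4096, 0);
    	if (io_submit(ctx, 1, cbs) != 1)
    		return 1;

    	/* ... the application is free to do other work here ... */

    	/* reap the completion delivered by aio_complete() */
    	if (io_getevents(ctx, 1, 1, &ev, NULL) == 1)
    		printf("write done: res=%ld\n", (long)ev.res);

    	io_destroy(ctx);
    	close(fd);
    	free(buf);
    	return 0;
    }

With an iodepth greater than one, many such iocbs can be in flight at
once, which is where the bandwidth and IOPS gains quoted in the commit
message come from.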