X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Frw26.c;h=8ea28c14dab532be8a39e542618ab81759e523f0;hp=ab6bec998f03aaa03fcd18bb9c57fb45e83164c8;hb=d1dded6e28473d889a9b24b47cbc804f90dd2956;hpb=6bce536725efd166d2772f13fe954f271f9c53b8 diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index ab6bec9..8ea28c1 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -168,130 +168,262 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask) return result; } -#define MAX_DIRECTIO_SIZE 2*1024*1024*1024UL +#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) || \ + defined(HAVE_DIRECTIO_2ARGS) +#define HAVE_DIO_ITER 1 +#endif -static ssize_t -ll_direct_IO_seg(const struct lu_env *env, struct cl_io *io, int rw, - struct inode *inode, size_t size, loff_t file_offset, - struct page **pages, int page_count) +/* + * ll_free_user_pages - tear down page struct array + * @pages: array of page struct pointers underlying target buffer + */ +static void ll_free_user_pages(struct page **pages, int npages) { - struct cl_page *clp; - struct cl_2queue *queue; - struct cl_object *obj = io->ci_obj; int i; - ssize_t rc = 0; - size_t page_size = cl_page_size(obj); - size_t orig_size = size; - bool do_io; - int io_pages = 0; - ENTRY; - queue = &io->ci_queue; - cl_2queue_init(queue); - for (i = 0; i < page_count; i++) { - LASSERT(!(file_offset & (page_size - 1))); - clp = cl_page_find(env, obj, cl_index(obj, file_offset), - pages[i], CPT_TRANSIENT); - if (IS_ERR(clp)) { - rc = PTR_ERR(clp); + for (i = 0; i < npages; i++) { + if (!pages[i]) break; - } + put_page(pages[i]); + } - rc = cl_page_own(env, io, clp); - if (rc) { - LASSERT(clp->cp_state == CPS_FREEING); - cl_page_put(env, clp); - break; - } +#if defined(HAVE_DIO_ITER) + kvfree(pages); +#else + OBD_FREE_LARGE(pages, npages * sizeof(*pages)); +#endif +} - do_io = true; +static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter, + struct page ***pages, ssize_t *npages, + size_t maxsize) +{ +#if defined(HAVE_DIO_ITER) + size_t start; + size_t result; - /* check the page type: if the page is a host page, then do - * write directly - */ - if (clp->cp_type == CPT_CACHEABLE) { - struct page *vmpage = cl_page_vmpage(clp); - struct page *src_page; - struct page *dst_page; - void *src; - void *dst; - - src_page = (rw == WRITE) ? pages[i] : vmpage; - dst_page = (rw == WRITE) ? vmpage : pages[i]; - - src = ll_kmap_atomic(src_page, KM_USER0); - dst = ll_kmap_atomic(dst_page, KM_USER1); - memcpy(dst, src, min(page_size, size)); - ll_kunmap_atomic(dst, KM_USER1); - ll_kunmap_atomic(src, KM_USER0); - - /* make sure page will be added to the transfer by - * cl_io_submit()->...->vvp_page_prep_write(). - */ - if (rw == WRITE) - set_page_dirty(vmpage); - - if (rw == READ) { - /* do not issue the page for read, since it - * may reread a ra page which has NOT uptodate - * bit set. - */ - cl_page_disown(env, io, clp); - do_io = false; - } - } + /* + * iov_iter_get_pages_alloc() is introduced in 3.16 similar + * to HAVE_DIO_ITER. + */ + result = iov_iter_get_pages_alloc(iter, pages, maxsize, &start); + if (result > 0) + *npages = DIV_ROUND_UP(result + start, PAGE_SIZE); - if (likely(do_io)) { - cl_2queue_add(queue, clp); + return result; +#else + unsigned long addr; + size_t page_count; + size_t size; + long result; - /* - * Set page clip to tell transfer formation engine - * that page has to be sent even if it is beyond KMS. 
-			 */
-			cl_page_clip(env, clp, 0, min(size, page_size));
+	if (!maxsize)
+		return 0;
 
-			++io_pages;
-		}
+	if (!iter->nr_segs)
+		return 0;
 
-		/* drop the reference count for cl_page_find */
-		cl_page_put(env, clp);
-		size -= page_size;
-		file_offset += page_size;
+	addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
+	if (addr & ~PAGE_MASK)
+		return -EINVAL;
+
+	size = min_t(size_t, maxsize, iter->iov->iov_len);
+	page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	OBD_ALLOC_LARGE(*pages, page_count * sizeof(**pages));
+	if (*pages == NULL)
+		return -ENOMEM;
+
+	down_read(&current->mm->mmap_sem);
+	result = get_user_pages(current, current->mm, addr, page_count,
+				rw == READ, 0, *pages, NULL);
+	up_read(&current->mm->mmap_sem);
+
+	if (unlikely(result != page_count)) {
+		ll_free_user_pages(*pages, page_count);
+		*pages = NULL;
+
+		if (result >= 0)
+			return -EFAULT;
+
+		return result;
+	}
+	*npages = page_count;
+
+	return size;
+#endif
+}
+
+/* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
+#if defined(HAVE_DIO_ITER)
+static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+{
+	return iov_iter_alignment(i);
+}
+#else /* copied from alignment_iovec() */
+static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+{
+	const struct iovec *iov = i->iov;
+	unsigned long res;
+	size_t size = i->count;
+	size_t n;
+
+	if (!size)
+		return 0;
+
+	res = (unsigned long)iov->iov_base + i->iov_offset;
+	n = iov->iov_len - i->iov_offset;
+	if (n >= size)
+		return res | size;
+
+	size -= n;
+	res |= n;
+	while (size > (++iov)->iov_len) {
+		res |= (unsigned long)iov->iov_base | iov->iov_len;
+		size -= iov->iov_len;
 	}
+	res |= (unsigned long)iov->iov_base | size;
+
+	return res;
+}
+#endif
+
+#ifndef HAVE_AIO_COMPLETE
+static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
+{
+	if (iocb->ki_complete)
+		iocb->ki_complete(iocb, res, res2);
+}
+#endif
+
+/** direct IO pages */
+struct ll_dio_pages {
+	struct cl_dio_aio	*ldp_aio;
+	/*
+	 * page array to be written. we don't support
+	 * partial pages except the last one.
+	 */
+	struct page		**ldp_pages;
+	/** # of pages in the array. */
+	size_t			ldp_count;
+	/* the file offset of the first page. */
+	loff_t			ldp_file_offset;
+};
+
+static void ll_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+	struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
+	ssize_t ret = anchor->csi_sync_rc;
+
+	ENTRY;
+
+	/* release pages */
+	while (aio->cda_pages.pl_nr > 0) {
+		struct cl_page *page = cl_page_list_first(&aio->cda_pages);
 
-	if (rc == 0 && io_pages) {
-		rc = cl_io_submit_sync(env, io,
-				       rw == READ ? CRT_READ : CRT_WRITE,
-				       queue, 0);
+		cl_page_get(page);
+		cl_page_list_del(env, &aio->cda_pages, page);
+		cl_page_delete(env, page);
+		cl_page_put(env, page);
 	}
-	if (rc == 0)
-		rc = orig_size;
 
-	cl_2queue_discard(env, io, queue);
-	cl_2queue_disown(env, io, queue);
-	cl_2queue_fini(env, queue);
-	RETURN(rc);
+	if (!is_sync_kiocb(aio->cda_iocb))
+		aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+	EXIT;
+}
+
+static struct cl_dio_aio *ll_aio_alloc(struct kiocb *iocb)
+{
+	struct cl_dio_aio *aio;
+
+	OBD_ALLOC_PTR(aio);
+	if (aio != NULL) {
+		/*
+		 * Hold one ref so that it won't be released until
+		 * every page is added.
+		 */
+		cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
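+				       /* a sync kiocb is reaped by
+					* cl_sync_io_wait() in
+					* ll_direct_IO_impl(), so it needs no
+					* aio cookie; an async kiocb keeps the
+					* cookie so ll_aio_end() can
+					* aio_complete() it */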
+ NULL : aio, ll_aio_end); + cl_page_list_init(&aio->cda_pages); + aio->cda_iocb = iocb; + } + return aio; } -/* ll_free_user_pages - tear down page struct array - * @pages: array of page struct pointers underlying target buffer */ -static void ll_free_user_pages(struct page **pages, int npages, int do_dirty) +static int +ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, + int rw, struct inode *inode, struct ll_dio_pages *pv) { + struct cl_page *page; + struct cl_2queue *queue = &io->ci_queue; + struct cl_object *obj = io->ci_obj; + struct cl_sync_io *anchor = &pv->ldp_aio->cda_sync; + loff_t offset = pv->ldp_file_offset; + int io_pages = 0; + size_t page_size = cl_page_size(obj); int i; + ssize_t rc = 0; - for (i = 0; i < npages; i++) { - if (pages[i] == NULL) + ENTRY; + + cl_2queue_init(queue); + for (i = 0; i < pv->ldp_count; i++) { + LASSERT(!(offset & (PAGE_SIZE - 1))); + page = cl_page_find(env, obj, cl_index(obj, offset), + pv->ldp_pages[i], CPT_TRANSIENT); + if (IS_ERR(page)) { + rc = PTR_ERR(page); break; - if (do_dirty) - set_page_dirty_lock(pages[i]); - put_page(pages[i]); + } + LASSERT(page->cp_type == CPT_TRANSIENT); + rc = cl_page_own(env, io, page); + if (rc) { + cl_page_put(env, page); + break; + } + + page->cp_sync_io = anchor; + cl_2queue_add(queue, page); + /* + * Set page clip to tell transfer formation engine + * that page has to be sent even if it is beyond KMS. + */ + cl_page_clip(env, page, 0, min(size, page_size)); + ++io_pages; + + /* drop the reference count for cl_page_find */ + cl_page_put(env, page); + offset += page_size; + size -= page_size; + } + if (rc == 0 && io_pages > 0) { + int iot = rw == READ ? CRT_READ : CRT_WRITE; + + atomic_add(io_pages, &anchor->csi_sync_nr); + rc = cl_io_submit_rw(env, io, iot, queue); + if (rc == 0) { + cl_page_list_splice(&queue->c2_qout, + &pv->ldp_aio->cda_pages); + } else { + atomic_add(-queue->c2_qin.pl_nr, + &anchor->csi_sync_nr); + cl_page_list_for_each(page, &queue->c2_qin) + page->cp_sync_io = NULL; + } + /* handle partially submitted reqs */ + if (queue->c2_qin.pl_nr > 0) { + CERROR(DFID " failed to submit %d dio pages: %zd\n", + PFID(lu_object_fid(&obj->co_lu)), + queue->c2_qin.pl_nr, rc); + if (rc == 0) + rc = -EIO; + } } -#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) - kvfree(pages); -#else - OBD_FREE_LARGE(pages, npages * sizeof(*pages)); -#endif + cl_2queue_discard(env, io, queue); + cl_2queue_disown(env, io, queue); + cl_2queue_fini(env, queue); + RETURN(rc); } #ifdef KMALLOC_MAX_SIZE @@ -308,37 +440,23 @@ static void ll_free_user_pages(struct page **pages, int npages, int do_dirty) #define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \ ~(DT_MAX_BRW_SIZE - 1)) -#ifndef HAVE_IOV_ITER_RW -# define iov_iter_rw(iter) rw -#endif - -#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) static ssize_t -ll_direct_IO( -# ifndef HAVE_IOV_ITER_RW - int rw, -# endif - struct kiocb *iocb, struct iov_iter *iter -# ifndef HAVE_DIRECTIO_2ARGS - , loff_t file_offset -# endif - ) +ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) { -#ifdef HAVE_DIRECTIO_2ARGS - loff_t file_offset = iocb->ki_pos; -#endif struct ll_cl_context *lcc; const struct lu_env *env; struct cl_io *io; struct file *file = iocb->ki_filp; struct inode *inode = file->f_mapping->host; - ssize_t count = iov_iter_count(iter); + struct cl_dio_aio *aio; + size_t count = iov_iter_count(iter); ssize_t tot_bytes = 0, result = 0; - size_t size = MAX_DIO_SIZE; + loff_t file_offset = 
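+			     /* direct IO needs a page-aligned offset and
+			      * count; both are rejected with -EINVAL below */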
iocb->ki_pos; /* Check EOF by ourselves */ - if (iov_iter_rw(iter) == READ && file_offset >= i_size_read(inode)) + if (rw == READ && file_offset >= i_size_read(inode)) return 0; + /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */ if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK)) return -EINVAL; @@ -350,7 +468,7 @@ ll_direct_IO( MAX_DIO_SIZE >> PAGE_SHIFT); /* Check that all user buffers are aligned as well */ - if (iov_iter_alignment(iter) & ~PAGE_MASK) + if (ll_iov_iter_alignment(iter) & ~PAGE_MASK) return -EINVAL; lcc = ll_cl_find(file); @@ -362,19 +480,23 @@ ll_direct_IO( io = lcc->lcc_io; LASSERT(io != NULL); + aio = ll_aio_alloc(iocb); + if (!aio) + RETURN(-ENOMEM); + /* 0. Need locking between buffered and direct access. and race with * size changing by concurrent truncates and writes. * 1. Need inode mutex to operate transient pages. */ - if (iov_iter_rw(iter) == READ) + if (rw == READ) inode_lock(inode); while (iov_iter_count(iter)) { + struct ll_dio_pages pvec = { .ldp_aio = aio }; struct page **pages; - size_t offs; - count = min_t(size_t, iov_iter_count(iter), size); - if (iov_iter_rw(iter) == READ) { + count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE); + if (rw == READ) { if (file_offset >= i_size_read(inode)) break; @@ -382,194 +504,91 @@ ll_direct_IO( count = i_size_read(inode) - file_offset; } - result = iov_iter_get_pages_alloc(iter, &pages, count, &offs); - if (likely(result > 0)) { - int n = DIV_ROUND_UP(result + offs, PAGE_SIZE); + result = ll_get_user_pages(rw, iter, &pages, + &pvec.ldp_count, count); + if (unlikely(result <= 0)) + GOTO(out, result); - result = ll_direct_IO_seg(env, io, iov_iter_rw(iter), - inode, result, file_offset, - pages, n); - ll_free_user_pages(pages, n, - iov_iter_rw(iter) == READ); + count = result; + pvec.ldp_file_offset = file_offset; + pvec.ldp_pages = pages; - } - if (unlikely(result <= 0)) { - /* If we can't allocate a large enough buffer - * for the request, shrink it to a smaller - * PAGE_SIZE multiple and try again. - * We should always be able to kmalloc for a - * page worth of page pointers = 4MB on i386. */ - if (result == -ENOMEM && - size > (PAGE_SIZE / sizeof(*pages)) * - PAGE_SIZE) { - size = ((((size / 2) - 1) | - ~PAGE_MASK) + 1) & PAGE_MASK; - CDEBUG(D_VFSTRACE, "DIO size now %zu\n", - size); - continue; - } + result = ll_direct_rw_pages(env, io, count, + rw, inode, &pvec); + ll_free_user_pages(pages, pvec.ldp_count); + if (unlikely(result < 0)) GOTO(out, result); - } - iov_iter_advance(iter, result); - tot_bytes += result; - file_offset += result; + iov_iter_advance(iter, count); + tot_bytes += count; + file_offset += count; } -out: - if (iov_iter_rw(iter) == READ) - inode_unlock(inode); - if (tot_bytes > 0) { - struct vvp_io *vio = vvp_env_io(env); +out: + aio->cda_bytes = tot_bytes; + cl_sync_io_note(env, &aio->cda_sync, result); - /* no commit async for direct IO */ - vio->u.write.vui_written += tot_bytes; - } + if (is_sync_kiocb(iocb)) { + ssize_t rc2; - return tot_bytes ? 
: result;
-}
-#else /* !HAVE_DIRECTIO_ITER && !HAVE_IOV_ITER_RW */
+		rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+		if (result == 0 && rc2)
+			result = rc2;
 
-static inline int ll_get_user_pages(int rw, unsigned long user_addr,
-				    size_t size, struct page ***pages,
-				    int *max_pages)
-{
-	int result = -ENOMEM;
+		if (result == 0) {
+			struct vvp_io *vio = vvp_env_io(env);
+			/* no commit async for direct IO */
+			vio->u.write.vui_written += tot_bytes;
+			result = tot_bytes;
+		}
+		OBD_FREE_PTR(aio);
 
-	/* set an arbitrary limit to prevent arithmetic overflow */
-	if (size > MAX_DIRECTIO_SIZE) {
-		*pages = NULL;
-		return -EFBIG;
+	} else {
+		result = -EIOCBQUEUED;
 	}
 
-	*max_pages = (user_addr + size + PAGE_SIZE - 1) >>
-		     PAGE_SHIFT;
-	*max_pages -= user_addr >> PAGE_SHIFT;
-
-	OBD_ALLOC_LARGE(*pages, *max_pages * sizeof(**pages));
-	if (*pages) {
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					*max_pages, (rw == READ), 0, *pages,
-					NULL);
-		up_read(&current->mm->mmap_sem);
-		if (unlikely(result <= 0))
-			OBD_FREE_LARGE(*pages, *max_pages * sizeof(**pages));
-	}
+	if (rw == READ)
+		inode_unlock(inode);
 
 	return result;
 }
 
-static ssize_t
-ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-	     loff_t file_offset, unsigned long nr_segs)
+#if defined(HAVE_DIO_ITER)
+static ssize_t ll_direct_IO(
+#ifndef HAVE_IOV_ITER_RW
+	int rw,
+#endif
+	struct kiocb *iocb, struct iov_iter *iter
+#ifndef HAVE_DIRECTIO_2ARGS
+	, loff_t file_offset
+#endif
+	)
 {
-	struct ll_cl_context *lcc;
-	const struct lu_env *env;
-	struct cl_io *io;
-	struct file *file = iocb->ki_filp;
-	struct inode *inode = file->f_mapping->host;
-	ssize_t count = iov_length(iov, nr_segs);
-	ssize_t tot_bytes = 0, result = 0;
-	unsigned long seg = 0;
-	size_t size = MAX_DIO_SIZE;
-	ENTRY;
-
-	/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ???
*/ - if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK)) - RETURN(-EINVAL); + int nrw; - CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), " - "offset=%lld=%llx, pages %zd (max %lu)\n", - PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE, - file_offset, file_offset, count >> PAGE_SHIFT, - MAX_DIO_SIZE >> PAGE_SHIFT); - - /* Check that all user buffers are aligned as well */ - for (seg = 0; seg < nr_segs; seg++) { - if (((unsigned long)iov[seg].iov_base & ~PAGE_MASK) || - (iov[seg].iov_len & ~PAGE_MASK)) - RETURN(-EINVAL); - } - - lcc = ll_cl_find(file); - if (lcc == NULL) - RETURN(-EIO); +#ifndef HAVE_IOV_ITER_RW + nrw = rw; +#else + nrw = iov_iter_rw(iter); +#endif - env = lcc->lcc_env; - LASSERT(!IS_ERR(env)); - io = lcc->lcc_io; - LASSERT(io != NULL); + return ll_direct_IO_impl(iocb, iter, nrw); +} - for (seg = 0; seg < nr_segs; seg++) { - size_t iov_left = iov[seg].iov_len; - unsigned long user_addr = (unsigned long)iov[seg].iov_base; - - if (rw == READ) { - if (file_offset >= i_size_read(inode)) - break; - if (file_offset + iov_left > i_size_read(inode)) - iov_left = i_size_read(inode) - file_offset; - } - - while (iov_left > 0) { - struct page **pages; - int page_count, max_pages = 0; - size_t bytes; - - bytes = min(size, iov_left); - page_count = ll_get_user_pages(rw, user_addr, bytes, - &pages, &max_pages); - if (likely(page_count > 0)) { - if (unlikely(page_count < max_pages)) - bytes = page_count << PAGE_SHIFT; - result = ll_direct_IO_seg(env, io, rw, inode, - bytes, file_offset, - pages, page_count); - ll_free_user_pages(pages, max_pages, rw==READ); - } else if (page_count == 0) { - GOTO(out, result = -EFAULT); - } else { - result = page_count; - } - if (unlikely(result <= 0)) { - /* If we can't allocate a large enough buffer - * for the request, shrink it to a smaller - * PAGE_SIZE multiple and try again. - * We should always be able to kmalloc for a - * page worth of page pointers = 4MB on i386. */ - if (result == -ENOMEM && - size > (PAGE_SIZE / sizeof(*pages)) * - PAGE_SIZE) { - size = ((((size / 2) - 1) | - ~PAGE_MASK) + 1) & - PAGE_MASK; - CDEBUG(D_VFSTRACE, "DIO size now %zu\n", - size); - continue; - } - - GOTO(out, result); - } - - tot_bytes += result; - file_offset += result; - iov_left -= result; - user_addr += result; - } - } -out: - if (tot_bytes > 0) { - struct vvp_io *vio = vvp_env_io(env); +#else /* !defined(HAVE_DIO_ITER) */ - /* no commit async for direct IO */ - vio->u.write.vui_written += tot_bytes; - } +static ssize_t +ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, + loff_t file_offset, unsigned long nr_segs) +{ + struct iov_iter iter; - RETURN(tot_bytes ? tot_bytes : result); + iov_iter_init(&iter, iov, nr_segs, iov_length(iov, nr_segs), 0); + return ll_direct_IO_impl(iocb, &iter, rw); } -#endif /* HAVE_DIRECTIO_ITER || HAVE_IOV_ITER_RW */ + +#endif /* !defined(HAVE_DIO_ITER) */ /** * Prepare partially written-to page for a write.
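
Illustration (not part of the patch): with this change an asynchronous kiocb no longer blocks in the direct-IO path; ll_direct_IO_impl() queues the pages, returns -EIOCBQUEUED, and the request completes later through ll_aio_end()->aio_complete(), while a synchronous kiocb still waits in cl_sync_io_wait(). The sketch below drives that path from userspace with libaio. It is a minimal example under stated assumptions: the path /mnt/lustre/f0 is hypothetical, the file is at least 16 pages long, and offset, length and buffer are PAGE_SIZE-aligned so the ~PAGE_MASK checks in ll_direct_IO_impl() pass. Build with: gcc -O2 aio_dio.c -laio

/* aio_dio.c: minimal libaio + O_DIRECT reader exercising the AIO DIO path */
#define _GNU_SOURCE		/* for O_DIRECT */
#include <fcntl.h>
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(void)
{
	long psz = sysconf(_SC_PAGESIZE);
	size_t len = 16 * psz;			/* page-aligned count */
	io_context_t ctx = 0;
	struct iocb cb, *cbs[1] = { &cb };
	struct io_event ev;
	void *buf;
	int fd;

	/* page-aligned buffer: the patch's alignment check and
	 * ll_get_user_pages() require user addresses on page boundaries */
	if (posix_memalign(&buf, psz, len))
		return 1;

	fd = open("/mnt/lustre/f0", O_RDONLY | O_DIRECT);
	if (fd < 0)
		return 1;
	if (io_setup(1, &ctx))			/* one in-flight request */
		return 1;

	io_prep_pread(&cb, fd, buf, len, 0);	/* page-aligned offset */

	/* returns once the kiocb is queued; with this patch the kernel
	 * side reports -EIOCBQUEUED instead of waiting for the RPCs */
	if (io_submit(ctx, 1, cbs) != 1)
		return 1;

	/* completion is signalled via ll_aio_end()->aio_complete() */
	if (io_getevents(ctx, 1, 1, &ev, NULL) != 1)
		return 1;
	printf("read %ld bytes\n", (long)ev.res);

	io_destroy(ctx);
	close(fd);
	free(buf);
	return 0;
}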