enum cl_io_state ci_state;
/** main object this io is against. Immutable after creation. */
struct cl_object *ci_obj;
- /** one AIO request might be split in cl_io_loop */
- struct cl_dio_aio *ci_aio;
+ /** top level dio_aio */
+ struct cl_dio_aio *ci_dio_aio;
/**
* Upper layer io, of which this io is a part. Immutable after
* creation.
struct cl_sync_io;
struct cl_dio_aio;
+struct cl_sub_dio;
typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *);
-void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
- struct cl_dio_aio *aio, cl_sync_io_end_t *end);
+void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr, void *dio_aio,
+ cl_sync_io_end_t *end);
int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout);
int ioret);
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret);
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
- struct cl_dio_aio *ll_aio);
-void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
+struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ bool is_aio);
+struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, bool nofree);
+void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio,
+ bool always_free);
+void cl_sub_dio_free(struct cl_sub_dio *sdio, bool nofree);
static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
{
cl_sync_io_init_notify(anchor, nr, NULL, NULL);
wait_queue_head_t csi_waitq;
/** callback to invoke when this IO is finished */
cl_sync_io_end_t *csi_end_io;
- /** aio private data */
- struct cl_dio_aio *csi_aio;
+ /* private pointer for an associated DIO/AIO */
+ void *csi_dio_aio;
};
/** direct IO pages */
loff_t ldp_file_offset;
};
-/** To support Direct AIO */
+/* Top level struct used for AIO and DIO */
struct cl_dio_aio {
struct cl_sync_io cda_sync;
- struct cl_page_list cda_pages;
struct cl_object *cda_obj;
struct kiocb *cda_iocb;
ssize_t cda_bytes;
- struct cl_dio_aio *cda_ll_aio;
- struct ll_dio_pages cda_dio_pages;
unsigned cda_no_aio_complete:1,
- cda_no_aio_free:1;
+ cda_no_sub_free:1;
};
+/* Sub-dio used for splitting DIO (and AIO, because AIO is DIO) according to
+ * the layout/striping, so we can do parallel submit of DIO RPCs
+ */
+struct cl_sub_dio {
+ struct cl_sync_io csd_sync;
+ struct cl_page_list csd_pages;
+ ssize_t csd_bytes;
+ struct cl_dio_aio *csd_ll_aio;
+ struct ll_dio_pages csd_dio_pages;
+ unsigned csd_no_free:1;
+};
#if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) || \
defined(HAVE_DIRECTIO_2ARGS)
#define HAVE_DIO_ITER 1
unsigned int retried = 0, dio_lock = 0;
bool is_aio = false;
bool is_parallel_dio = false;
- struct cl_dio_aio *ci_aio = NULL;
+ struct cl_dio_aio *ci_dio_aio = NULL;
size_t per_bytes;
bool partial_io = false;
size_t max_io_pages, max_cached_pages;
if (!ll_sbi_has_parallel_dio(sbi))
is_parallel_dio = false;
- ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
- ll_i2info(inode)->lli_clob, NULL);
- if (!ci_aio)
+ ci_dio_aio = cl_dio_aio_alloc(args->u.normal.via_iocb,
+ ll_i2info(inode)->lli_clob, is_aio);
+ if (!ci_dio_aio)
GOTO(out, rc = -ENOMEM);
}
partial_io = per_bytes < count;
io = vvp_env_thread_io(env);
ll_io_init(io, file, iot, args);
- io->ci_aio = ci_aio;
+ io->ci_dio_aio = ci_dio_aio;
io->ci_dio_lock = dio_lock;
io->ci_ndelay_tried = retried;
io->ci_parallel_dio = is_parallel_dio;
rc = io->ci_result;
}
- /* N/B: parallel DIO may be disabled during i/o submission;
- * if that occurs, async RPCs are resolved before we get here, and this
- * wait call completes immediately.
- */
if (is_parallel_dio) {
- struct cl_sync_io *anchor = &io->ci_aio->cda_sync;
+ struct cl_sync_io *anchor = &io->ci_dio_aio->cda_sync;
/* for dio, EIOCBQUEUED is an implementation detail,
* and we don't return it to userspace
if (rc == -EIOCBQUEUED)
rc = 0;
+ /* N/B: parallel DIO may be disabled during i/o submission;
+ * if that occurs, I/O shifts to sync, so it's all resolved
+ * before we get here, and this wait call completes
+ * immediately.
+ */
rc2 = cl_sync_io_wait_recycle(env, anchor, 0, 0);
if (rc2 < 0)
rc = rc2;
goto restart;
}
- if (io->ci_aio) {
+ if (io->ci_dio_aio) {
/*
* VFS will call aio_complete() if no -EIOCBQUEUED
* is returned for AIO, so we can not call aio_complete()
* in our end_io().
+ *
+ * NB: This is safe because the atomic_dec_and_lock in
+ * cl_sync_io_note has implicit memory barriers, so this will
+ * be seen by whichever thread completes the DIO/AIO, even if
+ * it's not this one
*/
if (rc != -EIOCBQUEUED)
- io->ci_aio->cda_no_aio_complete = 1;
+ io->ci_dio_aio->cda_no_aio_complete = 1;
/**
 * Drop one extra reference so that end_io() can be
 * called for this IO context; we can call it after
 * we make sure all AIO requests have been processed.
 */
- cl_sync_io_note(env, &io->ci_aio->cda_sync,
+ cl_sync_io_note(env, &io->ci_dio_aio->cda_sync,
rc == -EIOCBQUEUED ? 0 : rc);
if (!is_aio) {
- cl_aio_free(env, io->ci_aio);
- io->ci_aio = NULL;
+ cl_dio_aio_free(env, io->ci_dio_aio, true);
+ io->ci_dio_aio = NULL;
}
}
static int
ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
- int rw, struct inode *inode, struct cl_dio_aio *aio)
+ int rw, struct inode *inode, struct cl_sub_dio *sdio)
{
- struct ll_dio_pages *pv = &aio->cda_dio_pages;
+ struct ll_dio_pages *pv = &sdio->csd_dio_pages;
struct cl_page *page;
struct cl_2queue *queue = &io->ci_queue;
struct cl_object *obj = io->ci_obj;
- struct cl_sync_io *anchor = &aio->cda_sync;
+ struct cl_sync_io *anchor = &sdio->csd_sync;
loff_t offset = pv->ldp_file_offset;
int io_pages = 0;
size_t page_size = cl_page_size(obj);
smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
- cl_page_list_splice(&queue->c2_qout, &aio->cda_pages);
+ cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
} else {
atomic_add(-queue->c2_qin.pl_nr,
&anchor->csi_sync_nr);
struct cl_io *io;
struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
- struct cl_dio_aio *ll_aio;
- struct cl_dio_aio *ldp_aio;
+ struct cl_dio_aio *ll_dio_aio;
+ struct cl_sub_dio *ldp_aio;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
+ bool sync_submit = false;
struct vvp_io *vio;
+ ssize_t rc2;
/* Check EOF by ourselves */
if (rw == READ && file_offset >= i_size_read(inode))
io = lcc->lcc_io;
LASSERT(io != NULL);
- ll_aio = io->ci_aio;
- LASSERT(ll_aio);
- LASSERT(ll_aio->cda_iocb == iocb);
+ ll_dio_aio = io->ci_dio_aio;
+ LASSERT(ll_dio_aio);
+ LASSERT(ll_dio_aio->cda_iocb == iocb);
+
+ /* We cannot do parallel submission of sub-I/Os - for AIO or regular
+ * DIO - unless the I/O is lockless, because parallel submission
+ * causes us to release the lock early.
+ *
+ * There are also several circumstances in which we must disable
+ * parallel DIO, so we check if it is enabled.
+ *
+ * The check for "is_sync_kiocb" excludes AIO, which does not need to
+ * be disabled in these situations.
+ */
+ if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio))
+ sync_submit = true;
while (iov_iter_count(iter)) {
struct ll_dio_pages *pvec;
count = i_size_read(inode) - file_offset;
}
- /* this aio is freed on completion from cl_sync_io_note, so we
- * do not need to directly free the memory here
+ /* if we are doing sync_submit, then we free this below,
+ * otherwise it is freed on the final call to cl_sync_io_note
+ * (either in this function or from a ptlrpcd daemon)
*/
- ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+ ldp_aio = cl_sub_dio_alloc(ll_dio_aio, sync_submit);
if (!ldp_aio)
GOTO(out, result = -ENOMEM);
- pvec = &ldp_aio->cda_dio_pages;
+ pvec = &ldp_aio->csd_dio_pages;
result = ll_get_user_pages(rw, iter, &pages,
&pvec->ldp_count, count);
if (unlikely(result <= 0)) {
- cl_sync_io_note(env, &ldp_aio->cda_sync, result);
+ cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+ if (sync_submit)
+ cl_sub_dio_free(ldp_aio, true);
GOTO(out, result);
}
/* We've submitted pages and can now remove the extra
* reference for that
*/
- cl_sync_io_note(env, &ldp_aio->cda_sync, result);
-
+ cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+
+ if (sync_submit) {
+ rc2 = cl_sync_io_wait(env, &ldp_aio->csd_sync,
+ 0);
+ if (result == 0 && rc2)
+ result = rc2;
+ cl_sub_dio_free(ldp_aio, true);
+ }
if (unlikely(result < 0))
GOTO(out, result);
}
out:
- ll_aio->cda_bytes += tot_bytes;
+ ll_dio_aio->cda_bytes += tot_bytes;
if (rw == WRITE)
vio->u.readwrite.vui_written += tot_bytes;
else
vio->u.readwrite.vui_read += tot_bytes;
- /* We cannot do async submission - for AIO or regular DIO - unless
- * lockless because it causes us to release the lock early.
- *
- * There are also several circumstances in which we must disable
- * parallel DIO, so we check if it is enabled.
- *
- * The check for "is_sync_kiocb" excludes AIO, which does not need to
- * be disabled in these situations.
+ /* AIO is not supported on pipes, so we cannot return EIOCBQUEUED like
+ * we normally would for both DIO and AIO here
*/
- if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
- ssize_t rc2;
-
- /* Wait here rather than doing async submission */
- rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
- if (result == 0 && rc2)
- result = rc2;
-
- if (result == 0)
- result = tot_bytes;
- } else if (result == 0) {
+ if (result == 0 && !iov_iter_is_pipe(iter))
result = -EIOCBQUEUED;
- }
return result;
}
};
extern struct kmem_cache *cl_dio_aio_kmem;
+extern struct kmem_cache *cl_sub_dio_kmem;
extern struct kmem_cache *cl_page_kmem_array[16];
extern unsigned short cl_page_kmem_size_array[16];
*/
void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
- struct cl_dio_aio *aio, cl_sync_io_end_t *end)
+ void *dio_aio, cl_sync_io_end_t *end)
{
ENTRY;
memset(anchor, 0, sizeof(*anchor));
atomic_set(&anchor->csi_sync_nr, nr);
anchor->csi_sync_rc = 0;
anchor->csi_end_io = end;
- anchor->csi_aio = aio;
+ anchor->csi_dio_aio = dio_aio;
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_init_notify);
}
#endif
-static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
{
struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
ssize_t ret = anchor->csi_sync_rc;
ENTRY;
+ if (!aio->cda_no_aio_complete)
+ aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+ EXIT;
+}
+
+static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+ struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
+ ssize_t ret = anchor->csi_sync_rc;
+
+ ENTRY;
+
/* release pages */
- while (aio->cda_pages.pl_nr > 0) {
- struct cl_page *page = cl_page_list_first(&aio->cda_pages);
+ while (sdio->csd_pages.pl_nr > 0) {
+ struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
cl_page_delete(env, page);
- cl_page_list_del(env, &aio->cda_pages, page);
+ cl_page_list_del(env, &sdio->csd_pages, page);
}
- if (!aio->cda_no_aio_complete)
- aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
-
- if (aio->cda_ll_aio) {
- ll_release_user_pages(aio->cda_dio_pages.ldp_pages,
- aio->cda_dio_pages.ldp_count);
- cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
- }
+ ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
+ sdio->csd_dio_pages.ldp_count);
+ cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
EXIT;
}
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
- struct cl_dio_aio *ll_aio)
+struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ bool is_aio)
{
struct cl_dio_aio *aio;
* Hold one ref so that it won't be released until
* every page is added.
*/
- cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
- cl_page_list_init(&aio->cda_pages);
+ cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
aio->cda_iocb = iocb;
- if (is_sync_kiocb(iocb) || ll_aio)
- aio->cda_no_aio_complete = 1;
- else
- aio->cda_no_aio_complete = 0;
- /* in the case of a lower level aio struct (ll_aio is set), or
- * true AIO (!is_sync_kiocb()), the memory is freed by
- * the daemons calling cl_sync_io_note, because they are the
- * last users of the aio struct
+ aio->cda_no_aio_complete = !is_aio;
+ /* if this is true AIO, the memory is freed by the last call
+ * to cl_sync_io_note (when all the I/O is complete), because
+ * no one is waiting (in the kernel) for this to complete
*
* in other cases, the last user is cl_sync_io_wait, and in
- * that case, the caller frees the aio struct after that call
- * completes
+ * that case, the caller frees the struct after that call
*/
- if (ll_aio || !is_sync_kiocb(iocb))
- aio->cda_no_aio_free = 0;
- else
- aio->cda_no_aio_free = 1;
+ aio->cda_no_sub_free = !is_aio;
cl_object_get(obj);
aio->cda_obj = obj;
- aio->cda_ll_aio = ll_aio;
-
- if (ll_aio)
- atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
}
return aio;
}
-EXPORT_SYMBOL(cl_aio_alloc);
+EXPORT_SYMBOL(cl_dio_aio_alloc);
-void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
+struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, bool nofree)
{
- if (aio) {
+ struct cl_sub_dio *sdio;
+
+ OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
+ if (sdio != NULL) {
+ /*
+ * Hold one ref so that it won't be released until
+ * every page is added.
+ */
+ cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
+ cl_sub_dio_end);
+ cl_page_list_init(&sdio->csd_pages);
+
+ sdio->csd_ll_aio = ll_aio;
+ atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
+ sdio->csd_no_free = nofree;
+ }
+ return sdio;
+}
+EXPORT_SYMBOL(cl_sub_dio_alloc);
+
+void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio,
+ bool always_free)
+{
+ if (aio && (!aio->cda_no_sub_free || always_free)) {
cl_object_put(env, aio->cda_obj);
OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
}
}
-EXPORT_SYMBOL(cl_aio_free);
+EXPORT_SYMBOL(cl_dio_aio_free);
+void cl_sub_dio_free(struct cl_sub_dio *sdio, bool always_free)
+{
+ if (sdio && (!sdio->csd_no_free || always_free))
+ OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
+}
+EXPORT_SYMBOL(cl_sub_dio_free);
/*
* ll_release_user_pages - tear down page struct array
* @pages: array of page struct pointers underlying target buffer
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
if (atomic_dec_and_lock(&anchor->csi_sync_nr,
&anchor->csi_waitq.lock)) {
- struct cl_dio_aio *aio = NULL;
+ void *dio_aio = NULL;
cl_sync_io_end_t *end_io = anchor->csi_end_io;
if (end_io)
end_io(env, anchor);
- aio = anchor->csi_aio;
+ dio_aio = anchor->csi_dio_aio;
spin_unlock(&anchor->csi_waitq.lock);
- if (aio && !aio->cda_no_aio_free)
- cl_aio_free(env, aio);
+ if (dio_aio) {
+ if (end_io == cl_dio_aio_end)
+ cl_dio_aio_free(env,
+ (struct cl_dio_aio *) dio_aio,
+ false);
+ else if (end_io == cl_sub_dio_end)
+ cl_sub_dio_free((struct cl_sub_dio *) dio_aio,
+ false);
+ }
}
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);
-
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret)
{
- bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
int rc = 0;
- /* for true AIO, the daemons running cl_sync_io_note would normally
- * free the aio struct, but if we're waiting on it, we need them to not
- * do that. This ensures the aio is not freed when we drop the
- * reference count to zero in cl_sync_io_note below
- */
- anchor->csi_aio->cda_no_aio_free = 1;
/*
* @anchor was inited as 1 to prevent end_io to be
* called before we add all pages for IO, so drop
*/
atomic_add(1, &anchor->csi_sync_nr);
- anchor->csi_aio->cda_no_aio_free = no_aio_free;
-
return rc;
}
EXPORT_SYMBOL(cl_sync_io_wait_recycle);
static struct kmem_cache *cl_env_kmem;
struct kmem_cache *cl_dio_aio_kmem;
+struct kmem_cache *cl_sub_dio_kmem;
struct kmem_cache *cl_page_kmem_array[16];
unsigned short cl_page_kmem_size_array[16];
.ckd_size = sizeof(struct cl_dio_aio)
},
{
+ .ckd_cache = &cl_sub_dio_kmem,
+ .ckd_name = "cl_sub_dio_kmem",
+ .ckd_size = sizeof(struct cl_sub_dio)
+ },
+ {
.ckd_cache = NULL
}
};
test_42d() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
+ local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+ stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+ $LCTL set_param debug=+cache
+
trunc_test 42d 0
[ $BEFOREWRITES -eq $AFTERWRITES ] ||
error "beforewrites $BEFOREWRITES != afterwrites $AFTERWRITES on truncate"
[ $OST1_VERSION -ge $(version_code 2.11.56) ] ||
skip "Need OSS version at least 2.11.56"
+ local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+ stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+ $LCTL set_param debug=+cache
+
# Remount client to reset grant
remount_client $MOUNT || error "failed to remount client"
local osc_tgt="$FSNAME-OST0000-osc-$($LFS getname -i $DIR)"
test_64f() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
+ local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+ stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+ $LCTL set_param debug=+cache
+
# Remount client to reset grant
remount_client $MOUNT || error "failed to remount client"
local osc_tgt="$FSNAME-OST0000-osc-$($LFS getname -i $DIR)"