This patch creates a lower level aio struct for each set of
pages submitted, and attaches that to the llite level aio.
That means the completion of i/o (in the sense of
successful RPC/page completion) is associated with the
lower level aio struct, and the higher level aio waits for
the completion of these lower level structs. Previously,
all pages were associated with the upper level (and only)
aio struct.
This patch is a reorganization/cleanup, which is necessary
for the next patch, which moves release pages to aio_end.
The justification for this (correctness and performance)
will be provided in that patch.
Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: I02d6a33a0d9f9bbc1a182bcd539bd836c240bcc5
Reviewed-on: https://review.whamcloud.com/44209
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Yingjin Qian <qian@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
int ioret);
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret);
int ioret);
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret);
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj);
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ struct cl_dio_aio *ll_aio);
void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
{
void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
{
struct cl_object *cda_obj;
struct kiocb *cda_iocb;
ssize_t cda_bytes;
struct cl_object *cda_obj;
struct kiocb *cda_iocb;
ssize_t cda_bytes;
- unsigned cda_no_aio_complete:1;
+ struct cl_dio_aio *cda_ll_aio;
+ unsigned cda_no_aio_complete:1,
+ cda_no_aio_free:1;
is_parallel_dio = false;
ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
is_parallel_dio = false;
ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
- ll_i2info(inode)->lli_clob);
+ ll_i2info(inode)->lli_clob, NULL);
if (!ci_aio)
GOTO(out, rc = -ENOMEM);
}
if (!ci_aio)
GOTO(out, rc = -ENOMEM);
}
struct cl_io *io;
struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
struct cl_io *io;
struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
- struct cl_dio_aio *aio;
+ struct cl_dio_aio *ll_aio;
+ struct cl_dio_aio *ldp_aio;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
io = lcc->lcc_io;
LASSERT(io != NULL);
io = lcc->lcc_io;
LASSERT(io != NULL);
- aio = io->ci_aio;
- LASSERT(aio);
- LASSERT(aio->cda_iocb == iocb);
+ ll_aio = io->ci_aio;
+ LASSERT(ll_aio);
+ LASSERT(ll_aio->cda_iocb == iocb);
while (iov_iter_count(iter)) {
while (iov_iter_count(iter)) {
- struct ll_dio_pages pvec = { .ldp_aio = aio };
+ struct ll_dio_pages pvec = {};
struct page **pages;
count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
struct page **pages;
count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
count = i_size_read(inode) - file_offset;
}
count = i_size_read(inode) - file_offset;
}
+ /* this aio is freed on completion from cl_sync_io_note, so we
+ * do not need to directly free the memory here
+ */
+ ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+ if (!ldp_aio)
+ GOTO(out, result = -ENOMEM);
+ pvec.ldp_aio = ldp_aio;
+
result = ll_get_user_pages(rw, iter, &pages,
&pvec.ldp_count, count);
result = ll_get_user_pages(rw, iter, &pages,
&pvec.ldp_count, count);
- if (unlikely(result <= 0))
+ if (unlikely(result <= 0)) {
+ cl_sync_io_note(env, &ldp_aio->cda_sync, result);
count = result;
pvec.ldp_file_offset = file_offset;
count = result;
pvec.ldp_file_offset = file_offset;
result = ll_direct_rw_pages(env, io, count,
rw, inode, &pvec);
result = ll_direct_rw_pages(env, io, count,
rw, inode, &pvec);
+ /* We've submitted pages and can now remove the extra
+ * reference for that
+ */
+ cl_sync_io_note(env, &ldp_aio->cda_sync, result);
ll_free_user_pages(pages, pvec.ldp_count);
if (unlikely(result < 0))
ll_free_user_pages(pages, pvec.ldp_count);
if (unlikely(result < 0))
- aio->cda_bytes += tot_bytes;
+ ll_aio->cda_bytes += tot_bytes;
if (rw == WRITE)
vio->u.readwrite.vui_written += tot_bytes;
if (rw == WRITE)
vio->u.readwrite.vui_written += tot_bytes;
ssize_t rc2;
/* Wait here rather than doing async submission */
ssize_t rc2;
/* Wait here rather than doing async submission */
- rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
+ rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
if (result == 0 && rc2)
result = rc2;
if (!aio->cda_no_aio_complete)
aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
if (!aio->cda_no_aio_complete)
aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+ if (aio->cda_ll_aio)
+ cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
+
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+ struct cl_dio_aio *ll_aio)
{
struct cl_dio_aio *aio;
{
struct cl_dio_aio *aio;
cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
cl_page_list_init(&aio->cda_pages);
aio->cda_iocb = iocb;
cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
cl_page_list_init(&aio->cda_pages);
aio->cda_iocb = iocb;
- if (is_sync_kiocb(iocb))
+ if (is_sync_kiocb(iocb) || ll_aio)
aio->cda_no_aio_complete = 1;
else
aio->cda_no_aio_complete = 0;
aio->cda_no_aio_complete = 1;
else
aio->cda_no_aio_complete = 0;
+ /* in the case of a lower level aio struct (ll_aio is set), or
+ * true AIO (!is_sync_kiocb()), the memory is freed by
+ * the daemons calling cl_sync_io_note, because they are the
+ * last users of the aio struct
+ *
+ * in other cases, the last user is cl_sync_io_wait, and in
+ * that case, the caller frees the aio struct after that call
+ * completes
+ */
+ if (ll_aio || !is_sync_kiocb(iocb))
+ aio->cda_no_aio_free = 0;
+ else
+ aio->cda_no_aio_free = 1;
+
cl_object_get(obj);
aio->cda_obj = obj;
cl_object_get(obj);
aio->cda_obj = obj;
+ aio->cda_ll_aio = ll_aio;
+
+ if (ll_aio)
+ atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
if (anchor->csi_sync_rc == 0 && ioret < 0)
anchor->csi_sync_rc = ioret;
/*
if (anchor->csi_sync_rc == 0 && ioret < 0)
anchor->csi_sync_rc = ioret;
/*
spin_unlock(&anchor->csi_waitq.lock);
spin_unlock(&anchor->csi_waitq.lock);
- /**
- * For AIO (!is_sync_kiocb), we are responsible for freeing
- * memory here. This is because we are the last user of this
- * aio struct, whereas in other cases, we will call
- * cl_sync_io_wait to wait after this, and so the memory is
- * freed after that call.
- */
- if (aio && !is_sync_kiocb(aio->cda_iocb))
+ if (aio && !aio->cda_no_aio_free)
cl_aio_free(env, aio);
}
EXIT;
cl_aio_free(env, aio);
}
EXIT;
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret)
{
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret)
{
+ bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
+ /* for true AIO, the daemons running cl_sync_io_note would normally
+ * free the aio struct, but if we're waiting on it, we need them to not
+ * do that. This ensures the aio is not freed when we drop the
+ * reference count to zero in cl_sync_io_note below
+ */
+ anchor->csi_aio->cda_no_aio_free = 1;
/*
* @anchor was inited as 1 to prevent end_io to be
* called before we add all pages for IO, so drop
/*
* @anchor was inited as 1 to prevent end_io to be
* called before we add all pages for IO, so drop
*/
atomic_add(1, &anchor->csi_sync_nr);
*/
atomic_add(1, &anchor->csi_sync_nr);
+ anchor->csi_aio->cda_no_aio_free = no_aio_free;
+
return rc;
}
EXPORT_SYMBOL(cl_sync_io_wait_recycle);
return rc;
}
EXPORT_SYMBOL(cl_sync_io_wait_recycle);