#include <lustre_fid.h>
#include <cl_object.h>
#include "cl_internal.h"
+#include <libcfs/crypto/llcrypt.h>
/*****************************************************************************
*
case CIT_GLIMPSE:
break;
case CIT_LADVISE:
+ case CIT_LSEEK:
break;
default:
LBUG();
struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
struct cl_page *pg;
int rc;
+ ENTRY;
cl_page_list_for_each(pg, &queue->c2_qin) {
LASSERT(pg->cp_sync_io == NULL);
cl_page_list_for_each(pg, &queue->c2_qin)
pg->cp_sync_io = NULL;
}
- return rc;
+ RETURN(rc);
}
EXPORT_SYMBOL(cl_io_submit_sync);
/**
- * Cancel an IO which has been submitted by cl_io_submit_rw.
- */
-int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue)
-{
- struct cl_page *page;
- int result = 0;
-
- CERROR("Canceling ongoing page trasmission\n");
- cl_page_list_for_each(page, queue) {
- int rc;
-
- rc = cl_page_cancel(env, page);
- result = result ?: rc;
- }
- return result;
-}
-
-/**
* Main io loop.
*
* Pumps io through iterations calling
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
- int result = 0;
+ int result = 0;
+ int rc = 0;
LINVRNT(cl_io_is_loopable(io));
ENTRY;
}
}
cl_io_iter_fini(env, io);
- } while (result == 0 && io->ci_continue);
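+ /* remember a non-zero result (e.g. -EIOCBQUEUED) so that it is not
+ * lost when a later iteration completes with result == 0 */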
+ if (result)
+ rc = result;
+ } while ((result == 0 || result == -EIOCBQUEUED) &&
+ io->ci_continue);
+
+ if (rc && !result)
+ result = rc;
if (result == -EWOULDBLOCK && io->ci_ndelay) {
io->ci_need_restart = 1;
ENTRY;
plist->pl_nr = 0;
INIT_LIST_HEAD(&plist->pl_pages);
- plist->pl_owner = current;
EXIT;
}
EXPORT_SYMBOL(cl_page_list_init);
/* it would be better to check that page is owned by "current" io, but
* it is not passed here. */
LASSERT(page->cp_owner != NULL);
- LINVRNT(plist->pl_owner == current);
LASSERT(list_empty(&page->cp_batch));
list_add_tail(&page->cp_batch, &plist->pl_pages);
{
LASSERT(plist->pl_nr > 0);
LASSERT(cl_page_is_vmlocked(env, page));
- LINVRNT(plist->pl_owner == current);
ENTRY;
list_del_init(&page->cp_batch);
struct cl_page *page)
{
LASSERT(src->pl_nr > 0);
- LINVRNT(dst->pl_owner == current);
- LINVRNT(src->pl_owner == current);
ENTRY;
list_move_tail(&page->cp_batch, &dst->pl_pages);
struct cl_page *page)
{
LASSERT(src->pl_nr > 0);
- LINVRNT(dst->pl_owner == current);
- LINVRNT(src->pl_owner == current);
ENTRY;
list_move(&page->cp_batch, &dst->pl_pages);
struct cl_page *page;
struct cl_page *tmp;
- LINVRNT(list->pl_owner == current);
- LINVRNT(head->pl_owner == current);
ENTRY;
cl_page_list_for_each_safe(page, tmp, list)
struct cl_page *page;
struct cl_page *temp;
- LINVRNT(plist->pl_owner == current);
ENTRY;
cl_page_list_for_each_safe(page, temp, plist) {
struct cl_page *page;
struct cl_page *temp;
- LINVRNT(plist->pl_owner == current);
ENTRY;
cl_page_list_for_each_safe(page, temp, plist)
{
struct cl_page *page;
- LINVRNT(plist->pl_owner == current);
cl_page_list_for_each(page, plist)
cl_page_assume(env, io, page);
{
struct cl_page *page;
- LINVRNT(plist->pl_owner == current);
ENTRY;
cl_page_list_for_each(page, plist)
cl_page_discard(env, io, page);
*/
void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
- cl_sync_io_end_t *end)
+ struct cl_dio_aio *aio, cl_sync_io_end_t *end)
{
ENTRY;
memset(anchor, 0, sizeof(*anchor));
atomic_set(&anchor->csi_sync_nr, nr);
anchor->csi_sync_rc = 0;
anchor->csi_end_io = end;
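+ /* aio is optional; when set, cl_sync_io_note() frees it (and the
+ * anchor embedded in it) once the last transfer has completed */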
+ anchor->csi_aio = aio;
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_init_notify);
int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout)
{
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- NULL, NULL, NULL);
- int rc;
+ int rc = 0;
ENTRY;
LASSERT(timeout >= 0);
- rc = l_wait_event(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- if (rc < 0) {
+ if (timeout > 0 &&
+ wait_event_idle_timeout(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ cfs_time_seconds(timeout)) == 0) {
+ rc = -ETIMEDOUT;
CERROR("IO failed: %d, still wait for %d remaining entries\n",
rc, atomic_read(&anchor->csi_sync_nr));
+ }
- wait_event_idle(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0);
- } else {
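+ /* wait for the remaining entries even if the timed wait above
+ * expired; anchor cannot be released while transfers still
+ * reference it */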
+ wait_event_idle(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0);
+ if (!rc)
rc = anchor->csi_sync_rc;
- }
+
/* We take the lock to ensure that cl_sync_io_note() has finished */
spin_lock(&anchor->csi_waitq.lock);
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
}
EXPORT_SYMBOL(cl_sync_io_wait);
+#ifndef HAVE_AIO_COMPLETE
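+/* compat for kernels without aio_complete(): call the kiocb's
+ * ki_complete callback directly */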
+static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
+{
+ if (iocb->ki_complete)
+ iocb->ki_complete(iocb, res, res2);
+}
+#endif
+
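+/*
+ * Completion callback for an AIO anchor: releases the pages queued on
+ * the cl_dio_aio and, for a real async kiocb, completes it.
+ */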
+static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+ struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
+ ssize_t ret = anchor->csi_sync_rc;
+
+ ENTRY;
+
+ /* release pages */
+ while (aio->cda_pages.pl_nr > 0) {
+ struct cl_page *page = cl_page_list_first(&aio->cda_pages);
+ struct page *vmpage = cl_page_vmpage(page);
+ struct inode *inode = vmpage ? page2inode(vmpage) : NULL;
+
+ cl_page_get(page);
+ /* We only end up here in the Direct IO case. For an encrypted
+ * file, the mapping was set on the pages in ll_direct_rw_pages(),
+ * so it has to be cleared now, before page cleanup.
+ * The PageChecked flag was also set there, so we clear it here.
+ */
+ if (inode && IS_ENCRYPTED(inode)) {
+ vmpage->mapping = NULL;
+ ClearPageChecked(vmpage);
+ }
+ cl_page_list_del(env, &aio->cda_pages, page);
+ cl_page_delete(env, page);
+ cl_page_put(env, page);
+ }
+
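+ /* complete the original kiocb only for genuine async requests that
+ * have not opted out of completion */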
+ if (!is_sync_kiocb(aio->cda_iocb) && !aio->cda_no_aio_complete)
+ aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+ EXIT;
+}
+
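+/*
+ * Allocate a cl_dio_aio for @iocb and prime its anchor with one
+ * reference so it cannot complete before every page has been added.
+ */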
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb)
+{
+ struct cl_dio_aio *aio;
+
+ OBD_SLAB_ALLOC_PTR_GFP(aio, cl_dio_aio_kmem, GFP_NOFS);
+ if (aio != NULL) {
+ /*
+ * Hold one ref so that it won't be released until
+ * every page has been added.
+ */
+ cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
+ NULL : aio, cl_aio_end);
+ cl_page_list_init(&aio->cda_pages);
+ aio->cda_iocb = iocb;
+ aio->cda_no_aio_complete = 0;
+ }
+ return aio;
+}
+EXPORT_SYMBOL(cl_aio_alloc);
+
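+/* free a cl_dio_aio allocated by cl_aio_alloc(); NULL is a no-op */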
+void cl_aio_free(struct cl_dio_aio *aio)
+{
+ if (aio)
+ OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
+}
+EXPORT_SYMBOL(cl_aio_free);
+
/**
* Indicate that transfer of a single page completed.
*/
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
if (atomic_dec_and_lock(&anchor->csi_sync_nr,
&anchor->csi_waitq.lock)) {
+ struct cl_dio_aio *aio = NULL;
+
cl_sync_io_end_t *end_io = anchor->csi_end_io;
/*
wake_up_all_locked(&anchor->csi_waitq);
if (end_io)
end_io(env, anchor);
+ if (anchor->csi_aio)
+ aio = anchor->csi_aio;
+
spin_unlock(&anchor->csi_waitq.lock);
- /* Can't access anchor any more */
+ /**
+ * If anchor->csi_aio is set, we are responsible for freeing
+ * memory here rather than when cl_sync_io_wait() completes.
+ */
+ cl_aio_free(aio);
}
EXIT;
}