*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* Client IO.
*
case CIT_GLIMPSE:
break;
case CIT_LADVISE:
+ case CIT_LSEEK:
break;
default:
LBUG();
}
EXPORT_SYMBOL(cl_io_rw_init);
+#ifdef HAVE_LIST_CMP_FUNC_T
+static int cl_lock_descr_cmp(void *priv,
+ const struct list_head *a,
+ const struct list_head *b)
+#else /* !HAVE_LIST_CMP_FUNC_T */
static int cl_lock_descr_cmp(void *priv,
struct list_head *a, struct list_head *b)
+#endif /* HAVE_LIST_CMP_FUNC_T */
{
const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
cill_linkage);
EXPORT_SYMBOL(cl_io_read_ahead);
/**
+ * Called before io start, to reserve enough LRU slots to avoid
+ * deadlock.
+ *
+ * Walks io->ci_layers and calls each layer's optional
+ * cl_io_operations::cio_lru_reserve() method for the range starting
+ * at \a pos and spanning \a bytes, stopping at the first layer that
+ * returns non-zero.
+ *
+ * \param[in] env	thread-local execution environment
+ * \param[in] io	IO being started; must be CIT_READ or CIT_WRITE
+ * \param[in] pos	starting file offset of the IO
+ * \param[in] bytes	length of the IO in bytes
+ *
+ * \retval 0		all layers reserved successfully
+ * \retval non-zero	first non-zero value returned by a layer
+ *
+ * \see cl_io_operations::cio_lru_reserve()
+ */
+int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
+ loff_t pos, size_t bytes)
+{
+ const struct cl_io_slice *scan;
+ int result = 0;
+
+ LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+ LINVRNT(cl_io_invariant(io));
+ ENTRY;
+
+ list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
+ if (scan->cis_iop->cio_lru_reserve) {
+ result = scan->cis_iop->cio_lru_reserve(env, scan,
+ pos, bytes);
+ if (result)
+ break;
+ }
+ }
+
+ RETURN(result);
+}
+EXPORT_SYMBOL(cl_io_lru_reserve);
+
+/**
* Commit a list of contiguous pages into writeback cache.
*
* \returns 0 if all pages committed, or errcode if error occurred.
}
EXPORT_SYMBOL(cl_io_commit_async);
+/**
+ * Notify all IO layers that the IO's extent is being released.
+ *
+ * Walks io->ci_layers and invokes the optional
+ * cl_io_operations::cio_extent_release() method of every layer that
+ * implements it.
+ *
+ * \param[in] env	thread-local execution environment
+ * \param[in] io	the IO whose extent is released
+ */
+void cl_io_extent_release(const struct lu_env *env, struct cl_io *io)
+{
+ const struct cl_io_slice *scan;
+ ENTRY;
+
+ list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
+ if (scan->cis_iop->cio_extent_release == NULL)
+ continue;
+ scan->cis_iop->cio_extent_release(env, scan);
+ }
+ EXIT;
+}
+EXPORT_SYMBOL(cl_io_extent_release);
+
/**
* Submits a list of pages for immediate io.
*
if (rc && !result)
result = rc;
- if (result == -EWOULDBLOCK && io->ci_ndelay) {
+ if (result == -EAGAIN && io->ci_ndelay) {
io->ci_need_restart = 1;
result = 0;
}
/**
* Adds a page to a page list.
*/
-void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
+void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page,
+ bool get_ref)
{
ENTRY;
/* it would be better to check that page is owned by "current" io, but
list_add_tail(&page->cp_batch, &plist->pl_pages);
++plist->pl_nr;
lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
- cl_page_get(page);
+ /* only take a new page reference when asked; NOTE(review): callers
+ * passing get_ref=false presumably transfer an existing reference
+ * to the list -- verify at the call sites
+ */
+ if (get_ref)
+ cl_page_get(page);
EXIT;
}
EXPORT_SYMBOL(cl_page_list_add);
/**
* splice the cl_page_list, just as list head does
*/
-void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
+void cl_page_list_splice(struct cl_page_list *src, struct cl_page_list *dst)
{
+#ifdef CONFIG_LUSTRE_DEBUG_LU_REF
struct cl_page *page;
struct cl_page *tmp;
-
ENTRY;
- cl_page_list_for_each_safe(page, tmp, list)
- cl_page_list_move(head, list, page);
+ /* debug build: re-tag each page's queue lu_ref from @src to @dst */
+ cl_page_list_for_each_safe(page, tmp, src)
+ lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref,
+ "queue", src, dst);
+#else
+ ENTRY;
+#endif
+ /* O(1) splice of the whole list; @src is left empty, page
+ * references held by @src are inherited by @dst
+ */
+ dst->pl_nr += src->pl_nr;
+ src->pl_nr = 0;
+ list_splice_tail_init(&src->pl_pages, &dst->pl_pages);
+
EXIT;
}
EXPORT_SYMBOL(cl_page_list_splice);
/**
* Add a page to the incoming page list of 2-queue.
*/
-void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
+void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page, bool get_ref)
{
- ENTRY;
- cl_page_list_add(&queue->c2_qin, page);
- EXIT;
+ /* forward get_ref so the incoming list only takes a page
+ * reference when the caller requests one
+ */
+ cl_page_list_add(&queue->c2_qin, page, get_ref);
}
EXPORT_SYMBOL(cl_2queue_add);
*/
void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
{
- ENTRY;
- cl_2queue_init(queue);
- cl_2queue_add(queue, page);
- EXIT;
+ ENTRY;
+ cl_2queue_init(queue);
+ /* take a reference on @page for the incoming queue */
+ cl_2queue_add(queue, page, true);
+ EXIT;
}
EXPORT_SYMBOL(cl_2queue_init_page);
/* release pages */
while (aio->cda_pages.pl_nr > 0) {
struct cl_page *page = cl_page_list_first(&aio->cda_pages);
- struct page *vmpage = cl_page_vmpage(page);
- struct inode *inode = vmpage ? page2inode(vmpage) : NULL;
cl_page_get(page);
- /* We end up here in case of Direct IO only. For encrypted file,
- * mapping was set on pages in ll_direct_rw_pages(), so it has
- * to be cleared now before page cleanup.
- * PageChecked flag was also set there, so we clean up here.
- */
- if (inode && IS_ENCRYPTED(inode)) {
- vmpage->mapping = NULL;
- ClearPageChecked(vmpage);
- }
cl_page_list_del(env, &aio->cda_pages, page);
cl_page_delete(env, page);
cl_page_put(env, page);
}
- if (!is_sync_kiocb(aio->cda_iocb) && !aio->cda_no_aio_complete)
+ if (!aio->cda_no_aio_complete)
aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
EXIT;
}
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb)
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
{
struct cl_dio_aio *aio;
* Hold one ref so that it won't be released until
* every pages is added.
*/
- cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
- NULL : aio, cl_aio_end);
+ cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
cl_page_list_init(&aio->cda_pages);
aio->cda_iocb = iocb;
- aio->cda_no_aio_complete = 0;
+ if (is_sync_kiocb(iocb))
+ aio->cda_no_aio_complete = 1;
+ else
+ aio->cda_no_aio_complete = 0;
+ cl_object_get(obj);
+ aio->cda_obj = obj;
}
return aio;
}
EXPORT_SYMBOL(cl_aio_alloc);
-void cl_aio_free(struct cl_dio_aio *aio)
+void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
{
- if (aio)
+ if (aio) {
+ /* drop the cl_object reference taken in cl_aio_alloc() */
+ cl_object_put(env, aio->cda_obj);
OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
+ }
}
EXPORT_SYMBOL(cl_aio_free);
* to immediately reclaim anchor when cl_sync_io_wait()
* completes.
*/
- wake_up_all_locked(&anchor->csi_waitq);
+ wake_up_locked(&anchor->csi_waitq);
if (end_io)
end_io(env, anchor);
- if (anchor->csi_aio)
- aio = anchor->csi_aio;
+
+ aio = anchor->csi_aio;
spin_unlock(&anchor->csi_waitq.lock);
/**
- * If anchor->csi_aio is set, we are responsible for freeing
- * memory here rather than when cl_sync_io_wait() completes.
+ * For AIO (!is_sync_kiocb), we are responsible for freeing
+ * memory here. This is because we are the last user of this
+ * aio struct, whereas in other cases, we will call
+ * cl_sync_io_wait to wait after this, and so the memory is
+ * freed after that call.
*/
- cl_aio_free(aio);
+ if (aio && !is_sync_kiocb(aio->cda_iocb))
+ cl_aio_free(env, aio);
}
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);
+
+
+/**
+ * Drop the initial reference on \a anchor, wait for outstanding IO
+ * to complete, then re-arm \a anchor so it can be reused.
+ *
+ * \param[in] env	thread-local execution environment
+ * \param[in] anchor	sync IO anchor, initialized with count 1
+ * \param[in] timeout	timeout passed on to cl_sync_io_wait()
+ * \param[in] ioret	IO result recorded via cl_sync_io_note()
+ *
+ * \retval result of cl_sync_io_wait()
+ */
+int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout, int ioret)
+{
+ int rc = 0;
+
+ /*
+ * @anchor was inited as 1 to prevent end_io from being
+ * called before we add all pages for IO, so drop
+ * one extra reference to make sure we could wait
+ * count to be zero.
+ */
+ cl_sync_io_note(env, anchor, ioret);
+ /* Wait for completion of normal dio.
+ * This replaces the EIOCBQUEUED return from the DIO/AIO
+ * path, and this is where AIO and DIO implementations
+ * split.
+ */
+ rc = cl_sync_io_wait(env, anchor, timeout);
+ /*
+ * One extra reference again, as if @anchor is
+ * reused we assume it as 1 before using.
+ */
+ atomic_add(1, &anchor->csi_sync_nr);
+
+ return rc;
+}
+EXPORT_SYMBOL(cl_sync_io_wait_recycle);