}
/**
- * True, iff \a io is a sendfile().
- */
-int cl_io_is_sendfile(const struct cl_io *io)
-{
- return io->ci_type == CIT_READ && io->u.ci_rd.rd_is_sendfile;
-}
-EXPORT_SYMBOL(cl_io_is_sendfile);
-
-/**
* Returns true iff there is an IO ongoing in the given environment.
*/
int cl_io_is_going(const struct lu_env *env)
ENTRY;
LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
- "io range: %i [%llu, %llu) %i %i\n",
+ "io range: %u ["LPU64", "LPU64") %u %u\n",
iot, (__u64)pos, (__u64)pos + count,
io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
io->u.ci_rw.crw_pos = pos;
ENTRY;
- lock = cl_lock_request(env, io, &link->cill_descr, link->cill_enq_flags,
- "io", io);
+ lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
if (!IS_ERR(lock)) {
link->cill_lock = lock;
list_move(&link->cill_linkage, &set->cls_curr);
- if (!(link->cill_enq_flags & CEF_ASYNC)) {
+ if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
result = cl_wait(env, lock);
if (result == 0)
list_move(&link->cill_linkage, &set->cls_done);
* Allocates new lock link, and uses it to add a lock to a lockset.
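+ * Enqueue flags are taken from \a descr->cld_enq_flags.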
*/
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
- struct cl_lock_descr *descr, int enqflags)
+ struct cl_lock_descr *descr)
{
struct cl_io_lock_link *link;
int result;
OBD_ALLOC_PTR(link);
if (link != NULL) {
link->cill_descr = *descr;
- link->cill_enq_flags = enqflags;
link->cill_fini = cl_free_io_lock_link;
result = cl_io_lock_add(env, io, link);
if (result) /* lock match */
EXPORT_SYMBOL(cl_io_submit_rw);
/**
+ * Submit a sync_io and wait until the IO finishes or an error occurs.
+ * If \a timeout is zero, wait for the IO unconditionally.
+ */
+int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
+ enum cl_req_type iot, struct cl_2queue *queue,
+ enum cl_req_priority prio, long timeout)
+{
+ struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
+ struct cl_page *pg;
+ int rc;
+
+ LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL);
+
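+ /*
+ * Attach the anchor to every queued page so that each page's
+ * completion is reported to it through cl_sync_io_note().
+ */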
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ LASSERT(pg->cp_sync_io == NULL);
+ pg->cp_sync_io = anchor;
+ }
+
+ cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+ rc = cl_io_submit_rw(env, io, iot, queue, prio);
+ if (rc == 0) {
+ /*
+ * If some pages weren't sent for any reason (e.g.,
+ * read found up-to-date pages in the cache, or write found
+ * clean pages), count them as completed to avoid an
+ * infinite wait.
+ */
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ pg->cp_sync_io = NULL;
+ cl_sync_io_note(anchor, +1);
+ }
+
+ /* Wait for the IO to finish. */
+ rc = cl_sync_io_wait(env, io, &queue->c2_qout,
+ anchor, timeout);
+ } else {
+ LASSERT(list_empty(&queue->c2_qout.pl_pages));
+ cl_page_list_for_each(pg, &queue->c2_qin)
+ pg->cp_sync_io = NULL;
+ }
+ return rc;
+}
+EXPORT_SYMBOL(cl_io_submit_sync);
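+
+/*
+ * A minimal usage sketch for cl_io_submit_sync() (a sketch only: it
+ * assumes \a io is an initialized read IO, \a queue holds pages owned
+ * by the caller, and the caller disowns the pages afterwards):
+ *
+ * rc = cl_io_submit_sync(env, io, CRT_READ, queue, CRP_NORMAL, 0);
+ * if (rc == 0)
+ * cl_page_list_disown(env, io, &queue->c2_qout);
+ */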
+
+/**
* Cancel an IO which has been submitted by cl_io_submit_rw.
*/
int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
{
struct cl_page *page;
struct cl_page *temp;
+ pgoff_t index = 0;
int result;
LINVRNT(plist->pl_owner == cfs_current());
ENTRY;
result = 0;
cl_page_list_for_each_safe(page, temp, plist) {
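+ /* Pages are expected to be sorted by index; assert monotonic order. */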
+ LASSERT(index <= page->cp_index);
+ index = page->cp_index;
if (cl_page_own(env, io, page) == 0)
- result = result ?: page->cp_error;
+ result = result ?: page->cp_error;
else
cl_page_list_del(env, plist, page);
}
void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
{
ENTRY;
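+ /*
+ * A wait queue replaces the completion so that cl_sync_io_wait() can
+ * sleep interruptibly, with an optional timeout, until csi_sync_nr
+ * drops to zero.
+ */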
- init_completion(&anchor->csi_sync_completion);
+ cfs_waitq_init(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nrpages);
anchor->csi_sync_rc = 0;
EXIT;
* cl_sync_io_note() for every page.
*/
int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_sync_io *anchor)
+ struct cl_page_list *queue, struct cl_sync_io *anchor,
+ long timeout)
{
+ struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
+ NULL, NULL, NULL);
int rc;
ENTRY;
- rc = wait_for_completion_interruptible(&anchor->csi_sync_completion);
+ LASSERT(timeout >= 0);
+
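+ /*
+ * With a zero \a timeout the wait below is expected to be
+ * indefinite (no timeout) but still interruptible.
+ */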
+ rc = l_wait_event(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ &lwi);
if (rc < 0) {
- int rc2;
- rc2 = cl_io_cancel(env, io, queue);
- if (rc2 < 0) {
- /* Too bad, some pages are still in IO. */
- CDEBUG(D_VFSTRACE, "Failed to cancel transfer (%i). "
- "Waiting for %i pages\n",
- rc2, atomic_read(&anchor->csi_sync_nr));
- wait_for_completion(&anchor->csi_sync_completion);
- }
- } else
+ CERROR("SYNC IO failed with error: %d, try to cancel "
+ "%d remaining pages\n",
+ rc, atomic_read(&anchor->csi_sync_nr));
+
+ (void)cl_io_cancel(env, io, queue);
+
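+ /*
+ * Cancellation cannot recall pages that are already in flight,
+ * so wait again, this time without a timeout, until the anchor
+ * drains completely.
+ */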
+ lwi = (struct l_wait_info) { 0 };
+ (void)l_wait_event(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ &lwi);
+ } else {
rc = anchor->csi_sync_rc;
+ }
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
cl_page_list_assume(env, io, queue);
POISON(anchor, 0x5a, sizeof *anchor);
- * ->{prepare,commit}_write(). Completion is used to signal the end of
- * IO.
+ * ->{prepare,commit}_write(). The anchor's wait queue is used to signal
+ * the end of IO.
*/
+ LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
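+ /* Last completion: wake up everybody sleeping in cl_sync_io_wait(). */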
if (atomic_dec_and_test(&anchor->csi_sync_nr))
- complete(&anchor->csi_sync_completion);
+ cfs_waitq_broadcast(&anchor->csi_waitq);
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);