struct lu_ref_link *cp_queue_ref;
/** Per-page flags from enum cl_page_flags. Protected by a VM lock. */
unsigned cp_flags;
+ /** Assigned while the page is part of a sync io; see cl_sync_io. */
+ struct cl_sync_io *cp_sync_io;
};
/**
int cl_io_submit_rw (const struct lu_env *env, struct cl_io *io,
enum cl_req_type iot, struct cl_2queue *queue,
enum cl_req_priority priority);
+int cl_io_submit_sync (const struct lu_env *env, struct cl_io *io,
+ enum cl_req_type iot, struct cl_2queue *queue,
+ enum cl_req_priority priority, long timeout);
void cl_io_rw_advance (const struct lu_env *env, struct cl_io *io,
size_t nob);
int cl_io_cancel (const struct lu_env *env, struct cl_io *io,
/** number of pages yet to be transferred. */
atomic_t csi_sync_nr;
/** completion to be signaled when transfer is complete. */
- struct completion csi_sync_completion;
+ cfs_waitq_t csi_waitq;
/** error code. */
int csi_sync_rc;
};
void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages);
int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_sync_io *anchor);
+ struct cl_page_list *queue, struct cl_sync_io *anchor,
+ long timeout);
void cl_sync_io_note(struct cl_sync_io *anchor, int ioret);
/** @} cl_sync_io */
struct ccc_thread_info {
struct cl_lock_descr cti_descr;
struct cl_io cti_io;
- struct cl_sync_io cti_sync_io;
struct cl_attr cti_attr;
};
struct list_head cpg_pending_linkage;
/** VM page */
cfs_page_t *cpg_page;
- struct cl_sync_io *cpg_sync_io;
/**
* checksum for paranoid I/O debugging enabled by
* ENABLE_LLITE_CHECKSUM configuration option.
static void slp_page_completion_common(const struct lu_env *env,
struct ccc_page *cp, int ioret)
{
- struct cl_sync_io *anchor = cp->cpg_sync_io;
-
- if (anchor) {
- cp->cpg_sync_io = NULL;
- cl_sync_io_note(anchor, ioret);
- } else {
- LBUG();
- }
+ LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
}
static void slp_page_completion_read(const struct lu_env *env,
struct intnl_stat *st = llu_i2stat(inode);
struct obd_export *exp = llu_i2obdexp(inode);
struct page *page;
- int rc = 0, npages = 0, ret_bytes = 0;
+ int rc = 0, ret_bytes = 0;
int local_lock;
struct cl_page *clp;
- struct ccc_page *clup;
struct cl_2queue *queue;
- struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
ENTRY;
if (!exp)
break;
}
- clup = cl2ccc_page(cl_page_at(clp, &slp_device_type));
- clup->cpg_sync_io = anchor;
cl_2queue_add(queue, clp);
/* drop the reference count for cl_page_find, so that the page
cl_page_clip(env, clp, offset, offset+bytes);
- npages++;
count -= bytes;
pos += bytes;
buf += bytes;
page++;
} while (count);
- cl_sync_io_init(anchor, npages);
- /* printk("Inited anchor with %d pages\n", npages); */
-
if (rc == 0) {
- enum cl_req_type crt;
-
- crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
- rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
- if (rc == 0) {
- /* If some pages weren't sent for any reason, count
- * then as completed, to avoid infinite wait. */
- cl_page_list_for_each(clp, &queue->c2_qin) {
- CL_PAGE_DEBUG(D_ERROR, env, clp,
- "not completed\n");
- cl_sync_io_note(anchor, +1);
- }
- /* wait for the IO to be finished. */
- rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
- }
+ enum cl_req_type iot;
+ iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
+ rc = cl_io_submit_sync(env, io, iot, queue, CRP_NORMAL, 0);
}
group->lig_rc = rc;
struct ll_dio_pages *pv)
{
struct cl_page *clp;
- struct ccc_page *clup;
struct cl_2queue *queue;
struct cl_object *obj = io->ci_obj;
- struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
int i;
ssize_t rc = 0;
loff_t file_offset = pv->ldp_start_offset;
size_t page_size = cl_page_size(obj);
ENTRY;
- cl_sync_io_init(anchor, page_count);
-
queue = &io->ci_queue;
cl_2queue_init(queue);
for (i = 0; i < page_count; i++) {
break;
}
- clup = cl2ccc_page(cl_page_at(clp, &vvp_device_type));
- clup->cpg_sync_io = anchor;
cl_2queue_add(queue, clp);
/* drop the reference count for cl_page_find, so that the page
}
if (rc == 0) {
- rc = cl_io_submit_rw(env, io, rw == READ ? CRT_READ : CRT_WRITE,
- queue, CRP_NORMAL);
- if (rc == 0) {
- /*
- * If some pages weren't sent for any reason (e.g.,
- * direct-io read found up-to-date pages in the
- * cache), count them as completed to avoid infinite
- * wait.
- */
- cl_page_list_for_each(clp, &queue->c2_qin)
- cl_sync_io_note(anchor, +1);
- /* wait for the IO to be finished. */
- rc = cl_sync_io_wait(env, io, &queue->c2_qout,
- anchor) ?: pv->ldp_size;
- }
+ rc = cl_io_submit_sync(env, io,
+ rw == READ ? CRT_READ : CRT_WRITE,
+ queue, CRP_NORMAL, 0);
+ if (rc == 0)
+ rc = pv->ldp_size;
}
cl_2queue_discard(env, io, queue);
int to, enum cl_req_type crt)
{
struct cl_2queue *queue;
- struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
int result;
LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
queue = &io->ci_queue;
cl_2queue_init_page(queue, page);
-
- cl_sync_io_init(anchor, 1);
- cp->cpg_sync_io = anchor;
cl_page_clip(env, page, 0, to);
- result = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
- if (result == 0)
- result = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
- else
- cp->cpg_sync_io = NULL;
+
+ result = cl_io_submit_sync(env, io, crt, queue, CRP_NORMAL, 0);
LASSERT(cl_page_is_owned(page, io));
cl_page_clip(env, page, 0, CFS_PAGE_SIZE);
struct cl_page *clp = cp->cpg_cl.cpl_page;
cfs_page_t *vmpage = cp->cpg_page;
struct inode *inode = ccc_object_inode(clp->cp_obj);
- struct cl_sync_io *anchor = cp->cpg_sync_io;
LINVRNT(cl_page_is_vmlocked(env, clp));
- if (anchor != NULL) {
- cp->cpg_sync_io = NULL;
- cl_sync_io_note(anchor, ioret);
- } else if (clp->cp_type == CPT_CACHEABLE) {
+ if (!clp->cp_sync_io && clp->cp_type == CPT_CACHEABLE) {
/*
* Only mark the page error only when it's a cacheable page
* and NOT a sync io.
* Pointer to the topmost ongoing IO in this thread.
*/
struct cl_io *clt_current_io;
+ /**
+ * Anchor used when submitting a sync io; see cl_io_submit_sync().
+ */
+ struct cl_sync_io clt_anchor;
};
struct cl_thread_info *cl_env_info(const struct lu_env *env);
EXPORT_SYMBOL(cl_io_submit_rw);
/**
+ * Submit a sync_io and wait for the IO to finish, or for an error to
+ * occur. A zero @timeout means to wait for the IO unconditionally.
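+ *
+ * @timeout is in seconds. Pages that could not be sent are counted as
+ * completed, so the wait cannot hang on them.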
+ */
+int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
+ enum cl_req_type iot, struct cl_2queue *queue,
+ enum cl_req_priority prio, long timeout)
+{
+ struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
+ struct cl_page *pg;
+ int rc;
+
+ LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL);
+
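+ /* Attach the anchor to every queued page so that cl_page_completion()
+ * can signal it through cl_sync_io_note(). */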
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ LASSERT(pg->cp_sync_io == NULL);
+ pg->cp_sync_io = anchor;
+ }
+
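+ /* The anchor expects one completion per queued page. */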
+ cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+ rc = cl_io_submit_rw(env, io, iot, queue, prio);
+ if (rc == 0) {
+ /*
+ * If some pages weren't sent for any reason (e.g.,
+ * read found up-to-date pages in the cache, or write found
+ * clean pages), count them as completed to avoid infinite
+ * wait.
+ */
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ pg->cp_sync_io = NULL;
+ cl_sync_io_note(anchor, +1);
+ }
+
+ /* wait for the IO to be finished. */
+ rc = cl_sync_io_wait(env, io, &queue->c2_qout,
+ anchor, timeout);
+ } else {
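+ /* Submission failed before any page was sent; detach the
+ * anchor from every page. */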
+ LASSERT(list_empty(&queue->c2_qout.pl_pages));
+ cl_page_list_for_each(pg, &queue->c2_qin)
+ pg->cp_sync_io = NULL;
+ }
+ return rc;
+}
+EXPORT_SYMBOL(cl_io_submit_sync);
+
+/**
* Cancel an IO which has been submitted by cl_io_submit_rw.
*/
int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
{
ENTRY;
- init_completion(&anchor->csi_sync_completion);
+ cfs_waitq_init(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nrpages);
anchor->csi_sync_rc = 0;
EXIT;
* cl_sync_io_note() for every page.
*/
int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_sync_io *anchor)
+ struct cl_page_list *queue, struct cl_sync_io *anchor,
+ long timeout)
{
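+ /* Interruptible wait; a @timeout of 0 imposes no time limit. */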
+ struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
+ NULL, NULL, NULL);
int rc;
ENTRY;
- rc = wait_for_completion_interruptible(&anchor->csi_sync_completion);
+ LASSERT(timeout >= 0);
+
+ rc = l_wait_event(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ &lwi);
if (rc < 0) {
int rc2;
+
+ CERROR("SYNC IO failed with error: %d, try to cancel "
+ "the remaining page\n", rc);
+
rc2 = cl_io_cancel(env, io, queue);
if (rc2 < 0) {
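+ /* Re-arm an untimed, uninterruptible wait: we must
+ * not return while pages are still in IO. */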
+ lwi = (struct l_wait_info) { 0 };
/* Too bad, some pages are still in IO. */
- CDEBUG(D_VFSTRACE, "Failed to cancel transfer (%i). "
- "Waiting for %i pages\n",
+ CERROR("Failed to cancel transfer error: %d, mostly "
+ "because of they are still being transferred, "
+ "waiting for %i pages\n",
rc2, atomic_read(&anchor->csi_sync_nr));
- wait_for_completion(&anchor->csi_sync_completion);
+ (void)l_wait_event(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ &lwi);
}
- } else
+ } else {
rc = anchor->csi_sync_rc;
+ }
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
cl_page_list_assume(env, io, queue);
POISON(anchor, 0x5a, sizeof *anchor);
* IO.
*/
if (atomic_dec_and_test(&anchor->csi_sync_nr))
- complete(&anchor->csi_sync_completion);
+ cfs_waitq_broadcast(&anchor->csi_waitq);
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);
struct cl_io *io = &info->clt_io;
struct cl_2queue *queue = &info->clt_queue;
struct cl_lock_descr *descr = &lock->cll_descr;
- int result;
- int rc0;
- int rc1;
+ long page_count;
+ int result;
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
io->ci_obj = cl_object_top(descr->cld_obj);
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result == 0) {
-
cl_2queue_init(queue);
cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
descr->cld_end, &queue->c2_qin);
- if (queue->c2_qin.pl_nr > 0) {
+ page_count = queue->c2_qin.pl_nr;
+ if (page_count > 0) {
result = cl_page_list_unmap(env, io, &queue->c2_qin);
if (!discard) {
- rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_CANCEL);
- rc1 = cl_page_list_own(env, io,
- &queue->c2_qout);
- result = result ?: rc0 ?: rc1;
+ long timeout = 600; /* 10 minutes. */
+ /* For debugging: if the request cannot
+ * finish within 10 minutes, time out so
+ * that we are notified.
+ */
+ result = cl_io_submit_sync(env, io, CRT_WRITE,
+ queue, CRP_CANCEL,
+ timeout);
+ if (result)
+ CWARN("Writing %lu pages error: %d\n",
+ page_count, result);
}
cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
cl_2queue_discard(env, io, queue);
CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
(const struct lu_env *,
const struct cl_page_slice *, int), ioret);
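+ /* If the page took part in a sync io, notify the waiting anchor. */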
+ if (pg->cp_sync_io) {
+ cl_sync_io_note(pg->cp_sync_io, ioret);
+ pg->cp_sync_io = NULL;
+ }
/* Don't assert the page writeback bit here because the lustre file
* may be as a backend of swap space. in this case, the page writeback
struct echo_page {
struct cl_page_slice ep_cl;
- struct cl_sync_io *ep_sync_io;
cfs_page_t *ep_vmpage;
};
struct cl_2queue eti_queue;
struct cl_io eti_io;
- struct cl_sync_io eti_anchor;
struct cl_lock_descr eti_descr;
struct lu_fid eti_fid;
};
const struct cl_page_slice *slice,
int ioret)
{
- struct echo_page *ecp = cl2echo_page(slice);
- struct cl_sync_io *anchor = ecp->ep_sync_io;
- ENTRY;
-
- LASSERT(anchor != NULL);
- ecp->ep_sync_io = NULL;
- cl_sync_io_note(anchor, ioret);
- EXIT;
+ LASSERT(slice->cpl_page->cp_sync_io != NULL);
}
static void echo_page_fini(const struct lu_env *env,
int result = 0;
ENTRY;
- cl_page_list_splice(&queue->c2_qin, &queue->c2_qout);
- cl_page_list_for_each_safe(clp, temp, &queue->c2_qout) {
+ cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
int rc;
rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
if (rc == 0)
continue;
- cl_page_list_move(&queue->c2_qin, &queue->c2_qout, clp);
result = result ?: rc;
}
- RETURN(list_empty(&queue->c2_qout.pl_pages) ? result : 0);
+ RETURN(result);
}
static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
struct echo_thread_info *info;
struct cl_object *obj = echo_obj2cl(eco);
struct echo_device *ed = eco->eo_dev;
- struct cl_sync_io *anchor;
struct cl_2queue *queue;
struct cl_io *io;
struct cl_page *clp;
- struct echo_page *ep;
int page_size = cl_page_size(obj);
int refcheck;
info = echo_env_info(env);
io = &info->eti_io;
- anchor = &info->eti_anchor;
queue = &info->eti_queue;
- cl_sync_io_init(anchor, npages);
cl_2queue_init(queue);
rc = cl_io_init(env, io, CIT_MISC, obj);
if (rc < 0)
break;
}
- ep = cl2echo_page(cl_page_at(clp, &echo_device_type));
- ep->ep_sync_io = anchor;
cl_2queue_add(queue, clp);
/* drop the reference count for cl_page_find, so that the page
if (async)
rc = cl_echo_async_brw(env, io, typ, queue);
else
- rc = cl_io_submit_rw(env, io,typ, queue, CRP_NORMAL);
+ rc = cl_io_submit_sync(env, io, typ, queue,
+ CRP_NORMAL, 0);
CDEBUG(D_INFO, "echo_client %s write returns %d\n",
async ? "async" : "sync", rc);
- if (rc == 0) {
- /*
- * If some pages weren't sent for any reason (e.g.,
- * direct-io read found up-to-date pages in the
- * cache), count them as completed to avoid infinite
- * wait.
- */
- cl_page_list_for_each(clp, &queue->c2_qin)
- cl_sync_io_note(anchor, +1);
- /* wait for the IO to be finished. */
- rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
- }
}
cl_2queue_discard(env, io, queue);
* Thread that submitted this page for transfer. For debugging.
*/
cfs_task_t *ops_submitter;
+ /**
+ * Submit time: when the page started its transfer RPC. For debugging.
+ */
+ cfs_time_t ops_submit_time;
};
extern cfs_mem_cache_t *osc_page_kmem;
*/
LASSERT(result == 0);
}
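+ /* The page enters transfer; record the time for osc_submit_duration(). */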
+ opg->ops_submit_time = cfs_time_current();
} else {
LASSERT(result < 0);
if (result != -EALREADY)
return list_empty(head) ? "-" : "+";
}
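+/**
+ * How long the page has been in transfer; 0 if it is not in flight.
+ * For debugging.
+ */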
+static inline cfs_time_t osc_submit_duration(struct osc_page *opg)
+{
+ if (opg->ops_submit_time == 0)
+ return 0;
+
+ return (cfs_time_current() - opg->ops_submit_time);
+}
+
static int osc_page_print(const struct lu_env *env,
const struct cl_page_slice *slice,
void *cookie, lu_printer_t printer)
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
"< %#x %d %u %s %s %s >"
"< %llu %u %#x %#x %p %p %p %p %p >"
- "< %s %p %d >\n",
+ "< %s %p %d %lu >\n",
opg,
/* 1 */
oap->oap_magic, oap->oap_cmd,
oap->oap_caller_data,
/* 3 */
osc_list(&opg->ops_inflight),
- opg->ops_submitter, opg->ops_transfer_pinned);
+ opg->ops_submitter, opg->ops_transfer_pinned,
+ osc_submit_duration(opg));
}
static void osc_page_delete(const struct lu_env *env,
ENTRY;
result = cl_page_make_ready(env, page, CRT_WRITE);
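+ /* On success the page enters transfer; stamp it for debugging. */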
+ if (result == 0)
+ opg->ops_submit_time = cfs_time_current();
RETURN(result);
}
list_del_init(&opg->ops_inflight);
spin_unlock(&obj->oo_seatbelt);
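+ /* The transfer is complete; clear the submit time stamp. */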
+ opg->ops_submit_time = 0;
+
cl_page_completion(env, page, crt, rc);
/* statistic */