* \see cl_page_own()
* \see vvp_page_own(), lov_page_own()
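+ *
+ * \retval 0        success
+ * \retval -EAGAIN  \a nonblock is set and the page cannot be
+ *                  owned without blocking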
*/
- void (*cpo_own)(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *io);
+ int (*cpo_own)(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *io, int nonblock);
/** Called when ownership is yielded. Optional.
*
* \see cl_page_disown()
struct cl_object *obj,
struct cl_io *io,
pgoff_t start, pgoff_t end,
- struct cl_page_list *plist);
+ struct cl_page_list *plist,
+ int nonblock);
struct cl_page *cl_page_find (const struct lu_env *env,
struct cl_object *obj,
pgoff_t idx, struct page *vmpage,
int cl_page_own (const struct lu_env *env,
struct cl_io *io, struct cl_page *page);
+int cl_page_own_try (const struct lu_env *env,
+ struct cl_io *io, struct cl_page *page);
void cl_page_assume (const struct lu_env *env,
struct cl_io *io, struct cl_page *page);
void cl_page_unassume (const struct lu_env *env,
const struct cl_page_slice *slice, struct cl_io *io);
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice);
void ccc_transient_page_verify(const struct cl_page *page);
-void ccc_transient_page_own(const struct lu_env *env,
+int ccc_transient_page_own(const struct lu_env *env,
const struct cl_page_slice *slice,
- struct cl_io *io);
+ struct cl_io *io, int nonblock);
void ccc_transient_page_assume(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *io);
{
}
-void ccc_transient_page_own(const struct lu_env *env,
+int ccc_transient_page_own(const struct lu_env *env,
const struct cl_page_slice *slice,
- struct cl_io *unused)
+ struct cl_io *unused,
+ int nonblock)
{
ccc_transient_page_verify(slice->cpl_page);
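+ /* transient pages are private to the IO that created them,
+  * so owning one always succeeds */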
+ return 0;
}
void ccc_transient_page_assume(const struct lu_env *env,
vvp_page_fini_common(cp);
}
-static void vvp_page_own(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *io)
+static int vvp_page_own(const struct lu_env *env,
+ const struct cl_page_slice *slice, struct cl_io *io,
+ int nonblock)
{
struct ccc_page *vpg = cl2ccc_page(slice);
cfs_page_t *vmpage = vpg->cpg_page;
LASSERT(vmpage != NULL);
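+ /* A nonblocking own must not sleep: try the page lock once
+  * and give up with -EAGAIN on contention or pending
+  * writeback. */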
+ if (nonblock) {
+ if (TestSetPageLocked(vmpage))
+ return -EAGAIN;
+
+ if (unlikely(PageWriteback(vmpage))) {
+ /* Something went wrong? */
+ unlock_page(vmpage);
+ return -EAGAIN;
+ }
+
+ return 0;
+ }
+
/* DEBUG CODE FOR #18881 */
while (TestSetPageLocked(vmpage)) {
cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
cfs_time_seconds(1)/10);
- if (++count > 600) {
+ if (++count > 1200) {
CL_PAGE_DEBUG(D_ERROR, env,
cl_page_top(slice->cpl_page),
"XXX page %p blocked on acquiring the"
/* lock_page(vmpage); */
wait_on_page_writeback(vmpage);
+ return 0;
}
static void vvp_page_assume(const struct lu_env *env,
/* LASSERT_SEM_LOCKED(&inode->i_alloc_sem); */
}
-static void vvp_transient_page_own(const struct lu_env *env,
- const struct cl_page_slice *slice,
- struct cl_io *unused)
+static int vvp_transient_page_own(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *unused, int nonblock)
{
vvp_transient_page_verify(slice->cpl_page);
+ return 0;
}
static void vvp_transient_page_assume(const struct lu_env *env,
EXIT;
}
-static void lov_page_own(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *io)
+static int lov_page_own(const struct lu_env *env,
+ const struct cl_page_slice *slice, struct cl_io *io,
+ int nonblock)
{
struct lov_io *lio = lov_env_io(env);
struct lov_io_sub *sub;
lov_sub_put(sub);
} else
LBUG(); /* Arrgh */
- EXIT;
+ RETURN(0);
}
static void lov_page_assume(const struct lu_env *env,
const struct cl_page_slice *slice, struct cl_io *io)
{
- return lov_page_own(env, slice, io);
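+ /* blocking own (nonblock == 0) cannot fail for lov pages,
+  * so the return value can be ignored */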
+ lov_page_own(env, slice, io, 0);
}
static int lov_page_print(const struct lu_env *env,
io->ci_obj = cl_object_top(descr->cld_obj);
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result == 0) {
+ int nonblock = 1;
+
+restart:
cl_2queue_init(queue);
cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
- descr->cld_end, &queue->c2_qin);
+ descr->cld_end, &queue->c2_qin, nonblock);
page_count = queue->c2_qin.pl_nr;
if (page_count > 0) {
result = cl_page_list_unmap(env, io, &queue->c2_qin);
cl_2queue_disown(env, io, queue);
}
cl_2queue_fini(env, queue);
+
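+ /* Pages held by other threads were skipped by the
+  * nonblocking pass above; repeat once in blocking mode
+  * to pick them up. */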
+ if (nonblock) {
+ nonblock = 0;
+ goto restart;
+ }
}
cl_io_fini(env, io);
RETURN(result);
*/
void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io, pgoff_t start, pgoff_t end,
- struct cl_page_list *queue)
+ struct cl_page_list *queue, int nonblock)
{
struct cl_object_header *hdr;
struct cl_page *page;
unsigned int nr;
unsigned int i;
unsigned int j;
+ int (*page_own)(const struct lu_env *env,
+ struct cl_io *io,
+ struct cl_page *pg);
ENTRY;
+ page_own = nonblock ? cl_page_own_try : cl_page_own;
+
idx = start;
hdr = cl_object_header(obj);
pvec = cl_env_info(env)->clt_pvec;
spin_unlock(&hdr->coh_page_guard);
for (i = 0; i < j; ++i) {
page = pvec[i];
- if (cl_page_own(env, io, page) == 0)
+ if (page_own(env, io, page) == 0)
cl_page_list_add(queue, page);
lu_ref_del(&page->cp_reference,
"page_list", cfs_current());
EXPORT_SYMBOL(cl_page_is_owned);
/**
- * Owns a page by IO.
+ * Try to own a page by IO.
*
* Waits until the page is in cl_page_state::CPS_CACHED state, and then
* switches it into cl_page_state::CPS_OWNED state.
*
* \retval -ve failure, e.g., page was destroyed (and landed in
* cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED).
+ * In nonblock mode it also fails if the page is owned by another
+ * thread, or is in IO.
*
* \see cl_page_disown()
* \see cl_page_operations::cpo_own()
+ * \see cl_page_own_try()
+ * \see cl_page_own()
*/
-int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
+static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *pg, int nonblock)
{
int result;
if (pg->cp_state == CPS_FREEING) {
result = -EAGAIN;
} else {
- cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_own));
- PASSERT(env, pg, pg->cp_owner == NULL);
- PASSERT(env, pg, pg->cp_req == NULL);
- pg->cp_owner = io;
- pg->cp_task = current;
- cl_page_owner_set(pg);
- if (pg->cp_state != CPS_FREEING) {
- cl_page_state_set(env, pg, CPS_OWNED);
- result = 0;
- } else {
- cl_page_disown0(env, io, pg);
- result = -EAGAIN;
+ result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
+ (const struct lu_env *,
+ const struct cl_page_slice *,
+ struct cl_io *, int),
+ io, nonblock);
+ if (result == 0) {
+ PASSERT(env, pg, pg->cp_owner == NULL);
+ PASSERT(env, pg, pg->cp_req == NULL);
+ pg->cp_owner = io;
+ pg->cp_task = current;
+ cl_page_owner_set(pg);
+ if (pg->cp_state != CPS_FREEING) {
+ cl_page_state_set(env, pg, CPS_OWNED);
+ } else {
+ cl_page_disown0(env, io, pg);
+ result = -EAGAIN;
+ }
}
}
PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
RETURN(result);
}
+
+/**
+ * Own a page; may block until ownership is obtained.
+ *
+ * \see cl_page_own0()
+ */
+int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
+{
+ return cl_page_own0(env, io, pg, 0);
+}
EXPORT_SYMBOL(cl_page_own);
/**
+ * Non-blocking version of cl_page_own().
+ *
+ * \see cl_page_own0()
+ */
+int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *pg)
+{
+ return cl_page_own0(env, io, pg, 1);
+}
+EXPORT_SYMBOL(cl_page_own_try);
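+
+/*
+ * A possible caller pattern (sketch only, not part of this patch),
+ * assuming env, io and pg have been set up as for cl_page_own():
+ *
+ *	if (cl_page_own_try(env, io, pg) == -EAGAIN) {
+ *		(contended or in IO: fall back to the blocking own,
+ *		 which only fails if the page is being freed)
+ *		if (cl_page_own(env, io, pg) < 0)
+ *			return;
+ *	}
+ *	(... use the page ...)
+ *	cl_page_disown(env, io, pg);
+ */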
+
+/**
* Assume page ownership.
*
* Called when page is already locked by the hosting VM.
}
cl_page_list_init(plist);
- cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist);
+ cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0);
/*
* Since we're purging the pages of an object, we don't care
* the possible outcomes of the following functions.
* XXX this is quite expensive check.
*/
cl_page_list_init(list);
- cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list);
+ cl_page_gang_lookup(env, clob, io, start + partial,
+                     CL_PAGE_EOF, list, 0);
cl_page_list_for_each(page, list)
CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start);
io->ci_obj = cl_object_top(obj);
cl_io_init(env, io, CIT_MISC, io->ci_obj);
cl_page_gang_lookup(env, obj, io,
- descr->cld_start, descr->cld_end, plist);
+ descr->cld_start, descr->cld_end, plist, 0);
cl_lock_page_list_fixup(env, io, lock, plist);
if (plist->pl_nr > 0) {
CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
struct osc_brw_async_args *aa;
const struct obd_async_page_ops *ops;
CFS_LIST_HEAD(rpc_list);
+ CFS_LIST_HEAD(tmp_list);
unsigned int ending_offset;
unsigned starting_offset = 0;
int srvlock = 0;
struct cl_object *clob = NULL;
ENTRY;
- /* If there are HP OAPs we need to handle at least 1 of them,
- * move it the beginning of the pending list for that. */
- if (!list_empty(&lop->lop_urgent)) {
- oap = list_entry(lop->lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
- if (oap->oap_async_flags & ASYNC_HP)
- list_move(&oap->oap_pending_item, &lop->lop_pending);
+ /* ASYNC_HP pages first. At present, when the lock covering the
+ * pages is about to be canceled, those pages are sent out with
+ * ASYNC_HP, and we have to send them out as soon as possible. */
+ list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
+ if (oap->oap_async_flags & ASYNC_HP)
+ list_move(&oap->oap_pending_item, &tmp_list);
+ else
+ list_move_tail(&oap->oap_pending_item, &tmp_list);
+ if (++page_count >= cli->cl_max_pages_per_rpc)
+ break;
}
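+ /* list_move() put the ASYNC_HP pages at the head of tmp_list
+  * and list_move_tail() queued the rest behind them, so the
+  * splice below makes the HP pages the first to be picked up. */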
+ list_splice(&tmp_list, &lop->lop_pending);
+ page_count = 0;
+
/* first we find the pages we're allowed to work with */
list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
oap_pending_item) {
oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
break;
}
+
+ /* If there is a gap at the start of this page, it can't merge
+ * with any previous page, so we'll hand the network a
+ * "fragmented" page array that it can't transfer in 1 RDMA */
+ if (page_count != 0 && oap->oap_page_off != 0)
+ break;
+
/* in llite being 'ready' equates to the page being locked
* until completion unlocks it. commit_write submits a page
* as not ready because its unlock will happen unconditionally
}
}
#endif
- /* If there is a gap at the start of this page, it can't merge
- * with any previous page, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (page_count != 0 && oap->oap_page_off != 0)
- break;
/* take the page out of our book-keeping */
list_del_init(&oap->oap_pending_item);
req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
if (IS_ERR(req)) {
LASSERT(list_empty(&rpc_list));
- /* loi_list_maint(cli, loi); */
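+ /* keep the loi lists consistent even on this error path */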
+ loi_list_maint(cli, loi);
RETURN(PTR_ERR(req));
}
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
&loi->loi_write_lop);
- if (rc < 0)
- break;
+ if (rc < 0) {
+ CERROR("Write request failed with %d\n", rc);
+
+ /* osc_send_oap_rpc failed, mostly because of
+ * memory pressure.
+ *
+ * We must not break out here, because if:
+ * - a page was submitted by osc_io_submit, so
+ *   that page is locked;
+ * - no request is in flight; and
+ * - no subsequent request will be queued,
+ * then the system live-locks: there is no
+ * further chance to call osc_io_unplug() or
+ * osc_check_rpcs(), and pdflush cannot help
+ * because it may itself block on the page
+ * lock mentioned above.
+ *
+ * Anyway, continue to drain pages. */
+ /* break; */
+ }
+
if (rc > 0)
race_counter = 0;
else
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
&loi->loi_read_lop);
if (rc < 0)
- break;
+ CERROR("Read request failed with %d\n", rc);
+
if (rc > 0)
race_counter = 0;
else