CIS_FINI
};
+/* Priority of a transfer submitted through cl_io_submit_rw().
+ * CRP_CANCEL is used when flushing pages covered by a lock being
+ * cancelled (see the CRP_CANCEL caller in this patch); osc_io_submit()
+ * maps any priority above CRP_NORMAL to the ASYNC_HP oap flag so the
+ * pages are queued at the head of the urgent list and serviced first.
+ * NOTE(review): exact scheduling semantics beyond that ordering are
+ * not visible here — confirm against osc_check_rpcs(). */
+enum cl_req_priority {
+ CRP_NORMAL,
+ CRP_CANCEL
+};
+
/**
* IO state private for a layer.
*
int (*cio_submit)(const struct lu_env *env,
const struct cl_io_slice *slice,
enum cl_req_type crt,
- struct cl_2queue *queue);
+ struct cl_2queue *queue,
+ enum cl_req_priority priority);
} req_op[CRT_NR];
/**
* Read missing page.
int cl_io_commit_write (const struct lu_env *env, struct cl_io *io,
struct cl_page *page, unsigned from, unsigned to);
int cl_io_submit_rw (const struct lu_env *env, struct cl_io *io,
- enum cl_req_type iot, struct cl_2queue *queue);
+ enum cl_req_type iot, struct cl_2queue *queue,
+ enum cl_req_priority priority);
void cl_io_rw_advance (const struct lu_env *env, struct cl_io *io,
size_t nob);
int cl_io_cancel (const struct lu_env *env, struct cl_io *io,
/* used by the osc to keep track of what objects to build into rpcs */
struct loi_oap_pages loi_read_lop;
struct loi_oap_pages loi_write_lop;
- /* _cli_ is poorly named, it should be _ready_ */
- struct list_head loi_cli_item;
+ struct list_head loi_ready_item;
+ struct list_head loi_hp_ready_item;
struct list_head loi_write_item;
struct list_head loi_read_item;
CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending);
CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
- CFS_INIT_LIST_HEAD(&loi->loi_cli_item);
+ CFS_INIT_LIST_HEAD(&loi->loi_ready_item);
+ CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item);
CFS_INIT_LIST_HEAD(&loi->loi_write_item);
CFS_INIT_LIST_HEAD(&loi->loi_read_item);
}
*/
client_obd_lock_t cl_loi_list_lock;
struct list_head cl_loi_ready_list;
+ struct list_head cl_loi_hp_ready_list;
struct list_head cl_loi_write_list;
struct list_head cl_loi_read_list;
int cl_r_in_flight;
cli->cl_dirty_max = num_physpages << (CFS_PAGE_SHIFT - 3);
CFS_INIT_LIST_HEAD(&cli->cl_cache_waiters);
CFS_INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+ CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list);
CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list);
client_obd_list_lock_init(&cli->cl_loi_list_lock);
/* printk("Inited anchor with %d pages\n", npages); */
if (rc == 0) {
- rc = cl_io_submit_rw(env, io,
- io->ci_type == CIT_READ ? CRT_READ :
- CRT_WRITE,
- queue);
+ enum cl_req_type crt;
+
+ crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
+ rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
if (rc == 0) {
/* If some pages weren't sent for any reason, count
* then as completed, to avoid infinite wait. */
*/
set_page_dirty(vmpage);
cl_2queue_init_page(queue, page);
- result = cl_io_submit_rw(env, io, CRT_WRITE, queue);
+ result = cl_io_submit_rw(env, io, CRT_WRITE,
+ queue, CRP_NORMAL);
cl_page_list_disown(env, io, &queue->c2_qin);
if (result != 0) {
/*
if (rc == 0) {
rc = cl_io_submit_rw(env, io, rw == READ ? CRT_READ : CRT_WRITE,
- queue);
+ queue, CRP_NORMAL);
if (rc == 0) {
/*
* If some pages weren't sent for any reason (e.g.,
cl_sync_io_init(anchor, 1);
cp->cpg_sync_io = anchor;
cl_page_clip(env, page, 0, to);
- result = cl_io_submit_rw(env, io, crt, queue);
+ result = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
if (result == 0)
result = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
else
*/
static int lov_io_submit(const struct lu_env *env,
const struct cl_io_slice *ios,
- enum cl_req_type crt, struct cl_2queue *queue)
+ enum cl_req_type crt, struct cl_2queue *queue,
+ enum cl_req_priority priority)
{
struct lov_io *lio = cl2lov_io(env, ios);
struct lov_object *obj = lio->lis_object;
sub = lov_sub_get(env, lio, idx);
LASSERT(!IS_ERR(sub));
LASSERT(sub->sub_io == &lio->lis_single_subio);
- rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, crt, queue);
+ rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+ crt, queue, priority);
lov_sub_put(sub);
RETURN(rc);
}
sub = lov_sub_get(env, lio, stripe);
if (!IS_ERR(sub)) {
rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
- crt, cl2q);
+ crt, cl2q, priority);
lov_sub_put(sub);
} else
rc = PTR_ERR(sub);
}
}
if (result == 0)
- result = cl_io_submit_rw(env, io, CRT_READ, queue);
+ result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL);
/*
* Unlock unsent pages in case of error.
*/
* \see cl_io_operations::cio_submit()
*/
int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
- enum cl_req_type crt, struct cl_2queue *queue)
+ enum cl_req_type crt, struct cl_2queue *queue,
+ enum cl_req_priority priority)
{
const struct cl_io_slice *scan;
int result = 0;
if (scan->cis_iop->req_op[crt].cio_submit == NULL)
continue;
result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
- queue);
+ queue, priority);
if (result != 0)
break;
}
if (queue->c2_qin.pl_nr > 0) {
result = cl_page_list_unmap(env, io, &queue->c2_qin);
if (!discard) {
- rc0 = cl_io_submit_rw(env, io,
- CRT_WRITE, queue);
+ rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
+ queue, CRP_CANCEL);
rc1 = cl_page_list_own(env, io,
&queue->c2_qout);
result = result ?: rc0 ?: rc1;
enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
async = async && (typ == CRT_WRITE);
- rc = (async ? cl_echo_async_brw : cl_io_submit_rw)(env, io,
- typ, queue);
+ if (async)
+ rc = cl_echo_async_brw(env, io, typ, queue);
+ else
+ rc = cl_io_submit_rw(env, io,typ, queue, CRP_NORMAL);
CDEBUG(D_INFO, "echo_client %s write returns %d\n",
async ? "async" : "sync", rc);
if (rc == 0) {
ASYNC_COUNT_STABLE = 0x4, /* ap_refresh_count will not be called
to give the caller a chance to update
or cancel the size of the io */
+ ASYNC_HP = 0x10,
};
struct obd_async_page_ops {
*/
static int osc_io_submit(const struct lu_env *env,
const struct cl_io_slice *ios,
- enum cl_req_type crt, struct cl_2queue *queue)
+ enum cl_req_type crt, struct cl_2queue *queue,
+ enum cl_req_priority priority)
{
struct cl_page *page;
struct cl_page *tmp;
osc = cl2osc(opg->ops_cl.cpl_obj);
exp = osc_export(osc);
+ if (priority > CRP_NORMAL)
+ oap->oap_async_flags |= ASYNC_HP;
/*
* This can be checked without cli->cl_loi_list_lock, because
* ->oap_*_item are always manipulated when the page is owned.
RETURN(0);
}
+/* Return 1 if @lop needs a high-priority RPC, 0 otherwise.
+ *
+ * Only the first entry of lop_urgent needs to be examined: ASYNC_HP
+ * oaps are always inserted at the head of the urgent list (list_add)
+ * while ordinary urgent oaps go to the tail (list_add_tail), so if the
+ * head is not HP, no entry on the list is. */
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_HP) {
+ CDEBUG(D_CACHE, "hp request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
+
static void on_list(struct list_head *item, struct list_head *list,
int should_be_on)
{
* can find pages to build into rpcs quickly */
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_hprpc(&loi->loi_write_lop) ||
+ lop_makes_hprpc(&loi->loi_read_lop)) {
+ /* HP rpc */
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+ } else {
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ }
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
else
lop = &oap->oap_loi->loi_read_lop;
- if (oap->oap_async_flags & ASYNC_URGENT)
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
struct cl_object *clob = NULL;
ENTRY;
+ /* If there are HP OAPs we need to handle at least 1 of them,
+ * move it to the beginning of the pending list for that. */
+ if (!list_empty(&lop->lop_urgent)) {
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ if (oap->oap_async_flags & ASYNC_HP)
+ list_move(&oap->oap_pending_item, &lop->lop_pending);
+ }
+
/* first we find the pages we're allowed to work with */
list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
oap_pending_item) {
#define LOI_DEBUG(LOI, STR, args...) \
CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_cli_item), \
+ !list_empty(&(LOI)->loi_ready_item) || \
+ !list_empty(&(LOI)->loi_hp_ready_item), \
(LOI)->loi_write_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
ENTRY;
- /* first return all objects which we already know to have
- * pages ready to be stuffed into rpcs */
+
+ /* First return objects that have blocked locks so that they
+ * will be flushed quickly and other clients can get the lock,
+ * then objects which have pages ready to be stuffed into RPCs */
+ if (!list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+ struct lov_oinfo, loi_hp_ready_item));
if (!list_empty(&cli->cl_loi_ready_list))
RETURN(list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_cli_item));
+ struct lov_oinfo, loi_ready_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
RETURN(NULL);
}
+/* Return non-zero when no further RPC should be started for @cli.
+ *
+ * The normal cap is cl_max_rpcs_in_flight, but if the head of either
+ * of @loi's urgent lists is a high-priority (ASYNC_HP) oap, allow one
+ * extra RPC in flight so lock-cancellation flushes are not stuck
+ * behind a full pipeline of bulk I/O.  Checking only the list head is
+ * sufficient because HP oaps are always head-inserted on lop_urgent. */
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+ struct osc_async_page *oap;
+ int hprpc = 0;
+
+ if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+ /* Peek the READ list whose emptiness we just checked; the
+ * original patch mistakenly read loi_write_lop here, which
+ * could treat an empty write list's sentinel as an oap. */
+ oap = list_entry(loi->loi_read_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
/* called with the loi list lock held */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
- if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+ if (osc_max_rpc_in_flight(cli, loi))
break;
/* attempt some read/write balancing by alternating between
/* attempt some inter-object balancing by issueing rpcs
* for each object in turn */
- if (!list_empty(&loi->loi_cli_item))
- list_del_init(&loi->loi_cli_item);
+ if (!list_empty(&loi->loi_hp_ready_item))
+ list_del_init(&loi->loi_hp_ready_item);
+ if (!list_empty(&loi->loi_ready_item))
+ list_del_init(&loi->loi_ready_item);
if (!list_empty(&loi->loi_write_item))
list_del_init(&loi->loi_write_item);
if (!list_empty(&loi->loi_read_item))
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
oap->oap_async_flags |= ASYNC_READY;
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
- if (list_empty(&oap->oap_rpc_item)) {
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+ list_empty(&oap->oap_rpc_item)) {
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- loi_list_maint(cli, loi);
- }
+ else
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ oap->oap_async_flags |= ASYNC_URGENT;
+ loi_list_maint(cli, loi);
}
LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
- oap->oap_async_flags &= ~ASYNC_URGENT;
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
}
if (!list_empty(&oap->oap_pending_item)) {
list_del_init(&oap->oap_pending_item);