RETURN(0);
}
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_HP) {
+ CDEBUG(D_CACHE, "hp request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
+
static void on_list(struct list_head *item, struct list_head *list,
int should_be_on)
{
* can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_hprpc(&loi->loi_write_lop) ||
+ lop_makes_hprpc(&loi->loi_read_lop)) {
+ /* HP rpc */
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+ } else {
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ }
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
else
lop = &oap->oap_loi->loi_read_lop;
- if (oap->oap_async_flags & ASYNC_URGENT)
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
int srvlock = 0;
ENTRY;
+ /* If there are HP OAPs we need to handle at least 1 of them,
+ * move it the beginning of the pending list for that. */
+ if (!list_empty(&lop->lop_urgent)) {
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ if (oap->oap_async_flags & ASYNC_HP)
+ list_move(&oap->oap_pending_item, &lop->lop_pending);
+ }
+
/* first we find the pages we're allowed to work with */
list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
ops = oap->oap_caller_ops;
#define LOI_DEBUG(LOI, STR, args...) \
CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_cli_item), \
+ !list_empty(&(LOI)->loi_ready_item) || \
+ !list_empty(&(LOI)->loi_hp_ready_item), \
(LOI)->loi_write_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
ENTRY;
- /* first return all objects which we already know to have
- * pages ready to be stuffed into rpcs */
+ /* First return objects that have blocked locks so that they
+ * will be flushed quickly and other clients can get the lock,
+ * then objects which have pages ready to be stuffed into RPCs */
+ if (!list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+ struct lov_oinfo, loi_hp_ready_item));
if (!list_empty(&cli->cl_loi_ready_list))
RETURN(list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_cli_item));
+ struct lov_oinfo, loi_ready_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
RETURN(NULL);
}
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+ struct osc_async_page *oap;
+ int hprpc = 0;
+
+ if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
- if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+ if (osc_max_rpc_in_flight(cli, loi))
break;
/* attempt some read/write balancing by alternating between
/* attempt some inter-object balancing by issueing rpcs
* for each object in turn */
- if (!list_empty(&loi->loi_cli_item))
- list_del_init(&loi->loi_cli_item);
+ if (!list_empty(&loi->loi_hp_ready_item))
+ list_del_init(&loi->loi_hp_ready_item);
+ if (!list_empty(&loi->loi_ready_item))
+ list_del_init(&loi->loi_ready_item);
if (!list_empty(&loi->loi_write_item))
list_del_init(&loi->loi_write_item);
if (!list_empty(&loi->loi_read_item))
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
oap->oap_async_flags |= ASYNC_READY;
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
- if (list_empty(&oap->oap_rpc_item)) {
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+ list_empty(&oap->oap_rpc_item)) {
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- loi_list_maint(cli, loi);
- }
+ else
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ oap->oap_async_flags |= ASYNC_URGENT;
+ loi_list_maint(cli, loi);
}
LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
- oap->oap_async_flags &= ~ASYNC_URGENT;
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
}
+
if (!list_empty(&oap->oap_pending_item)) {
list_del_init(&oap->oap_pending_item);
lop_update_pending(cli, lop, oap->oap_cmd, -1);