static void osc_check_rpcs(struct client_obd *cli);
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
+static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
static void osc_complete_oap(struct client_obd *cli,
struct osc_async_page *oap, int rc)
/* the loi lock is held across this function but it's allowed to release
* and reacquire it during its work */
-static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
- struct loi_oap_pages *lop)
+static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
+ int cmd, struct loi_oap_pages *lop)
{
struct ptlrpc_request *request;
obd_count page_count = 0;
if (page_count == 0)
RETURN(0);
+ loi_list_maint(cli, loi);
spin_unlock(&cli->cl_loi_list_lock);
request = osc_build_req(cli, &rpc_list, page_count, cmd);
list_add(&oap->oap_urgent_item,
&lop->lop_urgent);
}
+ loi_list_maint(cli, loi);
RETURN(PTR_ERR(request));
}
int optimal;
ENTRY;
- /* stream rpcs until we complete the urgent pages in the object */
+ if (lop->lop_num_pending == 0)
+ RETURN(0);
+
+ /* stream rpcs in queue order as long as as there is an urgent page
+ * queued. this is our cheap solution for good batching in the case
+ * where writepage marks some random page in the middle of the file as
+ * urgent because of, say, memory pressure */
if (!list_empty(&lop->lop_urgent))
RETURN(1);
/* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
optimal = cli->cl_max_pages_per_rpc;
- /* *2 to avoid triggering rpcs that would want to include pages that
- * are being queued but which can't be made ready until the queuer
- * finishes with the page. this is a wart for llite::commit_write() */
- if (cmd == OBD_BRW_WRITE)
+ if (cmd == OBD_BRW_WRITE) {
+ /* trigger a write rpc stream as long as there are dirtiers
+ * waiting for space. as they're waiting, they're not going to
+ * create more pages to coallesce with what's waiting.. */
+ if (!list_empty(&cli->cl_cache_waiters))
+ RETURN(1);
+
+ /* *2 to avoid triggering rpcs that would want to include pages
+ * that are being queued but which can't be made ready until
+ * the queuer finishes with the page. this is a wart for
+ * llite::commit_write() */
optimal *= 2;
+ }
if (lop->lop_num_pending >= optimal)
RETURN(1);
- /* trigger a write rpc stream as long as there are dirtiers waiting
- * for space. as they're waiting, they're not going to create more
- * pages to coallesce with what's waiting.. */
- if (!list_empty(&cli->cl_cache_waiters))
- RETURN(1);
-
RETURN(0);
}
-static int loi_makes_rpc(struct client_obd *cli, struct lov_oinfo *loi)
+static void on_list(struct list_head *item, struct list_head *list,
+ int should_be_on)
{
- return lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ);
+ if (list_empty(item) && should_be_on)
+ list_add_tail(item, list);
+ else if (!list_empty(item) && !should_be_on)
+ list_del_init(item);
}
-static void loi_onto_ready_list(struct client_obd *cli, struct lov_oinfo *loi)
+/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
+ * can find pages to build into rpcs quickly */
+static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- if (list_empty(&loi->loi_cli_item) && loi_makes_rpc(cli, loi))
- list_add_tail(&loi->loi_cli_item, &cli->cl_loi_ready_list);
+ on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+
+ on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
+ loi->loi_write_lop.lop_num_pending);
}
#define LOI_DEBUG(LOI, STR, args...) \
- CDEBUG(D_INODE, "loi rdy %d [%p,%p] wr %d:%d rd %d:%d " STR, \
+ CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
!list_empty(&(LOI)->loi_cli_item), \
- (LOI)->loi_cli_item.next, \
- (LOI)->loi_cli_item.prev, \
(LOI)->loi_write_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_read_lop.lop_urgent), \
args) \
+struct lov_oinfo *osc_next_loi(struct client_obd *cli)
+{
+ ENTRY;
+ /* first return all objects which we already know to have
+ * pages ready to be stuffed into rpcs */
+ if (!list_empty(&cli->cl_loi_ready_list))
+ RETURN(list_entry(cli->cl_loi_ready_list.next,
+ struct lov_oinfo, loi_cli_item));
+
+ /* then if we have cache waiters, return all objects with queued
+ * writes. This is especially important when many small files
+ * have filled up the cache and not been fired into rpcs because
+ * they don't pass the nr_pending/object threshhold */
+ if (!list_empty(&cli->cl_cache_waiters) &&
+ !list_empty(&cli->cl_loi_write_list))
+ RETURN(list_entry(cli->cl_loi_write_list.next,
+ struct lov_oinfo, loi_write_item));
+ RETURN(NULL);
+}
+
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
struct lov_oinfo *loi;
- int rc = 0, making_progress;
+ int rc = 0, race_counter = 0;
ENTRY;
- if (list_empty(&cli->cl_loi_ready_list)) {
- CDEBUG(D_INODE, "no lois ready\n");
- EXIT;
- return;
- }
+ while ((loi = osc_next_loi(cli)) != NULL) {
- while (!list_empty(&cli->cl_loi_ready_list)) {
- loi = list_entry(cli->cl_loi_ready_list.next, struct lov_oinfo,
- loi_cli_item);
+ LOI_DEBUG(loi, "%d in flight", cli->cl_brw_in_flight);
if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight)
break;
- making_progress = 0;
-
- /* hmm, it occurs to me that having rpc preparation fail
- * with num_pending == num_urgent means that there won't
- * be any more calls into here unless other traffic comes
- * in. hmm. */
-
/* attempt some read/write balancing by alternating between
- * reads and writes in an object */
+ * reads and writes in an object. The makes_rpc checks here
+ * would be redundant if we were getting read/write work items
+ * instead of objects. we don't want send_oap_rpc to drain a
+ * partial read pending queue when we're given this object to
+ * do io on writes while there are cache waiters */
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
- rc = osc_send_oap_rpc(cli, OBD_BRW_WRITE,
+ rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
&loi->loi_write_lop);
if (rc < 0)
break;
if (rc > 0)
- making_progress++;
+ race_counter = 0;
+ else
+ race_counter++;
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
- rc = osc_send_oap_rpc(cli, OBD_BRW_READ,
+ rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
&loi->loi_read_lop);
if (rc < 0)
break;
if (rc > 0)
- making_progress++;
+ race_counter = 0;
+ else
+ race_counter++;
}
/* attempt some inter-object balancing by issueing rpcs
* for each object in turn */
if (!list_empty(&loi->loi_cli_item))
list_del_init(&loi->loi_cli_item);
+ if (!list_empty(&loi->loi_write_item))
+ list_del_init(&loi->loi_write_item);
- loi_onto_ready_list(cli, loi);
-
- LOI_DEBUG(loi, "mp %d\n", making_progress);
+ loi_list_maint(cli, loi);
- /* could be smarter, !making_progress can happen in theory
- * if all the pages can not be locked in set_io_ready */
- if (!making_progress)
+ /* send_oap_rpc fails with 0 when make_ready tells it to
+ * back off. llite's make_ready does this when it tries
+ * to lock a page queued for write that is already locked.
+ * we want to try sending rpcs from many objects, but we
+ * don't want to spin failing with 0. */
+ if (race_counter == 10)
break;
}
EXIT;
/* make sure that there are write rpcs in flight to wait for. this
* is a little silly as this object may not have any pending
* but other objects sure might. this should probably be cleaned. */
- loi_onto_ready_list(cli, loi);
+ loi_list_maint(cli, loi);
osc_check_rpcs(cli);
spin_unlock(&cli->cl_loi_list_lock);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(cli, lop, cmd, 1);
- loi_onto_ready_list(cli, loi);
+ loi_list_maint(cli, loi);
LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
cmd);
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
if (list_empty(&oap->oap_rpc_item)) {
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- loi_onto_ready_list(cli, loi);
+ loi_list_maint(cli, loi);
}
}
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
lop_update_pending(cli, lop, cmd, 1);
}
- loi_onto_ready_list(cli, loi);
+ loi_list_maint(cli, loi);
}
static int osc_trigger_sync_io(struct obd_export *exp,
list_del_init(&oap->oap_pending_item);
lop_update_pending(cli, lop, oap->oap_cmd, -1);
}
- if (!list_empty(&loi->loi_cli_item) && !loi_makes_rpc(cli, loi))
- list_del_init(&loi->loi_cli_item);
+ loi_list_maint(cli, loi);
LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out: