static int osc_del_shrink_grant(struct client_obd *client)
{
- CDEBUG(D_CACHE, "del grant client %s \n",
- client->cl_import->imp_obd->obd_name);
- return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
+ return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
+ TIMEOUT_GRANT);
}
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
struct obdo *saved_oa = NULL;
struct brw_page **ppga, **orig;
struct obd_import *imp = class_exp2cliimp(exp);
- struct client_obd *cli = &imp->imp_obd->u.cli;
+ struct client_obd *cli;
int rc, page_count_orig;
ENTRY;
+ LASSERT((imp != NULL) && (imp->imp_obd != NULL));
+ cli = &imp->imp_obd->u.cli;
+
if (cmd & OBD_BRW_CHECK) {
/* The caller just wants to know if there's a chance that this
* I/O can succeed */
- if (imp == NULL || imp->imp_invalid)
+ if (imp->imp_invalid)
RETURN(-EIO);
RETURN(0);
}
RETURN(0);
}
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_HP) {
+ CDEBUG(D_CACHE, "hp request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
+
static void on_list(struct list_head *item, struct list_head *list,
int should_be_on)
{
* can find pages to build into rpcs quickly */
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_hprpc(&loi->loi_write_lop) ||
+ lop_makes_hprpc(&loi->loi_read_lop)) {
+ /* HP rpc */
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+ } else {
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ }
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
else
lop = &oap->oap_loi->loi_read_lop;
- if (oap->oap_async_flags & ASYNC_URGENT)
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
struct cl_object *clob = NULL;
ENTRY;
+ /* If there are HP OAPs we need to handle at least 1 of them,
+ * move it the beginning of the pending list for that. */
+ if (!list_empty(&lop->lop_urgent)) {
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ if (oap->oap_async_flags & ASYNC_HP)
+ list_move(&oap->oap_pending_item, &lop->lop_pending);
+ }
+
/* first we find the pages we're allowed to work with */
list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
oap_pending_item) {
#define LOI_DEBUG(LOI, STR, args...) \
CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_cli_item), \
+ !list_empty(&(LOI)->loi_ready_item) || \
+ !list_empty(&(LOI)->loi_hp_ready_item), \
(LOI)->loi_write_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
ENTRY;
- /* first return all objects which we already know to have
- * pages ready to be stuffed into rpcs */
+
+ /* First return objects that have blocked locks so that they
+ * will be flushed quickly and other clients can get the lock,
+ * then objects which have pages ready to be stuffed into RPCs */
+ if (!list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+ struct lov_oinfo, loi_hp_ready_item));
if (!list_empty(&cli->cl_loi_ready_list))
RETURN(list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_cli_item));
+ struct lov_oinfo, loi_ready_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
RETURN(NULL);
}
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+ struct osc_async_page *oap;
+ int hprpc = 0;
+
+ if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
/* called with the loi list lock held */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
- if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+ if (osc_max_rpc_in_flight(cli, loi))
break;
/* attempt some read/write balancing by alternating between
/* attempt some inter-object balancing by issueing rpcs
* for each object in turn */
- if (!list_empty(&loi->loi_cli_item))
- list_del_init(&loi->loi_cli_item);
+ if (!list_empty(&loi->loi_hp_ready_item))
+ list_del_init(&loi->loi_hp_ready_item);
+ if (!list_empty(&loi->loi_ready_item))
+ list_del_init(&loi->loi_ready_item);
if (!list_empty(&loi->loi_write_item))
list_del_init(&loi->loi_write_item);
if (!list_empty(&loi->loi_read_item))
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
oap->oap_async_flags |= ASYNC_READY;
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
- if (list_empty(&oap->oap_rpc_item)) {
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+ list_empty(&oap->oap_rpc_item)) {
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- loi_list_maint(cli, loi);
- }
+ else
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ oap->oap_async_flags |= ASYNC_URGENT;
+ loi_list_maint(cli, loi);
}
LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
- oap->oap_async_flags &= ~ASYNC_URGENT;
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
}
if (!list_empty(&oap->oap_pending_item)) {
list_del_init(&oap->oap_pending_item);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_lvb,
aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
/* Release the lock for async request. */
if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
/*
obd);
}
- osc_del_shrink_grant(&obd->u.cli);
rc = client_disconnect_export(exp);
+ /**
+ * Initially we put del_shrink_grant before disconnect_export, but it
+ * causes the following problem if setup (connect) and cleanup
+ * (disconnect) are tangled together.
+ * connect p1 disconnect p2
+ * ptlrpc_connect_import
+ * ............... class_manual_cleanup
+ * osc_disconnect
+ * del_shrink_grant
+ * ptlrpc_connect_interrupt
+ * init_grant_shrink
+ * add this client to shrink list
+ * cleanup_osc
+ * Bang! pinger trigger the shrink.
+ * So the osc should be disconnected from the shrink list, after we
+ * are sure the import has been destroyed. BUG18662
+ */
+ if (obd->u.cli.cl_import == NULL)
+ osc_del_shrink_grant(&obd->u.cli);
return rc;
}