static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
+ int ptlrpc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
int cmd)
{
- int optimal;
ENTRY;
if (lop->lop_num_pending == 0)
CDEBUG(D_CACHE, "urgent request forcing RPC\n");
RETURN(1);
}
- /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
- optimal = cli->cl_max_pages_per_rpc;
+
if (cmd & OBD_BRW_WRITE) {
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
- /* +16 to avoid triggering rpcs that would want to include pages
- * that are being queued but which can't be made ready until
- * the queuer finishes with the page. this is a wart for
- * llite::commit_write() */
- optimal += 16;
}
- if (lop->lop_num_pending >= optimal)
+ if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
RETURN(1);
RETURN(0);
EXIT;
}
+/*
+ * ptlrpcd work callback, registered via ptlrpcd_alloc_work() in osc_setup()
+ * and queued from the page-submission path with ptlrpcd_queue_work().
+ *
+ * Drives deferred writeback for the client_obd passed in @data: under
+ * cl_loi_list_lock it runs osc_check_rpcs0() with ptlrpc=1, which selects
+ * PDL_POLICY_SAME so the BRW RPCs generated here stay on the current
+ * ptlrpcd thread instead of being spread round-robin.
+ *
+ * Always returns 0 (via the Lustre RETURN() tracing macro).
+ */
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+ struct client_obd *cli = data;
+
+ CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ osc_check_rpcs0(env, cli, 1);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(0);
+}
+
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
}
cli = aa->aa_cli;
-
client_obd_list_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
- osc_check_rpcs(env, cli);
+ osc_check_rpcs0(env, cli, 1);
client_obd_list_unlock(&cli->cl_loi_list_lock);
+
if (!async)
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
*/
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
- struct lov_oinfo *loi,
- int cmd, struct loi_oap_pages *lop)
+ struct lov_oinfo *loi, int cmd,
+ struct loi_oap_pages *lop, pdl_policy_t pol)
{
struct ptlrpc_request *req;
obd_count page_count = 0;
* single ptlrpcd thread cannot process in time. So more ptlrpcd
* threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
*/
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req, pol, -1);
RETURN(1);
}
}
/* called with the loi list lock held */
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
{
struct lov_oinfo *loi;
int rc = 0, race_counter = 0;
+ pdl_policy_t pol;
ENTRY;
+ pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
+
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
* do io on writes while there are cache waiters */
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
- &loi->loi_write_lop);
+ &loi->loi_write_lop, pol);
if (rc < 0) {
CERROR("Write request failed with %d\n", rc);
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
- &loi->loi_read_lop);
+ &loi->loi_read_lop, pol);
if (rc < 0)
CERROR("Read request failed with %d\n", rc);
if (race_counter == 10)
break;
}
- EXIT;
+}
+
+/*
+ * Public wrapper kept for existing callers of osc_check_rpcs(); like
+ * osc_check_rpcs0() it must be called with cl_loi_list_lock held (see the
+ * comment above osc_check_rpcs0).  Passes ptlrpc=0 so new RPCs are handed
+ * to ptlrpcd with PDL_POLICY_ROUND (spread across ptlrpcd threads) rather
+ * than pinned to the current thread.
+ */
+void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+{
+ osc_check_rpcs0(env, cli, 0);
}
/* we're trying to queue a page in the osc so we're subject to the
}
}
- osc_oap_to_pending(oap);
- loi_list_maint(cli, loi);
-
LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
cmd);
- osc_check_rpcs(env, cli);
+ osc_oap_to_pending(oap);
+ loi_list_maint(cli, loi);
+ if (!osc_max_rpc_in_flight(cli, loi) &&
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
+ LASSERT(cli->cl_writeback_work != NULL);
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+ CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+ cli, rc);
+ }
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
+ struct client_obd *cli = &obd->u.cli;
int rc;
ENTRY;
RETURN(rc);
rc = client_obd_setup(obd, lcfg);
- if (rc) {
- ptlrpcd_decref();
- } else {
+ if (rc == 0) {
+ void *handler;
+ handler = ptlrpcd_alloc_work(cli->cl_import,
+ brw_queue_work, cli);
+ if (!IS_ERR(handler))
+ cli->cl_writeback_work = handler;
+ else
+ rc = PTR_ERR(handler);
+ }
+
+ if (rc == 0) {
struct lprocfs_static_vars lvars = { 0 };
- struct client_obd *cli = &obd->u.cli;
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
+ if (rc)
+ ptlrpcd_decref();
RETURN(rc);
}
break;
}
case OBD_CLEANUP_EXPORTS: {
+ struct client_obd *cli = &obd->u.cli;
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
* client_disconnect_export()
*/
obd_zombie_barrier();
+ if (cli->cl_writeback_work) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
obd_cleanup_client_import(obd);
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);