Add a new "async" parameter to osc_check_rpcs(); when it is called with
async set, it composes a fake ptlrpc_request so that RPCs are composed
and issued in ptlrpcd context.
Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I1c2f8ae43da8146428c474f17ddf3dc23a2df9ef
Reviewed-on: http://review.whamcloud.com/1825
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
struct lu_client_seq *cl_seq;
cfs_atomic_t cl_resends; /* resend count */
struct lu_client_seq *cl_seq;
cfs_atomic_t cl_resends; /* resend count */
+
+ /* ptlrpc work for writeback in ptlrpcd context */
+ void *cl_writeback_work;
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
};
#define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
struct lov_stripe_md *lsm, struct lov_oinfo *loi,
struct osc_async_page *oap, int cmd, int off,
int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
struct lov_stripe_md *lsm, struct lov_oinfo *loi,
struct osc_async_page *oap, int cmd, int off,
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
+ int ptlrpc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
int cmd)
{
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
int cmd)
{
ENTRY;
if (lop->lop_num_pending == 0)
ENTRY;
if (lop->lop_num_pending == 0)
CDEBUG(D_CACHE, "urgent request forcing RPC\n");
RETURN(1);
}
CDEBUG(D_CACHE, "urgent request forcing RPC\n");
RETURN(1);
}
- /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
- optimal = cli->cl_max_pages_per_rpc;
if (cmd & OBD_BRW_WRITE) {
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
if (cmd & OBD_BRW_WRITE) {
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
- /* +16 to avoid triggering rpcs that would want to include pages
- * that are being queued but which can't be made ready until
- * the queuer finishes with the page. this is a wart for
- * llite::commit_write() */
- optimal += 16;
- if (lop->lop_num_pending >= optimal)
+ if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
+/* ptlrpcd work callback: flush pending BRW RPCs for a client obd.
+ * Invoked via cl_writeback_work, i.e. already in ptlrpcd context, so
+ * osc_check_rpcs0() is called with ptlrpc=1 (which selects
+ * PDL_POLICY_SAME, keeping the RPCs on the current ptlrpcd thread).
+ * The cl_loi_list_lock is taken around the scan as osc_check_rpcs0()
+ * is "called with the loi list lock held". Always returns 0. */
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+ struct client_obd *cli = data;
+
+ CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ osc_check_rpcs0(env, cli, 1);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(0);
+}
+
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
client_obd_list_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
client_obd_list_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
- osc_check_rpcs(env, cli);
+ osc_check_rpcs0(env, cli, 1);
client_obd_list_unlock(&cli->cl_loi_list_lock);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!async)
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
if (!async)
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
*/
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
*/
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
- struct lov_oinfo *loi,
- int cmd, struct loi_oap_pages *lop)
+ struct lov_oinfo *loi, int cmd,
+ struct loi_oap_pages *lop, pdl_policy_t pol)
{
struct ptlrpc_request *req;
obd_count page_count = 0;
{
struct ptlrpc_request *req;
obd_count page_count = 0;
* single ptlrpcd thread cannot process in time. So more ptlrpcd
* threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
*/
* single ptlrpcd thread cannot process in time. So more ptlrpcd
* threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
*/
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req, pol, -1);
}
/* called with the loi list lock held */
}
/* called with the loi list lock held */
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
{
struct lov_oinfo *loi;
int rc = 0, race_counter = 0;
{
struct lov_oinfo *loi;
int rc = 0, race_counter = 0;
+ pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
+
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
* do io on writes while there are cache waiters */
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
* do io on writes while there are cache waiters */
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
+ &loi->loi_write_lop, pol);
if (rc < 0) {
CERROR("Write request failed with %d\n", rc);
if (rc < 0) {
CERROR("Write request failed with %d\n", rc);
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
+ &loi->loi_read_lop, pol);
if (rc < 0)
CERROR("Read request failed with %d\n", rc);
if (rc < 0)
CERROR("Read request failed with %d\n", rc);
if (race_counter == 10)
break;
}
if (race_counter == 10)
break;
}
+}
+
+/* Public entry point, kept for existing callers: compose/send pending
+ * RPCs from the caller's own context. Passing ptlrpc=0 makes
+ * osc_check_rpcs0() pick PDL_POLICY_ROUND to spread BRW load across
+ * ptlrpcd threads. Caller must hold cl_loi_list_lock. */
+void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+{
+ osc_check_rpcs0(env, cli, 0);
}
/* we're trying to queue a page in the osc so we're subject to the
}
/* we're trying to queue a page in the osc so we're subject to the
- osc_oap_to_pending(oap);
- loi_list_maint(cli, loi);
-
LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
cmd);
LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
cmd);
- osc_check_rpcs(env, cli);
+ osc_oap_to_pending(oap);
+ loi_list_maint(cli, loi);
+ if (!osc_max_rpc_in_flight(cli, loi) &&
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
+ LASSERT(cli->cl_writeback_work != NULL);
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+ CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+ cli, rc);
+ }
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
+ struct client_obd *cli = &obd->u.cli;
RETURN(rc);
rc = client_obd_setup(obd, lcfg);
RETURN(rc);
rc = client_obd_setup(obd, lcfg);
- if (rc) {
- ptlrpcd_decref();
- } else {
+ if (rc == 0) {
+ void *handler;
+ handler = ptlrpcd_alloc_work(cli->cl_import,
+ brw_queue_work, cli);
+ if (!IS_ERR(handler))
+ cli->cl_writeback_work = handler;
+ else
+ rc = PTR_ERR(handler);
+ }
+
+ if (rc == 0) {
struct lprocfs_static_vars lvars = { 0 };
struct lprocfs_static_vars lvars = { 0 };
- struct client_obd *cli = &obd->u.cli;
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
+ if (rc)
+ ptlrpcd_decref();
break;
}
case OBD_CLEANUP_EXPORTS: {
break;
}
case OBD_CLEANUP_EXPORTS: {
+ struct client_obd *cli = &obd->u.cli;
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
* client_disconnect_export()
*/
obd_zombie_barrier();
* client_disconnect_export()
*/
obd_zombie_barrier();
+ if (cli->cl_writeback_work) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
obd_cleanup_client_import(obd);
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
obd_cleanup_client_import(obd);
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
{
int req_ok = req->rq_reqmsg != NULL;
int rep_ok = req->rq_repmsg != NULL;
{
int req_ok = req->rq_reqmsg != NULL;
int rep_ok = req->rq_repmsg != NULL;
+ lnet_nid_t nid = LNET_NID_ANY;
va_list args;
if (ptlrpc_req_need_swab(req)) {
va_list args;
if (ptlrpc_req_need_swab(req)) {
rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
}
rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
}
+ if (req->rq_import && req->rq_import->imp_connection)
+ nid = req->rq_import->imp_connection->c_peer.nid;
+ else if (req->rq_export && req->rq_export->exp_connection)
+ nid = req->rq_export->exp_connection->c_peer.nid;
+
va_start(args, fmt);
libcfs_debug_vmsg2(data->msg_cdls, data->msg_subsys,mask,data->msg_file,
data->msg_fn, data->msg_line, fmt, args,
va_start(args, fmt);
libcfs_debug_vmsg2(data->msg_cdls, data->msg_subsys,mask,data->msg_file,
data->msg_fn, data->msg_line, fmt, args,
req->rq_export ?
req->rq_export->exp_client_uuid.uuid :
"<?>",
req->rq_export ?
req->rq_export->exp_client_uuid.uuid :
"<?>",
- libcfs_nid2str(req->rq_import ?
- req->rq_import->imp_connection->c_peer.nid :
- req->rq_export ?
- req->rq_export->exp_connection->c_peer.nid:
- LNET_NID_ANY),
req->rq_request_portal, req->rq_reply_portal,
req->rq_reqlen, req->rq_replen,
req->rq_early_count, req->rq_timedout,
req->rq_request_portal, req->rq_reply_portal,
req->rq_reqlen, req->rq_replen,
req->rq_early_count, req->rq_timedout,
do_facet $SINGLEMDS $LCTL set_param mdt.*.md_stats=clear
do_facet ost1 $LCTL set_param obdfilter.*.stats=clear
do_facet $SINGLEMDS $LCTL set_param mdt.*.md_stats=clear
do_facet ost1 $LCTL set_param obdfilter.*.stats=clear
- dd if=/dev/zero of=${testdir}/${tfile} conv=notrunc bs=1024k count=1 || error "dd failed"
+ dd if=/dev/zero of=${testdir}/${tfile} conv=notrunc bs=512k count=1 || error "dd failed"
sync
cancel_lru_locks osc
check_stats ost "write" 1
sync
cancel_lru_locks osc
check_stats ost "write" 1