From c03783fce46ae0b40db0680388df6e2d6fca5008 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Wed, 16 May 2012 14:53:43 +0800 Subject: [PATCH] LU-56 lnet: SMP improvements for LNet selftest LNet selftest is using a global WI threads pool to handle all RPCs, it has performance problem on fat cores machine because all threads will contend on global queues protected by a single spinlock. This patch will fix this by creating WI scheduler for each CPT, RPCs will be dispatched to WI schedulers on different CPTs, and there is no contention between threads in different WI schedulers. Another major change in this patch is create percpt data for LST service. In current implementation each service has a global data structure to store buffer list and RPC list etch, and all operations on them are protected by per-service lock, again, this could be a serious performance issue if the service is busy enough. Having percpt service data would resolve this issue because threads running in one CPT will only require lock lock and access local data. Signed-off-by: Liang Zhen Change-Id: I8035faf2e87d8e424a8c2fac903bf3b241668e00 Reviewed-on: http://review.whamcloud.com/2805 Reviewed-by: Doug Oucharek Tested-by: Hudson Reviewed-by: Lai Siyao Tested-by: Maloo Reviewed-by: Andreas Dilger --- lnet/selftest/brw_test.c | 34 +- lnet/selftest/console.c | 8 +- lnet/selftest/framework.c | 176 +++++---- lnet/selftest/module.c | 74 ++-- lnet/selftest/ping_test.c | 14 +- lnet/selftest/rpc.c | 978 +++++++++++++++++++++++++++------------------- lnet/selftest/rpc.h | 9 +- lnet/selftest/selftest.h | 114 ++++-- 8 files changed, 854 insertions(+), 553 deletions(-) diff --git a/lnet/selftest/brw_test.c b/lnet/selftest/brw_test.c index 59c6e60..345c828 100644 --- a/lnet/selftest/brw_test.c +++ b/lnet/selftest/brw_test.c @@ -38,8 +38,12 @@ #include "selftest.h" +static int brw_srv_workitems = SFW_TEST_WI_MAX; +CFS_MODULE_PARM(brw_srv_workitems, "i", int, 0644, "# BRW server workitems"); -extern int brw_inject_errors; +static int brw_inject_errors; +CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644, + "# data errors to inject randomly, zero by default"); static void brw_client_fini (sfw_test_instance_t *tsi) @@ -80,9 +84,10 @@ brw_client_init (sfw_test_instance_t *tsi) flags != LST_BRW_CHECK_FULL && flags != LST_BRW_CHECK_SIMPLE) return -EINVAL; - cfs_list_for_each_entry_typed (tsu, &tsi->tsi_units, - sfw_test_unit_t, tsu_list) { - bulk = srpc_alloc_bulk(npg, breq->blk_opc == LST_BRW_READ); + cfs_list_for_each_entry_typed(tsu, &tsi->tsi_units, + sfw_test_unit_t, tsu_list) { + bulk = srpc_alloc_bulk(lnet_cpt_of_nid(tsu->tsu_dest.nid), + npg, breq->blk_opc == LST_BRW_READ); if (bulk == NULL) { brw_client_fini(tsi); return -ENOMEM; @@ -367,9 +372,9 @@ brw_bulk_ready (srpc_server_rpc_t *rpc, int status) } int -brw_server_handle (srpc_server_rpc_t *rpc) +brw_server_handle(struct srpc_server_rpc *rpc) { - srpc_service_t *sv = rpc->srpc_service; + struct srpc_service *sv = rpc->srpc_scd->scd_svc; srpc_msg_t *replymsg = &rpc->srpc_replymsg; srpc_msg_t *reqstmsg = &rpc->srpc_reqstbuf->buf_msg; srpc_brw_reply_t *reply = &replymsg->msg_body.brw_reply; @@ -403,9 +408,12 @@ brw_server_handle (srpc_server_rpc_t *rpc) } reply->brw_status = 0; - rc = sfw_alloc_pages(rpc, reqst->brw_len / CFS_PAGE_SIZE, - reqst->brw_rw == LST_BRW_WRITE); - if (rc != 0) return rc; + /* allocate from "local" node */ + rc = sfw_alloc_pages(rpc, rpc->srpc_scd->scd_cpt, + reqst->brw_len / CFS_PAGE_SIZE, + reqst->brw_rw == LST_BRW_WRITE); + if (rc != 0) + return rc; if (reqst->brw_rw == LST_BRW_READ) brw_fill_bulk(rpc->srpc_bulk, reqst->brw_flags, BRW_MAGIC); @@ -427,8 +435,16 @@ void brw_init_test_client(void) srpc_service_t brw_test_service; void brw_init_test_service(void) { +#ifndef __KERNEL__ + char *s; + + s = getenv("BRW_INJECT_ERRORS"); + brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors; +#endif + brw_test_service.sv_id = SRPC_SERVICE_BRW; brw_test_service.sv_name = "brw_test"; brw_test_service.sv_handler = brw_server_handle; brw_test_service.sv_bulk_ready = brw_bulk_ready; + brw_test_service.sv_wi_total = brw_srv_workitems; } diff --git a/lnet/selftest/console.c b/lnet/selftest/console.c index 0d440ab..9dd1a49 100644 --- a/lnet/selftest/console.c +++ b/lnet/selftest/console.c @@ -1915,7 +1915,7 @@ void lstcon_init_acceptor_service(void) lstcon_acceptor_service.sv_name = "join session"; lstcon_acceptor_service.sv_handler = lstcon_acceptor_handle; lstcon_acceptor_service.sv_id = SRPC_SERVICE_JOIN; - lstcon_acceptor_service.sv_concur = SFW_SERVICE_CONCURRENCY; + lstcon_acceptor_service.sv_wi_total = SFW_FRWK_WI_MAX; } extern int lstcon_ioctl_entry(unsigned int cmd, struct libcfs_ioctl_data *data); @@ -1927,7 +1927,6 @@ int lstcon_console_init(void) { int i; - int n; int rc; memset(&console_session, 0, sizeof(lstcon_session_t)); @@ -1966,8 +1965,9 @@ lstcon_console_init(void) return rc; } - n = srpc_service_add_buffers(&lstcon_acceptor_service, SFW_POST_BUFFERS); - if (n != SFW_POST_BUFFERS) { + rc = srpc_service_add_buffers(&lstcon_acceptor_service, + lstcon_acceptor_service.sv_wi_total); + if (rc != 0) { rc = -ENOMEM; goto out; } diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c index c034665..23bae6e 100644 --- a/lnet/selftest/framework.c +++ b/lnet/selftest/framework.c @@ -43,10 +43,6 @@ lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1}; -int brw_inject_errors = 0; -CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644, - "# data errors to inject randomly, zero by default"); - static int session_timeout = 100; CFS_MODULE_PARM(session_timeout, "i", int, 0444, "test session timeout in seconds (100 by default, 0 == never)"); @@ -55,11 +51,6 @@ static int rpc_timeout = 64; CFS_MODULE_PARM(rpc_timeout, "i", int, 0644, "rpc timeout in seconds (64 by default, 0 == never)"); -#define SFW_TEST_CONCURRENCY 1792 -#define SFW_EXTRA_TEST_BUFFERS 8 /* tolerate buggy peers with extra buffers */ - -#define sfw_test_buffers(tsi) ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS) - #define sfw_unpack_id(id) \ do { \ __swab64s(&(id).nid); \ @@ -309,10 +300,10 @@ sfw_init_session (sfw_session_t *sn, lst_sid_t sid, const char *name) /* completion handler for incoming framework RPCs */ void -sfw_server_rpc_done (srpc_server_rpc_t *rpc) +sfw_server_rpc_done(struct srpc_server_rpc *rpc) { - srpc_service_t *sv = rpc->srpc_service; - int status = rpc->srpc_status; + struct srpc_service *sv = rpc->srpc_scd->scd_svc; + int status = rpc->srpc_status; CDEBUG (D_NET, "Incoming framework RPC done: " @@ -553,45 +544,66 @@ sfw_test_rpc_fini (srpc_client_rpc_t *rpc) cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); } -int -sfw_load_test (sfw_test_instance_t *tsi) +static inline int +sfw_test_buffers(sfw_test_instance_t *tsi) { - sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service); - int nrequired = sfw_test_buffers(tsi); - int nposted; - - LASSERT (tsc != NULL); - - if (tsi->tsi_is_client) { - tsi->tsi_ops = tsc->tsc_cli_ops; - return 0; - } + struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service); + struct srpc_service *svc = tsc->tsc_srv_service; + int nbuf; - nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired); - if (nposted != nrequired) { - CWARN ("Failed to reserve enough buffers: " - "service %s, %d needed, %d reserved\n", - tsc->tsc_srv_service->sv_name, nrequired, nposted); - srpc_service_remove_buffers(tsc->tsc_srv_service, nposted); - return -ENOMEM; - } + nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts; + return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA); +} - CDEBUG (D_NET, "Reserved %d buffers for test %s\n", - nposted, tsc->tsc_srv_service->sv_name); - return 0; +int +sfw_load_test(struct sfw_test_instance *tsi) +{ + struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service); + struct srpc_service *svc = tsc->tsc_srv_service; + int nbuf = sfw_test_buffers(tsi); + int rc; + + LASSERT(tsc != NULL); + + if (tsi->tsi_is_client) { + tsi->tsi_ops = tsc->tsc_cli_ops; + return 0; + } + + rc = srpc_service_add_buffers(svc, nbuf); + if (rc != 0) { + CWARN("Failed to reserve enough buffers: " + "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc); + /* NB: this error handler is not strictly correct, because + * it may release more buffers than already allocated, + * but it doesn't matter because request portal should + * be lazy portal and will grow buffers if necessary. */ + srpc_service_remove_buffers(svc, nbuf); + return -ENOMEM; + } + + CDEBUG(D_NET, "Reserved %d buffers for test %s\n", + nbuf * (srpc_serv_is_framework(svc) ? + 1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name); + return 0; } void -sfw_unload_test (sfw_test_instance_t *tsi) +sfw_unload_test(struct sfw_test_instance *tsi) { - sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service); + struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service); - LASSERT (tsc != NULL); + LASSERT(tsc != NULL); - if (!tsi->tsi_is_client) - srpc_service_remove_buffers(tsc->tsc_srv_service, - sfw_test_buffers(tsi)); - return; + if (tsi->tsi_is_client) + return; + + /* shrink buffers, because request portal is lazy portal + * which can grow buffers at runtime so we may leave + * some buffers behind, but never mind... */ + srpc_service_remove_buffers(tsc->tsc_srv_service, + sfw_test_buffers(tsi)); + return; } void @@ -900,18 +912,20 @@ sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer, srpc_client_rpc_t, crpc_list); LASSERT (nblk == rpc->crpc_bulk.bk_niov); cfs_list_del_init(&rpc->crpc_list); - - srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk, - blklen, sfw_test_rpc_done, - sfw_test_rpc_fini, tsu); } cfs_spin_unlock(&tsi->tsi_lock); - - if (rpc == NULL) - rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk, - blklen, sfw_test_rpc_done, - sfw_test_rpc_fini, tsu); + + if (rpc == NULL) { + rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk, + blklen, sfw_test_rpc_done, + sfw_test_rpc_fini, tsu); + } else { + srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk, + blklen, sfw_test_rpc_done, + sfw_test_rpc_fini, tsu); + } + if (rpc == NULL) { CERROR ("Can't create rpc for test %d\n", tsi->tsi_service); return -ENOMEM; @@ -966,9 +980,9 @@ test_done: * - my batch is still active; no one can run it again now. * Cancel pending schedules and prevent future schedule attempts: */ - swi_kill_workitem(wi); - sfw_test_unit_done(tsu); - return 1; + swi_exit_workitem(wi); + sfw_test_unit_done(tsu); + return 1; } int @@ -1000,7 +1014,8 @@ sfw_run_batch (sfw_batch_t *tsb) tsu->tsu_loop = tsi->tsi_loop; wi = &tsu->tsu_worker; swi_init_workitem(wi, tsu, sfw_run_test, - lst_sched_test); + lst_sched_test[\ + lnet_cpt_of_nid(tsu->tsu_dest.nid)]); swi_schedule_workitem(wi); } } @@ -1085,15 +1100,16 @@ sfw_free_pages (srpc_server_rpc_t *rpc) } int -sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink) +sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int sink) { - LASSERT (rpc->srpc_bulk == NULL); - LASSERT (npages > 0 && npages <= LNET_MAX_IOV); + LASSERT(rpc->srpc_bulk == NULL); + LASSERT(npages > 0 && npages <= LNET_MAX_IOV); - rpc->srpc_bulk = srpc_alloc_bulk(npages, sink); - if (rpc->srpc_bulk == NULL) return -ENOMEM; + rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, sink); + if (rpc->srpc_bulk == NULL) + return -ENOMEM; - return 0; + return 0; } int @@ -1129,7 +1145,7 @@ sfw_add_test (srpc_server_rpc_t *rpc) bat = sfw_bid2batch(request->tsr_bid); if (bat == NULL) { CERROR ("Dropping RPC (%s) from %s under memory pressure.\n", - rpc->srpc_service->sv_name, + rpc->srpc_scd->scd_svc->sv_name, libcfs_id2str(rpc->srpc_peer)); return -ENOMEM; } @@ -1140,9 +1156,9 @@ sfw_add_test (srpc_server_rpc_t *rpc) } if (request->tsr_is_client && rpc->srpc_bulk == NULL) { - /* rpc will be resumed later in sfw_bulk_ready */ - return sfw_alloc_pages(rpc, - sfw_id_pages(request->tsr_ndest), 1); + /* rpc will be resumed later in sfw_bulk_ready */ + return sfw_alloc_pages(rpc, CFS_CPT_ANY, + sfw_id_pages(request->tsr_ndest), 1); } rc = sfw_add_test_instance(bat, rpc); @@ -1198,9 +1214,9 @@ sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply) } int -sfw_handle_server_rpc (srpc_server_rpc_t *rpc) +sfw_handle_server_rpc(struct srpc_server_rpc *rpc) { - srpc_service_t *sv = rpc->srpc_service; + struct srpc_service *sv = rpc->srpc_scd->scd_svc; srpc_msg_t *reply = &rpc->srpc_replymsg; srpc_msg_t *request = &rpc->srpc_reqstbuf->buf_msg; int rc = 0; @@ -1279,10 +1295,10 @@ sfw_handle_server_rpc (srpc_server_rpc_t *rpc) } int -sfw_bulk_ready (srpc_server_rpc_t *rpc, int status) +sfw_bulk_ready(struct srpc_server_rpc *rpc, int status) { - srpc_service_t *sv = rpc->srpc_service; - int rc; + struct srpc_service *sv = rpc->srpc_scd->scd_svc; + int rc; LASSERT (rpc->srpc_bulk != NULL); LASSERT (sv->sv_id == SRPC_SERVICE_TEST); @@ -1601,9 +1617,6 @@ sfw_startup (void) s = getenv("SESSION_TIMEOUT"); session_timeout = s != NULL ? atoi(s) : session_timeout; - s = getenv("BRW_INJECT_ERRORS"); - brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors; - s = getenv("RPC_TIMEOUT"); rpc_timeout = s != NULL ? atoi(s) : rpc_timeout; #endif @@ -1652,7 +1665,6 @@ sfw_startup (void) cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests, sfw_test_case_t, tsc_list) { sv = tsc->tsc_srv_service; - sv->sv_concur = SFW_TEST_CONCURRENCY; rc = srpc_add_service(sv); LASSERT (rc != -EBUSY); @@ -1669,7 +1681,7 @@ sfw_startup (void) sv->sv_bulk_ready = NULL; sv->sv_handler = sfw_handle_server_rpc; - sv->sv_concur = SFW_SERVICE_CONCURRENCY; + sv->sv_wi_total = SFW_FRWK_WI_MAX; if (sv->sv_id == SRPC_SERVICE_TEST) sv->sv_bulk_ready = sfw_bulk_ready; @@ -1684,13 +1696,13 @@ sfw_startup (void) /* about to sfw_shutdown, no need to add buffer */ if (error) continue; - rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS); - if (rc != SFW_POST_BUFFERS) { - CWARN ("Failed to reserve enough buffers: " - "service %s, %d needed, %d reserved\n", - sv->sv_name, SFW_POST_BUFFERS, rc); - error = -ENOMEM; - } + rc = srpc_service_add_buffers(sv, sv->sv_wi_total); + if (rc != 0) { + CWARN("Failed to reserve enough buffers: " + "service %s, %d needed: %d\n", + sv->sv_name, sv->sv_wi_total, rc); + error = -ENOMEM; + } } if (error != 0) diff --git a/lnet/selftest/module.c b/lnet/selftest/module.c index 1377b10..74ea2a8 100644 --- a/lnet/selftest/module.c +++ b/lnet/selftest/module.c @@ -36,10 +36,10 @@ #include "selftest.h" - enum { LST_INIT_NONE = 0, - LST_INIT_WI, + LST_INIT_WI_SERIAL, + LST_INIT_WI_TEST, LST_INIT_RPC, LST_INIT_FW, LST_INIT_CONSOLE @@ -51,11 +51,13 @@ extern int lstcon_console_fini(void); static int lst_init_step = LST_INIT_NONE; struct cfs_wi_sched *lst_sched_serial; -struct cfs_wi_sched *lst_sched_test; +struct cfs_wi_sched **lst_sched_test; void -lnet_selftest_fini (void) +lnet_selftest_fini(void) { + int i; + switch (lst_init_step) { #ifdef __KERNEL__ case LST_INIT_CONSOLE: @@ -65,11 +67,21 @@ lnet_selftest_fini (void) sfw_shutdown(); case LST_INIT_RPC: srpc_shutdown(); - case LST_INIT_WI: + case LST_INIT_WI_TEST: + for (i = 0; + i < cfs_cpt_number(lnet_cpt_table()); i++) { + if (lst_sched_test[i] == NULL) + continue; + cfs_wi_sched_destroy(lst_sched_test[i]); + } + LIBCFS_FREE(lst_sched_test, + sizeof(lst_sched_test[0]) * + cfs_cpt_number(lnet_cpt_table())); + lst_sched_test = NULL; + + case LST_INIT_WI_SERIAL: cfs_wi_sched_destroy(lst_sched_serial); - cfs_wi_sched_destroy(lst_sched_test); lst_sched_serial = NULL; - lst_sched_test = NULL; case LST_INIT_NONE: break; default: @@ -78,7 +90,6 @@ lnet_selftest_fini (void) return; } - void lnet_selftest_structure_assertion(void) { @@ -91,25 +102,39 @@ lnet_selftest_structure_assertion(void) } int -lnet_selftest_init (void) +lnet_selftest_init(void) { - int nthrs; - int rc; + int nscheds; + int rc; + int i; rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY, 1, &lst_sched_serial); - if (rc != 0) - return rc; - - nthrs = cfs_cpt_weight(lnet_cpt_table(), CFS_CPT_ANY); - rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), CFS_CPT_ANY, - nthrs, &lst_sched_test); if (rc != 0) { - cfs_wi_sched_destroy(lst_sched_serial); - lst_sched_serial = NULL; + CERROR("Failed to create serial WI scheduler for LST\n"); return rc; } - lst_init_step = LST_INIT_WI; + lst_init_step = LST_INIT_WI_SERIAL; + + nscheds = cfs_cpt_number(lnet_cpt_table()); + LIBCFS_ALLOC(lst_sched_test, sizeof(lst_sched_test[0]) * nscheds); + if (lst_sched_test == NULL) + goto error; + + lst_init_step = LST_INIT_WI_TEST; + for (i = 0; i < nscheds; i++) { + int nthrs = cfs_cpt_weight(lnet_cpt_table(), i); + + /* reserve at least one CPU for LND */ + nthrs = max(nthrs - 1, 1); + rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i, + nthrs, &lst_sched_test[i]); + if (rc != 0) { + CERROR("Failed to create CPT affinity WI scheduler " + "%d for LST\n", i); + goto error; + } + } rc = srpc_startup(); if (rc != 0) { @@ -131,13 +156,12 @@ lnet_selftest_init (void) CERROR("LST can't startup console\n"); goto error; } - lst_init_step = LST_INIT_CONSOLE; + lst_init_step = LST_INIT_CONSOLE; #endif - - return 0; + return 0; error: - lnet_selftest_fini(); - return rc; + lnet_selftest_fini(); + return rc; } #ifdef __KERNEL__ diff --git a/lnet/selftest/ping_test.c b/lnet/selftest/ping_test.c index 069d14d..6fa4c55 100644 --- a/lnet/selftest/ping_test.c +++ b/lnet/selftest/ping_test.c @@ -42,6 +42,9 @@ #define LST_PING_TEST_MAGIC 0xbabeface +int ping_srv_workitems = SFW_TEST_WI_MAX; +CFS_MODULE_PARM(ping_srv_workitems, "i", int, 0644, "# PING server workitems"); + typedef struct { cfs_spinlock_t pnd_lock; /* serialize */ int pnd_counter; /* sequence counter */ @@ -155,9 +158,9 @@ ping_client_done_rpc (sfw_test_unit_t *tsu, srpc_client_rpc_t *rpc) } static int -ping_server_handle (srpc_server_rpc_t *rpc) +ping_server_handle(struct srpc_server_rpc *rpc) { - srpc_service_t *sv = rpc->srpc_service; + struct srpc_service *sv = rpc->srpc_scd->scd_svc; srpc_msg_t *reqstmsg = &rpc->srpc_reqstbuf->buf_msg; srpc_ping_reqst_t *req = &reqstmsg->msg_body.ping_reqst; srpc_ping_reply_t *rep = &rpc->srpc_replymsg.msg_body.ping_reply; @@ -201,7 +204,8 @@ void ping_init_test_client(void) srpc_service_t ping_test_service; void ping_init_test_service(void) { - ping_test_service.sv_id = SRPC_SERVICE_PING; - ping_test_service.sv_name = "ping_test"; - ping_test_service.sv_handler = ping_server_handle; + ping_test_service.sv_id = SRPC_SERVICE_PING; + ping_test_service.sv_name = "ping_test"; + ping_test_service.sv_handler = ping_server_handle; + ping_test_service.sv_wi_total = ping_srv_workitems; } diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index 5329b94..2193a3a 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -34,13 +34,16 @@ * lnet/selftest/rpc.c * * Author: Isaac Huang + * + * 2012-05-13: Liang Zhen + * - percpt data for service to improve smp performance + * - code cleanup */ #define DEBUG_SUBSYSTEM S_LNET #include "selftest.h" - typedef enum { SRPC_STATE_NONE, SRPC_STATE_NI_INIT, @@ -58,6 +61,13 @@ struct smoketest_rpc { __u64 rpc_matchbits; /* matchbits counter */ } srpc_data; +static inline int +srpc_serv_portal(int svc_id) +{ + return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ? + SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL; +} + /* forward ref's */ int srpc_handle_rpc (swi_workitem_t *wi); @@ -124,7 +134,7 @@ srpc_free_bulk (srpc_bulk_t *bk) } srpc_bulk_t * -srpc_alloc_bulk (int npages, int sink) +srpc_alloc_bulk(int cpt, int npages, int sink) { srpc_bulk_t *bk; cfs_page_t **pages; @@ -132,7 +142,8 @@ srpc_alloc_bulk (int npages, int sink) LASSERT (npages > 0 && npages <= LNET_MAX_IOV); - LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages])); + LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt, + offsetof(srpc_bulk_t, bk_iovs[npages])); if (bk == NULL) { CERROR ("Can't allocate descriptor for %d pages\n", npages); return NULL; @@ -143,7 +154,8 @@ srpc_alloc_bulk (int npages, int sink) bk->bk_niov = npages; bk->bk_len = npages * CFS_PAGE_SIZE; #ifndef __KERNEL__ - LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages); + LIBCFS_CPT_ALLOC(pages, lnet_cpt_table(), cpt, + sizeof(cfs_page_t *) * npages); if (pages == NULL) { LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages])); CERROR ("Can't allocate page array for %d pages\n", npages); @@ -157,8 +169,9 @@ srpc_alloc_bulk (int npages, int sink) #endif for (i = 0; i < npages; i++) { - cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD); + cfs_page_t *pg; + pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, CFS_ALLOC_STD); if (pg == NULL) { CERROR ("Can't allocate page %d of %d\n", i, npages); srpc_free_bulk(bk); @@ -183,80 +196,167 @@ srpc_next_id (void) } void -srpc_init_server_rpc (srpc_server_rpc_t *rpc, - srpc_service_t *sv, srpc_buffer_t *buffer) +srpc_init_server_rpc(struct srpc_server_rpc *rpc, + struct srpc_service_cd *scd, + struct srpc_buffer *buffer) { memset(rpc, 0, sizeof(*rpc)); swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc, - sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ? - lst_sched_serial : lst_sched_test); + srpc_serv_is_framework(scd->scd_svc) ? + lst_sched_serial : lst_sched_test[scd->scd_cpt]); - rpc->srpc_ev.ev_fired = 1; /* no event expected now */ + rpc->srpc_ev.ev_fired = 1; /* no event expected now */ - rpc->srpc_service = sv; - rpc->srpc_reqstbuf = buffer; - rpc->srpc_peer = buffer->buf_peer; - rpc->srpc_self = buffer->buf_self; - LNetInvalidateHandle(&rpc->srpc_replymdh); + rpc->srpc_scd = scd; + rpc->srpc_reqstbuf = buffer; + rpc->srpc_peer = buffer->buf_peer; + rpc->srpc_self = buffer->buf_self; + LNetInvalidateHandle(&rpc->srpc_replymdh); } -int -srpc_add_service (srpc_service_t *sv) +static void +srpc_service_fini(struct srpc_service *svc) { - int id = sv->sv_id; - int i; - srpc_server_rpc_t *rpc; + struct srpc_service_cd *scd; + struct srpc_server_rpc *rpc; + struct srpc_buffer *buf; + cfs_list_t *q; + int i; + + if (svc->sv_cpt_data == NULL) + return; + + cfs_percpt_for_each(scd, i, svc->sv_cpt_data) { + while (1) { + if (!cfs_list_empty(&scd->scd_buf_posted)) + q = &scd->scd_buf_posted; + else if (!cfs_list_empty(&scd->scd_buf_blocked)) + q = &scd->scd_buf_blocked; + else + break; + + while (!cfs_list_empty(q)) { + buf = cfs_list_entry(q->next, + struct srpc_buffer, + buf_list); + cfs_list_del(&buf->buf_list); + LIBCFS_FREE(buf, sizeof(*buf)); + } + } + + LASSERT(cfs_list_empty(&scd->scd_rpc_active)); + + while (!cfs_list_empty(&scd->scd_rpc_free)) { + rpc = cfs_list_entry(scd->scd_rpc_free.next, + struct srpc_server_rpc, + srpc_list); + cfs_list_del(&rpc->srpc_list); + LIBCFS_FREE(rpc, sizeof(*rpc)); + } + } + + cfs_percpt_free(svc->sv_cpt_data); + svc->sv_cpt_data = NULL; +} - LASSERT (sv->sv_concur > 0); - LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID); +static int +srpc_service_nrpcs(struct srpc_service *svc) +{ + int nrpcs = svc->sv_wi_total / svc->sv_ncpts; - cfs_spin_lock(&srpc_data.rpc_glock); + return srpc_serv_is_framework(svc) ? + max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN); +} - LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING); +int srpc_add_buffer(struct swi_workitem *wi); - if (srpc_data.rpc_services[id] != NULL) { - cfs_spin_unlock(&srpc_data.rpc_glock); - return -EBUSY; - } +static int +srpc_service_init(struct srpc_service *svc) +{ + struct srpc_service_cd *scd; + struct srpc_server_rpc *rpc; + int nrpcs; + int i; + int j; + + svc->sv_shuttingdown = 0; + + svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(struct srpc_service_cd)); + if (svc->sv_cpt_data == NULL) + return -ENOMEM; + + svc->sv_ncpts = srpc_serv_is_framework(svc) ? + 1 : cfs_cpt_number(lnet_cpt_table()); + nrpcs = srpc_service_nrpcs(svc); + + cfs_percpt_for_each(scd, i, svc->sv_cpt_data) { + scd->scd_cpt = i; + scd->scd_svc = svc; + cfs_spin_lock_init(&scd->scd_lock); + CFS_INIT_LIST_HEAD(&scd->scd_rpc_free); + CFS_INIT_LIST_HEAD(&scd->scd_rpc_active); + CFS_INIT_LIST_HEAD(&scd->scd_buf_posted); + CFS_INIT_LIST_HEAD(&scd->scd_buf_blocked); + + scd->scd_ev.ev_data = scd; + scd->scd_ev.ev_type = SRPC_REQUEST_RCVD; + + /* NB: don't use lst_sched_serial for adding buffer, + * see details in srpc_service_add_buffers() */ + swi_init_workitem(&scd->scd_buf_wi, scd, + srpc_add_buffer, lst_sched_test[i]); + + if (i != 0 && srpc_serv_is_framework(svc)) { + /* NB: framework service only needs srpc_service_cd for + * one partition, but we allocate for all to make + * it easier to implement, it will waste a little + * memory but nobody should care about this */ + continue; + } + + for (j = 0; j < nrpcs; j++) { + LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(), + i, sizeof(*rpc)); + if (rpc == NULL) { + srpc_service_fini(svc); + return -ENOMEM; + } + cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free); + } + } + + return 0; +} - srpc_data.rpc_services[id] = sv; - cfs_spin_unlock(&srpc_data.rpc_glock); +int +srpc_add_service(struct srpc_service *sv) +{ + int id = sv->sv_id; - sv->sv_nprune = 0; - sv->sv_nposted_msg = 0; - sv->sv_shuttingdown = 0; - cfs_spin_lock_init(&sv->sv_lock); - CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq); - CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq); - CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq); - CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq); + LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID); - sv->sv_ev.ev_data = sv; - sv->sv_ev.ev_type = SRPC_REQUEST_RCVD; + if (srpc_service_init(sv) != 0) + return -ENOMEM; - for (i = 0; i < sv->sv_concur; i++) { - LIBCFS_ALLOC(rpc, sizeof(*rpc)); - if (rpc == NULL) goto enomem; + cfs_spin_lock(&srpc_data.rpc_glock); - cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq); - } + LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING); - CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n", - id, sv->sv_name, sv->sv_concur); - return 0; + if (srpc_data.rpc_services[id] != NULL) { + cfs_spin_unlock(&srpc_data.rpc_glock); + goto failed; + } -enomem: - while (!cfs_list_empty(&sv->sv_free_rpcq)) { - rpc = cfs_list_entry(sv->sv_free_rpcq.next, - srpc_server_rpc_t, srpc_list); - cfs_list_del(&rpc->srpc_list); - LIBCFS_FREE(rpc, sizeof(*rpc)); - } + srpc_data.rpc_services[id] = sv; + cfs_spin_unlock(&srpc_data.rpc_glock); - cfs_spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_services[id] = NULL; - cfs_spin_unlock(&srpc_data.rpc_glock); - return -ENOMEM; + CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name); + return 0; + + failed: + srpc_service_fini(sv); + return -EBUSY; } int @@ -277,16 +377,16 @@ srpc_remove_service (srpc_service_t *sv) } int -srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf, - int len, int options, lnet_process_id_t peer, - lnet_handle_md_t *mdh, srpc_event_t *ev) +srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf, + int len, int options, lnet_process_id_t peer, + lnet_handle_md_t *mdh, srpc_event_t *ev) { - int rc; - lnet_md_t md; - lnet_handle_me_t meh; + int rc; + lnet_md_t md; + lnet_handle_me_t meh; - rc = LNetMEAttach(portal, peer, matchbits, 0, - LNET_UNLINK, LNET_INS_AFTER, &meh); + rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK, + local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh); if (rc != 0) { CERROR ("LNetMEAttach failed: %d\n", rc); LASSERT (rc == -ENOMEM); @@ -370,271 +470,344 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, int srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf, - int len, lnet_handle_md_t *mdh, srpc_event_t *ev) + int len, lnet_handle_md_t *mdh, srpc_event_t *ev) { - int rc; - int portal; - - if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID) - portal = SRPC_REQUEST_PORTAL; - else - portal = SRPC_FRAMEWORK_REQUEST_PORTAL; - - rc = srpc_post_active_rdma(portal, service, buf, len, - LNET_MD_OP_PUT, peer, - LNET_NID_ANY, mdh, ev); - return rc; + return srpc_post_active_rdma(srpc_serv_portal(service), service, + buf, len, LNET_MD_OP_PUT, peer, + LNET_NID_ANY, mdh, ev); } int -srpc_post_passive_rqtbuf(int service, void *buf, int len, - lnet_handle_md_t *mdh, srpc_event_t *ev) +srpc_post_passive_rqtbuf(int service, int local, void *buf, int len, + lnet_handle_md_t *mdh, srpc_event_t *ev) { - int rc; - int portal; - lnet_process_id_t any = {0}; + lnet_process_id_t any = {0}; - any.nid = LNET_NID_ANY; - any.pid = LNET_PID_ANY; + any.nid = LNET_NID_ANY; + any.pid = LNET_PID_ANY; - if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID) - portal = SRPC_REQUEST_PORTAL; - else - portal = SRPC_FRAMEWORK_REQUEST_PORTAL; - - rc = srpc_post_passive_rdma(portal, service, buf, len, - LNET_MD_OP_PUT, any, mdh, ev); - return rc; + return srpc_post_passive_rdma(srpc_serv_portal(service), + local, service, buf, len, + LNET_MD_OP_PUT, any, mdh, ev); } int -srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf) +srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf) { - srpc_msg_t *msg = &buf->buf_msg; - int rc; - - LASSERT (!sv->sv_shuttingdown); + struct srpc_service *sv = scd->scd_svc; + struct srpc_msg *msg = &buf->buf_msg; + int rc; - LNetInvalidateHandle(&buf->buf_mdh); - cfs_list_add(&buf->buf_list, &sv->sv_posted_msgq); - sv->sv_nposted_msg++; - cfs_spin_unlock(&sv->sv_lock); + LNetInvalidateHandle(&buf->buf_mdh); + cfs_list_add(&buf->buf_list, &scd->scd_buf_posted); + scd->scd_buf_nposted++; + cfs_spin_unlock(&scd->scd_lock); - rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg), - &buf->buf_mdh, &sv->sv_ev); + rc = srpc_post_passive_rqtbuf(sv->sv_id, + !srpc_serv_is_framework(sv), + msg, sizeof(*msg), &buf->buf_mdh, + &scd->scd_ev); - /* At this point, a RPC (new or delayed) may have arrived in - * msg and its event handler has been called. So we must add - * buf to sv_posted_msgq _before_ dropping sv_lock */ + /* At this point, a RPC (new or delayed) may have arrived in + * msg and its event handler has been called. So we must add + * buf to scd_buf_posted _before_ dropping scd_lock */ - cfs_spin_lock(&sv->sv_lock); + cfs_spin_lock(&scd->scd_lock); - if (rc == 0) { - if (sv->sv_shuttingdown) { - cfs_spin_unlock(&sv->sv_lock); + if (rc == 0) { + if (!sv->sv_shuttingdown) + return 0; - /* srpc_shutdown_service might have tried to unlink me - * when my buf_mdh was still invalid */ - LNetMDUnlink(buf->buf_mdh); + cfs_spin_unlock(&scd->scd_lock); + /* srpc_shutdown_service might have tried to unlink me + * when my buf_mdh was still invalid */ + LNetMDUnlink(buf->buf_mdh); + cfs_spin_lock(&scd->scd_lock); + return 0; + } - cfs_spin_lock(&sv->sv_lock); - } - return 0; - } + scd->scd_buf_nposted--; + if (sv->sv_shuttingdown) + return rc; /* don't allow to change scd_buf_posted */ - sv->sv_nposted_msg--; - if (sv->sv_shuttingdown) return rc; + cfs_list_del(&buf->buf_list); + cfs_spin_unlock(&scd->scd_lock); - cfs_list_del(&buf->buf_list); + LIBCFS_FREE(buf, sizeof(*buf)); - cfs_spin_unlock(&sv->sv_lock); - LIBCFS_FREE(buf, sizeof(*buf)); - cfs_spin_lock(&sv->sv_lock); - return rc; + cfs_spin_lock(&scd->scd_lock); + return rc; } int -srpc_service_add_buffers (srpc_service_t *sv, int nbuffer) +srpc_add_buffer(struct swi_workitem *wi) { - int rc; - int posted; - srpc_buffer_t *buf; - - LASSERTF (nbuffer > 0, - "nbuffer must be positive: %d\n", nbuffer); - - for (posted = 0; posted < nbuffer; posted++) { - LIBCFS_ALLOC(buf, sizeof(*buf)); - if (buf == NULL) break; - - cfs_spin_lock(&sv->sv_lock); - rc = srpc_service_post_buffer(sv, buf); - cfs_spin_unlock(&sv->sv_lock); - - if (rc != 0) break; - } + struct srpc_service_cd *scd = wi->swi_workitem.wi_data; + struct srpc_buffer *buf; + int rc = 0; + + /* it's called by workitem scheduler threads, these threads + * should have been set CPT affinity, so buffers will be posted + * on CPT local list of Portal */ + cfs_spin_lock(&scd->scd_lock); + + while (scd->scd_buf_adjust > 0 && + !scd->scd_svc->sv_shuttingdown) { + scd->scd_buf_adjust--; /* consume it */ + scd->scd_buf_posting++; + + cfs_spin_unlock(&scd->scd_lock); + + LIBCFS_ALLOC(buf, sizeof(*buf)); + if (buf == NULL) { + CERROR("Failed to add new buf to service: %s\n", + scd->scd_svc->sv_name); + cfs_spin_lock(&scd->scd_lock); + rc = -ENOMEM; + break; + } + + cfs_spin_lock(&scd->scd_lock); + if (scd->scd_svc->sv_shuttingdown) { + cfs_spin_unlock(&scd->scd_lock); + LIBCFS_FREE(buf, sizeof(*buf)); + + cfs_spin_lock(&scd->scd_lock); + rc = -ESHUTDOWN; + break; + } + + rc = srpc_service_post_buffer(scd, buf); + if (rc != 0) + break; /* buf has been freed inside */ + + LASSERT(scd->scd_buf_posting > 0); + scd->scd_buf_posting--; + scd->scd_buf_total++; + scd->scd_buf_low = MAX(2, scd->scd_buf_total / 4); + } + + if (rc != 0) { + scd->scd_buf_err_stamp = cfs_time_current_sec(); + scd->scd_buf_err = rc; + + LASSERT(scd->scd_buf_posting > 0); + scd->scd_buf_posting--; + } + + cfs_spin_unlock(&scd->scd_lock); + return 0; +} - return posted; +int +srpc_service_add_buffers(struct srpc_service *sv, int nbuffer) +{ + struct srpc_service_cd *scd; + int rc = 0; + int i; + + LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer); + + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); + + scd->scd_buf_err = 0; + scd->scd_buf_err_stamp = 0; + scd->scd_buf_posting = 0; + scd->scd_buf_adjust = nbuffer; + /* start to post buffers */ + swi_schedule_workitem(&scd->scd_buf_wi); + cfs_spin_unlock(&scd->scd_lock); + + /* framework service only post buffer for one partition */ + if (srpc_serv_is_framework(sv)) + break; + } + + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); + /* + * NB: srpc_service_add_buffers() can be called inside + * thread context of lst_sched_serial, and we don't normally + * allow to sleep inside thread context of WI scheduler + * because it will block current scheduler thread from doing + * anything else, even worse, it could deadlock if it's + * waiting on result from another WI of the same scheduler. + * However, it's safe at here because scd_buf_wi is scheduled + * by thread in a different WI scheduler (lst_sched_test), + * so we don't have any risk of deadlock, though this could + * block all WIs pending on lst_sched_serial for a moment + * which is not good but not fatal. + */ + lst_wait_until(scd->scd_buf_err != 0 || + (scd->scd_buf_adjust == 0 && + scd->scd_buf_posting == 0), + scd->scd_lock, "waiting for adding buffer\n"); + + if (scd->scd_buf_err != 0 && rc == 0) + rc = scd->scd_buf_err; + + cfs_spin_unlock(&scd->scd_lock); + } + + return rc; } void -srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer) +srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer) { - LASSERTF (nbuffer > 0, - "nbuffer must be positive: %d\n", nbuffer); + struct srpc_service_cd *scd; + int num; + int i; - cfs_spin_lock(&sv->sv_lock); + LASSERT(!sv->sv_shuttingdown); - LASSERT (sv->sv_nprune >= 0); - LASSERT (!sv->sv_shuttingdown); + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); - sv->sv_nprune += nbuffer; + num = scd->scd_buf_total + scd->scd_buf_posting; + scd->scd_buf_adjust -= min(nbuffer, num); - cfs_spin_unlock(&sv->sv_lock); - return; + cfs_spin_unlock(&scd->scd_lock); + } } /* returns 1 if sv has finished, otherwise 0 */ int -srpc_finish_service (srpc_service_t *sv) +srpc_finish_service(struct srpc_service *sv) { - srpc_server_rpc_t *rpc; - srpc_buffer_t *buf; - - cfs_spin_lock(&sv->sv_lock); - - LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */ - - if (sv->sv_nposted_msg != 0 || !cfs_list_empty(&sv->sv_active_rpcq)) { - CDEBUG (D_NET, - "waiting for %d posted buffers to unlink and " - "in-flight RPCs to die.\n", - sv->sv_nposted_msg); - - if (!cfs_list_empty(&sv->sv_active_rpcq)) { - rpc = cfs_list_entry(sv->sv_active_rpcq.next, - srpc_server_rpc_t, srpc_list); - CNETERR("Active RPC %p on shutdown: sv %s, peer %s, " - "wi %s scheduled %d running %d, " - "ev fired %d type %d status %d lnet %d\n", - rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), - swi_state2str(rpc->srpc_wi.swi_state), - rpc->srpc_wi.swi_workitem.wi_scheduled, - rpc->srpc_wi.swi_workitem.wi_running, - rpc->srpc_ev.ev_fired, - rpc->srpc_ev.ev_type, - rpc->srpc_ev.ev_status, - rpc->srpc_ev.ev_lnet); - } - - cfs_spin_unlock(&sv->sv_lock); - return 0; - } - - cfs_spin_unlock(&sv->sv_lock); /* no lock needed from now on */ - - for (;;) { - cfs_list_t *q; - - if (!cfs_list_empty(&sv->sv_posted_msgq)) - q = &sv->sv_posted_msgq; - else if (!cfs_list_empty(&sv->sv_blocked_msgq)) - q = &sv->sv_blocked_msgq; - else - break; - - buf = cfs_list_entry(q->next, srpc_buffer_t, buf_list); - cfs_list_del(&buf->buf_list); - - LIBCFS_FREE(buf, sizeof(*buf)); - } - - while (!cfs_list_empty(&sv->sv_free_rpcq)) { - rpc = cfs_list_entry(sv->sv_free_rpcq.next, - srpc_server_rpc_t, srpc_list); - cfs_list_del(&rpc->srpc_list); - LIBCFS_FREE(rpc, sizeof(*rpc)); - } - - return 1; + struct srpc_service_cd *scd; + struct srpc_server_rpc *rpc; + int i; + + LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */ + + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); + if (!swi_deschedule_workitem(&scd->scd_buf_wi)) + return 0; + + if (scd->scd_buf_nposted > 0) { + CDEBUG(D_NET, "waiting for %d posted buffers to unlink", + scd->scd_buf_nposted); + cfs_spin_unlock(&scd->scd_lock); + return 0; + } + + if (cfs_list_empty(&scd->scd_rpc_active)) { + cfs_spin_unlock(&scd->scd_lock); + continue; + } + + rpc = cfs_list_entry(scd->scd_rpc_active.next, + struct srpc_server_rpc, srpc_list); + CNETERR("Active RPC %p on shutdown: sv %s, peer %s, " + "wi %s scheduled %d running %d, " + "ev fired %d type %d status %d lnet %d\n", + rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), + swi_state2str(rpc->srpc_wi.swi_state), + rpc->srpc_wi.swi_workitem.wi_scheduled, + rpc->srpc_wi.swi_workitem.wi_running, + rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type, + rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet); + cfs_spin_unlock(&scd->scd_lock); + return 0; + } + + /* no lock needed from now on */ + srpc_service_fini(sv); + return 1; } /* called with sv->sv_lock held */ void -srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf) -{ - if (sv->sv_shuttingdown) goto free; - - if (sv->sv_nprune == 0) { - if (srpc_service_post_buffer(sv, buf) != 0) - CWARN ("Failed to post %s buffer\n", sv->sv_name); - return; - } - - sv->sv_nprune--; -free: - cfs_spin_unlock(&sv->sv_lock); - LIBCFS_FREE(buf, sizeof(*buf)); - cfs_spin_lock(&sv->sv_lock); -} - -/* called with srpc_service_t::sv_lock held */ -inline void -srpc_schedule_server_rpc (srpc_server_rpc_t *rpc) +srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf) { - swi_schedule_workitem(&rpc->srpc_wi); + if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) { + if (srpc_service_post_buffer(scd, buf) != 0) { + CWARN("Failed to post %s buffer\n", + scd->scd_svc->sv_name); + } + return; + } + + /* service is shutting down, or we want to recycle some buffers */ + scd->scd_buf_total--; + + if (scd->scd_buf_adjust < 0) { + scd->scd_buf_adjust++; + if (scd->scd_buf_adjust < 0 && + scd->scd_buf_total == 0 && scd->scd_buf_posting == 0) { + CDEBUG(D_INFO, + "Try to recyle %d buffers but nothing left\n", + scd->scd_buf_adjust); + scd->scd_buf_adjust = 0; + } + } + + cfs_spin_unlock(&scd->scd_lock); + LIBCFS_FREE(buf, sizeof(*buf)); + cfs_spin_lock(&scd->scd_lock); } void -srpc_abort_service (srpc_service_t *sv) +srpc_abort_service(struct srpc_service *sv) { - srpc_server_rpc_t *rpc; - - cfs_spin_lock(&sv->sv_lock); - - CDEBUG(D_NET, "Aborting service: id %d, name %s\n", - sv->sv_id, sv->sv_name); - - /* schedule in-flight RPCs to notice the abort, NB: - * racing with incoming RPCs; complete fix should make test - * RPCs carry session ID in its headers */ - cfs_list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) { - rpc->srpc_aborted = 1; - srpc_schedule_server_rpc(rpc); - } - - cfs_spin_unlock(&sv->sv_lock); - return; + struct srpc_service_cd *scd; + struct srpc_server_rpc *rpc; + int i; + + CDEBUG(D_NET, "Aborting service: id %d, name %s\n", + sv->sv_id, sv->sv_name); + + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); + + /* schedule in-flight RPCs to notice the abort, NB: + * racing with incoming RPCs; complete fix should make test + * RPCs carry session ID in its headers */ + cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) { + rpc->srpc_aborted = 1; + swi_schedule_workitem(&rpc->srpc_wi); + } + + cfs_spin_unlock(&scd->scd_lock); + } } void -srpc_shutdown_service (srpc_service_t *sv) +srpc_shutdown_service(srpc_service_t *sv) { - srpc_server_rpc_t *rpc; - srpc_buffer_t *buf; + struct srpc_service_cd *scd; + struct srpc_server_rpc *rpc; + srpc_buffer_t *buf; + int i; - cfs_spin_lock(&sv->sv_lock); + CDEBUG(D_NET, "Shutting down service: id %d, name %s\n", + sv->sv_id, sv->sv_name); - CDEBUG(D_NET, "Shutting down service: id %d, name %s\n", - sv->sv_id, sv->sv_name); + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) + cfs_spin_lock(&scd->scd_lock); - sv->sv_shuttingdown = 1; /* i.e. no new active RPC */ + sv->sv_shuttingdown = 1; /* i.e. no new active RPC */ - /* schedule in-flight RPCs to notice the shutdown */ - cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq, - srpc_server_rpc_t, srpc_list) { - srpc_schedule_server_rpc(rpc); - } + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) + cfs_spin_unlock(&scd->scd_lock); - cfs_spin_unlock(&sv->sv_lock); + cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { + cfs_spin_lock(&scd->scd_lock); - /* OK to traverse sv_posted_msgq without lock, since no one - * touches sv_posted_msgq now */ - cfs_list_for_each_entry_typed (buf, &sv->sv_posted_msgq, - srpc_buffer_t, buf_list) - LNetMDUnlink(buf->buf_mdh); + /* schedule in-flight RPCs to notice the shutdown */ + cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) + swi_schedule_workitem(&rpc->srpc_wi); - return; + cfs_spin_unlock(&scd->scd_lock); + + /* OK to traverse scd_buf_posted without lock, since no one + * touches scd_buf_posted now */ + cfs_list_for_each_entry(buf, &scd->scd_buf_posted, buf_list) + LNetMDUnlink(buf->buf_mdh); + } } int @@ -670,7 +843,7 @@ srpc_prepare_reply (srpc_client_rpc_t *rpc) *id = srpc_next_id(); - rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id, + rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id, &rpc->crpc_replymsg, sizeof(srpc_msg_t), LNET_MD_OP_PUT, rpc->crpc_dest, &rpc->crpc_replymdh, ev); @@ -707,7 +880,7 @@ srpc_prepare_bulk (srpc_client_rpc_t *rpc) *id = srpc_next_id(); - rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id, + rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id, &bk->bk_iovs[0], bk->bk_niov, opt, rpc->crpc_dest, &bk->bk_mdh, ev); if (rc != 0) { @@ -750,10 +923,11 @@ srpc_do_bulk (srpc_server_rpc_t *rpc) /* only called from srpc_handle_rpc */ void -srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) +srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status) { - srpc_service_t *sv = rpc->srpc_service; - srpc_buffer_t *buffer; + struct srpc_service_cd *scd = rpc->srpc_scd; + struct srpc_service *sv = scd->scd_svc; + srpc_buffer_t *buffer; LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE); @@ -774,57 +948,58 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) (*rpc->srpc_done) (rpc); LASSERT (rpc->srpc_bulk == NULL); - cfs_spin_lock(&sv->sv_lock); - - if (rpc->srpc_reqstbuf != NULL) { - /* NB might drop sv_lock in srpc_service_recycle_buffer, but - * sv won't go away for sv_active_rpcq must not be empty */ - srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf); - rpc->srpc_reqstbuf = NULL; - } - - cfs_list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */ - - /* - * No one can schedule me now since: - * - I'm not on sv_active_rpcq. - * - all LNet events have been fired. - * Cancel pending schedules and prevent future schedule attempts: - */ - LASSERT (rpc->srpc_ev.ev_fired); - swi_kill_workitem(&rpc->srpc_wi); - - if (!sv->sv_shuttingdown && !cfs_list_empty(&sv->sv_blocked_msgq)) { - buffer = cfs_list_entry(sv->sv_blocked_msgq.next, - srpc_buffer_t, buf_list); - cfs_list_del(&buffer->buf_list); - - srpc_init_server_rpc(rpc, sv, buffer); - cfs_list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq); - srpc_schedule_server_rpc(rpc); - } else { - cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq); - } - - cfs_spin_unlock(&sv->sv_lock); - return; + cfs_spin_lock(&scd->scd_lock); + + if (rpc->srpc_reqstbuf != NULL) { + /* NB might drop sv_lock in srpc_service_recycle_buffer, but + * sv won't go away for scd_rpc_active must not be empty */ + srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf); + rpc->srpc_reqstbuf = NULL; + } + + cfs_list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */ + + /* + * No one can schedule me now since: + * - I'm not on scd_rpc_active. + * - all LNet events have been fired. + * Cancel pending schedules and prevent future schedule attempts: + */ + LASSERT(rpc->srpc_ev.ev_fired); + swi_exit_workitem(&rpc->srpc_wi); + + if (!sv->sv_shuttingdown && !cfs_list_empty(&scd->scd_buf_blocked)) { + buffer = cfs_list_entry(scd->scd_buf_blocked.next, + srpc_buffer_t, buf_list); + cfs_list_del(&buffer->buf_list); + + srpc_init_server_rpc(rpc, scd, buffer); + cfs_list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active); + swi_schedule_workitem(&rpc->srpc_wi); + } else { + cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free); + } + + cfs_spin_unlock(&scd->scd_lock); + return; } /* handles an incoming RPC */ int -srpc_handle_rpc (swi_workitem_t *wi) +srpc_handle_rpc(swi_workitem_t *wi) { - srpc_server_rpc_t *rpc = wi->swi_workitem.wi_data; - srpc_service_t *sv = rpc->srpc_service; - srpc_event_t *ev = &rpc->srpc_ev; - int rc = 0; + struct srpc_server_rpc *rpc = wi->swi_workitem.wi_data; + struct srpc_service_cd *scd = rpc->srpc_scd; + struct srpc_service *sv = scd->scd_svc; + srpc_event_t *ev = &rpc->srpc_ev; + int rc = 0; - LASSERT (wi == &rpc->srpc_wi); + LASSERT(wi == &rpc->srpc_wi); - cfs_spin_lock(&sv->sv_lock); + cfs_spin_lock(&scd->scd_lock); - if (sv->sv_shuttingdown || rpc->srpc_aborted) { - cfs_spin_unlock(&sv->sv_lock); + if (sv->sv_shuttingdown || rpc->srpc_aborted) { + cfs_spin_unlock(&scd->scd_lock); if (rpc->srpc_bulk != NULL) LNetMDUnlink(rpc->srpc_bulk->bk_mdh); @@ -837,7 +1012,7 @@ srpc_handle_rpc (swi_workitem_t *wi) return 0; } - cfs_spin_unlock(&sv->sv_lock); + cfs_spin_unlock(&scd->scd_lock); switch (wi->swi_state) { default: @@ -905,7 +1080,7 @@ srpc_handle_rpc (swi_workitem_t *wi) case SWI_STATE_REPLY_SUBMITTED: if (!ev->ev_fired) { CERROR("RPC %p: bulk %p, service %d\n", - rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id); + rpc, rpc->srpc_bulk, sv->sv_id); CERROR("Event: status %d, type %d, lnet %d\n", ev->ev_status, ev->ev_type, ev->ev_lnet); LASSERT (ev->ev_fired); @@ -1015,12 +1190,12 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) * Cancel pending schedules and prevent future schedule attempts: */ LASSERT (!srpc_event_pending(rpc)); - swi_kill_workitem(wi); + swi_exit_workitem(wi); - cfs_spin_unlock(&rpc->crpc_lock); + cfs_spin_unlock(&rpc->crpc_lock); - (*rpc->crpc_done) (rpc); - return; + (*rpc->crpc_done)(rpc); + return; } /* sends an outgoing RPC */ @@ -1201,30 +1376,30 @@ srpc_post_rpc (srpc_client_rpc_t *rpc) int -srpc_send_reply (srpc_server_rpc_t *rpc) +srpc_send_reply(struct srpc_server_rpc *rpc) { - srpc_event_t *ev = &rpc->srpc_ev; - srpc_msg_t *msg = &rpc->srpc_replymsg; - srpc_buffer_t *buffer = rpc->srpc_reqstbuf; - srpc_service_t *sv = rpc->srpc_service; - __u64 rpyid; - int rc; - - LASSERT (buffer != NULL); - rpyid = buffer->buf_msg.msg_body.reqst.rpyid; - - cfs_spin_lock(&sv->sv_lock); - - if (!sv->sv_shuttingdown && - sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) { - /* Repost buffer before replying since test client - * might send me another RPC once it gets the reply */ - if (srpc_service_post_buffer(sv, buffer) != 0) - CWARN ("Failed to repost %s buffer\n", sv->sv_name); - rpc->srpc_reqstbuf = NULL; - } + srpc_event_t *ev = &rpc->srpc_ev; + struct srpc_msg *msg = &rpc->srpc_replymsg; + struct srpc_buffer *buffer = rpc->srpc_reqstbuf; + struct srpc_service_cd *scd = rpc->srpc_scd; + struct srpc_service *sv = scd->scd_svc; + __u64 rpyid; + int rc; + + LASSERT(buffer != NULL); + rpyid = buffer->buf_msg.msg_body.reqst.rpyid; + + cfs_spin_lock(&scd->scd_lock); - cfs_spin_unlock(&sv->sv_lock); + if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) { + /* Repost buffer before replying since test client + * might send me another RPC once it gets the reply */ + if (srpc_service_post_buffer(scd, buffer) != 0) + CWARN("Failed to repost %s buffer\n", sv->sv_name); + rpc->srpc_reqstbuf = NULL; + } + + cfs_spin_unlock(&scd->scd_lock); ev->ev_fired = 0; ev->ev_data = rpc; @@ -1245,8 +1420,9 @@ srpc_send_reply (srpc_server_rpc_t *rpc) /* when in kernel always called with LNET_LOCK() held, and in thread context */ void -srpc_lnet_ev_handler (lnet_event_t *ev) +srpc_lnet_ev_handler(lnet_event_t *ev) { + struct srpc_service_cd *scd; srpc_event_t *rpcev = ev->md.user_ptr; srpc_client_rpc_t *crpc; srpc_server_rpc_t *srpc; @@ -1303,11 +1479,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev) break; case SRPC_REQUEST_RCVD: - sv = rpcev->ev_data; + scd = rpcev->ev_data; + sv = scd->scd_svc; - LASSERT (rpcev == &sv->sv_ev); + LASSERT(rpcev == &scd->scd_ev); - cfs_spin_lock(&sv->sv_lock); + cfs_spin_lock(&scd->scd_lock); LASSERT (ev->unlinked); LASSERT (ev->type == LNET_EVENT_PUT || @@ -1319,17 +1496,32 @@ srpc_lnet_ev_handler (lnet_event_t *ev) buffer->buf_peer = ev->initiator; buffer->buf_self = ev->target.nid; - sv->sv_nposted_msg--; - LASSERT (sv->sv_nposted_msg >= 0); - - if (sv->sv_shuttingdown) { - /* Leave buffer on sv->sv_posted_msgq since - * srpc_finish_service needs to traverse it. */ - cfs_spin_unlock(&sv->sv_lock); - break; - } - - cfs_list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */ + LASSERT(scd->scd_buf_nposted > 0); + scd->scd_buf_nposted--; + + if (sv->sv_shuttingdown) { + /* Leave buffer on scd->scd_buf_nposted since + * srpc_finish_service needs to traverse it. */ + cfs_spin_unlock(&scd->scd_lock); + break; + } + + if (scd->scd_buf_err_stamp != 0 && + scd->scd_buf_err_stamp < cfs_time_current_sec()) { + /* re-enable adding buffer */ + scd->scd_buf_err_stamp = 0; + scd->scd_buf_err = 0; + } + + if (scd->scd_buf_err == 0 && /* adding buffer is enabled */ + scd->scd_buf_adjust == 0 && + scd->scd_buf_nposted < scd->scd_buf_low) { + scd->scd_buf_adjust = MAX(scd->scd_buf_total / 2, + SFW_TEST_WI_MIN); + swi_schedule_workitem(&scd->scd_buf_wi); + } + + cfs_list_del(&buffer->buf_list); /* from scd->scd_buf_posted */ msg = &buffer->buf_msg; type = srpc_service2request(sv->sv_id); @@ -1350,21 +1542,22 @@ srpc_lnet_ev_handler (lnet_event_t *ev) msg->msg_magic = 0; } - if (!cfs_list_empty(&sv->sv_free_rpcq)) { - srpc = cfs_list_entry(sv->sv_free_rpcq.next, - srpc_server_rpc_t, srpc_list); - cfs_list_del(&srpc->srpc_list); + if (!cfs_list_empty(&scd->scd_rpc_free)) { + srpc = cfs_list_entry(scd->scd_rpc_free.next, + struct srpc_server_rpc, + srpc_list); + cfs_list_del(&srpc->srpc_list); - srpc_init_server_rpc(srpc, sv, buffer); - cfs_list_add_tail(&srpc->srpc_list, - &sv->sv_active_rpcq); - srpc_schedule_server_rpc(srpc); - } else { - cfs_list_add_tail(&buffer->buf_list, - &sv->sv_blocked_msgq); - } + srpc_init_server_rpc(srpc, scd, buffer); + cfs_list_add_tail(&srpc->srpc_list, + &scd->scd_rpc_active); + swi_schedule_workitem(&srpc->srpc_wi); + } else { + cfs_list_add_tail(&buffer->buf_list, + &scd->scd_buf_blocked); + } - cfs_spin_unlock(&sv->sv_lock); + cfs_spin_unlock(&scd->scd_lock); cfs_spin_lock(&srpc_data.rpc_glock); srpc_data.rpc_counters.rpcs_rcvd++; @@ -1391,23 +1584,21 @@ srpc_lnet_ev_handler (lnet_event_t *ev) cfs_spin_unlock(&srpc_data.rpc_glock); } case SRPC_REPLY_SENT: - srpc = rpcev->ev_data; - sv = srpc->srpc_service; + srpc = rpcev->ev_data; + scd = srpc->srpc_scd; - LASSERT (rpcev == &srpc->srpc_ev); + LASSERT(rpcev == &srpc->srpc_ev); - cfs_spin_lock(&sv->sv_lock); + cfs_spin_lock(&scd->scd_lock); - rpcev->ev_fired = 1; - rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? - -EINTR : ev->status; - srpc_schedule_server_rpc(srpc); + rpcev->ev_fired = 1; + rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? + -EINTR : ev->status; + swi_schedule_workitem(&srpc->srpc_wi); - cfs_spin_unlock(&sv->sv_lock); - break; - } - - return; + cfs_spin_unlock(&scd->scd_lock); + break; + } } #ifndef __KERNEL__ @@ -1477,8 +1668,10 @@ srpc_startup (void) goto bail; } - rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL); - LASSERT (rc == 0); + rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL); + LASSERT(rc == 0); + rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL); + LASSERT(rc == 0); srpc_data.rpc_state = SRPC_STATE_EQ_INIT; @@ -1523,6 +1716,7 @@ srpc_shutdown (void) case SRPC_STATE_EQ_INIT: rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL); + rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL); LASSERT (rc == 0); rc = LNetEQFree(srpc_data.rpc_lnet_eq); LASSERT (rc == 0); /* the EQ should have no user by now */ diff --git a/lnet/selftest/rpc.h b/lnet/selftest/rpc.h index dca68b7..505c0b5 100644 --- a/lnet/selftest/rpc.h +++ b/lnet/selftest/rpc.h @@ -232,10 +232,11 @@ typedef struct { #define SRPC_MSG_MAGIC 0xeeb0f00d #define SRPC_MSG_VERSION 1 -typedef struct { - __u32 msg_magic; /* magic */ - __u32 msg_version; /* # version */ - __u32 msg_type; /* what's in msg_body? srpc_msg_type_t */ +typedef struct srpc_msg { + __u32 msg_magic; /* magic */ + __u32 msg_version; /* # version */ + /* what's in msg_body? srpc_msg_type_t */ + __u32 msg_type; __u32 msg_reserved0; /* reserved seats */ __u32 msg_reserved1; __u32 msg_reserved2; diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index 9943fa5..786501d 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -74,6 +74,7 @@ /* forward refs */ struct srpc_service; +struct srpc_service_cd; struct sfw_test_unit; struct sfw_test_instance; @@ -173,7 +174,7 @@ typedef struct { } srpc_bulk_t; /* bulk descriptor */ /* message buffer descriptor */ -typedef struct { +typedef struct srpc_buffer { cfs_list_t buf_list; /* chain on srpc_service::*_msgq */ srpc_msg_t buf_msg; lnet_handle_md_t buf_mdh; @@ -193,8 +194,9 @@ typedef struct swi_workitem { /* server-side state of a RPC */ typedef struct srpc_server_rpc { - cfs_list_t srpc_list; /* chain on srpc_service::*_rpcq */ - struct srpc_service *srpc_service; + /* chain on srpc_service::*_rpcq */ + cfs_list_t srpc_list; + struct srpc_service_cd *srpc_scd; swi_workitem_t srpc_wi; srpc_event_t srpc_ev; /* bulk/reply event */ lnet_nid_t srpc_self; @@ -268,21 +270,61 @@ do { \ (rpc)->crpc_reqstev.ev_fired == 0 || \ (rpc)->crpc_replyev.ev_fired == 0) -typedef struct srpc_service { - int sv_id; /* service id */ - const char *sv_name; /* human readable name */ - int sv_nprune; /* # posted RPC to be pruned */ - int sv_concur; /* max # concurrent RPCs */ - - cfs_spinlock_t sv_lock; - int sv_shuttingdown; - srpc_event_t sv_ev; /* LNet event */ - int sv_nposted_msg; /* # posted message buffers */ - cfs_list_t sv_free_rpcq; /* free RPC descriptors */ - cfs_list_t sv_active_rpcq; /* in-flight RPCs */ - cfs_list_t sv_posted_msgq; /* posted message buffers */ - cfs_list_t sv_blocked_msgq; /* blocked for RPC descriptor */ +/* CPU partition data of srpc service */ +struct srpc_service_cd { + /** serialize */ + cfs_spinlock_t scd_lock; + /** backref to service */ + struct srpc_service *scd_svc; + /** event buffer */ + srpc_event_t scd_ev; + /** free RPC descriptors */ + cfs_list_t scd_rpc_free; + /** in-flight RPCs */ + cfs_list_t scd_rpc_active; + /** workitem for posting buffer */ + swi_workitem_t scd_buf_wi; + /** CPT id */ + int scd_cpt; + /** error code for scd_buf_wi */ + int scd_buf_err; + /** timestamp for scd_buf_err */ + unsigned long scd_buf_err_stamp; + /** total # request buffers */ + int scd_buf_total; + /** # posted request buffers */ + int scd_buf_nposted; + /** in progress of buffer posting */ + int scd_buf_posting; + /** allocate more buffers if scd_buf_nposted < scd_buf_low */ + int scd_buf_low; + /** increase/decrease some buffers */ + int scd_buf_adjust; + /** posted message buffers */ + cfs_list_t scd_buf_posted; + /** blocked for RPC descriptor */ + cfs_list_t scd_buf_blocked; +}; + +/* number of server workitems (mini-thread) for testing service */ +#define SFW_TEST_WI_MIN 256 +#define SFW_TEST_WI_MAX 2048 +/* extra buffers for tolerating buggy peers, or unbalanced number + * of peers between partitions */ +#define SFW_TEST_WI_EXTRA 64 + +/* number of server workitems (mini-thread) for framework service */ +#define SFW_FRWK_WI_MIN 16 +#define SFW_FRWK_WI_MAX 256 +typedef struct srpc_service { + int sv_id; /* service id */ + const char *sv_name; /* human readable name */ + int sv_wi_total; /* total server workitems */ + int sv_shuttingdown; + int sv_ncpts; + /* percpt data for srpc_service */ + struct srpc_service_cd **sv_cpt_data; /* Service callbacks: * - sv_handler: process incoming RPC request * - sv_bulk_ready: notify bulk data @@ -291,9 +333,6 @@ typedef struct srpc_service { int (*sv_bulk_ready) (srpc_server_rpc_t *, int); } srpc_service_t; -#define SFW_POST_BUFFERS 256 -#define SFW_SERVICE_CONCURRENCY (SFW_POST_BUFFERS/2) - typedef struct { cfs_list_t sn_list; /* chain on fw_zombie_sessions */ lst_sid_t sn_id; /* unique identifier */ @@ -372,7 +411,7 @@ typedef struct sfw_test_unit { swi_workitem_t tsu_worker; /* workitem of the test unit */ } sfw_test_unit_t; -typedef struct { +typedef struct sfw_test_case { cfs_list_t tsc_list; /* chain on fw_tests */ srpc_service_t *tsc_srv_service; /* test service */ sfw_test_client_ops_t *tsc_cli_ops; /* ops of test client */ @@ -389,7 +428,7 @@ void sfw_client_rpc_done(srpc_client_rpc_t *rpc); void sfw_unpack_message(srpc_msg_t *msg); void sfw_free_pages(srpc_server_rpc_t *rpc); void sfw_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i); -int sfw_alloc_pages(srpc_server_rpc_t *rpc, int npages, int sink); +int sfw_alloc_pages(srpc_server_rpc_t *rpc, int cpt, int npages, int sink); int sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply); srpc_client_rpc_t * @@ -400,7 +439,7 @@ srpc_create_client_rpc(lnet_process_id_t peer, int service, void srpc_post_rpc(srpc_client_rpc_t *rpc); void srpc_abort_rpc(srpc_client_rpc_t *rpc, int why); void srpc_free_bulk(srpc_bulk_t *bk); -srpc_bulk_t *srpc_alloc_bulk(int npages, int sink); +srpc_bulk_t *srpc_alloc_bulk(int cpt, int npages, int sink); int srpc_send_rpc(swi_workitem_t *wi); int srpc_send_reply(srpc_server_rpc_t *rpc); int srpc_add_service(srpc_service_t *sv); @@ -414,7 +453,13 @@ void srpc_get_counters(srpc_counters_t *cnt); void srpc_set_counters(const srpc_counters_t *cnt); extern struct cfs_wi_sched *lst_sched_serial; -extern struct cfs_wi_sched *lst_sched_test; +extern struct cfs_wi_sched **lst_sched_test; + +static inline int +srpc_serv_is_framework(struct srpc_service *svc) +{ + return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID; +} static inline int swi_wi_action(cfs_workitem_t *wi) @@ -441,11 +486,17 @@ swi_schedule_workitem(swi_workitem_t *wi) } static inline void -swi_kill_workitem(swi_workitem_t *swi) +swi_exit_workitem(swi_workitem_t *swi) { cfs_wi_exit(swi->swi_sched, &swi->swi_workitem); } +static inline int +swi_deschedule_workitem(swi_workitem_t *swi) +{ + return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem); +} + #ifndef __KERNEL__ static inline int swi_check_events(void) @@ -490,7 +541,8 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer, crpc_bulk.bk_iovs[nbulkiov])); CFS_INIT_LIST_HEAD(&rpc->crpc_list); - swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, lst_sched_test); + swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, + lst_sched_test[lnet_cpt_of_nid(peer.nid)]); cfs_spin_lock_init(&rpc->crpc_lock); cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ @@ -551,7 +603,7 @@ int selftest_wait_events(void); #else -#define selftest_wait_events() cfs_pause(cfs_time_seconds(1)) +#define selftest_wait_events() cfs_pause(cfs_time_seconds(1) / 10) #endif @@ -570,13 +622,11 @@ do { \ } while (0) static inline void -srpc_wait_service_shutdown (srpc_service_t *sv) +srpc_wait_service_shutdown(srpc_service_t *sv) { - int i = 2; + int i = 2; - cfs_spin_lock(&sv->sv_lock); - LASSERT (sv->sv_shuttingdown); - cfs_spin_unlock(&sv->sv_lock); + LASSERT(sv->sv_shuttingdown); while (srpc_finish_service(sv) == 0) { i++; -- 1.8.3.1