Whamcloud - gitweb
LU-56 lnet: SMP improvements for LNet selftest
authorLiang Zhen <liang@whamcloud.com>
Wed, 16 May 2012 06:53:43 +0000 (14:53 +0800)
committerAndreas Dilger <adilger@whamcloud.com>
Fri, 29 Jun 2012 20:30:46 +0000 (16:30 -0400)
LNet selftest is using a global WI threads pool to handle all RPCs;
it has a performance problem on machines with many (fat) cores because
all threads will contend on global queues protected by a single spinlock.
This patch will fix this by creating WI scheduler for each CPT,
RPCs will be dispatched to WI schedulers on different CPTs, and there
is no contention between threads in different WI schedulers.

Another major change in this patch is creating percpt data for the LST
service. In the current implementation each service has a global data
structure to store the buffer list, RPC list, etc., and all operations
on them are protected by a per-service lock; again, this could be a
serious performance issue if the service is busy enough. Having
percpt service data resolves this issue because threads running
in one CPT will only acquire the local lock and access local data.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I8035faf2e87d8e424a8c2fac903bf3b241668e00
Reviewed-on: http://review.whamcloud.com/2805
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Lai Siyao <laisiyao@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lnet/selftest/brw_test.c
lnet/selftest/console.c
lnet/selftest/framework.c
lnet/selftest/module.c
lnet/selftest/ping_test.c
lnet/selftest/rpc.c
lnet/selftest/rpc.h
lnet/selftest/selftest.h

index 59c6e60..345c828 100644 (file)
 
 #include "selftest.h"
 
 
 #include "selftest.h"
 
+static int brw_srv_workitems = SFW_TEST_WI_MAX;
+CFS_MODULE_PARM(brw_srv_workitems, "i", int, 0644, "# BRW server workitems");
 
 
-extern int brw_inject_errors;
+static int brw_inject_errors;
+CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644,
+               "# data errors to inject randomly, zero by default");
 
 static void
 brw_client_fini (sfw_test_instance_t *tsi)
 
 static void
 brw_client_fini (sfw_test_instance_t *tsi)
@@ -80,9 +84,10 @@ brw_client_init (sfw_test_instance_t *tsi)
             flags != LST_BRW_CHECK_FULL && flags != LST_BRW_CHECK_SIMPLE)
                 return -EINVAL;
 
             flags != LST_BRW_CHECK_FULL && flags != LST_BRW_CHECK_SIMPLE)
                 return -EINVAL;
 
-        cfs_list_for_each_entry_typed (tsu, &tsi->tsi_units,
-                                       sfw_test_unit_t, tsu_list) {
-                bulk = srpc_alloc_bulk(npg, breq->blk_opc == LST_BRW_READ);
+       cfs_list_for_each_entry_typed(tsu, &tsi->tsi_units,
+                                     sfw_test_unit_t, tsu_list) {
+               bulk = srpc_alloc_bulk(lnet_cpt_of_nid(tsu->tsu_dest.nid),
+                                      npg, breq->blk_opc == LST_BRW_READ);
                 if (bulk == NULL) {
                         brw_client_fini(tsi);
                         return -ENOMEM;
                 if (bulk == NULL) {
                         brw_client_fini(tsi);
                         return -ENOMEM;
@@ -367,9 +372,9 @@ brw_bulk_ready (srpc_server_rpc_t *rpc, int status)
 }
 
 int
 }
 
 int
-brw_server_handle (srpc_server_rpc_t *rpc)
+brw_server_handle(struct srpc_server_rpc *rpc)
 {
 {
-        srpc_service_t   *sv = rpc->srpc_service;
+       struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
         srpc_msg_t       *replymsg = &rpc->srpc_replymsg;
         srpc_msg_t       *reqstmsg = &rpc->srpc_reqstbuf->buf_msg;
         srpc_brw_reply_t *reply = &replymsg->msg_body.brw_reply;
         srpc_msg_t       *replymsg = &rpc->srpc_replymsg;
         srpc_msg_t       *reqstmsg = &rpc->srpc_reqstbuf->buf_msg;
         srpc_brw_reply_t *reply = &replymsg->msg_body.brw_reply;
@@ -403,9 +408,12 @@ brw_server_handle (srpc_server_rpc_t *rpc)
         }
 
         reply->brw_status = 0;
         }
 
         reply->brw_status = 0;
-        rc = sfw_alloc_pages(rpc, reqst->brw_len / CFS_PAGE_SIZE,
-                             reqst->brw_rw == LST_BRW_WRITE);
-        if (rc != 0) return rc;
+       /* allocate from "local" node */
+       rc = sfw_alloc_pages(rpc, rpc->srpc_scd->scd_cpt,
+                            reqst->brw_len / CFS_PAGE_SIZE,
+                            reqst->brw_rw == LST_BRW_WRITE);
+       if (rc != 0)
+               return rc;
 
         if (reqst->brw_rw == LST_BRW_READ)
                 brw_fill_bulk(rpc->srpc_bulk, reqst->brw_flags, BRW_MAGIC);
 
         if (reqst->brw_rw == LST_BRW_READ)
                 brw_fill_bulk(rpc->srpc_bulk, reqst->brw_flags, BRW_MAGIC);
@@ -427,8 +435,16 @@ void brw_init_test_client(void)
 srpc_service_t brw_test_service;
 void brw_init_test_service(void)
 {
 srpc_service_t brw_test_service;
 void brw_init_test_service(void)
 {
+#ifndef __KERNEL__
+       char *s;
+
+       s = getenv("BRW_INJECT_ERRORS");
+       brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors;
+#endif
+
         brw_test_service.sv_id         = SRPC_SERVICE_BRW;
         brw_test_service.sv_name       = "brw_test";
         brw_test_service.sv_handler    = brw_server_handle;
         brw_test_service.sv_bulk_ready = brw_bulk_ready;
         brw_test_service.sv_id         = SRPC_SERVICE_BRW;
         brw_test_service.sv_name       = "brw_test";
         brw_test_service.sv_handler    = brw_server_handle;
         brw_test_service.sv_bulk_ready = brw_bulk_ready;
+       brw_test_service.sv_wi_total   = brw_srv_workitems;
 }
 }
index 0d440ab..9dd1a49 100644 (file)
@@ -1915,7 +1915,7 @@ void lstcon_init_acceptor_service(void)
         lstcon_acceptor_service.sv_name    = "join session";
         lstcon_acceptor_service.sv_handler = lstcon_acceptor_handle;
         lstcon_acceptor_service.sv_id      = SRPC_SERVICE_JOIN;
         lstcon_acceptor_service.sv_name    = "join session";
         lstcon_acceptor_service.sv_handler = lstcon_acceptor_handle;
         lstcon_acceptor_service.sv_id      = SRPC_SERVICE_JOIN;
-        lstcon_acceptor_service.sv_concur  = SFW_SERVICE_CONCURRENCY;
+       lstcon_acceptor_service.sv_wi_total = SFW_FRWK_WI_MAX;
 }
 
 extern int lstcon_ioctl_entry(unsigned int cmd, struct libcfs_ioctl_data *data);
 }
 
 extern int lstcon_ioctl_entry(unsigned int cmd, struct libcfs_ioctl_data *data);
@@ -1927,7 +1927,6 @@ int
 lstcon_console_init(void)
 {
         int     i;
 lstcon_console_init(void)
 {
         int     i;
-        int     n;
         int     rc;
 
         memset(&console_session, 0, sizeof(lstcon_session_t));
         int     rc;
 
         memset(&console_session, 0, sizeof(lstcon_session_t));
@@ -1966,8 +1965,9 @@ lstcon_console_init(void)
                 return rc;
         }
 
                 return rc;
         }
 
-        n = srpc_service_add_buffers(&lstcon_acceptor_service, SFW_POST_BUFFERS);
-        if (n != SFW_POST_BUFFERS) {
+       rc = srpc_service_add_buffers(&lstcon_acceptor_service,
+                                     lstcon_acceptor_service.sv_wi_total);
+       if (rc != 0) {
                 rc = -ENOMEM;
                 goto out;
         }
                 rc = -ENOMEM;
                 goto out;
         }
index c034665..23bae6e 100644 (file)
 
 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
 
 
 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
 
-int brw_inject_errors = 0;
-CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644,
-                "# data errors to inject randomly, zero by default");
-
 static int session_timeout = 100;
 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
                 "test session timeout in seconds (100 by default, 0 == never)");
 static int session_timeout = 100;
 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
                 "test session timeout in seconds (100 by default, 0 == never)");
@@ -55,11 +51,6 @@ static int rpc_timeout = 64;
 CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
                 "rpc timeout in seconds (64 by default, 0 == never)");
 
 CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
                 "rpc timeout in seconds (64 by default, 0 == never)");
 
-#define SFW_TEST_CONCURRENCY     1792
-#define SFW_EXTRA_TEST_BUFFERS   8 /* tolerate buggy peers with extra buffers */
-
-#define sfw_test_buffers(tsi)    ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS)
-
 #define sfw_unpack_id(id)               \
 do {                                    \
         __swab64s(&(id).nid);           \
 #define sfw_unpack_id(id)               \
 do {                                    \
         __swab64s(&(id).nid);           \
@@ -309,10 +300,10 @@ sfw_init_session (sfw_session_t *sn, lst_sid_t sid, const char *name)
 
 /* completion handler for incoming framework RPCs */
 void
 
 /* completion handler for incoming framework RPCs */
 void
-sfw_server_rpc_done (srpc_server_rpc_t *rpc)
+sfw_server_rpc_done(struct srpc_server_rpc *rpc)
 {
 {
-        srpc_service_t *sv = rpc->srpc_service;
-        int             status = rpc->srpc_status;
+       struct srpc_service     *sv     = rpc->srpc_scd->scd_svc;
+       int                     status  = rpc->srpc_status;
 
         CDEBUG (D_NET,
                 "Incoming framework RPC done: "
 
         CDEBUG (D_NET,
                 "Incoming framework RPC done: "
@@ -553,45 +544,66 @@ sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
         cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
 }
 
         cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
 }
 
-int
-sfw_load_test (sfw_test_instance_t *tsi)
+static inline int
+sfw_test_buffers(sfw_test_instance_t *tsi)
 {
 {
-        sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
-        int              nrequired = sfw_test_buffers(tsi);
-        int              nposted;
-
-        LASSERT (tsc != NULL);
-
-        if (tsi->tsi_is_client) {
-                tsi->tsi_ops = tsc->tsc_cli_ops;
-                return 0;
-        }
+       struct sfw_test_case    *tsc = sfw_find_test_case(tsi->tsi_service);
+       struct srpc_service     *svc = tsc->tsc_srv_service;
+       int                     nbuf;
 
 
-        nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired);
-        if (nposted != nrequired) {
-                CWARN ("Failed to reserve enough buffers: "
-                       "service %s, %d needed, %d reserved\n",
-                       tsc->tsc_srv_service->sv_name, nrequired, nposted);
-                srpc_service_remove_buffers(tsc->tsc_srv_service, nposted);
-                return -ENOMEM;
-        }
+       nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
+       return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
+}
 
 
-        CDEBUG (D_NET, "Reserved %d buffers for test %s\n",
-                nposted, tsc->tsc_srv_service->sv_name);
-        return 0;
+int
+sfw_load_test(struct sfw_test_instance *tsi)
+{
+       struct sfw_test_case    *tsc = sfw_find_test_case(tsi->tsi_service);
+       struct srpc_service     *svc = tsc->tsc_srv_service;
+       int                     nbuf = sfw_test_buffers(tsi);
+       int                     rc;
+
+       LASSERT(tsc != NULL);
+
+       if (tsi->tsi_is_client) {
+               tsi->tsi_ops = tsc->tsc_cli_ops;
+               return 0;
+       }
+
+       rc = srpc_service_add_buffers(svc, nbuf);
+       if (rc != 0) {
+               CWARN("Failed to reserve enough buffers: "
+                     "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
+               /* NB: this error handler is not strictly correct, because
+                * it may release more buffers than already allocated,
+                * but it doesn't matter because request portal should
+                * be lazy portal and will grow buffers if necessary. */
+               srpc_service_remove_buffers(svc, nbuf);
+               return -ENOMEM;
+       }
+
+       CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
+              nbuf * (srpc_serv_is_framework(svc) ?
+                      1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
+       return 0;
 }
 
 void
 }
 
 void
-sfw_unload_test (sfw_test_instance_t *tsi)
+sfw_unload_test(struct sfw_test_instance *tsi)
 {
 {
-        sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
+       struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
 
 
-        LASSERT (tsc != NULL);
+       LASSERT(tsc != NULL);
 
 
-        if (!tsi->tsi_is_client)
-                srpc_service_remove_buffers(tsc->tsc_srv_service,
-                                            sfw_test_buffers(tsi));
-        return;
+       if (tsi->tsi_is_client)
+               return;
+
+       /* shrink buffers, because request portal is lazy portal
+        * which can grow buffers at runtime so we may leave
+        * some buffers behind, but never mind... */
+       srpc_service_remove_buffers(tsc->tsc_srv_service,
+                                   sfw_test_buffers(tsi));
+       return;
 }
 
 void
 }
 
 void
@@ -900,18 +912,20 @@ sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer,
                                      srpc_client_rpc_t, crpc_list);
                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
                 cfs_list_del_init(&rpc->crpc_list);
                                      srpc_client_rpc_t, crpc_list);
                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
                 cfs_list_del_init(&rpc->crpc_list);
-
-                srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
-                                     blklen, sfw_test_rpc_done,
-                                     sfw_test_rpc_fini, tsu);
         }
 
         cfs_spin_unlock(&tsi->tsi_lock);
         }
 
         cfs_spin_unlock(&tsi->tsi_lock);
-        
-        if (rpc == NULL)
-                rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
-                                             blklen, sfw_test_rpc_done, 
-                                             sfw_test_rpc_fini, tsu);
+
+       if (rpc == NULL) {
+               rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
+                                            blklen, sfw_test_rpc_done,
+                                            sfw_test_rpc_fini, tsu);
+       } else {
+               srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
+                                    blklen, sfw_test_rpc_done,
+                                    sfw_test_rpc_fini, tsu);
+       }
+
         if (rpc == NULL) {
                 CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
                 return -ENOMEM;
         if (rpc == NULL) {
                 CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
                 return -ENOMEM;
@@ -966,9 +980,9 @@ test_done:
          * - my batch is still active; no one can run it again now.
          * Cancel pending schedules and prevent future schedule attempts:
          */
          * - my batch is still active; no one can run it again now.
          * Cancel pending schedules and prevent future schedule attempts:
          */
-        swi_kill_workitem(wi);
-        sfw_test_unit_done(tsu);
-        return 1;
+       swi_exit_workitem(wi);
+       sfw_test_unit_done(tsu);
+       return 1;
 }
 
 int
 }
 
 int
@@ -1000,7 +1014,8 @@ sfw_run_batch (sfw_batch_t *tsb)
                         tsu->tsu_loop = tsi->tsi_loop;
                         wi = &tsu->tsu_worker;
                        swi_init_workitem(wi, tsu, sfw_run_test,
                         tsu->tsu_loop = tsi->tsi_loop;
                         wi = &tsu->tsu_worker;
                        swi_init_workitem(wi, tsu, sfw_run_test,
-                                         lst_sched_test);
+                                         lst_sched_test[\
+                                         lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
                         swi_schedule_workitem(wi);
                 }
         }
                         swi_schedule_workitem(wi);
                 }
         }
@@ -1085,15 +1100,16 @@ sfw_free_pages (srpc_server_rpc_t *rpc)
 }
 
 int
 }
 
 int
-sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink)
+sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int sink)
 {
 {
-        LASSERT (rpc->srpc_bulk == NULL);
-        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
+       LASSERT(rpc->srpc_bulk == NULL);
+       LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
 
 
-        rpc->srpc_bulk = srpc_alloc_bulk(npages, sink);
-        if (rpc->srpc_bulk == NULL) return -ENOMEM;
+       rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, sink);
+       if (rpc->srpc_bulk == NULL)
+               return -ENOMEM;
 
 
-        return 0;
+       return 0;
 }
 
 int
 }
 
 int
@@ -1129,7 +1145,7 @@ sfw_add_test (srpc_server_rpc_t *rpc)
         bat = sfw_bid2batch(request->tsr_bid);
         if (bat == NULL) {
                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
         bat = sfw_bid2batch(request->tsr_bid);
         if (bat == NULL) {
                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
-                        rpc->srpc_service->sv_name,
+                       rpc->srpc_scd->scd_svc->sv_name,
                         libcfs_id2str(rpc->srpc_peer));
                 return -ENOMEM;
         }
                         libcfs_id2str(rpc->srpc_peer));
                 return -ENOMEM;
         }
@@ -1140,9 +1156,9 @@ sfw_add_test (srpc_server_rpc_t *rpc)
         }
 
         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
         }
 
         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
-                /* rpc will be resumed later in sfw_bulk_ready */
-                return sfw_alloc_pages(rpc,
-                                       sfw_id_pages(request->tsr_ndest), 1);
+               /* rpc will be resumed later in sfw_bulk_ready */
+               return sfw_alloc_pages(rpc, CFS_CPT_ANY,
+                                      sfw_id_pages(request->tsr_ndest), 1);
         }
 
         rc = sfw_add_test_instance(bat, rpc);
         }
 
         rc = sfw_add_test_instance(bat, rpc);
@@ -1198,9 +1214,9 @@ sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
 }
 
 int
 }
 
 int
-sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
+sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
 {
 {
-        srpc_service_t *sv = rpc->srpc_service;
+       struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
         srpc_msg_t     *reply = &rpc->srpc_replymsg;
         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
         int             rc = 0;
         srpc_msg_t     *reply = &rpc->srpc_replymsg;
         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
         int             rc = 0;
@@ -1279,10 +1295,10 @@ sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
 }
 
 int
 }
 
 int
-sfw_bulk_ready (srpc_server_rpc_t *rpc, int status)
+sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
 {
 {
-        srpc_service_t *sv = rpc->srpc_service;
-        int             rc;
+       struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
+       int                     rc;
 
         LASSERT (rpc->srpc_bulk != NULL);
         LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
 
         LASSERT (rpc->srpc_bulk != NULL);
         LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
@@ -1601,9 +1617,6 @@ sfw_startup (void)
         s = getenv("SESSION_TIMEOUT");
         session_timeout = s != NULL ? atoi(s) : session_timeout;
 
         s = getenv("SESSION_TIMEOUT");
         session_timeout = s != NULL ? atoi(s) : session_timeout;
 
-        s = getenv("BRW_INJECT_ERRORS");
-        brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors;
-
         s = getenv("RPC_TIMEOUT");
         rpc_timeout = s != NULL ? atoi(s) : rpc_timeout;
 #endif
         s = getenv("RPC_TIMEOUT");
         rpc_timeout = s != NULL ? atoi(s) : rpc_timeout;
 #endif
@@ -1652,7 +1665,6 @@ sfw_startup (void)
         cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
                                        sfw_test_case_t, tsc_list) {
                 sv = tsc->tsc_srv_service;
         cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
                                        sfw_test_case_t, tsc_list) {
                 sv = tsc->tsc_srv_service;
-                sv->sv_concur = SFW_TEST_CONCURRENCY;
 
                 rc = srpc_add_service(sv);
                 LASSERT (rc != -EBUSY);
 
                 rc = srpc_add_service(sv);
                 LASSERT (rc != -EBUSY);
@@ -1669,7 +1681,7 @@ sfw_startup (void)
 
                 sv->sv_bulk_ready = NULL;
                 sv->sv_handler    = sfw_handle_server_rpc;
 
                 sv->sv_bulk_ready = NULL;
                 sv->sv_handler    = sfw_handle_server_rpc;
-                sv->sv_concur     = SFW_SERVICE_CONCURRENCY;
+               sv->sv_wi_total   = SFW_FRWK_WI_MAX;
                 if (sv->sv_id == SRPC_SERVICE_TEST)
                         sv->sv_bulk_ready = sfw_bulk_ready;
 
                 if (sv->sv_id == SRPC_SERVICE_TEST)
                         sv->sv_bulk_ready = sfw_bulk_ready;
 
@@ -1684,13 +1696,13 @@ sfw_startup (void)
                 /* about to sfw_shutdown, no need to add buffer */
                 if (error) continue;
 
                 /* about to sfw_shutdown, no need to add buffer */
                 if (error) continue;
 
-                rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS);
-                if (rc != SFW_POST_BUFFERS) {
-                        CWARN ("Failed to reserve enough buffers: "
-                               "service %s, %d needed, %d reserved\n",
-                               sv->sv_name, SFW_POST_BUFFERS, rc);
-                        error = -ENOMEM;
-                }
+               rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
+               if (rc != 0) {
+                       CWARN("Failed to reserve enough buffers: "
+                             "service %s, %d needed: %d\n",
+                             sv->sv_name, sv->sv_wi_total, rc);
+                       error = -ENOMEM;
+               }
         }
 
         if (error != 0)
         }
 
         if (error != 0)
index 1377b10..74ea2a8 100644 (file)
 
 #include "selftest.h"
 
 
 #include "selftest.h"
 
-
 enum {
        LST_INIT_NONE           = 0,
 enum {
        LST_INIT_NONE           = 0,
-       LST_INIT_WI,
+       LST_INIT_WI_SERIAL,
+       LST_INIT_WI_TEST,
        LST_INIT_RPC,
        LST_INIT_FW,
        LST_INIT_CONSOLE
        LST_INIT_RPC,
        LST_INIT_FW,
        LST_INIT_CONSOLE
@@ -51,11 +51,13 @@ extern int lstcon_console_fini(void);
 static int lst_init_step = LST_INIT_NONE;
 
 struct cfs_wi_sched *lst_sched_serial;
 static int lst_init_step = LST_INIT_NONE;
 
 struct cfs_wi_sched *lst_sched_serial;
-struct cfs_wi_sched *lst_sched_test;
+struct cfs_wi_sched **lst_sched_test;
 
 void
 
 void
-lnet_selftest_fini (void)
+lnet_selftest_fini(void)
 {
 {
+       int     i;
+
         switch (lst_init_step) {
 #ifdef __KERNEL__
                 case LST_INIT_CONSOLE:
         switch (lst_init_step) {
 #ifdef __KERNEL__
                 case LST_INIT_CONSOLE:
@@ -65,11 +67,21 @@ lnet_selftest_fini (void)
                         sfw_shutdown();
                 case LST_INIT_RPC:
                         srpc_shutdown();
                         sfw_shutdown();
                 case LST_INIT_RPC:
                         srpc_shutdown();
-               case LST_INIT_WI:
+               case LST_INIT_WI_TEST:
+                       for (i = 0;
+                            i < cfs_cpt_number(lnet_cpt_table()); i++) {
+                               if (lst_sched_test[i] == NULL)
+                                       continue;
+                               cfs_wi_sched_destroy(lst_sched_test[i]);
+                       }
+                       LIBCFS_FREE(lst_sched_test,
+                                   sizeof(lst_sched_test[0]) *
+                                   cfs_cpt_number(lnet_cpt_table()));
+                       lst_sched_test = NULL;
+
+               case LST_INIT_WI_SERIAL:
                        cfs_wi_sched_destroy(lst_sched_serial);
                        cfs_wi_sched_destroy(lst_sched_serial);
-                       cfs_wi_sched_destroy(lst_sched_test);
                        lst_sched_serial = NULL;
                        lst_sched_serial = NULL;
-                       lst_sched_test = NULL;
                 case LST_INIT_NONE:
                         break;
                 default:
                 case LST_INIT_NONE:
                         break;
                 default:
@@ -78,7 +90,6 @@ lnet_selftest_fini (void)
         return;
 }
 
         return;
 }
 
-
 void
 lnet_selftest_structure_assertion(void)
 {
 void
 lnet_selftest_structure_assertion(void)
 {
@@ -91,25 +102,39 @@ lnet_selftest_structure_assertion(void)
 }
 
 int
 }
 
 int
-lnet_selftest_init (void)
+lnet_selftest_init(void)
 {
 {
-       int     nthrs;
-        int    rc;
+       int     nscheds;
+       int     rc;
+       int     i;
 
        rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
                                 1, &lst_sched_serial);
 
        rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
                                 1, &lst_sched_serial);
-       if (rc != 0)
-               return rc;
-
-       nthrs = cfs_cpt_weight(lnet_cpt_table(), CFS_CPT_ANY);
-       rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), CFS_CPT_ANY,
-                                nthrs, &lst_sched_test);
        if (rc != 0) {
        if (rc != 0) {
-               cfs_wi_sched_destroy(lst_sched_serial);
-               lst_sched_serial = NULL;
+               CERROR("Failed to create serial WI scheduler for LST\n");
                return rc;
        }
                return rc;
        }
-       lst_init_step = LST_INIT_WI;
+       lst_init_step = LST_INIT_WI_SERIAL;
+
+       nscheds = cfs_cpt_number(lnet_cpt_table());
+       LIBCFS_ALLOC(lst_sched_test, sizeof(lst_sched_test[0]) * nscheds);
+       if (lst_sched_test == NULL)
+               goto error;
+
+       lst_init_step = LST_INIT_WI_TEST;
+       for (i = 0; i < nscheds; i++) {
+               int nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
+
+               /* reserve at least one CPU for LND */
+               nthrs = max(nthrs - 1, 1);
+               rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i,
+                                        nthrs, &lst_sched_test[i]);
+               if (rc != 0) {
+                       CERROR("Failed to create CPT affinity WI scheduler "
+                              "%d for LST\n", i);
+                       goto error;
+               }
+       }
 
         rc = srpc_startup();
         if (rc != 0) {
 
         rc = srpc_startup();
         if (rc != 0) {
@@ -131,13 +156,12 @@ lnet_selftest_init (void)
                 CERROR("LST can't startup console\n");
                 goto error;
         }
                 CERROR("LST can't startup console\n");
                 goto error;
         }
-        lst_init_step = LST_INIT_CONSOLE;  
+       lst_init_step = LST_INIT_CONSOLE;
 #endif
 #endif
-
-        return 0;
+       return 0;
 error:
 error:
-        lnet_selftest_fini();
-        return rc;
+       lnet_selftest_fini();
+       return rc;
 }
 
 #ifdef __KERNEL__
 }
 
 #ifdef __KERNEL__
index 069d14d..6fa4c55 100644 (file)
@@ -42,6 +42,9 @@
 
 #define LST_PING_TEST_MAGIC     0xbabeface
 
 
 #define LST_PING_TEST_MAGIC     0xbabeface
 
+int ping_srv_workitems = SFW_TEST_WI_MAX;
+CFS_MODULE_PARM(ping_srv_workitems, "i", int, 0644, "# PING server workitems");
+
 typedef struct {
         cfs_spinlock_t  pnd_lock;       /* serialize */
         int             pnd_counter;    /* sequence counter */
 typedef struct {
         cfs_spinlock_t  pnd_lock;       /* serialize */
         int             pnd_counter;    /* sequence counter */
@@ -155,9 +158,9 @@ ping_client_done_rpc (sfw_test_unit_t *tsu, srpc_client_rpc_t *rpc)
 }
 
 static int
 }
 
 static int
-ping_server_handle (srpc_server_rpc_t *rpc)
+ping_server_handle(struct srpc_server_rpc *rpc)
 {
 {
-        srpc_service_t    *sv  = rpc->srpc_service;
+       struct srpc_service     *sv  = rpc->srpc_scd->scd_svc;
         srpc_msg_t        *reqstmsg = &rpc->srpc_reqstbuf->buf_msg;
         srpc_ping_reqst_t *req = &reqstmsg->msg_body.ping_reqst;
         srpc_ping_reply_t *rep = &rpc->srpc_replymsg.msg_body.ping_reply;
         srpc_msg_t        *reqstmsg = &rpc->srpc_reqstbuf->buf_msg;
         srpc_ping_reqst_t *req = &reqstmsg->msg_body.ping_reqst;
         srpc_ping_reply_t *rep = &rpc->srpc_replymsg.msg_body.ping_reply;
@@ -201,7 +204,8 @@ void ping_init_test_client(void)
 srpc_service_t ping_test_service;
 void ping_init_test_service(void)
 {
 srpc_service_t ping_test_service;
 void ping_init_test_service(void)
 {
-        ping_test_service.sv_id       = SRPC_SERVICE_PING;
-        ping_test_service.sv_name     = "ping_test";
-        ping_test_service.sv_handler  = ping_server_handle;
+       ping_test_service.sv_id       = SRPC_SERVICE_PING;
+       ping_test_service.sv_name     = "ping_test";
+       ping_test_service.sv_handler  = ping_server_handle;
+       ping_test_service.sv_wi_total = ping_srv_workitems;
 }
 }
index 5329b94..2193a3a 100644 (file)
  * lnet/selftest/rpc.c
  *
  * Author: Isaac Huang <isaac@clusterfs.com>
  * lnet/selftest/rpc.c
  *
  * Author: Isaac Huang <isaac@clusterfs.com>
+ *
+ * 2012-05-13: Liang Zhen <liang@whamcloud.com>
+ * - percpt data for service to improve smp performance
+ * - code cleanup
  */
 
 #define DEBUG_SUBSYSTEM S_LNET
 
 #include "selftest.h"
 
  */
 
 #define DEBUG_SUBSYSTEM S_LNET
 
 #include "selftest.h"
 
-
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
@@ -58,6 +61,13 @@ struct smoketest_rpc {
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
+static inline int
+srpc_serv_portal(int svc_id)
+{
+       return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
+              SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
+}
+
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
@@ -124,7 +134,7 @@ srpc_free_bulk (srpc_bulk_t *bk)
 }
 
 srpc_bulk_t *
 }
 
 srpc_bulk_t *
-srpc_alloc_bulk (int npages, int sink)
+srpc_alloc_bulk(int cpt, int npages, int sink)
 {
         srpc_bulk_t  *bk;
         cfs_page_t  **pages;
 {
         srpc_bulk_t  *bk;
         cfs_page_t  **pages;
@@ -132,7 +142,8 @@ srpc_alloc_bulk (int npages, int sink)
 
         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
 
 
         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
 
-        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
+       LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
+                        offsetof(srpc_bulk_t, bk_iovs[npages]));
         if (bk == NULL) {
                 CERROR ("Can't allocate descriptor for %d pages\n", npages);
                 return NULL;
         if (bk == NULL) {
                 CERROR ("Can't allocate descriptor for %d pages\n", npages);
                 return NULL;
@@ -143,7 +154,8 @@ srpc_alloc_bulk (int npages, int sink)
         bk->bk_niov = npages;
         bk->bk_len  = npages * CFS_PAGE_SIZE;
 #ifndef __KERNEL__
         bk->bk_niov = npages;
         bk->bk_len  = npages * CFS_PAGE_SIZE;
 #ifndef __KERNEL__
-        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
+       LIBCFS_CPT_ALLOC(pages, lnet_cpt_table(), cpt,
+                        sizeof(cfs_page_t *) * npages);
         if (pages == NULL) {
                 LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                 CERROR ("Can't allocate page array for %d pages\n", npages);
         if (pages == NULL) {
                 LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                 CERROR ("Can't allocate page array for %d pages\n", npages);
@@ -157,8 +169,9 @@ srpc_alloc_bulk (int npages, int sink)
 #endif
 
         for (i = 0; i < npages; i++) {
 #endif
 
         for (i = 0; i < npages; i++) {
-                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);
+               cfs_page_t *pg;
 
 
+               pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, CFS_ALLOC_STD);
                 if (pg == NULL) {
                         CERROR ("Can't allocate page %d of %d\n", i, npages);
                         srpc_free_bulk(bk);
                 if (pg == NULL) {
                         CERROR ("Can't allocate page %d of %d\n", i, npages);
                         srpc_free_bulk(bk);
@@ -183,80 +196,167 @@ srpc_next_id (void)
 }
 
 void
 }
 
 void
-srpc_init_server_rpc (srpc_server_rpc_t *rpc,
-                      srpc_service_t *sv, srpc_buffer_t *buffer)
+srpc_init_server_rpc(struct srpc_server_rpc *rpc,
+                    struct srpc_service_cd *scd,
+                    struct srpc_buffer *buffer)
 {
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
 {
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
-                         sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ?
-                         lst_sched_serial : lst_sched_test);
+                         srpc_serv_is_framework(scd->scd_svc) ?
+                         lst_sched_serial : lst_sched_test[scd->scd_cpt]);
 
 
-        rpc->srpc_ev.ev_fired = 1; /* no event expected now */
+       rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 
 
-        rpc->srpc_service  = sv;
-        rpc->srpc_reqstbuf = buffer;
-        rpc->srpc_peer     = buffer->buf_peer;
-        rpc->srpc_self     = buffer->buf_self;
-        LNetInvalidateHandle(&rpc->srpc_replymdh);
+       rpc->srpc_scd      = scd;
+       rpc->srpc_reqstbuf = buffer;
+       rpc->srpc_peer     = buffer->buf_peer;
+       rpc->srpc_self     = buffer->buf_self;
+       LNetInvalidateHandle(&rpc->srpc_replymdh);
 }
 
 }
 
-int
-srpc_add_service (srpc_service_t *sv)
+static void
+srpc_service_fini(struct srpc_service *svc)
 {
 {
-        int                id = sv->sv_id;
-        int                i;
-        srpc_server_rpc_t *rpc;
+       struct srpc_service_cd  *scd;
+       struct srpc_server_rpc  *rpc;
+       struct srpc_buffer      *buf;
+       cfs_list_t              *q;
+       int                     i;
+
+       if (svc->sv_cpt_data == NULL)
+               return;
+
+       cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
+               while (1) {
+                       if (!cfs_list_empty(&scd->scd_buf_posted))
+                               q = &scd->scd_buf_posted;
+                       else if (!cfs_list_empty(&scd->scd_buf_blocked))
+                               q = &scd->scd_buf_blocked;
+                       else
+                               break;
+
+                       while (!cfs_list_empty(q)) {
+                               buf = cfs_list_entry(q->next,
+                                                    struct srpc_buffer,
+                                                    buf_list);
+                               cfs_list_del(&buf->buf_list);
+                               LIBCFS_FREE(buf, sizeof(*buf));
+                       }
+               }
+
+               LASSERT(cfs_list_empty(&scd->scd_rpc_active));
+
+               while (!cfs_list_empty(&scd->scd_rpc_free)) {
+                       rpc = cfs_list_entry(scd->scd_rpc_free.next,
+                                            struct srpc_server_rpc,
+                                            srpc_list);
+                       cfs_list_del(&rpc->srpc_list);
+                       LIBCFS_FREE(rpc, sizeof(*rpc));
+               }
+       }
+
+       cfs_percpt_free(svc->sv_cpt_data);
+       svc->sv_cpt_data = NULL;
+}
 
 
-        LASSERT (sv->sv_concur > 0);
-        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);
+static int
+srpc_service_nrpcs(struct srpc_service *svc)
+{
+       int nrpcs = svc->sv_wi_total / svc->sv_ncpts;
 
 
-        cfs_spin_lock(&srpc_data.rpc_glock);
+       return srpc_serv_is_framework(svc) ?
+              max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
+}
 
 
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
+int srpc_add_buffer(struct swi_workitem *wi);
 
 
-        if (srpc_data.rpc_services[id] != NULL) {
-                cfs_spin_unlock(&srpc_data.rpc_glock);
-                return -EBUSY;
-        }
+static int
+srpc_service_init(struct srpc_service *svc)
+{
+       struct srpc_service_cd  *scd;
+       struct srpc_server_rpc  *rpc;
+       int                     nrpcs;
+       int                     i;
+       int                     j;
+
+       svc->sv_shuttingdown = 0;
+
+       svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
+                                           sizeof(struct srpc_service_cd));
+       if (svc->sv_cpt_data == NULL)
+               return -ENOMEM;
+
+       svc->sv_ncpts = srpc_serv_is_framework(svc) ?
+                       1 : cfs_cpt_number(lnet_cpt_table());
+       nrpcs = srpc_service_nrpcs(svc);
+
+       cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
+               scd->scd_cpt = i;
+               scd->scd_svc = svc;
+               cfs_spin_lock_init(&scd->scd_lock);
+               CFS_INIT_LIST_HEAD(&scd->scd_rpc_free);
+               CFS_INIT_LIST_HEAD(&scd->scd_rpc_active);
+               CFS_INIT_LIST_HEAD(&scd->scd_buf_posted);
+               CFS_INIT_LIST_HEAD(&scd->scd_buf_blocked);
+
+               scd->scd_ev.ev_data = scd;
+               scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
+
+               /* NB: don't use lst_sched_serial for adding buffer,
+                * see details in srpc_service_add_buffers() */
+               swi_init_workitem(&scd->scd_buf_wi, scd,
+                                 srpc_add_buffer, lst_sched_test[i]);
+
+               if (i != 0 && srpc_serv_is_framework(svc)) {
+                       /* NB: framework service only needs srpc_service_cd for
+                        * one partition, but we allocate for all to make
+                        * it easier to implement, it will waste a little
+                        * memory but nobody should care about this */
+                       continue;
+               }
+
+               for (j = 0; j < nrpcs; j++) {
+                       LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
+                                        i, sizeof(*rpc));
+                       if (rpc == NULL) {
+                               srpc_service_fini(svc);
+                               return -ENOMEM;
+                       }
+                       cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+               }
+       }
+
+       return 0;
+}
 
 
-        srpc_data.rpc_services[id] = sv;
-        cfs_spin_unlock(&srpc_data.rpc_glock);
+int
+srpc_add_service(struct srpc_service *sv)
+{
+       int id = sv->sv_id;
 
 
-        sv->sv_nprune       = 0;
-        sv->sv_nposted_msg  = 0;
-        sv->sv_shuttingdown = 0;
-        cfs_spin_lock_init(&sv->sv_lock);
-        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
-        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
-        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
-        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);
+       LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);
 
 
-        sv->sv_ev.ev_data = sv;
-        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;
+       if (srpc_service_init(sv) != 0)
+               return -ENOMEM;
 
 
-        for (i = 0; i < sv->sv_concur; i++) {
-                LIBCFS_ALLOC(rpc, sizeof(*rpc));
-                if (rpc == NULL) goto enomem;
+       cfs_spin_lock(&srpc_data.rpc_glock);
 
 
-                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
-        }
+       LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);
 
 
-        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
-                id, sv->sv_name, sv->sv_concur);
-        return 0;
+       if (srpc_data.rpc_services[id] != NULL) {
+               cfs_spin_unlock(&srpc_data.rpc_glock);
+               goto failed;
+       }
 
 
-enomem:
-        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
-                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
-                                     srpc_server_rpc_t, srpc_list);
-                cfs_list_del(&rpc->srpc_list);
-                LIBCFS_FREE(rpc, sizeof(*rpc));
-        }
+       srpc_data.rpc_services[id] = sv;
+       cfs_spin_unlock(&srpc_data.rpc_glock);
 
 
-        cfs_spin_lock(&srpc_data.rpc_glock);
-        srpc_data.rpc_services[id] = NULL;
-        cfs_spin_unlock(&srpc_data.rpc_glock);
-        return -ENOMEM;
+       CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
+       return 0;
+
+ failed:
+       srpc_service_fini(sv);
+       return -EBUSY;
 }
 
 int
 }
 
 int
@@ -277,16 +377,16 @@ srpc_remove_service (srpc_service_t *sv)
 }
 
 int
 }
 
 int
-srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
-                       int len, int options, lnet_process_id_t peer,
-                       lnet_handle_md_t *mdh, srpc_event_t *ev)
+srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
+                      int len, int options, lnet_process_id_t peer,
+                      lnet_handle_md_t *mdh, srpc_event_t *ev)
 {
 {
-        int              rc;
-        lnet_md_t        md;
-        lnet_handle_me_t meh;
+       int              rc;
+       lnet_md_t        md;
+       lnet_handle_me_t meh;
 
 
-        rc = LNetMEAttach(portal, peer, matchbits, 0,
-                          LNET_UNLINK, LNET_INS_AFTER, &meh);
+       rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
+                         local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
         if (rc != 0) {
                 CERROR ("LNetMEAttach failed: %d\n", rc);
                 LASSERT (rc == -ENOMEM);
         if (rc != 0) {
                 CERROR ("LNetMEAttach failed: %d\n", rc);
                 LASSERT (rc == -ENOMEM);
@@ -370,271 +470,344 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
 
 int
 srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
 
 int
 srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
-                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
+                       int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
 {
 {
-        int rc;
-        int portal;
-
-        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                portal = SRPC_REQUEST_PORTAL;
-        else
-                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
-
-        rc = srpc_post_active_rdma(portal, service, buf, len,
-                                   LNET_MD_OP_PUT, peer,
-                                   LNET_NID_ANY, mdh, ev);
-        return rc;
+       return srpc_post_active_rdma(srpc_serv_portal(service), service,
+                                    buf, len, LNET_MD_OP_PUT, peer,
+                                    LNET_NID_ANY, mdh, ev);
 }
 
 int
 }
 
 int
-srpc_post_passive_rqtbuf(int service, void *buf, int len,
-                         lnet_handle_md_t *mdh, srpc_event_t *ev)
+srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
+                        lnet_handle_md_t *mdh, srpc_event_t *ev)
 {
 {
-        int               rc;
-        int               portal;
-        lnet_process_id_t any = {0};
+       lnet_process_id_t any = {0};
 
 
-        any.nid = LNET_NID_ANY;
-        any.pid = LNET_PID_ANY;
+       any.nid = LNET_NID_ANY;
+       any.pid = LNET_PID_ANY;
 
 
-        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                portal = SRPC_REQUEST_PORTAL;
-        else
-                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
-
-        rc = srpc_post_passive_rdma(portal, service, buf, len,
-                                    LNET_MD_OP_PUT, any, mdh, ev);
-        return rc;
+       return srpc_post_passive_rdma(srpc_serv_portal(service),
+                                     local, service, buf, len,
+                                     LNET_MD_OP_PUT, any, mdh, ev);
 }
 
 int
 }
 
 int
-srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
+srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
 {
 {
-        srpc_msg_t *msg = &buf->buf_msg;
-        int         rc;
-
-        LASSERT (!sv->sv_shuttingdown);
+       struct srpc_service     *sv = scd->scd_svc;
+       struct srpc_msg         *msg = &buf->buf_msg;
+       int                     rc;
 
 
-        LNetInvalidateHandle(&buf->buf_mdh);
-        cfs_list_add(&buf->buf_list, &sv->sv_posted_msgq);
-        sv->sv_nposted_msg++;
-        cfs_spin_unlock(&sv->sv_lock);
+       LNetInvalidateHandle(&buf->buf_mdh);
+       cfs_list_add(&buf->buf_list, &scd->scd_buf_posted);
+       scd->scd_buf_nposted++;
+       cfs_spin_unlock(&scd->scd_lock);
 
 
-        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
-                                      &buf->buf_mdh, &sv->sv_ev);
+       rc = srpc_post_passive_rqtbuf(sv->sv_id,
+                                     !srpc_serv_is_framework(sv),
+                                     msg, sizeof(*msg), &buf->buf_mdh,
+                                     &scd->scd_ev);
 
 
-        /* At this point, a RPC (new or delayed) may have arrived in
-         * msg and its event handler has been called. So we must add
-         * buf to sv_posted_msgq _before_ dropping sv_lock */
+       /* At this point, a RPC (new or delayed) may have arrived in
+        * msg and its event handler has been called. So we must add
+        * buf to scd_buf_posted _before_ dropping scd_lock */
 
 
-        cfs_spin_lock(&sv->sv_lock);
+       cfs_spin_lock(&scd->scd_lock);
 
 
-        if (rc == 0) {
-                if (sv->sv_shuttingdown) {
-                        cfs_spin_unlock(&sv->sv_lock);
+       if (rc == 0) {
+               if (!sv->sv_shuttingdown)
+                       return 0;
 
 
-                        /* srpc_shutdown_service might have tried to unlink me
-                         * when my buf_mdh was still invalid */
-                        LNetMDUnlink(buf->buf_mdh);
+               cfs_spin_unlock(&scd->scd_lock);
+               /* srpc_shutdown_service might have tried to unlink me
+                * when my buf_mdh was still invalid */
+               LNetMDUnlink(buf->buf_mdh);
+               cfs_spin_lock(&scd->scd_lock);
+               return 0;
+       }
 
 
-                        cfs_spin_lock(&sv->sv_lock);
-                }
-                return 0;
-        }
+       scd->scd_buf_nposted--;
+       if (sv->sv_shuttingdown)
+               return rc; /* don't allow to change scd_buf_posted */
 
 
-        sv->sv_nposted_msg--;
-        if (sv->sv_shuttingdown) return rc;
+       cfs_list_del(&buf->buf_list);
+       cfs_spin_unlock(&scd->scd_lock);
 
 
-        cfs_list_del(&buf->buf_list);
+       LIBCFS_FREE(buf, sizeof(*buf));
 
 
-        cfs_spin_unlock(&sv->sv_lock);
-        LIBCFS_FREE(buf, sizeof(*buf));
-        cfs_spin_lock(&sv->sv_lock);
-        return rc;
+       cfs_spin_lock(&scd->scd_lock);
+       return rc;
 }
 
 int
 }
 
 int
-srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
+srpc_add_buffer(struct swi_workitem *wi)
 {
 {
-        int                rc;
-        int                posted;
-        srpc_buffer_t     *buf;
-
-        LASSERTF (nbuffer > 0,
-                  "nbuffer must be positive: %d\n", nbuffer);
-
-        for (posted = 0; posted < nbuffer; posted++) {
-                LIBCFS_ALLOC(buf, sizeof(*buf));
-                if (buf == NULL) break;
-
-                cfs_spin_lock(&sv->sv_lock);
-                rc = srpc_service_post_buffer(sv, buf);
-                cfs_spin_unlock(&sv->sv_lock);
-
-                if (rc != 0) break;
-        }
+       struct srpc_service_cd  *scd = wi->swi_workitem.wi_data;
+       struct srpc_buffer      *buf;
+       int                     rc = 0;
+
+       /* it's called by workitem scheduler threads, these threads
+        * should have been set CPT affinity, so buffers will be posted
+        * on CPT local list of Portal */
+       cfs_spin_lock(&scd->scd_lock);
+
+       while (scd->scd_buf_adjust > 0 &&
+              !scd->scd_svc->sv_shuttingdown) {
+               scd->scd_buf_adjust--; /* consume it */
+               scd->scd_buf_posting++;
+
+               cfs_spin_unlock(&scd->scd_lock);
+
+               LIBCFS_ALLOC(buf, sizeof(*buf));
+               if (buf == NULL) {
+                       CERROR("Failed to add new buf to service: %s\n",
+                              scd->scd_svc->sv_name);
+                       cfs_spin_lock(&scd->scd_lock);
+                       rc = -ENOMEM;
+                       break;
+               }
+
+               cfs_spin_lock(&scd->scd_lock);
+               if (scd->scd_svc->sv_shuttingdown) {
+                       cfs_spin_unlock(&scd->scd_lock);
+                       LIBCFS_FREE(buf, sizeof(*buf));
+
+                       cfs_spin_lock(&scd->scd_lock);
+                       rc = -ESHUTDOWN;
+                       break;
+               }
+
+               rc = srpc_service_post_buffer(scd, buf);
+               if (rc != 0)
+                       break; /* buf has been freed inside */
+
+               LASSERT(scd->scd_buf_posting > 0);
+               scd->scd_buf_posting--;
+               scd->scd_buf_total++;
+               scd->scd_buf_low = MAX(2, scd->scd_buf_total / 4);
+       }
+
+       if (rc != 0) {
+               scd->scd_buf_err_stamp = cfs_time_current_sec();
+               scd->scd_buf_err = rc;
+
+               LASSERT(scd->scd_buf_posting > 0);
+               scd->scd_buf_posting--;
+       }
+
+       cfs_spin_unlock(&scd->scd_lock);
+       return 0;
+}
 
 
-        return posted;
+int
+srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
+{
+       struct srpc_service_cd  *scd;
+       int                     rc = 0;
+       int                     i;
+
+       LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);
+
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
+
+               scd->scd_buf_err = 0;
+               scd->scd_buf_err_stamp = 0;
+               scd->scd_buf_posting = 0;
+               scd->scd_buf_adjust = nbuffer;
+               /* start to post buffers */
+               swi_schedule_workitem(&scd->scd_buf_wi);
+               cfs_spin_unlock(&scd->scd_lock);
+
+               /* framework service only post buffer for one partition  */
+               if (srpc_serv_is_framework(sv))
+                       break;
+       }
+
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
+               /*
+                * NB: srpc_service_add_buffers() can be called inside
+                * thread context of lst_sched_serial, and we don't normally
+                * allow to sleep inside thread context of WI scheduler
+                * because it will block current scheduler thread from doing
+                * anything else, even worse, it could deadlock if it's
+                * waiting on result from another WI of the same scheduler.
+                * However, it's safe at here because scd_buf_wi is scheduled
+                * by thread in a different WI scheduler (lst_sched_test),
+                * so we don't have any risk of deadlock, though this could
+                * block all WIs pending on lst_sched_serial for a moment
+                * which is not good but not fatal.
+                */
+               lst_wait_until(scd->scd_buf_err != 0 ||
+                              (scd->scd_buf_adjust == 0 &&
+                               scd->scd_buf_posting == 0),
+                              scd->scd_lock, "waiting for adding buffer\n");
+
+               if (scd->scd_buf_err != 0 && rc == 0)
+                       rc = scd->scd_buf_err;
+
+               cfs_spin_unlock(&scd->scd_lock);
+       }
+
+       return rc;
 }
 
 void
 }
 
 void
-srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
+srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
 {
 {
-        LASSERTF (nbuffer > 0,
-                  "nbuffer must be positive: %d\n", nbuffer);
+       struct srpc_service_cd  *scd;
+       int                     num;
+       int                     i;
 
 
-        cfs_spin_lock(&sv->sv_lock);
+       LASSERT(!sv->sv_shuttingdown);
 
 
-        LASSERT (sv->sv_nprune >= 0);
-        LASSERT (!sv->sv_shuttingdown);
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
 
 
-        sv->sv_nprune += nbuffer;
+               num = scd->scd_buf_total + scd->scd_buf_posting;
+               scd->scd_buf_adjust -= min(nbuffer, num);
 
 
-        cfs_spin_unlock(&sv->sv_lock);
-        return;
+               cfs_spin_unlock(&scd->scd_lock);
+       }
 }
 
 /* returns 1 if sv has finished, otherwise 0 */
 int
 }
 
 /* returns 1 if sv has finished, otherwise 0 */
 int
-srpc_finish_service (srpc_service_t *sv)
+srpc_finish_service(struct srpc_service *sv)
 {
 {
-        srpc_server_rpc_t *rpc;
-        srpc_buffer_t     *buf;
-
-        cfs_spin_lock(&sv->sv_lock);
-
-        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */
-
-        if (sv->sv_nposted_msg != 0 || !cfs_list_empty(&sv->sv_active_rpcq)) {
-                CDEBUG (D_NET,
-                        "waiting for %d posted buffers to unlink and "
-                        "in-flight RPCs to die.\n",
-                        sv->sv_nposted_msg);
-
-                if (!cfs_list_empty(&sv->sv_active_rpcq)) {
-                        rpc = cfs_list_entry(sv->sv_active_rpcq.next,
-                                             srpc_server_rpc_t, srpc_list);
-                        CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
-                                "wi %s scheduled %d running %d, "
-                                "ev fired %d type %d status %d lnet %d\n",
-                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                                swi_state2str(rpc->srpc_wi.swi_state),
-                                rpc->srpc_wi.swi_workitem.wi_scheduled,
-                                rpc->srpc_wi.swi_workitem.wi_running,
-                                rpc->srpc_ev.ev_fired,
-                                rpc->srpc_ev.ev_type,
-                                rpc->srpc_ev.ev_status,
-                                rpc->srpc_ev.ev_lnet);
-                }
-
-                cfs_spin_unlock(&sv->sv_lock);
-                return 0;
-        }
-
-        cfs_spin_unlock(&sv->sv_lock); /* no lock needed from now on */
-
-        for (;;) {
-                cfs_list_t *q;
-
-                if (!cfs_list_empty(&sv->sv_posted_msgq))
-                        q = &sv->sv_posted_msgq;
-                else if (!cfs_list_empty(&sv->sv_blocked_msgq))
-                        q = &sv->sv_blocked_msgq;
-                else
-                        break;
-
-                buf = cfs_list_entry(q->next, srpc_buffer_t, buf_list);
-                cfs_list_del(&buf->buf_list);
-
-                LIBCFS_FREE(buf, sizeof(*buf));
-        }
-
-        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
-                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
-                                     srpc_server_rpc_t, srpc_list);
-                cfs_list_del(&rpc->srpc_list);
-                LIBCFS_FREE(rpc, sizeof(*rpc));
-        }
-
-        return 1;
+       struct srpc_service_cd  *scd;
+       struct srpc_server_rpc  *rpc;
+       int                     i;
+
+       LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
+
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
+               if (!swi_deschedule_workitem(&scd->scd_buf_wi))
+                       return 0;
+
+               if (scd->scd_buf_nposted > 0) {
+                       CDEBUG(D_NET, "waiting for %d posted buffers to unlink",
+                              scd->scd_buf_nposted);
+                       cfs_spin_unlock(&scd->scd_lock);
+                       return 0;
+               }
+
+               if (cfs_list_empty(&scd->scd_rpc_active)) {
+                       cfs_spin_unlock(&scd->scd_lock);
+                       continue;
+               }
+
+               rpc = cfs_list_entry(scd->scd_rpc_active.next,
+                                    struct srpc_server_rpc, srpc_list);
+               CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
+                       "wi %s scheduled %d running %d, "
+                       "ev fired %d type %d status %d lnet %d\n",
+                       rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                       swi_state2str(rpc->srpc_wi.swi_state),
+                       rpc->srpc_wi.swi_workitem.wi_scheduled,
+                       rpc->srpc_wi.swi_workitem.wi_running,
+                       rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
+                       rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
+               cfs_spin_unlock(&scd->scd_lock);
+               return 0;
+       }
+
+       /* no lock needed from now on */
+       srpc_service_fini(sv);
+       return 1;
 }
 
 /* called with sv->sv_lock held */
 void
 }
 
 /* called with sv->sv_lock held */
 void
-srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
-{
-        if (sv->sv_shuttingdown) goto free;
-
-        if (sv->sv_nprune == 0) {
-                if (srpc_service_post_buffer(sv, buf) != 0)
-                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
-                return;
-        }
-
-        sv->sv_nprune--;
-free:
-        cfs_spin_unlock(&sv->sv_lock);
-        LIBCFS_FREE(buf, sizeof(*buf));
-        cfs_spin_lock(&sv->sv_lock);
-}
-
-/* called with srpc_service_t::sv_lock held */
-inline void
-srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
+srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf)
 {
 {
-        swi_schedule_workitem(&rpc->srpc_wi);
+       if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
+               if (srpc_service_post_buffer(scd, buf) != 0) {
+                       CWARN("Failed to post %s buffer\n",
+                             scd->scd_svc->sv_name);
+               }
+               return;
+       }
+
+       /* service is shutting down, or we want to recycle some buffers */
+       scd->scd_buf_total--;
+
+       if (scd->scd_buf_adjust < 0) {
+               scd->scd_buf_adjust++;
+               if (scd->scd_buf_adjust < 0 &&
+                   scd->scd_buf_total == 0 && scd->scd_buf_posting == 0) {
+                       CDEBUG(D_INFO,
+                              "Try to recyle %d buffers but nothing left\n",
+                              scd->scd_buf_adjust);
+                       scd->scd_buf_adjust = 0;
+               }
+       }
+
+       cfs_spin_unlock(&scd->scd_lock);
+       LIBCFS_FREE(buf, sizeof(*buf));
+       cfs_spin_lock(&scd->scd_lock);
 }
 
 void
 }
 
 void
-srpc_abort_service (srpc_service_t *sv)
+srpc_abort_service(struct srpc_service *sv)
 {
 {
-        srpc_server_rpc_t *rpc;
-
-        cfs_spin_lock(&sv->sv_lock);
-
-        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
-               sv->sv_id, sv->sv_name);
-
-        /* schedule in-flight RPCs to notice the abort, NB:
-         * racing with incoming RPCs; complete fix should make test
-         * RPCs carry session ID in its headers */
-        cfs_list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
-                rpc->srpc_aborted = 1;
-                srpc_schedule_server_rpc(rpc);
-        }
-
-        cfs_spin_unlock(&sv->sv_lock);
-        return;
+       struct srpc_service_cd  *scd;
+       struct srpc_server_rpc  *rpc;
+       int                     i;
+
+       CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
+              sv->sv_id, sv->sv_name);
+
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
+
+               /* schedule in-flight RPCs to notice the abort, NB:
+                * racing with incoming RPCs; complete fix should make test
+                * RPCs carry session ID in its headers */
+               cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
+                       rpc->srpc_aborted = 1;
+                       swi_schedule_workitem(&rpc->srpc_wi);
+               }
+
+               cfs_spin_unlock(&scd->scd_lock);
+       }
 }
 
 void
 }
 
 void
-srpc_shutdown_service (srpc_service_t *sv)
+srpc_shutdown_service(srpc_service_t *sv)
 {
 {
-        srpc_server_rpc_t *rpc;
-        srpc_buffer_t     *buf;
+       struct srpc_service_cd  *scd;
+       struct srpc_server_rpc  *rpc;
+       srpc_buffer_t           *buf;
+       int                     i;
 
 
-        cfs_spin_lock(&sv->sv_lock);
+       CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
+              sv->sv_id, sv->sv_name);
 
 
-        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
-               sv->sv_id, sv->sv_name);
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
+               cfs_spin_lock(&scd->scd_lock);
 
 
-        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
+       sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 
 
-        /* schedule in-flight RPCs to notice the shutdown */
-        cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq,
-                                       srpc_server_rpc_t, srpc_list) {
-                srpc_schedule_server_rpc(rpc);
-        }
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
+               cfs_spin_unlock(&scd->scd_lock);
 
 
-        cfs_spin_unlock(&sv->sv_lock);
+       cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+               cfs_spin_lock(&scd->scd_lock);
 
 
-        /* OK to traverse sv_posted_msgq without lock, since no one
-         * touches sv_posted_msgq now */
-        cfs_list_for_each_entry_typed (buf, &sv->sv_posted_msgq,
-                                       srpc_buffer_t, buf_list)
-                LNetMDUnlink(buf->buf_mdh);
+               /* schedule in-flight RPCs to notice the shutdown */
+               cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
+                       swi_schedule_workitem(&rpc->srpc_wi);
 
 
-        return;
+               cfs_spin_unlock(&scd->scd_lock);
+
+               /* OK to traverse scd_buf_posted without lock, since no one
+                * touches scd_buf_posted now */
+               cfs_list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
+                       LNetMDUnlink(buf->buf_mdh);
+       }
 }
 
 int
 }
 
 int
@@ -670,7 +843,7 @@ srpc_prepare_reply (srpc_client_rpc_t *rpc)
 
         *id = srpc_next_id();
 
 
         *id = srpc_next_id();
 
-        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
+       rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                     &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                     LNET_MD_OP_PUT, rpc->crpc_dest,
                                     &rpc->crpc_replymdh, ev);
                                     &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                     LNET_MD_OP_PUT, rpc->crpc_dest,
                                     &rpc->crpc_replymdh, ev);
@@ -707,7 +880,7 @@ srpc_prepare_bulk (srpc_client_rpc_t *rpc)
 
         *id = srpc_next_id();
 
 
         *id = srpc_next_id();
 
-        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
+       rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                     &bk->bk_iovs[0], bk->bk_niov, opt,
                                     rpc->crpc_dest, &bk->bk_mdh, ev);
         if (rc != 0) {
                                     &bk->bk_iovs[0], bk->bk_niov, opt,
                                     rpc->crpc_dest, &bk->bk_mdh, ev);
         if (rc != 0) {
@@ -750,10 +923,11 @@ srpc_do_bulk (srpc_server_rpc_t *rpc)
 
 /* only called from srpc_handle_rpc */
 void
 
 /* only called from srpc_handle_rpc */
 void
-srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
+srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status)
 {
 {
-        srpc_service_t *sv = rpc->srpc_service;
-        srpc_buffer_t  *buffer;
+       struct srpc_service_cd  *scd = rpc->srpc_scd;
+       struct srpc_service     *sv  = scd->scd_svc;
+       srpc_buffer_t           *buffer;
 
         LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
 
 
         LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
 
@@ -774,57 +948,58 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
                 (*rpc->srpc_done) (rpc);
         LASSERT (rpc->srpc_bulk == NULL);
 
                 (*rpc->srpc_done) (rpc);
         LASSERT (rpc->srpc_bulk == NULL);
 
-        cfs_spin_lock(&sv->sv_lock);
-
-        if (rpc->srpc_reqstbuf != NULL) {
-                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
-                 * sv won't go away for sv_active_rpcq must not be empty */
-                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
-                rpc->srpc_reqstbuf = NULL;
-        }
-
-        cfs_list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
-
-        /*
-         * No one can schedule me now since:
-         * - I'm not on sv_active_rpcq.
-         * - all LNet events have been fired.
-         * Cancel pending schedules and prevent future schedule attempts:
-         */
-        LASSERT (rpc->srpc_ev.ev_fired);
-        swi_kill_workitem(&rpc->srpc_wi);
-
-        if (!sv->sv_shuttingdown && !cfs_list_empty(&sv->sv_blocked_msgq)) {
-                buffer = cfs_list_entry(sv->sv_blocked_msgq.next,
-                                    srpc_buffer_t, buf_list);
-                cfs_list_del(&buffer->buf_list);
-
-                srpc_init_server_rpc(rpc, sv, buffer);
-                cfs_list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
-                srpc_schedule_server_rpc(rpc);
-        } else {
-                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
-        }
-
-        cfs_spin_unlock(&sv->sv_lock);
-        return;
+       cfs_spin_lock(&scd->scd_lock);
+
+       if (rpc->srpc_reqstbuf != NULL) {
+               /* NB might drop scd_lock in srpc_service_recycle_buffer, but
+                * sv won't go away since scd_rpc_active must not be empty */
+               srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
+               rpc->srpc_reqstbuf = NULL;
+       }
+
+       cfs_list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
+
+       /*
+        * No one can schedule me now since:
+        * - I'm not on scd_rpc_active.
+        * - all LNet events have been fired.
+        * Cancel pending schedules and prevent future schedule attempts:
+        */
+       LASSERT(rpc->srpc_ev.ev_fired);
+       swi_exit_workitem(&rpc->srpc_wi);
+
+       if (!sv->sv_shuttingdown && !cfs_list_empty(&scd->scd_buf_blocked)) {
+               buffer = cfs_list_entry(scd->scd_buf_blocked.next,
+                                       srpc_buffer_t, buf_list);
+               cfs_list_del(&buffer->buf_list);
+
+               srpc_init_server_rpc(rpc, scd, buffer);
+               cfs_list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
+               swi_schedule_workitem(&rpc->srpc_wi);
+       } else {
+               cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+       }
+
+       cfs_spin_unlock(&scd->scd_lock);
+       return;
 }
 
 /* handles an incoming RPC */
 int
 }
 
 /* handles an incoming RPC */
 int
-srpc_handle_rpc (swi_workitem_t *wi)
+srpc_handle_rpc(swi_workitem_t *wi)
 {
 {
-        srpc_server_rpc_t *rpc = wi->swi_workitem.wi_data;
-        srpc_service_t    *sv = rpc->srpc_service;
-        srpc_event_t      *ev = &rpc->srpc_ev;
-        int                rc = 0;
+       struct srpc_server_rpc  *rpc = wi->swi_workitem.wi_data;
+       struct srpc_service_cd  *scd = rpc->srpc_scd;
+       struct srpc_service     *sv = scd->scd_svc;
+       srpc_event_t            *ev = &rpc->srpc_ev;
+       int                     rc = 0;
 
 
-        LASSERT (wi == &rpc->srpc_wi);
+       LASSERT(wi == &rpc->srpc_wi);
 
 
-        cfs_spin_lock(&sv->sv_lock);
+       cfs_spin_lock(&scd->scd_lock);
 
 
-        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
-                cfs_spin_unlock(&sv->sv_lock);
+       if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+               cfs_spin_unlock(&scd->scd_lock);
 
                 if (rpc->srpc_bulk != NULL)
                         LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
 
                 if (rpc->srpc_bulk != NULL)
                         LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
@@ -837,7 +1012,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 0;
         }
 
                 return 0;
         }
 
-        cfs_spin_unlock(&sv->sv_lock);
+       cfs_spin_unlock(&scd->scd_lock);
 
         switch (wi->swi_state) {
         default:
 
         switch (wi->swi_state) {
         default:
@@ -905,7 +1080,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
         case SWI_STATE_REPLY_SUBMITTED:
                 if (!ev->ev_fired) {
                         CERROR("RPC %p: bulk %p, service %d\n",
         case SWI_STATE_REPLY_SUBMITTED:
                 if (!ev->ev_fired) {
                         CERROR("RPC %p: bulk %p, service %d\n",
-                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
+                              rpc, rpc->srpc_bulk, sv->sv_id);
                         CERROR("Event: status %d, type %d, lnet %d\n",
                                ev->ev_status, ev->ev_type, ev->ev_lnet);
                         LASSERT (ev->ev_fired);
                         CERROR("Event: status %d, type %d, lnet %d\n",
                                ev->ev_status, ev->ev_type, ev->ev_lnet);
                         LASSERT (ev->ev_fired);
@@ -1015,12 +1190,12 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
          * Cancel pending schedules and prevent future schedule attempts:
          */
         LASSERT (!srpc_event_pending(rpc));
          * Cancel pending schedules and prevent future schedule attempts:
          */
         LASSERT (!srpc_event_pending(rpc));
-        swi_kill_workitem(wi);
+       swi_exit_workitem(wi);
 
 
-        cfs_spin_unlock(&rpc->crpc_lock);
+       cfs_spin_unlock(&rpc->crpc_lock);
 
 
-        (*rpc->crpc_done) (rpc);
-        return;
+       (*rpc->crpc_done)(rpc);
+       return;
 }
 
 /* sends an outgoing RPC */
 }
 
 /* sends an outgoing RPC */
@@ -1201,30 +1376,30 @@ srpc_post_rpc (srpc_client_rpc_t *rpc)
 
 
 int
 
 
 int
-srpc_send_reply (srpc_server_rpc_t *rpc)
+srpc_send_reply(struct srpc_server_rpc *rpc)
 {
 {
-        srpc_event_t   *ev = &rpc->srpc_ev;
-        srpc_msg_t     *msg = &rpc->srpc_replymsg;
-        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
-        srpc_service_t *sv = rpc->srpc_service;
-        __u64           rpyid;
-        int             rc;
-
-        LASSERT (buffer != NULL);
-        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
-
-        cfs_spin_lock(&sv->sv_lock);
-
-        if (!sv->sv_shuttingdown &&
-            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
-                /* Repost buffer before replying since test client
-                 * might send me another RPC once it gets the reply */
-                if (srpc_service_post_buffer(sv, buffer) != 0)
-                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
-                rpc->srpc_reqstbuf = NULL;
-        }
+       srpc_event_t            *ev = &rpc->srpc_ev;
+       struct srpc_msg         *msg = &rpc->srpc_replymsg;
+       struct srpc_buffer      *buffer = rpc->srpc_reqstbuf;
+       struct srpc_service_cd  *scd = rpc->srpc_scd;
+       struct srpc_service     *sv = scd->scd_svc;
+       __u64                   rpyid;
+       int                     rc;
+
+       LASSERT(buffer != NULL);
+       rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
+
+       cfs_spin_lock(&scd->scd_lock);
 
 
-        cfs_spin_unlock(&sv->sv_lock);
+       if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
+               /* Repost buffer before replying since test client
+                * might send me another RPC once it gets the reply */
+               if (srpc_service_post_buffer(scd, buffer) != 0)
+                       CWARN("Failed to repost %s buffer\n", sv->sv_name);
+               rpc->srpc_reqstbuf = NULL;
+       }
+
+       cfs_spin_unlock(&scd->scd_lock);
 
         ev->ev_fired = 0;
         ev->ev_data  = rpc;
 
         ev->ev_fired = 0;
         ev->ev_data  = rpc;
@@ -1245,8 +1420,9 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
 
 /* when in kernel always called with LNET_LOCK() held, and in thread context */
 void
 
 /* when in kernel always called with LNET_LOCK() held, and in thread context */
 void
-srpc_lnet_ev_handler (lnet_event_t *ev)
+srpc_lnet_ev_handler(lnet_event_t *ev)
 {
 {
+       struct srpc_service_cd  *scd;
         srpc_event_t      *rpcev = ev->md.user_ptr;
         srpc_client_rpc_t *crpc;
         srpc_server_rpc_t *srpc;
         srpc_event_t      *rpcev = ev->md.user_ptr;
         srpc_client_rpc_t *crpc;
         srpc_server_rpc_t *srpc;
@@ -1303,11 +1479,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 break;
 
         case SRPC_REQUEST_RCVD:
                 break;
 
         case SRPC_REQUEST_RCVD:
-                sv = rpcev->ev_data;
+               scd = rpcev->ev_data;
+               sv = scd->scd_svc;
 
 
-                LASSERT (rpcev == &sv->sv_ev);
+               LASSERT(rpcev == &scd->scd_ev);
 
 
-                cfs_spin_lock(&sv->sv_lock);
+               cfs_spin_lock(&scd->scd_lock);
 
                 LASSERT (ev->unlinked);
                 LASSERT (ev->type == LNET_EVENT_PUT ||
 
                 LASSERT (ev->unlinked);
                 LASSERT (ev->type == LNET_EVENT_PUT ||
@@ -1319,17 +1496,32 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 buffer->buf_peer = ev->initiator;
                 buffer->buf_self = ev->target.nid;
 
                 buffer->buf_peer = ev->initiator;
                 buffer->buf_self = ev->target.nid;
 
-                sv->sv_nposted_msg--;
-                LASSERT (sv->sv_nposted_msg >= 0);
-
-                if (sv->sv_shuttingdown) {
-                        /* Leave buffer on sv->sv_posted_msgq since
-                         * srpc_finish_service needs to traverse it. */
-                        cfs_spin_unlock(&sv->sv_lock);
-                        break;
-                }
-
-                cfs_list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
+               LASSERT(scd->scd_buf_nposted > 0);
+               scd->scd_buf_nposted--;
+
+               if (sv->sv_shuttingdown) {
+                       /* Leave buffer on scd->scd_buf_posted since
+                        * srpc_finish_service needs to traverse it. */
+                       cfs_spin_unlock(&scd->scd_lock);
+                       break;
+               }
+
+               if (scd->scd_buf_err_stamp != 0 &&
+                   scd->scd_buf_err_stamp < cfs_time_current_sec()) {
+                       /* re-enable adding buffer */
+                       scd->scd_buf_err_stamp = 0;
+                       scd->scd_buf_err = 0;
+               }
+
+               if (scd->scd_buf_err == 0 && /* adding buffer is enabled */
+                   scd->scd_buf_adjust == 0 &&
+                   scd->scd_buf_nposted < scd->scd_buf_low) {
+                       scd->scd_buf_adjust = MAX(scd->scd_buf_total / 2,
+                                                 SFW_TEST_WI_MIN);
+                       swi_schedule_workitem(&scd->scd_buf_wi);
+               }
+
+               cfs_list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
                 msg = &buffer->buf_msg;
                 type = srpc_service2request(sv->sv_id);
 
                 msg = &buffer->buf_msg;
                 type = srpc_service2request(sv->sv_id);
 
@@ -1350,21 +1542,22 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                         msg->msg_magic = 0;
                 }
 
                         msg->msg_magic = 0;
                 }
 
-                if (!cfs_list_empty(&sv->sv_free_rpcq)) {
-                        srpc = cfs_list_entry(sv->sv_free_rpcq.next,
-                                              srpc_server_rpc_t, srpc_list);
-                        cfs_list_del(&srpc->srpc_list);
+               if (!cfs_list_empty(&scd->scd_rpc_free)) {
+                       srpc = cfs_list_entry(scd->scd_rpc_free.next,
+                                             struct srpc_server_rpc,
+                                             srpc_list);
+                       cfs_list_del(&srpc->srpc_list);
 
 
-                        srpc_init_server_rpc(srpc, sv, buffer);
-                        cfs_list_add_tail(&srpc->srpc_list,
-                                          &sv->sv_active_rpcq);
-                        srpc_schedule_server_rpc(srpc);
-                } else {
-                        cfs_list_add_tail(&buffer->buf_list,
-                                          &sv->sv_blocked_msgq);
-                }
+                       srpc_init_server_rpc(srpc, scd, buffer);
+                       cfs_list_add_tail(&srpc->srpc_list,
+                                         &scd->scd_rpc_active);
+                       swi_schedule_workitem(&srpc->srpc_wi);
+               } else {
+                       cfs_list_add_tail(&buffer->buf_list,
+                                         &scd->scd_buf_blocked);
+               }
 
 
-                cfs_spin_unlock(&sv->sv_lock);
+               cfs_spin_unlock(&scd->scd_lock);
 
                 cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_rcvd++;
 
                 cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_rcvd++;
@@ -1391,23 +1584,21 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                         cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
         case SRPC_REPLY_SENT:
                         cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
         case SRPC_REPLY_SENT:
-                srpc = rpcev->ev_data;
-                sv = srpc->srpc_service;
+               srpc = rpcev->ev_data;
+               scd  = srpc->srpc_scd;
 
 
-                LASSERT (rpcev == &srpc->srpc_ev);
+               LASSERT(rpcev == &srpc->srpc_ev);
 
 
-                cfs_spin_lock(&sv->sv_lock);
+               cfs_spin_lock(&scd->scd_lock);
 
 
-                rpcev->ev_fired  = 1;
-                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
-                                                -EINTR : ev->status;
-                srpc_schedule_server_rpc(srpc);
+               rpcev->ev_fired  = 1;
+               rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
+                                  -EINTR : ev->status;
+               swi_schedule_workitem(&srpc->srpc_wi);
 
 
-                cfs_spin_unlock(&sv->sv_lock);
-                break;
-        }
-
-        return;
+               cfs_spin_unlock(&scd->scd_lock);
+               break;
+       }
 }
 
 #ifndef __KERNEL__
 }
 
 #ifndef __KERNEL__
@@ -1477,8 +1668,10 @@ srpc_startup (void)
                 goto bail;
         }
 
                 goto bail;
         }
 
-        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
-        LASSERT (rc == 0);
+       rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
+       LASSERT(rc == 0);
+       rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
+       LASSERT(rc == 0);
 
         srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
 
 
         srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
 
@@ -1523,6 +1716,7 @@ srpc_shutdown (void)
 
         case SRPC_STATE_EQ_INIT:
                 rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
 
         case SRPC_STATE_EQ_INIT:
                 rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
+               rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
                 LASSERT (rc == 0);
                 rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                 LASSERT (rc == 0); /* the EQ should have no user by now */
                 LASSERT (rc == 0);
                 rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                 LASSERT (rc == 0); /* the EQ should have no user by now */
index dca68b7..505c0b5 100644 (file)
@@ -232,10 +232,11 @@ typedef struct {
 
 #define SRPC_MSG_MAGIC                  0xeeb0f00d
 #define SRPC_MSG_VERSION                1
 
 #define SRPC_MSG_MAGIC                  0xeeb0f00d
 #define SRPC_MSG_VERSION                1
-typedef struct {
-       __u32   msg_magic;              /* magic */
-       __u32   msg_version;            /* # version */
-        __u32  msg_type;               /* what's in msg_body? srpc_msg_type_t */
+typedef struct srpc_msg {
+       __u32   msg_magic;              /* magic */
+       __u32   msg_version;            /* # version */
+       /* what's in msg_body? srpc_msg_type_t */
+       __u32   msg_type;
         __u32   msg_reserved0;          /* reserved seats */
         __u32   msg_reserved1;
         __u32   msg_reserved2;
         __u32   msg_reserved0;          /* reserved seats */
         __u32   msg_reserved1;
         __u32   msg_reserved2;
index 9943fa5..786501d 100644 (file)
@@ -74,6 +74,7 @@
 
 /* forward refs */
 struct srpc_service;
 
 /* forward refs */
 struct srpc_service;
+struct srpc_service_cd;
 struct sfw_test_unit;
 struct sfw_test_instance;
 
 struct sfw_test_unit;
 struct sfw_test_instance;
 
@@ -173,7 +174,7 @@ typedef struct {
 } srpc_bulk_t; /* bulk descriptor */
 
 /* message buffer descriptor */
 } srpc_bulk_t; /* bulk descriptor */
 
 /* message buffer descriptor */
-typedef struct {
+typedef struct srpc_buffer {
         cfs_list_t           buf_list; /* chain on srpc_service::*_msgq */
         srpc_msg_t           buf_msg;
         lnet_handle_md_t     buf_mdh;
         cfs_list_t           buf_list; /* chain on srpc_service::*_msgq */
         srpc_msg_t           buf_msg;
         lnet_handle_md_t     buf_mdh;
@@ -193,8 +194,9 @@ typedef struct swi_workitem {
 
 /* server-side state of a RPC */
 typedef struct srpc_server_rpc {
 
 /* server-side state of a RPC */
 typedef struct srpc_server_rpc {
-        cfs_list_t           srpc_list;    /* chain on srpc_service::*_rpcq */
-        struct srpc_service *srpc_service;
+       /* chain on srpc_service::*_rpcq */
+       cfs_list_t              srpc_list;
+       struct srpc_service_cd *srpc_scd;
         swi_workitem_t       srpc_wi;
         srpc_event_t         srpc_ev;      /* bulk/reply event */
         lnet_nid_t           srpc_self;
         swi_workitem_t       srpc_wi;
         srpc_event_t         srpc_ev;      /* bulk/reply event */
         lnet_nid_t           srpc_self;
@@ -268,21 +270,61 @@ do {                                                                    \
                                    (rpc)->crpc_reqstev.ev_fired == 0 || \
                                    (rpc)->crpc_replyev.ev_fired == 0)
 
                                    (rpc)->crpc_reqstev.ev_fired == 0 || \
                                    (rpc)->crpc_replyev.ev_fired == 0)
 
-typedef struct srpc_service {
-        int                sv_id;            /* service id */
-        const char        *sv_name;          /* human readable name */
-        int                sv_nprune;        /* # posted RPC to be pruned */
-        int                sv_concur;        /* max # concurrent RPCs */
-
-        cfs_spinlock_t     sv_lock;
-        int                sv_shuttingdown;
-        srpc_event_t       sv_ev;            /* LNet event */
-        int                sv_nposted_msg;   /* # posted message buffers */
-        cfs_list_t         sv_free_rpcq;     /* free RPC descriptors */
-        cfs_list_t         sv_active_rpcq;   /* in-flight RPCs */
-        cfs_list_t         sv_posted_msgq;   /* posted message buffers */
-        cfs_list_t         sv_blocked_msgq;  /* blocked for RPC descriptor */
+/* CPU partition data of srpc service */
+struct srpc_service_cd {
+       /** serialize */
+       cfs_spinlock_t          scd_lock;
+       /** backref to service */
+       struct srpc_service     *scd_svc;
+       /** event buffer */
+       srpc_event_t            scd_ev;
+       /** free RPC descriptors */
+       cfs_list_t              scd_rpc_free;
+       /** in-flight RPCs */
+       cfs_list_t              scd_rpc_active;
+       /** workitem for posting buffer */
+       swi_workitem_t          scd_buf_wi;
+       /** CPT id */
+       int                     scd_cpt;
+       /** error code for scd_buf_wi */
+       int                     scd_buf_err;
+       /** timestamp for scd_buf_err */
+       unsigned long           scd_buf_err_stamp;
+       /** total # request buffers */
+       int                     scd_buf_total;
+       /** # posted request buffers */
+       int                     scd_buf_nposted;
+       /** in progress of buffer posting */
+       int                     scd_buf_posting;
+       /** allocate more buffers if scd_buf_nposted < scd_buf_low */
+       int                     scd_buf_low;
+       /** increase/decrease some buffers */
+       int                     scd_buf_adjust;
+       /** posted message buffers */
+       cfs_list_t              scd_buf_posted;
+       /** blocked for RPC descriptor */
+       cfs_list_t              scd_buf_blocked;
+};
+
+/* number of server workitems (mini-thread) for testing service */
+#define SFW_TEST_WI_MIN                256
+#define SFW_TEST_WI_MAX                2048
+/* extra buffers for tolerating buggy peers, or unbalanced number
+ * of peers between partitions  */
+#define SFW_TEST_WI_EXTRA      64
+
+/* number of server workitems (mini-thread) for framework service */
+#define SFW_FRWK_WI_MIN                16
+#define SFW_FRWK_WI_MAX                256
 
 
+typedef struct srpc_service {
+       int                     sv_id;          /* service id */
+       const char              *sv_name;       /* human readable name */
+       int                     sv_wi_total;    /* total server workitems */
+       int                     sv_shuttingdown;
+       int                     sv_ncpts;
+       /* percpt data for srpc_service */
+       struct srpc_service_cd  **sv_cpt_data;
         /* Service callbacks:
          * - sv_handler: process incoming RPC request
          * - sv_bulk_ready: notify bulk data
         /* Service callbacks:
          * - sv_handler: process incoming RPC request
          * - sv_bulk_ready: notify bulk data
@@ -291,9 +333,6 @@ typedef struct srpc_service {
         int              (*sv_bulk_ready) (srpc_server_rpc_t *, int);
 } srpc_service_t;
 
         int              (*sv_bulk_ready) (srpc_server_rpc_t *, int);
 } srpc_service_t;
 
-#define SFW_POST_BUFFERS         256
-#define SFW_SERVICE_CONCURRENCY  (SFW_POST_BUFFERS/2)
-
 typedef struct {
         cfs_list_t        sn_list;    /* chain on fw_zombie_sessions */
         lst_sid_t         sn_id;      /* unique identifier */
 typedef struct {
         cfs_list_t        sn_list;    /* chain on fw_zombie_sessions */
         lst_sid_t         sn_id;      /* unique identifier */
@@ -372,7 +411,7 @@ typedef struct sfw_test_unit {
         swi_workitem_t        tsu_worker;       /* workitem of the test unit */
 } sfw_test_unit_t;
 
         swi_workitem_t        tsu_worker;       /* workitem of the test unit */
 } sfw_test_unit_t;
 
-typedef struct {
+typedef struct sfw_test_case {
         cfs_list_t              tsc_list;         /* chain on fw_tests */
         srpc_service_t         *tsc_srv_service;  /* test service */
         sfw_test_client_ops_t  *tsc_cli_ops;      /* ops of test client */
         cfs_list_t              tsc_list;         /* chain on fw_tests */
         srpc_service_t         *tsc_srv_service;  /* test service */
         sfw_test_client_ops_t  *tsc_cli_ops;      /* ops of test client */
@@ -389,7 +428,7 @@ void sfw_client_rpc_done(srpc_client_rpc_t *rpc);
 void sfw_unpack_message(srpc_msg_t *msg);
 void sfw_free_pages(srpc_server_rpc_t *rpc);
 void sfw_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i);
 void sfw_unpack_message(srpc_msg_t *msg);
 void sfw_free_pages(srpc_server_rpc_t *rpc);
 void sfw_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i);
-int sfw_alloc_pages(srpc_server_rpc_t *rpc, int npages, int sink);
+int sfw_alloc_pages(srpc_server_rpc_t *rpc, int cpt, int npages, int sink);
 int sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply);
 
 srpc_client_rpc_t *
 int sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply);
 
 srpc_client_rpc_t *
@@ -400,7 +439,7 @@ srpc_create_client_rpc(lnet_process_id_t peer, int service,
 void srpc_post_rpc(srpc_client_rpc_t *rpc);
 void srpc_abort_rpc(srpc_client_rpc_t *rpc, int why);
 void srpc_free_bulk(srpc_bulk_t *bk);
 void srpc_post_rpc(srpc_client_rpc_t *rpc);
 void srpc_abort_rpc(srpc_client_rpc_t *rpc, int why);
 void srpc_free_bulk(srpc_bulk_t *bk);
-srpc_bulk_t *srpc_alloc_bulk(int npages, int sink);
+srpc_bulk_t *srpc_alloc_bulk(int cpt, int npages, int sink);
 int srpc_send_rpc(swi_workitem_t *wi);
 int srpc_send_reply(srpc_server_rpc_t *rpc);
 int srpc_add_service(srpc_service_t *sv);
 int srpc_send_rpc(swi_workitem_t *wi);
 int srpc_send_reply(srpc_server_rpc_t *rpc);
 int srpc_add_service(srpc_service_t *sv);
@@ -414,7 +453,13 @@ void srpc_get_counters(srpc_counters_t *cnt);
 void srpc_set_counters(const srpc_counters_t *cnt);
 
 extern struct cfs_wi_sched *lst_sched_serial;
 void srpc_set_counters(const srpc_counters_t *cnt);
 
 extern struct cfs_wi_sched *lst_sched_serial;
-extern struct cfs_wi_sched *lst_sched_test;
+extern struct cfs_wi_sched **lst_sched_test;
+
+static inline int
+srpc_serv_is_framework(struct srpc_service *svc)
+{
+       return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID;
+}
 
 static inline int
 swi_wi_action(cfs_workitem_t *wi)
 
 static inline int
 swi_wi_action(cfs_workitem_t *wi)
@@ -441,11 +486,17 @@ swi_schedule_workitem(swi_workitem_t *wi)
 }
 
 static inline void
 }
 
 static inline void
-swi_kill_workitem(swi_workitem_t *swi)
+swi_exit_workitem(swi_workitem_t *swi)
 {
        cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
 }
 
 {
        cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
 }
 
+static inline int
+swi_deschedule_workitem(swi_workitem_t *swi)
+{
+       return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem);
+}
+
 #ifndef __KERNEL__
 static inline int
 swi_check_events(void)
 #ifndef __KERNEL__
 static inline int
 swi_check_events(void)
@@ -490,7 +541,8 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer,
                                 crpc_bulk.bk_iovs[nbulkiov]));
 
         CFS_INIT_LIST_HEAD(&rpc->crpc_list);
                                 crpc_bulk.bk_iovs[nbulkiov]));
 
         CFS_INIT_LIST_HEAD(&rpc->crpc_list);
-       swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, lst_sched_test);
+       swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc,
+                         lst_sched_test[lnet_cpt_of_nid(peer.nid)]);
         cfs_spin_lock_init(&rpc->crpc_lock);
         cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
 
         cfs_spin_lock_init(&rpc->crpc_lock);
         cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
 
@@ -551,7 +603,7 @@ int selftest_wait_events(void);
 
 #else
 
 
 #else
 
-#define selftest_wait_events()    cfs_pause(cfs_time_seconds(1))
+#define selftest_wait_events() cfs_pause(cfs_time_seconds(1) / 10)
 
 #endif
 
 
 #endif
 
@@ -570,13 +622,11 @@ do {                                                                    \
 } while (0)
 
 static inline void
 } while (0)
 
 static inline void
-srpc_wait_service_shutdown (srpc_service_t *sv)
+srpc_wait_service_shutdown(srpc_service_t *sv)
 {
 {
-        int i = 2;
+       int i = 2;
 
 
-        cfs_spin_lock(&sv->sv_lock);
-        LASSERT (sv->sv_shuttingdown);
-        cfs_spin_unlock(&sv->sv_lock);
+       LASSERT(sv->sv_shuttingdown);
 
         while (srpc_finish_service(sv) == 0) {
                 i++;
 
         while (srpc_finish_service(sv) == 0) {
                 i++;