-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
-int brw_inject_errors = 0;
-CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644,
- "# data errors to inject randomly, zero by default");
-
static int session_timeout = 100;
CFS_MODULE_PARM(session_timeout, "i", int, 0444,
"test session timeout in seconds (100 by default, 0 == never)");
CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
"rpc timeout in seconds (64 by default, 0 == never)");
-#define SFW_TEST_CONCURRENCY 1792
-#define SFW_EXTRA_TEST_BUFFERS 8 /* tolerate buggy peers with extra buffers */
-
-#define sfw_test_buffers(tsi) ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS)
-
#define sfw_unpack_id(id) \
do { \
__swab64s(&(id).nid); \
#define sfw_unpack_fw_counters(fc) \
do { \
- __swab32s(&(fc).brw_errors); \
- __swab32s(&(fc).ping_errors); \
- __swab32s(&(fc).active_tests); \
+ __swab32s(&(fc).running_ms); \
__swab32s(&(fc).active_batches); \
__swab32s(&(fc).zombie_sessions); \
+ __swab32s(&(fc).brw_errors); \
+ __swab32s(&(fc).ping_errors); \
} while (0)
#define sfw_unpack_rpc_counters(rc) \
sn->sn_timer_active = 0;
sn->sn_id = sid;
sn->sn_timeout = session_timeout;
+ sn->sn_started = cfs_time_current();
timer->stt_data = sn;
timer->stt_func = sfw_session_expired;
/* completion handler for incoming framework RPCs */
void
-sfw_server_rpc_done (srpc_server_rpc_t *rpc)
+sfw_server_rpc_done(struct srpc_server_rpc *rpc)
{
- srpc_service_t *sv = rpc->srpc_service;
- int status = rpc->srpc_status;
+ struct srpc_service *sv = rpc->srpc_scd->scd_svc;
+ int status = rpc->srpc_status;
CDEBUG (D_NET,
"Incoming framework RPC done: "
"service %s, peer %s, status %s:%d\n",
sv->sv_name, libcfs_id2str(rpc->srpc_peer),
- swi_state2str(rpc->srpc_wi.wi_state),
+ swi_state2str(rpc->srpc_wi.swi_state),
status);
if (rpc->srpc_bulk != NULL)
"Outgoing framework RPC done: "
"service %d, peer %s, status %s:%d:%d\n",
rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
- swi_state2str(rpc->crpc_wi.wi_state),
+ swi_state2str(rpc->crpc_wi.swi_state),
rpc->crpc_aborted, rpc->crpc_status);
cfs_spin_lock(&sfw_data.fw_lock);
sfw_session_t *sn = sfw_data.fw_session;
sfw_counters_t *cnt = &reply->str_fw;
sfw_batch_t *bat;
+ struct timeval tv;
reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
return 0;
}
- LNET_LOCK();
- reply->str_lnet = the_lnet.ln_counters;
- LNET_UNLOCK();
+ lnet_counters_get(&reply->str_lnet);
+ srpc_get_counters(&reply->str_rpc);
- srpc_get_counters(&reply->str_rpc);
+ /* send over the msecs since the session was started
+ - with 32 bits to send, this is ~49 days */
+ cfs_duration_usec(cfs_time_sub(cfs_time_current(),
+ sn->sn_started), &tv);
+ cnt->running_ms = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
cnt->brw_errors = cfs_atomic_read(&sn->sn_brw_errors);
cnt->ping_errors = cfs_atomic_read(&sn->sn_ping_errors);
cnt->zombie_sessions = cfs_atomic_read(&sfw_data.fw_nzombies);
- cnt->active_tests = cnt->active_batches = 0;
+ cnt->active_batches = 0;
cfs_list_for_each_entry_typed (bat, &sn->sn_batches,
sfw_batch_t, bat_list) {
- int n = cfs_atomic_read(&bat->bat_nactive);
-
- if (n > 0) {
+ if (cfs_atomic_read(&bat->bat_nactive) > 0)
cnt->active_batches++;
- cnt->active_tests += n;
- }
}
reply->str_status = 0;
cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
}
-int
-sfw_load_test (sfw_test_instance_t *tsi)
+static inline int
+sfw_test_buffers(sfw_test_instance_t *tsi)
{
- sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
- int nrequired = sfw_test_buffers(tsi);
- int nposted;
-
- LASSERT (tsc != NULL);
-
- if (tsi->tsi_is_client) {
- tsi->tsi_ops = tsc->tsc_cli_ops;
- return 0;
- }
+ struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
+ struct srpc_service *svc = tsc->tsc_srv_service;
+ int nbuf;
- nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired);
- if (nposted != nrequired) {
- CWARN ("Failed to reserve enough buffers: "
- "service %s, %d needed, %d reserved\n",
- tsc->tsc_srv_service->sv_name, nrequired, nposted);
- srpc_service_remove_buffers(tsc->tsc_srv_service, nposted);
- return -ENOMEM;
- }
+ nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
+ return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
+}
- CDEBUG (D_NET, "Reserved %d buffers for test %s\n",
- nposted, tsc->tsc_srv_service->sv_name);
- return 0;
+int
+sfw_load_test(struct sfw_test_instance *tsi)
+{
+ struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
+ struct srpc_service *svc = tsc->tsc_srv_service;
+ int nbuf = sfw_test_buffers(tsi);
+ int rc;
+
+ LASSERT(tsc != NULL);
+
+ if (tsi->tsi_is_client) {
+ tsi->tsi_ops = tsc->tsc_cli_ops;
+ return 0;
+ }
+
+ rc = srpc_service_add_buffers(svc, nbuf);
+ if (rc != 0) {
+ CWARN("Failed to reserve enough buffers: "
+ "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
+ /* NB: this error handler is not strictly correct, because
+ * it may release more buffers than already allocated,
+ * but it doesn't matter because request portal should
+ * be lazy portal and will grow buffers if necessary. */
+ srpc_service_remove_buffers(svc, nbuf);
+ return -ENOMEM;
+ }
+
+ CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
+ nbuf * (srpc_serv_is_framework(svc) ?
+ 1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
+ return 0;
}
void
-sfw_unload_test (sfw_test_instance_t *tsi)
+sfw_unload_test(struct sfw_test_instance *tsi)
{
- sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
+ struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
- LASSERT (tsc != NULL);
+ LASSERT(tsc != NULL);
- if (!tsi->tsi_is_client)
- srpc_service_remove_buffers(tsc->tsc_srv_service,
- sfw_test_buffers(tsi));
- return;
+ if (tsi->tsi_is_client)
+ return;
+
+ /* shrink buffers, because request portal is lazy portal
+ * which can grow buffers at runtime so we may leave
+ * some buffers behind, but never mind... */
+ srpc_service_remove_buffers(tsc->tsc_srv_service,
+ sfw_test_buffers(tsi));
+ return;
}
void
memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
for (i = 0; i < ndest; i++) {
- lnet_process_id_t *dests;
- lnet_process_id_t id;
- int j;
+ lnet_process_id_packed_t *dests;
+ lnet_process_id_packed_t id;
+ int j;
#ifdef __KERNEL__
dests = cfs_page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
goto error;
}
- tsu->tsu_dest = id;
+ tsu->tsu_dest.nid = id.nid;
+ tsu->tsu_dest.pid = id.pid;
tsu->tsu_instance = tsi;
tsu->tsu_private = NULL;
cfs_list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
srpc_client_rpc_t, crpc_list);
LASSERT (nblk == rpc->crpc_bulk.bk_niov);
cfs_list_del_init(&rpc->crpc_list);
-
- srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
- blklen, sfw_test_rpc_done,
- sfw_test_rpc_fini, tsu);
}
cfs_spin_unlock(&tsi->tsi_lock);
-
- if (rpc == NULL)
- rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
- blklen, sfw_test_rpc_done,
- sfw_test_rpc_fini, tsu);
+
+ if (rpc == NULL) {
+ rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
+ blklen, sfw_test_rpc_done,
+ sfw_test_rpc_fini, tsu);
+ } else {
+ srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
+ blklen, sfw_test_rpc_done,
+ sfw_test_rpc_fini, tsu);
+ }
+
if (rpc == NULL) {
CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
return -ENOMEM;
int
sfw_run_test (swi_workitem_t *wi)
{
- sfw_test_unit_t *tsu = wi->wi_data;
+ sfw_test_unit_t *tsu = wi->swi_workitem.wi_data;
sfw_test_instance_t *tsi = tsu->tsu_instance;
srpc_client_rpc_t *rpc = NULL;
* - my batch is still active; no one can run it again now.
* Cancel pending schedules and prevent future schedule attempts:
*/
- swi_kill_workitem(wi);
- sfw_test_unit_done(tsu);
- return 1;
+ swi_exit_workitem(wi);
+ sfw_test_unit_done(tsu);
+ return 1;
}
int
cfs_atomic_inc(&tsi->tsi_nactive);
tsu->tsu_loop = tsi->tsi_loop;
wi = &tsu->tsu_worker;
- swi_init_workitem(wi, tsu, sfw_run_test);
+ swi_init_workitem(wi, tsu, sfw_run_test,
+ lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
swi_schedule_workitem(wi);
}
}
}
int
-sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink)
+sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int sink)
{
- LASSERT (rpc->srpc_bulk == NULL);
- LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
+ LASSERT(rpc->srpc_bulk == NULL);
+ LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
- rpc->srpc_bulk = srpc_alloc_bulk(npages, sink);
- if (rpc->srpc_bulk == NULL) return -ENOMEM;
+ rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, sink);
+ if (rpc->srpc_bulk == NULL)
+ return -ENOMEM;
- return 0;
+ return 0;
}
int
bat = sfw_bid2batch(request->tsr_bid);
if (bat == NULL) {
CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
- rpc->srpc_service->sv_name,
+ rpc->srpc_scd->scd_svc->sv_name,
libcfs_id2str(rpc->srpc_peer));
return -ENOMEM;
}
}
if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
- /* rpc will be resumed later in sfw_bulk_ready */
- return sfw_alloc_pages(rpc,
- sfw_id_pages(request->tsr_ndest), 1);
+ /* rpc will be resumed later in sfw_bulk_ready */
+ return sfw_alloc_pages(rpc, CFS_CPT_ANY,
+ sfw_id_pages(request->tsr_ndest), 1);
}
rc = sfw_add_test_instance(bat, rpc);
}
int
-sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
+sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
{
- srpc_service_t *sv = rpc->srpc_service;
+ struct srpc_service *sv = rpc->srpc_scd->scd_svc;
srpc_msg_t *reply = &rpc->srpc_replymsg;
srpc_msg_t *request = &rpc->srpc_reqstbuf->buf_msg;
int rc = 0;
}
int
-sfw_bulk_ready (srpc_server_rpc_t *rpc, int status)
+sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
{
- srpc_service_t *sv = rpc->srpc_service;
- int rc;
+ struct srpc_service *sv = rpc->srpc_scd->scd_svc;
+ int rc;
LASSERT (rpc->srpc_bulk != NULL);
LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
s = getenv("SESSION_TIMEOUT");
session_timeout = s != NULL ? atoi(s) : session_timeout;
- s = getenv("BRW_INJECT_ERRORS");
- brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors;
-
s = getenv("RPC_TIMEOUT");
rpc_timeout = s != NULL ? atoi(s) : rpc_timeout;
#endif
cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
sfw_test_case_t, tsc_list) {
sv = tsc->tsc_srv_service;
- sv->sv_concur = SFW_TEST_CONCURRENCY;
rc = srpc_add_service(sv);
LASSERT (rc != -EBUSY);
sv->sv_bulk_ready = NULL;
sv->sv_handler = sfw_handle_server_rpc;
- sv->sv_concur = SFW_SERVICE_CONCURRENCY;
+ sv->sv_wi_total = SFW_FRWK_WI_MAX;
if (sv->sv_id == SRPC_SERVICE_TEST)
sv->sv_bulk_ready = sfw_bulk_ready;
/* about to sfw_shutdown, no need to add buffer */
if (error) continue;
- rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS);
- if (rc != SFW_POST_BUFFERS) {
- CWARN ("Failed to reserve enough buffers: "
- "service %s, %d needed, %d reserved\n",
- sv->sv_name, SFW_POST_BUFFERS, rc);
- error = -ENOMEM;
- }
+ rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
+ if (rc != 0) {
+ CWARN("Failed to reserve enough buffers: "
+ "service %s, %d needed: %d\n",
+ sv->sv_name, sv->sv_wi_total, rc);
+ error = -ENOMEM;
+ }
}
if (error != 0)