X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fselftest%2Fselftest.h;h=3540b44c0c0d1548841ff39f7dc1fc6f11113704;hb=cee1ab3997d70ee5eeece41b50fbe4479eda9d14;hp=a1f9ab4751cc96c243d86fc186e29d5ee01c9782;hpb=e3a7c58aebafce40323db54bf6056029e5af4a70;p=fs%2Flustre-release.git diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index a1f9ab4..3540b44 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -27,6 +27,8 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -74,6 +76,7 @@ /* forward refs */ struct srpc_service; +struct srpc_service_cd; struct sfw_test_unit; struct sfw_test_instance; @@ -173,7 +176,7 @@ typedef struct { } srpc_bulk_t; /* bulk descriptor */ /* message buffer descriptor */ -typedef struct { +typedef struct srpc_buffer { cfs_list_t buf_list; /* chain on srpc_service::*_msgq */ srpc_msg_t buf_msg; lnet_handle_md_t buf_mdh; @@ -185,6 +188,7 @@ struct swi_workitem; typedef int (*swi_action_t) (struct swi_workitem *); typedef struct swi_workitem { + struct cfs_wi_sched *swi_sched; cfs_workitem_t swi_workitem; swi_action_t swi_action; int swi_state; @@ -192,8 +196,9 @@ typedef struct swi_workitem { /* server-side state of a RPC */ typedef struct srpc_server_rpc { - cfs_list_t srpc_list; /* chain on srpc_service::*_rpcq */ - struct srpc_service *srpc_service; + /* chain on srpc_service::*_rpcq */ + cfs_list_t srpc_list; + struct srpc_service_cd *srpc_scd; swi_workitem_t srpc_wi; srpc_event_t srpc_ev; /* bulk/reply event */ lnet_nid_t srpc_self; @@ -210,8 +215,8 @@ typedef struct srpc_server_rpc { /* client-side state of a RPC */ typedef struct srpc_client_rpc { - cfs_list_t crpc_list; /* chain on user's lists */ - cfs_spinlock_t crpc_lock; /* serialize */ + cfs_list_t crpc_list; /* chain on user's lists */ + spinlock_t crpc_lock; /* serialize */ int crpc_service; cfs_atomic_t crpc_refcount; int crpc_timeout; /* # seconds to wait for reply */ @@ -267,21 +272,61 @@ do { \ (rpc)->crpc_reqstev.ev_fired == 0 || \ (rpc)->crpc_replyev.ev_fired == 0) -typedef struct srpc_service { - int sv_id; /* service id */ - const char *sv_name; /* human readable name */ - int sv_nprune; /* # posted RPC to be pruned */ - int sv_concur; /* max # concurrent RPCs */ - - cfs_spinlock_t sv_lock; - int sv_shuttingdown; - srpc_event_t sv_ev; /* LNet event */ - int sv_nposted_msg; /* # posted message buffers */ - cfs_list_t sv_free_rpcq; /* free RPC descriptors */ - cfs_list_t sv_active_rpcq; /* in-flight RPCs */ - cfs_list_t sv_posted_msgq; /* posted message buffers */ - cfs_list_t sv_blocked_msgq; /* blocked for RPC descriptor */ +/* CPU partition data of srpc service */ +struct srpc_service_cd { + /** serialize */ + spinlock_t scd_lock; + /** backref to service */ + struct srpc_service *scd_svc; + /** event buffer */ + srpc_event_t scd_ev; + /** free RPC descriptors */ + cfs_list_t scd_rpc_free; + /** in-flight RPCs */ + cfs_list_t scd_rpc_active; + /** workitem for posting buffer */ + swi_workitem_t scd_buf_wi; + /** CPT id */ + int scd_cpt; + /** error code for scd_buf_wi */ + int scd_buf_err; + /** timestamp for scd_buf_err */ + unsigned long scd_buf_err_stamp; + /** total # request buffers */ + int scd_buf_total; + /** # posted request buffers */ + int scd_buf_nposted; + /** in progress of buffer posting */ + int scd_buf_posting; + /** allocate more buffers if scd_buf_nposted < scd_buf_low */ + int scd_buf_low; + /** increase/decrease some buffers */ + int scd_buf_adjust; + /** posted message buffers */ + cfs_list_t scd_buf_posted; + /** blocked for RPC descriptor */ + cfs_list_t scd_buf_blocked; +}; + +/* number of server workitems (mini-thread) for testing service */ +#define SFW_TEST_WI_MIN 256 +#define SFW_TEST_WI_MAX 2048 +/* extra buffers for tolerating buggy peers, or unbalanced number + * of peers between partitions */ +#define SFW_TEST_WI_EXTRA 64 + +/* number of server workitems (mini-thread) for framework service */ +#define SFW_FRWK_WI_MIN 16 +#define SFW_FRWK_WI_MAX 256 +typedef struct srpc_service { + int sv_id; /* service id */ + const char *sv_name; /* human readable name */ + int sv_wi_total; /* total server workitems */ + int sv_shuttingdown; + int sv_ncpts; + /* percpt data for srpc_service */ + struct srpc_service_cd **sv_cpt_data; /* Service callbacks: * - sv_handler: process incoming RPC request * - sv_bulk_ready: notify bulk data @@ -290,14 +335,12 @@ typedef struct srpc_service { int (*sv_bulk_ready) (srpc_server_rpc_t *, int); } srpc_service_t; -#define SFW_POST_BUFFERS 256 -#define SFW_SERVICE_CONCURRENCY (SFW_POST_BUFFERS/2) - typedef struct { cfs_list_t sn_list; /* chain on fw_zombie_sessions */ lst_sid_t sn_id; /* unique identifier */ unsigned int sn_timeout; /* # seconds' inactivity to expire */ int sn_timer_active; + unsigned int sn_features; stt_timer_t sn_timer; cfs_list_t sn_batches; /* list of batches */ char sn_name[LST_NAME_SIZE]; @@ -341,18 +384,19 @@ typedef struct sfw_test_instance { int tsi_concur; /* concurrency */ int tsi_loop; /* loop count */ - /* status of test instance */ - cfs_spinlock_t tsi_lock; /* serialize */ + /* status of test instance */ + spinlock_t tsi_lock; /* serialize */ int tsi_stopping:1; /* test is stopping */ cfs_atomic_t tsi_nactive; /* # of active test unit */ cfs_list_t tsi_units; /* test units */ cfs_list_t tsi_free_rpcs; /* free rpcs */ cfs_list_t tsi_active_rpcs; /* active rpcs */ - union { - test_bulk_req_t bulk; /* bulk parameter */ - test_ping_req_t ping; /* ping parameter */ - } tsi_u; + union { + test_ping_req_t ping; /* ping parameter */ + test_bulk_req_t bulk_v0; /* bulk parameter */ + test_bulk_req_v1_t bulk_v1; /* bulk v1 parameter */ + } tsi_u; } sfw_test_instance_t; /* XXX: trailing (CFS_PAGE_SIZE % sizeof(lnet_process_id_t)) bytes at @@ -371,24 +415,27 @@ typedef struct sfw_test_unit { swi_workitem_t tsu_worker; /* workitem of the test unit */ } sfw_test_unit_t; -typedef struct { +typedef struct sfw_test_case { cfs_list_t tsc_list; /* chain on fw_tests */ srpc_service_t *tsc_srv_service; /* test service */ sfw_test_client_ops_t *tsc_cli_ops; /* ops of test client */ } sfw_test_case_t; srpc_client_rpc_t * -sfw_create_rpc(lnet_process_id_t peer, int service, int nbulkiov, int bulklen, - void (*done) (srpc_client_rpc_t *), void *priv); -int sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer, - int nblk, int blklen, srpc_client_rpc_t **rpc); +sfw_create_rpc(lnet_process_id_t peer, int service, + unsigned features, int nbulkiov, int bulklen, + void (*done) (srpc_client_rpc_t *), void *priv); +int sfw_create_test_rpc(sfw_test_unit_t *tsu, + lnet_process_id_t peer, unsigned features, + int nblk, int blklen, srpc_client_rpc_t **rpc); void sfw_abort_rpc(srpc_client_rpc_t *rpc); void sfw_post_rpc(srpc_client_rpc_t *rpc); void sfw_client_rpc_done(srpc_client_rpc_t *rpc); void sfw_unpack_message(srpc_msg_t *msg); void sfw_free_pages(srpc_server_rpc_t *rpc); void sfw_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i); -int sfw_alloc_pages(srpc_server_rpc_t *rpc, int npages, int sink); +int sfw_alloc_pages(srpc_server_rpc_t *rpc, int cpt, int npages, int len, + int sink); int sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply); srpc_client_rpc_t * @@ -399,7 +446,8 @@ srpc_create_client_rpc(lnet_process_id_t peer, int service, void srpc_post_rpc(srpc_client_rpc_t *rpc); void srpc_abort_rpc(srpc_client_rpc_t *rpc, int why); void srpc_free_bulk(srpc_bulk_t *bk); -srpc_bulk_t *srpc_alloc_bulk(int npages, int sink); +srpc_bulk_t *srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, + int sink); int srpc_send_rpc(swi_workitem_t *wi); int srpc_send_reply(srpc_server_rpc_t *rpc); int srpc_add_service(srpc_service_t *sv); @@ -412,6 +460,15 @@ void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer); void srpc_get_counters(srpc_counters_t *cnt); void srpc_set_counters(const srpc_counters_t *cnt); +extern struct cfs_wi_sched *lst_sched_serial; +extern struct cfs_wi_sched **lst_sched_test; + +static inline int +srpc_serv_is_framework(struct srpc_service *svc) +{ + return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID; +} + static inline int swi_wi_action(cfs_workitem_t *wi) { @@ -421,24 +478,31 @@ swi_wi_action(cfs_workitem_t *wi) } static inline void -swi_init_workitem (swi_workitem_t *swi, void *data, - swi_action_t action, short sched_id) +swi_init_workitem(swi_workitem_t *swi, void *data, + swi_action_t action, struct cfs_wi_sched *sched) { - swi->swi_action = action; - swi->swi_state = SWI_STATE_NEWBORN; - cfs_wi_init(&swi->swi_workitem, data, swi_wi_action, sched_id); + swi->swi_sched = sched; + swi->swi_action = action; + swi->swi_state = SWI_STATE_NEWBORN; + cfs_wi_init(&swi->swi_workitem, data, swi_wi_action); } static inline void swi_schedule_workitem(swi_workitem_t *wi) { - cfs_wi_schedule(&wi->swi_workitem); + cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem); } static inline void -swi_kill_workitem(swi_workitem_t *swi) +swi_exit_workitem(swi_workitem_t *swi) +{ + cfs_wi_exit(swi->swi_sched, &swi->swi_workitem); +} + +static inline int +swi_deschedule_workitem(swi_workitem_t *swi) { - cfs_wi_exit(&swi->swi_workitem); + return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem); } #ifndef __KERNEL__ @@ -485,9 +549,9 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer, crpc_bulk.bk_iovs[nbulkiov])); CFS_INIT_LIST_HEAD(&rpc->crpc_list); - swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, - CFS_WI_SCHED_ANY); - cfs_spin_lock_init(&rpc->crpc_lock); + swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, + lst_sched_test[lnet_cpt_of_nid(peer.nid)]); + spin_lock_init(&rpc->crpc_lock); cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ rpc->crpc_dest = peer; @@ -547,32 +611,30 @@ int selftest_wait_events(void); #else -#define selftest_wait_events() cfs_pause(cfs_time_seconds(1)) +#define selftest_wait_events() cfs_pause(cfs_time_seconds(1) / 10) #endif -#define lst_wait_until(cond, lock, fmt, ...) \ -do { \ - int __I = 2; \ - while (!(cond)) { \ - CDEBUG(IS_PO2(++__I) ? D_WARNING : D_NET, \ - fmt, ## __VA_ARGS__); \ - cfs_spin_unlock(&(lock)); \ - \ - selftest_wait_events(); \ - \ - cfs_spin_lock(&(lock)); \ - } \ +#define lst_wait_until(cond, lock, fmt, ...) \ +do { \ + int __I = 2; \ + while (!(cond)) { \ + CDEBUG(IS_PO2(++__I) ? D_WARNING : D_NET, \ + fmt, ## __VA_ARGS__); \ + spin_unlock(&(lock)); \ + \ + selftest_wait_events(); \ + \ + spin_lock(&(lock)); \ + } \ } while (0) static inline void -srpc_wait_service_shutdown (srpc_service_t *sv) +srpc_wait_service_shutdown(srpc_service_t *sv) { - int i = 2; + int i = 2; - cfs_spin_lock(&sv->sv_lock); - LASSERT (sv->sv_shuttingdown); - cfs_spin_unlock(&sv->sv_lock); + LASSERT(sv->sv_shuttingdown); while (srpc_finish_service(sv) == 0) { i++;