X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fselftest%2Frpc.c;h=4d8d6530294858f39c1059c15e3bad9bcfdc38fd;hp=bb6f58720fd82abd0080885335ff39af97372c89;hb=57ba24c7729e8046167f10b1ab1c5bedfd19bb2c;hpb=776615e6825e2c90c2635c8b55e7bb02da33726c diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index bb6f587..4d8d653 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -8,13 +8,11 @@ #define DEBUG_SUBSYSTEM S_LNET -#include -#include -#include - #include "selftest.h" +#define SRPC_PEER_HASH_SIZE 101 /* # peer lists */ + typedef enum { SRPC_STATE_NONE, SRPC_STATE_NI_INIT, @@ -24,9 +22,6 @@ typedef enum { SRPC_STATE_STOPPING, } srpc_state_t; -#define SRPC_PEER_HASH_SIZE 101 /* # peer lists */ -#define SRPC_PEER_CREDITS 16 /* >= most LND's default peer credit */ - struct smoketest_rpc { spinlock_t rpc_glock; /* global lock */ srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1]; @@ -37,6 +32,10 @@ struct smoketest_rpc { __u64 rpc_matchbits; /* matchbits counter */ } srpc_data; +static int srpc_peer_credits = 16; +CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444, + "# in-flight RPCs per peer (16 by default)"); + /* forward ref's */ int srpc_handle_rpc (swi_workitem_t *wi); @@ -175,7 +174,7 @@ srpc_create_peer (lnet_nid_t nid) memset(peer, 0, sizeof(srpc_peer_t)); peer->stp_nid = nid; - peer->stp_credits = SRPC_PEER_CREDITS; + peer->stp_credits = srpc_peer_credits; spin_lock_init(&peer->stp_lock); CFS_INIT_LIST_HEAD(&peer->stp_rpcq); @@ -202,8 +201,8 @@ srpc_find_peer_locked (lnet_nid_t nid) static srpc_peer_t * srpc_nid2peer (lnet_nid_t nid) { - srpc_peer_t *peer; - srpc_peer_t *new_peer; + srpc_peer_t *peer; + srpc_peer_t *new_peer; spin_lock(&srpc_data.rpc_glock); peer = srpc_find_peer_locked(nid); @@ -211,7 +210,7 @@ srpc_nid2peer (lnet_nid_t nid) if (peer != NULL) return peer; - + new_peer = srpc_create_peer(nid); spin_lock(&srpc_data.rpc_glock); @@ -229,7 +228,7 @@ srpc_nid2peer (lnet_nid_t nid) spin_unlock(&srpc_data.rpc_glock); return NULL; } - + list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid)); spin_unlock(&srpc_data.rpc_glock); return new_peer; @@ -379,7 +378,7 @@ srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf, } int -srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, +srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, int options, lnet_process_id_t peer, lnet_nid_t self, lnet_handle_md_t *mdh, srpc_event_t *ev) { @@ -403,11 +402,11 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options. * they're only meaningful for MDs attached to an ME (i.e. passive * buffers... */ - if ((options & LNET_MD_OP_PUT) != 0) { + if ((options & LNET_MD_OP_PUT) != 0) { rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer, portal, matchbits, 0, 0); } else { - LASSERT ((options & LNET_MD_OP_GET) != 0); + LASSERT ((options & LNET_MD_OP_GET) != 0); rc = LNetGet(self, *mdh, peer, portal, matchbits, 0); } @@ -442,7 +441,7 @@ srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf, else portal = SRPC_FRAMEWORK_REQUEST_PORTAL; - rc = srpc_post_active_rdma(portal, service, buf, len, + rc = srpc_post_active_rdma(portal, service, buf, len, LNET_MD_OP_PUT, peer, LNET_NID_ANY, mdh, ev); return rc; @@ -510,7 +509,7 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf) spin_unlock(&sv->sv_lock); LIBCFS_FREE(buf, sizeof(*buf)); spin_lock(&sv->sv_lock); - return rc; + return rc; } int @@ -624,8 +623,7 @@ srpc_finish_service (srpc_service_t *sv) void srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf) { - if (sv->sv_shuttingdown) - goto free; + if (sv->sv_shuttingdown) goto free; if (sv->sv_nprune == 0) { if (srpc_service_post_buffer(sv, buf) != 0) @@ -923,7 +921,8 @@ srpc_handle_rpc (swi_workitem_t *wi) } } case SWI_STATE_BULK_STARTED: - LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); + /* we cannot LASSERT ev_fired right here because it + * may be set only upon an event with unlinked==1 */ if (rpc->srpc_bulk != NULL) { rc = ev->ev_status; @@ -932,11 +931,20 @@ srpc_handle_rpc (swi_workitem_t *wi) rc = (*sv->sv_bulk_ready) (rpc, rc); if (rc != 0) { - srpc_server_rpc_done(rpc, rc); - return 1; + if (ev->ev_fired) { + srpc_server_rpc_done(rpc, rc); + return 1; + } + + rpc->srpc_status = rc; + wi->wi_state = SWI_STATE_BULK_ERRORED; + LNetMDUnlink(rpc->srpc_bulk->bk_mdh); + return 0; /* wait for UNLINK event */ } } + LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); + wi->wi_state = SWI_STATE_REPLY_SUBMITTED; rc = srpc_send_reply(rpc); if (rc == 0) @@ -950,6 +958,13 @@ srpc_handle_rpc (swi_workitem_t *wi) wi->wi_state = SWI_STATE_DONE; srpc_server_rpc_done(rpc, ev->ev_status); return 1; + + case SWI_STATE_BULK_ERRORED: + LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired); + LASSERT (rpc->srpc_status != 0); + + srpc_server_rpc_done(rpc, rpc->srpc_status); + return 1; } return 0; @@ -987,20 +1002,20 @@ srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc) CFS_INIT_LIST_HEAD(&timer->stt_list); timer->stt_data = rpc; timer->stt_func = srpc_client_rpc_expired; - timer->stt_expires = cfs_time_add(rpc->crpc_timeout, + timer->stt_expires = cfs_time_add(rpc->crpc_timeout, cfs_time_current_sec()); stt_add_timer(timer); return; } -/* +/* * Called with rpc->crpc_lock held. * * Upon exit the RPC expiry timer is not queued and the handler is not * running on any CPU. */ void srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc) -{ +{ /* timer not planted or already exploded */ if (rpc->crpc_timeout == 0) return; @@ -1012,7 +1027,7 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc) while (rpc->crpc_timeout != 0) { spin_unlock(&rpc->crpc_lock); - cfs_schedule(); + cfs_schedule(); spin_lock(&rpc->crpc_lock); } @@ -1080,7 +1095,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) * No one can schedule me now since: * - RPC timer has been defused. * - all LNet events have been fired. - * - crpc_closed has been set, preventing srpc_abort_rpc from + * - crpc_closed has been set, preventing srpc_abort_rpc from * scheduling me. * Cancel pending schedules and prevent future schedule attempts: */ @@ -1138,7 +1153,7 @@ srpc_send_rpc (swi_workitem_t *wi) case SWI_STATE_REQUEST_SUBMITTED: /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any - * order; however, they're processed in a strict order: + * order; however, they're processed in a strict order: * rqt, rpy, and bulk. */ if (!rpc->crpc_reqstev.ev_fired) break; @@ -1155,7 +1170,7 @@ srpc_send_rpc (swi_workitem_t *wi) rc = rpc->crpc_replyev.ev_status; if (rc != 0) break; - if ((reply->msg_type != type && + if ((reply->msg_type != type && reply->msg_type != __swab32(type)) || (reply->msg_magic != SRPC_MSG_MAGIC && reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) { @@ -1224,7 +1239,7 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service, { srpc_client_rpc_t *rpc; - LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t, + LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t, crpc_bulk.bk_iovs[nbulkiov])); if (rpc == NULL) return NULL; @@ -1372,8 +1387,8 @@ srpc_send_reply (srpc_server_rpc_t *rpc) return rc; } -/* always called with LNET_LOCK() held, and in thread context */ -void +/* when in kernel always called with LNET_LOCK() held, and in thread context */ +void srpc_lnet_ev_handler (lnet_event_t *ev) { srpc_event_t *rpcev = ev->md.user_ptr; @@ -1383,6 +1398,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) srpc_service_t *sv; srpc_msg_t *msg; srpc_msg_type_t type; + int fired_flag = 1; LASSERT (!in_interrupt()); @@ -1415,7 +1431,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (rpcev->ev_fired == 0); rpcev->ev_fired = 1; - rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? + rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? -EINTR : ev->status; swi_schedule_workitem(&crpc->crpc_wi); @@ -1443,7 +1459,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (sv->sv_nposted_msg >= 0); if (sv->sv_shuttingdown) { - /* Leave buffer on sv->sv_posted_msgq since + /* Leave buffer on sv->sv_posted_msgq since * srpc_finish_service needs to traverse it. */ spin_unlock(&sv->sv_lock); break; @@ -1454,7 +1470,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) type = srpc_service2request(sv->sv_id); if (ev->status != 0 || ev->mlength != sizeof(*msg) || - (msg->msg_type != type && + (msg->msg_type != type && msg->msg_type != __swab32(type)) || (msg->msg_magic != SRPC_MSG_MAGIC && msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) { @@ -1504,10 +1520,13 @@ srpc_lnet_ev_handler (lnet_event_t *ev) ev->type == LNET_EVENT_REPLY || ev->type == LNET_EVENT_UNLINK); - if (ev->type == LNET_EVENT_SEND && - ev->status == 0 && !ev->unlinked) - break; /* wait for the final LNET_EVENT_REPLY */ - + if (ev->type == LNET_EVENT_SEND && !ev->unlinked) { + if (ev->status == 0) + break; /* wait for the final LNET_EVENT_REPLY */ + else + fired_flag = 0; /* LNET_EVENT_REPLY may arrive + (optimized GET case) */ + } case SRPC_BULK_PUT_SENT: if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { spin_lock(&srpc_data.rpc_glock); @@ -1526,9 +1545,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (rpcev == &srpc->srpc_ev); spin_lock(&sv->sv_lock); - rpcev->ev_fired = 1; - rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? + if (fired_flag) + rpcev->ev_fired = 1; + + rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? -EINTR : ev->status; + srpc_schedule_server_rpc(srpc); spin_unlock(&sv->sv_lock); break; @@ -1548,17 +1570,16 @@ srpc_check_event (int timeout) rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1, timeout * 1000, &ev, &i); - if (rc == 0) - return 0; - + if (rc == 0) return 0; + LASSERT (rc == -EOVERFLOW || rc == 1); - + /* We can't affort to miss any events... */ if (rc == -EOVERFLOW) { CERROR ("Dropped an event!!!\n"); abort(); } - + srpc_lnet_ev_handler(&ev); return 1; } @@ -1571,6 +1592,18 @@ srpc_startup (void) int i; int rc; +#ifndef __KERNEL__ + char *s; + + s = getenv("SRPC_PEER_CREDITS"); + srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits; +#endif + + if (srpc_peer_credits <= 0) { + CERROR("Peer credits must be positive: %d\n", srpc_peer_credits); + return -EINVAL; + } + memset(&srpc_data, 0, sizeof(struct smoketest_rpc)); spin_lock_init(&srpc_data.rpc_glock); @@ -1593,7 +1626,10 @@ srpc_startup (void) #ifdef __KERNEL__ rc = LNetNIInit(LUSTRE_SRV_LNET_PID); #else - rc = LNetNIInit(getpid()); + if (the_lnet.ln_server_mode_flag) + rc = LNetNIInit(LUSTRE_SRV_LNET_PID); + else + rc = LNetNIInit(getpid() | LNET_PID_USERFLAG); #endif if (rc < 0) { CERROR ("LNetNIInit() has failed: %d\n", rc); @@ -1690,7 +1726,7 @@ srpc_shutdown (void) LASSERT (list_empty(&peer->stp_rpcq)); LASSERT (list_empty(&peer->stp_ctl_rpcq)); - LASSERT (peer->stp_credits == SRPC_PEER_CREDITS); + LASSERT (peer->stp_credits == srpc_peer_credits); LIBCFS_FREE(peer, sizeof(srpc_peer_t)); }