X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fselftest%2Frpc.c;h=4d8d6530294858f39c1059c15e3bad9bcfdc38fd;hp=ac5a8fe59cfd73a00fe3730550c9e72e79fb35a5;hb=57ba24c7729e8046167f10b1ab1c5bedfd19bb2c;hpb=03ff4ee6484faf914cadbbffedbcd0057708625a diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index ac5a8fe..4d8d653 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -201,8 +201,8 @@ srpc_find_peer_locked (lnet_nid_t nid) static srpc_peer_t * srpc_nid2peer (lnet_nid_t nid) { - srpc_peer_t *peer; - srpc_peer_t *new_peer; + srpc_peer_t *peer; + srpc_peer_t *new_peer; spin_lock(&srpc_data.rpc_glock); peer = srpc_find_peer_locked(nid); @@ -210,7 +210,7 @@ srpc_nid2peer (lnet_nid_t nid) if (peer != NULL) return peer; - + new_peer = srpc_create_peer(nid); spin_lock(&srpc_data.rpc_glock); @@ -228,7 +228,7 @@ srpc_nid2peer (lnet_nid_t nid) spin_unlock(&srpc_data.rpc_glock); return NULL; } - + list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid)); spin_unlock(&srpc_data.rpc_glock); return new_peer; @@ -378,7 +378,7 @@ srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf, } int -srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, +srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, int options, lnet_process_id_t peer, lnet_nid_t self, lnet_handle_md_t *mdh, srpc_event_t *ev) { @@ -441,7 +441,7 @@ srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf, else portal = SRPC_FRAMEWORK_REQUEST_PORTAL; - rc = srpc_post_active_rdma(portal, service, buf, len, + rc = srpc_post_active_rdma(portal, service, buf, len, LNET_MD_OP_PUT, peer, LNET_NID_ANY, mdh, ev); return rc; @@ -509,7 +509,7 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf) spin_unlock(&sv->sv_lock); LIBCFS_FREE(buf, sizeof(*buf)); spin_lock(&sv->sv_lock); - return rc; + return rc; } int @@ -921,7 +921,8 @@ srpc_handle_rpc (swi_workitem_t *wi) } } case SWI_STATE_BULK_STARTED: - LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); + /* we cannot LASSERT ev_fired right here because it + * may be set only upon an event with unlinked==1 */ if (rpc->srpc_bulk != NULL) { rc = ev->ev_status; @@ -930,11 +931,20 @@ srpc_handle_rpc (swi_workitem_t *wi) rc = (*sv->sv_bulk_ready) (rpc, rc); if (rc != 0) { - srpc_server_rpc_done(rpc, rc); - return 1; + if (ev->ev_fired) { + srpc_server_rpc_done(rpc, rc); + return 1; + } + + rpc->srpc_status = rc; + wi->wi_state = SWI_STATE_BULK_ERRORED; + LNetMDUnlink(rpc->srpc_bulk->bk_mdh); + return 0; /* wait for UNLINK event */ } } + LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); + wi->wi_state = SWI_STATE_REPLY_SUBMITTED; rc = srpc_send_reply(rpc); if (rc == 0) @@ -948,6 +958,13 @@ srpc_handle_rpc (swi_workitem_t *wi) wi->wi_state = SWI_STATE_DONE; srpc_server_rpc_done(rpc, ev->ev_status); return 1; + + case SWI_STATE_BULK_ERRORED: + LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired); + LASSERT (rpc->srpc_status != 0); + + srpc_server_rpc_done(rpc, rpc->srpc_status); + return 1; } return 0; @@ -985,20 +1002,20 @@ srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc) CFS_INIT_LIST_HEAD(&timer->stt_list); timer->stt_data = rpc; timer->stt_func = srpc_client_rpc_expired; - timer->stt_expires = cfs_time_add(rpc->crpc_timeout, + timer->stt_expires = cfs_time_add(rpc->crpc_timeout, cfs_time_current_sec()); stt_add_timer(timer); return; } -/* +/* * Called with rpc->crpc_lock held. * * Upon exit the RPC expiry timer is not queued and the handler is not * running on any CPU. */ void srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc) -{ +{ /* timer not planted or already exploded */ if (rpc->crpc_timeout == 0) return; @@ -1010,7 +1027,7 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc) while (rpc->crpc_timeout != 0) { spin_unlock(&rpc->crpc_lock); - cfs_schedule(); + cfs_schedule(); spin_lock(&rpc->crpc_lock); } @@ -1078,7 +1095,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) * No one can schedule me now since: * - RPC timer has been defused. * - all LNet events have been fired. - * - crpc_closed has been set, preventing srpc_abort_rpc from + * - crpc_closed has been set, preventing srpc_abort_rpc from * scheduling me. * Cancel pending schedules and prevent future schedule attempts: */ @@ -1136,7 +1153,7 @@ srpc_send_rpc (swi_workitem_t *wi) case SWI_STATE_REQUEST_SUBMITTED: /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any - * order; however, they're processed in a strict order: + * order; however, they're processed in a strict order: * rqt, rpy, and bulk. */ if (!rpc->crpc_reqstev.ev_fired) break; @@ -1153,7 +1170,7 @@ srpc_send_rpc (swi_workitem_t *wi) rc = rpc->crpc_replyev.ev_status; if (rc != 0) break; - if ((reply->msg_type != type && + if ((reply->msg_type != type && reply->msg_type != __swab32(type)) || (reply->msg_magic != SRPC_MSG_MAGIC && reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) { @@ -1222,7 +1239,7 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service, { srpc_client_rpc_t *rpc; - LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t, + LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t, crpc_bulk.bk_iovs[nbulkiov])); if (rpc == NULL) return NULL; @@ -1371,7 +1388,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc) } /* when in kernel always called with LNET_LOCK() held, and in thread context */ -void +void srpc_lnet_ev_handler (lnet_event_t *ev) { srpc_event_t *rpcev = ev->md.user_ptr; @@ -1381,6 +1398,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) srpc_service_t *sv; srpc_msg_t *msg; srpc_msg_type_t type; + int fired_flag = 1; LASSERT (!in_interrupt()); @@ -1413,7 +1431,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (rpcev->ev_fired == 0); rpcev->ev_fired = 1; - rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? + rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? -EINTR : ev->status; swi_schedule_workitem(&crpc->crpc_wi); @@ -1441,7 +1459,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (sv->sv_nposted_msg >= 0); if (sv->sv_shuttingdown) { - /* Leave buffer on sv->sv_posted_msgq since + /* Leave buffer on sv->sv_posted_msgq since * srpc_finish_service needs to traverse it. */ spin_unlock(&sv->sv_lock); break; @@ -1452,7 +1470,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev) type = srpc_service2request(sv->sv_id); if (ev->status != 0 || ev->mlength != sizeof(*msg) || - (msg->msg_type != type && + (msg->msg_type != type && msg->msg_type != __swab32(type)) || (msg->msg_magic != SRPC_MSG_MAGIC && msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) { @@ -1502,10 +1520,13 @@ srpc_lnet_ev_handler (lnet_event_t *ev) ev->type == LNET_EVENT_REPLY || ev->type == LNET_EVENT_UNLINK); - if (ev->type == LNET_EVENT_SEND && - ev->status == 0 && !ev->unlinked) - break; /* wait for the final LNET_EVENT_REPLY */ - + if (ev->type == LNET_EVENT_SEND && !ev->unlinked) { + if (ev->status == 0) + break; /* wait for the final LNET_EVENT_REPLY */ + else + fired_flag = 0; /* LNET_EVENT_REPLY may arrive + (optimized GET case) */ + } case SRPC_BULK_PUT_SENT: if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { spin_lock(&srpc_data.rpc_glock); @@ -1524,9 +1545,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (rpcev == &srpc->srpc_ev); spin_lock(&sv->sv_lock); - rpcev->ev_fired = 1; - rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? + if (fired_flag) + rpcev->ev_fired = 1; + + rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? -EINTR : ev->status; + srpc_schedule_server_rpc(srpc); spin_unlock(&sv->sv_lock); break;