X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fselftest%2Frpc.c;h=abaed8a1ccf40b4b67c0426d384319015454b6f9;hb=a59745924e3c71c89f162091917c78118dff59cd;hp=4d8d6530294858f39c1059c15e3bad9bcfdc38fd;hpb=57ba24c7729e8046167f10b1ab1c5bedfd19bb2c;p=fs%2Flustre-release.git

diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index 4d8d653..abaed8a 100644
--- a/lnet/selftest/rpc.c
+++ b/lnet/selftest/rpc.c
@@ -1,9 +1,41 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
- * Author: Isaac Huang
  *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/selftest/rpc.c
+ *
+ * Author: Isaac Huang
  */
 
 #define DEBUG_SUBSYSTEM S_LNET
@@ -11,8 +43,6 @@
 
 #include "selftest.h"
 
-#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
-
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
@@ -23,34 +53,29 @@ typedef enum {
 } srpc_state_t;
 
 struct smoketest_rpc {
-        spinlock_t        rpc_glock;     /* global lock */
+        cfs_spinlock_t    rpc_glock;     /* global lock */
         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
-        struct list_head *rpc_peers;     /* hash table of known peers */
         lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
         srpc_state_t      rpc_state;
         srpc_counters_t   rpc_counters;
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
-static int srpc_peer_credits = 16;
-CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
-                "# in-flight RPCs per peer (16 by default)");
-
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
 void srpc_get_counters (srpc_counters_t *cnt)
 {
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         *cnt = srpc_data.rpc_counters;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 }
 
 void srpc_set_counters (const srpc_counters_t *cnt)
 {
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_counters = *cnt;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 }
 
@@ -149,99 +174,14 @@ srpc_alloc_bulk (int npages, int sink)
         return bk;
 }
 
-static inline struct list_head *
-srpc_nid2peerlist (lnet_nid_t nid)
-{
-        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;
-
-        return &srpc_data.rpc_peers[hash];
-}
-
-static inline srpc_peer_t *
-srpc_create_peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
-        if (peer == NULL) {
-                CERROR ("Failed to allocate peer structure for %s\n",
-                        libcfs_nid2str(nid));
-                return NULL;
-        }
-
-        memset(peer, 0, sizeof(srpc_peer_t));
-        peer->stp_nid     = nid;
-        peer->stp_credits = srpc_peer_credits;
-
-        spin_lock_init(&peer->stp_lock);
-        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
-        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
-        return peer;
-}
-
-srpc_peer_t *
-srpc_find_peer_locked (lnet_nid_t nid)
-{
-        struct list_head *peer_list = srpc_nid2peerlist(nid);
-        srpc_peer_t      *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        list_for_each_entry (peer, peer_list, stp_list) {
-                if (peer->stp_nid == nid)
-                        return peer;
-        }
-
-        return NULL;
-}
-
-static srpc_peer_t *
-srpc_nid2peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-        srpc_peer_t *new_peer;
-
-        spin_lock(&srpc_data.rpc_glock);
-        peer = srpc_find_peer_locked(nid);
-        spin_unlock(&srpc_data.rpc_glock);
-
-        if (peer != NULL)
-                return peer;
-
-        new_peer = srpc_create_peer(nid);
-
-        spin_lock(&srpc_data.rpc_glock);
-
-        peer = srpc_find_peer_locked(nid);
-        if (peer != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                if (new_peer != NULL)
-                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));
-
-                return peer;
-        }
-
-        if (new_peer == NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                return NULL;
-        }
-
-        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
-        spin_unlock(&srpc_data.rpc_glock);
-        return new_peer;
-}
-
 static inline __u64
 srpc_next_id (void)
 {
         __u64 id;
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         id = srpc_data.rpc_matchbits++;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return id;
 }
 
@@ -258,7 +198,7 @@ srpc_init_server_rpc (srpc_server_rpc_t *rpc,
         rpc->srpc_reqstbuf = buffer;
         rpc->srpc_peer     = buffer->buf_peer;
         rpc->srpc_self     = buffer->buf_self;
-        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
+        LNetInvalidateHandle(&rpc->srpc_replymdh);
 }
 
 int
@@ -271,22 +211,22 @@ srpc_add_service (srpc_service_t *sv)
         LASSERT (sv->sv_concur > 0);
         LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
 
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
 
         if (srpc_data.rpc_services[id] != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 return -EBUSY;
         }
 
         srpc_data.rpc_services[id] = sv;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 
         sv->sv_nprune       = 0;
         sv->sv_nposted_msg  = 0;
         sv->sv_shuttingdown = 0;
-        spin_lock_init(&sv->sv_lock);
+        cfs_spin_lock_init(&sv->sv_lock);
         CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
         CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
         CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
@@ -299,7 +239,7 @@ srpc_add_service (srpc_service_t *sv)
                 LIBCFS_ALLOC(rpc, sizeof(*rpc));
                 if (rpc == NULL) goto enomem;
 
-                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
+                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
         }
 
         CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
@@ -307,16 +247,16 @@ srpc_add_service (srpc_service_t *sv)
         return 0;
 
 enomem:
-        while (!list_empty(&sv->sv_free_rpcq)) {
-                rpc = list_entry(sv->sv_free_rpcq.next,
-                                 srpc_server_rpc_t, srpc_list);
-                list_del(&rpc->srpc_list);
+        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                     srpc_server_rpc_t, srpc_list);
+                cfs_list_del(&rpc->srpc_list);
                 LIBCFS_FREE(rpc, sizeof(*rpc));
         }
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_services[id] = NULL;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return -ENOMEM;
 }
 
@@ -325,15 +265,15 @@ srpc_remove_service (srpc_service_t *sv)
 {
         int id = sv->sv_id;
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
 
         if (srpc_data.rpc_services[id] != sv) {
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 return -ENOENT;
         }
 
         srpc_data.rpc_services[id] = NULL;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return 0;
 }
 
@@ -453,8 +393,10 @@ srpc_post_passive_rqtbuf(int service, void *buf, int len,
 {
         int               rc;
         int               portal;
-        lnet_process_id_t any = {.nid = LNET_NID_ANY,
-                                 .pid = LNET_PID_ANY};
+        lnet_process_id_t any = {0};
+
+        any.nid = LNET_NID_ANY;
+        any.pid = LNET_PID_ANY;
 
         if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                 portal = SRPC_REQUEST_PORTAL;
@@ -474,10 +416,10 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
 
         LASSERT (!sv->sv_shuttingdown);
 
-        buf->buf_mdh = LNET_INVALID_HANDLE;
-        list_add(&buf->buf_list, &sv->sv_posted_msgq);
+        LNetInvalidateHandle(&buf->buf_mdh);
+        cfs_list_add(&buf->buf_list, &sv->sv_posted_msgq);
         sv->sv_nposted_msg++;
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                       &buf->buf_mdh, &sv->sv_ev);
@@ -486,17 +428,17 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
          * msg and its event handler has been called. So we must add
          * buf to sv_posted_msgq _before_ dropping sv_lock */
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (rc == 0) {
                 if (sv->sv_shuttingdown) {
-                        spin_unlock(&sv->sv_lock);
+                        cfs_spin_unlock(&sv->sv_lock);
 
                         /* srpc_shutdown_service might have tried to unlink me
                          * when my buf_mdh was still invalid */
                         LNetMDUnlink(buf->buf_mdh);
-                        spin_lock(&sv->sv_lock);
+                        cfs_spin_lock(&sv->sv_lock);
                 }
                 return 0;
         }
@@ -504,11 +446,11 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
         sv->sv_nposted_msg--;
         if (sv->sv_shuttingdown) return rc;
 
-        list_del(&buf->buf_list);
+        cfs_list_del(&buf->buf_list);
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         return rc;
 }
@@ -526,9 +468,9 @@ srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
                 LIBCFS_ALLOC(buf, sizeof(*buf));
                 if (buf == NULL) break;
 
-                spin_lock(&sv->sv_lock);
+                cfs_spin_lock(&sv->sv_lock);
                 rc = srpc_service_post_buffer(sv, buf);
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
 
                 if (rc != 0) break;
         }
@@ -542,14 +484,14 @@ srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
 
         LASSERTF (nbuffer > 0,
                   "nbuffer must be positive: %d\n", nbuffer);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         LASSERT (sv->sv_nprune >= 0);
         LASSERT (!sv->sv_shuttingdown);
 
         sv->sv_nprune += nbuffer;
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         return;
 }
 
@@ -560,24 +502,24 @@ srpc_finish_service (srpc_service_t *sv)
         srpc_server_rpc_t *rpc;
         srpc_buffer_t     *buf;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */
 
-        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
+        if (sv->sv_nposted_msg != 0 || !cfs_list_empty(&sv->sv_active_rpcq)) {
                 CDEBUG (D_NET, "waiting for %d posted buffers to unlink and "
                         "in-flight RPCs to die.\n",
                         sv->sv_nposted_msg);
 
-                if (!list_empty(&sv->sv_active_rpcq)) {
-                        rpc = list_entry(sv->sv_active_rpcq.next,
-                                         srpc_server_rpc_t, srpc_list);
+                if (!cfs_list_empty(&sv->sv_active_rpcq)) {
+                        rpc = cfs_list_entry(sv->sv_active_rpcq.next,
+                                             srpc_server_rpc_t, srpc_list);
                         CDEBUG (D_NETERROR,
-                                "Active RPC on shutdown: sv %s, peer %s, "
+                                "Active RPC %p on shutdown: sv %s, peer %s, "
                                 "wi %s scheduled %d running %d, "
                                 "ev fired %d type %d status %d lnet %d\n",
-                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                 swi_state2str(rpc->srpc_wi.wi_state),
                                 rpc->srpc_wi.wi_scheduled,
                                 rpc->srpc_wi.wi_running,
@@ -587,32 +529,32 @@ srpc_finish_service (srpc_service_t *sv)
                                 rpc->srpc_ev.ev_lnet);
                 }
 
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
                 return 0;
         }
 
-        spin_unlock(&sv->sv_lock); /* no lock needed from now on */
+        cfs_spin_unlock(&sv->sv_lock); /* no lock needed from now on */
 
         for (;;) {
-                struct list_head *q;
+                cfs_list_t *q;
 
-                if (!list_empty(&sv->sv_posted_msgq))
+                if (!cfs_list_empty(&sv->sv_posted_msgq))
                         q = &sv->sv_posted_msgq;
-                else if (!list_empty(&sv->sv_blocked_msgq))
+                else if (!cfs_list_empty(&sv->sv_blocked_msgq))
                         q = &sv->sv_blocked_msgq;
                 else
                         break;
 
-                buf = list_entry(q->next, srpc_buffer_t, buf_list);
-                list_del(&buf->buf_list);
+                buf = cfs_list_entry(q->next, srpc_buffer_t, buf_list);
+                cfs_list_del(&buf->buf_list);
                 LIBCFS_FREE(buf, sizeof(*buf));
         }
 
-        while (!list_empty(&sv->sv_free_rpcq)) {
-                rpc = list_entry(sv->sv_free_rpcq.next,
-                                 srpc_server_rpc_t, srpc_list);
-                list_del(&rpc->srpc_list);
+        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                     srpc_server_rpc_t, srpc_list);
+                cfs_list_del(&rpc->srpc_list);
                 LIBCFS_FREE(rpc, sizeof(*rpc));
         }
 
@@ -633,9 +575,45 @@ srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
         sv->sv_nprune--;
  free:
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
+}
+
+/* called with srpc_service_t::sv_lock held */
+inline void
+srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
+{
+        srpc_service_t *sv = rpc->srpc_service;
+
+        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
+                swi_schedule_workitem(&rpc->srpc_wi);
+        else    /* framework RPCs are handled one by one */
+                swi_schedule_serial_workitem(&rpc->srpc_wi);
+
+        return;
+}
+
+void
+srpc_abort_service (srpc_service_t *sv)
+{
+        srpc_server_rpc_t *rpc;
+
+        cfs_spin_lock(&sv->sv_lock);
+
+        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
+
+        /* schedule in-flight RPCs to notice the abort, NB:
+         * racing with incoming RPCs; complete fix should make test
+         * RPCs carry session ID in its headers */
+        cfs_list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
+                rpc->srpc_aborted = 1;
+                srpc_schedule_server_rpc(rpc);
+        }
+
+        cfs_spin_unlock(&sv->sv_lock);
+        return;
 }
 
 void
@@ -644,23 +622,25 @@ srpc_shutdown_service (srpc_service_t *sv)
         srpc_server_rpc_t *rpc;
         srpc_buffer_t     *buf;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
-        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
-                sv->sv_id, sv->sv_name);
+        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
 
         sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 
         /* schedule in-flight RPCs to notice the shutdown */
-        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
-                swi_schedule_workitem(&rpc->srpc_wi);
+        cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq,
+                                       srpc_server_rpc_t, srpc_list) {
+                srpc_schedule_server_rpc(rpc);
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         /* OK to traverse sv_posted_msgq without lock, since no one
          * touches sv_posted_msgq now */
-        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
+        cfs_list_for_each_entry_typed (buf, &sv->sv_posted_msgq,
                                       srpc_buffer_t, buf_list)
                 LNetMDUnlink(buf->buf_mdh);
 
         return;
@@ -777,20 +757,6 @@ srpc_do_bulk (srpc_server_rpc_t *rpc)
         return rc;
 }
 
-/* called with srpc_service_t::sv_lock held */
-inline void
-srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
-{
-        srpc_service_t *sv = rpc->srpc_service;
-
-        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                swi_schedule_workitem(&rpc->srpc_wi);
-        else    /* framework RPCs are handled one by one */
-                swi_schedule_serial_workitem(&rpc->srpc_wi);
-
-        return;
-}
-
 /* only called from srpc_handle_rpc */
 void
 srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
@@ -803,21 +769,21 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         rpc->srpc_status = status;
 
         CDEBUG (status == 0 ? D_NET : D_NETERROR,
-                "Server RPC done: service %s, peer %s, status %s:%d\n",
-                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                "Server RPC %p done: service %s, peer %s, status %s:%d\n",
+                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                 swi_state2str(rpc->srpc_wi.wi_state), status);
 
         if (status != 0) {
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_dropped++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
         }
 
         if (rpc->srpc_done != NULL)
                 (*rpc->srpc_done) (rpc);
         LASSERT (rpc->srpc_bulk == NULL);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (rpc->srpc_reqstbuf != NULL) {
                 /* NB might drop sv_lock in srpc_service_recycle_buffer, but
@@ -826,7 +792,7 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
                 rpc->srpc_reqstbuf = NULL;
         }
 
-        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
+        cfs_list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
 
         /*
          * No one can schedule me now since:
@@ -837,19 +803,19 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         LASSERT (rpc->srpc_ev.ev_fired);
         swi_kill_workitem(&rpc->srpc_wi);
 
-        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
-                buffer = list_entry(sv->sv_blocked_msgq.next,
+        if (!sv->sv_shuttingdown && !cfs_list_empty(&sv->sv_blocked_msgq)) {
+                buffer = cfs_list_entry(sv->sv_blocked_msgq.next,
                                     srpc_buffer_t, buf_list);
-                list_del(&buffer->buf_list);
+                cfs_list_del(&buffer->buf_list);
 
                 srpc_init_server_rpc(rpc, sv, buffer);
-                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
+                cfs_list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                 srpc_schedule_server_rpc(rpc);
         } else {
-                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
+                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         return;
 }
 
@@ -864,10 +830,10 @@ srpc_handle_rpc (swi_workitem_t *wi)
 
         LASSERT (wi == &rpc->srpc_wi);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
-        if (sv->sv_shuttingdown) {
-                spin_unlock(&sv->sv_lock);
+        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+                cfs_spin_unlock(&sv->sv_lock);
 
                 if (rpc->srpc_bulk != NULL)
                         LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
@@ -880,7 +846,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 0;
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         switch (wi->wi_state) {
         default:
@@ -892,8 +858,11 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 msg = &rpc->srpc_reqstbuf->buf_msg;
                 reply = &rpc->srpc_replymsg.msg_body.reply;
 
-                if (msg->msg_version != SRPC_MSG_VERSION &&
-                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
+                if (msg->msg_magic == 0) {
+                        /* moaned already in srpc_lnet_ev_handler */
+                        rc = EBADMSG;
+                } else if (msg->msg_version != SRPC_MSG_VERSION &&
+                           msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                         CWARN ("Version mismatch: %u, %u expected, from %s\n",
                                msg->msg_version, SRPC_MSG_VERSION,
                                libcfs_id2str(rpc->srpc_peer));
@@ -921,8 +890,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                         }
                 }
         case SWI_STATE_BULK_STARTED:
-                /* we cannot LASSERT ev_fired right here because it
-                 * may be set only upon an event with unlinked==1 */
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -931,20 +899,11 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                if (ev->ev_fired) {
-                                        srpc_server_rpc_done(rpc, rc);
-                                        return 1;
-                                }
-
-                                rpc->srpc_status = rc;
-                                wi->wi_state = SWI_STATE_BULK_ERRORED;
-                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
-                                return 0; /* wait for UNLINK event */
+                                srpc_server_rpc_done(rpc, rc);
+                                return 1;
                         }
                 }
 
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
-
                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
@@ -953,18 +912,17 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 1;
 
         case SWI_STATE_REPLY_SUBMITTED:
-                LASSERT (ev->ev_fired);
+                if (!ev->ev_fired) {
+                        CERROR("RPC %p: bulk %p, service %d\n",
+                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
+                        CERROR("Event: status %d, type %d, lnet %d\n",
+                               ev->ev_status, ev->ev_type, ev->ev_lnet);
+                        LASSERT (ev->ev_fired);
+                }
 
                 wi->wi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
-
-        case SWI_STATE_BULK_ERRORED:
-                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
-                LASSERT (rpc->srpc_status != 0);
-
-                srpc_server_rpc_done(rpc, rpc->srpc_status);
-                return 1;
         }
 
         return 0;
@@ -979,16 +937,16 @@ srpc_client_rpc_expired (void *data)
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                rpc->crpc_timeout);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         rpc->crpc_timeout = 0;
         srpc_abort_rpc(rpc, -ETIMEDOUT);
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_counters.rpcs_expired++;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return;
 }
 
@@ -1025,11 +983,11 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 #ifdef __KERNEL__
         /* timer detonated, wait for it to explode */
         while (rpc->crpc_timeout != 0) {
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
 
                 cfs_schedule();
 
-                spin_lock(&rpc->crpc_lock);
+                cfs_spin_lock(&rpc->crpc_lock);
         }
 #else
         LBUG(); /* impossible in single-threaded runtime */
@@ -1038,47 +996,13 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 }
 
 void
-srpc_check_sends (srpc_peer_t *peer, int credits)
-{
-        struct list_head  *q;
-        srpc_client_rpc_t *rpc;
-
-        LASSERT (credits >= 0);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        spin_lock(&peer->stp_lock);
-        peer->stp_credits += credits;
-
-        while (peer->stp_credits) {
-                if (!list_empty(&peer->stp_ctl_rpcq))
-                        q = &peer->stp_ctl_rpcq;
-                else if (!list_empty(&peer->stp_rpcq))
-                        q = &peer->stp_rpcq;
-                else
-                        break;
-
-                peer->stp_credits--;
-
-                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
-                list_del_init(&rpc->crpc_privl);
-                srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
-
-                swi_schedule_workitem(&rpc->crpc_wi);
-        }
-
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-void
 srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 {
         swi_workitem_t *wi = &rpc->crpc_wi;
-        srpc_peer_t    *peer = rpc->crpc_peer;
 
         LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         rpc->crpc_closed = 1;
         if (rpc->crpc_status == 0)
@@ -1102,12 +1026,9 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
         LASSERT (!srpc_event_pending(rpc));
         swi_kill_workitem(wi);
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
         (*rpc->crpc_done) (rpc);
-
-        if (peer != NULL)
-                srpc_check_sends(peer, 1);
         return;
 }
 
@@ -1123,14 +1044,14 @@ srpc_send_rpc (swi_workitem_t *wi)
         LASSERT (rpc != NULL);
         LASSERT (wi == &rpc->crpc_wi);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         if (rpc->crpc_aborted) {
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
                 goto abort;
         }
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
         switch (wi->wi_state) {
         default:
@@ -1212,9 +1133,9 @@ srpc_send_rpc (swi_workitem_t *wi)
         }
 
         if (rc != 0) {
-                spin_lock(&rpc->crpc_lock);
+                cfs_spin_lock(&rpc->crpc_lock);
                 srpc_abort_rpc(rpc, rc);
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
         }
 
 abort:
@@ -1250,38 +1171,9 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 }
 
 /* called with rpc->crpc_lock held */
-static inline void
-srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
-{
-        int service = rpc->crpc_service;
-
-        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        rpc->crpc_peer = peer;
-
-        spin_lock(&peer->stp_lock);
-
-        /* Framework RPCs that alter session state shall take precedence
-         * over test RPCs and framework query RPCs */
-        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
-            service != SRPC_SERVICE_DEBUG &&
-            service != SRPC_SERVICE_QUERY_STAT)
-                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
-        else
-                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);
-
-        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-/* called with rpc->crpc_lock held */
 void
 srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 {
-        srpc_peer_t *peer = rpc->crpc_peer;
-
         LASSERT (why != 0);
 
         if (rpc->crpc_aborted || /* already aborted */
@@ -1295,19 +1187,6 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 
         rpc->crpc_aborted = 1;
         rpc->crpc_status  = why;
-
-        if (peer != NULL) {
-                spin_lock(&peer->stp_lock);
-
-                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
-                        list_del_init(&rpc->crpc_privl);
-                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
-                        rpc->crpc_peer = NULL; /* no credit taken */
-                }
-
-                spin_unlock(&peer->stp_lock);
-        }
-
         swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
@@ -1316,10 +1195,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 void
 srpc_post_rpc (srpc_client_rpc_t *rpc)
 {
-        srpc_peer_t *peer;
-
         LASSERT (!rpc->crpc_aborted);
-        LASSERT (rpc->crpc_peer == NULL);
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
         LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
 
@@ -1328,18 +1204,7 @@ srpc_post_rpc (srpc_client_rpc_t *rpc)
                 rpc->crpc_timeout);
 
         srpc_add_client_rpc_timer(rpc);
-
-        peer = srpc_nid2peer(rpc->crpc_dest.nid);
-        if (peer == NULL) {
-                srpc_abort_rpc(rpc, -ENOMEM);
-                return;
-        }
-
-        srpc_queue_rpc(peer, rpc);
-
-        spin_unlock(&rpc->crpc_lock);
-        srpc_check_sends(peer, 0);
-        spin_lock(&rpc->crpc_lock);
+        swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
 
@@ -1357,7 +1222,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
         LASSERT (buffer != NULL);
         rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (!sv->sv_shuttingdown &&
             sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
@@ -1368,7 +1233,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
                 rpc->srpc_reqstbuf = NULL;
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         ev->ev_fired = 0;
         ev->ev_data  = rpc;
@@ -1398,36 +1263,44 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
-        int                fired_flag = 1;
 
-        LASSERT (!in_interrupt());
+        LASSERT (!cfs_in_interrupt());
 
         if (ev->status != 0) {
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.errors++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
         }
 
         rpcev->ev_lnet = ev->type;
 
         switch (rpcev->ev_type) {
         default:
+                CERROR("Unknown event: status %d, type %d, lnet %d\n",
+                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                 LBUG ();
         case SRPC_REQUEST_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
-                        spin_lock(&srpc_data.rpc_glock);
+                        cfs_spin_lock(&srpc_data.rpc_glock);
                         srpc_data.rpc_counters.rpcs_sent++;
-                        spin_unlock(&srpc_data.rpc_glock);
+                        cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
         case SRPC_REPLY_RCVD:
         case SRPC_BULK_REQ_RCVD:
                 crpc = rpcev->ev_data;
 
-                LASSERT (rpcev == &crpc->crpc_reqstev ||
-                         rpcev == &crpc->crpc_replyev ||
-                         rpcev == &crpc->crpc_bulkev);
+                if (rpcev != &crpc->crpc_reqstev &&
+                    rpcev != &crpc->crpc_replyev &&
+                    rpcev != &crpc->crpc_bulkev) {
+                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
+                               rpcev, crpc, &crpc->crpc_reqstev,
+                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
+                        CERROR("Bad event: status %d, type %d, lnet %d\n",
+                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
+                        LBUG ();
+                }
 
-                spin_lock(&crpc->crpc_lock);
+                cfs_spin_lock(&crpc->crpc_lock);
 
                 LASSERT (rpcev->ev_fired == 0);
                 rpcev->ev_fired  = 1;
@@ -1435,7 +1308,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                                    -EINTR : ev->status;
 
                 swi_schedule_workitem(&crpc->crpc_wi);
-                spin_unlock(&crpc->crpc_lock);
+                cfs_spin_unlock(&crpc->crpc_lock);
                 break;
 
         case SRPC_REQUEST_RCVD:
@@ -1443,7 +1316,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev == &sv->sv_ev);
 
-                spin_lock(&sv->sv_lock);
+                cfs_spin_lock(&sv->sv_lock);
 
                 LASSERT (ev->unlinked);
                 LASSERT (ev->type == LNET_EVENT_PUT ||
@@ -1461,11 +1334,11 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 if (sv->sv_shuttingdown) {
                         /* Leave buffer on sv->sv_posted_msgq since
                          * srpc_finish_service needs to traverse it. */
-                        spin_unlock(&sv->sv_lock);
+                        cfs_spin_unlock(&sv->sv_lock);
                         break;
                 }
 
-                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
+                cfs_list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                 msg = &buffer->buf_msg;
                 type = srpc_service2request(sv->sv_id);
 
@@ -1480,39 +1353,31 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);
 
-                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
-                         * sv_nposted_msg++ as an implicit reference to prevent
-                         * sv from disappearing under me */
-                        sv->sv_nposted_msg++;
-                        srpc_service_recycle_buffer(sv, buffer);
-                        sv->sv_nposted_msg--;
-                        spin_unlock(&sv->sv_lock);
-
-                        if (ev->status == 0) { /* status!=0 counted already */
-                                spin_lock(&srpc_data.rpc_glock);
-                                srpc_data.rpc_counters.errors++;
-                                spin_unlock(&srpc_data.rpc_glock);
-                        }
-                        break;
+                        /* NB can't call srpc_service_recycle_buffer here since
+                         * it may call LNetM[DE]Attach. The invalid magic tells
+                         * srpc_handle_rpc to drop this RPC */
+                        msg->msg_magic = 0;
                 }
 
-                if (!list_empty(&sv->sv_free_rpcq)) {
-                        srpc = list_entry(sv->sv_free_rpcq.next,
-                                          srpc_server_rpc_t, srpc_list);
-                        list_del(&srpc->srpc_list);
+                if (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                        srpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                              srpc_server_rpc_t, srpc_list);
+                        cfs_list_del(&srpc->srpc_list);
 
                         srpc_init_server_rpc(srpc, sv, buffer);
-                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
+                        cfs_list_add_tail(&srpc->srpc_list,
                                           &sv->sv_active_rpcq);
                         srpc_schedule_server_rpc(srpc);
                 } else {
-                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
+                        cfs_list_add_tail(&buffer->buf_list,
                                           &sv->sv_blocked_msgq);
                 }
 
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
 
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_rcvd++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 break;
 
         case SRPC_BULK_GET_RPLD:
@@ -1520,23 +1385,19 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (ev->type == LNET_EVENT_SEND ||
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
-                        if (ev->status == 0)
-                                break; /* wait for the final LNET_EVENT_REPLY */
-                        else
-                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
-                                                   (optimized GET case) */
-                }
+                if (!ev->unlinked)
+                        break; /* wait for final event */
+
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
-                        spin_lock(&srpc_data.rpc_glock);
+                        cfs_spin_lock(&srpc_data.rpc_glock);
 
                         if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                 srpc_data.rpc_counters.bulk_get += ev->mlength;
                         else
                                 srpc_data.rpc_counters.bulk_put += ev->mlength;
 
-                        spin_unlock(&srpc_data.rpc_glock);
+                        cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
 
         case SRPC_REPLY_SENT:
                 srpc = rpcev->ev_data;
@@ -1544,15 +1405,14 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev == &srpc->srpc_ev);
 
-                spin_lock(&sv->sv_lock);
-                if (fired_flag)
-                        rpcev->ev_fired = 1;
+                cfs_spin_lock(&sv->sv_lock);
+                rpcev->ev_fired  = 1;
                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                    -EINTR : ev->status;
-
                 srpc_schedule_server_rpc(srpc);
-                spin_unlock(&sv->sv_lock);
+
+                cfs_spin_unlock(&sv->sv_lock);
                 break;
         }
 
@@ -1589,23 +1449,10 @@ srpc_check_event (int timeout)
 int
 srpc_startup (void)
 {
-        int i;
         int rc;
 
-#ifndef __KERNEL__
-        char *s;
-
-        s = getenv("SRPC_PEER_CREDITS");
-        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
-#endif
-
-        if (srpc_peer_credits <= 0) {
-                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
-                return -EINVAL;
-        }
-
         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
-        spin_lock_init(&srpc_data.rpc_glock);
+        cfs_spin_lock_init(&srpc_data.rpc_glock);
 
         /* 1 second pause to avoid timestamp reuse */
         cfs_pause(cfs_time_seconds(1));
@@ -1613,16 +1460,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_NONE;
 
-        LIBCFS_ALLOC(srpc_data.rpc_peers,
-                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
-        if (srpc_data.rpc_peers == NULL) {
-                CERROR ("Failed to alloc peer hash.\n");
-                return -ENOMEM;
-        }
-
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
-                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);
-
 #ifdef __KERNEL__
         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
 #else
@@ -1633,14 +1470,12 @@ srpc_startup (void)
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
-                LIBCFS_FREE(srpc_data.rpc_peers,
-                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                 return rc;
         }
 
         srpc_data.rpc_state = SRPC_STATE_NI_INIT;
 
-        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
+        LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
 #ifdef __KERNEL__
         rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
 #else
@@ -1687,7 +1522,7 @@ srpc_shutdown (void)
         default:
                 LBUG ();
         case SRPC_STATE_RUNNING:
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
 
                 for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                         srpc_service_t *sv = srpc_data.rpc_services[i];
@@ -1697,7 +1532,7 @@ srpc_shutdown (void)
                                 i, sv->sv_name);
                 }
 
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
 
                 stt_shutdown();
 
@@ -1715,24 +1550,5 @@ srpc_shutdown (void)
                 break;
         }
 
-        /* srpc_peer_t's are kept in hash until shutdown */
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
-                srpc_peer_t *peer;
-
-                while (!list_empty(&srpc_data.rpc_peers[i])) {
-                        peer = list_entry(srpc_data.rpc_peers[i].next,
-                                          srpc_peer_t, stp_list);
-                        list_del(&peer->stp_list);
-
-                        LASSERT (list_empty(&peer->stp_rpcq));
-                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
-                        LASSERT (peer->stp_credits == srpc_peer_credits);
-
-                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
-                }
-        }
-
-        LIBCFS_FREE(srpc_data.rpc_peers,
-                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
         return;
 }
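
Note on the abort path this patch introduces (srpc_abort_service/srpc_handle_rpc): the aborting thread never completes an in-flight server RPC itself. It only sets srpc_aborted under sv_lock and reschedules the RPC's workitem via srpc_schedule_server_rpc; the workitem then notices the flag on its next pass and finishes the RPC on its own. The sketch below is a minimal standalone approximation of that mark-then-reschedule pattern using plain POSIX threads; every name in it (fake_service, fake_rpc, schedule_step, abort_service) is hypothetical, and none of it is the Lustre/libcfs API.

    /* Abort-by-flag sketch: mark under the lock, poke the worker, and let
     * the worker terminate itself -- roughly what srpc_abort_service() does. */
    #include <errno.h>
    #include <pthread.h>
    #include <stdio.h>

    struct fake_rpc {
            int aborted;    /* like srpc_aborted: only written under the lock */
            int status;
    };

    struct fake_service {
            pthread_mutex_t lock;   /* stands in for sv_lock */
            struct fake_rpc rpc;    /* a single "active" RPC, for brevity */
    };

    /* One step of the worker, like one pass through srpc_handle_rpc(). */
    static void schedule_step(struct fake_service *sv)
    {
            int aborted;

            pthread_mutex_lock(&sv->lock);
            aborted = sv->rpc.aborted;      /* the flag is read under the lock */
            pthread_mutex_unlock(&sv->lock);

            if (aborted) {
                    sv->rpc.status = -ESHUTDOWN;    /* worker finishes itself */
                    printf("rpc done: %d\n", sv->rpc.status);
            }
    }

    /* Like srpc_abort_service(): flag the RPC, then reschedule its worker. */
    static void abort_service(struct fake_service *sv)
    {
            pthread_mutex_lock(&sv->lock);
            sv->rpc.aborted = 1;
            pthread_mutex_unlock(&sv->lock);

            schedule_step(sv);  /* stands in for srpc_schedule_server_rpc() */
    }

    int main(void)
    {
            struct fake_service sv = { PTHREAD_MUTEX_INITIALIZER, { 0, 0 } };

            abort_service(&sv);
            return 0;
    }

The same shape is what lets srpc_shutdown_service reuse srpc_schedule_server_rpc in this patch: both shutdown and abort merely make the in-flight workitems runnable again, and srpc_handle_rpc performs the actual cleanup when it sees sv_shuttingdown or srpc_aborted.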