Whamcloud - gitweb
LU-1078 lnet: two minor issues in lnet
[fs/lustre-release.git] / lnet / selftest / rpc.c
index 7fee578..71b1e6d 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -26,7 +24,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
 #include "selftest.h"
 
 
-#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
-
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
         SRPC_STATE_EQ_INIT,
-        SRPC_STATE_WI_INIT,
         SRPC_STATE_RUNNING,
         SRPC_STATE_STOPPING,
 } srpc_state_t;
 
 struct smoketest_rpc {
-        spinlock_t        rpc_glock;     /* global lock */
+        cfs_spinlock_t    rpc_glock;     /* global lock */
         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
-        struct list_head *rpc_peers;     /* hash table of known peers */
         lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
         srpc_state_t      rpc_state;
         srpc_counters_t   rpc_counters;
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
-static int srpc_peer_credits = 16;
-CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
-                "# in-flight RPCs per peer (16 by default)");
-
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
 void srpc_get_counters (srpc_counters_t *cnt)
 {
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         *cnt = srpc_data.rpc_counters;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 }
 
 void srpc_set_counters (const srpc_counters_t *cnt)
 {
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_counters = *cnt;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 }
 
 void
@@ -181,99 +171,14 @@ srpc_alloc_bulk (int npages, int sink)
         return bk;
 }
 
-
-static inline struct list_head *
-srpc_nid2peerlist (lnet_nid_t nid)
-{
-        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;
-
-        return &srpc_data.rpc_peers[hash];
-}
-
-static inline srpc_peer_t *
-srpc_create_peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
-        if (peer == NULL) {
-                CERROR ("Failed to allocate peer structure for %s\n",
-                        libcfs_nid2str(nid));
-                return NULL;
-        }
-
-        memset(peer, 0, sizeof(srpc_peer_t));
-        peer->stp_nid     = nid;
-        peer->stp_credits = srpc_peer_credits;
-
-        spin_lock_init(&peer->stp_lock);
-        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
-        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
-        return peer;
-}
-
-srpc_peer_t *
-srpc_find_peer_locked (lnet_nid_t nid)
-{
-        struct list_head *peer_list = srpc_nid2peerlist(nid);
-        srpc_peer_t      *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        list_for_each_entry (peer, peer_list, stp_list) {
-                if (peer->stp_nid == nid)
-                        return peer;
-        }
-
-        return NULL;
-}
-
-static srpc_peer_t *
-srpc_nid2peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-        srpc_peer_t *new_peer;
-
-        spin_lock(&srpc_data.rpc_glock);
-        peer = srpc_find_peer_locked(nid);
-        spin_unlock(&srpc_data.rpc_glock);
-
-        if (peer != NULL)
-                return peer;
-
-        new_peer = srpc_create_peer(nid);
-
-        spin_lock(&srpc_data.rpc_glock);
-
-        peer = srpc_find_peer_locked(nid);
-        if (peer != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                if (new_peer != NULL)
-                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));
-
-                return peer;
-        }
-
-        if (new_peer == NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                return NULL;
-        }
-
-        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
-        spin_unlock(&srpc_data.rpc_glock);
-        return new_peer;
-}
-
 static inline __u64
 srpc_next_id (void)
 {
         __u64 id;
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         id = srpc_data.rpc_matchbits++;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return id;
 }
 
@@ -282,7 +187,9 @@ srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                       srpc_service_t *sv, srpc_buffer_t *buffer)
 {
         memset(rpc, 0, sizeof(*rpc));
-        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);
+        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
+                          sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ?
+                          CFS_WI_SCHED_SERIAL : CFS_WI_SCHED_ANY);
 
         rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 
@@ -290,7 +197,7 @@ srpc_init_server_rpc (srpc_server_rpc_t *rpc,
         rpc->srpc_reqstbuf = buffer;
         rpc->srpc_peer     = buffer->buf_peer;
         rpc->srpc_self     = buffer->buf_self;
-        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
+        LNetInvalidateHandle(&rpc->srpc_replymdh);
 }
 
 int
@@ -303,22 +210,22 @@ srpc_add_service (srpc_service_t *sv)
         LASSERT (sv->sv_concur > 0);
         LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
 
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
 
         if (srpc_data.rpc_services[id] != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 return -EBUSY;
         }
 
         srpc_data.rpc_services[id] = sv;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
 
         sv->sv_nprune       = 0;
         sv->sv_nposted_msg  = 0;
         sv->sv_shuttingdown = 0;
-        spin_lock_init(&sv->sv_lock);
+        cfs_spin_lock_init(&sv->sv_lock);
         CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
         CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
         CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
@@ -331,7 +238,7 @@ srpc_add_service (srpc_service_t *sv)
                 LIBCFS_ALLOC(rpc, sizeof(*rpc));
                 if (rpc == NULL) goto enomem;
 
-                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
+                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
         }
 
         CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
@@ -339,16 +246,16 @@ srpc_add_service (srpc_service_t *sv)
         return 0;
 
 enomem:
-        while (!list_empty(&sv->sv_free_rpcq)) {
-                rpc = list_entry(sv->sv_free_rpcq.next,
-                                 srpc_server_rpc_t, srpc_list);
-                list_del(&rpc->srpc_list);
+        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                     srpc_server_rpc_t, srpc_list);
+                cfs_list_del(&rpc->srpc_list);
                 LIBCFS_FREE(rpc, sizeof(*rpc));
         }
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_services[id] = NULL;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return -ENOMEM;
 }
 
@@ -357,15 +264,15 @@ srpc_remove_service (srpc_service_t *sv)
 {
         int id = sv->sv_id;
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
 
         if (srpc_data.rpc_services[id] != sv) {
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 return -ENOENT;
         }
 
         srpc_data.rpc_services[id] = NULL;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return 0;
 }
 
@@ -485,8 +392,10 @@ srpc_post_passive_rqtbuf(int service, void *buf, int len,
 {
         int               rc;
         int               portal;
-        lnet_process_id_t any = {.nid = LNET_NID_ANY,
-                                 .pid = LNET_PID_ANY};
+        lnet_process_id_t any = {0};
+
+        any.nid = LNET_NID_ANY;
+        any.pid = LNET_PID_ANY;
 
         if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                 portal = SRPC_REQUEST_PORTAL;
@@ -506,10 +415,10 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
 
         LASSERT (!sv->sv_shuttingdown);
 
-        buf->buf_mdh = LNET_INVALID_HANDLE;
-        list_add(&buf->buf_list, &sv->sv_posted_msgq);
+        LNetInvalidateHandle(&buf->buf_mdh);
+        cfs_list_add(&buf->buf_list, &sv->sv_posted_msgq);
         sv->sv_nposted_msg++;
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                       &buf->buf_mdh, &sv->sv_ev);
@@ -518,17 +427,17 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
          * msg and its event handler has been called. So we must add
          * buf to sv_posted_msgq _before_ dropping sv_lock */
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (rc == 0) {
                 if (sv->sv_shuttingdown) {
-                        spin_unlock(&sv->sv_lock);
+                        cfs_spin_unlock(&sv->sv_lock);
 
                         /* srpc_shutdown_service might have tried to unlink me
                          * when my buf_mdh was still invalid */
                         LNetMDUnlink(buf->buf_mdh);
 
-                        spin_lock(&sv->sv_lock);
+                        cfs_spin_lock(&sv->sv_lock);
                 }
                 return 0;
         }
@@ -536,11 +445,11 @@ srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
         sv->sv_nposted_msg--;
         if (sv->sv_shuttingdown) return rc;
 
-        list_del(&buf->buf_list);
+        cfs_list_del(&buf->buf_list);
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
         return rc;
 }
 
@@ -558,9 +467,9 @@ srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
                 LIBCFS_ALLOC(buf, sizeof(*buf));
                 if (buf == NULL) break;
 
-                spin_lock(&sv->sv_lock);
+                cfs_spin_lock(&sv->sv_lock);
                 rc = srpc_service_post_buffer(sv, buf);
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
 
                 if (rc != 0) break;
         }
@@ -574,14 +483,14 @@ srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
         LASSERTF (nbuffer > 0,
                   "nbuffer must be positive: %d\n", nbuffer);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         LASSERT (sv->sv_nprune >= 0);
         LASSERT (!sv->sv_shuttingdown);
 
         sv->sv_nprune += nbuffer;
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         return;
 }
 
@@ -592,59 +501,58 @@ srpc_finish_service (srpc_service_t *sv)
         srpc_server_rpc_t *rpc;
         srpc_buffer_t     *buf;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */
 
-        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
+        if (sv->sv_nposted_msg != 0 || !cfs_list_empty(&sv->sv_active_rpcq)) {
                 CDEBUG (D_NET,
                         "waiting for %d posted buffers to unlink and "
                         "in-flight RPCs to die.\n",
                         sv->sv_nposted_msg);
 
-                if (!list_empty(&sv->sv_active_rpcq)) {
-                        rpc = list_entry(sv->sv_active_rpcq.next,
-                                         srpc_server_rpc_t, srpc_list);
-                        CDEBUG (D_NETERROR,
-                                "Active RPC on shutdown: sv %s, peer %s, "
+                if (!cfs_list_empty(&sv->sv_active_rpcq)) {
+                        rpc = cfs_list_entry(sv->sv_active_rpcq.next,
+                                             srpc_server_rpc_t, srpc_list);
+                        CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
                                 "wi %s scheduled %d running %d, "
                                 "ev fired %d type %d status %d lnet %d\n",
-                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                                swi_state2str(rpc->srpc_wi.wi_state),
-                                rpc->srpc_wi.wi_scheduled,
-                                rpc->srpc_wi.wi_running,
+                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                                swi_state2str(rpc->srpc_wi.swi_state),
+                                rpc->srpc_wi.swi_workitem.wi_scheduled,
+                                rpc->srpc_wi.swi_workitem.wi_running,
                                 rpc->srpc_ev.ev_fired,
                                 rpc->srpc_ev.ev_type,
                                 rpc->srpc_ev.ev_status,
                                 rpc->srpc_ev.ev_lnet);
                 }
 
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
                 return 0;
         }
 
-        spin_unlock(&sv->sv_lock); /* no lock needed from now on */
+        cfs_spin_unlock(&sv->sv_lock); /* no lock needed from now on */
 
         for (;;) {
-                struct list_head *q;
+                cfs_list_t *q;
 
-                if (!list_empty(&sv->sv_posted_msgq))
+                if (!cfs_list_empty(&sv->sv_posted_msgq))
                         q = &sv->sv_posted_msgq;
-                else if (!list_empty(&sv->sv_blocked_msgq))
+                else if (!cfs_list_empty(&sv->sv_blocked_msgq))
                         q = &sv->sv_blocked_msgq;
                 else
                         break;
 
-                buf = list_entry(q->next, srpc_buffer_t, buf_list);
-                list_del(&buf->buf_list);
+                buf = cfs_list_entry(q->next, srpc_buffer_t, buf_list);
+                cfs_list_del(&buf->buf_list);
 
                 LIBCFS_FREE(buf, sizeof(*buf));
         }
 
-        while (!list_empty(&sv->sv_free_rpcq)) {
-                rpc = list_entry(sv->sv_free_rpcq.next,
-                                 srpc_server_rpc_t, srpc_list);
-                list_del(&rpc->srpc_list);
+        while (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                rpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                     srpc_server_rpc_t, srpc_list);
+                cfs_list_del(&rpc->srpc_list);
                 LIBCFS_FREE(rpc, sizeof(*rpc));
         }
 
@@ -665,9 +573,38 @@ srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
 
         sv->sv_nprune--;
 free:
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         LIBCFS_FREE(buf, sizeof(*buf));
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
+}
+
+/* called with srpc_service_t::sv_lock held */
+inline void
+srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
+{
+        swi_schedule_workitem(&rpc->srpc_wi);
+}
+
+void
+srpc_abort_service (srpc_service_t *sv)
+{
+        srpc_server_rpc_t *rpc;
+
+        cfs_spin_lock(&sv->sv_lock);
+
+        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
+
+        /* schedule in-flight RPCs to notice the abort, NB:
+         * racing with incoming RPCs; complete fix should make test
+         * RPCs carry session ID in its headers */
+        cfs_list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
+                rpc->srpc_aborted = 1;
+                srpc_schedule_server_rpc(rpc);
+        }
+
+        cfs_spin_unlock(&sv->sv_lock);
+        return;
 }
 
 void
@@ -676,23 +613,25 @@ srpc_shutdown_service (srpc_service_t *sv)
         srpc_server_rpc_t *rpc;
         srpc_buffer_t     *buf;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
-        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
-                sv->sv_id, sv->sv_name);
+        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
 
         sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 
         /* schedule in-flight RPCs to notice the shutdown */
-        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
-                swi_schedule_workitem(&rpc->srpc_wi);
+        cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq,
+                                       srpc_server_rpc_t, srpc_list) {
+                srpc_schedule_server_rpc(rpc);
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         /* OK to traverse sv_posted_msgq without lock, since no one
          * touches sv_posted_msgq now */
-        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
+        cfs_list_for_each_entry_typed (buf, &sv->sv_posted_msgq,
+                                       srpc_buffer_t, buf_list)
                 LNetMDUnlink(buf->buf_mdh);
 
         return;
@@ -809,20 +748,6 @@ srpc_do_bulk (srpc_server_rpc_t *rpc)
         return rc;
 }
 
-/* called with srpc_service_t::sv_lock held */
-inline void
-srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
-{
-        srpc_service_t *sv = rpc->srpc_service;
-
-        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                swi_schedule_workitem(&rpc->srpc_wi);
-        else    /* framework RPCs are handled one by one */
-                swi_schedule_serial_workitem(&rpc->srpc_wi);
-
-        return;
-}
-
 /* only called from srpc_handle_rpc */
 void
 srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
@@ -830,26 +755,26 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         srpc_service_t *sv = rpc->srpc_service;
         srpc_buffer_t  *buffer;
 
-        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);
+        LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
 
         rpc->srpc_status = status;
 
-        CDEBUG (status == 0 ? D_NET : D_NETERROR,
-                "Server RPC done: service %s, peer %s, status %s:%d\n",
-                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                swi_state2str(rpc->srpc_wi.wi_state), status);
+        CDEBUG_LIMIT (status == 0 ? D_NET : D_NETERROR,
+                "Server RPC %p done: service %s, peer %s, status %s:%d\n",
+                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                swi_state2str(rpc->srpc_wi.swi_state), status);
 
         if (status != 0) {
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_dropped++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
         }
 
         if (rpc->srpc_done != NULL)
                 (*rpc->srpc_done) (rpc);
         LASSERT (rpc->srpc_bulk == NULL);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (rpc->srpc_reqstbuf != NULL) {
                 /* NB might drop sv_lock in srpc_service_recycle_buffer, but
@@ -858,7 +783,7 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
                 rpc->srpc_reqstbuf = NULL;
         }
 
-        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
+        cfs_list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
 
         /*
          * No one can schedule me now since:
@@ -869,19 +794,19 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         LASSERT (rpc->srpc_ev.ev_fired);
         swi_kill_workitem(&rpc->srpc_wi);
 
-        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
-                buffer = list_entry(sv->sv_blocked_msgq.next,
+        if (!sv->sv_shuttingdown && !cfs_list_empty(&sv->sv_blocked_msgq)) {
+                buffer = cfs_list_entry(sv->sv_blocked_msgq.next,
                                     srpc_buffer_t, buf_list);
-                list_del(&buffer->buf_list);
+                cfs_list_del(&buffer->buf_list);
 
                 srpc_init_server_rpc(rpc, sv, buffer);
-                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
+                cfs_list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                 srpc_schedule_server_rpc(rpc);
         } else {
-                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
+                cfs_list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
         return;
 }
 
@@ -889,17 +814,17 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
 int
 srpc_handle_rpc (swi_workitem_t *wi)
 {
-        srpc_server_rpc_t *rpc = wi->wi_data;
+        srpc_server_rpc_t *rpc = wi->swi_workitem.wi_data;
         srpc_service_t    *sv = rpc->srpc_service;
         srpc_event_t      *ev = &rpc->srpc_ev;
         int                rc = 0;
 
         LASSERT (wi == &rpc->srpc_wi);
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
-        if (sv->sv_shuttingdown) {
-                spin_unlock(&sv->sv_lock);
+        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+                cfs_spin_unlock(&sv->sv_lock);
 
                 if (rpc->srpc_bulk != NULL)
                         LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
@@ -912,9 +837,9 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 0;
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
-        switch (wi->wi_state) {
+        switch (wi->swi_state) {
         default:
                 LBUG ();
         case SWI_STATE_NEWBORN: {
@@ -924,8 +849,11 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 msg = &rpc->srpc_reqstbuf->buf_msg;
                 reply = &rpc->srpc_replymsg.msg_body.reply;
 
-                if (msg->msg_version != SRPC_MSG_VERSION &&
-                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
+                if (msg->msg_magic == 0) {
+                        /* moaned already in srpc_lnet_ev_handler */
+                        rc = EBADMSG;
+                } else if (msg->msg_version != SRPC_MSG_VERSION &&
+                           msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                         CWARN ("Version mismatch: %u, %u expected, from %s\n",
                                msg->msg_version, SRPC_MSG_VERSION,
                                libcfs_id2str(rpc->srpc_peer));
@@ -941,7 +869,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                         return 1;
                 }
 
-                wi->wi_state = SWI_STATE_BULK_STARTED;
+                wi->swi_state = SWI_STATE_BULK_STARTED;
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = srpc_do_bulk(rpc);
@@ -953,8 +881,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 }
         }
         case SWI_STATE_BULK_STARTED:
-                /* we cannot LASSERT ev_fired right here because it
-                 * may be set only upon an event with unlinked==1 */
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -963,21 +890,12 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                if (ev->ev_fired) {
-                                        srpc_server_rpc_done(rpc, rc);
-                                        return 1;
-                                }
-
-                                rpc->srpc_status = rc;
-                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
-                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
-                                return 0; /* wait for UNLINK event  */
+                                srpc_server_rpc_done(rpc, rc);
+                                return 1;
                         }
                 }
 
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
-
-                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
+                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
                         return 0; /* wait for reply */
@@ -985,18 +903,17 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 1;
 
         case SWI_STATE_REPLY_SUBMITTED:
-                LASSERT (ev->ev_fired);
+                if (!ev->ev_fired) {
+                        CERROR("RPC %p: bulk %p, service %d\n",
+                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
+                        CERROR("Event: status %d, type %d, lnet %d\n",
+                               ev->ev_status, ev->ev_type, ev->ev_lnet);
+                        LASSERT (ev->ev_fired);
+                }
 
-                wi->wi_state = SWI_STATE_DONE;
+                wi->swi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
-
-        case SWI_STATE_BULK_ERRORED:
-                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
-                LASSERT (rpc->srpc_status != 0);
-
-                srpc_server_rpc_done(rpc, rpc->srpc_status);
-                return 1;
         }
 
         return 0;
@@ -1011,16 +928,16 @@ srpc_client_rpc_expired (void *data)
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                rpc->crpc_timeout);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         rpc->crpc_timeout = 0;
         srpc_abort_rpc(rpc, -ETIMEDOUT);
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
-        spin_lock(&srpc_data.rpc_glock);
+        cfs_spin_lock(&srpc_data.rpc_glock);
         srpc_data.rpc_counters.rpcs_expired++;
-        spin_unlock(&srpc_data.rpc_glock);
+        cfs_spin_unlock(&srpc_data.rpc_glock);
         return;
 }
 
@@ -1057,11 +974,11 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 #ifdef __KERNEL__
         /* timer detonated, wait for it to explode */
         while (rpc->crpc_timeout != 0) {
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
 
                 cfs_schedule();
 
-                spin_lock(&rpc->crpc_lock);
+                cfs_spin_lock(&rpc->crpc_lock);
         }
 #else
         LBUG(); /* impossible in single-threaded runtime */
@@ -1070,47 +987,13 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 }
 
 void
-srpc_check_sends (srpc_peer_t *peer, int credits)
-{
-        struct list_head  *q;
-        srpc_client_rpc_t *rpc;
-
-        LASSERT (credits >= 0);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        spin_lock(&peer->stp_lock);
-        peer->stp_credits += credits;
-
-        while (peer->stp_credits) {
-                if (!list_empty(&peer->stp_ctl_rpcq))
-                        q = &peer->stp_ctl_rpcq;
-                else if (!list_empty(&peer->stp_rpcq))
-                        q = &peer->stp_rpcq;
-                else
-                        break;
-
-                peer->stp_credits--;
-
-                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
-                list_del_init(&rpc->crpc_privl);
-                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */
-
-                swi_schedule_workitem(&rpc->crpc_wi);
-        }
-
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-void
 srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 {
         swi_workitem_t *wi = &rpc->crpc_wi;
-        srpc_peer_t    *peer = rpc->crpc_peer;
 
-        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
+        LASSERT (status != 0 || wi->swi_state == SWI_STATE_DONE);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         rpc->crpc_closed = 1;
         if (rpc->crpc_status == 0)
@@ -1118,10 +1001,10 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 
         srpc_del_client_rpc_timer(rpc);
 
-        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
+        CDEBUG_LIMIT ((status == 0) ? D_NET : D_NETERROR,
                 "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
-                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);
+                swi_state2str(wi->swi_state), rpc->crpc_aborted, status);
 
         /*
          * No one can schedule me now since:
@@ -1134,12 +1017,9 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
         LASSERT (!srpc_event_pending(rpc));
         swi_kill_workitem(wi);
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
         (*rpc->crpc_done) (rpc);
-
-        if (peer != NULL)
-                srpc_check_sends(peer, 1);
         return;
 }
 
@@ -1148,23 +1028,23 @@ int
 srpc_send_rpc (swi_workitem_t *wi)
 {
         int                rc = 0;
-        srpc_client_rpc_t *rpc = wi->wi_data;
+        srpc_client_rpc_t *rpc = wi->swi_workitem.wi_data;
         srpc_msg_t        *reply = &rpc->crpc_replymsg;
         int                do_bulk = rpc->crpc_bulk.bk_niov > 0;
 
         LASSERT (rpc != NULL);
         LASSERT (wi == &rpc->crpc_wi);
 
-        spin_lock(&rpc->crpc_lock);
+        cfs_spin_lock(&rpc->crpc_lock);
 
         if (rpc->crpc_aborted) {
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
                 goto abort;
         }
 
-        spin_unlock(&rpc->crpc_lock);
+        cfs_spin_unlock(&rpc->crpc_lock);
 
-        switch (wi->wi_state) {
+        switch (wi->swi_state) {
         default:
                 LBUG ();
         case SWI_STATE_NEWBORN:
@@ -1179,7 +1059,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = srpc_prepare_bulk(rpc);
                 if (rc != 0) break;
 
-                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
+                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
                 rc = srpc_send_request(rpc);
                 break;
 
@@ -1192,7 +1072,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = rpc->crpc_reqstev.ev_status;
                 if (rc != 0) break;
 
-                wi->wi_state = SWI_STATE_REQUEST_SENT;
+                wi->swi_state = SWI_STATE_REQUEST_SENT;
                 /* perhaps more events, fall thru */
         case SWI_STATE_REQUEST_SENT: {
                 srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);
@@ -1223,7 +1103,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                         LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                 }
 
-                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
+                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
         }
         case SWI_STATE_REPLY_RECEIVED:
                 if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;
@@ -1238,15 +1118,15 @@ srpc_send_rpc (swi_workitem_t *wi)
                     rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                         rc = 0;
 
-                wi->wi_state = SWI_STATE_DONE;
+                wi->swi_state = SWI_STATE_DONE;
                 srpc_client_rpc_done(rpc, rc);
                 return 1;
         }
 
         if (rc != 0) {
-                spin_lock(&rpc->crpc_lock);
+                cfs_spin_lock(&rpc->crpc_lock);
                 srpc_abort_rpc(rpc, rc);
-                spin_unlock(&rpc->crpc_lock);
+                cfs_spin_unlock(&rpc->crpc_lock);
         }
 
 abort:
@@ -1282,38 +1162,9 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 }
 
 /* called with rpc->crpc_lock held */
-static inline void
-srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
-{
-        int service = rpc->crpc_service;
-
-        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        rpc->crpc_peer = peer;
-
-        spin_lock(&peer->stp_lock);
-
-        /* Framework RPCs that alter session state shall take precedence
-         * over test RPCs and framework query RPCs */
-        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
-            service != SRPC_SERVICE_DEBUG &&
-            service != SRPC_SERVICE_QUERY_STAT)
-                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
-        else
-                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);
-
-        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-/* called with rpc->crpc_lock held */
 void
 srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 {
-        srpc_peer_t *peer = rpc->crpc_peer;
-
         LASSERT (why != 0);
 
         if (rpc->crpc_aborted || /* already aborted */
@@ -1323,23 +1174,10 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
         CDEBUG (D_NET,
                 "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
-                swi_state2str(rpc->crpc_wi.wi_state), why);
+                swi_state2str(rpc->crpc_wi.swi_state), why);
 
         rpc->crpc_aborted = 1;
         rpc->crpc_status  = why;
-
-        if (peer != NULL) {
-                spin_lock(&peer->stp_lock);
-
-                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
-                        list_del_init(&rpc->crpc_privl);
-                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
-                        rpc->crpc_peer = NULL;       /* no credit taken */
-                }
-
-                spin_unlock(&peer->stp_lock);
-        }
-
         swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
@@ -1348,10 +1186,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 void
 srpc_post_rpc (srpc_client_rpc_t *rpc)
 {
-        srpc_peer_t *peer;
-
         LASSERT (!rpc->crpc_aborted);
-        LASSERT (rpc->crpc_peer == NULL);
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
         LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
 
@@ -1360,18 +1195,7 @@ srpc_post_rpc (srpc_client_rpc_t *rpc)
                 rpc->crpc_timeout);
 
         srpc_add_client_rpc_timer(rpc);
-
-        peer = srpc_nid2peer(rpc->crpc_dest.nid);
-        if (peer == NULL) {
-                srpc_abort_rpc(rpc, -ENOMEM);
-                return;
-        }
-
-        srpc_queue_rpc(peer, rpc);
-
-        spin_unlock(&rpc->crpc_lock);
-        srpc_check_sends(peer, 0);
-        spin_lock(&rpc->crpc_lock);
+        swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
 
@@ -1389,7 +1213,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
         LASSERT (buffer != NULL);
         rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
 
-        spin_lock(&sv->sv_lock);
+        cfs_spin_lock(&sv->sv_lock);
 
         if (!sv->sv_shuttingdown &&
             sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
@@ -1400,7 +1224,7 @@ srpc_send_reply (srpc_server_rpc_t *rpc)
                 rpc->srpc_reqstbuf = NULL;
         }
 
-        spin_unlock(&sv->sv_lock);
+        cfs_spin_unlock(&sv->sv_lock);
 
         ev->ev_fired = 0;
         ev->ev_data  = rpc;
@@ -1430,36 +1254,44 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
-        int                fired_flag = 1;
 
-        LASSERT (!in_interrupt());
+        LASSERT (!cfs_in_interrupt());
 
         if (ev->status != 0) {
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.errors++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
         }
 
         rpcev->ev_lnet = ev->type;
 
         switch (rpcev->ev_type) {
         default:
+                CERROR("Unknown event: status %d, type %d, lnet %d\n",
+                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                 LBUG ();
         case SRPC_REQUEST_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
-                        spin_lock(&srpc_data.rpc_glock);
+                        cfs_spin_lock(&srpc_data.rpc_glock);
                         srpc_data.rpc_counters.rpcs_sent++;
-                        spin_unlock(&srpc_data.rpc_glock);
+                        cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
         case SRPC_REPLY_RCVD:
         case SRPC_BULK_REQ_RCVD:
                 crpc = rpcev->ev_data;
 
-                LASSERT (rpcev == &crpc->crpc_reqstev ||
-                         rpcev == &crpc->crpc_replyev ||
-                         rpcev == &crpc->crpc_bulkev);
+                if (rpcev != &crpc->crpc_reqstev &&
+                    rpcev != &crpc->crpc_replyev &&
+                    rpcev != &crpc->crpc_bulkev) {
+                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
+                               rpcev, crpc, &crpc->crpc_reqstev,
+                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
+                        CERROR("Bad event: status %d, type %d, lnet %d\n",
+                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
+                        LBUG ();
+                }
 
-                spin_lock(&crpc->crpc_lock);
+                cfs_spin_lock(&crpc->crpc_lock);
 
                 LASSERT (rpcev->ev_fired == 0);
                 rpcev->ev_fired  = 1;
@@ -1467,7 +1299,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                                                 -EINTR : ev->status;
                 swi_schedule_workitem(&crpc->crpc_wi);
 
-                spin_unlock(&crpc->crpc_lock);
+                cfs_spin_unlock(&crpc->crpc_lock);
                 break;
 
         case SRPC_REQUEST_RCVD:
@@ -1475,7 +1307,7 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev == &sv->sv_ev);
 
-                spin_lock(&sv->sv_lock);
+                cfs_spin_lock(&sv->sv_lock);
 
                 LASSERT (ev->unlinked);
                 LASSERT (ev->type == LNET_EVENT_PUT ||
@@ -1493,11 +1325,11 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 if (sv->sv_shuttingdown) {
                         /* Leave buffer on sv->sv_posted_msgq since
                          * srpc_finish_service needs to traverse it. */
-                        spin_unlock(&sv->sv_lock);
+                        cfs_spin_unlock(&sv->sv_lock);
                         break;
                 }
 
-                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
+                cfs_list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                 msg = &buffer->buf_msg;
                 type = srpc_service2request(sv->sv_id);
 
@@ -1512,39 +1344,31 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                                 ev->status, ev->mlength,
                                 msg->msg_type, msg->msg_magic);
 
-                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
-                         * sv_nposted_msg++ as an implicit reference to prevent
-                         * sv from disappearing under me */
-                        sv->sv_nposted_msg++;
-                        srpc_service_recycle_buffer(sv, buffer);
-                        sv->sv_nposted_msg--;
-                        spin_unlock(&sv->sv_lock);
-
-                        if (ev->status == 0) { /* status!=0 counted already */
-                                spin_lock(&srpc_data.rpc_glock);
-                                srpc_data.rpc_counters.errors++;
-                                spin_unlock(&srpc_data.rpc_glock);
-                        }
-                        break;
+                        /* NB can't call srpc_service_recycle_buffer here since
+                         * it may call LNetM[DE]Attach. The invalid magic tells
+                         * srpc_handle_rpc to drop this RPC */
+                        msg->msg_magic = 0;
                 }
 
-                if (!list_empty(&sv->sv_free_rpcq)) {
-                        srpc = list_entry(sv->sv_free_rpcq.next,
-                                          srpc_server_rpc_t, srpc_list);
-                        list_del(&srpc->srpc_list);
+                if (!cfs_list_empty(&sv->sv_free_rpcq)) {
+                        srpc = cfs_list_entry(sv->sv_free_rpcq.next,
+                                              srpc_server_rpc_t, srpc_list);
+                        cfs_list_del(&srpc->srpc_list);
 
                         srpc_init_server_rpc(srpc, sv, buffer);
-                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
+                        cfs_list_add_tail(&srpc->srpc_list,
+                                          &sv->sv_active_rpcq);
                         srpc_schedule_server_rpc(srpc);
                 } else {
-                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
+                        cfs_list_add_tail(&buffer->buf_list,
+                                          &sv->sv_blocked_msgq);
                 }
 
-                spin_unlock(&sv->sv_lock);
+                cfs_spin_unlock(&sv->sv_lock);
 
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
                 srpc_data.rpc_counters.rpcs_rcvd++;
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
                 break;
 
         case SRPC_BULK_GET_RPLD:
@@ -1552,23 +1376,19 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
-                        if (ev->status == 0)
-                                break; /* wait for the final LNET_EVENT_REPLY */
-                        else
-                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
-                                                   (optimized GET case) */
-                }
+                if (!ev->unlinked)
+                        break; /* wait for final event */
+
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
-                        spin_lock(&srpc_data.rpc_glock);
+                        cfs_spin_lock(&srpc_data.rpc_glock);
 
                         if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                 srpc_data.rpc_counters.bulk_get += ev->mlength;
                         else
                                 srpc_data.rpc_counters.bulk_put += ev->mlength;
 
-                        spin_unlock(&srpc_data.rpc_glock);
+                        cfs_spin_unlock(&srpc_data.rpc_glock);
                 }
         case SRPC_REPLY_SENT:
                 srpc = rpcev->ev_data;
@@ -1576,15 +1396,14 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
                 LASSERT (rpcev == &srpc->srpc_ev);
 
-                spin_lock(&sv->sv_lock);
-                if (fired_flag)
-                        rpcev->ev_fired  = 1;
+                cfs_spin_lock(&sv->sv_lock);
 
+                rpcev->ev_fired  = 1;
                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
-
                 srpc_schedule_server_rpc(srpc);
-                spin_unlock(&sv->sv_lock);
+
+                cfs_spin_unlock(&sv->sv_lock);
                 break;
         }
 
@@ -1621,23 +1440,10 @@ srpc_check_event (int timeout)
 int
 srpc_startup (void)
 {
-        int i;
         int rc;
 
-#ifndef __KERNEL__
-        char *s;
-
-        s = getenv("SRPC_PEER_CREDITS");
-        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
-#endif
-
-        if (srpc_peer_credits <= 0) {
-                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
-                return -EINVAL;
-        }
-
         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
-        spin_lock_init(&srpc_data.rpc_glock);
+        cfs_spin_lock_init(&srpc_data.rpc_glock);
 
         /* 1 second pause to avoid timestamp reuse */
         cfs_pause(cfs_time_seconds(1));
@@ -1645,16 +1451,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_NONE;
 
-        LIBCFS_ALLOC(srpc_data.rpc_peers,
-                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
-        if (srpc_data.rpc_peers == NULL) {
-                CERROR ("Failed to alloc peer hash.\n");
-                return -ENOMEM;
-        }
-
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
-                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);
-
 #ifdef __KERNEL__
         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
 #else
@@ -1665,14 +1461,12 @@ srpc_startup (void)
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
-                LIBCFS_FREE(srpc_data.rpc_peers,
-                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                 return rc;
         }
 
         srpc_data.rpc_state = SRPC_STATE_NI_INIT;
 
-        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
+        LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
 #ifdef __KERNEL__
         rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
 #else
@@ -1688,12 +1482,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
 
-        rc = swi_startup();
-        if (rc != 0)
-                goto bail;
-
-        srpc_data.rpc_state = SRPC_STATE_WI_INIT;
-
         rc = stt_startup();
 
 bail:
@@ -1719,7 +1507,7 @@ srpc_shutdown (void)
         default:
                 LBUG ();
         case SRPC_STATE_RUNNING:
-                spin_lock(&srpc_data.rpc_glock);
+                cfs_spin_lock(&srpc_data.rpc_glock);
 
                 for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                         srpc_service_t *sv = srpc_data.rpc_services[i];
@@ -1729,13 +1517,10 @@ srpc_shutdown (void)
                                   i, sv->sv_name);
                 }
 
-                spin_unlock(&srpc_data.rpc_glock);
+                cfs_spin_unlock(&srpc_data.rpc_glock);
 
                 stt_shutdown();
 
-        case SRPC_STATE_WI_INIT:
-                swi_shutdown();
-
         case SRPC_STATE_EQ_INIT:
                 rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                 LASSERT (rc == 0);
@@ -1747,24 +1532,5 @@ srpc_shutdown (void)
                 break;
         }
 
-        /* srpc_peer_t's are kept in hash until shutdown */
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
-                srpc_peer_t *peer;
-
-                while (!list_empty(&srpc_data.rpc_peers[i])) {
-                        peer = list_entry(srpc_data.rpc_peers[i].next,
-                                          srpc_peer_t, stp_list);
-                        list_del(&peer->stp_list);
-
-                        LASSERT (list_empty(&peer->stp_rpcq));
-                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
-                        LASSERT (peer->stp_credits == srpc_peer_credits);
-
-                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
-                }
-        }
-
-        LIBCFS_FREE(srpc_data.rpc_peers,
-                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
         return;
 }