i=liangzhen,b=18075:
author    isaac <isaac>
Mon, 1 Jun 2009 21:25:03 +0000 (21:25 +0000)
committer isaac <isaac>
Mon, 1 Jun 2009 21:25:03 +0000 (21:25 +0000)
- an assortment of LNet selftest fixes and enhancements.

lnet/ChangeLog
lnet/selftest/conrpc.h
lnet/selftest/console.c
lnet/selftest/framework.c
lnet/selftest/rpc.c
lnet/selftest/selftest.h
lnet/selftest/workitem.c

lnet/ChangeLog
index e4ee186..af2618c 100644
@@ -17,6 +17,10 @@ Bugzilla   :
 Description: 
 Details    : 
 
+Severity   : normal
+Bugzilla   : 18075
+Description: LNet selftest fixes and enhancements
+
 Severity   : enhancement
 Bugzilla   : 19156
 Description: allow a test node to be a member of multiple test groups
lnet/selftest/conrpc.h
index ba2a72c..1bb4963 100644
@@ -54,6 +54,9 @@
 /* Console rpc and rpc transaction */
 #define LST_TRANS_TIMEOUT       30
 #define LST_TRANS_MIN_TIMEOUT   3
+
+#define LST_VALIDATE_TIMEOUT(t) MIN(MAX(t, LST_TRANS_MIN_TIMEOUT), LST_TRANS_TIMEOUT)
+
 #define LST_PING_INTERVAL       8
 
 struct lstcon_rpc_trans;
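
For reference, the LST_VALIDATE_TIMEOUT macro added above simply clamps a caller-supplied timeout into the [LST_TRANS_MIN_TIMEOUT, LST_TRANS_TIMEOUT] range before it is handed to lstcon_rpc_trans_postwait(). Below is a minimal stand-alone sketch of that clamp; the MIN/MAX helpers are local stand-ins (assumption) and the small test driver is illustrative only, not part of this patch:

#include <stdio.h>

/* values as defined in lnet/selftest/conrpc.h */
#define LST_TRANS_TIMEOUT       30
#define LST_TRANS_MIN_TIMEOUT   3

/* local stand-ins for the usual MIN/MAX helpers (assumption) */
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))

#define LST_VALIDATE_TIMEOUT(t) MIN(MAX(t, LST_TRANS_MIN_TIMEOUT), LST_TRANS_TIMEOUT)

int main(void)
{
        /* 0 -> 3 (raised to the minimum), 10 -> 10, 100 -> 30 (capped) */
        printf("%d %d %d\n", LST_VALIDATE_TIMEOUT(0),
               LST_VALIDATE_TIMEOUT(10), LST_VALIDATE_TIMEOUT(100));
        return 0;
}
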
lnet/selftest/console.c
index d8cbabd..12a9f05 100644
@@ -1475,9 +1475,7 @@ lstcon_ndlist_stat(struct list_head *ndlist,
                 return rc;
         }
 
-        timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout :
-                                                      LST_TRANS_MIN_TIMEOUT;
-        lstcon_rpc_trans_postwait(trans, timeout);
+        lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout));
 
         rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                           lstcon_statrpc_readent);
@@ -1564,10 +1562,7 @@ lstcon_debug_ndlist(struct list_head *ndlist,
                 return rc;
         }
 
-        timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout :
-                                                      LST_TRANS_MIN_TIMEOUT;
-
-        lstcon_rpc_trans_postwait(trans, timeout);
+        lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout));
 
         rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                           lstcon_sesrpc_readent);
@@ -2000,10 +1995,10 @@ lstcon_console_fini(void)
 {
         int     i;
 
-        mutex_down(&console_session.ses_mutex);
-
         libcfs_deregister_ioctl(&lstcon_ioctl_handler);
 
+        mutex_down(&console_session.ses_mutex);
+
         srpc_shutdown_service(&lstcon_acceptor_service);
         srpc_remove_service(&lstcon_acceptor_service);
 
lnet/selftest/framework.c
index 73734a0..63e5089 100644
@@ -53,7 +53,7 @@ static int session_timeout = 100;
 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
                 "test session timeout in seconds (100 by default, 0 == never)");
 
-#define SFW_TEST_CONCURRENCY     128
+#define SFW_TEST_CONCURRENCY     1792
 #define SFW_TEST_RPC_TIMEOUT     64
 #define SFW_CLIENT_RPC_TIMEOUT   64  /* in seconds */
 #define SFW_EXTRA_TEST_BUFFERS   8 /* tolerate buggy peers with extra buffers */
@@ -214,6 +214,7 @@ sfw_deactivate_session (void)
         sfw_session_t *sn = sfw_data.fw_session;
         int            nactive = 0;
         sfw_batch_t   *tsb;
+        sfw_test_case_t *tsc;
 
         if (sn == NULL) return;
 
@@ -223,6 +224,15 @@ sfw_deactivate_session (void)
         atomic_inc(&sfw_data.fw_nzombies);
         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
 
+        spin_unlock(&sfw_data.fw_lock);
+
+        cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
+                                       sfw_test_case_t, tsc_list) {
+                srpc_abort_service(tsc->tsc_srv_service);
+        }
+
+        spin_lock(&sfw_data.fw_lock);
+
         cfs_list_for_each_entry_typed (tsb, &sn->sn_batches,
                                        sfw_batch_t, bat_list) {
                 if (sfw_batch_active(tsb)) {
lnet/selftest/rpc.c
index 942ff26..49f483f 100644
@@ -43,8 +43,6 @@
 #include "selftest.h"
 
 
-#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
-
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
@@ -57,17 +55,12 @@ typedef enum {
 struct smoketest_rpc {
         spinlock_t        rpc_glock;     /* global lock */
         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
-        struct list_head *rpc_peers;     /* hash table of known peers */
         lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
         srpc_state_t      rpc_state;
         srpc_counters_t   rpc_counters;
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
-static int srpc_peer_credits = 16;
-CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
-                "# in-flight RPCs per peer (16 by default)");
-
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
@@ -181,92 +174,6 @@ srpc_alloc_bulk (int npages, int sink)
         return bk;
 }
 
-
-static inline struct list_head *
-srpc_nid2peerlist (lnet_nid_t nid)
-{
-        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;
-
-        return &srpc_data.rpc_peers[hash];
-}
-
-static inline srpc_peer_t *
-srpc_create_peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
-        if (peer == NULL) {
-                CERROR ("Failed to allocate peer structure for %s\n",
-                        libcfs_nid2str(nid));
-                return NULL;
-        }
-
-        memset(peer, 0, sizeof(srpc_peer_t));
-        peer->stp_nid     = nid;
-        peer->stp_credits = srpc_peer_credits;
-
-        spin_lock_init(&peer->stp_lock);
-        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
-        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
-        return peer;
-}
-
-srpc_peer_t *
-srpc_find_peer_locked (lnet_nid_t nid)
-{
-        struct list_head *peer_list = srpc_nid2peerlist(nid);
-        srpc_peer_t      *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        cfs_list_for_each_entry_typed (peer, peer_list, 
-                                       srpc_peer_t, stp_list) {
-                if (peer->stp_nid == nid)
-                        return peer;
-        }
-
-        return NULL;
-}
-
-static srpc_peer_t *
-srpc_nid2peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-        srpc_peer_t *new_peer;
-
-        spin_lock(&srpc_data.rpc_glock);
-        peer = srpc_find_peer_locked(nid);
-        spin_unlock(&srpc_data.rpc_glock);
-
-        if (peer != NULL)
-                return peer;
-
-        new_peer = srpc_create_peer(nid);
-
-        spin_lock(&srpc_data.rpc_glock);
-
-        peer = srpc_find_peer_locked(nid);
-        if (peer != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                if (new_peer != NULL)
-                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));
-
-                return peer;
-        }
-
-        if (new_peer == NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                return NULL;
-        }
-
-        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
-        spin_unlock(&srpc_data.rpc_glock);
-        return new_peer;
-}
-
 static inline __u64
 srpc_next_id (void)
 {
@@ -609,10 +516,10 @@ srpc_finish_service (srpc_service_t *sv)
                         rpc = list_entry(sv->sv_active_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                         CDEBUG (D_NETERROR,
-                                "Active RPC on shutdown: sv %s, peer %s, "
+                                "Active RPC %p on shutdown: sv %s, peer %s, "
                                 "wi %s scheduled %d running %d, "
                                 "ev fired %d type %d status %d lnet %d\n",
-                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                 swi_state2str(rpc->srpc_wi.wi_state),
                                 rpc->srpc_wi.wi_scheduled,
                                 rpc->srpc_wi.wi_running,
@@ -673,6 +580,42 @@ free:
         spin_lock(&sv->sv_lock);
 }
 
+/* called with srpc_service_t::sv_lock held */
+inline void
+srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
+{
+        srpc_service_t *sv = rpc->srpc_service;
+
+        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
+                swi_schedule_workitem(&rpc->srpc_wi);
+        else    /* framework RPCs are handled one by one */
+                swi_schedule_serial_workitem(&rpc->srpc_wi);
+
+        return;
+}
+
+void
+srpc_abort_service (srpc_service_t *sv)
+{
+        srpc_server_rpc_t *rpc;
+
+        spin_lock(&sv->sv_lock);
+
+        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
+
+        /* schedule in-flight RPCs to notice the abort, NB:
+         * racing with incoming RPCs; complete fix should make test
+         * RPCs carry session ID in its headers */
+        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
+                rpc->srpc_aborted = 1;
+                srpc_schedule_server_rpc(rpc);
+        }
+
+        spin_unlock(&sv->sv_lock);
+        return;
+}
+
 void
 srpc_shutdown_service (srpc_service_t *sv)
 {
@@ -681,15 +624,15 @@ srpc_shutdown_service (srpc_service_t *sv)
 
         spin_lock(&sv->sv_lock);
 
-        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
-                sv->sv_id, sv->sv_name);
+        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
 
         sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 
         /* schedule in-flight RPCs to notice the shutdown */
         cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq,
                                        srpc_server_rpc_t, srpc_list) {
-                swi_schedule_workitem(&rpc->srpc_wi);
+                srpc_schedule_server_rpc(rpc);
         }
 
         spin_unlock(&sv->sv_lock);
@@ -814,20 +757,6 @@ srpc_do_bulk (srpc_server_rpc_t *rpc)
         return rc;
 }
 
-/* called with srpc_service_t::sv_lock held */
-inline void
-srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
-{
-        srpc_service_t *sv = rpc->srpc_service;
-
-        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                swi_schedule_workitem(&rpc->srpc_wi);
-        else    /* framework RPCs are handled one by one */
-                swi_schedule_serial_workitem(&rpc->srpc_wi);
-
-        return;
-}
-
 /* only called from srpc_handle_rpc */
 void
 srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
@@ -840,8 +769,8 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         rpc->srpc_status = status;
 
         CDEBUG (status == 0 ? D_NET : D_NETERROR,
-                "Server RPC done: service %s, peer %s, status %s:%d\n",
-                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                "Server RPC %p done: service %s, peer %s, status %s:%d\n",
+                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                 swi_state2str(rpc->srpc_wi.wi_state), status);
 
         if (status != 0) {
@@ -903,7 +832,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
 
         spin_lock(&sv->sv_lock);
 
-        if (sv->sv_shuttingdown) {
+        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                 spin_unlock(&sv->sv_lock);
 
                 if (rpc->srpc_bulk != NULL)
@@ -961,8 +890,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 }
         }
         case SWI_STATE_BULK_STARTED:
-                /* we cannot LASSERT ev_fired right here because it
-                 * may be set only upon an event with unlinked==1 */
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -971,20 +899,11 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                if (ev->ev_fired) {
-                                        srpc_server_rpc_done(rpc, rc);
-                                        return 1;
-                                }
-
-                                rpc->srpc_status = rc;
-                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
-                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
-                                return 0; /* wait for UNLINK event  */
+                                srpc_server_rpc_done(rpc, rc);
+                                return 1;
                         }
                 }
 
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
-
                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
@@ -993,18 +912,17 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 1;
 
         case SWI_STATE_REPLY_SUBMITTED:
-                LASSERT (ev->ev_fired);
+                if (!ev->ev_fired) {
+                        CERROR("RPC %p: bulk %p, service %d\n",
+                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
+                        CERROR("Event: status %d, type %d, lnet %d\n",
+                               ev->ev_status, ev->ev_type, ev->ev_lnet);
+                        LASSERT (ev->ev_fired);
+                }
 
                 wi->wi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
-
-        case SWI_STATE_BULK_ERRORED:
-                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
-                LASSERT (rpc->srpc_status != 0);
-
-                srpc_server_rpc_done(rpc, rpc->srpc_status);
-                return 1;
         }
 
         return 0;
@@ -1078,43 +996,9 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 }
 
 void
-srpc_check_sends (srpc_peer_t *peer, int credits)
-{
-        struct list_head  *q;
-        srpc_client_rpc_t *rpc;
-
-        LASSERT (credits >= 0);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        spin_lock(&peer->stp_lock);
-        peer->stp_credits += credits;
-
-        while (peer->stp_credits) {
-                if (!list_empty(&peer->stp_ctl_rpcq))
-                        q = &peer->stp_ctl_rpcq;
-                else if (!list_empty(&peer->stp_rpcq))
-                        q = &peer->stp_rpcq;
-                else
-                        break;
-
-                peer->stp_credits--;
-
-                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
-                list_del_init(&rpc->crpc_privl);
-                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */
-
-                swi_schedule_workitem(&rpc->crpc_wi);
-        }
-
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-void
 srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 {
         swi_workitem_t *wi = &rpc->crpc_wi;
-        srpc_peer_t    *peer = rpc->crpc_peer;
 
         LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
 
@@ -1145,9 +1029,6 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
         spin_unlock(&rpc->crpc_lock);
 
         (*rpc->crpc_done) (rpc);
-
-        if (peer != NULL)
-                srpc_check_sends(peer, 1);
         return;
 }
 
@@ -1290,38 +1171,9 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 }
 
 /* called with rpc->crpc_lock held */
-static inline void
-srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
-{
-        int service = rpc->crpc_service;
-
-        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        rpc->crpc_peer = peer;
-
-        spin_lock(&peer->stp_lock);
-
-        /* Framework RPCs that alter session state shall take precedence
-         * over test RPCs and framework query RPCs */
-        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
-            service != SRPC_SERVICE_DEBUG &&
-            service != SRPC_SERVICE_QUERY_STAT)
-                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
-        else
-                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);
-
-        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-/* called with rpc->crpc_lock held */
 void
 srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 {
-        srpc_peer_t *peer = rpc->crpc_peer;
-
         LASSERT (why != 0);
 
         if (rpc->crpc_aborted || /* already aborted */
@@ -1335,19 +1187,6 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 
         rpc->crpc_aborted = 1;
         rpc->crpc_status  = why;
-
-        if (peer != NULL) {
-                spin_lock(&peer->stp_lock);
-
-                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
-                        list_del_init(&rpc->crpc_privl);
-                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
-                        rpc->crpc_peer = NULL;       /* no credit taken */
-                }
-
-                spin_unlock(&peer->stp_lock);
-        }
-
         swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
@@ -1356,10 +1195,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 void
 srpc_post_rpc (srpc_client_rpc_t *rpc)
 {
-        srpc_peer_t *peer;
-
         LASSERT (!rpc->crpc_aborted);
-        LASSERT (rpc->crpc_peer == NULL);
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
         LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
 
@@ -1368,18 +1204,7 @@ srpc_post_rpc (srpc_client_rpc_t *rpc)
                 rpc->crpc_timeout);
 
         srpc_add_client_rpc_timer(rpc);
-
-        peer = srpc_nid2peer(rpc->crpc_dest.nid);
-        if (peer == NULL) {
-                srpc_abort_rpc(rpc, -ENOMEM);
-                return;
-        }
-
-        srpc_queue_rpc(peer, rpc);
-
-        spin_unlock(&rpc->crpc_lock);
-        srpc_check_sends(peer, 0);
-        spin_lock(&rpc->crpc_lock);
+        swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
 
@@ -1438,7 +1263,6 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
-        int                fired_flag = 1;
 
         LASSERT (!in_interrupt());
 
@@ -1452,6 +1276,8 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
         switch (rpcev->ev_type) {
         default:
+                CERROR("Unknown event: status %d, type %d, lnet %d\n",
+                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                 LBUG ();
         case SRPC_REQUEST_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
@@ -1463,9 +1289,16 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         case SRPC_BULK_REQ_RCVD:
                 crpc = rpcev->ev_data;
 
-                LASSERT (rpcev == &crpc->crpc_reqstev ||
-                         rpcev == &crpc->crpc_replyev ||
-                         rpcev == &crpc->crpc_bulkev);
+                if (rpcev != &crpc->crpc_reqstev &&
+                    rpcev != &crpc->crpc_replyev &&
+                    rpcev != &crpc->crpc_bulkev) {
+                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
+                               rpcev, crpc, &crpc->crpc_reqstev,
+                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
+                        CERROR("Bad event: status %d, type %d, lnet %d\n",
+                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
+                        LBUG ();
+                }
 
                 spin_lock(&crpc->crpc_lock);
 
@@ -1550,13 +1383,9 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
-                        if (ev->status == 0)
-                                break; /* wait for the final LNET_EVENT_REPLY */
-                        else
-                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
-                                                   (optimized GET case) */
-                }
+                if (!ev->unlinked)
+                        break; /* wait for final event */
+
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                         spin_lock(&srpc_data.rpc_glock);
@@ -1575,13 +1404,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (rpcev == &srpc->srpc_ev);
 
                 spin_lock(&sv->sv_lock);
-                if (fired_flag)
-                        rpcev->ev_fired  = 1;
 
+                rpcev->ev_fired  = 1;
                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
-
                 srpc_schedule_server_rpc(srpc);
+
                 spin_unlock(&sv->sv_lock);
                 break;
         }
@@ -1619,21 +1447,8 @@ srpc_check_event (int timeout)
 int
 srpc_startup (void)
 {
-        int i;
         int rc;
 
-#ifndef __KERNEL__
-        char *s;
-
-        s = getenv("SRPC_PEER_CREDITS");
-        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
-#endif
-
-        if (srpc_peer_credits <= 0) {
-                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
-                return -EINVAL;
-        }
-
         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
         spin_lock_init(&srpc_data.rpc_glock);
 
@@ -1643,16 +1458,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_NONE;
 
-        LIBCFS_ALLOC(srpc_data.rpc_peers,
-                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
-        if (srpc_data.rpc_peers == NULL) {
-                CERROR ("Failed to alloc peer hash.\n");
-                return -ENOMEM;
-        }
-
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
-                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);
-
 #ifdef __KERNEL__
         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
 #else
@@ -1663,8 +1468,6 @@ srpc_startup (void)
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
-                LIBCFS_FREE(srpc_data.rpc_peers,
-                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                 return rc;
         }
 
@@ -1745,24 +1548,5 @@ srpc_shutdown (void)
                 break;
         }
 
-        /* srpc_peer_t's are kept in hash until shutdown */
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
-                srpc_peer_t *peer;
-
-                while (!list_empty(&srpc_data.rpc_peers[i])) {
-                        peer = list_entry(srpc_data.rpc_peers[i].next,
-                                          srpc_peer_t, stp_list);
-                        list_del(&peer->stp_list);
-
-                        LASSERT (list_empty(&peer->stp_rpcq));
-                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
-                        LASSERT (peer->stp_credits == srpc_peer_credits);
-
-                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
-                }
-        }
-
-        LIBCFS_FREE(srpc_data.rpc_peers,
-                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
         return;
 }
lnet/selftest/selftest.h
index edd67b1..7312eac 100644
@@ -72,7 +72,6 @@
 #define SWI_STATE_REQUEST_SENT             4
 #define SWI_STATE_REPLY_RECEIVED           5
 #define SWI_STATE_BULK_STARTED             6
-#define SWI_STATE_BULK_ERRORED             7
 #define SWI_STATE_DONE                     10
 
 /* forward refs */
@@ -192,13 +191,13 @@ srpc_service2reply (int service)
 }
 
 typedef enum {
-        SRPC_BULK_REQ_RCVD   = 0, /* passive bulk request(PUT sink/GET source) received */
-        SRPC_BULK_PUT_SENT   = 1, /* active bulk PUT sent (source) */
-        SRPC_BULK_GET_RPLD   = 2, /* active bulk GET replied (sink) */
-        SRPC_REPLY_RCVD      = 3, /* incoming reply received */
-        SRPC_REPLY_SENT      = 4, /* outgoing reply sent */
-        SRPC_REQUEST_RCVD    = 5, /* incoming request received */
-        SRPC_REQUEST_SENT    = 6, /* outgoing request sent */
+        SRPC_BULK_REQ_RCVD   = 1, /* passive bulk request(PUT sink/GET source) received */
+        SRPC_BULK_PUT_SENT   = 2, /* active bulk PUT sent (source) */
+        SRPC_BULK_GET_RPLD   = 3, /* active bulk GET replied (sink) */
+        SRPC_REPLY_RCVD      = 4, /* incoming reply received */
+        SRPC_REPLY_SENT      = 5, /* outgoing reply sent */
+        SRPC_REQUEST_RCVD    = 6, /* incoming request received */
+        SRPC_REQUEST_SENT    = 7, /* outgoing request sent */
 } srpc_event_type_t;
 
 /* RPC event */
@@ -223,15 +222,6 @@ typedef struct {
 #endif
 } srpc_bulk_t; /* bulk descriptor */
 
-typedef struct srpc_peer {
-        struct list_head stp_list;     /* chain on peer hash */
-        struct list_head stp_rpcq;     /* q of non-control RPCs */
-        struct list_head stp_ctl_rpcq; /* q of control RPCs */
-        spinlock_t       stp_lock;     /* serialize */
-        lnet_nid_t       stp_nid;
-        int              stp_credits;  /* available credits */
-} srpc_peer_t;
-
 /* message buffer descriptor */
 typedef struct {
         struct list_head     buf_list; /* chain on srpc_service::*_msgq */
@@ -254,6 +244,7 @@ typedef struct srpc_server_rpc {
         srpc_buffer_t       *srpc_reqstbuf;
         srpc_bulk_t         *srpc_bulk;
 
+        unsigned int         srpc_aborted; /* being given up */
         int                  srpc_status;
         void               (*srpc_done)(struct srpc_server_rpc *);
 } srpc_server_rpc_t;
@@ -261,7 +252,6 @@ typedef struct srpc_server_rpc {
 /* client-side state of a RPC */
 typedef struct srpc_client_rpc {
         struct list_head     crpc_list;   /* chain on user's lists */
-        struct list_head     crpc_privl;  /* chain on srpc_peer_t::*rpcq */
         spinlock_t           crpc_lock;   /* serialize */
         int                  crpc_service;
         atomic_t             crpc_refcount;
@@ -269,7 +259,6 @@ typedef struct srpc_client_rpc {
         stt_timer_t          crpc_timer;
         swi_workitem_t       crpc_wi;
         lnet_process_id_t    crpc_dest;
-        srpc_peer_t         *crpc_peer;
 
         void               (*crpc_done)(struct srpc_client_rpc *);
         void               (*crpc_fini)(struct srpc_client_rpc *);
@@ -342,7 +331,7 @@ typedef struct srpc_service {
         int                (*sv_bulk_ready) (srpc_server_rpc_t *, int);
 } srpc_service_t;
 
-#define SFW_POST_BUFFERS         8
+#define SFW_POST_BUFFERS         256
 #define SFW_SERVICE_CONCURRENCY  (SFW_POST_BUFFERS/2)
 
 typedef struct {
@@ -456,6 +445,7 @@ int srpc_send_reply(srpc_server_rpc_t *rpc);
 int srpc_add_service(srpc_service_t *sv);
 int srpc_remove_service(srpc_service_t *sv);
 void srpc_shutdown_service(srpc_service_t *sv);
+void srpc_abort_service(srpc_service_t *sv);
 int srpc_finish_service(srpc_service_t *sv);
 int srpc_service_add_buffers(srpc_service_t *sv, int nbuffer);
 void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer);
@@ -477,7 +467,6 @@ srpc_destroy_client_rpc (srpc_client_rpc_t *rpc)
 {
         LASSERT (rpc != NULL);
         LASSERT (!srpc_event_pending(rpc));
-        LASSERT (list_empty(&rpc->crpc_privl));
         LASSERT (atomic_read(&rpc->crpc_refcount) == 0);
 #ifndef __KERNEL__
         LASSERT (rpc->crpc_bulk.bk_pages == NULL);
@@ -504,7 +493,6 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer,
                                 crpc_bulk.bk_iovs[nbulkiov]));
 
         CFS_INIT_LIST_HEAD(&rpc->crpc_list);
-        CFS_INIT_LIST_HEAD(&rpc->crpc_privl);
         swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc);
         spin_lock_init(&rpc->crpc_lock);
         atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
@@ -545,7 +533,6 @@ swi_state2str (int state)
                 STATE2STR(SWI_STATE_REQUEST_SENT);
                 STATE2STR(SWI_STATE_REPLY_RECEIVED);
                 STATE2STR(SWI_STATE_BULK_STARTED);
-                STATE2STR(SWI_STATE_BULK_ERRORED);
                 STATE2STR(SWI_STATE_DONE);
         }
 #undef STATE2STR
lnet/selftest/workitem.c
index 960d00f..c638188 100644
@@ -218,14 +218,15 @@ swi_serial_scheduler_main (void *arg)
                 int             rc;
                 swi_workitem_t *wi;
 
-                while (!list_empty(&swi_data.wi_serial_runq) && 
+                while (!list_empty(&swi_data.wi_serial_runq) &&
                        nloops < SWI_RESCHED) {
                         wi = list_entry(swi_data.wi_serial_runq.next,
                                         swi_workitem_t, wi_list);
                         list_del_init(&wi->wi_list);
 
-                        LASSERT (!wi->wi_running);
-                        LASSERT (wi->wi_scheduled);
+                        LASSERTF (!wi->wi_running && wi->wi_scheduled,
+                                  "wi %p running %d scheduled %d\n",
+                                  wi, wi->wi_running, wi->wi_scheduled);
 
                         nloops++;
                         wi->wi_running   = 1;
@@ -243,7 +244,7 @@ swi_serial_scheduler_main (void *arg)
 
                 if (nloops < SWI_RESCHED)
                         cfs_wait_event_interruptible_exclusive(
-                             swi_data.wi_serial_waitq, 
+                             swi_data.wi_serial_waitq,
                              !swi_sched_cansleep(&swi_data.wi_serial_runq), rc);
                 else
                         our_cond_resched();