Whamcloud - gitweb
i=liangzhen,b=18075:
[fs/lustre-release.git] / lnet / selftest / rpc.c
index 942ff26..49f483f 100644 (file)
@@ -43,8 +43,6 @@
 #include "selftest.h"
 
 
-#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
-
 typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
@@ -57,17 +55,12 @@ typedef enum {
 struct smoketest_rpc {
         spinlock_t        rpc_glock;     /* global lock */
         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
-        struct list_head *rpc_peers;     /* hash table of known peers */
         lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
         srpc_state_t      rpc_state;
         srpc_counters_t   rpc_counters;
         __u64             rpc_matchbits; /* matchbits counter */
 } srpc_data;
 
-static int srpc_peer_credits = 16;
-CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
-                "# in-flight RPCs per peer (16 by default)");
-
 /* forward ref's */
 int srpc_handle_rpc (swi_workitem_t *wi);
 
@@ -181,92 +174,6 @@ srpc_alloc_bulk (int npages, int sink)
         return bk;
 }
 
-
-static inline struct list_head *
-srpc_nid2peerlist (lnet_nid_t nid)
-{
-        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;
-
-        return &srpc_data.rpc_peers[hash];
-}
-
-static inline srpc_peer_t *
-srpc_create_peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
-        if (peer == NULL) {
-                CERROR ("Failed to allocate peer structure for %s\n",
-                        libcfs_nid2str(nid));
-                return NULL;
-        }
-
-        memset(peer, 0, sizeof(srpc_peer_t));
-        peer->stp_nid     = nid;
-        peer->stp_credits = srpc_peer_credits;
-
-        spin_lock_init(&peer->stp_lock);
-        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
-        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
-        return peer;
-}
-
-srpc_peer_t *
-srpc_find_peer_locked (lnet_nid_t nid)
-{
-        struct list_head *peer_list = srpc_nid2peerlist(nid);
-        srpc_peer_t      *peer;
-
-        LASSERT (nid != LNET_NID_ANY);
-
-        cfs_list_for_each_entry_typed (peer, peer_list, 
-                                       srpc_peer_t, stp_list) {
-                if (peer->stp_nid == nid)
-                        return peer;
-        }
-
-        return NULL;
-}
-
-static srpc_peer_t *
-srpc_nid2peer (lnet_nid_t nid)
-{
-        srpc_peer_t *peer;
-        srpc_peer_t *new_peer;
-
-        spin_lock(&srpc_data.rpc_glock);
-        peer = srpc_find_peer_locked(nid);
-        spin_unlock(&srpc_data.rpc_glock);
-
-        if (peer != NULL)
-                return peer;
-
-        new_peer = srpc_create_peer(nid);
-
-        spin_lock(&srpc_data.rpc_glock);
-
-        peer = srpc_find_peer_locked(nid);
-        if (peer != NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                if (new_peer != NULL)
-                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));
-
-                return peer;
-        }
-
-        if (new_peer == NULL) {
-                spin_unlock(&srpc_data.rpc_glock);
-                return NULL;
-        }
-
-        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
-        spin_unlock(&srpc_data.rpc_glock);
-        return new_peer;
-}
-
 static inline __u64
 srpc_next_id (void)
 {
@@ -609,10 +516,10 @@ srpc_finish_service (srpc_service_t *sv)
                         rpc = list_entry(sv->sv_active_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                         CDEBUG (D_NETERROR,
-                                "Active RPC on shutdown: sv %s, peer %s, "
+                                "Active RPC %p on shutdown: sv %s, peer %s, "
                                 "wi %s scheduled %d running %d, "
                                 "ev fired %d type %d status %d lnet %d\n",
-                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                 swi_state2str(rpc->srpc_wi.wi_state),
                                 rpc->srpc_wi.wi_scheduled,
                                 rpc->srpc_wi.wi_running,
@@ -673,6 +580,42 @@ free:
         spin_lock(&sv->sv_lock);
 }
 
+/* called with srpc_service_t::sv_lock held */
+inline void
+srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
+{
+        srpc_service_t *sv = rpc->srpc_service;
+
+        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
+                swi_schedule_workitem(&rpc->srpc_wi);
+        else    /* framework RPCs are handled one by one */
+                swi_schedule_serial_workitem(&rpc->srpc_wi);
+
+        return;
+}
+
+void
+srpc_abort_service (srpc_service_t *sv)
+{
+        srpc_server_rpc_t *rpc;
+
+        spin_lock(&sv->sv_lock);
+
+        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
+
+        /* schedule in-flight RPCs to notice the abort, NB:
+         * racing with incoming RPCs; complete fix should make test
+         * RPCs carry session ID in its headers */
+        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
+                rpc->srpc_aborted = 1;
+                srpc_schedule_server_rpc(rpc);
+        }
+
+        spin_unlock(&sv->sv_lock);
+        return;
+}
+
 void
 srpc_shutdown_service (srpc_service_t *sv)
 {
@@ -681,15 +624,15 @@ srpc_shutdown_service (srpc_service_t *sv)
 
         spin_lock(&sv->sv_lock);
 
-        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
-                sv->sv_id, sv->sv_name);
+        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
+               sv->sv_id, sv->sv_name);
 
         sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 
         /* schedule in-flight RPCs to notice the shutdown */
         cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq,
                                        srpc_server_rpc_t, srpc_list) {
-                swi_schedule_workitem(&rpc->srpc_wi);
+                srpc_schedule_server_rpc(rpc);
         }
 
         spin_unlock(&sv->sv_lock);
@@ -814,20 +757,6 @@ srpc_do_bulk (srpc_server_rpc_t *rpc)
         return rc;
 }
 
-/* called with srpc_service_t::sv_lock held */
-inline void
-srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
-{
-        srpc_service_t *sv = rpc->srpc_service;
-
-        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                swi_schedule_workitem(&rpc->srpc_wi);
-        else    /* framework RPCs are handled one by one */
-                swi_schedule_serial_workitem(&rpc->srpc_wi);
-
-        return;
-}
-
 /* only called from srpc_handle_rpc */
 void
 srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
@@ -840,8 +769,8 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         rpc->srpc_status = status;
 
         CDEBUG (status == 0 ? D_NET : D_NETERROR,
-                "Server RPC done: service %s, peer %s, status %s:%d\n",
-                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
+                "Server RPC %p done: service %s, peer %s, status %s:%d\n",
+                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                 swi_state2str(rpc->srpc_wi.wi_state), status);
 
         if (status != 0) {
@@ -903,7 +832,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
 
         spin_lock(&sv->sv_lock);
 
-        if (sv->sv_shuttingdown) {
+        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                 spin_unlock(&sv->sv_lock);
 
                 if (rpc->srpc_bulk != NULL)
@@ -961,8 +890,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 }
         }
         case SWI_STATE_BULK_STARTED:
-                /* we cannot LASSERT ev_fired right here because it
-                 * may be set only upon an event with unlinked==1 */
+                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = ev->ev_status;
@@ -971,20 +899,11 @@ srpc_handle_rpc (swi_workitem_t *wi)
                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
 
                         if (rc != 0) {
-                                if (ev->ev_fired) {
-                                        srpc_server_rpc_done(rpc, rc);
-                                        return 1;
-                                }
-
-                                rpc->srpc_status = rc;
-                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
-                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
-                                return 0; /* wait for UNLINK event  */
+                                srpc_server_rpc_done(rpc, rc);
+                                return 1;
                         }
                 }
 
-                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
-
                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
@@ -993,18 +912,17 @@ srpc_handle_rpc (swi_workitem_t *wi)
                 return 1;
 
         case SWI_STATE_REPLY_SUBMITTED:
-                LASSERT (ev->ev_fired);
+                if (!ev->ev_fired) {
+                        CERROR("RPC %p: bulk %p, service %d\n",
+                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
+                        CERROR("Event: status %d, type %d, lnet %d\n",
+                               ev->ev_status, ev->ev_type, ev->ev_lnet);
+                        LASSERT (ev->ev_fired);
+                }
 
                 wi->wi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
-
-        case SWI_STATE_BULK_ERRORED:
-                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
-                LASSERT (rpc->srpc_status != 0);
-
-                srpc_server_rpc_done(rpc, rpc->srpc_status);
-                return 1;
         }
 
         return 0;
@@ -1078,43 +996,9 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
 }
 
 void
-srpc_check_sends (srpc_peer_t *peer, int credits)
-{
-        struct list_head  *q;
-        srpc_client_rpc_t *rpc;
-
-        LASSERT (credits >= 0);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        spin_lock(&peer->stp_lock);
-        peer->stp_credits += credits;
-
-        while (peer->stp_credits) {
-                if (!list_empty(&peer->stp_ctl_rpcq))
-                        q = &peer->stp_ctl_rpcq;
-                else if (!list_empty(&peer->stp_rpcq))
-                        q = &peer->stp_rpcq;
-                else
-                        break;
-
-                peer->stp_credits--;
-
-                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
-                list_del_init(&rpc->crpc_privl);
-                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */
-
-                swi_schedule_workitem(&rpc->crpc_wi);
-        }
-
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-void
 srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 {
         swi_workitem_t *wi = &rpc->crpc_wi;
-        srpc_peer_t    *peer = rpc->crpc_peer;
 
         LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
 
@@ -1145,9 +1029,6 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
         spin_unlock(&rpc->crpc_lock);
 
         (*rpc->crpc_done) (rpc);
-
-        if (peer != NULL)
-                srpc_check_sends(peer, 1);
         return;
 }
 
@@ -1290,38 +1171,9 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service,
 }
 
 /* called with rpc->crpc_lock held */
-static inline void
-srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
-{
-        int service = rpc->crpc_service;
-
-        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
-        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
-
-        rpc->crpc_peer = peer;
-
-        spin_lock(&peer->stp_lock);
-
-        /* Framework RPCs that alter session state shall take precedence
-         * over test RPCs and framework query RPCs */
-        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
-            service != SRPC_SERVICE_DEBUG &&
-            service != SRPC_SERVICE_QUERY_STAT)
-                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
-        else
-                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);
-
-        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
-        spin_unlock(&peer->stp_lock);
-        return;
-}
-
-/* called with rpc->crpc_lock held */
 void
 srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 {
-        srpc_peer_t *peer = rpc->crpc_peer;
-
         LASSERT (why != 0);
 
         if (rpc->crpc_aborted || /* already aborted */
@@ -1335,19 +1187,6 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 
         rpc->crpc_aborted = 1;
         rpc->crpc_status  = why;
-
-        if (peer != NULL) {
-                spin_lock(&peer->stp_lock);
-
-                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
-                        list_del_init(&rpc->crpc_privl);
-                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
-                        rpc->crpc_peer = NULL;       /* no credit taken */
-                }
-
-                spin_unlock(&peer->stp_lock);
-        }
-
         swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
@@ -1356,10 +1195,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
 void
 srpc_post_rpc (srpc_client_rpc_t *rpc)
 {
-        srpc_peer_t *peer;
-
         LASSERT (!rpc->crpc_aborted);
-        LASSERT (rpc->crpc_peer == NULL);
         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
         LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
 
@@ -1368,18 +1204,7 @@ srpc_post_rpc (srpc_client_rpc_t *rpc)
                 rpc->crpc_timeout);
 
         srpc_add_client_rpc_timer(rpc);
-
-        peer = srpc_nid2peer(rpc->crpc_dest.nid);
-        if (peer == NULL) {
-                srpc_abort_rpc(rpc, -ENOMEM);
-                return;
-        }
-
-        srpc_queue_rpc(peer, rpc);
-
-        spin_unlock(&rpc->crpc_lock);
-        srpc_check_sends(peer, 0);
-        spin_lock(&rpc->crpc_lock);
+        swi_schedule_workitem(&rpc->crpc_wi);
         return;
 }
 
@@ -1438,7 +1263,6 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         srpc_service_t    *sv;
         srpc_msg_t        *msg;
         srpc_msg_type_t    type;
-        int                fired_flag = 1;
 
         LASSERT (!in_interrupt());
 
@@ -1452,6 +1276,8 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
 
         switch (rpcev->ev_type) {
         default:
+                CERROR("Unknown event: status %d, type %d, lnet %d\n",
+                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                 LBUG ();
         case SRPC_REQUEST_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
@@ -1463,9 +1289,16 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
         case SRPC_BULK_REQ_RCVD:
                 crpc = rpcev->ev_data;
 
-                LASSERT (rpcev == &crpc->crpc_reqstev ||
-                         rpcev == &crpc->crpc_replyev ||
-                         rpcev == &crpc->crpc_bulkev);
+                if (rpcev != &crpc->crpc_reqstev &&
+                    rpcev != &crpc->crpc_replyev &&
+                    rpcev != &crpc->crpc_bulkev) {
+                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
+                               rpcev, crpc, &crpc->crpc_reqstev,
+                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
+                        CERROR("Bad event: status %d, type %d, lnet %d\n",
+                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
+                        LBUG ();
+                }
 
                 spin_lock(&crpc->crpc_lock);
 
@@ -1550,13 +1383,9 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                          ev->type == LNET_EVENT_REPLY ||
                          ev->type == LNET_EVENT_UNLINK);
 
-                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
-                        if (ev->status == 0)
-                                break; /* wait for the final LNET_EVENT_REPLY */
-                        else
-                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
-                                                   (optimized GET case) */
-                }
+                if (!ev->unlinked)
+                        break; /* wait for final event */
+
         case SRPC_BULK_PUT_SENT:
                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                         spin_lock(&srpc_data.rpc_glock);
@@ -1575,13 +1404,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev)
                 LASSERT (rpcev == &srpc->srpc_ev);
 
                 spin_lock(&sv->sv_lock);
-                if (fired_flag)
-                        rpcev->ev_fired  = 1;
 
+                rpcev->ev_fired  = 1;
                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                 -EINTR : ev->status;
-
                 srpc_schedule_server_rpc(srpc);
+
                 spin_unlock(&sv->sv_lock);
                 break;
         }
@@ -1619,21 +1447,8 @@ srpc_check_event (int timeout)
 int
 srpc_startup (void)
 {
-        int i;
         int rc;
 
-#ifndef __KERNEL__
-        char *s;
-
-        s = getenv("SRPC_PEER_CREDITS");
-        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
-#endif
-
-        if (srpc_peer_credits <= 0) {
-                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
-                return -EINVAL;
-        }
-
         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
         spin_lock_init(&srpc_data.rpc_glock);
 
@@ -1643,16 +1458,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_NONE;
 
-        LIBCFS_ALLOC(srpc_data.rpc_peers,
-                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
-        if (srpc_data.rpc_peers == NULL) {
-                CERROR ("Failed to alloc peer hash.\n");
-                return -ENOMEM;
-        }
-
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
-                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);
-
 #ifdef __KERNEL__
         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
 #else
@@ -1663,8 +1468,6 @@ srpc_startup (void)
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
-                LIBCFS_FREE(srpc_data.rpc_peers,
-                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                 return rc;
         }
 
@@ -1745,24 +1548,5 @@ srpc_shutdown (void)
                 break;
         }
 
-        /* srpc_peer_t's are kept in hash until shutdown */
-        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
-                srpc_peer_t *peer;
-
-                while (!list_empty(&srpc_data.rpc_peers[i])) {
-                        peer = list_entry(srpc_data.rpc_peers[i].next,
-                                          srpc_peer_t, stp_list);
-                        list_del(&peer->stp_list);
-
-                        LASSERT (list_empty(&peer->stp_rpcq));
-                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
-                        LASSERT (peer->stp_credits == srpc_peer_credits);
-
-                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
-                }
-        }
-
-        LIBCFS_FREE(srpc_data.rpc_peers,
-                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
         return;
 }