Whamcloud - gitweb
* ranal passes netregression
author eeb <eeb>
Fri, 4 Feb 2005 12:46:52 +0000 (12:46 +0000)
committer eeb <eeb>
Fri, 4 Feb 2005 12:46:52 +0000 (12:46 +0000)
lnet/klnds/ralnd/ralnd.c
lnet/klnds/ralnd/ralnd.h
lnet/klnds/ralnd/ralnd_cb.c

index 2fe1aa0..35f436e 100644 (file)
@@ -37,10 +37,10 @@ kra_tunables_t          kranal_tunables;
 #define RANAL_SYSCTL                   202
 
 static ctl_table kranal_ctl_table[] = {
-        {RANAL_SYSCTL_TIMEOUT, "timeout", 
+        {RANAL_SYSCTL_TIMEOUT, "timeout",
          &kranal_tunables.kra_timeout, sizeof(int),
          0644, NULL, &proc_dointvec},
-        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", 
+        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
          &kranal_tunables.kra_listener_timeout, sizeof(int),
          0644, NULL, &proc_dointvec},
         {RANAL_SYSCTL_BACKLOG, "backlog",
@@ -49,7 +49,7 @@ static ctl_table kranal_ctl_table[] = {
         {RANAL_SYSCTL_PORT, "port",
          &kranal_tunables.kra_port, sizeof(int),
          0644, NULL, kranal_listener_procint},
-        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", 
+        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
          &kranal_tunables.kra_max_immediate, sizeof(int),
          0644, NULL, &proc_dointvec},
         { 0 }
@@ -89,7 +89,7 @@ kranal_sock_write (struct socket *sock, void *buffer, int nob)
 
         if (rc == nob)
                 return 0;
-        
+
         if (rc >= 0)
                 return -EAGAIN;
 
@@ -102,7 +102,6 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
         int            rc;
         mm_segment_t   oldmm = get_fs();
         long           ticks = timeout * HZ;
-        int            wanted = nob;
         unsigned long  then;
         struct timeval tv;
 
@@ -145,9 +144,6 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
                 ticks -= jiffies - then;
                 set_fs(oldmm);
 
-                CDEBUG(D_WARNING, "rc %d at %d/%d bytes %d/%d secs\n",
-                       rc, wanted - nob, wanted, timeout - (int)(ticks/HZ), timeout);
-
                 if (rc < 0)
                         return rc;
 
@@ -233,13 +229,6 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid)
 
         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         LASSERT(rrc == RAP_SUCCESS);
-
-        CDEBUG(D_WARNING,"devid %d, riparams: HID %08x FDH %08x PT %08x CC %08x\n",
-               connreq->racr_devid,
-               connreq->racr_riparams.HostId,
-               connreq->racr_riparams.FmaDomainHndl,
-               connreq->racr_riparams.PTag,
-               connreq->racr_riparams.CompletionCookie);
 }
 
 int
@@ -290,7 +279,7 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                 return -EPROTO;
         }
-        
+
         return 0;
 }
 
@@ -323,16 +312,16 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
 
                 if (conn->rac_device != newconn->rac_device)
                         continue;
-                
+
                 if (loopback &&
                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                         continue;
-                    
+
                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
 
                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64
-                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, 
+                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
 
                 count++;
@@ -350,7 +339,7 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
         int               loopback;
 
         loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
-        
+
         list_for_each(tmp, &peer->rap_conns) {
                 conn = list_entry(tmp, kra_conn_t, rac_list);
 
@@ -376,12 +365,12 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
                 /* 'newconn' is an earlier connection from 'peer'!!! */
                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
                         return 2;
-                
+
                 /* 'conn' is an earlier connection from 'peer': it will be
                  * removed when we cull stale conns later on... */
                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
                         continue;
-                
+
                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
                  * playing the game... */
                 return 3;
@@ -402,7 +391,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
-        
+
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
@@ -448,7 +437,7 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
 }
 
 void
-kranal_destroy_conn(kra_conn_t *conn) 
+kranal_destroy_conn(kra_conn_t *conn)
 {
         RAP_RETURN         rrc;
 
@@ -512,9 +501,11 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
                 /* Non-persistent peer with no more conns... */
                 kranal_unlink_peer_locked(peer);
         }
-                        
+
         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
-         * full timeout */
+         * full timeout.  If we get a CLOSE we know the peer has stopped all
+         * RDMA.  Otherwise if we wait for the full timeout we can also be sure
+         * all RDMA has stopped. */
         conn->rac_last_rx = jiffies;
         mb();
 
@@ -528,36 +519,29 @@ void
 kranal_close_conn (kra_conn_t *conn, int error)
 {
         unsigned long    flags;
-        
+
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-        
+
         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                 kranal_close_conn_locked(conn, error);
-        
+
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
 int
-kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, 
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
                        __u32 peer_ip, int peer_port)
 {
         RAP_RETURN    rrc;
 
-        CDEBUG(D_WARNING,"devid %d, riparams: HID %08x FDH %08x PT %08x CC %08x\n",
-               conn->rac_device->rad_id,
-               connreq->racr_riparams.HostId,
-               connreq->racr_riparams.FmaDomainHndl,
-               connreq->racr_riparams.PTag,
-               connreq->racr_riparams.CompletionCookie);
-        
         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         if (rrc != RAP_SUCCESS) {
-                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rrc);
                 return -EPROTO;
         }
-        
+
         conn->rac_peerstamp = connreq->racr_peerstamp;
         conn->rac_peer_connstamp = connreq->racr_connstamp;
         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
@@ -566,7 +550,7 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
 }
 
 int
-kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, 
+kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
                                ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         struct sockaddr_in   addr;
@@ -580,8 +564,6 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
         int                  len;
         int                  i;
 
-        CDEBUG(D_WARNING,"!!\n");
-
         len = sizeof(addr);
         rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
         if (rc != 0) {
@@ -592,26 +574,20 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
         peer_ip = ntohl(addr.sin_addr.s_addr);
         peer_port = ntohs(addr.sin_port);
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         if (peer_port >= 1024) {
                 CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
                        HIPQUAD(peer_ip), peer_port);
                 return -ECONNREFUSED;
         }
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
-        rc = kranal_recv_connreq(sock, &rx_connreq, 
+        rc = kranal_recv_connreq(sock, &rx_connreq,
                                  kranal_tunables.kra_listener_timeout);
         if (rc != 0) {
-                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rc);
                 return rc;
         }
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         for (i = 0;;i++) {
                 if (i == kranal_data.kra_ndevs) {
                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
@@ -623,34 +599,26 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
                         break;
         }
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         rc = kranal_create_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
 
         rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq));
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rc);
                 kranal_conn_decref(conn);
                 return rc;
         }
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
         if (rc != 0) {
                 kranal_conn_decref(conn);
                 return rc;
         }
 
-        CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip));
-
         *connp = conn;
         *src_nidp = rx_connreq.racr_srcnid;
         *dst_nidp = rx_connreq.racr_dstnid;
@@ -668,8 +636,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 
         for (port = 1023; port >= 512; port--) {
 
-                memset(&locaddr, 0, sizeof(locaddr)); 
-                locaddr.sin_family      = AF_INET; 
+                memset(&locaddr, 0, sizeof(locaddr));
+                locaddr.sin_family      = AF_INET;
                 locaddr.sin_port        = htons(port);
                 locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 
@@ -686,7 +654,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                                      (struct sockaddr *)&locaddr, sizeof(locaddr));
                 if (rc != 0) {
                         sock_release(sock);
-                        
+
                         if (rc == -EADDRINUSE) {
                                 CDEBUG(D_NET, "Port %d already in use\n", port);
                                 continue;
@@ -703,7 +671,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                         *sockp = sock;
                         return 0;
                 }
-                
+
                 sock_release(sock);
 
                 if (rc != -EADDRNOTAVAIL) {
@@ -711,8 +679,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                                port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                         return rc;
                 }
-                
-                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
+
+                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
                        port, HIPQUAD(peer->rap_ip), peer->rap_port);
         }
 
@@ -722,7 +690,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 
 
 int
-kranal_active_conn_handshake(kra_peer_t *peer, 
+kranal_active_conn_handshake(kra_peer_t *peer,
                              ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         kra_connreq_t       connreq;
@@ -732,8 +700,6 @@ kranal_active_conn_handshake(kra_peer_t *peer,
         int                 rc;
         unsigned int        idx;
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
         /* spread connections over all devices using both peer NIDs to ensure
          * all nids use all devices */
         idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid;
@@ -743,45 +709,37 @@ kranal_active_conn_handshake(kra_peer_t *peer,
         if (rc != 0)
                 return rc;
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
-        
+
         rc = ranal_connect_sock(peer, &sock);
         if (rc != 0)
                 goto failed_0;
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
          * immediately after accepting a connection, so we connect and then
          * send immediately. */
 
         rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                 goto failed_1;
         }
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
         rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
         if (rc != 0) {
-                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                 goto failed_1;
         }
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
         sock_release(sock);
         rc = -EPROTO;
 
         if (connreq.racr_srcnid != peer->rap_nid) {
                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                        "received "LPX64" expected "LPX64"\n",
-                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
                        connreq.racr_srcnid, peer->rap_nid);
                 goto failed_0;
         }
@@ -789,28 +747,24 @@ kranal_active_conn_handshake(kra_peer_t *peer,
         if (connreq.racr_devid != dev->rad_id) {
                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
                        "received %d expected %d\n",
-                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
                        connreq.racr_devid, dev->rad_id);
                 goto failed_0;
         }
 
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
-
-        rc = kranal_set_conn_params(conn, &connreq, 
+        rc = kranal_set_conn_params(conn, &connreq,
                                     peer->rap_ip, peer->rap_port);
         if (rc != 0)
                 goto failed_0;
 
         *connp = conn;
         *dst_nidp = connreq.racr_dstnid;
-        CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid);
         return 0;
 
  failed_1:
         sock_release(sock);
  failed_0:
         kranal_conn_decref(conn);
-        CDEBUG(D_WARNING,LPX64": %d\n", peer->rap_nid, rc);
         return rc;
 }
 
@@ -831,7 +785,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 /* active: connd wants to connect to 'peer' */
                 LASSERT (peer != NULL);
                 LASSERT (peer->rap_connecting);
-                
+
                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
                 if (rc != 0)
                         return rc;
@@ -840,7 +794,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 
                 if (!kranal_peer_active(peer)) {
                         /* raced with peer getting unlinked */
-                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                                 flags);
                         kranal_conn_decref(conn);
                         return -ESTALE;
@@ -876,7 +830,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 }
         }
 
-        LASSERT (!new_peer == !kranal_peer_active(peer));
+        LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
 
         /* Refuse connection if peer thinks we are a different NID.  We check
          * this while holding the global lock, to synch with connection
@@ -910,7 +864,10 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 list_add_tail(&peer->rap_list,
                               kranal_nid2peerlist(peer_nid));
         }
-        
+
+        /* initialise timestamps before reaper looks at them */
+        conn->rac_last_tx = conn->rac_last_rx = jiffies;
+
         kranal_peer_addref(peer);               /* +1 ref for conn */
         conn->rac_peer = peer;
         list_add_tail(&conn->rac_list, &peer->rap_conns);
@@ -959,11 +916,11 @@ kranal_connect (kra_peer_t *peer)
 
         LASSERT (peer->rap_connecting);
 
-        CDEBUG(D_WARNING,"About to handshake "LPX64"\n", peer->rap_nid);
+        CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid);
 
         rc = kranal_conn_handshake(NULL, peer);
 
-        CDEBUG(D_WARNING,"Done handshake "LPX64":%d \n", peer->rap_nid, rc);
+        CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc);
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
@@ -1055,7 +1012,7 @@ kranal_listener (void *arg)
 
         rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
         if (rc != 0) {
-                CERROR("Can't set listen backlog %d: %d\n", 
+                CERROR("Can't set listen backlog %d: %d\n",
                        kranal_tunables.kra_backlog, rc);
                 goto out_1;
         }
@@ -1095,7 +1052,7 @@ kranal_listener (void *arg)
                         ras->ras_sock->type = sock->type;
                         ras->ras_sock->ops = sock->ops;
                 }
-                
+
                 set_current_state(TASK_INTERRUPTIBLE);
 
                 rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);
@@ -1109,8 +1066,8 @@ kranal_listener (void *arg)
 
                 if (rc == 0) {
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
-                        
-                        list_add_tail(&ras->ras_list, 
+
+                        list_add_tail(&ras->ras_list,
                                       &kranal_data.kra_connd_acceptq);
 
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
@@ -1119,7 +1076,7 @@ kranal_listener (void *arg)
                         ras = NULL;
                         continue;
                 }
-                
+
                 if (rc != -EAGAIN) {
                         CERROR("Accept failed: %d, pausing...\n", rc);
                         kranal_pause(HZ);
@@ -1138,7 +1095,7 @@ kranal_listener (void *arg)
         sock_release(sock);
         kranal_data.kra_listener_sock = NULL;
  out_0:
-        /* set completion status and unblock thread waiting for me 
+        /* set completion status and unblock thread waiting for me
          * (parent on startup failure, executioner on normal shutdown) */
         kranal_data.kra_listener_shutdown = rc;
         up(&kranal_data.kra_listener_signal);
@@ -1152,7 +1109,7 @@ kranal_start_listener (void)
         long           pid;
         int            rc;
 
-        CDEBUG(D_WARNING, "Starting listener\n");
+        CDEBUG(D_NET, "Starting listener\n");
 
         /* Called holding kra_nid_mutex: listener stopped */
         LASSERT (kranal_data.kra_listener_sock == NULL);
@@ -1170,7 +1127,7 @@ kranal_start_listener (void)
         rc = kranal_data.kra_listener_shutdown;
         LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
 
-        CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
+        CDEBUG(D_NET, "Listener %ld started OK\n", pid);
         return rc;
 }
 
@@ -1181,7 +1138,7 @@ kranal_stop_listener(int clear_acceptq)
         unsigned long     flags;
         kra_acceptsock_t *ras;
 
-        CDEBUG(D_WARNING, "Stopping listener\n");
+        CDEBUG(D_NET, "Stopping listener\n");
 
         /* Called holding kra_nid_mutex: listener running */
         LASSERT (kranal_data.kra_listener_sock != NULL);
@@ -1193,11 +1150,11 @@ kranal_stop_listener(int clear_acceptq)
         down(&kranal_data.kra_listener_signal);
 
         LASSERT (kranal_data.kra_listener_sock == NULL);
-        CDEBUG(D_WARNING, "Listener stopped\n");
+        CDEBUG(D_NET, "Listener stopped\n");
 
         if (!clear_acceptq)
                 return;
-        
+
         /* Close any unhandled accepts */
         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
 
@@ -1205,16 +1162,16 @@ kranal_stop_listener(int clear_acceptq)
         list_del_init(&kranal_data.kra_connd_acceptq);
 
         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
-        
+
         while (!list_empty(&zombie_accepts)) {
-                ras = list_entry(zombie_accepts.next, 
+                ras = list_entry(zombie_accepts.next,
                                  kra_acceptsock_t, ras_list);
                 list_del(&ras->ras_list);
                 kranal_free_acceptsock(ras);
         }
 }
 
-int 
+int
 kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                         void *buffer, size_t *lenp)
 {
@@ -1282,7 +1239,7 @@ kranal_set_mynid(ptl_nid_t nid)
         kranal_data.kra_peerstamp++;
         ni->ni_pid.nid = nid;
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-        
+
         /* Delete all existing peers and their connections after new
          * NID/connstamp set to ensure no old connections in our brave
          * new world. */
@@ -1397,7 +1354,7 @@ kranal_unlink_peer_locked (kra_peer_t *peer)
 }
 
 int
-kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, 
+kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
                       int *persistencep)
 {
         kra_peer_t        *peer;
@@ -1665,7 +1622,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private)
                 break;
         }
         case NAL_CMD_DEL_PEER: {
-                rc = kranal_del_peer(pcfg->pcfg_nid, 
+                rc = kranal_del_peer(pcfg->pcfg_nid,
                                      /* flags == single_share */
                                      pcfg->pcfg_flags != 0);
                 break;
@@ -1739,7 +1696,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
                 PORTAL_ALLOC(tx->tx_phys,
                              PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                 if (tx->tx_phys == NULL) {
-                        CERROR("Can't allocate %stx[%d]->tx_phys\n", 
+                        CERROR("Can't allocate %stx[%d]->tx_phys\n",
                                isnblk ? "nblk " : "", i);
 
                         PORTAL_FREE(tx, sizeof(*tx));
@@ -1818,7 +1775,7 @@ kranal_api_shutdown (nal_t *nal)
 {
         int           i;
         unsigned long flags;
-        
+
         if (nal->nal_refct != 0) {
                 /* This module got the first ref */
                 PORTAL_MODULE_UNUSE;
@@ -1884,9 +1841,9 @@ kranal_api_shutdown (nal_t *nal)
         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
         LASSERT (list_empty(&kranal_data.kra_connd_peers));
-        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); 
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
         wake_up_all(&kranal_data.kra_connd_waitq);
-        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); 
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
         i = 2;
         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
@@ -1903,7 +1860,7 @@ kranal_api_shutdown (nal_t *nal)
                         LASSERT (list_empty(&kranal_data.kra_peers[i]));
 
                 PORTAL_FREE(kranal_data.kra_peers,
-                            sizeof (struct list_head) * 
+                            sizeof (struct list_head) *
                             kranal_data.kra_peer_hash_size);
         }
 
@@ -1913,7 +1870,7 @@ kranal_api_shutdown (nal_t *nal)
                         LASSERT (list_empty(&kranal_data.kra_conns[i]));
 
                 PORTAL_FREE(kranal_data.kra_conns,
-                            sizeof (struct list_head) * 
+                            sizeof (struct list_head) *
                             kranal_data.kra_conn_hash_size);
         }
 
@@ -1999,7 +1956,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
 
         /* OK to call kranal_api_shutdown() to cleanup now */
         kranal_data.kra_init = RANAL_INIT_DATA;
-        
+
         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
         PORTAL_ALLOC(kranal_data.kra_peers,
                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
@@ -2091,7 +2048,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         return PTL_OK;
 
  failed:
-        kranal_api_shutdown(&kranal_api);    
+        kranal_api_shutdown(&kranal_api);
         return PTL_FAIL;
 }
 
@@ -2142,7 +2099,7 @@ kranal_module_init (void)
                 return -ENODEV;
         }
 
-        kranal_tunables.kra_sysctl = 
+        kranal_tunables.kra_sysctl =
                 register_sysctl_table(kranal_top_ctl_table, 0);
         if (kranal_tunables.kra_sysctl == NULL) {
                 CERROR("Can't register sysctl table\n");
index 7e43705..e6602dd 100644 (file)
@@ -77,7 +77,7 @@
 #define RANAL_NTX             64                /* # tx descs */
 #define RANAL_NTX_NBLK        256               /* # reserved tx descs */
 
-#define RANAL_FMA_CQ_SIZE     8192              /* # entries in receive CQ 
+#define RANAL_FMA_CQ_SIZE     8192              /* # entries in receive CQ
                                                  * (overflow is a performance hit) */
 
 #define RANAL_RESCHED         100               /* # scheduler loops before reschedule */
@@ -92,7 +92,7 @@
 #define RANAL_PORT             988              /* listener's port */
 #define RANAL_MAX_IMMEDIATE    (2<<10)          /* immediate payload breakpoint */
 
-typedef struct 
+typedef struct
 {
         int               kra_timeout;          /* comms timeout (seconds) */
         int               kra_listener_timeout; /* max time the listener can block */
@@ -117,8 +117,8 @@ typedef struct
         spinlock_t              rad_lock;       /* serialise */
         void                   *rad_scheduler;  /* scheduling thread */
 } kra_device_t;
-        
-typedef struct 
+
+typedef struct
 {
         int               kra_init;             /* initialisation state */
         int               kra_shutdown;         /* shut down? */
@@ -148,7 +148,7 @@ typedef struct
         long              kra_new_min_timeout;  /* minimum timeout on any new conn */
         wait_queue_head_t kra_reaper_waitq;     /* reaper sleeps here */
         spinlock_t        kra_reaper_lock;      /* serialise */
-        
+
         struct list_head  kra_connd_peers;      /* peers waiting for a connection */
         struct list_head  kra_connd_acceptq;    /* accepted sockets to handshake */
         wait_queue_head_t kra_connd_waitq;      /* connection daemons sleep here */
@@ -269,7 +269,7 @@ typedef struct kra_tx                           /* message descriptor */
         struct list_head          tx_list;      /* queue on idle_txs/rac_sendq/rac_waitq */
         struct kra_conn          *tx_conn;      /* owning conn */
         lib_msg_t                *tx_libmsg[2]; /* lib msgs to finalize on completion */
-        unsigned long             tx_qtime;     /* when tx started to wait for something */
+        unsigned long             tx_qtime;     /* when tx started to wait for something (jiffies) */
         int                       tx_isnblk;    /* I'm reserved for non-blocking sends */
         int                       tx_nob;       /* # bytes of payload */
         int                       tx_buftype;   /* payload buffer type */
@@ -306,7 +306,7 @@ typedef struct kra_tx                           /* message descriptor */
 #define RANAL_TX_GETT_DONE       0x52           /* GET target about to send GET_DONE */
 
 typedef struct kra_conn
-{ 
+{
         struct kra_peer    *rac_peer;           /* owning peer */
         struct list_head    rac_list;           /* stash on peer's conn list */
         struct list_head    rac_hashlist;       /* stash in connection hash table */
@@ -317,10 +317,10 @@ typedef struct kra_conn
         __u64               rac_peerstamp;      /* peer's unique stamp */
         __u64               rac_peer_connstamp; /* peer's unique connection stamp */
         __u64               rac_my_connstamp;   /* my unique connection stamp */
-        unsigned long       rac_last_tx;        /* when I last sent an FMA message */
-        unsigned long       rac_last_rx;        /* when I last received an FMA messages */
-        long                rac_keepalive;      /* keepalive interval */
-        long                rac_timeout;        /* infer peer death on (last_rx + timout > now) */
+        unsigned long       rac_last_tx;        /* when I last sent an FMA message (jiffies) */
+        unsigned long       rac_last_rx;        /* when I last received an FMA messages (jiffies) */
+        long                rac_keepalive;      /* keepalive interval (seconds) */
+        long                rac_timeout;        /* infer peer death if no rx for this many seconds */
         __u32               rac_cqid;           /* my completion callback id (non-unique) */
         __u32               rac_tx_seq;         /* tx msg sequence number */
         __u32               rac_rx_seq;         /* rx msg sequence number */
@@ -394,10 +394,10 @@ kranal_peer_decref(kra_peer_t *peer)
 }
 
 static inline struct list_head *
-kranal_nid2peerlist (ptl_nid_t nid) 
+kranal_nid2peerlist (ptl_nid_t nid)
 {
         unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
-        
+
         return (&kranal_data.kra_peers[hash]);
 }
 
@@ -426,27 +426,27 @@ kranal_conn_decref(kra_conn_t *conn)
 }
 
 static inline struct list_head *
-kranal_cqid2connlist (__u32 cqid) 
+kranal_cqid2connlist (__u32 cqid)
 {
         unsigned int hash = cqid % kranal_data.kra_conn_hash_size;
-        
+
         return (&kranal_data.kra_conns [hash]);
 }
 
 static inline kra_conn_t *
-kranal_cqid2conn_locked (__u32 cqid) 
+kranal_cqid2conn_locked (__u32 cqid)
 {
         struct list_head *conns = kranal_cqid2connlist(cqid);
         struct list_head *tmp;
         kra_conn_t       *conn;
-        
+
         list_for_each(tmp, conns) {
                 conn = list_entry(tmp, kra_conn_t, rac_hashlist);
-                
+
                 if (conn->rac_cqid == cqid)
                         return conn;
         }
-        
+
         return NULL;
 }
 
@@ -464,8 +464,8 @@ kranal_page2phys (struct page *p)
 }
 
 extern void kranal_free_acceptsock (kra_acceptsock_t *ras);
-extern int kranal_listener_procint (ctl_table *table, 
-                                    int write, struct file *filp, 
+extern int kranal_listener_procint (ctl_table *table,
+                                    int write, struct file *filp,
                                     void *buffer, size_t *lenp);
 extern void kranal_update_reaper_timeout (long timeout);
 extern void kranal_tx_done (kra_tx_t *tx, int completion);
@@ -484,3 +484,4 @@ extern void kranal_close_conn_locked (kra_conn_t *conn, int error);
 extern void kranal_terminate_conn_locked (kra_conn_t *conn);
 extern void kranal_connect (kra_peer_t *peer);
 extern int kranal_conn_handshake (struct socket *sock, kra_peer_t *peer);
+extern void kranal_pause(int ticks);
index e805e67..38f1b77 100644 (file)
@@ -43,7 +43,9 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
         kra_device_t *dev;
         int           i;
         unsigned long flags;
-        
+
+        CDEBUG(D_NET, "callback for device %d\n", devid);
+
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
 
                 dev = &kranal_data.kra_devices[i];
@@ -60,7 +62,7 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
                 return;
         }
-        
+
         CWARN("callback for unknown device %d\n", devid);
 }
 
@@ -69,9 +71,9 @@ kranal_schedule_conn(kra_conn_t *conn)
 {
         kra_device_t    *dev = conn->rac_device;
         unsigned long    flags;
-        
+
         spin_lock_irqsave(&dev->rad_lock, flags);
-        
+
         if (!conn->rac_scheduled) {
                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
                 conn->rac_scheduled = 1;
@@ -83,11 +85,11 @@ kranal_schedule_conn(kra_conn_t *conn)
 }
 
 kra_tx_t *
-kranal_get_idle_tx (int may_block) 
+kranal_get_idle_tx (int may_block)
 {
         unsigned long  flags;
         kra_tx_t      *tx = NULL;
-        
+
         for (;;) {
                 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
 
@@ -132,7 +134,7 @@ kranal_get_idle_tx (int may_block)
         }
 
         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-        
+
         return tx;
 }
 
@@ -159,40 +161,46 @@ kranal_new_tx_msg (int may_block, int type)
 }
 
 int
-kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                                int offset, int nob)
-                 
+
 {
         /* For now this is almost identical to kranal_setup_virt_buffer, but we
          * could "flatten" the payload into a single contiguous buffer ready
          * for sending direct over an FMA if we ever needed to. */
 
-        LASSERT (nob > 0);
-        LASSERT (niov > 0);
         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+        LASSERT (nob >= 0);
 
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
+        if (nob == 0) {
+                tx->tx_buffer = NULL;
+        } else {
                 LASSERT (niov > 0);
-        }
 
-        if (nob > iov->iov_len - offset) {
-                CERROR("Can't handle multiple vaddr fragments\n");
-                return -EMSGSIZE;
+                while (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                        niov--;
+                        iov++;
+                        LASSERT (niov > 0);
+                }
+
+                if (nob > iov->iov_len - offset) {
+                        CERROR("Can't handle multiple vaddr fragments\n");
+                        return -EMSGSIZE;
+                }
+
+                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         }
 
         tx->tx_buftype = RANAL_BUF_IMMEDIATE;
         tx->tx_nob = nob;
-        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
 }
 
 int
-kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                           int offset, int nob)
-                 
+
 {
         LASSERT (nob > 0);
         LASSERT (niov > 0);
@@ -239,7 +247,7 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
         tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
-        
+
         phys->Address = kranal_page2phys(kiov->kiov_page);
         phys++;
 
@@ -250,13 +258,13 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
 
                 if (kiov->kiov_offset != 0 ||
-                    ((resid > PAGE_SIZE) && 
+                    ((resid > PAGE_SIZE) &&
                      kiov->kiov_len < PAGE_SIZE)) {
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
-                               "page %d, offset %d, len %d \n", 
-                               (int)(phys - tx->tx_phys), 
-                               kiov->kiov_offset, kiov->kiov_len);                        
+                               "page %d, offset %d, len %d \n",
+                               (int)(phys - tx->tx_phys),
+                               kiov->kiov_offset, kiov->kiov_len);
                         return -EINVAL;
                 }
 
@@ -276,15 +284,15 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
 }
 
 static inline int
-kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, 
+kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                           struct iovec *iov, ptl_kiov_t *kiov,
                           int offset, int nob)
 {
         LASSERT ((iov == NULL) != (kiov == NULL));
-        
+
         if (kiov != NULL)
                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
-        
+
         return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
 }
 
@@ -300,13 +308,13 @@ kranal_map_buffer (kra_tx_t *tx)
         switch (tx->tx_buftype) {
         default:
                 LBUG();
-                
+
         case RANAL_BUF_NONE:
         case RANAL_BUF_IMMEDIATE:
         case RANAL_BUF_PHYS_MAPPED:
         case RANAL_BUF_VIRT_MAPPED:
                 break;
-                
+
         case RANAL_BUF_PHYS_UNMAPPED:
                 rrc = RapkRegisterPhys(dev->rad_handle,
                                        tx->tx_phys, tx->tx_phys_npages,
@@ -334,13 +342,13 @@ kranal_unmap_buffer (kra_tx_t *tx)
         switch (tx->tx_buftype) {
         default:
                 LBUG();
-                
+
         case RANAL_BUF_NONE:
         case RANAL_BUF_IMMEDIATE:
         case RANAL_BUF_PHYS_UNMAPPED:
         case RANAL_BUF_VIRT_UNMAPPED:
                 break;
-                
+
         case RANAL_BUF_PHYS_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
                 dev = tx->tx_conn->rac_device;
@@ -438,11 +446,11 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
-        
+
         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
 
         read_lock(g_lock);
-        
+
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 read_unlock(g_lock);
@@ -456,7 +464,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
                 read_unlock(g_lock);
                 return;
         }
-        
+
         /* Making one or more connections; I'll need a write lock... */
         read_unlock(g_lock);
         write_lock_irqsave(g_lock, flags);
@@ -480,26 +488,26 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 
         if (!peer->rap_connecting) {
                 LASSERT (list_empty(&peer->rap_tx_queue));
-                
+
                 now = CURRENT_SECONDS;
                 if (now < peer->rap_reconnect_time) {
                         write_unlock_irqrestore(g_lock, flags);
                         kranal_tx_done(tx, -EHOSTUNREACH);
                         return;
                 }
-        
+
                 peer->rap_connecting = 1;
                 kranal_peer_addref(peer); /* extra ref for connd */
-        
+
                 spin_lock(&kranal_data.kra_connd_lock);
-        
+
                 list_add_tail(&peer->rap_connd_list,
                               &kranal_data.kra_connd_peers);
                 wake_up(&kranal_data.kra_connd_waitq);
-        
+
                 spin_unlock(&kranal_data.kra_connd_lock);
         }
-        
+
         /* A connection is being established; queue the message... */
         list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
 
@@ -507,7 +515,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 }
 
 void
-kranal_rdma(kra_tx_t *tx, int type, 
+kranal_rdma(kra_tx_t *tx, int type,
             kra_rdma_desc_t *sink, int nob, __u64 cookie)
 {
         kra_conn_t   *conn = tx->tx_conn;
@@ -520,7 +528,7 @@ kranal_rdma(kra_tx_t *tx, int type,
 
         /* No actual race with scheduler sending CLOSE (I'm she!) */
         LASSERT (current == conn->rac_device->rad_scheduler);
-        
+
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
         tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
@@ -556,6 +564,7 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
         RAP_RETURN rrc;
 
         LASSERT (conn->rac_rxmsg != NULL);
+        CDEBUG(D_NET, "Consuming %p\n", conn);
 
         rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                              &nob_received, sizeof(kra_msg_t));
@@ -563,25 +572,26 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
 
         conn->rac_rxmsg = NULL;
 
-        if (nob_received != nob) {
-                CWARN("Expected %d immediate bytes but got %d\n",
-                      nob, nob_received);
+        if (nob_received < nob) {
+                CWARN("Incomplete immediate msg from "LPX64
+                      ": expected %d, got %d\n",
+                      conn->rac_peer->rap_nid, nob, nob_received);
                 return -EPROTO;
         }
-        
+
         return 0;
 }
 
 ptl_err_t
-kranal_do_send (lib_nal_t    *nal, 
+kranal_do_send (lib_nal_t    *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
-                ptl_hdr_t    *hdr, 
-                int           type, 
-                ptl_nid_t     nid, 
+                ptl_hdr_t    *hdr,
+                int           type,
+                ptl_nid_t     nid,
                 ptl_pid_t     pid,
-                unsigned int  niov, 
-                struct iovec *iov, 
+                unsigned int  niov,
+                struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 int           offset,
                 int           nob)
@@ -605,7 +615,7 @@ kranal_do_send (lib_nal_t    *nal,
         switch(type) {
         default:
                 LBUG();
-                
+
         case PTL_MSG_REPLY: {
                 /* reply's 'private' is the conn that received the GET_REQ */
                 conn = private;
@@ -619,7 +629,7 @@ kranal_do_send (lib_nal_t    *nal,
                         }
                         break;                  /* RDMA not expected */
                 }
-                
+
                 /* Incoming message consistent with immediate reply? */
                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
@@ -644,6 +654,9 @@ kranal_do_send (lib_nal_t    *nal,
                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
                             &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                             conn->rac_rxmsg->ram_u.get.ragm_cookie);
+
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
         }
 
@@ -697,10 +710,10 @@ kranal_do_send (lib_nal_t    *nal,
 
         case PTL_MSG_PUT:
                 if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
-                
+
                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
@@ -719,11 +732,11 @@ kranal_do_send (lib_nal_t    *nal,
         }
 
         LASSERT (kiov == NULL);
-        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+        LASSERT (nob <= RANAL_FMA_MAX_DATA);
 
         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
-                                 in_interrupt()), 
+                                 in_interrupt()),
                                RANAL_MSG_IMMEDIATE);
         if (tx == NULL)
                 return PTL_NO_SPACE;
@@ -733,7 +746,7 @@ kranal_do_send (lib_nal_t    *nal,
                 kranal_tx_done(tx, rc);
                 return PTL_FAIL;
         }
-                
+
         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
         tx->tx_libmsg[0] = libmsg;
         kranal_launch_tx(tx, nid);
@@ -753,9 +766,9 @@ kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
 }
 
 ptl_err_t
-kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                   unsigned int niov, ptl_kiov_t *kiov, 
+                   unsigned int niov, ptl_kiov_t *kiov,
                    size_t offset, size_t len)
 {
         return kranal_do_send(nal, private, cookie,
@@ -774,17 +787,26 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
         kra_tx_t    *tx;
         void        *buffer;
         int          rc;
-        
+
         LASSERT (mlen <= rlen);
         LASSERT (!in_interrupt());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
 
+        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
+
+        if (libmsg == NULL) {
+                /* GET or ACK or portals is discarding */
+                LASSERT (mlen == 0);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+        }
+
         switch(rxmsg->ram_type) {
         default:
                 LBUG();
                 return PTL_FAIL;
-                
+
         case RANAL_MSG_IMMEDIATE:
                 if (mlen == 0) {
                         buffer = NULL;
@@ -809,21 +831,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                 return PTL_OK;
 
-        case RANAL_MSG_GET_REQ:
-                /* If the GET matched, we've already handled it in
-                 * kranal_do_send which is called to send the REPLY.  We're
-                 * only called here to complete the GET receive (if we needed
-                 * it which we don't, but I digress...) */
-                LASSERT (libmsg == NULL);
-                lib_finalize(nal, NULL, libmsg, PTL_OK);
-                return PTL_OK;
-
         case RANAL_MSG_PUT_REQ:
-                if (libmsg == NULL) {           /* PUT didn't match... */
-                        lib_finalize(nal, NULL, libmsg, PTL_OK);
-                        return PTL_OK;
-                }
-                
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
@@ -836,19 +844,19 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
                 tx->tx_conn = conn;
                 kranal_map_buffer(tx);
-                
-                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
+
+                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = 
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                         (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
 
                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
 
                 kranal_post_fma(conn, tx);
-                
+
                 /* flag matched by consuming rx message */
                 kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
@@ -857,7 +865,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
 ptl_err_t
 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
-             unsigned int niov, struct iovec *iov, 
+             unsigned int niov, struct iovec *iov,
              size_t offset, size_t mlen, size_t rlen)
 {
         return kranal_do_recv(nal, private, msg, niov, iov, NULL,
@@ -866,7 +874,7 @@ kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
 
 ptl_err_t
 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
-                   unsigned int niov, ptl_kiov_t *kiov, 
+                   unsigned int niov, ptl_kiov_t *kiov,
                    size_t offset, size_t mlen, size_t rlen)
 {
         return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
@@ -906,6 +914,8 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
         if (!conn->rac_close_sent &&
             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
+                       conn, conn->rac_peer->rap_nid);
                 kranal_schedule_conn(conn);
         }
 
@@ -913,14 +923,16 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
-                CERROR("Nothing received from "LPX64" within %lu seconds\n",
+                CERROR("%s received from "LPX64" within %lu seconds\n",
+                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
+                       "Nothing" : "CLOSE not",
                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                 return -ETIMEDOUT;
         }
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 return 0;
-        
+
         /* Check the conn's queues are moving.  These are "belt+braces" checks,
          * in case of hardware/software errors that make this conn seem
          * responsive even though it isn't progressing its message queues. */
@@ -929,7 +941,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
 
         list_for_each (ttmp, &conn->rac_fmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
@@ -937,10 +949,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
-        
+
         list_for_each (ttmp, &conn->rac_rdmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
@@ -948,10 +960,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
-        
+
         list_for_each (ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
@@ -959,7 +971,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn)
                         return -ETIMEDOUT;
                 }
         }
-        
+
         spin_unlock_irqrestore(&conn->rac_lock, flags);
         return 0;
 }
@@ -1005,12 +1017,12 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp)
                 case RANAL_CONN_ESTABLISHED:
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                         break;
-                        
+
                 case RANAL_CONN_CLOSING:
                         kranal_terminate_conn_locked(conn);
                         break;
                 }
-                
+
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
                 kranal_conn_decref(conn);
@@ -1048,24 +1060,24 @@ kranal_connd (void *arg)
                         ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                          kra_acceptsock_t, ras_list);
                         list_del(&ras->ras_list);
-                        
+
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
-                        CDEBUG(D_WARNING,"About to handshake someone\n");
+                        CDEBUG(D_NET,"About to handshake someone\n");
 
                         kranal_conn_handshake(ras->ras_sock, NULL);
                         kranal_free_acceptsock(ras);
 
-                        CDEBUG(D_WARNING,"Finished handshaking someone\n");
+                        CDEBUG(D_NET,"Finished handshaking someone\n");
 
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                         did_something = 1;
                 }
-                        
+
                 if (!list_empty(&kranal_data.kra_connd_peers)) {
                         peer = list_entry(kranal_data.kra_connd_peers.next,
                                           kra_peer_t, rap_connd_list);
-                        
+
                         list_del_init(&peer->rap_connd_list);
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
@@ -1081,11 +1093,11 @@ kranal_connd (void *arg)
 
                 set_current_state(TASK_INTERRUPTIBLE);
                 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
-                
+
                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                 schedule ();
-                
+
                 set_current_state(TASK_RUNNING);
                 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
 
@@ -1099,14 +1111,14 @@ kranal_connd (void *arg)
 }
 
 void
-kranal_update_reaper_timeout(long timeout) 
+kranal_update_reaper_timeout(long timeout)
 {
         unsigned long   flags;
 
         LASSERT (timeout > 0);
-        
+
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-        
+
         if (timeout < kranal_data.kra_new_min_timeout)
                 kranal_data.kra_new_min_timeout = timeout;
 
@@ -1126,7 +1138,7 @@ kranal_reaper (void *arg)
         unsigned long      next_check_time = jiffies;
         long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
         long               current_min_timeout = 1;
-        
+
         kportal_daemonize("kranal_reaper");
         kportal_blockallsigs();
 
@@ -1135,95 +1147,91 @@ kranal_reaper (void *arg)
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
+                /* I wake up every 'p' seconds to check for timeouts on some
+                 * more peers.  I try to check every connection 'n' times
+                 * within the global minimum of all keepalive and timeout
+                 * intervals, to ensure I attend to every connection within
+                 * (n+1)/n times its timeout intervals. */
+                const int     p = 1;
+                const int     n = 3;
+                unsigned long min_timeout;
+                int           chunk;
 
                 /* careful with the jiffy wrap... */
                 timeout = (long)(next_check_time - jiffies);
-                if (timeout <= 0) {
-                
-                        /* I wake up every 'p' seconds to check for
-                         * timeouts on some more peers.  I try to check
-                         * every connection 'n' times within the global
-                         * minimum of all keepalive and timeout intervals,
-                         * to ensure I attend to every connection within
-                         * (n+1)/n times its timeout intervals. */
-                
-                        const int     p = 1;
-                        const int     n = 3;
-                        unsigned long min_timeout;
-                        int           chunk;
-
-                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
-                                /* new min timeout set: restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-
-                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
-                                        current_min_timeout = kranal_data.kra_new_min_timeout;
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                        }
-                        min_timeout = current_min_timeout;
-
-                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
-                                               flags);
-
-                        LASSERT (min_timeout > 0);
-
-                        /* Compute how many table entries to check now so I
-                         * get round the whole table fast enough (NB I do
-                         * this at fixed intervals of 'p' seconds) */
-                        chunk = conn_entries;
-                        if (min_timeout > n * p)
-                                chunk = (chunk * n * p) / min_timeout;
-                        if (chunk == 0)
-                                chunk = 1;
-
-                        for (i = 0; i < chunk; i++) {
-                                kranal_reaper_check(conn_index, 
-                                                    &next_min_timeout);
-                                conn_index = (conn_index + 1) % conn_entries;
-                        }
+                if (timeout > 0) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                        next_check_time += p * HZ;
+                        schedule_timeout(timeout);
 
                         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                        if (((conn_index - chunk <= base_index &&
-                              base_index < conn_index) ||
-                             (conn_index - conn_entries - chunk <= base_index &&
-                              base_index < conn_index - conn_entries))) {
-
-                                /* Scanned all conns: set current_min_timeout... */
-                                if (current_min_timeout != next_min_timeout) {
-                                        current_min_timeout = next_min_timeout;                                        
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                /* ...and restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-                        }
+                        set_current_state(TASK_RUNNING);
+                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                        continue;
                 }
 
-                set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                        /* new min timeout set: restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+
+                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                current_min_timeout = kranal_data.kra_new_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                }
+                min_timeout = current_min_timeout;
 
                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                schedule_timeout(timeout);
+                LASSERT (min_timeout > 0);
+
+                /* Compute how many table entries to check now so I get round
+                 * the whole table fast enough given that I do this at fixed
+                 * intervals of 'p' seconds) */
+                chunk = conn_entries;
+                if (min_timeout > n * p)
+                        chunk = (chunk * n * p) / min_timeout;
+                if (chunk == 0)
+                        chunk = 1;
+
+                for (i = 0; i < chunk; i++) {
+                        kranal_reaper_check(conn_index,
+                                            &next_min_timeout);
+                        conn_index = (conn_index + 1) % conn_entries;
+                }
+
+                next_check_time += p * HZ;
 
                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                set_current_state(TASK_RUNNING);
-                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (((conn_index - chunk <= base_index &&
+                      base_index < conn_index) ||
+                     (conn_index - conn_entries - chunk <= base_index &&
+                      base_index < conn_index - conn_entries))) {
+
+                        /* Scanned all conns: set current_min_timeout... */
+                        if (current_min_timeout != next_min_timeout) {
+                                current_min_timeout = next_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        /* ...and restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+                }
         }
 
         kranal_thread_fini();
@@ -1243,8 +1251,10 @@ kranal_check_rdma_cq (kra_device_t *dev)
 
         for (;;) {
                 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
-                if (rrc == RAP_NOT_DONE)
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                         return;
+                }
 
                 LASSERT (rrc == RAP_SUCCESS);
                 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
@@ -1254,7 +1264,7 @@ kranal_check_rdma_cq (kra_device_t *dev)
                 conn = kranal_cqid2conn_locked(cqid);
                 if (conn == NULL) {
                         /* Conn was destroyed? */
-                        CWARN("RDMA CQID lookup %d failed\n", cqid);
+                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                         read_unlock(&kranal_data.kra_global_lock);
                         continue;
                 }
@@ -1262,6 +1272,9 @@ kranal_check_rdma_cq (kra_device_t *dev)
                 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                 LASSERT (rrc == RAP_SUCCESS);
 
+                CDEBUG(D_NET, "Completed %p\n",
+                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
+
                 spin_lock_irqsave(&conn->rac_lock, flags);
 
                 LASSERT (!list_empty(&conn->rac_rdmaq));
@@ -1274,7 +1287,7 @@ kranal_check_rdma_cq (kra_device_t *dev)
 
                 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                 tx->tx_qtime = jiffies;
-        
+
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
                 /* Get conn's fmaq processed, now I've just put something
@@ -1298,20 +1311,26 @@ kranal_check_fma_cq (kra_device_t *dev)
 
         for (;;) {
                 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
-                if (rrc != RAP_NOT_DONE)
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                         return;
-                
+                }
+
                 LASSERT (rrc == RAP_SUCCESS);
 
                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
 
                         read_lock(&kranal_data.kra_global_lock);
-                
+
                         conn = kranal_cqid2conn_locked(cqid);
-                        if (conn == NULL)
-                                CWARN("FMA CQID lookup %d failed\n", cqid);
-                        else
+                        if (conn == NULL) {
+                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
+                                       cqid);
+                        } else {
+                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
+                                       conn, cqid);
                                 kranal_schedule_conn(conn);
+                        }
 
                         read_unlock(&kranal_data.kra_global_lock);
                         continue;
@@ -1321,15 +1340,15 @@ kranal_check_fma_cq (kra_device_t *dev)
                 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
 
                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
-                        
+
                         read_lock(&kranal_data.kra_global_lock);
-                
+
                         conns = &kranal_data.kra_conns[i];
 
                         list_for_each (tmp, conns) {
-                                conn = list_entry(tmp, kra_conn_t, 
+                                conn = list_entry(tmp, kra_conn_t,
                                                   rac_hashlist);
-                
+
                                 if (conn->rac_device == dev)
                                         kranal_schedule_conn(conn);
                         }
@@ -1346,7 +1365,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
 {
         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
         RAP_RETURN rrc;
-        
+
+        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
+               conn, msg, msg->ram_type, sync ? "(sync)" : "",
+               immediate, immediatenob);
+
         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                  immediatenob <= RANAL_FMA_MAX_DATA :
@@ -1356,11 +1379,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
+                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                       immediate, immediatenob,
                                       msg, sizeof(*msg));
         else
-                rrc = RapkFmaSend(conn->rac_device->rad_handle,
+                rrc = RapkFmaSend(conn->rac_rihandle,
                                   immediate, immediatenob,
                                   msg, sizeof(*msg));
 
@@ -1372,14 +1395,14 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                 conn->rac_last_tx = jiffies;
                 conn->rac_tx_seq++;
                 return 0;
-                
+
         case RAP_NOT_DONE:
                 return -EAGAIN;
         }
 }
 
 void
-kranal_process_fmaq (kra_conn_t *conn) 
+kranal_process_fmaq (kra_conn_t *conn)
 {
         unsigned long flags;
         int           more_to_do;
@@ -1390,28 +1413,33 @@ kranal_process_fmaq (kra_conn_t *conn)
         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
          *       However I will be rescheduled some by an FMA completion event
          *       when I eventually get some.
-         * NB 2. Sampling rac_state here, races with setting it elsewhere
-         *       kranal_close_conn_locked.  But it doesn't matter if I try to
-         *       send a "real" message just as I start closing because I'll get
-         *       scheduled to send the close anyway. */
+         * NB 2. Sampling rac_state here races with setting it elsewhere.
+         *       But it doesn't matter if I try to send a "real" message just
+         *       as I start closing because I'll get scheduled to send the
+         *       close anyway. */
+
+        /* Not racing with incoming message processing! */
+        LASSERT (current == conn->rac_device->rad_scheduler);
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                 if (!list_empty(&conn->rac_rdmaq)) {
                         /* RDMAs in progress */
                         LASSERT (!conn->rac_close_sent);
-                        
-                        if (time_after_eq(jiffies, 
-                                          conn->rac_last_tx + 
-                                          conn->rac_keepalive)) {
+
+                        if (time_after_eq(jiffies,
+                                          conn->rac_last_tx +
+                                          conn->rac_keepalive * HZ)) {
+                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                         }
                         return;
                 }
-                
+
                 if (conn->rac_close_sent)
                         return;
 
+                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 if (rc != 0)
@@ -1420,7 +1448,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                 conn->rac_close_sent = 1;
                 if (!conn->rac_close_recvd)
                         return;
-                        
+
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (conn->rac_state == RANAL_CONN_CLOSING)
@@ -1436,14 +1464,15 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
-                if (time_after_eq(jiffies, 
-                                  conn->rac_last_tx + conn->rac_keepalive)) {
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
+                        CDEBUG(D_NET, "sending NOOP (idle)\n");
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
                 return;
         }
-        
+
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
         list_del(&tx->tx_list);
         more_to_do = !list_empty(&conn->rac_fmaq);
@@ -1451,20 +1480,26 @@ kranal_process_fmaq (kra_conn_t *conn)
         spin_unlock_irqrestore(&conn->rac_lock, flags);
 
         expect_reply = 0;
+        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
+               tx, tx->tx_msg.ram_type, tx->tx_cookie);
         switch (tx->tx_msg.ram_type) {
         default:
                 LBUG();
-                
+
         case RANAL_MSG_IMMEDIATE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+
         case RANAL_MSG_PUT_NAK:
         case RANAL_MSG_PUT_DONE:
         case RANAL_MSG_GET_NAK:
         case RANAL_MSG_GET_DONE:
-                rc = kranal_sendmsg(conn, &tx->tx_msg,
-                                    tx->tx_buffer, tx->tx_nob);
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 0;
                 break;
-                
+
         case RANAL_MSG_PUT_REQ:
                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
@@ -1481,7 +1516,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                 kranal_map_buffer(tx);
                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                         (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
@@ -1492,6 +1527,7 @@ kranal_process_fmaq (kra_conn_t *conn)
         if (rc == -EAGAIN) {
                 /* I need credits to send this.  Replace tx at the head of the
                  * fmaq and I'll get rescheduled when credits appear */
+                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add(&tx->tx_list, &conn->rac_fmaq);
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
@@ -1499,18 +1535,22 @@ kranal_process_fmaq (kra_conn_t *conn)
         }
 
         LASSERT (rc == 0);
-        
+
         if (!expect_reply) {
                 kranal_tx_done(tx, 0);
         } else {
+                /* LASSERT(current) above ensures this doesn't race with reply
+                 * processing */
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
                 tx->tx_qtime = jiffies;
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
         }
 
-        if (more_to_do)
+        if (more_to_do) {
+                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                 kranal_schedule_conn(conn);
+        }
 }
 
 static inline void
@@ -1529,23 +1569,36 @@ kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
 {
         struct list_head *ttmp;
         kra_tx_t         *tx;
-        
+        unsigned long     flags;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
         list_for_each(ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
+                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
+                       tx, tx->tx_msg.ram_type, tx->tx_cookie);
+
                 if (tx->tx_cookie != cookie)
                         continue;
-                
+
                 if (tx->tx_msg.ram_type != type) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CWARN("Unexpected type %x (%x expected) "
                               "matched reply from "LPX64"\n",
                               tx->tx_msg.ram_type, type,
                               conn->rac_peer->rap_nid);
                         return NULL;
                 }
+
+                list_del(&tx->tx_list);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+                return tx;
         }
-        
-        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
+              type, cookie, conn->rac_peer->rap_nid);
         return NULL;
 }
 
@@ -1562,12 +1615,19 @@ kranal_check_fma_rx (kra_conn_t *conn)
 
         if (rrc == RAP_NOT_DONE)
                 return;
-        
+
+        CDEBUG(D_NET, "RX on %p\n", conn);
+
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
         seq = conn->rac_rx_seq++;
         msg = (kra_msg_t *)prefix;
 
+        /* stash message for portals callbacks; they'll NULL
+         * rac_rxmsg if they consume it */
+        LASSERT (conn->rac_rxmsg == NULL);
+        conn->rac_rxmsg = msg;
+
         if (msg->ram_magic != RANAL_MSG_MAGIC) {
                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                         CERROR("Unexpected magic %08x from "LPX64"\n",
@@ -1591,7 +1651,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 case RANAL_MSG_GET_REQ:
                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                         break;
-                        
+
                 default:
                         break;
                 }
@@ -1608,7 +1668,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                        msg->ram_srcnid, peer->rap_nid);
                 goto out;
         }
-        
+
         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                 CERROR("Unexpected connstamp "LPX64"("LPX64
                        " expected) from "LPX64"\n",
@@ -1616,7 +1676,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
                        peer->rap_nid);
                 goto out;
         }
-        
+
         if (msg->ram_seq != seq) {
                 CERROR("Unexpected sequence number %d(%d expected) from "
                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
@@ -1630,18 +1690,20 @@ kranal_check_fma_rx (kra_conn_t *conn)
         }
 
         if (conn->rac_close_recvd) {
-                CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
+                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                        msg->ram_type, conn->rac_peer->rap_nid);
                 goto out;
         }
 
         if (msg->ram_type == RANAL_MSG_CLOSE) {
+                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                 conn->rac_close_recvd = 1;
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                         kranal_close_conn_locked(conn, 0);
-                else if (conn->rac_close_sent)
+                else if (conn->rac_state == RANAL_CONN_CLOSING &&
+                         conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
 
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -1650,19 +1712,20 @@ kranal_check_fma_rx (kra_conn_t *conn)
 
         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 goto out;
-        
-        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
-                                                /* they'll NULL rac_rxmsg if they consume it */
+
         switch (msg->ram_type) {
         case RANAL_MSG_NOOP:
                 /* Nothing to do; just a keepalive */
+                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                 break;
-                
+
         case RANAL_MSG_IMMEDIATE:
+                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                 break;
-                
+
         case RANAL_MSG_PUT_REQ:
+                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
 
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
@@ -1671,36 +1734,39 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                 if (tx == NULL)
                         break;
-                
-                tx->tx_msg.ram_u.completion.racm_cookie = 
+
+                tx->tx_msg.ram_u.completion.racm_cookie =
                         msg->ram_u.putreq.raprm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
 
         case RANAL_MSG_PUT_NAK:
+                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
-                
+
         case RANAL_MSG_PUT_ACK:
+                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.putack.rapam_src_cookie);
                 if (tx == NULL)
                         break;
 
                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
-                            &msg->ram_u.putack.rapam_desc, 
+                            &msg->ram_u.putack.rapam_desc,
                             msg->ram_u.putack.rapam_desc.rard_nob,
                             msg->ram_u.putack.rapam_dst_cookie);
                 break;
 
         case RANAL_MSG_PUT_DONE:
+                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
@@ -1712,8 +1778,9 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 break;
 
         case RANAL_MSG_GET_REQ:
+                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
-                
+
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                         break;
 
@@ -1724,24 +1791,26 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
-                
+
         case RANAL_MSG_GET_NAK:
+                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
-                
+
         case RANAL_MSG_GET_DONE:
+                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, 0);
@@ -1757,24 +1826,26 @@ kranal_check_fma_rx (kra_conn_t *conn)
 }
 
 void
-kranal_complete_closed_conn (kra_conn_t *conn) 
+kranal_complete_closed_conn (kra_conn_t *conn)
 {
         kra_tx_t   *tx;
 
         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
 
         while (!list_empty(&conn->rac_fmaq)) {
                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-                
+
                 list_del(&tx->tx_list);
                 kranal_tx_done(tx, -ECONNABORTED);
         }
-        
+
         LASSERT (list_empty(&conn->rac_rdmaq));
 
         while (!list_empty(&conn->rac_replyq)) {
                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-                
+
                 list_del(&tx->tx_list);
                 kranal_tx_done(tx, -ECONNABORTED);
         }
@@ -1801,7 +1872,7 @@ kranal_scheduler (void *arg)
 
         while (!kranal_data.kra_shutdown) {
                 /* Safe: kra_shutdown only set when quiescent */
-                
+
                 if (busy_loops++ >= RANAL_RESCHED) {
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
@@ -1821,7 +1892,7 @@ kranal_scheduler (void *arg)
 
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
-                
+
                 if (!list_empty(&dev->rad_connq)) {
                         /* Connection needs attention */
                         conn = list_entry(dev->rad_connq.next,
@@ -1838,11 +1909,15 @@ kranal_scheduler (void *arg)
                                 kranal_complete_closed_conn(conn);
 
                         kranal_conn_decref(conn);
-                        
+
                         spin_lock_irqsave(&dev->rad_lock, flags);
                         continue;
                 }
 
+                /* recheck whether the device callback fired before sleeping */
+                if (dev->rad_ready)
+                        continue;
+
                 add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);