Whamcloud - gitweb
Land b_release_1_4_3 onto HEAD (20050619_0305)
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
index 02c3363..f984e6f 100644 (file)
  */
 #include "ranal.h"
 
+static int        kranal_devids[] = {RAPK_MAIN_DEVICE_ID,
+                                     RAPK_EXPANSION_DEVICE_ID};
 
 nal_t                   kranal_api;
 ptl_handle_ni_t         kranal_ni;
 kra_data_t              kranal_data;
 kra_tunables_t          kranal_tunables;
 
-#ifdef CONFIG_SYSCTL
 #define RANAL_SYSCTL_TIMEOUT           1
 #define RANAL_SYSCTL_LISTENER_TIMEOUT  2
 #define RANAL_SYSCTL_BACKLOG           3
@@ -38,10 +39,10 @@ kra_tunables_t          kranal_tunables;
 #define RANAL_SYSCTL                   202
 
 static ctl_table kranal_ctl_table[] = {
-        {RANAL_SYSCTL_TIMEOUT, "timeout", 
+        {RANAL_SYSCTL_TIMEOUT, "timeout",
          &kranal_tunables.kra_timeout, sizeof(int),
          0644, NULL, &proc_dointvec},
-        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", 
+        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
          &kranal_tunables.kra_listener_timeout, sizeof(int),
          0644, NULL, &proc_dointvec},
         {RANAL_SYSCTL_BACKLOG, "backlog",
@@ -50,7 +51,7 @@ static ctl_table kranal_ctl_table[] = {
         {RANAL_SYSCTL_PORT, "port",
          &kranal_tunables.kra_port, sizeof(int),
          0644, NULL, kranal_listener_procint},
-        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", 
+        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
          &kranal_tunables.kra_max_immediate, sizeof(int),
          0644, NULL, &proc_dointvec},
         { 0 }
@@ -60,7 +61,6 @@ static ctl_table kranal_top_ctl_table[] = {
         {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
         { 0 }
 };
-#endif
 
 int
 kranal_sock_write (struct socket *sock, void *buffer, int nob)
@@ -89,6 +89,12 @@ kranal_sock_write (struct socket *sock, void *buffer, int nob)
         rc = sock_sendmsg(sock, &msg, iov.iov_len);
         set_fs(oldmm);
 
+        if (rc == nob)
+                return 0;
+
+        if (rc >= 0)
+                return -EAGAIN;
+
         return rc;
 }
 
@@ -208,7 +214,7 @@ kranal_pause(int ticks)
 }
 
 void
-kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
+kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid)
 {
         RAP_RETURN   rrc;
 
@@ -217,7 +223,8 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
         connreq->racr_magic     = RANAL_MSG_MAGIC;
         connreq->racr_version   = RANAL_MSG_VERSION;
         connreq->racr_devid     = conn->rac_device->rad_id;
-        connreq->racr_nid       = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_srcnid    = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_dstnid    = dstnid;
         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
         connreq->racr_connstamp = conn->rac_my_connstamp;
         connreq->racr_timeout   = conn->rac_timeout;
@@ -246,13 +253,14 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 __swab32s(&connreq->racr_magic);
                 __swab16s(&connreq->racr_version);
                 __swab16s(&connreq->racr_devid);
-                __swab64s(&connreq->racr_nid);
+                __swab64s(&connreq->racr_srcnid);
+                __swab64s(&connreq->racr_dstnid);
                 __swab64s(&connreq->racr_peerstamp);
                 __swab64s(&connreq->racr_connstamp);
                 __swab32s(&connreq->racr_timeout);
 
+                __swab32s(&connreq->racr_riparams.HostId);
                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
-                __swab32s(&connreq->racr_riparams.RcvCqHndl);
                 __swab32s(&connreq->racr_riparams.PTag);
                 __swab32s(&connreq->racr_riparams.CompletionCookie);
         }
@@ -262,7 +270,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 return -EPROTO;
         }
 
-        if (connreq->racr_nid == PTL_NID_ANY) {
+        if (connreq->racr_srcnid == PTL_NID_ANY ||
+            connreq->racr_dstnid == PTL_NID_ANY) {
                 CERROR("Received PTL_NID_ANY\n");
                 return -EPROTO;
         }
@@ -272,7 +281,7 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                 return -EPROTO;
         }
-        
+
         return 0;
 }
 
@@ -305,16 +314,16 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
 
                 if (conn->rac_device != newconn->rac_device)
                         continue;
-                
+
                 if (loopback &&
                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                         continue;
-                    
+
                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
 
                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64
-                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, 
+                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
 
                 count++;
@@ -332,7 +341,7 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
         int               loopback;
 
         loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
-        
+
         list_for_each(tmp, &peer->rap_conns) {
                 conn = list_entry(tmp, kra_conn_t, rac_list);
 
@@ -358,12 +367,12 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
                 /* 'newconn' is an earlier connection from 'peer'!!! */
                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
                         return 2;
-                
+
                 /* 'conn' is an earlier connection from 'peer': it will be
                  * removed when we cull stale conns later on... */
                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
                         continue;
-                
+
                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
                  * playing the game... */
                 return 3;
@@ -384,7 +393,6 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
-        
 
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
@@ -413,12 +421,11 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
 
         kranal_set_conn_uniqueness(conn);
 
+        conn->rac_device = dev;
         conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
         kranal_update_reaper_timeout(conn->rac_timeout);
 
         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
-                           dev->rad_ptag,
-                           dev->rad_rdma_cq, dev->rad_fma_cq,
                            &conn->rac_rihandle);
         if (rrc != RAP_SUCCESS) {
                 CERROR("RapkCreateRi failed: %d\n", rrc);
@@ -432,7 +439,7 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
 }
 
 void
-kranal_destroy_conn(kra_conn_t *conn) 
+kranal_destroy_conn(kra_conn_t *conn)
 {
         RAP_RETURN         rrc;
 
@@ -496,9 +503,11 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
                 /* Non-persistent peer with no more conns... */
                 kranal_unlink_peer_locked(peer);
         }
-                        
+
         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
-         * full timeout */
+         * full timeout.  If we get a CLOSE we know the peer has stopped all
+         * RDMA.  Otherwise if we wait for the full timeout we can also be sure
+         * all RDMA has stopped. */
         conn->rac_last_rx = jiffies;
         mb();
 
@@ -512,29 +521,53 @@ void
 kranal_close_conn (kra_conn_t *conn, int error)
 {
         unsigned long    flags;
-        
+
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-        
+
         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                 kranal_close_conn_locked(conn, error);
-        
+
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
 int
-kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, 
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
                        __u32 peer_ip, int peer_port)
 {
-        RAP_RETURN    rrc;
-        
+        kra_device_t  *dev = conn->rac_device;
+        unsigned long  flags;
+        RAP_RETURN     rrc;
+
+        /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
+         * to do RapkCompleteSync() timekeeping (see kranal_scheduler). */
+        conn->rac_last_tx = jiffies;
+        conn->rac_keepalive = 0;
+
         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         if (rrc != RAP_SUCCESS) {
-                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rrc);
-                return -EPROTO;
+                return -ECONNABORTED;
         }
-        
+
+        /* Schedule conn on rad_new_conns */
+        kranal_conn_addref(conn);
+        spin_lock_irqsave(&dev->rad_lock, flags);
+        list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
+        wake_up(&dev->rad_waitq);
+        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+        rrc = RapkWaitToConnect(conn->rac_rihandle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer_ip), peer_port, rrc);
+                return -ECONNABORTED;
+        }
+
+        /* Scheduler doesn't touch conn apart from descheduling and decref'ing
+         * it after RapkCompleteSync() returns success, so conn is all mine */
+
         conn->rac_peerstamp = connreq->racr_peerstamp;
         conn->rac_peer_connstamp = connreq->racr_connstamp;
         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
@@ -543,14 +576,14 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
 }
 
 int
-kranal_passive_conn_handshake (struct socket *sock, 
-                               ptl_nid_t *peer_nidp, kra_conn_t **connp)
+kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
+                               ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         struct sockaddr_in   addr;
         __u32                peer_ip;
         unsigned int         peer_port;
-        kra_connreq_t        connreq;
-        ptl_nid_t            peer_nid;
+        kra_connreq_t        rx_connreq;
+        kra_connreq_t        tx_connreq;
         kra_conn_t          *conn;
         kra_device_t        *dev;
         int                  rc;
@@ -573,25 +606,22 @@ kranal_passive_conn_handshake (struct socket *sock,
                 return -ECONNREFUSED;
         }
 
-        rc = kranal_recv_connreq(sock, &connreq, 
+        rc = kranal_recv_connreq(sock, &rx_connreq,
                                  kranal_tunables.kra_listener_timeout);
         if (rc != 0) {
-                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rc);
                 return rc;
         }
 
-        peer_nid = connreq.racr_nid;
-        LASSERT (peer_nid != PTL_NID_ANY);
-
         for (i = 0;;i++) {
                 if (i == kranal_data.kra_ndevs) {
                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
-                               connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
+                               rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
                         return -ENODEV;
                 }
                 dev = &kranal_data.kra_devices[i];
-                if (dev->rad_id == connreq.racr_devid)
+                if (dev->rad_id == rx_connreq.racr_devid)
                         break;
         }
 
@@ -599,24 +629,25 @@ kranal_passive_conn_handshake (struct socket *sock,
         if (rc != 0)
                 return rc;
 
-        rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
+        kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
+
+        rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq));
         if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer_ip), peer_port, rc);
                 kranal_conn_decref(conn);
                 return rc;
         }
 
-        kranal_pack_connreq(&connreq, conn);
-
-        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
-                       HIPQUAD(peer_ip), peer_port, rc);
                 kranal_conn_decref(conn);
                 return rc;
         }
 
         *connp = conn;
-        *peer_nidp = peer_nid;
+        *src_nidp = rx_connreq.racr_srcnid;
+        *dst_nidp = rx_connreq.racr_dstnid;
         return 0;
 }
 
@@ -631,8 +662,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 
         for (port = 1023; port >= 512; port--) {
 
-                memset(&locaddr, 0, sizeof(locaddr)); 
-                locaddr.sin_family      = AF_INET; 
+                memset(&locaddr, 0, sizeof(locaddr));
+                locaddr.sin_family      = AF_INET;
                 locaddr.sin_port        = htons(port);
                 locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 
@@ -649,7 +680,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                                      (struct sockaddr *)&locaddr, sizeof(locaddr));
                 if (rc != 0) {
                         sock_release(sock);
-                        
+
                         if (rc == -EADDRINUSE) {
                                 CDEBUG(D_NET, "Port %d already in use\n", port);
                                 continue;
@@ -666,7 +697,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                         *sockp = sock;
                         return 0;
                 }
-                
+
                 sock_release(sock);
 
                 if (rc != -EADDRNOTAVAIL) {
@@ -674,8 +705,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                                port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                         return rc;
                 }
-                
-                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
+
+                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
                        port, HIPQUAD(peer->rap_ip), peer->rap_port);
         }
 
@@ -685,7 +716,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 
 
 int
-kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+kranal_active_conn_handshake(kra_peer_t *peer,
+                             ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         kra_connreq_t       connreq;
         kra_conn_t         *conn;
@@ -703,8 +735,8 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
         if (rc != 0)
                 return rc;
 
-        kranal_pack_connreq(&connreq, conn);
-        
+        kranal_pack_connreq(&connreq, conn, peer->rap_nid);
+
         rc = ranal_connect_sock(peer, &sock);
         if (rc != 0)
                 goto failed_0;
@@ -715,14 +747,14 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 
         rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                 goto failed_1;
         }
 
         rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
         if (rc != 0) {
-                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                 goto failed_1;
         }
@@ -730,28 +762,29 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
         sock_release(sock);
         rc = -EPROTO;
 
-        if (connreq.racr_nid != peer->rap_nid) {
-                CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+        if (connreq.racr_srcnid != peer->rap_nid) {
+                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                        "received "LPX64" expected "LPX64"\n",
-                       HIPQUAD(peer->rap_ip), peer->rap_port, 
-                       connreq.racr_nid, peer->rap_nid);
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
+                       connreq.racr_srcnid, peer->rap_nid);
                 goto failed_0;
         }
 
         if (connreq.racr_devid != dev->rad_id) {
                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
                        "received %d expected %d\n",
-                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
                        connreq.racr_devid, dev->rad_id);
                 goto failed_0;
         }
 
-        rc = kranal_set_conn_params(conn, &connreq, 
+        rc = kranal_set_conn_params(conn, &connreq,
                                     peer->rap_ip, peer->rap_port);
         if (rc != 0)
                 goto failed_0;
 
         *connp = conn;
+        *dst_nidp = connreq.racr_dstnid;
         return 0;
 
  failed_1:
@@ -767,17 +800,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
         kra_peer_t        *peer2;
         kra_tx_t          *tx;
         ptl_nid_t          peer_nid;
+        ptl_nid_t          dst_nid;
         unsigned long      flags;
         kra_conn_t        *conn;
         int                rc;
         int                nstale;
+        int                new_peer = 0;
 
         if (sock == NULL) {
                 /* active: connd wants to connect to 'peer' */
                 LASSERT (peer != NULL);
                 LASSERT (peer->rap_connecting);
-                
-                rc = kranal_active_conn_handshake(peer, &conn);
+
+                rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
                 if (rc != 0)
                         return rc;
 
@@ -785,19 +820,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 
                 if (!kranal_peer_active(peer)) {
                         /* raced with peer getting unlinked */
-                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                                 flags);
                         kranal_conn_decref(conn);
-                        return ESTALE;
+                        return -ESTALE;
                 }
 
                 peer_nid = peer->rap_nid;
-
         } else {
                 /* passive: listener accepted 'sock' */
                 LASSERT (peer == NULL);
 
-                rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+                rc = kranal_passive_conn_handshake(sock, &peer_nid,
+                                                   &dst_nid, &conn);
                 if (rc != 0)
                         return rc;
 
@@ -813,26 +848,32 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 
                 peer2 = kranal_find_peer_locked(peer_nid);
                 if (peer2 == NULL) {
-                        /* peer table takes my initial ref on peer */
-                        list_add_tail(&peer->rap_list,
-                                      kranal_nid2peerlist(peer_nid));
+                        new_peer = 1;
                 } else {
                         /* peer_nid already in the peer table */
                         kranal_peer_decref(peer);
                         peer = peer2;
                 }
-                /* NB I may now have a non-persistent peer in the peer
-                 * table with no connections: I can't drop the global lock
-                 * until I've given it a connection or removed it, and when
-                 * I do 'peer' can disappear under me. */
         }
 
-        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
+        LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
+
+        /* Refuse connection if peer thinks we are a different NID.  We check
+         * this while holding the global lock, to synch with connection
+         * destruction on NID change. */
+        if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
-        /* Refuse to duplicate an existing connection (both sides might try
-         * to connect at once).  NB we return success!  We _do_ have a
-         * connection (so we don't need to remove the peer from the peer
-         * table) and we _don't_ have any blocked txs to complete */
+                CERROR("Stale/bad connection with "LPX64
+                       ": dst_nid "LPX64", expected "LPX64"\n",
+                       peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
+                rc = -ESTALE;
+                goto failed;
+        }
+
+        /* Refuse to duplicate an existing connection (both sides might try to
+         * connect at once).  NB we return success!  We _are_ connected so we
+         * _don't_ have any blocked txs to complete with failure. */
         rc = kranal_conn_isdup_locked(peer, conn);
         if (rc != 0) {
                 LASSERT (!list_empty(&peer->rap_conns));
@@ -840,10 +881,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 CWARN("Not creating duplicate connection to "LPX64": %d\n",
                       peer_nid, rc);
-                kranal_conn_decref(conn);
-                return 0;
+                rc = 0;
+                goto failed;
+        }
+
+        if (new_peer) {
+                /* peer table takes my ref on the new peer */
+                list_add_tail(&peer->rap_list,
+                              kranal_nid2peerlist(peer_nid));
         }
 
+        /* initialise timestamps before reaper looks at them */
+        conn->rac_last_tx = conn->rac_last_rx = jiffies;
+
         kranal_peer_addref(peer);               /* +1 ref for conn */
         conn->rac_peer = peer;
         list_add_tail(&conn->rac_list, &peer->rap_conns);
@@ -854,7 +904,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 
         /* Schedule all packets blocking for a connection */
         while (!list_empty(&peer->rap_tx_queue)) {
-                tx = list_entry(&peer->rap_tx_queue.next,
+                tx = list_entry(peer->rap_tx_queue.next,
                                 kra_tx_t, tx_list);
 
                 list_del(&tx->tx_list);
@@ -870,10 +920,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
         if (nstale != 0)
                 CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
 
+        CWARN("New connection to "LPX64" on devid[%d] = %d\n",
+               peer_nid, conn->rac_device->rad_idx, conn->rac_device->rad_id);
+
         /* Ensure conn gets checked.  Transmits may have been queued and an
          * FMA event may have happened before it got in the cq hash table */
         kranal_schedule_conn(conn);
         return 0;
+
+ failed:
+        if (new_peer)
+                kranal_peer_decref(peer);
+        kranal_conn_decref(conn);
+        return rc;
 }
 
 void
@@ -886,8 +945,12 @@ kranal_connect (kra_peer_t *peer)
 
         LASSERT (peer->rap_connecting);
 
+        CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid);
+
         rc = kranal_conn_handshake(NULL, peer);
 
+        CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc);
+
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
         LASSERT (peer->rap_connecting);
@@ -900,14 +963,14 @@ kranal_connect (kra_peer_t *peer)
 
                 /* reset reconnection timeouts */
                 peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
-                peer->rap_reconnect_time = CURRENT_TIME;
+                peer->rap_reconnect_time = CURRENT_SECONDS;
 
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 return;
         }
 
         LASSERT (peer->rap_reconnect_interval != 0);
-        peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+        peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval;
         peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
                                            1 * peer->rap_reconnect_interval);
 
@@ -932,17 +995,24 @@ kranal_connect (kra_peer_t *peer)
         } while (!list_empty(&zombies));
 }
 
+void
+kranal_free_acceptsock (kra_acceptsock_t *ras)
+{
+        sock_release(ras->ras_sock);            /* release the socket held in 'ras' */
+        PORTAL_FREE(ras, sizeof(*ras));         /* then free the 'ras' descriptor itself */
+}
+
 int
-kranal_listener(void *arg)
+kranal_listener (void *arg)
 {
         struct sockaddr_in addr;
         wait_queue_t       wait;
         struct socket     *sock;
-        struct socket     *newsock;
+        kra_acceptsock_t  *ras;
         int                port;
-        kra_connreq_t     *connreqs;
         char               name[16];
         int                rc;
+        unsigned long      flags;
 
         /* Parent thread holds kra_nid_mutex, and is, or is about to
          * block on kra_listener_signal */
@@ -954,14 +1024,9 @@ kranal_listener(void *arg)
 
         init_waitqueue_entry(&wait, current);
 
-        rc = -ENOMEM;
-        PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
-        if (connreqs == NULL)
-                goto out_0;
-
         rc = kranal_create_sock(&sock);
         if (rc != 0)
-                goto out_1;
+                goto out_0;
 
         memset(&addr, 0, sizeof(addr));
         addr.sin_family      = AF_INET;
@@ -971,14 +1036,14 @@ kranal_listener(void *arg)
         rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
         if (rc != 0) {
                 CERROR("Can't bind to port %d\n", port);
-                goto out_2;
+                goto out_1;
         }
 
         rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
         if (rc != 0) {
-                CERROR("Can't set listen backlog %d: %d\n", 
+                CERROR("Can't set listen backlog %d: %d\n",
                        kranal_tunables.kra_backlog, rc);
-                goto out_2;
+                goto out_1;
         }
 
         LASSERT (kranal_data.kra_listener_sock == NULL);
@@ -990,48 +1055,76 @@ kranal_listener(void *arg)
 
         /* Wake me any time something happens on my socket */
         add_wait_queue(sock->sk->sk_sleep, &wait);
+        ras = NULL;
 
         while (kranal_data.kra_listener_shutdown == 0) {
 
-                newsock = sock_alloc();
-                if (newsock == NULL) {
-                        CERROR("Can't allocate new socket for accept\n");
-                        kranal_pause(HZ);
-                        continue;
+                if (ras == NULL) {
+                        PORTAL_ALLOC(ras, sizeof(*ras));
+                        if (ras == NULL) {
+                                CERROR("Out of Memory: pausing...\n");
+                                kranal_pause(HZ);
+                                continue;
+                        }
+                        ras->ras_sock = NULL;
+                }
+
+                if (ras->ras_sock == NULL) {
+                        ras->ras_sock = sock_alloc();
+                        if (ras->ras_sock == NULL) {
+                                CERROR("Can't allocate socket: pausing...\n");
+                                kranal_pause(HZ);
+                                continue;
+                        }
+                        /* XXX this should add a ref to sock->ops->owner, if
+                         * TCP could be a module */
+                        ras->ras_sock->type = sock->type;
+                        ras->ras_sock->ops = sock->ops;
                 }
 
                 set_current_state(TASK_INTERRUPTIBLE);
 
-                rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+                rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);
 
+                /* Sleep for socket activity? */
                 if (rc == -EAGAIN &&
                     kranal_data.kra_listener_shutdown == 0)
                         schedule();
 
                 set_current_state(TASK_RUNNING);
 
-                if (rc != 0) {
-                        sock_release(newsock);
-                        if (rc != -EAGAIN) {
-                                CERROR("Accept failed: %d\n", rc);
-                                kranal_pause(HZ);
-                        }
+                if (rc == 0) {
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+                        list_add_tail(&ras->ras_list,
+                                      &kranal_data.kra_connd_acceptq);
+
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+                        wake_up(&kranal_data.kra_connd_waitq);
+
+                        ras = NULL;
                         continue;
-                } 
+                }
+
+                if (rc != -EAGAIN) {
+                        CERROR("Accept failed: %d, pausing...\n", rc);
+                        kranal_pause(HZ);
+                }
+        }
 
-                kranal_conn_handshake(newsock, NULL);
-                sock_release(newsock);
+        if (ras != NULL) {
+                if (ras->ras_sock != NULL)
+                        sock_release(ras->ras_sock);
+                PORTAL_FREE(ras, sizeof(*ras));
         }
 
         rc = 0;
         remove_wait_queue(sock->sk->sk_sleep, &wait);
- out_2:
+ out_1:
         sock_release(sock);
         kranal_data.kra_listener_sock = NULL;
- out_1:
-        PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
  out_0:
-        /* set completion status and unblock thread waiting for me 
+        /* set completion status and unblock thread waiting for me
          * (parent on startup failure, executioner on normal shutdown) */
         kranal_data.kra_listener_shutdown = rc;
         up(&kranal_data.kra_listener_signal);
@@ -1045,7 +1138,7 @@ kranal_start_listener (void)
         long           pid;
         int            rc;
 
-        CDEBUG(D_WARNING, "Starting listener\n");
+        CDEBUG(D_NET, "Starting listener\n");
 
         /* Called holding kra_nid_mutex: listener stopped */
         LASSERT (kranal_data.kra_listener_sock == NULL);
@@ -1063,14 +1156,18 @@ kranal_start_listener (void)
         rc = kranal_data.kra_listener_shutdown;
         LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
 
-        CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
+        CDEBUG(D_NET, "Listener %ld started OK\n", pid);
         return rc;
 }
 
 void
-kranal_stop_listener(void)
+kranal_stop_listener(int clear_acceptq)
 {
-        CDEBUG(D_WARNING, "Stopping listener\n");
+        struct list_head  zombie_accepts;
+        unsigned long     flags;
+        kra_acceptsock_t *ras;
+
+        CDEBUG(D_NET, "Stopping listener\n");
 
         /* Called holding kra_nid_mutex: listener running */
         LASSERT (kranal_data.kra_listener_sock != NULL);
@@ -1082,10 +1179,28 @@ kranal_stop_listener(void)
         down(&kranal_data.kra_listener_signal);
 
         LASSERT (kranal_data.kra_listener_sock == NULL);
-        CDEBUG(D_WARNING, "Listener stopped\n");
+        CDEBUG(D_NET, "Listener stopped\n");
+
+        if (!clear_acceptq)
+                return;
+
+        /* Close any unhandled accepts */
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+        list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq);
+        list_del_init(&kranal_data.kra_connd_acceptq);
+
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+        while (!list_empty(&zombie_accepts)) {
+                ras = list_entry(zombie_accepts.next,
+                                 kra_acceptsock_t, ras_list);
+                list_del(&ras->ras_list);
+                kranal_free_acceptsock(ras);
+        }
 }
 
-int 
+int
 kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                         void *buffer, size_t *lenp)
 {
@@ -1110,7 +1225,7 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
              kranal_data.kra_listener_sock == NULL)) {
 
                 if (kranal_data.kra_listener_sock != NULL)
-                        kranal_stop_listener();
+                        kranal_stop_listener(0);
 
                 rc = kranal_start_listener();
 
@@ -1131,9 +1246,9 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
 int
 kranal_set_mynid(ptl_nid_t nid)
 {
-        unsigned long  flags;
-        lib_ni_t      *ni = &kranal_lib.libnal_ni;
-        int            rc = 0;
+        unsigned long    flags;
+        lib_ni_t        *ni = &kranal_lib.libnal_ni;
+        int              rc = 0;
 
         CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
                nid, ni->ni_pid.nid);
@@ -1147,13 +1262,12 @@ kranal_set_mynid(ptl_nid_t nid)
         }
 
         if (kranal_data.kra_listener_sock != NULL)
-                kranal_stop_listener();
+                kranal_stop_listener(1);
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
         kranal_data.kra_peerstamp++;
-        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-
         ni->ni_pid.nid = nid;
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
         /* Delete all existing peers and their connections after new
          * NID/connstamp set to ensure no old connections in our brave
@@ -1188,7 +1302,7 @@ kranal_create_peer (ptl_nid_t nid)
         INIT_LIST_HEAD(&peer->rap_conns);
         INIT_LIST_HEAD(&peer->rap_tx_queue);
 
-        peer->rap_reconnect_time = CURRENT_TIME;
+        peer->rap_reconnect_time = CURRENT_SECONDS;
         peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
 
         atomic_inc(&kranal_data.kra_npeers);
@@ -1269,7 +1383,7 @@ kranal_unlink_peer_locked (kra_peer_t *peer)
 }
 
 int
-kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, 
+kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
                       int *persistencep)
 {
         kra_peer_t        *peer;
@@ -1537,7 +1651,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private)
                 break;
         }
         case NAL_CMD_DEL_PEER: {
-                rc = kranal_del_peer(pcfg->pcfg_nid, 
+                rc = kranal_del_peer(pcfg->pcfg_nid,
                                      /* flags == single_share */
                                      pcfg->pcfg_flags != 0);
                 break;
@@ -1611,7 +1725,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
                 PORTAL_ALLOC(tx->tx_phys,
                              PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                 if (tx->tx_phys == NULL) {
-                        CERROR("Can't allocate %stx[%d]->tx_phys\n", 
+                        CERROR("Can't allocate %stx[%d]->tx_phys\n",
                                isnblk ? "nblk " : "", i);
 
                         PORTAL_FREE(tx, sizeof(*tx));
@@ -1650,36 +1764,26 @@ kranal_device_init(int id, kra_device_t *dev)
                 goto failed_1;
         }
 
-        rrc = RapkCreatePtag(dev->rad_handle,
-                             &dev->rad_ptag);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't create ptag"
-                       " for device %d: %d\n", id, rrc);
-                goto failed_1;
-        }
-
-        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
-                           &dev->rad_rdma_cq);
+        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
+                           &dev->rad_rdma_cqh);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create rdma cq size %d"
                        " for device %d: %d\n", total_ntx, id, rrc);
-                goto failed_2;
+                goto failed_1;
         }
 
-        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
-                           dev->rad_ptag, &dev->rad_fma_cq);
+        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
+                           &dev->rad_fma_cqh);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create fma cq size %d"
                        " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
-                goto failed_3;
+                goto failed_2;
         }
 
         return 0;
 
- failed_3:
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
  failed_2:
-        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
  failed_1:
         RapkReleaseDevice(dev->rad_handle);
  failed_0:
@@ -1690,9 +1794,8 @@ void
 kranal_device_fini(kra_device_t *dev)
 {
         LASSERT(dev->rad_scheduler == NULL);
-        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
-        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
         RapkReleaseDevice(dev->rad_handle);
 }
 
@@ -1701,7 +1804,7 @@ kranal_api_shutdown (nal_t *nal)
 {
         int           i;
         unsigned long flags;
-        
+
         if (nal->nal_refct != 0) {
                 /* This module got the first ref */
                 PORTAL_MODULE_UNUSE;
@@ -1749,14 +1852,24 @@ kranal_api_shutdown (nal_t *nal)
                 break;
         }
 
+        /* Conn/Peer state all cleaned up BEFORE setting shutdown, so threads
+         * don't have to worry about shutdown races */
+        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
+        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
+        
         /* flag threads to terminate; wake and wait for them to die */
         kranal_data.kra_shutdown = 1;
 
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
                 kra_device_t *dev = &kranal_data.kra_devices[i];
 
-                LASSERT (list_empty(&dev->rad_connq));
-
+                LASSERT (list_empty(&dev->rad_ready_conns));
+                LASSERT (list_empty(&dev->rad_new_conns));
+                LASSERT (dev->rad_nphysmap == 0);
+                LASSERT (dev->rad_nppphysmap == 0);
+                LASSERT (dev->rad_nvirtmap == 0);
+                LASSERT (dev->rad_nobvirtmap == 0);
+                
                 spin_lock_irqsave(&dev->rad_lock, flags);
                 wake_up(&dev->rad_waitq);
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
@@ -1767,9 +1880,9 @@ kranal_api_shutdown (nal_t *nal)
         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
         LASSERT (list_empty(&kranal_data.kra_connd_peers));
-        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); 
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
         wake_up_all(&kranal_data.kra_connd_waitq);
-        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); 
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
         i = 2;
         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
@@ -1786,7 +1899,7 @@ kranal_api_shutdown (nal_t *nal)
                         LASSERT (list_empty(&kranal_data.kra_peers[i]));
 
                 PORTAL_FREE(kranal_data.kra_peers,
-                            sizeof (struct list_head) * 
+                            sizeof (struct list_head) *
                             kranal_data.kra_peer_hash_size);
         }
 
@@ -1796,7 +1909,7 @@ kranal_api_shutdown (nal_t *nal)
                         LASSERT (list_empty(&kranal_data.kra_conns[i]));
 
                 PORTAL_FREE(kranal_data.kra_conns,
-                            sizeof (struct list_head) * 
+                            sizeof (struct list_head) *
                             kranal_data.kra_conn_hash_size);
         }
 
@@ -1819,8 +1932,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                     ptl_ni_limits_t *requested_limits,
                     ptl_ni_limits_t *actual_limits)
 {
-        static int        device_ids[] = {RAPK_MAIN_DEVICE_ID,
-                                          RAPK_EXPANSION_DEVICE_ID};
         struct timeval    tv;
         ptl_process_id_t  process_id;
         int               pkmem = atomic_read(&portal_kmemory);
@@ -1861,7 +1972,8 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 kra_device_t  *dev = &kranal_data.kra_devices[i];
 
                 dev->rad_idx = i;
-                INIT_LIST_HEAD(&dev->rad_connq);
+                INIT_LIST_HEAD(&dev->rad_ready_conns);
+                INIT_LIST_HEAD(&dev->rad_new_conns);
                 init_waitqueue_head(&dev->rad_waitq);
                 spin_lock_init(&dev->rad_lock);
         }
@@ -1870,6 +1982,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
         spin_lock_init(&kranal_data.kra_reaper_lock);
 
+        INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
         INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
         init_waitqueue_head(&kranal_data.kra_connd_waitq);
         spin_lock_init(&kranal_data.kra_connd_lock);
@@ -1881,7 +1994,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
 
         /* OK to call kranal_api_shutdown() to cleanup now */
         kranal_data.kra_init = RANAL_INIT_DATA;
-        
+
         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
         PORTAL_ALLOC(kranal_data.kra_peers,
                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
@@ -1929,7 +2042,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         }
 
         for (i = 0; i < RANAL_N_CONND; i++) {
-                rc = kranal_thread_start(kranal_connd, (void *)i);
+                rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
                 if (rc != 0) {
                         CERROR("Can't spawn ranal connd[%d]: %d\n",
                                i, rc);
@@ -1937,14 +2050,25 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 }
         }
 
-        LASSERT(kranal_data.kra_ndevs == 0);
-        for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
+        LASSERT (kranal_data.kra_ndevs == 0);
+
+        for (i = 0; i < sizeof(kranal_devids)/sizeof(kranal_devids[0]); i++) {
+                LASSERT (i < RANAL_MAXDEVS);
+
                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
 
-                rc = kranal_device_init(device_ids[i], dev);
+                rc = kranal_device_init(kranal_devids[i], dev);
                 if (rc == 0)
                         kranal_data.kra_ndevs++;
-
+        }
+        
+        if (kranal_data.kra_ndevs == 0) {
+                CERROR("Can't initialise any RapidArray devices\n");
+                goto failed;
+        }
+        
+        for (i = 0; i < kranal_data.kra_ndevs; i++) {
+                dev = &kranal_data.kra_devices[i];
                 rc = kranal_thread_start(kranal_scheduler, dev);
                 if (rc != 0) {
                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
@@ -1953,9 +2077,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 }
         }
 
-        if (kranal_data.kra_ndevs == 0)
-                goto failed;
-
         rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
         if (rc != 0) {
                 CERROR("Can't initialise command interface (rc = %d)\n", rc);
@@ -1973,17 +2094,16 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         return PTL_OK;
 
  failed:
-        kranal_api_shutdown(&kranal_api);    
+        kranal_api_shutdown(&kranal_api);
         return PTL_FAIL;
 }
 
 void __exit
 kranal_module_fini (void)
 {
-#ifdef CONFIG_SYSCTL
         if (kranal_tunables.kra_sysctl != NULL)
                 unregister_sysctl_table(kranal_tunables.kra_sysctl);
-#endif
+
         PtlNIFini(kranal_ni);
 
         ptl_unregister_nal(RANAL);
@@ -2007,6 +2127,10 @@ kranal_module_init (void)
 
         /* Initialise dynamic tunables to defaults once only */
         kranal_tunables.kra_timeout = RANAL_TIMEOUT;
+        kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT;
+        kranal_tunables.kra_backlog = RANAL_BACKLOG;
+        kranal_tunables.kra_port = RANAL_PORT;
+        kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE;
 
         rc = ptl_register_nal(RANAL, &kranal_api);
         if (rc != PTL_OK) {
@@ -2021,11 +2145,15 @@ kranal_module_init (void)
                 return -ENODEV;
         }
 
-#ifdef CONFIG_SYSCTL
-        /* Press on regardless even if registering sysctl doesn't work */
-        kranal_tunables.kra_sysctl = 
+        kranal_tunables.kra_sysctl =
                 register_sysctl_table(kranal_top_ctl_table, 0);
-#endif
+        if (kranal_tunables.kra_sysctl == NULL) {
+                CERROR("Can't register sysctl table\n");
+                PtlNIFini(kranal_ni);
+                ptl_unregister_nal(RANAL);
+                return -ENOMEM;
+        }
+
         return 0;
 }