Whamcloud - gitweb
* ranal changes in response to Igor's changes to the RapidArray
authoreeb <eeb>
Tue, 4 Jan 2005 15:51:31 +0000 (15:51 +0000)
committereeb <eeb>
Tue, 4 Jan 2005 15:51:31 +0000 (15:51 +0000)
    kernel comms API.

lnet/klnds/ralnd/ralnd.c
lnet/klnds/ralnd/ralnd.h
lnet/klnds/ralnd/ralnd_cb.c

index 02c3363..4c2ce9d 100644 (file)
@@ -217,7 +217,8 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
         connreq->racr_magic     = RANAL_MSG_MAGIC;
         connreq->racr_version   = RANAL_MSG_VERSION;
         connreq->racr_devid     = conn->rac_device->rad_id;
-        connreq->racr_nid       = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_srcnid    = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_dstnid    = conn->rac_peer->rap_nid;
         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
         connreq->racr_connstamp = conn->rac_my_connstamp;
         connreq->racr_timeout   = conn->rac_timeout;
@@ -246,13 +247,14 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 __swab32s(&connreq->racr_magic);
                 __swab16s(&connreq->racr_version);
                 __swab16s(&connreq->racr_devid);
-                __swab64s(&connreq->racr_nid);
+                __swab64s(&connreq->racr_srcnid);
+                __swab64s(&connreq->racr_dstnid);
                 __swab64s(&connreq->racr_peerstamp);
                 __swab64s(&connreq->racr_connstamp);
                 __swab32s(&connreq->racr_timeout);
 
+                __swab32s(&connreq->racr_riparams.HostId);
                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
-                __swab32s(&connreq->racr_riparams.RcvCqHndl);
                 __swab32s(&connreq->racr_riparams.PTag);
                 __swab32s(&connreq->racr_riparams.CompletionCookie);
         }
@@ -262,7 +264,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 return -EPROTO;
         }
 
-        if (connreq->racr_nid == PTL_NID_ANY) {
+        if (connreq->racr_srcnid == PTL_NID_ANY ||
+            connreq->racr_dstnid == PTL_NID_ANY) {
                 CERROR("Received PTL_NID_ANY\n");
                 return -EPROTO;
         }
@@ -417,8 +420,6 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
         kranal_update_reaper_timeout(conn->rac_timeout);
 
         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
-                           dev->rad_ptag,
-                           dev->rad_rdma_cq, dev->rad_fma_cq,
                            &conn->rac_rihandle);
         if (rrc != RAP_SUCCESS) {
                 CERROR("RapkCreateRi failed: %d\n", rrc);
@@ -543,14 +544,15 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
 }
 
 int
-kranal_passive_conn_handshake (struct socket *sock, 
-                               ptl_nid_t *peer_nidp, kra_conn_t **connp)
+kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, 
+                               ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         struct sockaddr_in   addr;
         __u32                peer_ip;
         unsigned int         peer_port;
         kra_connreq_t        connreq;
-        ptl_nid_t            peer_nid;
+        ptl_nid_t            src_nid;
+        ptl_nid_t            dst_nid;
         kra_conn_t          *conn;
         kra_device_t        *dev;
         int                  rc;
@@ -581,8 +583,8 @@ kranal_passive_conn_handshake (struct socket *sock,
                 return rc;
         }
 
-        peer_nid = connreq.racr_nid;
-        LASSERT (peer_nid != PTL_NID_ANY);
+        src_nid = connreq.racr_srcnid;
+        dst_nid = connreq.racr_dstnid;
 
         for (i = 0;;i++) {
                 if (i == kranal_data.kra_ndevs) {
@@ -616,7 +618,8 @@ kranal_passive_conn_handshake (struct socket *sock,
         }
 
         *connp = conn;
-        *peer_nidp = peer_nid;
+        *src_nidp = src_nid;
+        *dst_nidp = dst_nid;
         return 0;
 }
 
@@ -685,7 +688,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 
 
 int
-kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+kranal_active_conn_handshake(kra_peer_t *peer, 
+                             ptl_nid_t *dst_nidp, kra_conn_t **connp)
 {
         kra_connreq_t       connreq;
         kra_conn_t         *conn;
@@ -730,11 +734,11 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
         sock_release(sock);
         rc = -EPROTO;
 
-        if (connreq.racr_nid != peer->rap_nid) {
-                CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+        if (connreq.racr_srcnid != peer->rap_nid) {
+                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                        "received "LPX64" expected "LPX64"\n",
                        HIPQUAD(peer->rap_ip), peer->rap_port, 
-                       connreq.racr_nid, peer->rap_nid);
+                       connreq.racr_srcnid, peer->rap_nid);
                 goto failed_0;
         }
 
@@ -752,6 +756,7 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
                 goto failed_0;
 
         *connp = conn;
+        *dst_nidp = connreq.racr_dstnid;
         return 0;
 
  failed_1:
@@ -767,17 +772,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
         kra_peer_t        *peer2;
         kra_tx_t          *tx;
         ptl_nid_t          peer_nid;
+        ptl_nid_t          dst_nid;
         unsigned long      flags;
         kra_conn_t        *conn;
         int                rc;
         int                nstale;
+        int                new_peer = 0;
 
         if (sock == NULL) {
                 /* active: connd wants to connect to 'peer' */
                 LASSERT (peer != NULL);
                 LASSERT (peer->rap_connecting);
                 
-                rc = kranal_active_conn_handshake(peer, &conn);
+                rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
                 if (rc != 0)
                         return rc;
 
@@ -788,16 +795,16 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                         write_unlock_irqrestore(&kranal_data.kra_global_lock, 
                                                 flags);
                         kranal_conn_decref(conn);
-                        return ESTALE;
+                        return -ESTALE;
                 }
 
                 peer_nid = peer->rap_nid;
-
         } else {
                 /* passive: listener accepted 'sock' */
                 LASSERT (peer == NULL);
 
-                rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+                rc = kranal_passive_conn_handshake(sock, &peer_nid,
+                                                   &dst_nid, &conn);
                 if (rc != 0)
                         return rc;
 
@@ -813,26 +820,32 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 
                 peer2 = kranal_find_peer_locked(peer_nid);
                 if (peer2 == NULL) {
-                        /* peer table takes my initial ref on peer */
-                        list_add_tail(&peer->rap_list,
-                                      kranal_nid2peerlist(peer_nid));
+                        new_peer = 1;
                 } else {
                         /* peer_nid already in the peer table */
                         kranal_peer_decref(peer);
                         peer = peer2;
                 }
-                /* NB I may now have a non-persistent peer in the peer
-                 * table with no connections: I can't drop the global lock
-                 * until I've given it a connection or removed it, and when
-                 * I do 'peer' can disappear under me. */
         }
 
-        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
+        LASSERT (!new_peer == !kranal_peer_active(peer));
+
+        /* Refuse connection if peer thinks we are a different NID.  We check
+         * this while holding the global lock, to synch with connection
+         * destruction on NID change. */
+        if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
-        /* Refuse to duplicate an existing connection (both sides might try
-         * to connect at once).  NB we return success!  We _do_ have a
-         * connection (so we don't need to remove the peer from the peer
-         * table) and we _don't_ have any blocked txs to complete */
+                CERROR("Stale/bad connection with "LPX64
+                       ": dst_nid "LPX64", expected "LPX64"\n",
+                       peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
+                rc = -ESTALE;
+                goto failed;
+        }
+
+        /* Refuse to duplicate an existing connection (both sides might try to
+         * connect at once).  NB we return success!  We _are_ connected so we
+         * _don't_ have any blocked txs to complete with failure. */
         rc = kranal_conn_isdup_locked(peer, conn);
         if (rc != 0) {
                 LASSERT (!list_empty(&peer->rap_conns));
@@ -840,10 +853,16 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 CWARN("Not creating duplicate connection to "LPX64": %d\n",
                       peer_nid, rc);
-                kranal_conn_decref(conn);
-                return 0;
+                rc = 0;
+                goto failed;
         }
 
+        if (new_peer) {
+                /* peer table takes my ref on the new peer */
+                list_add_tail(&peer->rap_list,
+                              kranal_nid2peerlist(peer_nid));
+        }
+        
         kranal_peer_addref(peer);               /* +1 ref for conn */
         conn->rac_peer = peer;
         list_add_tail(&conn->rac_list, &peer->rap_conns);
@@ -874,6 +893,12 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
          * FMA event may have happened before it got in the cq hash table */
         kranal_schedule_conn(conn);
         return 0;
+
+ failed:
+        if (new_peer)
+                kranal_peer_decref(peer);
+        kranal_conn_decref(conn);
+        return rc;
 }
 
 void
@@ -938,11 +963,11 @@ kranal_listener(void *arg)
         struct sockaddr_in addr;
         wait_queue_t       wait;
         struct socket     *sock;
-        struct socket     *newsock;
+        kra_acceptsock_t  *ras;
         int                port;
-        kra_connreq_t     *connreqs;
         char               name[16];
         int                rc;
+        unsigned long      flags;
 
         /* Parent thread holds kra_nid_mutex, and is, or is about to
          * block on kra_listener_signal */
@@ -954,14 +979,9 @@ kranal_listener(void *arg)
 
         init_waitqueue_entry(&wait, current);
 
-        rc = -ENOMEM;
-        PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
-        if (connreqs == NULL)
-                goto out_0;
-
         rc = kranal_create_sock(&sock);
         if (rc != 0)
-                goto out_1;
+                goto out_0;
 
         memset(&addr, 0, sizeof(addr));
         addr.sin_family      = AF_INET;
@@ -971,14 +991,14 @@ kranal_listener(void *arg)
         rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
         if (rc != 0) {
                 CERROR("Can't bind to port %d\n", port);
-                goto out_2;
+                goto out_1;
         }
 
         rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
         if (rc != 0) {
                 CERROR("Can't set listen backlog %d: %d\n", 
                        kranal_tunables.kra_backlog, rc);
-                goto out_2;
+                goto out_1;
         }
 
         LASSERT (kranal_data.kra_listener_sock == NULL);
@@ -990,46 +1010,70 @@ kranal_listener(void *arg)
 
         /* Wake me any time something happens on my socket */
         add_wait_queue(sock->sk->sk_sleep, &wait);
+        ras = NULL;
 
         while (kranal_data.kra_listener_shutdown == 0) {
 
-                newsock = sock_alloc();
-                if (newsock == NULL) {
-                        CERROR("Can't allocate new socket for accept\n");
-                        kranal_pause(HZ);
-                        continue;
+                if (ras == NULL) {
+                        PORTAL_ALLOC(ras, sizeof(*ras));
+                        if (ras == NULL) {
+                                CERROR("Out of Memory: pausing...\n");
+                                kranal_pause(HZ);
+                                continue;
+                        }
+                        ras->ras_sock = NULL;
                 }
 
+                if (ras->ras_sock == NULL) {
+                        ras->ras_sock = sock_alloc();
+                        if (ras->ras_sock == NULL) {
+                                CERROR("Can't allocate socket: pausing...\n");
+                                kranal_pause(HZ);
+                                continue;
+                        }
+                }
+                
                 set_current_state(TASK_INTERRUPTIBLE);
 
-                rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+                rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);
 
+                /* Sleep for socket activity? */
                 if (rc == -EAGAIN &&
                     kranal_data.kra_listener_shutdown == 0)
                         schedule();
 
                 set_current_state(TASK_RUNNING);
 
-                if (rc != 0) {
-                        sock_release(newsock);
-                        if (rc != -EAGAIN) {
-                                CERROR("Accept failed: %d\n", rc);
-                                kranal_pause(HZ);
-                        }
+                if (rc == 0) {
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                        
+                        list_add_tail(&ras->ras_list, 
+                                      &kranal_data.kra_connd_acceptq);
+
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+                        wake_up(&kranal_data.kra_connd_waitq);
+
+                        ras = NULL;
                         continue;
-                } 
+                }
+                
+                if (rc != -EAGAIN) {
+                        CERROR("Accept failed: %d, pausing...\n", rc);
+                        kranal_pause(HZ);
+                }
+        }
 
-                kranal_conn_handshake(newsock, NULL);
-                sock_release(newsock);
+        if (ras != NULL) {
+                if (ras->ras_sock != NULL)
+                        sock_release(ras->ras_sock);
+                PORTAL_FREE(ras, sizeof(*ras));
         }
 
         rc = 0;
         remove_wait_queue(sock->sk->sk_sleep, &wait);
- out_2:
+ out_1:
         sock_release(sock);
         kranal_data.kra_listener_sock = NULL;
- out_1:
-        PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
  out_0:
         /* set completion status and unblock thread waiting for me 
          * (parent on startup failure, executioner on normal shutdown) */
@@ -1650,36 +1694,26 @@ kranal_device_init(int id, kra_device_t *dev)
                 goto failed_1;
         }
 
-        rrc = RapkCreatePtag(dev->rad_handle,
-                             &dev->rad_ptag);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't create ptag"
-                       " for device %d: %d\n", id, rrc);
-                goto failed_1;
-        }
-
-        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
-                           &dev->rad_rdma_cq);
+        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
+                           &dev->rad_rdma_cqh);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create rdma cq size %d"
                        " for device %d: %d\n", total_ntx, id, rrc);
-                goto failed_2;
+                goto failed_1;
         }
 
-        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
-                           dev->rad_ptag, &dev->rad_fma_cq);
+        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
+                           &dev->rad_fma_cqh);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create fma cq size %d"
                        " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
-                goto failed_3;
+                goto failed_2;
         }
 
         return 0;
 
- failed_3:
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
  failed_2:
-        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
  failed_1:
         RapkReleaseDevice(dev->rad_handle);
  failed_0:
@@ -1690,9 +1724,8 @@ void
 kranal_device_fini(kra_device_t *dev)
 {
         LASSERT(dev->rad_scheduler == NULL);
-        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
-        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
         RapkReleaseDevice(dev->rad_handle);
 }
 
index cc5c2e6..53762eb 100644 (file)
@@ -102,9 +102,8 @@ typedef struct
 typedef struct
 {
         RAP_PVOID               rad_handle;     /* device handle */
-        RAP_PROTECTION_HANDLE   rad_ptag;       /* protection tag */
-        RAP_CQ_HANDLE           rad_fma_cq;     /* FMA (small message) completion queue */
-        RAP_CQ_HANDLE           rad_rdma_cq;    /* rdma completion queue */
+        RAP_PVOID               rad_fma_cqh;    /* FMA completion queue handle */
+        RAP_PVOID               rad_rdma_cqh;   /* rdma completion queue handle */
         int                     rad_id;         /* device id */
         int                     rad_idx;        /* index in kra_devices */
         int                     rad_ready;      /* set by device callback */
@@ -147,6 +146,7 @@ typedef struct
         spinlock_t        kra_reaper_lock;      /* serialise */
         
         struct list_head  kra_connd_peers;      /* peers waiting for a connection */
+        struct list_head  kra_connd_acceptq;    /* accepted sockets to handshake */
         wait_queue_head_t kra_connd_waitq;      /* connection daemons sleep here */
         spinlock_t        kra_connd_lock;       /* serialise */
 
@@ -162,6 +162,12 @@ typedef struct
 #define RANAL_INIT_LIB             2
 #define RANAL_INIT_ALL             3
 
+typedef struct kra_acceptsock                   /* accepted socket queued for connd */
+{
+        struct list_head     ras_list;          /* queue for attention */
+        struct socket       *ras_sock;          /* the accepted socket */
+} kra_acceptsock_t;
+
 /************************************************************************
  * Wire message structs.  These are sent in sender's byte order
  * (i.e. receiver checks magic and flips if required).
@@ -172,7 +178,8 @@ typedef struct kra_connreq                      /* connection request/response *
         __u32             racr_magic;           /* I'm an ranal connreq */
         __u16             racr_version;         /* this is my version number */
         __u16             racr_devid;           /* sender's device ID */
-        __u64             racr_nid;             /* sender's NID */
+        __u64             racr_srcnid;          /* sender's NID */
+        __u64             racr_dstnid;          /* who sender expects to listen */
         __u64             racr_peerstamp;       /* sender's instance stamp */
         __u64             racr_connstamp;       /* sender's connection stamp */
         __u32             racr_timeout;         /* sender's timeout */
@@ -477,3 +484,4 @@ extern int kranal_scheduler (void *arg);
 extern void kranal_close_conn_locked (kra_conn_t *conn, int error);
 extern void kranal_terminate_conn_locked (kra_conn_t *conn);
 extern void kranal_connect (kra_peer_t *peer);
+extern int kranal_conn_handshake (struct socket *sock, kra_peer_t *peer);
index 8901a2d..541a15a 100644 (file)
@@ -241,7 +241,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
         tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
         
         phys->Address = kranal_page2phys(kiov->kiov_page);
-        phys->Length  = PAGE_SIZE;
         phys++;
 
         resid = nob - (kiov->kiov_len - offset);
@@ -267,7 +266,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 }
 
                 phys->Address = kranal_page2phys(kiov->kiov_page);
-                phys->Length  = PAGE_SIZE;
                 phys++;
 
                 resid -= PAGE_SIZE;
@@ -312,7 +310,7 @@ kranal_map_buffer (kra_tx_t *tx)
         case RANAL_BUF_PHYS_UNMAPPED:
                 rrc = RapkRegisterPhys(dev->rad_handle,
                                        tx->tx_phys, tx->tx_phys_npages,
-                                       dev->rad_ptag, &tx->tx_map_key);
+                                       &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                 break;
@@ -320,7 +318,7 @@ kranal_map_buffer (kra_tx_t *tx)
         case RANAL_BUF_VIRT_UNMAPPED:
                 rrc = RapkRegisterMemory(dev->rad_handle,
                                          tx->tx_buffer, tx->tx_nob,
-                                         dev->rad_ptag, &tx->tx_map_key);
+                                         &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                 break;
@@ -348,7 +346,7 @@ kranal_unmap_buffer (kra_tx_t *tx)
                 dev = tx->tx_conn->rac_device;
                 LASSERT (current == dev->rad_scheduler);
                 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
-                                           dev->rad_ptag, &tx->tx_map_key);
+                                           &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                 break;
@@ -358,7 +356,7 @@ kranal_unmap_buffer (kra_tx_t *tx)
                 dev = tx->tx_conn->rac_device;
                 LASSERT (current == dev->rad_scheduler);
                 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
-                                           dev->rad_ptag, &tx->tx_map_key);
+                                           &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                 break;
@@ -559,8 +557,8 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
 
         LASSERT (conn->rac_rxmsg != NULL);
 
-        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
-                                &nob_received, sizeof(kra_msg_t));
+        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
+                             &nob_received, sizeof(kra_msg_t));
         LASSERT (rrc == RAP_SUCCESS);
 
         conn->rac_rxmsg = NULL;
@@ -1031,6 +1029,8 @@ kranal_connd (void *arg)
         wait_queue_t       wait;
         unsigned long      flags;
         kra_peer_t        *peer;
+        kra_acceptsock_t  *ras;
+        int                did_something;
 
         snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
         kportal_daemonize(name);
@@ -1041,8 +1041,22 @@ kranal_connd (void *arg)
         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
-                /* Safe: kra_shutdown only set when quiescent */
+                did_something = 0;
+                
+                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
+                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
+                                         kra_acceptsock_t, ras_list);
+                        list_del(&ras->ras_list);
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
+                        kranal_conn_handshake(ras->ras_sock, NULL);
+                        sock_release(ras->ras_sock);
+                        PORTAL_FREE(ras, sizeof(*ras));
+
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                        did_something = 1;
+                }
+                
                 if (!list_empty(&kranal_data.kra_connd_peers)) {
                         peer = list_entry(kranal_data.kra_connd_peers.next,
                                           kra_peer_t, rap_connd_list);
@@ -1054,9 +1068,12 @@ kranal_connd (void *arg)
                         kranal_peer_decref(peer);
 
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
-                        continue;
+                        did_something = 1;
                 }
 
+                if (did_something)
+                        continue;
+
                 set_current_state(TASK_INTERRUPTIBLE);
                 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
                 
@@ -1220,7 +1237,7 @@ kranal_check_rdma_cq (kra_device_t *dev)
         __u32                event_type;
 
         for (;;) {
-                rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                 if (rrc == RAP_NOT_DONE)
                         return;
 
@@ -1275,7 +1292,7 @@ kranal_check_fma_cq (kra_device_t *dev)
         int                 i;
 
         for (;;) {
-                rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
+                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                 if (rrc != RAP_NOT_DONE)
                         return;
                 
@@ -1366,8 +1383,8 @@ kranal_process_fmaq (kra_conn_t *conn)
         int           expect_reply;
 
         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
-         *       However I will be rescheduled some by a rad_fma_cq event when
-         *       I eventually get some.
+         *       However I will be rescheduled some by an FMA completion event
+         *       when I eventually get some.
          * NB 2. Sampling rac_state here, races with setting it elsewhere
          *       kranal_close_conn_locked.  But it doesn't matter if I try to
          *       send a "real" message just as I start closing because I'll get