Whamcloud - gitweb
* ranal code review
author: eeb <eeb>
Mon, 20 Dec 2004 21:24:34 +0000 (21:24 +0000)
committer: eeb <eeb>
Mon, 20 Dec 2004 21:24:34 +0000 (21:24 +0000)
lnet/klnds/ralnd/Makefile.in
lnet/klnds/ralnd/ralnd.c
lnet/klnds/ralnd/ralnd.h
lnet/klnds/ralnd/ralnd_cb.c

index 1772cc2..eb806e2 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := kranal
 kranal-objs := ranal.o ranal_cb.o
 
 MODULES := kranal
 kranal-objs := ranal.o ranal_cb.o
 
-EXTRA_POST_CFLAGS := @RACPPFLAGS@
+EXTRA_POST_CFLAGS := @RACPPFLAGS@ -Wall
 
 @INCLUDE_RULES@
 
 @INCLUDE_RULES@
index c924827..0aa1d53 100644 (file)
@@ -162,7 +162,6 @@ kranal_create_sock(struct socket **sockp)
 {
        struct socket       *sock;
        int                  rc;
 {
        struct socket       *sock;
        int                  rc;
-        struct timeval       tv;
        int                  option;
         mm_segment_t         oldmm = get_fs();
 
        int                  option;
         mm_segment_t         oldmm = get_fs();
 
@@ -215,12 +214,13 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 
         memset(connreq, 0, sizeof(*connreq));
 
 
         memset(connreq, 0, sizeof(*connreq));
 
-        connreq->racr_magic       = RANAL_MSG_MAGIC;
-        connreq->racr_version     = RANAL_MSG_VERSION;
-        connreq->racr_devid       = conn->rac_device->rad_id;
-        connreq->racr_nid         = kranal_lib.libnal_ni.ni_pid.nid;
-        connreq->racr_timeout     = conn->rac_timeout;
-        connreq->racr_incarnation = conn->rac_my_incarnation;
+        connreq->racr_magic     = RANAL_MSG_MAGIC;
+        connreq->racr_version   = RANAL_MSG_VERSION;
+        connreq->racr_devid     = conn->rac_device->rad_id;
+        connreq->racr_nid       = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_peerstamp = kranal_data.kra_peerstamp;
+        connreq->racr_connstamp = conn->rac_my_connstamp;
+        connreq->racr_timeout   = conn->rac_timeout;
 
         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         LASSERT(rrc == RAP_SUCCESS);
 
         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         LASSERT(rrc == RAP_SUCCESS);
@@ -229,7 +229,6 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 int
 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
 {
 int
 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
 {
-        int         i;
        int         rc;
 
        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        int         rc;
 
        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
@@ -248,7 +247,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                __swab16s(&connreq->racr_version);
                 __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_nid);
                __swab16s(&connreq->racr_version);
                 __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_nid);
-               __swab64s(&connreq->racr_incarnation);
+               __swab64s(&connreq->racr_peerstamp);
+               __swab64s(&connreq->racr_connstamp);
                 __swab32s(&connreq->racr_timeout);
 
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
                 __swab32s(&connreq->racr_timeout);
 
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
@@ -273,43 +273,100 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 return -EPROTO;
         }
         
                 return -EPROTO;
         }
         
-        for (i = 0; i < kranal_data.kra_ndevs; i++)
-                if (connreq->racr_devid == 
-                    kranal_data.kra_devices[i].rad_id)
-                        break;
+       return 0;
+}
 
 
-        if (i == kranal_data.kra_ndevs) {
-                CERROR("Can't match device %d\n", connreq->racr_devid);
-                return -ENODEV;
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 loopback;
+        int                 count = 0;
+
+        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                if (conn == newconn)
+                        continue;
+
+                if (conn->rac_peerstamp != newconn->rac_peerstamp) {
+                        CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+                               " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+                               conn->rac_peerstamp, newconn->rac_peerstamp);
+                        LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
+                        count++;
+                        kranal_close_conn_locked(conn, -ESTALE);
+                        continue;
+                }
+
+                if (conn->rac_device != newconn->rac_device)
+                        continue;
+                
+                if (loopback &&
+                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
+                        continue;
+                    
+                LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
+
+                CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, 
+                       conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
+
+                count++;
+                kranal_close_conn_locked(conn, -ESTALE);
         }
 
         }
 
-       return 0;
+        return count;
 }
 
 int
 }
 
 int
-kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
 {
        kra_conn_t       *conn;
        struct list_head *tmp;
 {
        kra_conn_t       *conn;
        struct list_head *tmp;
-        int               loopback = 0;
+        int               loopback;
 
 
+        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+        
        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);
 
        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);
 
-                if (conn->rac_peer_incarnation < incarnation) {
-                        /* Conns with an older incarnation get culled later */
+                /* 'newconn' is from an earlier version of 'peer'!!! */
+                if (newconn->rac_peerstamp < conn->rac_peerstamp)
+                        return 1;
+
+                /* 'conn' is from an earlier version of 'peer': it will be
+                 * removed when we cull stale conns later on... */
+                if (newconn->rac_peerstamp > conn->rac_peerstamp)
                         continue;
                         continue;
-                }
 
 
-                if (!loopback &&
-                    conn->rac_peer_incarnation == incarnation &&
-                    peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
-                        /* loopback creates 2 conns */
-                        loopback = 1;
+                /* Different devices are OK */
+                if (conn->rac_device != newconn->rac_device)
+                        continue;
+
+                /* It's me connecting to myself */
+                if (loopback &&
+                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                         continue;
                         continue;
-                }
 
 
-                return 1;
+                /* 'newconn' is an earlier connection from 'peer'!!! */
+                if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
+                        return 2;
+                
+                /* 'conn' is an earlier connection from 'peer': it will be
+                 * removed when we cull stale conns later on... */
+                if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
+                        continue;
+                
+                /* 'newconn' has the SAME connection stamp; 'peer' isn't
+                 * playing the game... */
+                return 3;
        }
 
        return 0;
        }
 
        return 0;
@@ -322,7 +379,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-        conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+        conn->rac_my_connstamp = kranal_data.kra_connstamp++;
 
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
 
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
@@ -333,7 +390,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 }
 
 int
 }
 
 int
-kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
 {
        kra_conn_t    *conn;
         RAP_RETURN     rrc;
 {
        kra_conn_t    *conn;
         RAP_RETURN     rrc;
@@ -348,6 +405,7 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
+        INIT_LIST_HEAD(&conn->rac_schedlist);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
@@ -374,34 +432,20 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
 }
 
 void
 }
 
 void
-__kranal_conn_decref(kra_conn_t *conn) 
+kranal_destroy_conn(kra_conn_t *conn) 
 {
 {
-        kra_tx_t          *tx;
         RAP_RETURN         rrc;
 
         LASSERT (!in_interrupt());
         LASSERT (!conn->rac_scheduled);
         LASSERT (list_empty(&conn->rac_list));
         LASSERT (list_empty(&conn->rac_hashlist));
         RAP_RETURN         rrc;
 
         LASSERT (!in_interrupt());
         LASSERT (!conn->rac_scheduled);
         LASSERT (list_empty(&conn->rac_list));
         LASSERT (list_empty(&conn->rac_hashlist));
+        LASSERT (list_empty(&conn->rac_schedlist));
         LASSERT (atomic_read(&conn->rac_refcount) == 0);
         LASSERT (atomic_read(&conn->rac_refcount) == 0);
-
-        while (!list_empty(&conn->rac_fmaq)) {
-                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-                
-                list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-        
-        /* We may not destroy this connection while it has RDMAs outstanding */
+        LASSERT (list_empty(&conn->rac_fmaq));
         LASSERT (list_empty(&conn->rac_rdmaq));
         LASSERT (list_empty(&conn->rac_rdmaq));
+        LASSERT (list_empty(&conn->rac_replyq));
 
 
-        while (!list_empty(&conn->rac_replyq)) {
-                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-                
-                list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-        
         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                             conn->rac_rihandle);
         LASSERT (rrc == RAP_SUCCESS);
         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                             conn->rac_rihandle);
         LASSERT (rrc == RAP_SUCCESS);
@@ -416,18 +460,20 @@ __kranal_conn_decref(kra_conn_t *conn)
 void
 kranal_terminate_conn_locked (kra_conn_t *conn)
 {
 void
 kranal_terminate_conn_locked (kra_conn_t *conn)
 {
-        kra_peer_t *peer = conn->rac_peer;
-
         LASSERT (!in_interrupt());
         LASSERT (!in_interrupt());
-        LASSERT (conn->rac_closing);
+        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (list_empty(&conn->rac_list));
 
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (list_empty(&conn->rac_list));
 
-        /* Remove from conn hash table (no new callbacks) */
+        /* Remove from conn hash table: no new callbacks */
         list_del_init(&conn->rac_hashlist);
         kranal_conn_decref(conn);
 
         list_del_init(&conn->rac_hashlist);
         kranal_conn_decref(conn);
 
-        /* Conn is now just waiting for remaining refs to go */
+        conn->rac_state = RANAL_CONN_CLOSED;
+
+        /* schedule to clear out all uncompleted comms in context of dev's
+         * scheduler */
+        kranal_schedule_conn(conn);
 }
 
 void
 }
 
 void
@@ -439,7 +485,7 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
 
         LASSERT (!in_interrupt());
               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
 
         LASSERT (!in_interrupt());
-        LASSERT (!conn->rac_closing);
+        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (!list_empty(&conn->rac_list));
 
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (!list_empty(&conn->rac_list));
 
@@ -450,9 +496,14 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
                 /* Non-persistent peer with no more conns... */
                 kranal_unlink_peer_locked(peer);
         }
                 /* Non-persistent peer with no more conns... */
                 kranal_unlink_peer_locked(peer);
         }
+                        
+        /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
+         * full timeout */
+        conn->rac_last_rx = jiffies;
+        mb();
 
 
-        conn->rac_closing = 1;
-        kranal_schedule_conn(conn);
+        conn->rac_state = RANAL_CONN_CLOSING;
+        kranal_schedule_conn(conn);             /* schedule sending CLOSE */
 
         kranal_conn_decref(conn);               /* lose peer's ref */
 }
 
         kranal_conn_decref(conn);               /* lose peer's ref */
 }
@@ -465,13 +516,33 @@ kranal_close_conn (kra_conn_t *conn, int error)
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
         
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
         
-        if (!conn->rac_closing)
+        if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                 kranal_close_conn_locked(conn, error);
         
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
 int
                 kranal_close_conn_locked(conn, error);
         
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
 int
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, 
+                       __u32 peer_ip, int peer_port)
+{
+        RAP_RETURN    rrc;
+        
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rrc);
+                return -EPROTO;
+        }
+        
+        conn->rac_peerstamp = connreq->racr_peerstamp;
+        conn->rac_peer_connstamp = connreq->racr_connstamp;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+        return 0;
+}
+
+int
 kranal_passive_conn_handshake (struct socket *sock, 
                                ptl_nid_t *peer_nidp, kra_conn_t **connp)
 {
 kranal_passive_conn_handshake (struct socket *sock, 
                                ptl_nid_t *peer_nidp, kra_conn_t **connp)
 {
@@ -482,7 +553,6 @@ kranal_passive_conn_handshake (struct socket *sock,
        ptl_nid_t            peer_nid;
         kra_conn_t          *conn;
         kra_device_t        *dev;
        ptl_nid_t            peer_nid;
         kra_conn_t          *conn;
         kra_device_t        *dev;
-       RAP_RETURN           rrc;
        int                  rc;
         int                  len;
         int                  i;
        int                  rc;
         int                  len;
         int                  i;
@@ -515,25 +585,24 @@ kranal_passive_conn_handshake (struct socket *sock,
         LASSERT (peer_nid != PTL_NID_ANY);
 
         for (i = 0;;i++) {
         LASSERT (peer_nid != PTL_NID_ANY);
 
         for (i = 0;;i++) {
-                LASSERT(i < kranal_data.kra_ndevs);
+                if (i == kranal_data.kra_ndevs) {
+                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
+                               connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
+                        return -ENODEV;
+                }
                 dev = &kranal_data.kra_devices[i];
                 if (dev->rad_id == connreq.racr_devid)
                         break;
         }
 
                 dev = &kranal_data.kra_devices[i];
                 if (dev->rad_id == connreq.racr_devid)
                         break;
         }
 
-        rc = kranal_alloc_conn(&conn, dev);
+        rc = kranal_create_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
         if (rc != 0)
                 return rc;
 
-        conn->rac_peer_incarnation = connreq.racr_incarnation;
-        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
-        kranal_update_reaper_timeout(conn->rac_keepalive);
-        
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+        rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
+        if (rc != 0) {
                 kranal_conn_decref(conn);
                 kranal_conn_decref(conn);
-                return -EPROTO;
+                return rc;
         }
 
         kranal_pack_connreq(&connreq, conn);
         }
 
         kranal_pack_connreq(&connreq, conn);
@@ -559,9 +628,6 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
         struct socket      *sock;
         unsigned int        port;
         int                 rc;
         struct socket      *sock;
         unsigned int        port;
         int                 rc;
-        int                 option;
-        mm_segment_t        oldmm = get_fs();
-        struct timeval      tv;
 
         for (port = 1023; port >= 512; port--) {
 
 
         for (port = 1023; port >= 512; port--) {
 
@@ -621,19 +687,17 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 int
 kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 {
 int
 kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 {
-        struct sockaddr_in  dstaddr;
        kra_connreq_t       connreq;
         kra_conn_t         *conn;
         kra_device_t       *dev;
         struct socket      *sock;
        kra_connreq_t       connreq;
         kra_conn_t         *conn;
         kra_device_t       *dev;
         struct socket      *sock;
-       RAP_RETURN          rrc;
        int                 rc;
         int                 idx;
         
         idx = peer->rap_nid & 0x7fffffff;
         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
 
        int                 rc;
         int                 idx;
         
         idx = peer->rap_nid & 0x7fffffff;
         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
 
-        rc = kranal_alloc_conn(&conn, dev);
+        rc = kranal_create_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
         if (rc != 0)
                 return rc;
 
@@ -680,17 +744,10 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
                 goto failed_0;
         }
 
                 goto failed_0;
         }
 
-        conn->rac_peer_incarnation = connreq.racr_incarnation; 
-        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
-        kranal_update_reaper_timeout(conn->rac_keepalive);
-
-        rc = -ENETDOWN;
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't set riparams for "LPX64": %d\n",
-                       peer->rap_nid, rrc);
+        rc = kranal_set_conn_params(conn, &connreq, 
+                                    peer->rap_ip, peer->rap_port);
+        if (rc != 0)
                 goto failed_0;
                 goto failed_0;
-        }
 
         *connp = conn;
        return 0;
 
         *connp = conn;
        return 0;
@@ -709,13 +766,33 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
         kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        unsigned long      flags;
         kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        unsigned long      flags;
-        unsigned long      timeout;
        kra_conn_t        *conn;
        int                rc;
         int                nstale;
 
        kra_conn_t        *conn;
        int                rc;
         int                nstale;
 
-        if (sock != NULL) {
-                /* passive: listener accepted sock */
+        if (sock == NULL) {
+                /* active: connd wants to connect to 'peer' */
+                LASSERT (peer != NULL);
+                LASSERT (peer->rap_connecting);
+                
+                rc = kranal_active_conn_handshake(peer, &conn);
+                if (rc != 0)
+                        return rc;
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               if (!kranal_peer_active(peer)) {
+                       /* raced with peer getting unlinked */
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                                                flags);
+                        kranal_conn_decref(conn);
+                       return ESTALE;
+               }
+
+                peer_nid = peer->rap_nid;
+
+       } else {
+                /* passive: listener accepted 'sock' */
                 LASSERT (peer == NULL);
 
                 rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
                 LASSERT (peer == NULL);
 
                 rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
@@ -746,39 +823,21 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                  * table with no connections: I can't drop the global lock
                  * until I've given it a connection or removed it, and when
                  * I do 'peer' can disappear under me. */
                  * table with no connections: I can't drop the global lock
                  * until I've given it a connection or removed it, and when
                  * I do 'peer' can disappear under me. */
-        } else {
-                /* active: connd wants to connect to peer */
-                LASSERT (peer != NULL);
-                LASSERT (peer->rap_connecting);
-                
-                rc = kranal_active_conn_handshake(peer, &conn);
-                if (rc != 0)
-                        return rc;
-
-               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
-               if (!kranal_peer_active(peer)) {
-                       /* raced with peer getting unlinked */
-                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
-                                                flags);
-                        kranal_conn_decref(conn);
-                       return ESTALE;
-               }
-       }
+        }
 
        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
 
        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
-        peer_nid = peer->rap_nid;
 
         /* Refuse to duplicate an existing connection (both sides might try
          * to connect at once).  NB we return success!  We _do_ have a
          * connection (so we don't need to remove the peer from the peer
          * table) and we _don't_ have any blocked txs to complete */
 
         /* Refuse to duplicate an existing connection (both sides might try
          * to connect at once).  NB we return success!  We _do_ have a
          * connection (so we don't need to remove the peer from the peer
          * table) and we _don't_ have any blocked txs to complete */
-       if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
+        rc = kranal_conn_isdup_locked(peer, conn);
+       if (rc != 0) {
                 LASSERT (!list_empty(&peer->rap_conns));
                 LASSERT (list_empty(&peer->rap_tx_queue));
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 LASSERT (!list_empty(&peer->rap_conns));
                 LASSERT (list_empty(&peer->rap_tx_queue));
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-               CWARN("Not creating duplicate connection to "LPX64"\n",
-                      peer_nid);
+               CWARN("Not creating duplicate connection to "LPX64": %d\n",
+                      peer_nid, rc);
                 kranal_conn_decref(conn);
                 return 0;
        }
                 kranal_conn_decref(conn);
                 return 0;
        }
@@ -800,7 +859,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 kranal_post_fma(conn, tx);
         }
 
                 kranal_post_fma(conn, tx);
         }
 
-       nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
+       nstale = kranal_close_stale_conns_locked(peer, conn);
 
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
 
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
@@ -989,7 +1048,7 @@ kranal_start_listener (void)
        /* Called holding kra_nid_mutex: listener stopped */
        LASSERT (kranal_data.kra_listener_sock == NULL);
 
        /* Called holding kra_nid_mutex: listener stopped */
        LASSERT (kranal_data.kra_listener_sock == NULL);
 
-       kranal_data.kra_listener_shutdown == 0;
+       kranal_data.kra_listener_shutdown = 0;
        pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
        pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
@@ -1032,6 +1091,10 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
        int    old_val;
        int    rc;
 
        int    old_val;
        int    rc;
 
+        /* No race with nal initialisation since the nal is setup all the time
+         * it's loaded.  When that changes, change this! */
+        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
+
        down(&kranal_data.kra_nid_mutex);
 
        LASSERT (tunable == &kranal_tunables.kra_port ||
        down(&kranal_data.kra_nid_mutex);
 
        LASSERT (tunable == &kranal_tunables.kra_port ||
@@ -1050,18 +1113,23 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                rc = kranal_start_listener();
 
                if (rc != 0) {
                rc = kranal_start_listener();
 
                if (rc != 0) {
+                        CWARN("Unable to start listener with new tunable:"
+                              " reverting to old value\n");
                        *tunable = old_val;
                        kranal_start_listener();
                }
        }
 
        up(&kranal_data.kra_nid_mutex);
                        *tunable = old_val;
                        kranal_start_listener();
                }
        }
 
        up(&kranal_data.kra_nid_mutex);
+
+        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
        return rc;
 }
 
 int
 kranal_set_mynid(ptl_nid_t nid)
 {
        return rc;
 }
 
 int
 kranal_set_mynid(ptl_nid_t nid)
 {
+        unsigned long  flags;
         lib_ni_t      *ni = &kranal_lib.libnal_ni;
         int            rc = 0;
 
         lib_ni_t      *ni = &kranal_lib.libnal_ni;
         int            rc = 0;
 
@@ -1079,10 +1147,14 @@ kranal_set_mynid(ptl_nid_t nid)
        if (kranal_data.kra_listener_sock != NULL)
                kranal_stop_listener();
 
        if (kranal_data.kra_listener_sock != NULL)
                kranal_stop_listener();
 
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+        kranal_data.kra_peerstamp++;
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
         ni->ni_pid.nid = nid;
 
         /* Delete all existing peers and their connections after new
         ni->ni_pid.nid = nid;
 
         /* Delete all existing peers and their connections after new
-         * NID/incarnation set to ensure no old connections in our brave
+         * NID/connstamp set to ensure no old connections in our brave
          * new world. */
         kranal_del_peer(PTL_NID_ANY, 0);
 
          * new world. */
         kranal_del_peer(PTL_NID_ANY, 0);
 
@@ -1109,7 +1181,8 @@ kranal_create_peer (ptl_nid_t nid)
         peer->rap_nid = nid;
         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
 
         peer->rap_nid = nid;
         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
 
-        INIT_LIST_HEAD(&peer->rap_list);        /* not in the peer table yet */
+        INIT_LIST_HEAD(&peer->rap_list);
+        INIT_LIST_HEAD(&peer->rap_connd_list);
         INIT_LIST_HEAD(&peer->rap_conns);
         INIT_LIST_HEAD(&peer->rap_tx_queue);
 
         INIT_LIST_HEAD(&peer->rap_conns);
         INIT_LIST_HEAD(&peer->rap_tx_queue);
 
@@ -1121,16 +1194,17 @@ kranal_create_peer (ptl_nid_t nid)
 }
 
 void
 }
 
 void
-__kranal_peer_decref (kra_peer_t *peer)
+kranal_destroy_peer (kra_peer_t *peer)
 {
         CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
 
         LASSERT (atomic_read(&peer->rap_refcount) == 0);
         LASSERT (peer->rap_persistence == 0);
         LASSERT (!kranal_peer_active(peer));
 {
         CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
 
         LASSERT (atomic_read(&peer->rap_refcount) == 0);
         LASSERT (peer->rap_persistence == 0);
         LASSERT (!kranal_peer_active(peer));
-        LASSERT (peer->rap_connecting == 0);
+        LASSERT (!peer->rap_connecting);
         LASSERT (list_empty(&peer->rap_conns));
         LASSERT (list_empty(&peer->rap_tx_queue));
         LASSERT (list_empty(&peer->rap_conns));
         LASSERT (list_empty(&peer->rap_tx_queue));
+        LASSERT (list_empty(&peer->rap_connd_list));
 
         PORTAL_FREE(peer, sizeof(*peer));
 
 
         PORTAL_FREE(peer, sizeof(*peer));
 
@@ -1387,31 +1461,6 @@ kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
 }
 
 int
 }
 
 int
-kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
-{
-        kra_conn_t         *conn;
-        struct list_head   *ctmp;
-        struct list_head   *cnxt;
-        int                 count = 0;
-
-        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
-                conn = list_entry(ctmp, kra_conn_t, rac_list);
-
-                if (conn->rac_peer_incarnation == incarnation)
-                        continue;
-
-                CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
-                       peer->rap_nid, conn->rac_peer_incarnation, incarnation);
-                LASSERT (conn->rac_peer_incarnation < incarnation);
-
-                count++;
-                kranal_close_conn_locked(conn, -ESTALE);
-        }
-
-        return count;
-}
-
-int
 kranal_close_matching_conns (ptl_nid_t nid)
 {
         unsigned long       flags;
 kranal_close_matching_conns (ptl_nid_t nid)
 {
         unsigned long       flags;
@@ -1570,6 +1619,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
 
                 tx->tx_isnblk = isnblk;
                 tx->tx_buftype = RANAL_BUF_NONE;
 
                 tx->tx_isnblk = isnblk;
                 tx->tx_buftype = RANAL_BUF_NONE;
+                tx->tx_msg.ram_type = RANAL_MSG_NONE;
 
                 list_add(&tx->tx_list, freelist);
         }
 
                 list_add(&tx->tx_list, freelist);
         }
@@ -1637,6 +1687,7 @@ kranal_device_init(int id, kra_device_t *dev)
 void
 kranal_device_fini(kra_device_t *dev)
 {
 void
 kranal_device_fini(kra_device_t *dev)
 {
+        LASSERT(dev->rad_scheduler == NULL);
         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
         RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
         RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
@@ -1647,7 +1698,6 @@ void
 kranal_api_shutdown (nal_t *nal)
 {
         int           i;
 kranal_api_shutdown (nal_t *nal)
 {
         int           i;
-        int           rc;
         unsigned long flags;
         
         if (nal->nal_refct != 0) {
         unsigned long flags;
         
         if (nal->nal_refct != 0) {
@@ -1791,13 +1841,14 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
 
         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
 
         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
-         * a unique (for all time) incarnation so we can uniquely identify
-         * the sender.  The incarnation is an incrementing counter
+         * a unique (for all time) connstamp so we can uniquely identify
+         * the sender.  The connstamp is an incrementing counter
          * initialised with seconds + microseconds at startup time.  So we
          * rely on NOT creating connections more frequently on average than
          * initialised with seconds + microseconds at startup time.  So we
          * rely on NOT creating connections more frequently on average than
-         * 1MHz to ensure we don't use old incarnations when we reboot. */
+         * 1MHz to ensure we don't use old connstamps when we reboot. */
         do_gettimeofday(&tv);
         do_gettimeofday(&tv);
-        kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+        kranal_data.kra_connstamp =
+        kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
 
         init_MUTEX(&kranal_data.kra_nid_mutex);
         init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
 
         init_MUTEX(&kranal_data.kra_nid_mutex);
         init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
@@ -1813,6 +1864,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 spin_lock_init(&dev->rad_lock);
         }
 
                 spin_lock_init(&dev->rad_lock);
         }
 
+        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
         spin_lock_init(&kranal_data.kra_reaper_lock);
 
         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
         spin_lock_init(&kranal_data.kra_reaper_lock);
 
index fe130b7..1622c40 100644 (file)
 
 #include <rapl.h>
 
 
 #include <rapl.h>
 
-#if CONFIG_SMP
-# define RANAL_N_SCHED      num_online_cpus()   /* # schedulers */
-#else
-# define RANAL_N_SCHED      1                   /* # schedulers */
-#endif
-
 #define RANAL_MAXDEVS       2                   /* max # devices RapidArray supports */
 
 #define RANAL_N_CONND       4                   /* # connection daemons */
 #define RANAL_MAXDEVS       2                   /* max # devices RapidArray supports */
 
 #define RANAL_N_CONND       4                   /* # connection daemons */
@@ -72,8 +66,8 @@
 #define RANAL_MIN_RECONNECT_INTERVAL 1          /* first failed connection retry (seconds)... */
 #define RANAL_MAX_RECONNECT_INTERVAL 60         /* ...exponentially increasing to this */
 
 #define RANAL_MIN_RECONNECT_INTERVAL 1          /* first failed connection retry (seconds)... */
 #define RANAL_MAX_RECONNECT_INTERVAL 60         /* ...exponentially increasing to this */
 
-#define RANAL_FMA_PREFIX_LEN      232           /* size of FMA "Prefix" */
-#define RANAL_FMA_MAX_DATA_LEN    ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
+#define RANAL_FMA_MAX_PREFIX      232           /* max size of FMA "Prefix" */
+#define RANAL_FMA_MAX_DATA        ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
 
 #define RANAL_PEER_HASH_SIZE  101               /* # peer lists */
 #define RANAL_CONN_HASH_SIZE  101               /* # conn lists */
 
 #define RANAL_PEER_HASH_SIZE  101               /* # peer lists */
 #define RANAL_CONN_HASH_SIZE  101               /* # conn lists */
@@ -92,7 +86,7 @@
 /* default vals for runtime tunables */
 #define RANAL_TIMEOUT           30              /* comms timeout (seconds) */
 #define RANAL_LISTENER_TIMEOUT   5              /* listener timeout (seconds) */
 /* default vals for runtime tunables */
 #define RANAL_TIMEOUT           30              /* comms timeout (seconds) */
 #define RANAL_LISTENER_TIMEOUT   5              /* listener timeout (seconds) */
-#define RANAL_MAX_IMMEDIATE    (2<<10)          /* biggest immediate payload */
+#define RANAL_MAX_IMMEDIATE    (2<<10)          /* immediate payload breakpoint */
 
 typedef struct 
 {
 
 typedef struct 
 {
@@ -100,7 +94,8 @@ typedef struct
         int               kra_listener_timeout; /* max time the listener can block */
        int               kra_backlog;          /* listener's backlog */
        int               kra_port;             /* listener's TCP/IP port */
         int               kra_listener_timeout; /* max time the listener can block */
        int               kra_backlog;          /* listener's backlog */
        int               kra_port;             /* listener's TCP/IP port */
-        int               kra_max_immediate;    /* biggest immediate payload */
+        int               kra_max_immediate;    /* immediate payload breakpoint */
+
         struct ctl_table_header *kra_sysctl;    /* sysctl interface */
 } kra_tunables_t;
 
         struct ctl_table_header *kra_sysctl;    /* sysctl interface */
 } kra_tunables_t;
 
@@ -114,8 +109,10 @@ typedef struct
         int                     rad_idx;        /* index in kra_devices */
         int                     rad_ready;      /* set by device callback */
         struct list_head        rad_connq;      /* connections requiring attention */
         int                     rad_idx;        /* index in kra_devices */
         int                     rad_ready;      /* set by device callback */
         struct list_head        rad_connq;      /* connections requiring attention */
+        struct list_head        rad_zombies;    /* connections to free */
         wait_queue_head_t       rad_waitq;      /* scheduler waits here */
         spinlock_t              rad_lock;       /* serialise */
         wait_queue_head_t       rad_waitq;      /* scheduler waits here */
         spinlock_t              rad_lock;       /* serialise */
+        void                   *rad_scheduler;  /* scheduling thread */
 } kra_device_t;
         
 typedef struct 
 } kra_device_t;
         
 typedef struct 
@@ -140,7 +137,8 @@ typedef struct
 
         struct list_head *kra_conns;            /* conns hashed by cqid */
         int               kra_conn_hash_size;   /* size of kra_conns */
 
         struct list_head *kra_conns;            /* conns hashed by cqid */
         int               kra_conn_hash_size;   /* size of kra_conns */
-        __u64             kra_next_incarnation; /* conn incarnation # generator */
+        __u64             kra_peerstamp;        /* when I started up */
+        __u64             kra_connstamp;        /* conn stamp generator */
         int               kra_next_cqid;        /* cqid generator */
         atomic_t          kra_nconns;           /* # connections extant */
 
         int               kra_next_cqid;        /* cqid generator */
         atomic_t          kra_nconns;           /* # connections extant */
 
@@ -173,11 +171,12 @@ typedef struct kra_connreq                        /* connection request/response */
 {                                              /* (sent via socket) */
         __u32             racr_magic;          /* I'm an ranal connreq */
         __u16             racr_version;                /* this is my version number */
 {                                              /* (sent via socket) */
         __u32             racr_magic;          /* I'm an ranal connreq */
         __u16             racr_version;                /* this is my version number */
-        __u16             racr_devid;           /* which device to connect on */
-        __u64             racr_nid;            /* my NID */
-        __u64             racr_incarnation;    /* my incarnation */
-        __u32             racr_timeout;         /* my timeout */
-       RAP_RI_PARAMETERS racr_riparams;        /* my endpoint info */
+        __u16             racr_devid;           /* sender's device ID */
+        __u64             racr_nid;            /* sender's NID */
+        __u64             racr_peerstamp;       /* sender's instance stamp */
+        __u64             racr_connstamp;       /* sender's connection stamp */
+        __u32             racr_timeout;         /* sender's timeout */
+       RAP_RI_PARAMETERS racr_riparams;        /* sender's endpoint info */
 } kra_connreq_t;
 
 typedef struct
 } kra_connreq_t;
 
 typedef struct
@@ -224,7 +223,7 @@ typedef struct                                  /* NB must fit in FMA "Prefix" *
         __u16             ram_version;         /* this is my version number */
         __u16             ram_type;            /* msg type */
         __u64             ram_srcnid;           /* sender's NID */
         __u16             ram_version;         /* this is my version number */
         __u16             ram_type;            /* msg type */
         __u64             ram_srcnid;           /* sender's NID */
-        __u64             ram_incarnation;      /* sender's connection incarnation */
+        __u64             ram_connstamp;        /* sender's connection stamp */
         union {
                 kra_immediate_msg_t   immediate;
                kra_putreq_msg_t      putreq;
         union {
                 kra_immediate_msg_t   immediate;
                kra_putreq_msg_t      putreq;
@@ -300,12 +299,13 @@ typedef struct kra_conn
         struct kra_peer    *rac_peer;           /* owning peer */
         struct list_head    rac_list;           /* stash on peer's conn list */
         struct list_head    rac_hashlist;       /* stash in connection hash table */
         struct kra_peer    *rac_peer;           /* owning peer */
         struct list_head    rac_list;           /* stash on peer's conn list */
         struct list_head    rac_hashlist;       /* stash in connection hash table */
-        struct list_head    rac_schedlist;      /* queue for scheduler */
+        struct list_head    rac_schedlist;      /* schedule (on rad_connq) for attention */
         struct list_head    rac_fmaq;           /* txs queued for FMA */
         struct list_head    rac_rdmaq;          /* txs awaiting RDMA completion */
         struct list_head    rac_replyq;         /* txs awaiting replies */
         struct list_head    rac_fmaq;           /* txs queued for FMA */
         struct list_head    rac_rdmaq;          /* txs awaiting RDMA completion */
         struct list_head    rac_replyq;         /* txs awaiting replies */
-        __u64               rac_peer_incarnation; /* peer's unique connection stamp */
-        __u64               rac_my_incarnation; /* my unique connection stamp */
+        __u64               rac_peerstamp;      /* peer's unique stamp */
+        __u64               rac_peer_connstamp; /* peer's unique connection stamp */
+        __u64               rac_my_connstamp;   /* my unique connection stamp */
         unsigned long       rac_last_tx;        /* when I last sent an FMA message */
         unsigned long       rac_last_rx;        /* when I last received an FMA messages */
         long                rac_keepalive;      /* keepalive interval */
         unsigned long       rac_last_tx;        /* when I last sent an FMA message */
         unsigned long       rac_last_rx;        /* when I last received an FMA messages */
         long                rac_keepalive;      /* keepalive interval */
@@ -316,7 +316,7 @@ typedef struct kra_conn
         atomic_t            rac_refcount;       /* # users */
         unsigned int        rac_close_sent;     /* I've sent CLOSE */
         unsigned int        rac_close_recvd;    /* I've received CLOSE */
         atomic_t            rac_refcount;       /* # users */
         unsigned int        rac_close_sent;     /* I've sent CLOSE */
         unsigned int        rac_close_recvd;    /* I've received CLOSE */
-        unsigned int        rac_closing;        /* connection being torn down */
+        unsigned int        rac_state;          /* connection state */
         unsigned int        rac_scheduled;      /* being attented to */
         spinlock_t          rac_lock;           /* serialise */
         kra_device_t       *rac_device;         /* which device */
         unsigned int        rac_scheduled;      /* being attented to */
         spinlock_t          rac_lock;           /* serialise */
         kra_device_t       *rac_device;         /* which device */
@@ -325,6 +325,10 @@ typedef struct kra_conn
         kra_msg_t           rac_msg;            /* keepalive/CLOSE message buffer */
 } kra_conn_t;
 
         kra_msg_t           rac_msg;            /* keepalive/CLOSE message buffer */
 } kra_conn_t;
 
+#define RANAL_CONN_ESTABLISHED     0
+#define RANAL_CONN_CLOSING         1
+#define RANAL_CONN_CLOSED          2
+
 typedef struct kra_peer
 {
         struct list_head    rap_list;           /* stash on global peer list */
 typedef struct kra_peer
 {
         struct list_head    rap_list;           /* stash on global peer list */
@@ -358,8 +362,8 @@ extern lib_nal_t       kranal_lib;
 extern kra_data_t      kranal_data;
 extern kra_tunables_t  kranal_tunables;
 
 extern kra_data_t      kranal_data;
 extern kra_tunables_t  kranal_tunables;
 
-extern void __kranal_peer_decref(kra_peer_t *peer);
-extern void __kranal_conn_decref(kra_conn_t *conn);
+extern void kranal_destroy_peer(kra_peer_t *peer);
+extern void kranal_destroy_conn(kra_conn_t *conn);
 
 static inline void
 kranal_peer_addref(kra_peer_t *peer)
 
 static inline void
 kranal_peer_addref(kra_peer_t *peer)
@@ -375,7 +379,7 @@ kranal_peer_decref(kra_peer_t *peer)
         CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
        LASSERT(atomic_read(&peer->rap_refcount) > 0);
        if (atomic_dec_and_test(&peer->rap_refcount))
         CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
        LASSERT(atomic_read(&peer->rap_refcount) > 0);
        if (atomic_dec_and_test(&peer->rap_refcount))
-               __kranal_peer_decref(peer);
+               kranal_destroy_peer(peer);
 }
 
 static inline struct list_head *
 }
 
 static inline struct list_head *
@@ -383,7 +387,7 @@ kranal_nid2peerlist (ptl_nid_t nid)
 {
         unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
         
 {
         unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
         
-        return (&kranal_data.kra_peers [hash]);
+        return (&kranal_data.kra_peers[hash]);
 }
 
 static inline int
 }
 
 static inline int
@@ -407,7 +411,7 @@ kranal_conn_decref(kra_conn_t *conn)
         CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
        LASSERT(atomic_read(&conn->rac_refcount) > 0);
        if (atomic_dec_and_test(&conn->rac_refcount))
         CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
        LASSERT(atomic_read(&conn->rac_refcount) > 0);
        if (atomic_dec_and_test(&conn->rac_refcount))
-               __kranal_conn_decref(conn);
+                kranal_destroy_conn(conn);
 }
 
 static inline struct list_head *
 }
 
 static inline struct list_head *
@@ -457,8 +461,6 @@ kranal_page2phys (struct page *p)
 extern int kranal_listener_procint(ctl_table *table, 
                                    int write, struct file *filp, 
                                    void *buffer, size_t *lenp);
 extern int kranal_listener_procint(ctl_table *table, 
                                    int write, struct file *filp, 
                                    void *buffer, size_t *lenp);
-extern int kranal_close_stale_conns_locked (kra_peer_t *peer, 
-                                            __u64 incarnation);
 extern void kranal_update_reaper_timeout(long timeout);
 extern void kranal_tx_done (kra_tx_t *tx, int completion);
 extern void kranal_unlink_peer_locked (kra_peer_t *peer);
 extern void kranal_update_reaper_timeout(long timeout);
 extern void kranal_tx_done (kra_tx_t *tx, int completion);
 extern void kranal_unlink_peer_locked (kra_peer_t *peer);
index 9490b56..69a4b33 100644 (file)
@@ -82,122 +82,6 @@ kranal_schedule_conn(kra_conn_t *conn)
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 }
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 }
 
-void
-kranal_schedule_cqid (__u32 cqid)
-{
-        kra_conn_t         *conn;
-        struct list_head   *conns;
-        struct list_head   *tmp;
-
-        conns = kranal_cqid2connlist(cqid);
-
-        read_lock(&kranal_data.kra_global_lock);
-
-        conn = kranal_cqid2conn_locked(cqid);
-        
-        if (conn == NULL)
-                CWARN("no cqid %x\n", cqid);
-        else
-                kranal_schedule_conn(conn);
-        
-        read_unlock(&kranal_data.kra_global_lock);
-}
-
-void
-kranal_schedule_dev(kra_device_t *dev)
-{
-        kra_conn_t         *conn;
-        struct list_head   *conns;
-        struct list_head   *tmp;
-        int                 i;
-
-        /* Don't do this in IRQ context (servers may have 1000s of clients) */
-        LASSERT (!in_interrupt()); 
-
-        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
-
-        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
-
-                /* Drop the lock on each hash bucket to ensure we don't
-                 * block anyone for too long at IRQ priority on another CPU */
-                
-                read_lock(&kranal_data.kra_global_lock);
-        
-                conns = &kranal_data.kra_conns[i];
-
-                list_for_each (tmp, conns) {
-                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);
-                
-                        if (conn->rac_device == dev)
-                                kranal_schedule_conn(conn);
-                }
-                read_unlock(&kranal_data.kra_global_lock);
-        }
-}
-
-void
-kranal_tx_done (kra_tx_t *tx, int completion)
-{
-        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
-        kra_device_t    *dev;
-        unsigned long    flags;
-        int              i;
-        RAP_RETURN       rrc;
-
-        LASSERT (!in_interrupt());
-
-        switch (tx->tx_buftype) {
-        default:
-                LBUG();
-
-        case RANAL_BUF_NONE:
-        case RANAL_BUF_IMMEDIATE:
-        case RANAL_BUF_PHYS_UNMAPPED:
-        case RANAL_BUF_VIRT_UNMAPPED:
-                break;
-
-        case RANAL_BUF_PHYS_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_conn->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
-                                           dev->rad_ptag, &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-                break;
-
-        case RANAL_BUF_VIRT_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_conn->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
-                                           dev->rad_ptag, &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-                break;
-        }
-
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 libmsgs to finalise */
-                if (tx->tx_libmsg[i] == NULL)
-                        continue;
-
-                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
-                tx->tx_libmsg[i] = NULL;
-        }
-
-        tx->tx_buftype = RANAL_BUF_NONE;
-        tx->tx_msg.ram_type = RANAL_MSG_NONE;
-        tx->tx_conn = NULL;
-
-        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
-
-        if (tx->tx_isnblk) {
-                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
-        } else {
-                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
-                wake_up(&kranal_data.kra_idle_tx_waitq);
-        }
-
-        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-}
-
 kra_tx_t *
 kranal_get_idle_tx (int may_block) 
 {
 kra_tx_t *
 kranal_get_idle_tx (int may_block) 
 {
@@ -259,7 +143,7 @@ kranal_init_msg(kra_msg_t *msg, int type)
         msg->ram_version = RANAL_MSG_VERSION;
         msg->ram_type = type;
         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
         msg->ram_version = RANAL_MSG_VERSION;
         msg->ram_type = type;
         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
-        /* ram_incarnation gets set when FMA is sent */
+        /* ram_connstamp gets set when FMA is sent */
 }
 
 kra_tx_t *
 }
 
 kra_tx_t *
@@ -279,6 +163,10 @@ kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                                int offset, int nob)
                  
 {
                                int offset, int nob)
                  
 {
+        /* For now this is almost identical to kranal_setup_virt_buffer, but we
+         * could "flatten" the payload into a single contiguous buffer ready
+         * for sending direct over an FMA if we ever needed to. */
+
         LASSERT (nob > 0);
         LASSERT (niov > 0);
         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
         LASSERT (nob > 0);
         LASSERT (niov > 0);
         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
@@ -365,7 +253,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 if (kiov->kiov_offset != 0 ||
                     ((resid > PAGE_SIZE) && 
                      kiov->kiov_len < PAGE_SIZE)) {
                 if (kiov->kiov_offset != 0 ||
                     ((resid > PAGE_SIZE) && 
                      kiov->kiov_len < PAGE_SIZE)) {
-                        int i;
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", 
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", 
@@ -410,27 +297,108 @@ kranal_map_buffer (kra_tx_t *tx)
         kra_device_t   *dev = conn->rac_device;
         RAP_RETURN      rrc;
 
         kra_device_t   *dev = conn->rac_device;
         RAP_RETURN      rrc;
 
+        LASSERT (current == dev->rad_scheduler);
+
         switch (tx->tx_buftype) {
         default:
         switch (tx->tx_buftype) {
         default:
+                LBUG();
+                
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_MAPPED:
+        case RANAL_BUF_VIRT_MAPPED:
+                break;
                 
         case RANAL_BUF_PHYS_UNMAPPED:
                 
         case RANAL_BUF_PHYS_UNMAPPED:
-                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+                rrc = RapkRegisterPhys(dev->rad_handle,
                                        tx->tx_phys, tx->tx_phys_npages,
                                        tx->tx_phys, tx->tx_phys_npages,
-                                       conn->rac_device->rad_ptag,
-                                       &tx->tx_map_key);
+                                       dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
-                return;
+                break;
 
         case RANAL_BUF_VIRT_UNMAPPED:
 
         case RANAL_BUF_VIRT_UNMAPPED:
-                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+                rrc = RapkRegisterMemory(dev->rad_handle,
                                          tx->tx_buffer, tx->tx_nob,
                                          tx->tx_buffer, tx->tx_nob,
-                                         conn->rac_device->rad_ptag,
-                                         &tx->tx_map_key);
+                                         dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
-                return;
+                break;
+        }
+}
+
+void
+kranal_unmap_buffer (kra_tx_t *tx)
+{
+        kra_device_t   *dev;
+        RAP_RETURN      rrc;
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+                
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_UNMAPPED:
+        case RANAL_BUF_VIRT_UNMAPPED:
+                break;
+                
+        case RANAL_BUF_PHYS_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                LASSERT (current == dev->rad_scheduler);
+                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
+                break;
+
+        case RANAL_BUF_VIRT_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                LASSERT (current == dev->rad_scheduler);
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
+                break;
+        }
+}
+
+void
+kranal_tx_done (kra_tx_t *tx, int completion)
+{
+        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
+        unsigned long    flags;
+        int              i;
+
+        LASSERT (!in_interrupt());
+
+        kranal_unmap_buffer(tx);
+
+        for (i = 0; i < 2; i++) {
+                /* tx may have up to 2 libmsgs to finalise */
+                if (tx->tx_libmsg[i] == NULL)
+                        continue;
+
+                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+                tx->tx_libmsg[i] = NULL;
+        }
+
+        tx->tx_buftype = RANAL_BUF_NONE;
+        tx->tx_msg.ram_type = RANAL_MSG_NONE;
+        tx->tx_conn = NULL;
+
+        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+        if (tx->tx_isnblk) {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+        } else {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+                wake_up(&kranal_data.kra_idle_tx_waitq);
         }
         }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
 }
 
 kra_conn_t *
 }
 
 kra_conn_t *
@@ -513,6 +481,8 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         LASSERT (peer->rap_persistence > 0);
 
         if (!peer->rap_connecting) {
         LASSERT (peer->rap_persistence > 0);
 
         if (!peer->rap_connecting) {
+                LASSERT (list_empty(&peer->rap_tx_queue));
+                
                 now = CURRENT_TIME;
                 if (now < peer->rap_reconnect_time) {
                         write_unlock_irqrestore(g_lock, flags);
                 now = CURRENT_TIME;
                 if (now < peer->rap_reconnect_time) {
                         write_unlock_irqrestore(g_lock, flags);
@@ -538,35 +508,40 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         write_unlock_irqrestore(g_lock, flags);
 }
 
         write_unlock_irqrestore(g_lock, flags);
 }
 
-static void
+void
 kranal_rdma(kra_tx_t *tx, int type, 
 kranal_rdma(kra_tx_t *tx, int type, 
-            kra_rdma_desc_t *rard, int nob, __u64 cookie)
+            kra_rdma_desc_t *sink, int nob, __u64 cookie)
 {
         kra_conn_t   *conn = tx->tx_conn;
         RAP_RETURN    rrc;
         unsigned long flags;
 
 {
         kra_conn_t   *conn = tx->tx_conn;
         RAP_RETURN    rrc;
         unsigned long flags;
 
-        /* prep final completion message */
-        kranal_init_msg(&tx->tx_msg, type);
-        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
-        
-        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-        LASSERT (nob <= rard->rard_nob);
+        LASSERT (kranal_tx_mapped(tx));
+        LASSERT (nob <= sink->rard_nob);
+        LASSERT (nob <= tx->tx_nob);
 
 
+        /* No actual race with scheduler sending CLOSE (I'm she!) */
+        LASSERT (current == conn->rac_device->rad_scheduler);
+        
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
         tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
         tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
-        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
-        tx->tx_rdma_desc.DstKey = rard->rard_key;
+        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
+        tx->tx_rdma_desc.DstKey = sink->rard_key;
         tx->tx_rdma_desc.Length = nob;
         tx->tx_rdma_desc.AppPtr = tx;
 
         tx->tx_rdma_desc.Length = nob;
         tx->tx_rdma_desc.AppPtr = tx;
 
+        /* prep final completion message */
+        kranal_init_msg(&tx->tx_msg, type);
+        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+
         if (nob == 0) { /* Immediate completion */
                 kranal_post_fma(conn, tx);
                 return;
         }
         if (nob == 0) { /* Immediate completion */
                 kranal_post_fma(conn, tx);
                 return;
         }
-        
+
+        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
+
         rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
         LASSERT (rrc == RAP_SUCCESS);
 
         rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
         LASSERT (rrc == RAP_SUCCESS);
 
@@ -639,7 +614,7 @@ kranal_do_send (lib_nal_t    *nal,
                 LASSERT (conn->rac_rxmsg != NULL);
 
                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                 LASSERT (conn->rac_rxmsg != NULL);
 
                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
-                        if (nob > RANAL_MAX_IMMEDIATE) {
+                        if (nob > RANAL_FMA_MAX_DATA) {
                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                        nob, nid);
                                 return PTL_FAIL;
                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                        nob, nid);
                                 return PTL_FAIL;
@@ -676,11 +651,11 @@ kranal_do_send (lib_nal_t    *nal,
 
         case PTL_MSG_GET:
                 if (kiov == NULL &&             /* not paged */
 
         case PTL_MSG_GET:
                 if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
 
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
 
-                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
@@ -846,6 +821,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                         return PTL_FAIL;
                 }
 
                         return PTL_FAIL;
                 }
 
+                tx->tx_conn = conn;
                 kranal_map_buffer(tx);
                 
                 tx->tx_msg.ram_u.putack.rapam_src_cookie = 
                 kranal_map_buffer(tx);
                 
                 tx->tx_msg.ram_u.putack.rapam_src_cookie = 
@@ -903,7 +879,7 @@ kranal_thread_fini (void)
 }
 
 int
 }
 
 int
-kranal_check_conn (kra_conn_t *conn)
+kranal_check_conn_timeouts (kra_conn_t *conn)
 {
         kra_tx_t          *tx;
         struct list_head  *ttmp;
 {
         kra_tx_t          *tx;
         struct list_head  *ttmp;
@@ -911,14 +887,16 @@ kranal_check_conn (kra_conn_t *conn)
         long               timeout;
         unsigned long      now = jiffies;
 
         long               timeout;
         unsigned long      now = jiffies;
 
-        if (!conn->rac_closing &&
+        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
+                 conn->rac_state == RANAL_CONN_CLOSING);
+
+        if (!conn->rac_close_sent &&
             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
                 kranal_schedule_conn(conn);
         }
 
             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
                 kranal_schedule_conn(conn);
         }
 
-        /* wait twice as long for CLOSE to be sure peer is dead */
-        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
+        timeout = conn->rac_timeout * HZ;
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
@@ -927,7 +905,7 @@ kranal_check_conn (kra_conn_t *conn)
                 return -ETIMEDOUT;
         }
 
                 return -ETIMEDOUT;
         }
 
-        if (conn->rac_closing)
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 return 0;
         
         /* Check the conn's queues are moving.  These are "belt+braces" checks,
                 return 0;
         
         /* Check the conn's queues are moving.  These are "belt+braces" checks,
@@ -974,7 +952,7 @@ kranal_check_conn (kra_conn_t *conn)
 }
 
 void
 }
 
 void
-kranal_check_conns (int idx, unsigned long *min_timeoutp)
+kranal_reaper_check (int idx, unsigned long *min_timeoutp)
 {
         struct list_head  *conns = &kranal_data.kra_conns[idx];
         struct list_head  *ctmp;
 {
         struct list_head  *conns = &kranal_data.kra_conns[idx];
         struct list_head  *ctmp;
@@ -995,23 +973,31 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
                 if (conn->rac_keepalive < *min_timeoutp )
                         *min_timeoutp = conn->rac_keepalive;
 
                 if (conn->rac_keepalive < *min_timeoutp )
                         *min_timeoutp = conn->rac_keepalive;
 
-                rc = kranal_check_conn(conn);
+                rc = kranal_check_conn_timeouts(conn);
                 if (rc == 0)
                         continue;
 
                 kranal_conn_addref(conn);
                 read_unlock(&kranal_data.kra_global_lock);
 
                 if (rc == 0)
                         continue;
 
                 kranal_conn_addref(conn);
                 read_unlock(&kranal_data.kra_global_lock);
 
-                CERROR("Check on conn to "LPX64"failed: %d\n",
-                       conn->rac_peer->rap_nid, rc);
+                CERROR("Conn to "LPX64", cqid %d timed out\n",
+                       conn->rac_peer->rap_nid, conn->rac_cqid);
 
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
 
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-                if (!conn->rac_closing)
+                switch (conn->rac_state) {
+                default:
+                        LBUG();
+
+                case RANAL_CONN_ESTABLISHED:
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
-                else
-                        kranal_terminate_conn_locked(conn);
+                        break;
                         
                         
+                case RANAL_CONN_CLOSING:
+                        kranal_terminate_conn_locked(conn);
+                        break;
+                }
+                
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
                 kranal_conn_decref(conn);
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
                 kranal_conn_decref(conn);
@@ -1030,7 +1016,6 @@ kranal_connd (void *arg)
         wait_queue_t       wait;
         unsigned long      flags;
         kra_peer_t        *peer;
         wait_queue_t       wait;
         unsigned long      flags;
         kra_peer_t        *peer;
-        int                i;
 
        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
         kportal_daemonize(name);
 
        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
         kportal_daemonize(name);
@@ -1096,8 +1081,6 @@ kranal_reaper (void *arg)
 {
         wait_queue_t       wait;
         unsigned long      flags;
 {
         wait_queue_t       wait;
         unsigned long      flags;
-        kra_conn_t        *conn;
-        kra_peer_t        *peer;
         long               timeout;
         int                i;
         int                conn_entries = kranal_data.kra_conn_hash_size;
         long               timeout;
         int                i;
         int                conn_entries = kranal_data.kra_conn_hash_size;
@@ -1113,7 +1096,6 @@ kranal_reaper (void *arg)
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-        kranal_data.kra_new_min_timeout = 1;
 
         while (!kranal_data.kra_shutdown) {
 
 
         while (!kranal_data.kra_shutdown) {
 
@@ -1165,8 +1147,8 @@ kranal_reaper (void *arg)
                                 chunk = 1;
 
                         for (i = 0; i < chunk; i++) {
                                 chunk = 1;
 
                         for (i = 0; i < chunk; i++) {
-                                kranal_check_conns(conn_index, 
-                                                   &next_min_timeout);
+                                kranal_reaper_check(conn_index, 
+                                                    &next_min_timeout);
                                 conn_index = (conn_index + 1) % conn_entries;
                         }
 
                                 conn_index = (conn_index + 1) % conn_entries;
                         }
 
@@ -1212,41 +1194,113 @@ kranal_reaper (void *arg)
 }
 
 void
 }
 
 void
-kranal_process_rdmaq (__u32 cqid)
+kranal_check_rdma_cq (kra_device_t *dev)
 {
         kra_conn_t          *conn;
         kra_tx_t            *tx;
         RAP_RETURN           rrc;
         unsigned long        flags;
         RAP_RDMA_DESCRIPTOR *desc;
 {
         kra_conn_t          *conn;
         kra_tx_t            *tx;
         RAP_RETURN           rrc;
         unsigned long        flags;
         RAP_RDMA_DESCRIPTOR *desc;
-        
-        read_lock(&kranal_data.kra_global_lock);
+        __u32                cqid;
+        __u32                event_type;
+
+        for (;;) {
+                rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+                if (rrc == RAP_NOT_DONE)
+                        return;
 
 
-        conn = kranal_cqid2conn_locked(cqid);
-        LASSERT (conn != NULL);
+                LASSERT (rrc == RAP_SUCCESS);
+                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
 
 
-        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
-        LASSERT (rrc == RAP_SUCCESS);
+                read_lock(&kranal_data.kra_global_lock);
 
 
-        spin_lock_irqsave(&conn->rac_lock, flags);
+                conn = kranal_cqid2conn_locked(cqid);
+                if (conn == NULL) {
+                        /* Conn was destroyed? */
+                        CWARN("RDMA CQID lookup %d failed\n", cqid);
+                        read_unlock(&kranal_data.kra_global_lock);
+                        continue;
+                }
 
 
-        LASSERT (!list_empty(&conn->rac_rdmaq));
-        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
-        list_del(&tx->tx_list);
+                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+                LASSERT (rrc == RAP_SUCCESS);
 
 
-        LASSERT(desc->AppPtr == (void *)tx);
-        LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
-                tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+                spin_lock_irqsave(&conn->rac_lock, flags);
 
 
-        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
-        tx->tx_qtime = jiffies;
+                LASSERT (!list_empty(&conn->rac_rdmaq));
+                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
+                list_del(&tx->tx_list);
+
+                LASSERT(desc->AppPtr == (void *)tx);
+                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+                tx->tx_qtime = jiffies;
         
         
-        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
 
 
-        /* Get conn's fmaq processed, now I've just put something there */
-        kranal_schedule_conn(conn);
+                /* Get conn's fmaq processed, now I've just put something
+                 * there */
+                kranal_schedule_conn(conn);
 
 
-        read_unlock(&kranal_data.kra_global_lock);
+                read_unlock(&kranal_data.kra_global_lock);
+        }
+}
+
+void
+kranal_check_fma_cq (kra_device_t *dev)
+{
+        kra_conn_t         *conn;
+        RAP_RETURN          rrc;
+        __u32               cqid;
+        __u32               event_type;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+        int                 i;
+
+        for (;;) {
+                rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
+                if (rrc == RAP_NOT_DONE)
+                        return;
+                
+                LASSERT (rrc == RAP_SUCCESS);
+
+                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
+
+                        read_lock(&kranal_data.kra_global_lock);
+                
+                        conn = kranal_cqid2conn_locked(cqid);
+                        if (conn == NULL)
+                                CWARN("FMA CQID lookup %d failed\n", cqid);
+                        else
+                                kranal_schedule_conn(conn);
+
+                        read_unlock(&kranal_data.kra_global_lock);
+                        continue;
+                }
+
+                /* FMA CQ has overflowed: check ALL conns */
+                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+                        
+                        read_lock(&kranal_data.kra_global_lock);
+                
+                        conns = &kranal_data.kra_conns[i];
+
+                        list_for_each (tmp, conns) {
+                                conn = list_entry(tmp, kra_conn_t, 
+                                                  rac_hashlist);
+                
+                                if (conn->rac_device == dev)
+                                        kranal_schedule_conn(conn);
+                        }
+
+                        /* don't block write lockers for too long... */
+                        read_unlock(&kranal_data.kra_global_lock);
+                }
+        }
 }
 
 int
 }
 
 int
@@ -1256,12 +1310,12 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
         RAP_RETURN rrc;
         
         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
         RAP_RETURN rrc;
         
-        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
-                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+                 immediatenob <= RANAL_FMA_MAX_DATA :
                  immediatenob == 0);
 
                  immediatenob == 0);
 
-        msg->ram_incarnation = conn->rac_my_incarnation;
+        msg->ram_connstamp = conn->rac_my_connstamp;
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
@@ -1287,7 +1341,7 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
         }
 }
 
         }
 }
 
-int
+void
 kranal_process_fmaq (kra_conn_t *conn) 
 {
         unsigned long flags;
 kranal_process_fmaq (kra_conn_t *conn) 
 {
         unsigned long flags;
@@ -1296,27 +1350,47 @@ kranal_process_fmaq (kra_conn_t *conn)
         int           rc;
         int           expect_reply;
 
         int           rc;
         int           expect_reply;
 
-        /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
-         * out of credits when I try to send right now... */
-
-        if (conn->rac_closing) {
+        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
+         *       However I will be rescheduled some by a rad_fma_cq event when
+         *       I eventually get some.
+         * NB 2. Sampling rac_state here, races with setting it elsewhere
+         *       kranal_close_conn_locked.  But it doesn't matter if I try to
+         *       send a "real" message just as I start closing because I'll get
+         *       scheduled to send the close anyway. */
 
 
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                 if (!list_empty(&conn->rac_rdmaq)) {
                 if (!list_empty(&conn->rac_rdmaq)) {
-                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
-                         * posted to finish */
+                        /* RDMAs in progress */
                         LASSERT (!conn->rac_close_sent);
                         LASSERT (!conn->rac_close_sent);
-                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
-                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                        return 0;
+                        
+                        if (time_after_eq(jiffies, 
+                                          conn->rac_last_tx + 
+                                          conn->rac_keepalive * HZ)) {
+                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        }
+                        return;
                 }
                 }
-
-                if (conn->rac_close_sent)
-                        return 0;
                 
                 
+                if (conn->rac_close_sent)
+                        return;
+
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                conn->rac_close_sent = (rc == 0);
-                return 0;
+                if (rc != 0)
+                        return;
+
+                conn->rac_close_sent = 1;
+                if (!conn->rac_close_recvd)
+                        return;
+                        
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (conn->rac_state == RANAL_CONN_CLOSING)
+                        kranal_terminate_conn_locked(conn);
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                return;
         }
 
         spin_lock_irqsave(&conn->rac_lock, flags);
         }
 
         spin_lock_irqsave(&conn->rac_lock, flags);
@@ -1330,7 +1404,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
-                return 0;
+                return;
         }
         
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
         }
         
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
@@ -1379,12 +1453,12 @@ kranal_process_fmaq (kra_conn_t *conn)
         }
 
         if (rc == -EAGAIN) {
         }
 
         if (rc == -EAGAIN) {
-                /* replace at the head of the list for later */
+                /* I need credits to send this.  Replace tx at the head of the
+                 * fmaq and I'll get rescheduled when credits appear */
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add(&tx->tx_list, &conn->rac_fmaq);
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add(&tx->tx_list, &conn->rac_fmaq);
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-                return 0;
+                return;
         }
 
         LASSERT (rc == 0);
         }
 
         LASSERT (rc == 0);
@@ -1398,7 +1472,8 @@ kranal_process_fmaq (kra_conn_t *conn)
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
         }
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
         }
 
-        return more_to_do;
+        if (more_to_do)
+                kranal_schedule_conn(conn);
 }
 
 static inline void
 }
 
 static inline void
@@ -1415,7 +1490,6 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d)
 kra_tx_t *
 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
 {
 kra_tx_t *
 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
 {
-        unsigned long     flags;
         struct list_head *ttmp;
         kra_tx_t         *tx;
         
         struct list_head *ttmp;
         kra_tx_t         *tx;
         
@@ -1438,12 +1512,11 @@ kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
         return NULL;
 }
 
         return NULL;
 }
 
-int
-kranal_process_receives(kra_conn_t *conn)
+void
+kranal_check_fma_rx (kra_conn_t *conn)
 {
         unsigned long flags;
         __u32         seq;
 {
         unsigned long flags;
         __u32         seq;
-        __u32         nob;
         kra_tx_t     *tx;
         kra_msg_t    *msg;
         void         *prefix;
         kra_tx_t     *tx;
         kra_msg_t    *msg;
         void         *prefix;
@@ -1451,7 +1524,7 @@ kranal_process_receives(kra_conn_t *conn)
         kra_peer_t   *peer = conn->rac_peer;
 
         if (rrc == RAP_NOT_DONE)
         kra_peer_t   *peer = conn->rac_peer;
 
         if (rrc == RAP_NOT_DONE)
-                return 0;
+                return;
         
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
         
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
@@ -1469,7 +1542,7 @@ kranal_process_receives(kra_conn_t *conn)
                 __swab16s(&msg->ram_version);
                 __swab16s(&msg->ram_type);
                 __swab64s(&msg->ram_srcnid);
                 __swab16s(&msg->ram_version);
                 __swab16s(&msg->ram_type);
                 __swab64s(&msg->ram_srcnid);
-                __swab64s(&msg->ram_incarnation);
+                __swab64s(&msg->ram_connstamp);
                 __swab32s(&msg->ram_seq);
 
                 /* NB message type checked below; NOT here... */
                 __swab32s(&msg->ram_seq);
 
                 /* NB message type checked below; NOT here... */
@@ -1499,10 +1572,10 @@ kranal_process_receives(kra_conn_t *conn)
                 goto out;
         }
         
                 goto out;
         }
         
-        if (msg->ram_incarnation != conn->rac_peer_incarnation) {
-                CERROR("Unexpected incarnation "LPX64"("LPX64
+        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
+                CERROR("Unexpected connstamp "LPX64"("LPX64
                        " expected) from "LPX64"\n",
                        " expected) from "LPX64"\n",
-                       msg->ram_incarnation, conn->rac_peer_incarnation,
+                       msg->ram_connstamp, conn->rac_peer_connstamp,
                        peer->rap_nid);
                 goto out;
         }
                        peer->rap_nid);
                 goto out;
         }
@@ -1514,17 +1587,23 @@ kranal_process_receives(kra_conn_t *conn)
         }
 
         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
         }
 
         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
-                /* This message signals RDMA completion: wait now... */
+                /* This message signals RDMA completion... */
                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
                 LASSERT (rrc == RAP_SUCCESS);
         }
                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
                 LASSERT (rrc == RAP_SUCCESS);
         }
+
+        if (conn->rac_close_recvd) {
+                CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
+                       msg->ram_type, conn->rac_peer->rap_nid);
+                goto out;
+        }
+
         if (msg->ram_type == RANAL_MSG_CLOSE) {
                 conn->rac_close_recvd = 1;
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
         if (msg->ram_type == RANAL_MSG_CLOSE) {
                 conn->rac_close_recvd = 1;
                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-                if (!conn->rac_closing)
-                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
+                        kranal_close_conn_locked(conn, 0);
                 else if (conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
 
                 else if (conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
 
@@ -1532,7 +1611,7 @@ kranal_process_receives(kra_conn_t *conn)
                 goto out;
         }
 
                 goto out;
         }
 
-        if (conn->rac_closing)
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 goto out;
         
         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
                 goto out;
         
         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
@@ -1636,7 +1715,32 @@ kranal_process_receives(kra_conn_t *conn)
         if (conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
         if (conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
-        return 1;
+        /* check again later */
+        kranal_schedule_conn(conn);
+}
+
+void
+kranal_complete_closed_conn (kra_conn_t *conn) 
+{
+        kra_tx_t   *tx;
+
+        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
+
+        while (!list_empty(&conn->rac_fmaq)) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+        
+        LASSERT (list_empty(&conn->rac_rdmaq));
+
+        while (!list_empty(&conn->rac_replyq)) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
 }
 
 int
 }
 
 int
@@ -1647,19 +1751,13 @@ kranal_scheduler (void *arg)
         char            name[16];
         kra_conn_t     *conn;
         unsigned long   flags;
         char            name[16];
         kra_conn_t     *conn;
         unsigned long   flags;
-        RAP_RETURN      rrc;
-        int             rc;
-        int             resched;
-        int             i;
-        __u32           cqid;
-        __u32           event_type;
-        int             did_something;
         int             busy_loops = 0;
 
         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
         int             busy_loops = 0;
 
         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
+        dev->rad_scheduler = current;
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&dev->rad_lock, flags);
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&dev->rad_lock, flags);
@@ -1676,64 +1774,37 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                }
 
                         spin_lock_irqsave(&dev->rad_lock, flags);
                }
 
-                did_something = 0;
-
                 if (dev->rad_ready) {
                 if (dev->rad_ready) {
+                        /* Device callback fired since I last checked it */
                         dev->rad_ready = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
                         dev->rad_ready = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                        rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+                        kranal_check_rdma_cq(dev);
+                        kranal_check_fma_cq(dev);
 
 
-                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
-                        LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
-                        
-                        if (rrc == RAP_SUCCESS) {
-                                kranal_process_rdmaq(cqid);
-                                did_something = 1;
-                        }
-                        
-                        rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
-                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
-                        
-                        if (rrc == RAP_SUCCESS) {
-                                if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
-                                        kranal_schedule_dev(dev);
-                                else
-                                        kranal_schedule_cqid(cqid);
-                                did_something = 1;
-                        }
-                        
                         spin_lock_irqsave(&dev->rad_lock, flags);
                         spin_lock_irqsave(&dev->rad_lock, flags);
-
-                        /* If there were no completions to handle, I leave
-                         * rad_ready clear.  NB I cleared it BEFORE I checked
-                         * the completion queues since I'm racing with the
-                         * device callback. */
-
-                        if (did_something)
-                                dev->rad_ready = 1;
                 }
                
                 if (!list_empty(&dev->rad_connq)) {
                 }
                
                 if (!list_empty(&dev->rad_connq)) {
+                        /* Connection needs attention */
                         conn = list_entry(dev->rad_connq.next,
                                           kra_conn_t, rac_schedlist);
                         conn = list_entry(dev->rad_connq.next,
                                           kra_conn_t, rac_schedlist);
-                        list_del(&conn->rac_schedlist);
+                        list_del_init(&conn->rac_schedlist);
+                        LASSERT (conn->rac_scheduled);
+                        conn->rac_scheduled = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                        LASSERT (conn->rac_scheduled);
+                        kranal_check_fma_rx(conn);
+                        kranal_process_fmaq(conn);
 
 
-                        resched  = kranal_process_fmaq(conn);
-                        resched |= kranal_process_receives(conn);
-                        did_something = 1;
+                        if (conn->rac_state == RANAL_CONN_CLOSED)
+                                kranal_complete_closed_conn(conn);
 
 
+                        kranal_conn_decref(conn);
+                        
                         spin_lock_irqsave(&dev->rad_lock, flags);
                         spin_lock_irqsave(&dev->rad_lock, flags);
-                        if (resched)
-                                list_add_tail(&conn->rac_schedlist,
-                                              &dev->rad_connq);
-                }
-
-                if (did_something)
                         continue;
                         continue;
+                }
 
                 add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);
 
                 add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);
@@ -1751,6 +1822,7 @@ kranal_scheduler (void *arg)
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
+        dev->rad_scheduler = NULL;
         kranal_thread_fini();
         return 0;
 }
         kranal_thread_fini();
         return 0;
 }