Whamcloud - gitweb
* ranal code review
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
index c924827..0aa1d53 100644 (file)
@@ -162,7 +162,6 @@ kranal_create_sock(struct socket **sockp)
 {
        struct socket       *sock;
        int                  rc;
-        struct timeval       tv;
        int                  option;
         mm_segment_t         oldmm = get_fs();
 
@@ -215,12 +214,13 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 
         memset(connreq, 0, sizeof(*connreq));
 
-        connreq->racr_magic       = RANAL_MSG_MAGIC;
-        connreq->racr_version     = RANAL_MSG_VERSION;
-        connreq->racr_devid       = conn->rac_device->rad_id;
-        connreq->racr_nid         = kranal_lib.libnal_ni.ni_pid.nid;
-        connreq->racr_timeout     = conn->rac_timeout;
-        connreq->racr_incarnation = conn->rac_my_incarnation;
+        connreq->racr_magic     = RANAL_MSG_MAGIC;
+        connreq->racr_version   = RANAL_MSG_VERSION;
+        connreq->racr_devid     = conn->rac_device->rad_id;
+        connreq->racr_nid       = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_peerstamp = kranal_data.kra_peerstamp;
+        connreq->racr_connstamp = conn->rac_my_connstamp;
+        connreq->racr_timeout   = conn->rac_timeout;
 
         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         LASSERT(rrc == RAP_SUCCESS);
@@ -229,7 +229,6 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 int
 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
 {
-        int         i;
        int         rc;
 
        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
@@ -248,7 +247,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                __swab16s(&connreq->racr_version);
                 __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_nid);
-               __swab64s(&connreq->racr_incarnation);
+               __swab64s(&connreq->racr_peerstamp);
+               __swab64s(&connreq->racr_connstamp);
                 __swab32s(&connreq->racr_timeout);
 
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
@@ -273,43 +273,100 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
                 return -EPROTO;
         }
         
-        for (i = 0; i < kranal_data.kra_ndevs; i++)
-                if (connreq->racr_devid == 
-                    kranal_data.kra_devices[i].rad_id)
-                        break;
+       return 0;
+}
 
-        if (i == kranal_data.kra_ndevs) {
-                CERROR("Can't match device %d\n", connreq->racr_devid);
-                return -ENODEV;
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 loopback;
+        int                 count = 0;
+
+        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                if (conn == newconn)
+                        continue;
+
+                if (conn->rac_peerstamp != newconn->rac_peerstamp) {
+                        CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+                               " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+                               conn->rac_peerstamp, newconn->rac_peerstamp);
+                        LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
+                        count++;
+                        kranal_close_conn_locked(conn, -ESTALE);
+                        continue;
+                }
+
+                if (conn->rac_device != newconn->rac_device)
+                        continue;
+                
+                if (loopback &&
+                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
+                        continue;
+                    
+                LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
+
+                CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, 
+                       conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
+
+                count++;
+                kranal_close_conn_locked(conn, -ESTALE);
         }
 
-       return 0;
+        return count;
 }
 
 int
-kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
 {
        kra_conn_t       *conn;
        struct list_head *tmp;
-        int               loopback = 0;
+        int               loopback;
 
+        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+        
        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);
 
-                if (conn->rac_peer_incarnation < incarnation) {
-                        /* Conns with an older incarnation get culled later */
+                /* 'newconn' is from an earlier version of 'peer'!!! */
+                if (newconn->rac_peerstamp < conn->rac_peerstamp)
+                        return 1;
+
+                /* 'conn' is from an earlier version of 'peer': it will be
+                 * removed when we cull stale conns later on... */
+                if (newconn->rac_peerstamp > conn->rac_peerstamp)
                         continue;
-                }
 
-                if (!loopback &&
-                    conn->rac_peer_incarnation == incarnation &&
-                    peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
-                        /* loopback creates 2 conns */
-                        loopback = 1;
+                /* Different devices are OK */
+                if (conn->rac_device != newconn->rac_device)
+                        continue;
+
+                /* It's me connecting to myself */
+                if (loopback &&
+                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                         continue;
-                }
 
-                return 1;
+                /* 'newconn' is an earlier connection from 'peer'!!! */
+                if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
+                        return 2;
+                
+                /* 'conn' is an earlier connection from 'peer': it will be
+                 * removed when we cull stale conns later on... */
+                if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
+                        continue;
+                
+                /* 'newconn' has the SAME connection stamp; 'peer' isn't
+                 * playing the game... */
+                return 3;
        }
 
        return 0;
@@ -322,7 +379,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-        conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+        conn->rac_my_connstamp = kranal_data.kra_connstamp++;
 
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
@@ -333,7 +390,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 }
 
 int
-kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
 {
        kra_conn_t    *conn;
         RAP_RETURN     rrc;
@@ -348,6 +405,7 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
+        INIT_LIST_HEAD(&conn->rac_schedlist);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
@@ -374,34 +432,20 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
 }
 
 void
-__kranal_conn_decref(kra_conn_t *conn) 
+kranal_destroy_conn(kra_conn_t *conn) 
 {
-        kra_tx_t          *tx;
         RAP_RETURN         rrc;
 
         LASSERT (!in_interrupt());
         LASSERT (!conn->rac_scheduled);
         LASSERT (list_empty(&conn->rac_list));
         LASSERT (list_empty(&conn->rac_hashlist));
+        LASSERT (list_empty(&conn->rac_schedlist));
         LASSERT (atomic_read(&conn->rac_refcount) == 0);
-
-        while (!list_empty(&conn->rac_fmaq)) {
-                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-                
-                list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-        
-        /* We may not destroy this connection while it has RDMAs outstanding */
+        LASSERT (list_empty(&conn->rac_fmaq));
         LASSERT (list_empty(&conn->rac_rdmaq));
+        LASSERT (list_empty(&conn->rac_replyq));
 
-        while (!list_empty(&conn->rac_replyq)) {
-                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-                
-                list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-        
         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                             conn->rac_rihandle);
         LASSERT (rrc == RAP_SUCCESS);
@@ -416,18 +460,20 @@ __kranal_conn_decref(kra_conn_t *conn)
 void
 kranal_terminate_conn_locked (kra_conn_t *conn)
 {
-        kra_peer_t *peer = conn->rac_peer;
-
         LASSERT (!in_interrupt());
-        LASSERT (conn->rac_closing);
+        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (list_empty(&conn->rac_list));
 
-        /* Remove from conn hash table (no new callbacks) */
+        /* Remove from conn hash table: no new callbacks */
         list_del_init(&conn->rac_hashlist);
         kranal_conn_decref(conn);
 
-        /* Conn is now just waiting for remaining refs to go */
+        conn->rac_state = RANAL_CONN_CLOSED;
+
+        /* schedule to clear out all uncompleted comms in context of dev's
+         * scheduler */
+        kranal_schedule_conn(conn);
 }
 
 void
@@ -439,7 +485,7 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
 
         LASSERT (!in_interrupt());
-        LASSERT (!conn->rac_closing);
+        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
         LASSERT (!list_empty(&conn->rac_hashlist));
         LASSERT (!list_empty(&conn->rac_list));
 
@@ -450,9 +496,14 @@ kranal_close_conn_locked (kra_conn_t *conn, int error)
                 /* Non-persistent peer with no more conns... */
                 kranal_unlink_peer_locked(peer);
         }
+                        
+        /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
+         * full timeout */
+        conn->rac_last_rx = jiffies;
+        mb();
 
-        conn->rac_closing = 1;
-        kranal_schedule_conn(conn);
+        conn->rac_state = RANAL_CONN_CLOSING;
+        kranal_schedule_conn(conn);             /* schedule sending CLOSE */
 
         kranal_conn_decref(conn);               /* lose peer's ref */
 }
@@ -465,13 +516,33 @@ kranal_close_conn (kra_conn_t *conn, int error)
 
         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
         
-        if (!conn->rac_closing)
+        if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                 kranal_close_conn_locked(conn, error);
         
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 }
 
 int
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, 
+                       __u32 peer_ip, int peer_port)
+{
+        RAP_RETURN    rrc;
+        
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rrc);
+                return -EPROTO;
+        }
+        
+        conn->rac_peerstamp = connreq->racr_peerstamp;
+        conn->rac_peer_connstamp = connreq->racr_connstamp;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+        return 0;
+}
+
+int
 kranal_passive_conn_handshake (struct socket *sock, 
                                ptl_nid_t *peer_nidp, kra_conn_t **connp)
 {
@@ -482,7 +553,6 @@ kranal_passive_conn_handshake (struct socket *sock,
        ptl_nid_t            peer_nid;
         kra_conn_t          *conn;
         kra_device_t        *dev;
-       RAP_RETURN           rrc;
        int                  rc;
         int                  len;
         int                  i;
@@ -515,25 +585,24 @@ kranal_passive_conn_handshake (struct socket *sock,
         LASSERT (peer_nid != PTL_NID_ANY);
 
         for (i = 0;;i++) {
-                LASSERT(i < kranal_data.kra_ndevs);
+                if (i == kranal_data.kra_ndevs) {
+                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
+                               connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
+                        return -ENODEV;
+                }
                 dev = &kranal_data.kra_devices[i];
                 if (dev->rad_id == connreq.racr_devid)
                         break;
         }
 
-        rc = kranal_alloc_conn(&conn, dev);
+        rc = kranal_create_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
-        conn->rac_peer_incarnation = connreq.racr_incarnation;
-        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
-        kranal_update_reaper_timeout(conn->rac_keepalive);
-        
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+        rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
+        if (rc != 0) {
                 kranal_conn_decref(conn);
-                return -EPROTO;
+                return rc;
         }
 
         kranal_pack_connreq(&connreq, conn);
@@ -559,9 +628,6 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
         struct socket      *sock;
         unsigned int        port;
         int                 rc;
-        int                 option;
-        mm_segment_t        oldmm = get_fs();
-        struct timeval      tv;
 
         for (port = 1023; port >= 512; port--) {
 
@@ -621,19 +687,17 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
 int
 kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 {
-        struct sockaddr_in  dstaddr;
        kra_connreq_t       connreq;
         kra_conn_t         *conn;
         kra_device_t       *dev;
         struct socket      *sock;
-       RAP_RETURN          rrc;
        int                 rc;
         int                 idx;
         
         idx = peer->rap_nid & 0x7fffffff;
         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
 
-        rc = kranal_alloc_conn(&conn, dev);
+        rc = kranal_create_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
@@ -680,17 +744,10 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
                 goto failed_0;
         }
 
-        conn->rac_peer_incarnation = connreq.racr_incarnation; 
-        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
-        kranal_update_reaper_timeout(conn->rac_keepalive);
-
-        rc = -ENETDOWN;
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
-        if (rrc != RAP_SUCCESS) {
-                CERROR("Can't set riparams for "LPX64": %d\n",
-                       peer->rap_nid, rrc);
+        rc = kranal_set_conn_params(conn, &connreq, 
+                                    peer->rap_ip, peer->rap_port);
+        if (rc != 0)
                 goto failed_0;
-        }
 
         *connp = conn;
        return 0;
@@ -709,13 +766,33 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
         kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        unsigned long      flags;
-        unsigned long      timeout;
        kra_conn_t        *conn;
        int                rc;
         int                nstale;
 
-        if (sock != NULL) {
-                /* passive: listener accepted sock */
+        if (sock == NULL) {
+                /* active: connd wants to connect to 'peer' */
+                LASSERT (peer != NULL);
+                LASSERT (peer->rap_connecting);
+                
+                rc = kranal_active_conn_handshake(peer, &conn);
+                if (rc != 0)
+                        return rc;
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               if (!kranal_peer_active(peer)) {
+                       /* raced with peer getting unlinked */
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                                                flags);
+                        kranal_conn_decref(conn);
+                       return ESTALE;
+               }
+
+                peer_nid = peer->rap_nid;
+
+       } else {
+                /* passive: listener accepted 'sock' */
                 LASSERT (peer == NULL);
 
                 rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
@@ -746,39 +823,21 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                  * table with no connections: I can't drop the global lock
                  * until I've given it a connection or removed it, and when
                  * I do 'peer' can disappear under me. */
-        } else {
-                /* active: connd wants to connect to peer */
-                LASSERT (peer != NULL);
-                LASSERT (peer->rap_connecting);
-                
-                rc = kranal_active_conn_handshake(peer, &conn);
-                if (rc != 0)
-                        return rc;
-
-               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
-               if (!kranal_peer_active(peer)) {
-                       /* raced with peer getting unlinked */
-                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
-                                                flags);
-                        kranal_conn_decref(conn);
-                       return ESTALE;
-               }
-       }
+        }
 
        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
-        peer_nid = peer->rap_nid;
 
         /* Refuse to duplicate an existing connection (both sides might try
          * to connect at once).  NB we return success!  We _do_ have a
          * connection (so we don't need to remove the peer from the peer
          * table) and we _don't_ have any blocked txs to complete */
-       if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
+        rc = kranal_conn_isdup_locked(peer, conn);
+       if (rc != 0) {
                 LASSERT (!list_empty(&peer->rap_conns));
                 LASSERT (list_empty(&peer->rap_tx_queue));
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-               CWARN("Not creating duplicate connection to "LPX64"\n",
-                      peer_nid);
+               CWARN("Not creating duplicate connection to "LPX64": %d\n",
+                      peer_nid, rc);
                 kranal_conn_decref(conn);
                 return 0;
        }
@@ -800,7 +859,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
                 kranal_post_fma(conn, tx);
         }
 
-       nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
+       nstale = kranal_close_stale_conns_locked(peer, conn);
 
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
@@ -989,7 +1048,7 @@ kranal_start_listener (void)
        /* Called holding kra_nid_mutex: listener stopped */
        LASSERT (kranal_data.kra_listener_sock == NULL);
 
-       kranal_data.kra_listener_shutdown == 0;
+       kranal_data.kra_listener_shutdown = 0;
        pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
@@ -1032,6 +1091,10 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
        int    old_val;
        int    rc;
 
+        /* No race with nal initialisation since the nal is setup all the time
+         * it's loaded.  When that changes, change this! */
+        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
+
        down(&kranal_data.kra_nid_mutex);
 
        LASSERT (tunable == &kranal_tunables.kra_port ||
@@ -1050,18 +1113,23 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                rc = kranal_start_listener();
 
                if (rc != 0) {
+                        CWARN("Unable to start listener with new tunable:"
+                              " reverting to old value\n");
                        *tunable = old_val;
                        kranal_start_listener();
                }
        }
 
        up(&kranal_data.kra_nid_mutex);
+
+        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
        return rc;
 }
 
 int
 kranal_set_mynid(ptl_nid_t nid)
 {
+        unsigned long  flags;
         lib_ni_t      *ni = &kranal_lib.libnal_ni;
         int            rc = 0;
 
@@ -1079,10 +1147,14 @@ kranal_set_mynid(ptl_nid_t nid)
        if (kranal_data.kra_listener_sock != NULL)
                kranal_stop_listener();
 
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+        kranal_data.kra_peerstamp++;
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
         ni->ni_pid.nid = nid;
 
         /* Delete all existing peers and their connections after new
-         * NID/incarnation set to ensure no old connections in our brave
+         * NID/connstamp set to ensure no old connections in our brave
          * new world. */
         kranal_del_peer(PTL_NID_ANY, 0);
 
@@ -1109,7 +1181,8 @@ kranal_create_peer (ptl_nid_t nid)
         peer->rap_nid = nid;
         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
 
-        INIT_LIST_HEAD(&peer->rap_list);        /* not in the peer table yet */
+        INIT_LIST_HEAD(&peer->rap_list);
+        INIT_LIST_HEAD(&peer->rap_connd_list);
         INIT_LIST_HEAD(&peer->rap_conns);
         INIT_LIST_HEAD(&peer->rap_tx_queue);
 
@@ -1121,16 +1194,17 @@ kranal_create_peer (ptl_nid_t nid)
 }
 
 void
-__kranal_peer_decref (kra_peer_t *peer)
+kranal_destroy_peer (kra_peer_t *peer)
 {
         CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
 
         LASSERT (atomic_read(&peer->rap_refcount) == 0);
         LASSERT (peer->rap_persistence == 0);
         LASSERT (!kranal_peer_active(peer));
-        LASSERT (peer->rap_connecting == 0);
+        LASSERT (!peer->rap_connecting);
         LASSERT (list_empty(&peer->rap_conns));
         LASSERT (list_empty(&peer->rap_tx_queue));
+        LASSERT (list_empty(&peer->rap_connd_list));
 
         PORTAL_FREE(peer, sizeof(*peer));
 
@@ -1387,31 +1461,6 @@ kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
 }
 
 int
-kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
-{
-        kra_conn_t         *conn;
-        struct list_head   *ctmp;
-        struct list_head   *cnxt;
-        int                 count = 0;
-
-        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
-                conn = list_entry(ctmp, kra_conn_t, rac_list);
-
-                if (conn->rac_peer_incarnation == incarnation)
-                        continue;
-
-                CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
-                       peer->rap_nid, conn->rac_peer_incarnation, incarnation);
-                LASSERT (conn->rac_peer_incarnation < incarnation);
-
-                count++;
-                kranal_close_conn_locked(conn, -ESTALE);
-        }
-
-        return count;
-}
-
-int
 kranal_close_matching_conns (ptl_nid_t nid)
 {
         unsigned long       flags;
@@ -1570,6 +1619,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
 
                 tx->tx_isnblk = isnblk;
                 tx->tx_buftype = RANAL_BUF_NONE;
+                tx->tx_msg.ram_type = RANAL_MSG_NONE;
 
                 list_add(&tx->tx_list, freelist);
         }
@@ -1637,6 +1687,7 @@ kranal_device_init(int id, kra_device_t *dev)
 void
 kranal_device_fini(kra_device_t *dev)
 {
+        LASSERT(dev->rad_scheduler == NULL);
         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
         RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
@@ -1647,7 +1698,6 @@ void
 kranal_api_shutdown (nal_t *nal)
 {
         int           i;
-        int           rc;
         unsigned long flags;
         
         if (nal->nal_refct != 0) {
@@ -1791,13 +1841,14 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
 
         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
-         * a unique (for all time) incarnation so we can uniquely identify
-         * the sender.  The incarnation is an incrementing counter
+         * a unique (for all time) connstamp so we can uniquely identify
+         * the sender.  The connstamp is an incrementing counter
          * initialised with seconds + microseconds at startup time.  So we
          * rely on NOT creating connections more frequently on average than
-         * 1MHz to ensure we don't use old incarnations when we reboot. */
+         * 1MHz to ensure we don't use old connstamps when we reboot. */
         do_gettimeofday(&tv);
-        kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+        kranal_data.kra_connstamp =
+        kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
 
         init_MUTEX(&kranal_data.kra_nid_mutex);
         init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
@@ -1813,6 +1864,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 spin_lock_init(&dev->rad_lock);
         }
 
+        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
         spin_lock_init(&kranal_data.kra_reaper_lock);