Whamcloud - gitweb
LU-2675 lmv: remove lmv_init_{lock,unlock}()
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
index 76ab468..424bf31 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -51,7 +51,7 @@ usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
         if (conn->uc_rx_flag && /* receiving is in progress */
             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
                 return 1;
-        
+
         return 0;
 }
 
@@ -61,7 +61,7 @@ usocklnd_conn_kill(usock_conn_t *conn)
         pthread_mutex_lock(&conn->uc_lock);
         if (conn->uc_state != UC_DEAD)
                 usocklnd_conn_kill_locked(conn);
-        pthread_mutex_unlock(&conn->uc_lock);        
+        pthread_mutex_unlock(&conn->uc_lock);
 }
 
 /* Mark the conn as DEAD and schedule its deletion */
@@ -82,7 +82,7 @@ usocklnd_conn_allocate()
         LIBCFS_ALLOC (pr, sizeof(*pr));
         if (pr == NULL)
                 return NULL;
-        
+
         LIBCFS_ALLOC (conn, sizeof(*conn));
         if (conn == NULL) {
                 LIBCFS_FREE (pr, sizeof(*pr));
@@ -115,7 +115,7 @@ usocklnd_conn_free(usock_conn_t *conn)
                 LIBCFS_FREE (conn->uc_rx_hello,
                              offsetof(ksock_hello_msg_t,
                                       kshm_ips[LNET_MAX_INTERFACES]));
-        
+
         LIBCFS_FREE (conn, sizeof(*conn));
 }
 
@@ -128,12 +128,14 @@ usocklnd_tear_peer_conn(usock_conn_t *conn)
         lnet_process_id_t id;
         int               decref_flag  = 0;
         int               killall_flag = 0;
-        
+        void             *rx_lnetmsg   = NULL; 
+       struct list_head   zombie_txs = LIST_HEAD_INIT(zombie_txs);
+
         if (peer == NULL) /* nothing to tear */
                 return;
-        
+
         pthread_mutex_lock(&peer->up_lock);
-        pthread_mutex_lock(&conn->uc_lock);        
+        pthread_mutex_lock(&conn->uc_lock);
 
         ni = peer->up_ni;
         id = peer->up_peerid;
@@ -142,11 +144,12 @@ usocklnd_tear_peer_conn(usock_conn_t *conn)
                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
                         /* change state not to finalize twice */
                         conn->uc_rx_state = UC_RX_KSM_HEADER;
-                        lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                        
+                        /* stash lnetmsg while holding locks */
+                        rx_lnetmsg = conn->uc_rx_lnetmsg;
                 }
-                
-                usocklnd_destroy_txlist(peer->up_ni,
-                                        &conn->uc_tx_list);
+
+        /* we cannot finalize txs right now (bug #18844) */
+               list_splice_init(&conn->uc_tx_list, &zombie_txs);
 
                 peer->up_conns[idx] = NULL;
                 conn->uc_peer = NULL;
@@ -154,18 +157,26 @@ usocklnd_tear_peer_conn(usock_conn_t *conn)
 
                 if(conn->uc_errored && !peer->up_errored)
                         peer->up_errored = killall_flag = 1;
+
+                /* prevent queueing new txs to this conn */
+                conn->uc_errored = 1;
         }
-        
+
         pthread_mutex_unlock(&conn->uc_lock);
 
         if (killall_flag)
                 usocklnd_del_conns_locked(peer);
 
         pthread_mutex_unlock(&peer->up_lock);
-        
+
         if (!decref_flag)
                 return;
 
+        if (rx_lnetmsg != NULL)
+                lnet_finalize(ni, rx_lnetmsg, -EIO);
+        
+        usocklnd_destroy_txlist(ni, &zombie_txs);
+
         usocklnd_conn_decref(conn);
         usocklnd_peer_decref(peer);
 
@@ -178,7 +189,7 @@ void
 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
 {
         usock_peer_t *peer;
-        
+
         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
         peer = usocklnd_find_peer_locked(ni, id);
 
@@ -187,18 +198,18 @@ usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
                 return;
         }
 
-        if (cfs_atomic_read(&peer->up_refcount) == 2) {
+       if (mt_atomic_read(&peer->up_refcount) == 2) {
                 int i;
                 for (i = 0; i < N_CONN_TYPES; i++)
                         LASSERT (peer->up_conns[i] == NULL);
 
-                list_del(&peer->up_list);                        
-                
+               list_del(&peer->up_list);
+
                 if (peer->up_errored &&
                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
                                      cfs_time_seconds(peer->up_last_alive));
-                
+
                 usocklnd_peer_decref(peer);
         }
 
@@ -208,18 +219,21 @@ usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
 
 /* Returns 0 on success, <0 else */
 int
-usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
+usocklnd_create_passive_conn(lnet_ni_t *ni,
+                             cfs_socket_t *sock, usock_conn_t **connp)
 {
         int           rc;
         __u32         peer_ip;
-        __u16         peer_port;
+        int           peer_port;
         usock_conn_t *conn;
 
-        rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
+        rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
         if (rc)
                 return rc;
 
-        rc = usocklnd_set_sock_options(fd);
+        LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
+
+        rc = usocklnd_set_sock_options(sock);
         if (rc)
                 return rc;
 
@@ -228,17 +242,17 @@ usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
                 return -ENOMEM;
 
         usocklnd_rx_hellomagic_state_transition(conn);
-        
-        conn->uc_fd = fd;
+
+        conn->uc_sock = sock;
         conn->uc_peer_ip = peer_ip;
         conn->uc_peer_port = peer_port;
         conn->uc_state = UC_RECEIVING_HELLO;
         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
         conn->uc_ni = ni;
-        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
-        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+       INIT_LIST_HEAD(&conn->uc_tx_list);
+       INIT_LIST_HEAD(&conn->uc_zcack_list);
         pthread_mutex_init(&conn->uc_lock, NULL);
-        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+       mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
 
         *connp = conn;
         return 0;
@@ -250,11 +264,11 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type,
                             usock_conn_t **connp)
 {
         int           rc;
-        int           fd;
+        cfs_socket_t *sock;
         usock_conn_t *conn;
         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
         __u16         dst_port = lnet_acceptor_port();
-        
+
         conn = usocklnd_conn_allocate();
         if (conn == NULL)
                 return -ENOMEM;
@@ -264,37 +278,38 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type,
         if (conn->uc_tx_hello == NULL) {
                 usocklnd_conn_free(conn);
                 return -ENOMEM;
-        }                
-        
+        }
+
         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
-                rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
+                rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
         else
-                rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
-        
+                rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
+
         if (rc) {
                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                 usocklnd_conn_free(conn);
                 return rc;
         }
-        
+
         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
-        conn->uc_tx_flag = 1;
-        
-        conn->uc_fd = fd;
-        conn->uc_peer_ip = dst_ip;
-        conn->uc_peer_port = dst_port;
-        conn->uc_type = type;
+        conn->uc_tx_flag     = 1;
+
+        conn->uc_sock       = sock;
+        conn->uc_peer_ip    = dst_ip;
+        conn->uc_peer_port  = dst_port;
+        conn->uc_type       = type;
         conn->uc_activeflag = 1;
-        conn->uc_state = UC_CONNECTING;
-        conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
-        conn->uc_ni = NULL;
-        conn->uc_peerid = peer->up_peerid;
-        conn->uc_peer = peer;
+        conn->uc_state      = UC_CONNECTING;
+        conn->uc_pt_idx     = usocklnd_ip2pt_idx(dst_ip);
+        conn->uc_ni         = NULL;
+        conn->uc_peerid     = peer->up_peerid;
+        conn->uc_peer       = peer;
+
         usocklnd_peer_addref(peer);
-        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
-        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+       INIT_LIST_HEAD(&conn->uc_tx_list);
+       INIT_LIST_HEAD(&conn->uc_zcack_list);
         pthread_mutex_init(&conn->uc_lock, NULL);
-        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+       mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
 
         *connp = conn;
         return 0;
@@ -302,45 +317,42 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type,
 
 /* Returns 0 on success, <0 else */
 int
-usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
 {
-        __u16 port;
-        int   fd;
-        int   rc;
+        __u16         port;
+        cfs_socket_t *sock;
+        int           rc;
+        int           fatal;
 
-        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
-             port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
+        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
+             port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
              port--) {
                 /* Iterate through reserved ports. */
-
-                rc = libcfs_sock_create(&fd);
-                if (rc)
-                        return rc;                        
-                                
-                rc = libcfs_sock_bind_to_port(fd, port);
+                rc = libcfs_sock_create(&sock, &fatal, 0, port);
                 if (rc) {
-                        close(fd);
+                        if (fatal)
+                                return rc;
                         continue;
                 }
 
-                rc = usocklnd_set_sock_options(fd);
+                rc = usocklnd_set_sock_options(sock);
                 if (rc) {
-                        close(fd);
+                        libcfs_sock_release(sock);
                         return rc;
                 }
 
-                rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+                rc = libcfs_sock_connect(sock, dst_ip, dst_port);
                 if (rc == 0) {
-                        *fdp = fd;
+                        *sockp = sock;
                         return 0;
                 }
-                
+
                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
-                        close(fd);
+                        libcfs_sock_release(sock);
                         return rc;
                 }
 
-                close(fd);
+                libcfs_sock_release(sock);
         }
 
         CERROR("Can't bind to any reserved port\n");
@@ -349,63 +361,55 @@ usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
 
 /* Returns 0 on success, <0 else */
 int
-usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
 {
-        int fd;
-        int rc;
+        cfs_socket_t *sock;
+        int           rc;
+        int           fatal;
 
-        rc = libcfs_sock_create(&fd);
+        rc = libcfs_sock_create(&sock, &fatal, 0, 0);
         if (rc)
                 return rc;
-        
-        rc = usocklnd_set_sock_options(fd);
+
+        rc = usocklnd_set_sock_options(sock);
         if (rc) {
-                close(fd);
+                libcfs_sock_release(sock);
                 return rc;
         }
 
-        rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+        rc = libcfs_sock_connect(sock, dst_ip, dst_port);
         if (rc) {
-                close(fd);
+                libcfs_sock_release(sock);
                 return rc;
         }
 
-        *fdp = fd;
+        *sockp = sock;
         return 0;
 }
 
 int
-usocklnd_set_sock_options(int fd)
+usocklnd_set_sock_options(cfs_socket_t *sock)
 {
         int rc;
 
-        rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
+        rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
         if (rc)
                 return rc;
 
         if (usock_tuns.ut_sockbufsiz) {
-                rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
+                rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
                 if (rc)
-                        return rc;        
+                        return rc;
         }
-        
-        return libcfs_fcntl_nonblock(fd);
-}
 
-void
-usocklnd_init_msg(ksock_msg_t *msg, int type)
-{
-        msg->ksm_type           = type;
-        msg->ksm_csum           = 0;
-        msg->ksm_zc_req_cookie  = 0;
-        msg->ksm_zc_ack_cookie  = 0;
+        return libcfs_fcntl_nonblock(sock);
 }
 
 usock_tx_t *
 usocklnd_create_noop_tx(__u64 cookie)
 {
         usock_tx_t *tx;
-        
+
         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
         if (tx == NULL)
                 return NULL;
@@ -413,24 +417,24 @@ usocklnd_create_noop_tx(__u64 cookie)
         tx->tx_size = sizeof(usock_tx_t);
         tx->tx_lnetmsg = NULL;
 
-        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
-        tx->tx_msg.ksm_zc_ack_cookie = cookie;
-        
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
+        tx->tx_msg.ksm_zc_cookies[1] = cookie;
+
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
         tx->tx_iov = tx->tx_iova;
         tx->tx_niov = 1;
-        
+
         return tx;
 }
-        
+
 usock_tx_t *
 usocklnd_create_tx(lnet_msg_t *lntmsg)
 {
         usock_tx_t   *tx;
-        unsigned int  payload_niov = lntmsg->msg_niov; 
-        struct iovec *payload_iov = lntmsg->msg_iov; 
+        unsigned int  payload_niov = lntmsg->msg_niov;
+        struct iovec *payload_iov = lntmsg->msg_iov;
         unsigned int  payload_offset = lntmsg->msg_offset;
         unsigned int  payload_nob = lntmsg->msg_len;
         int           size = offsetof(usock_tx_t,
@@ -443,18 +447,15 @@ usocklnd_create_tx(lnet_msg_t *lntmsg)
         tx->tx_size = size;
         tx->tx_lnetmsg = lntmsg;
 
-        tx->tx_resid = tx->tx_nob =
-                offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_payload) +
-                payload_nob;
-        
-        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
+        tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
+
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
-        tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
-                                          ksm_u.lnetmsg.ksnm_payload);
+        tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
         tx->tx_iov = tx->tx_iova;
 
-        tx->tx_niov = 1 + 
+        tx->tx_niov = 1 +
                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                  payload_niov, payload_iov,
                                  payload_offset, payload_nob);
@@ -472,7 +473,7 @@ usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
         hello->kshm_version     = KSOCK_PROTO_V2;
         hello->kshm_nips        = 0;
         hello->kshm_ctype       = type;
-        
+
         hello->kshm_dst_incarnation = 0; /* not used */
         hello->kshm_src_incarnation = net->un_incarnation;
 
@@ -500,7 +501,7 @@ usocklnd_create_hello_tx(lnet_ni_t *ni,
 
         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
-        
+
         tx->tx_iova[0].iov_base = (void *)hello;
         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                 offsetof(ksock_hello_msg_t, kshm_ips);
@@ -534,10 +535,10 @@ usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
         cr->acr_nid     = peer_nid;
-        
+
         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
-        
+
         tx->tx_iova[0].iov_base = (void *)cr;
         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                 sizeof(lnet_acceptor_connreq_t) +
@@ -557,7 +558,7 @@ usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
         LASSERT (ni != NULL || lnetmsg == NULL);
 
         LIBCFS_FREE (tx, tx->tx_size);
-        
+
         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
                 lnet_finalize(ni, lnetmsg, rc);
 }
@@ -567,10 +568,10 @@ usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
 {
         usock_tx_t *tx;
 
-        while (!list_empty(txlist)) {
-                tx = list_entry(txlist->next, usock_tx_t, tx_list);
-                list_del(&tx->tx_list);
-                
+       while (!list_empty(txlist)) {
+               tx = list_entry(txlist->next, usock_tx_t, tx_list);
+               list_del(&tx->tx_list);
+
                 usocklnd_destroy_tx(ni, tx);
         }
 }
@@ -580,10 +581,11 @@ usocklnd_destroy_zcack_list(struct list_head *zcack_list)
 {
         usock_zc_ack_t *zcack;
 
-        while (!list_empty(zcack_list)) {
-                zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
-                list_del(&zcack->zc_list);
-                
+       while (!list_empty(zcack_list)) {
+               zcack = list_entry(zcack_list->next, usock_zc_ack_t,
+                                       zc_list);
+               list_del(&zcack->zc_list);
+
                 LIBCFS_FREE (zcack, sizeof(*zcack));
         }
 }
@@ -600,7 +602,7 @@ usocklnd_destroy_peer(usock_peer_t *peer)
         LIBCFS_FREE (peer, sizeof (*peer));
 
         pthread_mutex_lock(&net->un_lock);
-        if(--net->un_peercount == 0)                
+        if(--net->un_peercount == 0)
                 pthread_cond_signal(&net->un_cond);
         pthread_mutex_unlock(&net->un_lock);
 }
@@ -615,13 +617,13 @@ usocklnd_destroy_conn(usock_conn_t *conn)
                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
         }
 
-        if (!list_empty(&conn->uc_tx_list)) {
-                LASSERT (conn->uc_peer != NULL);                
+       if (!list_empty(&conn->uc_tx_list)) {
+                LASSERT (conn->uc_peer != NULL);
                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
         }
 
         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
-        
+
         if (conn->uc_peer != NULL)
                 usocklnd_peer_decref(conn->uc_peer);
 
@@ -642,9 +644,8 @@ usocklnd_get_conn_type(lnet_msg_t *lntmsg)
         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                 return SOCKLND_CONN_ANY;
 
-        nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
-                lntmsg->msg_len;
-        
+        nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
+
         if (nob >= usock_tuns.ut_min_bulk)
                 return SOCKLND_CONN_BULK_OUT;
         else
@@ -669,13 +670,13 @@ int usocklnd_type2idx(int type)
 usock_peer_t *
 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
 {
-        struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
-        struct list_head *tmp;
+       struct list_head       *peer_list = usocklnd_nid2peerlist(id.nid);
+       struct list_head       *tmp;
         usock_peer_t     *peer;
 
-        list_for_each (tmp, peer_list) {
+       list_for_each(tmp, peer_list) {
 
-                peer = list_entry (tmp, usock_peer_t, up_list);
+               peer = list_entry(tmp, usock_peer_t, up_list);
 
                 if (peer->up_ni != ni)
                         continue;
@@ -710,11 +711,11 @@ usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
         peer->up_incrn_is_set = 0;
         peer->up_errored      = 0;
         peer->up_last_alive   = 0;
-        cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
-        pthread_mutex_init(&peer->up_lock, NULL);        
+       mt_atomic_set(&peer->up_refcount, 1); /* 1 ref for caller */
+        pthread_mutex_init(&peer->up_lock, NULL);
 
         pthread_mutex_lock(&net->un_lock);
-        net->un_peercount++;        
+        net->un_peercount++;
         pthread_mutex_unlock(&net->un_lock);
 
         *peerp = peer;
@@ -742,7 +743,7 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
         rc = usocklnd_create_peer(ni, id, &peer);
         if (rc)
                 return rc;
-        
+
         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
         peer2 = usocklnd_find_peer_locked(ni, id);
         if (peer2 == NULL) {
@@ -752,18 +753,18 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
                         CERROR("Can't create peer: network shutdown\n");
                         return -ESHUTDOWN;
                 }
-                
+
                 /* peer table will take 1 of my refs on peer */
                 usocklnd_peer_addref(peer);
-                list_add_tail (&peer->up_list,
-                               usocklnd_nid2peerlist(id.nid));
+               list_add_tail(&peer->up_list,
+                                   usocklnd_nid2peerlist(id.nid));
         } else {
                 usocklnd_peer_decref(peer); /* should destroy peer */
                 peer = peer2;
         }
         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
-        
-  find_or_create_peer_done:        
+
+  find_or_create_peer_done:
         *peerp = peer;
         return 0;
 }
@@ -771,18 +772,18 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
 /* NB: both peer and conn locks are held */
 static int
 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
-{        
+{
         if (conn->uc_state == UC_READY &&
-            list_empty(&conn->uc_tx_list) &&
-            list_empty(&conn->uc_zcack_list) &&
+           list_empty(&conn->uc_tx_list) &&
+           list_empty(&conn->uc_zcack_list) &&
             !conn->uc_sending) {
                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
                                                   POLLOUT);
                 if (rc != 0)
                         return rc;
-        }                
+        }
 
-        list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
+       list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
         return 0;
 }
 
@@ -792,18 +793,18 @@ usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
 static void
 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
                     int *send_immediately)
-{        
+{
         if (conn->uc_state == UC_READY &&
-            list_empty(&conn->uc_tx_list) &&
-            list_empty(&conn->uc_zcack_list) &&
+           list_empty(&conn->uc_tx_list) &&
+           list_empty(&conn->uc_zcack_list) &&
             !conn->uc_sending) {
                 conn->uc_sending = 1;
                 *send_immediately = 1;
                 return;
-        }                
+        }
 
         *send_immediately = 0;
-        list_add_tail(&tx->tx_list, &conn->uc_tx_list);
+       list_add_tail(&tx->tx_list, &conn->uc_tx_list);
 }
 
 /* Safely create new conn if needed. Save result in *connp.
@@ -818,12 +819,12 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
         int           idx;
         int           rc;
         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
-        
+
         if (userflag)
                 type = SOCKLND_CONN_ANY;
 
         idx = usocklnd_type2idx(type);
-        
+
         pthread_mutex_lock(&peer->up_lock);
         if (peer->up_conns[idx] != NULL) {
                 conn = peer->up_conns[idx];
@@ -836,7 +837,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
                         rc = -EHOSTUNREACH;
                         goto find_or_create_conn_failed;
                 }
-                
+
                 rc = usocklnd_create_active_conn(peer, type, &conn);
                 if (rc) {
                         peer->up_errored = 1;
@@ -846,7 +847,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
 
                 /* peer takes 1 of conn refcount */
                 usocklnd_link_conn_to_peer(conn, peer, idx);
-                
+
                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
                 if (rc) {
                         peer->up_conns[idx] = NULL;
@@ -855,22 +856,29 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
                 }
                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
         }
-        
+
         pthread_mutex_lock(&conn->uc_lock);
         LASSERT(conn->uc_peer == peer);
 
         LASSERT(tx == NULL || zc_ack == NULL);
         if (tx != NULL) {
+                /* usocklnd_tear_peer_conn() could signal us stop queueing */
+                if (conn->uc_errored) {
+                        rc = -EIO;
+                        pthread_mutex_unlock(&conn->uc_lock);
+                        goto find_or_create_conn_failed;
+                }
+
                 usocklnd_enqueue_tx(conn, tx, send_immediately);
         } else {
-                rc = usocklnd_enqueue_zcack(conn, zc_ack);        
+                rc = usocklnd_enqueue_zcack(conn, zc_ack);
                 if (rc != 0) {
                         usocklnd_conn_kill_locked(conn);
                         pthread_mutex_unlock(&conn->uc_lock);
                         goto find_or_create_conn_failed;
                 }
         }
-        pthread_mutex_unlock(&conn->uc_lock);         
+        pthread_mutex_unlock(&conn->uc_lock);
 
         usocklnd_conn_addref(conn);
         pthread_mutex_unlock(&peer->up_lock);
@@ -886,7 +894,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
 void
 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
 {
-        peer->up_conns[idx] = conn;        
+        peer->up_conns[idx] = conn;
         peer->up_errored    = 0; /* this new fresh conn will try
                                   * revitalize even stale errored peer */
 }
@@ -923,7 +931,7 @@ usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
                              usock_conn_t *skip_conn)
 {
         int i;
-        
+
         if (!peer->up_incrn_is_set) {
                 peer->up_incarnation = incrn;
                 peer->up_incrn_is_set = 1;
@@ -934,19 +942,19 @@ usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
                 return;
 
         peer->up_incarnation = incrn;
-        
+
         for (i = 0; i < N_CONN_TYPES; i++) {
                 usock_conn_t *conn = peer->up_conns[i];
-                
+
                 if (conn == NULL || conn == skip_conn)
                         continue;
 
-                pthread_mutex_lock(&conn->uc_lock);        
+                pthread_mutex_lock(&conn->uc_lock);
                 LASSERT (conn->uc_peer == peer);
                 conn->uc_peer = NULL;
                 peer->up_conns[i] = NULL;
                 if (conn->uc_state != UC_DEAD)
-                        usocklnd_conn_kill_locked(conn);                
+                        usocklnd_conn_kill_locked(conn);
                 pthread_mutex_unlock(&conn->uc_lock);
 
                 usocklnd_conn_decref(conn);
@@ -991,7 +999,7 @@ usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
                 conn->uc_rx_nob_wanted =
                 conn->uc_rx_nob_left =
                 sizeof(conn->uc_rx_hello->kshm_version);
-        
+
         conn->uc_rx_state = UC_RX_HELLO_VERSION;
 }
 
@@ -1011,7 +1019,7 @@ usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
                 conn->uc_rx_nob_left =
                 offsetof(ksock_hello_msg_t, kshm_ips) -
                 offsetof(ksock_hello_msg_t, kshm_src_nid);
-        
+
         conn->uc_rx_state = UC_RX_HELLO_BODY;
 }
 
@@ -1031,7 +1039,7 @@ usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
                 conn->uc_rx_nob_left =
                 conn->uc_rx_hello->kshm_nips *
                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
-        
+
         conn->uc_rx_state = UC_RX_HELLO_IPS;
 }
 
@@ -1043,12 +1051,12 @@ usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
 {
         conn->uc_rx_niov = 1;
         conn->uc_rx_iov = conn->uc_rx_iova;
-        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                
+        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
         conn->uc_rx_iov[0].iov_len =
                 conn->uc_rx_nob_wanted =
                 conn->uc_rx_nob_left =
                 sizeof(ksock_lnet_msg_t);
-        
+
         conn->uc_rx_state = UC_RX_LNET_HEADER;
         conn->uc_rx_flag = 1;
 }
@@ -1061,12 +1069,12 @@ usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
 {
         conn->uc_rx_niov = 1;
         conn->uc_rx_iov = conn->uc_rx_iova;
-        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                
+        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
         conn->uc_rx_iov[0].iov_len =
                 conn->uc_rx_nob_wanted =
-                conn->uc_rx_nob_left =                        
+                conn->uc_rx_nob_left =
                 offsetof(ksock_msg_t, ksm_u);
-        
+
         conn->uc_rx_state = UC_RX_KSM_HEADER;
         conn->uc_rx_flag = 0;
 }
@@ -1083,7 +1091,7 @@ usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
         unsigned int   niov = 0;
         int            skipped = 0;
         int            nob_to_skip = conn->uc_rx_nob_left;
-        
+
         LASSERT(nob_to_skip != 0);
 
         conn->uc_rx_iov = conn->uc_rx_iova;