From: eeb Date: Tue, 4 Jan 2005 15:51:31 +0000 (+0000) Subject: * ranal changes in response to Igor's changes to the RapidArray X-Git-Tag: v1_7_100~1709 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=979f4b66a21eb2398434793d92caa98a12817e58 * ranal changes in response to Igor's changes to the RapidArray kernel comms API. --- diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index 02c33630..4c2ce9d 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -217,7 +217,8 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) connreq->racr_magic = RANAL_MSG_MAGIC; connreq->racr_version = RANAL_MSG_VERSION; connreq->racr_devid = conn->rac_device->rad_id; - connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_srcnid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_dstnid = conn->rac_peer->rap_nid; connreq->racr_peerstamp = kranal_data.kra_peerstamp; connreq->racr_connstamp = conn->rac_my_connstamp; connreq->racr_timeout = conn->rac_timeout; @@ -246,13 +247,14 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) __swab32s(&connreq->racr_magic); __swab16s(&connreq->racr_version); __swab16s(&connreq->racr_devid); - __swab64s(&connreq->racr_nid); + __swab64s(&connreq->racr_srcnid); + __swab64s(&connreq->racr_dstnid); __swab64s(&connreq->racr_peerstamp); __swab64s(&connreq->racr_connstamp); __swab32s(&connreq->racr_timeout); + __swab32s(&connreq->racr_riparams.HostId); __swab32s(&connreq->racr_riparams.FmaDomainHndl); - __swab32s(&connreq->racr_riparams.RcvCqHndl); __swab32s(&connreq->racr_riparams.PTag); __swab32s(&connreq->racr_riparams.CompletionCookie); } @@ -262,7 +264,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) return -EPROTO; } - if (connreq->racr_nid == PTL_NID_ANY) { + if (connreq->racr_srcnid == PTL_NID_ANY || + connreq->racr_dstnid == PTL_NID_ANY) { CERROR("Received PTL_NID_ANY\n"); return -EPROTO; } @@ -417,8 +420,6 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) kranal_update_reaper_timeout(conn->rac_timeout); rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid, - dev->rad_ptag, - dev->rad_rdma_cq, dev->rad_fma_cq, &conn->rac_rihandle); if (rrc != RAP_SUCCESS) { CERROR("RapkCreateRi failed: %d\n", rrc); @@ -543,14 +544,15 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, } int -kranal_passive_conn_handshake (struct socket *sock, - ptl_nid_t *peer_nidp, kra_conn_t **connp) +kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, + ptl_nid_t *dst_nidp, kra_conn_t **connp) { struct sockaddr_in addr; __u32 peer_ip; unsigned int peer_port; kra_connreq_t connreq; - ptl_nid_t peer_nid; + ptl_nid_t src_nid; + ptl_nid_t dst_nid; kra_conn_t *conn; kra_device_t *dev; int rc; @@ -581,8 +583,8 @@ kranal_passive_conn_handshake (struct socket *sock, return rc; } - peer_nid = connreq.racr_nid; - LASSERT (peer_nid != PTL_NID_ANY); + src_nid = connreq.racr_srcnid; + dst_nid = connreq.racr_dstnid; for (i = 0;;i++) { if (i == kranal_data.kra_ndevs) { @@ -616,7 +618,8 @@ kranal_passive_conn_handshake (struct socket *sock, } *connp = conn; - *peer_nidp = peer_nid; + *src_nidp = src_nid; + *dst_nidp = dst_nid; return 0; } @@ -685,7 +688,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) int -kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) +kranal_active_conn_handshake(kra_peer_t *peer, + ptl_nid_t *dst_nidp, kra_conn_t **connp) { kra_connreq_t connreq; kra_conn_t *conn; @@ -730,11 +734,11 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) sock_release(sock); rc = -EPROTO; - if (connreq.racr_nid != peer->rap_nid) { - CERROR("Unexpected nid from %u.%u.%u.%u/%d: " + if (connreq.racr_srcnid != peer->rap_nid) { + CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: " "received "LPX64" expected "LPX64"\n", HIPQUAD(peer->rap_ip), peer->rap_port, - connreq.racr_nid, peer->rap_nid); + connreq.racr_srcnid, peer->rap_nid); goto failed_0; } @@ -752,6 +756,7 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) goto failed_0; *connp = conn; + *dst_nidp = connreq.racr_dstnid; return 0; failed_1: @@ -767,17 +772,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) kra_peer_t *peer2; kra_tx_t *tx; ptl_nid_t peer_nid; + ptl_nid_t dst_nid; unsigned long flags; kra_conn_t *conn; int rc; int nstale; + int new_peer = 0; if (sock == NULL) { /* active: connd wants to connect to 'peer' */ LASSERT (peer != NULL); LASSERT (peer->rap_connecting); - rc = kranal_active_conn_handshake(peer, &conn); + rc = kranal_active_conn_handshake(peer, &dst_nid, &conn); if (rc != 0) return rc; @@ -788,16 +795,16 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); kranal_conn_decref(conn); - return ESTALE; + return -ESTALE; } peer_nid = peer->rap_nid; - } else { /* passive: listener accepted 'sock' */ LASSERT (peer == NULL); - rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn); + rc = kranal_passive_conn_handshake(sock, &peer_nid, + &dst_nid, &conn); if (rc != 0) return rc; @@ -813,26 +820,32 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) peer2 = kranal_find_peer_locked(peer_nid); if (peer2 == NULL) { - /* peer table takes my initial ref on peer */ - list_add_tail(&peer->rap_list, - kranal_nid2peerlist(peer_nid)); + new_peer = 1; } else { /* peer_nid already in the peer table */ kranal_peer_decref(peer); peer = peer2; } - /* NB I may now have a non-persistent peer in the peer - * table with no connections: I can't drop the global lock - * until I've given it a connection or removed it, and when - * I do 'peer' can disappear under me. */ } - LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */ + LASSERT (!new_peer == !kranal_peer_active(peer)); + + /* Refuse connection if peer thinks we are a different NID. We check + * this while holding the global lock, to synch with connection + * destruction on NID change. */ + if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) { + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - /* Refuse to duplicate an existing connection (both sides might try - * to connect at once). NB we return success! We _do_ have a - * connection (so we don't need to remove the peer from the peer - * table) and we _don't_ have any blocked txs to complete */ + CERROR("Stale/bad connection with "LPX64 + ": dst_nid "LPX64", expected "LPX64"\n", + peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid); + rc = -ESTALE; + goto failed; + } + + /* Refuse to duplicate an existing connection (both sides might try to + * connect at once). NB we return success! We _are_ connected so we + * _don't_ have any blocked txs to complete with failure. */ rc = kranal_conn_isdup_locked(peer, conn); if (rc != 0) { LASSERT (!list_empty(&peer->rap_conns)); @@ -840,10 +853,16 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); CWARN("Not creating duplicate connection to "LPX64": %d\n", peer_nid, rc); - kranal_conn_decref(conn); - return 0; + rc = 0; + goto failed; } + if (new_peer) { + /* peer table takes my ref on the new peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(peer_nid)); + } + kranal_peer_addref(peer); /* +1 ref for conn */ conn->rac_peer = peer; list_add_tail(&conn->rac_list, &peer->rap_conns); @@ -874,6 +893,12 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) * FMA event may have happened before it got in the cq hash table */ kranal_schedule_conn(conn); return 0; + + failed: + if (new_peer) + kranal_peer_decref(peer); + kranal_conn_decref(conn); + return rc; } void @@ -938,11 +963,11 @@ kranal_listener(void *arg) struct sockaddr_in addr; wait_queue_t wait; struct socket *sock; - struct socket *newsock; + kra_acceptsock_t *ras; int port; - kra_connreq_t *connreqs; char name[16]; int rc; + unsigned long flags; /* Parent thread holds kra_nid_mutex, and is, or is about to * block on kra_listener_signal */ @@ -954,14 +979,9 @@ kranal_listener(void *arg) init_waitqueue_entry(&wait, current); - rc = -ENOMEM; - PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs)); - if (connreqs == NULL) - goto out_0; - rc = kranal_create_sock(&sock); if (rc != 0) - goto out_1; + goto out_0; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -971,14 +991,14 @@ kranal_listener(void *arg) rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr)); if (rc != 0) { CERROR("Can't bind to port %d\n", port); - goto out_2; + goto out_1; } rc = sock->ops->listen(sock, kranal_tunables.kra_backlog); if (rc != 0) { CERROR("Can't set listen backlog %d: %d\n", kranal_tunables.kra_backlog, rc); - goto out_2; + goto out_1; } LASSERT (kranal_data.kra_listener_sock == NULL); @@ -990,46 +1010,70 @@ kranal_listener(void *arg) /* Wake me any time something happens on my socket */ add_wait_queue(sock->sk->sk_sleep, &wait); + ras = NULL; while (kranal_data.kra_listener_shutdown == 0) { - newsock = sock_alloc(); - if (newsock == NULL) { - CERROR("Can't allocate new socket for accept\n"); - kranal_pause(HZ); - continue; + if (ras == NULL) { + PORTAL_ALLOC(ras, sizeof(*ras)); + if (ras == NULL) { + CERROR("Out of Memory: pausing...\n"); + kranal_pause(HZ); + continue; + } + ras->ras_sock = NULL; } + if (ras->ras_sock == NULL) { + ras->ras_sock = sock_alloc(); + if (ras->ras_sock == NULL) { + CERROR("Can't allocate socket: pausing...\n"); + kranal_pause(HZ); + continue; + } + } + set_current_state(TASK_INTERRUPTIBLE); - rc = sock->ops->accept(sock, newsock, O_NONBLOCK); + rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK); + /* Sleep for socket activity? */ if (rc == -EAGAIN && kranal_data.kra_listener_shutdown == 0) schedule(); set_current_state(TASK_RUNNING); - if (rc != 0) { - sock_release(newsock); - if (rc != -EAGAIN) { - CERROR("Accept failed: %d\n", rc); - kranal_pause(HZ); - } + if (rc == 0) { + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + list_add_tail(&ras->ras_list, + &kranal_data.kra_connd_acceptq); + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + wake_up(&kranal_data.kra_connd_waitq); + + ras = NULL; continue; - } + } + + if (rc != -EAGAIN) { + CERROR("Accept failed: %d, pausing...\n", rc); + kranal_pause(HZ); + } + } - kranal_conn_handshake(newsock, NULL); - sock_release(newsock); + if (ras != NULL) { + if (ras->ras_sock != NULL) + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); } rc = 0; remove_wait_queue(sock->sk->sk_sleep, &wait); - out_2: + out_1: sock_release(sock); kranal_data.kra_listener_sock = NULL; - out_1: - PORTAL_FREE(connreqs, 2 * sizeof(*connreqs)); out_0: /* set completion status and unblock thread waiting for me * (parent on startup failure, executioner on normal shutdown) */ @@ -1650,36 +1694,26 @@ kranal_device_init(int id, kra_device_t *dev) goto failed_1; } - rrc = RapkCreatePtag(dev->rad_handle, - &dev->rad_ptag); - if (rrc != RAP_SUCCESS) { - CERROR("Can't create ptag" - " for device %d: %d\n", id, rrc); - goto failed_1; - } - - rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag, - &dev->rad_rdma_cq); + rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND, + &dev->rad_rdma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create rdma cq size %d" " for device %d: %d\n", total_ntx, id, rrc); - goto failed_2; + goto failed_1; } - rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, - dev->rad_ptag, &dev->rad_fma_cq); + rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV, + &dev->rad_fma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create fma cq size %d" " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc); - goto failed_3; + goto failed_2; } return 0; - failed_3: - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); failed_2: - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); failed_1: RapkReleaseDevice(dev->rad_handle); failed_0: @@ -1690,9 +1724,8 @@ void kranal_device_fini(kra_device_t *dev) { LASSERT(dev->rad_scheduler == NULL); - RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag); - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); RapkReleaseDevice(dev->rad_handle); } diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h index cc5c2e6..53762eb 100644 --- a/lnet/klnds/ralnd/ralnd.h +++ b/lnet/klnds/ralnd/ralnd.h @@ -102,9 +102,8 @@ typedef struct typedef struct { RAP_PVOID rad_handle; /* device handle */ - RAP_PROTECTION_HANDLE rad_ptag; /* protection tag */ - RAP_CQ_HANDLE rad_fma_cq; /* FMA (small message) completion queue */ - RAP_CQ_HANDLE rad_rdma_cq; /* rdma completion queue */ + RAP_PVOID rad_fma_cqh; /* FMA completion queue handle */ + RAP_PVOID rad_rdma_cqh; /* rdma completion queue handle */ int rad_id; /* device id */ int rad_idx; /* index in kra_devices */ int rad_ready; /* set by device callback */ @@ -147,6 +146,7 @@ typedef struct spinlock_t kra_reaper_lock; /* serialise */ struct list_head kra_connd_peers; /* peers waiting for a connection */ + struct list_head kra_connd_acceptq; /* accepted sockets to handshake */ wait_queue_head_t kra_connd_waitq; /* connection daemons sleep here */ spinlock_t kra_connd_lock; /* serialise */ @@ -162,6 +162,12 @@ typedef struct #define RANAL_INIT_LIB 2 #define RANAL_INIT_ALL 3 +typedef struct kra_acceptsock /* accepted socket queued for connd */ +{ + struct list_head ras_list; /* queue for attention */ + struct socket *ras_sock; /* the accepted socket */ +} kra_acceptsock_t; + /************************************************************************ * Wire message structs. These are sent in sender's byte order * (i.e. receiver checks magic and flips if required). @@ -172,7 +178,8 @@ typedef struct kra_connreq /* connection request/response * __u32 racr_magic; /* I'm an ranal connreq */ __u16 racr_version; /* this is my version number */ __u16 racr_devid; /* sender's device ID */ - __u64 racr_nid; /* sender's NID */ + __u64 racr_srcnid; /* sender's NID */ + __u64 racr_dstnid; /* who sender expects to listen */ __u64 racr_peerstamp; /* sender's instance stamp */ __u64 racr_connstamp; /* sender's connection stamp */ __u32 racr_timeout; /* sender's timeout */ @@ -477,3 +484,4 @@ extern int kranal_scheduler (void *arg); extern void kranal_close_conn_locked (kra_conn_t *conn, int error); extern void kranal_terminate_conn_locked (kra_conn_t *conn); extern void kranal_connect (kra_peer_t *peer); +extern int kranal_conn_handshake (struct socket *sock, kra_peer_t *peer); diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c index 8901a2d..541a15a 100644 --- a/lnet/klnds/ralnd/ralnd_cb.c +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ -241,7 +241,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset)); phys->Address = kranal_page2phys(kiov->kiov_page); - phys->Length = PAGE_SIZE; phys++; resid = nob - (kiov->kiov_len - offset); @@ -267,7 +266,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, } phys->Address = kranal_page2phys(kiov->kiov_page); - phys->Length = PAGE_SIZE; phys++; resid -= PAGE_SIZE; @@ -312,7 +310,7 @@ kranal_map_buffer (kra_tx_t *tx) case RANAL_BUF_PHYS_UNMAPPED: rrc = RapkRegisterPhys(dev->rad_handle, tx->tx_phys, tx->tx_phys_npages, - dev->rad_ptag, &tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_PHYS_MAPPED; break; @@ -320,7 +318,7 @@ kranal_map_buffer (kra_tx_t *tx) case RANAL_BUF_VIRT_UNMAPPED: rrc = RapkRegisterMemory(dev->rad_handle, tx->tx_buffer, tx->tx_nob, - dev->rad_ptag, &tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_VIRT_MAPPED; break; @@ -348,7 +346,7 @@ kranal_unmap_buffer (kra_tx_t *tx) dev = tx->tx_conn->rac_device; LASSERT (current == dev->rad_scheduler); rrc = RapkDeregisterMemory(dev->rad_handle, NULL, - dev->rad_ptag, &tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED; break; @@ -358,7 +356,7 @@ kranal_unmap_buffer (kra_tx_t *tx) dev = tx->tx_conn->rac_device; LASSERT (current == dev->rad_scheduler); rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer, - dev->rad_ptag, &tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED; break; @@ -559,8 +557,8 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob) LASSERT (conn->rac_rxmsg != NULL); - rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer, - &nob_received, sizeof(kra_msg_t)); + rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer, + &nob_received, sizeof(kra_msg_t)); LASSERT (rrc == RAP_SUCCESS); conn->rac_rxmsg = NULL; @@ -1031,6 +1029,8 @@ kranal_connd (void *arg) wait_queue_t wait; unsigned long flags; kra_peer_t *peer; + kra_acceptsock_t *ras; + int did_something; snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg); kportal_daemonize(name); @@ -1041,8 +1041,22 @@ kranal_connd (void *arg) spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); while (!kranal_data.kra_shutdown) { - /* Safe: kra_shutdown only set when quiescent */ + did_something = 0; + + if (!list_empty(&kranal_data.kra_connd_acceptq)) { + ras = list_entry(kranal_data.kra_connd_acceptq.next, + kra_acceptsock_t, ras_list); + list_del(&ras->ras_list); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + kranal_conn_handshake(ras->ras_sock, NULL); + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); + + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + did_something = 1; + } + if (!list_empty(&kranal_data.kra_connd_peers)) { peer = list_entry(kranal_data.kra_connd_peers.next, kra_peer_t, rap_connd_list); @@ -1054,9 +1068,12 @@ kranal_connd (void *arg) kranal_peer_decref(peer); spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - continue; + did_something = 1; } + if (did_something) + continue; + set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&kranal_data.kra_connd_waitq, &wait); @@ -1220,7 +1237,7 @@ kranal_check_rdma_cq (kra_device_t *dev) __u32 event_type; for (;;) { - rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); + rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type); if (rrc == RAP_NOT_DONE) return; @@ -1275,7 +1292,7 @@ kranal_check_fma_cq (kra_device_t *dev) int i; for (;;) { - rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type); + rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type); if (rrc != RAP_NOT_DONE) return; @@ -1366,8 +1383,8 @@ kranal_process_fmaq (kra_conn_t *conn) int expect_reply; /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now. - * However I will be rescheduled some by a rad_fma_cq event when - * I eventually get some. + * However I will be rescheduled some by an FMA completion event + * when I eventually get some. * NB 2. Sampling rac_state here, races with setting it elsewhere * kranal_close_conn_locked. But it doesn't matter if I try to * send a "real" message just as I start closing because I'll get