kra_data_t kranal_data;
kra_tunables_t kranal_tunables;
-#ifdef CONFIG_SYSCTL
#define RANAL_SYSCTL_TIMEOUT 1
#define RANAL_SYSCTL_LISTENER_TIMEOUT 2
#define RANAL_SYSCTL_BACKLOG 3
{RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
{ 0 }
};
-#endif
int
kranal_sock_write (struct socket *sock, void *buffer, int nob)
rc = sock_sendmsg(sock, &msg, iov.iov_len);
set_fs(oldmm);
+ if (rc == nob)
+ return 0;
+
+ if (rc >= 0)
+ return -EAGAIN;
+
return rc;
}
connreq->racr_magic = RANAL_MSG_MAGIC;
connreq->racr_version = RANAL_MSG_VERSION;
connreq->racr_devid = conn->rac_device->rad_id;
- connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid;
+ connreq->racr_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+ connreq->racr_dstnid = conn->rac_peer->rap_nid;
connreq->racr_peerstamp = kranal_data.kra_peerstamp;
connreq->racr_connstamp = conn->rac_my_connstamp;
connreq->racr_timeout = conn->rac_timeout;
__swab32s(&connreq->racr_magic);
__swab16s(&connreq->racr_version);
__swab16s(&connreq->racr_devid);
- __swab64s(&connreq->racr_nid);
+ __swab64s(&connreq->racr_srcnid);
+ __swab64s(&connreq->racr_dstnid);
__swab64s(&connreq->racr_peerstamp);
__swab64s(&connreq->racr_connstamp);
__swab32s(&connreq->racr_timeout);
+ __swab32s(&connreq->racr_riparams.HostId);
__swab32s(&connreq->racr_riparams.FmaDomainHndl);
- __swab32s(&connreq->racr_riparams.RcvCqHndl);
__swab32s(&connreq->racr_riparams.PTag);
__swab32s(&connreq->racr_riparams.CompletionCookie);
}
return -EPROTO;
}
- if (connreq->racr_nid == PTL_NID_ANY) {
+ if (connreq->racr_srcnid == PTL_NID_ANY ||
+ connreq->racr_dstnid == PTL_NID_ANY) {
CERROR("Received PTL_NID_ANY\n");
return -EPROTO;
}
kranal_update_reaper_timeout(conn->rac_timeout);
rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
- dev->rad_ptag,
- dev->rad_rdma_cq, dev->rad_fma_cq,
&conn->rac_rihandle);
if (rrc != RAP_SUCCESS) {
CERROR("RapkCreateRi failed: %d\n", rrc);
}
int
-kranal_passive_conn_handshake (struct socket *sock,
- ptl_nid_t *peer_nidp, kra_conn_t **connp)
+kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
+ ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
struct sockaddr_in addr;
__u32 peer_ip;
unsigned int peer_port;
kra_connreq_t connreq;
- ptl_nid_t peer_nid;
+ ptl_nid_t src_nid;
+ ptl_nid_t dst_nid;
kra_conn_t *conn;
kra_device_t *dev;
int rc;
return rc;
}
- peer_nid = connreq.racr_nid;
- LASSERT (peer_nid != PTL_NID_ANY);
+ src_nid = connreq.racr_srcnid;
+ dst_nid = connreq.racr_dstnid;
for (i = 0;;i++) {
if (i == kranal_data.kra_ndevs) {
}
*connp = conn;
- *peer_nidp = peer_nid;
+ *src_nidp = src_nid;
+ *dst_nidp = dst_nid;
return 0;
}
int
-kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+kranal_active_conn_handshake(kra_peer_t *peer,
+ ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
kra_connreq_t connreq;
kra_conn_t *conn;
/* spread connections over all devices using both peer NIDs to ensure
* all nids use all devices */
- idx = (peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid)
+ idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid;
dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
rc = kranal_create_conn(&conn, dev);
sock_release(sock);
rc = -EPROTO;
- if (connreq.racr_nid != peer->rap_nid) {
- CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+ if (connreq.racr_srcnid != peer->rap_nid) {
+ CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
"received "LPX64" expected "LPX64"\n",
HIPQUAD(peer->rap_ip), peer->rap_port,
- connreq.racr_nid, peer->rap_nid);
+ connreq.racr_srcnid, peer->rap_nid);
goto failed_0;
}
goto failed_0;
*connp = conn;
+ *dst_nidp = connreq.racr_dstnid;
return 0;
failed_1:
kra_peer_t *peer2;
kra_tx_t *tx;
ptl_nid_t peer_nid;
+ ptl_nid_t dst_nid;
unsigned long flags;
kra_conn_t *conn;
int rc;
int nstale;
+ int new_peer = 0;
if (sock == NULL) {
/* active: connd wants to connect to 'peer' */
LASSERT (peer != NULL);
LASSERT (peer->rap_connecting);
- rc = kranal_active_conn_handshake(peer, &conn);
+ rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
if (rc != 0)
return rc;
write_unlock_irqrestore(&kranal_data.kra_global_lock,
flags);
kranal_conn_decref(conn);
- return ESTALE;
+ return -ESTALE;
}
peer_nid = peer->rap_nid;
-
} else {
/* passive: listener accepted 'sock' */
LASSERT (peer == NULL);
- rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+ rc = kranal_passive_conn_handshake(sock, &peer_nid,
+ &dst_nid, &conn);
if (rc != 0)
return rc;
peer2 = kranal_find_peer_locked(peer_nid);
if (peer2 == NULL) {
- /* peer table takes my initial ref on peer */
- list_add_tail(&peer->rap_list,
- kranal_nid2peerlist(peer_nid));
+ new_peer = 1;
} else {
/* peer_nid already in the peer table */
kranal_peer_decref(peer);
peer = peer2;
}
- /* NB I may now have a non-persistent peer in the peer
- * table with no connections: I can't drop the global lock
- * until I've given it a connection or removed it, and when
- * I do 'peer' can disappear under me. */
}
- LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */
+ LASSERT (!new_peer == !kranal_peer_active(peer));
+
+ /* Refuse connection if peer thinks we are a different NID. We check
+ * this while holding the global lock, to synch with connection
+ * destruction on NID change. */
+ if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
- /* Refuse to duplicate an existing connection (both sides might try
- * to connect at once). NB we return success! We _do_ have a
- * connection (so we don't need to remove the peer from the peer
- * table) and we _don't_ have any blocked txs to complete */
+ CERROR("Stale/bad connection with "LPX64
+ ": dst_nid "LPX64", expected "LPX64"\n",
+ peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
+ rc = -ESTALE;
+ goto failed;
+ }
+
+ /* Refuse to duplicate an existing connection (both sides might try to
+ * connect at once). NB we return success! We _are_ connected so we
+ * _don't_ have any blocked txs to complete with failure. */
rc = kranal_conn_isdup_locked(peer, conn);
if (rc != 0) {
LASSERT (!list_empty(&peer->rap_conns));
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
CWARN("Not creating duplicate connection to "LPX64": %d\n",
peer_nid, rc);
- kranal_conn_decref(conn);
- return 0;
+ rc = 0;
+ goto failed;
}
+ if (new_peer) {
+ /* peer table takes my ref on the new peer */
+ list_add_tail(&peer->rap_list,
+ kranal_nid2peerlist(peer_nid));
+ }
+
kranal_peer_addref(peer); /* +1 ref for conn */
conn->rac_peer = peer;
list_add_tail(&conn->rac_list, &peer->rap_conns);
/* Schedule all packets blocking for a connection */
while (!list_empty(&peer->rap_tx_queue)) {
- tx = list_entry(&peer->rap_tx_queue.next,
+ tx = list_entry(peer->rap_tx_queue.next,
kra_tx_t, tx_list);
list_del(&tx->tx_list);
* FMA event may have happened before it got in the cq hash table */
kranal_schedule_conn(conn);
return 0;
+
+ failed:
+ if (new_peer)
+ kranal_peer_decref(peer);
+ kranal_conn_decref(conn);
+ return rc;
}
void
/* reset reconnection timeouts */
peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
- peer->rap_reconnect_time = CURRENT_TIME;
+ peer->rap_reconnect_time = CURRENT_SECONDS;
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
return;
}
LASSERT (peer->rap_reconnect_interval != 0);
- peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+ peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval;
peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
1 * peer->rap_reconnect_interval);
} while (!list_empty(&zombies));
}
+void
+kranal_free_acceptsock (kra_acceptsock_t *ras)
+{
+ sock_release(ras->ras_sock);
+ PORTAL_FREE(ras, sizeof(*ras));
+}
+
int
-kranal_listener(void *arg)
+kranal_listener (void *arg)
{
struct sockaddr_in addr;
wait_queue_t wait;
struct socket *sock;
- struct socket *newsock;
+ kra_acceptsock_t *ras;
int port;
- kra_connreq_t *connreqs;
char name[16];
int rc;
+ unsigned long flags;
/* Parent thread holds kra_nid_mutex, and is, or is about to
* block on kra_listener_signal */
init_waitqueue_entry(&wait, current);
- rc = -ENOMEM;
- PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
- if (connreqs == NULL)
- goto out_0;
-
rc = kranal_create_sock(&sock);
if (rc != 0)
- goto out_1;
+ goto out_0;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
if (rc != 0) {
CERROR("Can't bind to port %d\n", port);
- goto out_2;
+ goto out_1;
}
rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
if (rc != 0) {
CERROR("Can't set listen backlog %d: %d\n",
kranal_tunables.kra_backlog, rc);
- goto out_2;
+ goto out_1;
}
LASSERT (kranal_data.kra_listener_sock == NULL);
/* Wake me any time something happens on my socket */
add_wait_queue(sock->sk->sk_sleep, &wait);
+ ras = NULL;
while (kranal_data.kra_listener_shutdown == 0) {
- newsock = sock_alloc();
- if (newsock == NULL) {
- CERROR("Can't allocate new socket for accept\n");
- kranal_pause(HZ);
- continue;
+ if (ras == NULL) {
+ PORTAL_ALLOC(ras, sizeof(*ras));
+ if (ras == NULL) {
+ CERROR("Out of Memory: pausing...\n");
+ kranal_pause(HZ);
+ continue;
+ }
+ ras->ras_sock = NULL;
}
+ if (ras->ras_sock == NULL) {
+ ras->ras_sock = sock_alloc();
+ if (ras->ras_sock == NULL) {
+ CERROR("Can't allocate socket: pausing...\n");
+ kranal_pause(HZ);
+ continue;
+ }
+ /* XXX this should add a ref to sock->ops->owner, if
+ * TCP could be a module */
+ ras->ras_sock->type = sock->type;
+ ras->ras_sock->ops = sock->ops;
+ }
+
set_current_state(TASK_INTERRUPTIBLE);
- rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+ rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);
+ /* Sleep for socket activity? */
if (rc == -EAGAIN &&
kranal_data.kra_listener_shutdown == 0)
schedule();
set_current_state(TASK_RUNNING);
- if (rc != 0) {
- sock_release(newsock);
- if (rc != -EAGAIN) {
- CERROR("Accept failed: %d\n", rc);
- kranal_pause(HZ);
- }
+ if (rc == 0) {
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+ list_add_tail(&ras->ras_list,
+ &kranal_data.kra_connd_acceptq);
+
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ wake_up(&kranal_data.kra_connd_waitq);
+
+ ras = NULL;
continue;
- }
+ }
+
+ if (rc != -EAGAIN) {
+ CERROR("Accept failed: %d, pausing...\n", rc);
+ kranal_pause(HZ);
+ }
+ }
- kranal_conn_handshake(newsock, NULL);
- sock_release(newsock);
+ if (ras != NULL) {
+ if (ras->ras_sock != NULL)
+ sock_release(ras->ras_sock);
+ PORTAL_FREE(ras, sizeof(*ras));
}
rc = 0;
remove_wait_queue(sock->sk->sk_sleep, &wait);
- out_2:
+ out_1:
sock_release(sock);
kranal_data.kra_listener_sock = NULL;
- out_1:
- PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
out_0:
/* set completion status and unblock thread waiting for me
* (parent on startup failure, executioner on normal shutdown) */
}
void
-kranal_stop_listener(void)
+kranal_stop_listener(int clear_acceptq)
{
+ struct list_head zombie_accepts;
+ unsigned long flags;
+ kra_acceptsock_t *ras;
+
CDEBUG(D_WARNING, "Stopping listener\n");
/* Called holding kra_nid_mutex: listener running */
LASSERT (kranal_data.kra_listener_sock == NULL);
CDEBUG(D_WARNING, "Listener stopped\n");
+
+ if (!clear_acceptq)
+ return;
+
+ /* Close any unhandled accepts */
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+ list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq);
+ list_del_init(&kranal_data.kra_connd_acceptq);
+
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+ while (!list_empty(&zombie_accepts)) {
+ ras = list_entry(zombie_accepts.next,
+ kra_acceptsock_t, ras_list);
+ list_del(&ras->ras_list);
+ kranal_free_acceptsock(ras);
+ }
}
int
kranal_data.kra_listener_sock == NULL)) {
if (kranal_data.kra_listener_sock != NULL)
- kranal_stop_listener();
+ kranal_stop_listener(0);
rc = kranal_start_listener();
int
kranal_set_mynid(ptl_nid_t nid)
{
- unsigned long flags;
- lib_ni_t *ni = &kranal_lib.libnal_ni;
- int rc = 0;
+ unsigned long flags;
+ lib_ni_t *ni = &kranal_lib.libnal_ni;
+ int rc = 0;
CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
nid, ni->ni_pid.nid);
}
if (kranal_data.kra_listener_sock != NULL)
- kranal_stop_listener();
+ kranal_stop_listener(1);
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
kranal_data.kra_peerstamp++;
- write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-
ni->ni_pid.nid = nid;
-
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
/* Delete all existing peers and their connections after new
* NID/connstamp set to ensure no old connections in our brave
* new world. */
INIT_LIST_HEAD(&peer->rap_conns);
INIT_LIST_HEAD(&peer->rap_tx_queue);
- peer->rap_reconnect_time = CURRENT_TIME;
+ peer->rap_reconnect_time = CURRENT_SECONDS;
peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
atomic_inc(&kranal_data.kra_npeers);
else {
rc = 0;
pcfg->pcfg_nid = conn->rac_peer->rap_nid;
- pcfg->pcfg_id = 0;
+ pcfg->pcfg_id = conn->rac_device->rad_id;
pcfg->pcfg_misc = 0;
pcfg->pcfg_flags = 0;
kranal_conn_decref(conn);
goto failed_1;
}
- rrc = RapkCreatePtag(dev->rad_handle,
- &dev->rad_ptag);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't create ptag"
- " for device %d: %d\n", id, rrc);
- goto failed_1;
- }
-
- rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
- &dev->rad_rdma_cq);
+ rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
+ &dev->rad_rdma_cqh);
if (rrc != RAP_SUCCESS) {
CERROR("Can't create rdma cq size %d"
" for device %d: %d\n", total_ntx, id, rrc);
- goto failed_2;
+ goto failed_1;
}
- rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
- dev->rad_ptag, &dev->rad_fma_cq);
+ rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
+ &dev->rad_fma_cqh);
if (rrc != RAP_SUCCESS) {
CERROR("Can't create fma cq size %d"
" for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
- goto failed_3;
+ goto failed_2;
}
return 0;
- failed_3:
- RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
failed_2:
- RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
failed_1:
RapkReleaseDevice(dev->rad_handle);
failed_0:
kranal_device_fini(kra_device_t *dev)
{
LASSERT(dev->rad_scheduler == NULL);
- RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
- RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
- RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
RapkReleaseDevice(dev->rad_handle);
}
}
for (i = 0; i < RANAL_N_CONND; i++) {
- rc = kranal_thread_start(kranal_connd, (void *)i);
+ rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
if (rc != 0) {
CERROR("Can't spawn ranal connd[%d]: %d\n",
i, rc);
void __exit
kranal_module_fini (void)
{
-#ifdef CONFIG_SYSCTL
if (kranal_tunables.kra_sysctl != NULL)
unregister_sysctl_table(kranal_tunables.kra_sysctl);
-#endif
+
PtlNIFini(kranal_ni);
ptl_unregister_nal(RANAL);
/* Initialise dynamic tunables to defaults once only */
kranal_tunables.kra_timeout = RANAL_TIMEOUT;
+ kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT;
+ kranal_tunables.kra_backlog = RANAL_BACKLOG;
+ kranal_tunables.kra_port = RANAL_PORT;
+ kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE;
rc = ptl_register_nal(RANAL, &kranal_api);
if (rc != PTL_OK) {
return -ENODEV;
}
-#ifdef CONFIG_SYSCTL
- /* Press on regardless even if registering sysctl doesn't work */
kranal_tunables.kra_sysctl =
register_sysctl_table(kranal_top_ctl_table, 0);
-#endif
+ if (kranal_tunables.kra_sysctl == NULL) {
+ CERROR("Can't register sysctl table\n");
+ PtlNIFini(kranal_ni);
+ ptl_unregister_nal(RANAL);
+ return -ENOMEM;
+ }
+
return 0;
}