* Copyright (C) 2009-2012 Cray, Inc.
*
* Derived from work by Eric Barton <eric@bartonsoftware.com>
+ * Author: James Shimek <jshimek@cray.com>
* Author: Nic Henke <nic@cray.com>
*
* This file is part of Lustre, http://www.lustre.org.
*
*/
+#include <asm/page.h>
#include <linux/nmi.h>
#include "gnilnd.h"
* == 0: reschedule if someone marked him WANTS_SCHED
* > 0 : force a reschedule */
/* Return code 0 means it did not schedule the conn, 1
- * means it succesfully scheduled the conn.
+ * means it successfully scheduled the conn.
*/
int
/* Only free the buffer if we used it */
if (tx->tx_buffer_copy != NULL) {
- vfree(tx->tx_buffer_copy);
+ kgnilnd_vfree(tx->tx_buffer_copy, tx->tx_rdma_desc.length);
tx->tx_buffer_copy = NULL;
CDEBUG(D_MALLOC, "vfreed buffer2\n");
}
#if 0
KGNILND_POISON(tx, 0x5a, sizeof(kgn_tx_t));
#endif
+ CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n", sizeof(*tx), tx);
kmem_cache_free(kgnilnd_data.kgn_tx_cache, tx);
- CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n",
- sizeof(*tx), tx);
}
kgn_tx_t *
LBUG();
}
/* only allow NAK on error and truncate to zero */
- LASSERTF(error <= 0, "error %d conn 0x%p, cookie "LPU64"\n",
+ LASSERTF(error <= 0, "error %d conn 0x%p, cookie %llu\n",
error, conn, cookie);
tx = kgnilnd_new_tx_msg(nak_type, source);
}
int
-kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *iov,
- lnet_kiov_t *kiov, unsigned int offset, unsigned int nob)
-
+kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov,
+ struct kvec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int nob)
{
kgn_msg_t *msg = &tx->tx_msg;
int i;
* gni_smsg_send to send that as the payload */
LASSERT(tx->tx_buftype == GNILND_BUF_NONE);
- LASSERT(nob >= 0);
if (nob == 0) {
tx->tx_buffer = NULL;
} else if (kiov != NULL) {
+
+ if ((niov > 0) && unlikely(niov > (nob/PAGE_SIZE))) {
+ niov = ((nob + offset + kiov->kiov_offset + PAGE_SIZE - 1) /
+ PAGE_SIZE);
+ }
+
LASSERTF(niov > 0 && niov < GNILND_MAX_IMMEDIATE/PAGE_SIZE,
- "bad niov %d\n", niov);
+ "bad niov %d msg %p kiov %p iov %p offset %d nob%d\n",
+ niov, msg, kiov, iov, offset, nob);
while (offset >= kiov->kiov_len) {
offset -= kiov->kiov_len;
int
kgnilnd_setup_virt_buffer(kgn_tx_t *tx,
- unsigned int niov, struct iovec *iov,
+ unsigned int niov, struct kvec *iov,
unsigned int offset, unsigned int nob)
{
"nkiov %u offset %u\n",
kiov->kiov_page, kiov->kiov_offset, kiov->kiov_len, nob, nkiov, offset);
- phys->address = lnet_page2phys(kiov->kiov_page);
+ phys->address = page_to_phys(kiov->kiov_page);
phys++;
kiov++;
nkiov--;
static inline int
kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov,
- struct iovec *iov, lnet_kiov_t *kiov,
+ struct kvec *iov, lnet_kiov_t *kiov,
unsigned int offset, unsigned int nob)
{
int rc;
if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
atomic64_add(bytes, &dev->gnd_rdmaq_bytes_out);
- GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to "LPD64"",
+ GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to %lld",
bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));
}
atomic64_sub(bytes, &dev->gnd_rdmaq_bytes_out);
LASSERTF(atomic64_read(&dev->gnd_rdmaq_bytes_out) >= 0,
"bytes_out negative! %ld\n", atomic64_read(&dev->gnd_rdmaq_bytes_out));
- GNIDBG_TX(D_NETTRACE, tx, "rdma -- %d to "LPD64"",
+ GNIDBG_TX(D_NETTRACE, tx, "rdma -- %d to %lld",
bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));
}
* GART resource, etc starvation handling */
if (rrc != GNI_RC_SUCCESS) {
GNIDBG_TX(D_NET, tx, "Can't map %d pages: dev %d "
- "phys %u pp %u, virt %u nob "LPU64"",
+ "phys %u pp %u, virt %u nob %llu",
tx->tx_phys_npages, dev->gnd_id,
dev->gnd_map_nphys, dev->gnd_map_physnop,
dev->gnd_map_nvirt, dev->gnd_map_virtnob);
NULL, flags, &tx->tx_map_key);
if (rrc != GNI_RC_SUCCESS) {
GNIDBG_TX(D_NET, tx, "Can't map %u bytes: dev %d "
- "phys %u pp %u, virt %u nob "LPU64"",
+ "phys %u pp %u, virt %u nob %llu",
tx->tx_nob, dev->gnd_id,
dev->gnd_map_nphys, dev->gnd_map_physnop,
dev->gnd_map_nvirt, dev->gnd_map_virtnob);
if (tx->tx_buffer_copy)
gmp->gmp_map_key = tx->tx_buffer_copy_map_key;
else
- gmp->gmp_map_key = tx->tx_map_key;
+ gmp->gmp_map_key = tx->tx_map_key;
atomic_inc(&conn->gnc_device->gnd_n_mdd_held);
* verified peer notification - the theory is that
* a TX error can be communicated in all other cases */
if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED &&
+ error != -GNILND_NOPURG &&
kgnilnd_check_purgatory_conn(tx->tx_conn)) {
kgnilnd_add_purgatory_tx(tx);
hold_timeout = GNILND_TIMEOUT2DEADMAN;
GNIDBG_TX(D_NET, tx,
- "dev %p delaying MDD release for %dms key "LPX64"."LPX64"",
+ "dev %p delaying MDD release for %dms key %#llx.%#llx",
tx->tx_conn->gnc_device, hold_timeout,
tx->tx_map_key.qword1, tx->tx_map_key.qword2);
}
rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, 0);
LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
} else {
- rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);
- LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+ rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);
+ LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
}
tx->tx_buftype--;
libcfs_nid2str(conn->gnc_peer->gnp_nid) : "<?>",
tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx,
kgnilnd_tx_state2str(tx->tx_list_state),
- cfs_duration_sec((long)jiffies - tx->tx_qtime));
+ cfs_duration_sec((unsigned long)jiffies - tx->tx_qtime));
}
/* The error codes determine if we hold onto the MDD */
* if we are sending to the same node faster than 256000/sec.
* To help guard against this, we OR in the tx_seq - that is 32 bits */
- tx->tx_id.txe_chips = (__u32)(jiffies | conn->gnc_tx_seq);
+ tx->tx_id.txe_chips = (__u32)(jiffies | atomic_read(&conn->gnc_tx_seq));
GNIDBG_TX(D_NET, tx, "set cookie/id/bits", NULL);
* close message.
*/
if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) != 0 && msg->gnm_type != GNILND_MSG_CLOSE) {
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
/* Return -ETIME, we are closing the connection already so we dont want to
* have this tx hit the wire. The tx will be killed by the calling function.
* Once the EP is marked dirty the close message will be the last
}
if (time_after_eq(now, newest_last_rx + GNILND_TIMEOUTRX(timeout))) {
- GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, "Cant send to %s after timeout lapse of %lu; TO %lu",
+ GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn,
+ "Cant send to %s after timeout lapse of %lu; TO %lu\n",
libcfs_nid2str(conn->gnc_peer->gnp_nid),
cfs_duration_sec(now - newest_last_rx),
cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
return -ETIME;
}
*/
msg->gnm_connstamp = conn->gnc_my_connstamp;
msg->gnm_payload_len = immediatenob;
- msg->gnm_seq = conn->gnc_tx_seq;
+ msg->gnm_seq = atomic_read(&conn->gnc_tx_seq);
/* always init here - kgn_checksum is a /sys module tunable
* and can be flipped at any point, even between msg init and sending */
if (unlikely(tx->tx_state & GNILND_TX_FAIL_SMSG)) {
rrc = cfs_fail_val ? cfs_fail_val : GNI_RC_NOT_DONE;
} else {
- rrc = kgnilnd_smsg_send(conn->gnc_ephandle,
- msg, sizeof(*msg), immediate, immediatenob,
- tx->tx_id.txe_smsg_id);
+ rrc = kgnilnd_smsg_send(conn->gnc_ephandle,
+ msg, sizeof(*msg), immediate,
+ immediatenob,
+ tx->tx_id.txe_smsg_id);
}
switch (rrc) {
case GNI_RC_SUCCESS:
- conn->gnc_tx_seq++;
+ atomic_inc(&conn->gnc_tx_seq);
conn->gnc_last_tx = jiffies;
/* no locking here as LIVE isn't a list */
kgnilnd_tx_add_state_locked(tx, NULL, conn, GNILND_TX_LIVE_FMAQ, 1);
/* serialize with seeing CQ events for completion on this, as well as
* tx_seq */
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
atomic_inc(&conn->gnc_device->gnd_short_ntx);
atomic64_add(immediatenob, &conn->gnc_device->gnd_short_txbytes);
/* XXX Nic: We need to figure out how to track this
* - there are bound to be good reasons for it,
* but we want to know when it happens */
-
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
/* We'll handle this error inline - makes the calling logic much more
* clean */
}
default:
/* handle bad retcode gracefully */
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
return -EIO;
}
}
int rc;
timestamp = jiffies;
- mutex_lock(&dev->gnd_cq_mutex);
+ kgnilnd_gl_mutex_lock(&dev->gnd_cq_mutex);
+ kgnilnd_conn_mutex_lock(&tx->tx_conn->gnc_smsg_mutex);
/* delay in jiffies - we are really concerned only with things that
* result in a schedule() or really holding this off for long times .
* NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
rc = 0;
} else {
atomic_inc(&conn->gnc_device->gnd_fast_try);
- rc = mutex_trylock(&conn->gnc_device->gnd_cq_mutex);
+ rc = kgnilnd_trylock(&conn->gnc_device->gnd_cq_mutex,
+ &conn->gnc_smsg_mutex);
}
if (!rc) {
rc = -EAGAIN;
void
kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
{
- int rc;
+ int rc = 0;
int add_tail = 1;
/* set the tx_id here, we delay it until we have an actual conn
kgn_peer_t *new_peer = NULL;
kgn_conn_t *conn = NULL;
int rc;
+ int node_state;
ENTRY;
CFS_RACE(CFS_FAIL_GNI_FIND_TARGET);
+ node_state = kgnilnd_get_node_state(LNET_NIDADDR(target->nid));
+
/* NB - this will not block during normal operations -
* the only writer of this is in the startup/shutdown path. */
rc = down_read_trylock(&kgnilnd_data.kgn_net_rw_sem);
/* ignore previous peer entirely - we cycled the lock, so we
* will create new peer and at worst drop it if peer is still
* in the tables */
- rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net);
+ rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net, node_state);
if (rc != 0) {
up_read(&kgnilnd_data.kgn_net_rw_sem);
GOTO(no_peer, rc);
* if we don't find it, add our new one to the list */
kgnilnd_add_peer_locked(target->nid, new_peer, &peer);
+ /* don't create a connection if the peer is not up */
+ if (peer->gnp_down != GNILND_RCA_NODE_UP) {
+ write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+ rc = -ENETRESET;
+ GOTO(no_peer, rc);
+ }
+
conn = kgnilnd_find_or_create_conn_locked(peer);
+
+ if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DGRAM_DROP_TX)) {
+ write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+ GOTO(no_peer, rc);
+ }
+
if (conn != NULL) {
/* oh hey, found a conn now... magical */
kgnilnd_queue_tx(conn, tx);
RETURN_EXIT;
}
-void
+int
kgnilnd_rdma(kgn_tx_t *tx, int type,
kgn_rdma_desc_t *sink, unsigned int nob, __u64 cookie)
{
if (tx->tx_buffer_copy == NULL) {
/* Allocate the largest copy buffer we will need, this will prevent us from overwriting data
* and require at most we allocate a few extra bytes. */
- tx->tx_buffer_copy = vmalloc(desc_nob);
+ tx->tx_buffer_copy = kgnilnd_vzalloc(desc_nob);
if (!tx->tx_buffer_copy) {
/* allocation of buffer failed nak the rdma */
kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid);
kgnilnd_tx_done(tx, -EFAULT);
- return;
+ return 0;
}
kgnilnd_admin_addref(kgnilnd_data.kgn_rev_copy_buff);
rc = kgnilnd_mem_register(conn->gnc_device->gnd_handle, (__u64)tx->tx_buffer_copy, desc_nob, NULL, GNI_MEM_READWRITE, &tx->tx_buffer_copy_map_key);
if (rc != GNI_RC_SUCCESS) {
/* Registration Failed nak rdma and kill the tx. */
- vfree(tx->tx_buffer_copy);
+ kgnilnd_vfree(tx->tx_buffer_copy,
+ desc_nob);
tx->tx_buffer_copy = NULL;
kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid);
kgnilnd_tx_done(tx, -EFAULT);
- return;
+ return 0;
}
}
desc_map_key = tx->tx_buffer_copy_map_key;
tx->tx_rdma_desc.remote_mem_hndl = sink->gnrd_key;
tx->tx_rdma_desc.length = desc_nob;
tx->tx_nob_rdma = nob;
- if (*kgnilnd_tunables.kgn_bte_dlvr_mode)
- tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_dlvr_mode;
+ if (post_type == GNI_POST_RDMA_PUT && *kgnilnd_tunables.kgn_bte_put_dlvr_mode)
+ tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_put_dlvr_mode;
+ if (post_type == GNI_POST_RDMA_GET && *kgnilnd_tunables.kgn_bte_get_dlvr_mode)
+ tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_get_dlvr_mode;
/* prep final completion message */
kgnilnd_init_msg(&tx->tx_msg, type, tx->tx_msg.gnm_srcnid);
tx->tx_msg.gnm_u.completion.gncm_cookie = cookie;
if (nob == 0) {
kgnilnd_queue_tx(conn, tx);
- return;
+ return 0;
}
/* Don't lie (CLOSE == RDMA idle) */
LASSERTF(!conn->gnc_close_sent, "tx %p on conn %p after close sent %d\n",
tx, conn, conn->gnc_close_sent);
- GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x cookie:"LPX64,
- type, tx->tx_rdma_desc.dlvr_mode, cookie);
+ GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x conn %p dlvr_mode "
+ "0x%x cookie:%#llx",
+ type, conn, tx->tx_rdma_desc.dlvr_mode, cookie);
/* set CQ dedicated for RDMA */
tx->tx_rdma_desc.src_cq_hndl = conn->gnc_device->gnd_snd_rdma_cqh;
timestamp = jiffies;
- mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex);
+ kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
/* delay in jiffies - we are really concerned only with things that
* result in a schedule() or really holding this off for long times .
* NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
rrc = kgnilnd_post_rdma(conn->gnc_ephandle, &tx->tx_rdma_desc);
+ if (rrc == GNI_RC_ERROR_RESOURCE) {
+ kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_unmap_buffer(tx, 0);
+
+ if (tx->tx_buffer_copy != NULL) {
+ kgnilnd_vfree(tx->tx_buffer_copy, desc_nob);
+ tx->tx_buffer_copy = NULL;
+ }
+
+ spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
+ kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn,
+ GNILND_TX_MAPQ, 0);
+ spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
+ kgnilnd_schedule_device(tx->tx_conn->gnc_device);
+ return -EAGAIN;
+ }
+
spin_lock(&conn->gnc_list_lock);
kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, GNILND_TX_LIVE_RDMAQ, 1);
tx->tx_qtime = jiffies;
spin_unlock(&conn->gnc_list_lock);
-
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
/* XXX Nic: is this a place we should handle more errors for
* robustness sake */
LASSERT(rrc == GNI_RC_SUCCESS);
-
+ return 0;
}
kgn_rx_t *
CDEBUG(D_NET, "consuming %p\n", conn);
timestamp = jiffies;
- mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
/* delay in jiffies - we are really concerned only with things that
* result in a schedule() or really holding this off for long times .
* NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp;
rrc = kgnilnd_smsg_release(conn->gnc_ephandle);
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
LASSERTF(rrc == GNI_RC_SUCCESS, "bad rrc %d\n", rrc);
GNIDBG_SMSG_CREDS(D_NET, conn);
/* if we are eager, free the cache alloc'd msg */
if (unlikely(rx->grx_eager)) {
LIBCFS_FREE(rxmsg, sizeof(*rxmsg) + *kgnilnd_tunables.kgn_max_immediate);
+ atomic_dec(&kgnilnd_data.kgn_neager_allocs);
/* release ref from eager_recv */
kgnilnd_conn_decref(conn);
int target_is_router = lntmsg->msg_target_is_router;
int routing = lntmsg->msg_routing;
unsigned int niov = lntmsg->msg_niov;
- struct iovec *iov = lntmsg->msg_iov;
+ struct kvec *iov = lntmsg->msg_iov;
lnet_kiov_t *kiov = lntmsg->msg_kiov;
unsigned int offset = lntmsg->msg_offset;
unsigned int nob = lntmsg->msg_len;
kgn_conn_t *conn = rx->grx_conn;
kgn_msg_t *rxmsg = rx->grx_msg;
unsigned int niov = lntmsg->msg_niov;
- struct iovec *iov = lntmsg->msg_iov;
+ struct kvec *iov = lntmsg->msg_iov;
lnet_kiov_t *kiov = lntmsg->msg_kiov;
unsigned int offset = lntmsg->msg_offset;
unsigned int nob = lntmsg->msg_len;
CERROR("Couldnt find matching peer %p or conn %p / %p\n",
peer, conn, found_conn);
if (found_conn) {
- CERROR("Unexpected connstamp "LPX64"("LPX64" expected)"
+ CERROR("Unexpected connstamp %#llx(%#llx expected)"
" from %s", rxmsg->gnm_connstamp,
found_conn->gnc_peer_connstamp,
libcfs_nid2str(peer->gnp_nid));
/* we have no credits or buffers for this message, so copy it
* somewhere for a later kgnilnd_recv */
+ if (atomic_read(&kgnilnd_data.kgn_neager_allocs) >=
+ *kgnilnd_tunables.kgn_eager_credits) {
+ CERROR("Out of eager credits to %s\n",
+ libcfs_nid2str(conn->gnc_peer->gnp_nid));
+ return -ENOMEM;
+ }
+
+ atomic_inc(&kgnilnd_data.kgn_neager_allocs);
+
LIBCFS_ALLOC(eagermsg, sizeof(*eagermsg) + *kgnilnd_tunables.kgn_max_immediate);
if (eagermsg == NULL) {
kgnilnd_conn_decref(conn);
int
kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
int delayed, unsigned int niov,
- struct iovec *iov, lnet_kiov_t *kiov,
+ struct kvec *iov, lnet_kiov_t *kiov,
unsigned int offset, unsigned int mlen, unsigned int rlen)
{
kgn_rx_t *rx = private;
tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
-
+ tx->tx_qtime = jiffies;
/* we only queue from kgnilnd_recv - we might get called from other contexts
* and we don't want to block the mutex in those cases */
int rc = 0;
int count = 0;
int reconnect;
+ int to_reconn;
short releaseconn = 0;
unsigned long first_rx = 0;
+ int purgatory_conn_cnt = 0;
CDEBUG(D_NET, "checking peer 0x%p->%s for timeouts; interval %lus\n",
peer, libcfs_nid2str(peer->gnp_nid),
reconnect = (peer->gnp_down == GNILND_RCA_NODE_UP) &&
(atomic_read(&peer->gnp_dirty_eps) == 0);
+ /* fast reconnect after a timeout */
+ to_reconn = !conn &&
+ (peer->gnp_last_errno == -ETIMEDOUT) &&
+ *kgnilnd_tunables.kgn_fast_reconn;
+
/* if we are not connected and there are tx on the gnp_tx_queue waiting
* to be sent, we'll check the reconnect interval and fire up a new
* connection request */
- if ((peer->gnp_connecting == GNILND_PEER_IDLE) &&
+ if (reconnect &&
+ (peer->gnp_connecting == GNILND_PEER_IDLE) &&
(time_after_eq(jiffies, peer->gnp_reconnect_time)) &&
- !list_empty(&peer->gnp_tx_queue) && reconnect) {
+ (!list_empty(&peer->gnp_tx_queue) || to_reconn)) {
CDEBUG(D_NET, "starting connect to %s\n",
libcfs_nid2str(peer->gnp_nid));
cfs_duration_sec(waiting));
kgnilnd_detach_purgatory_locked(conn, souls);
+ } else {
+ purgatory_conn_cnt++;
+ }
+ }
+ }
+
+ /* If we have too many connections in purgatory we could run out of
+ * resources. Limit the number of connections to a tunable number,
+ * clean up to the minimum all in one fell swoop... there are
+ * situations where dvs will retry tx's and we can eat up several
+ * hundread connection requests at once.
+ */
+ if (purgatory_conn_cnt > *kgnilnd_tunables.kgn_max_purgatory) {
+ list_for_each_entry_safe(conn, connN, &peer->gnp_conns,
+ gnc_list) {
+ if (conn->gnc_in_purgatory &&
+ conn->gnc_state == GNILND_CONN_DONE) {
+ CDEBUG(D_NET, "Dropping Held resource due to"
+ " resource limits being hit\n");
+ kgnilnd_detach_purgatory_locked(conn, souls);
+
+ if (purgatory_conn_cnt-- <
+ *kgnilnd_tunables.kgn_max_purgatory)
+ break;
}
}
}
next_check_time);
mod_timer(&timer, (long) jiffies + timeout);
- /* check flag variables before comitting */
+ /* check flag variables before committing */
if (!kgnilnd_data.kgn_shutdown &&
!kgnilnd_data.kgn_quiesce_trigger) {
CDEBUG(D_INFO, "schedule timeout %ld (%lu sec)\n",
lnet_copy_flat2kiov(
niov, kiov, offset,
nob,
- tx->tx_buffer_copy, tx->tx_offset, nob);
+ tx->tx_buffer_copy + tx->tx_offset, 0, nob);
} else {
memcpy(tx->tx_buffer, tx->tx_buffer_copy + tx->tx_offset, nob);
}
long num_processed = 0;
kgn_conn_t *conn = NULL;
kgn_tx_t *tx = NULL;
+ kgn_rdma_desc_t *rdesc;
+ unsigned int rnob;
+ __u64 rcookie;
for (;;) {
/* make sure we don't keep looping if we need to reset */
}
if (rrc == GNI_RC_NOT_DONE) {
- mutex_unlock(&dev->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
CDEBUG(D_INFO, "SEND RDMA CQ %d empty processed %ld\n",
dev->gnd_id, num_processed);
return num_processed;
"this is bad, somehow our credits didn't protect us"
" from CQ overrun\n");
LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_POST,
- "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
+ "rrc %d, GNI_CQ_GET_TYPE(%#llx) = %#llx\n", rrc,
event_data, GNI_CQ_GET_TYPE(event_data));
rrc = kgnilnd_get_completed(dev->gnd_snd_rdma_cqh, event_data,
&desc);
- mutex_unlock(&dev->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
/* XXX Nic: Need better error handling here... */
LASSERTF((rrc == GNI_RC_SUCCESS) ||
}
/* remove from rdmaq */
+ kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex);
spin_lock(&conn->gnc_list_lock);
kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
spin_unlock(&conn->gnc_list_lock);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
+
+ if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RDMA_CQ_ERROR)) {
+ event_data = 1LL << 48;
+ rc = 1;
+ }
if (likely(desc->status == GNI_RC_SUCCESS) && rc == 0) {
atomic_inc(&dev->gnd_rdma_ntx);
/* drop ref from kgnilnd_validate_tx_ev_id */
kgnilnd_admin_decref(conn->gnc_tx_in_use);
kgnilnd_conn_decref(conn);
+
continue;
}
if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) {
- if (should_retry) {
- kgnilnd_rdma(tx, tx->tx_msg.gnm_type,
- &tx->tx_putinfo.gnpam_desc,
- tx->tx_putinfo.gnpam_desc.gnrd_nob,
- tx->tx_putinfo.gnpam_dst_cookie);
- } else {
- kgnilnd_nak_rdma(conn, tx->tx_msg.gnm_type,
- -EFAULT,
- tx->tx_putinfo.gnpam_dst_cookie,
- tx->tx_msg.gnm_srcnid);
- kgnilnd_tx_done(tx, -EFAULT);
- }
+ rdesc = &tx->tx_putinfo.gnpam_desc;
+ rnob = tx->tx_putinfo.gnpam_desc.gnrd_nob;
+ rcookie = tx->tx_putinfo.gnpam_dst_cookie;
} else {
- if (should_retry) {
- kgnilnd_rdma(tx, tx->tx_msg.gnm_type,
- &tx->tx_getinfo.gngm_desc,
- tx->tx_lntmsg[0]->msg_len,
- tx->tx_getinfo.gngm_cookie);
- } else {
- kgnilnd_nak_rdma(conn, tx->tx_msg.gnm_type,
- -EFAULT,
- tx->tx_getinfo.gngm_cookie,
- tx->tx_msg.gnm_srcnid);
- kgnilnd_tx_done(tx, -EFAULT);
- }
+ rdesc = &tx->tx_getinfo.gngm_desc;
+ rnob = tx->tx_lntmsg[0]->msg_len;
+ rcookie = tx->tx_getinfo.gngm_cookie;
+ }
+
+ if (should_retry) {
+ kgnilnd_rdma(tx,
+ tx->tx_msg.gnm_type,
+ rdesc,
+ rnob, rcookie);
+ } else {
+ kgnilnd_nak_rdma(conn,
+ tx->tx_msg.gnm_type,
+ -EFAULT,
+ rcookie,
+ tx->tx_msg.gnm_srcnid);
+ kgnilnd_tx_done(tx, -GNILND_NOPURG);
+ kgnilnd_close_conn(conn, -ECOMM);
}
/* drop ref from kgnilnd_validate_tx_ev_id */
}
rrc = kgnilnd_cq_get_event(dev->gnd_snd_fma_cqh, &event_data);
- mutex_unlock(&dev->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
if (rrc == GNI_RC_NOT_DONE) {
CDEBUG(D_INFO,
- "SMSG send CQ %d not ready (data "LPX64") "
+ "SMSG send CQ %d not ready (data %#llx) "
"processed %ld\n", dev->gnd_id, event_data,
num_processed);
return num_processed;
"this is bad, somehow our credits didn't "
"protect us from CQ overrun\n");
LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_SMSG,
- "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
+ "rrc %d, GNI_CQ_GET_TYPE(%#llx) = %#llx\n", rrc,
event_data, GNI_CQ_GET_TYPE(event_data));
/* if SMSG couldn't handle an error, time for conn to die */
if (conn == NULL) {
/* Conn was destroyed? */
CDEBUG(D_NET,
- "SMSG CQID lookup "LPX64" failed\n",
+ "SMSG CQID lookup %#llx failed\n",
GNI_CQ_GET_INST_ID(event_data));
write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
continue;
}
/* lock tx_list_state and tx_state */
+ kgnilnd_conn_mutex_lock(&conn->gnc_smsg_mutex);
spin_lock(&tx->tx_conn->gnc_list_lock);
GNITX_ASSERTF(tx, tx->tx_list_state == GNILND_TX_LIVE_FMAQ,
saw_reply = !(tx->tx_state & GNILND_TX_WAITING_REPLY);
spin_unlock(&tx->tx_conn->gnc_list_lock);
+ kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
if (queued_fma) {
CDEBUG(D_NET, "scheduling conn 0x%p->%s for fmaq\n",
return 1;
}
rrc = kgnilnd_cq_get_event(dev->gnd_rcv_fma_cqh, &event_data);
- mutex_unlock(&dev->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
if (rrc == GNI_RC_NOT_DONE) {
- CDEBUG(D_INFO, "SMSG RX CQ %d empty data "LPX64" "
+ CDEBUG(D_INFO, "SMSG RX CQ %d empty data %#llx "
"processed %ld\n",
dev->gnd_id, event_data, num_processed);
return num_processed;
/* sender should get error event too and take care
of failed transaction by re-transmitting */
if (rrc == GNI_RC_TRANSACTION_ERROR) {
- CDEBUG(D_NET, "SMSG RX CQ error "LPX64"\n", event_data);
+ CDEBUG(D_NET, "SMSG RX CQ error %#llx\n", event_data);
continue;
}
conn = kgnilnd_cqid2conn_locked(
GNI_CQ_GET_INST_ID(event_data));
if (conn == NULL) {
- CDEBUG(D_NET, "SMSG RX CQID lookup "LPU64" "
- "failed, dropping event "LPX64"\n",
+ CDEBUG(D_NET, "SMSG RX CQID lookup %llu "
+ "failed, dropping event %#llx\n",
GNI_CQ_GET_INST_ID(event_data),
event_data);
} else {
- CDEBUG(D_NET, "SMSG RX: CQID "LPU64" "
+ CDEBUG(D_NET, "SMSG RX: CQID %llu "
"conn %p->%s\n",
GNI_CQ_GET_INST_ID(event_data),
conn, conn->gnc_peer ?
rc = kgnilnd_map_buffer(tx);
}
- /* rc should be 0 if we mapped succesfully here, if non-zero we are queueing */
+ /* rc should be 0 if we mapped successfully here, if non-zero
+ * we are queueing */
if (rc != 0) {
/* if try_map_if_full set, they handle requeuing */
if (unlikely(try_map_if_full)) {
* remote node where the RDMA will be started
* Special case -EAGAIN logic - this should just queued as if the mapping couldn't
* be satisified. The rest of the errors are "hard" errors that require
- * upper layers to handle themselves */
+ * upper layers to handle themselves.
+ * If kgnilnd_post_rdma returns a resource error, kgnilnd_rdma will put
+ * the tx back on the TX_MAPQ. When this tx is pulled back off the MAPQ,
+ * it's gnm_type will now be GNILND_MSG_PUT_DONE or
+ * GNILND_MSG_GET_DONE_REV.
+ */
case GNILND_MSG_GET_REQ:
tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie;
break;
/* PUT_REQ and GET_DONE are where we do the actual RDMA */
+ case GNILND_MSG_PUT_DONE:
case GNILND_MSG_PUT_REQ:
- kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
+ rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
&tx->tx_putinfo.gnpam_desc,
tx->tx_putinfo.gnpam_desc.gnrd_nob,
tx->tx_putinfo.gnpam_dst_cookie);
+ RETURN(try_map_if_full ? rc : 0);
break;
case GNILND_MSG_GET_DONE:
- kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
+ rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
&tx->tx_getinfo.gngm_desc,
tx->tx_lntmsg[0]->msg_len,
tx->tx_getinfo.gngm_cookie);
-
+ RETURN(try_map_if_full ? rc : 0);
break;
case GNILND_MSG_PUT_REQ_REV:
tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
break;
case GNILND_MSG_PUT_DONE_REV:
- kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE_REV,
+ rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE_REV,
&tx->tx_getinfo.gngm_desc,
- tx->tx_lntmsg[0]->msg_len,
+ tx->tx_nob,
tx->tx_getinfo.gngm_cookie);
+ RETURN(try_map_if_full ? rc : 0);
break;
case GNILND_MSG_GET_ACK_REV:
tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_key = tx->tx_map_key;
/* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */
rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
break;
+ case GNILND_MSG_GET_DONE_REV:
case GNILND_MSG_GET_REQ_REV:
- kgnilnd_rdma(tx, GNILND_MSG_GET_DONE_REV,
+ rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE_REV,
&tx->tx_putinfo.gnpam_desc,
tx->tx_putinfo.gnpam_desc.gnrd_nob,
tx->tx_putinfo.gnpam_dst_cookie);
-
+ RETURN(try_map_if_full ? rc : 0);
break;
}
GNITX_ASSERTF(tx, tx->tx_id.txe_smsg_id != 0,
"tx with zero id", NULL);
- CDEBUG(D_NET, "sending regular msg: %p, type %s(0x%02x), cookie "LPX64"\n",
+ CDEBUG(D_NET, "sending regular msg: %p, type %s(0x%02x), cookie %#llx\n",
tx, kgnilnd_msgtype2str(tx->tx_msg.gnm_type),
tx->tx_msg.gnm_type, tx->tx_id.txe_cookie);
GNITX_ASSERTF(tx, ((tx->tx_id.txe_idx == ev_id.txe_idx) &&
(tx->tx_id.txe_cookie = cookie)),
"conn 0x%p->%s tx_ref_table hosed: wanted "
- "txe_cookie "LPX64" txe_idx %d "
- "found tx %p cookie "LPX64" txe_idx %d\n",
+ "txe_cookie %#llx txe_idx %d "
+ "found tx %p cookie %#llx txe_idx %d\n",
conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
cookie, ev_id.txe_idx,
tx, tx->tx_id.txe_cookie, tx->tx_id.txe_idx);
tx->tx_state, GNILND_TX_WAITING_REPLY,
libcfs_nid2str(conn->gnc_peer->gnp_nid));
} else {
- CWARN("Unmatched reply %02x, or %02x/"LPX64" from %s\n",
+ CWARN("Unmatched reply %02x, or %02x/%#llx from %s\n",
type1, type2, cookie, libcfs_nid2str(conn->gnc_peer->gnp_nid));
}
return tx;
{
int complete = 0;
kgn_conn_t *conn = tx->tx_conn;
+ __u64 nob = tx->tx_nob;
+ __u32 physnop = tx->tx_phys_npages;
+ int id = tx->tx_id.txe_smsg_id;
+ int buftype = tx->tx_buftype;
+ gni_mem_handle_t hndl;
+ hndl.qword1 = tx->tx_map_key.qword1;
+ hndl.qword2 = tx->tx_map_key.qword2;
spin_lock(&conn->gnc_list_lock);
tx->tx_rc = rc;
tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
+ if (rc == -EFAULT) {
+ CDEBUG(D_NETERROR, "Error %d TX data: TX %p tx_id %x nob %16llu physnop %8d buffertype %#8x MemHandle %#llx.%#llxx\n",
+ rc, tx, id, nob, physnop, buftype, hndl.qword1, hndl.qword2);
+
+ if(*kgnilnd_tunables.kgn_efault_lbug) {
+ GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg,
+ "error %d on tx 0x%p->%s id %u/%d state %s age %ds",
+ rc, tx, conn ?
+ libcfs_nid2str(conn->gnc_peer->gnp_nid) : "<?>",
+ tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx,
+ kgnilnd_tx_state2str(tx->tx_list_state),
+ cfs_duration_sec((unsigned long) jiffies - tx->tx_qtime));
+ LBUG();
+ }
+ }
+
if (!(tx->tx_state & GNILND_TX_WAITING_COMPLETION)) {
kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
/* sample under lock as follow on steps require gnc_list_lock
RETURN_EXIT;
timestamp = jiffies;
- mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
/* delay in jiffies - we are really concerned only with things that
* result in a schedule() or really holding this off for long times .
* NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
libcfs_nid2str(conn->gnc_peer->gnp_nid),
cfs_duration_sec(timestamp - newest_last_rx),
cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
rc = -ETIME;
kgnilnd_close_conn(conn, rc);
RETURN_EXIT;
rrc = kgnilnd_smsg_getnext(conn->gnc_ephandle, &prefix);
if (rrc == GNI_RC_NOT_DONE) {
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
- CDEBUG(D_INFO, "SMSG RX empty\n");
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ CDEBUG(D_INFO, "SMSG RX empty conn 0x%p\n", conn);
RETURN_EXIT;
}
*/
if (rrc == GNI_RC_INVALID_STATE) {
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
GNIDBG_CONN(D_NETERROR | D_CONSOLE, conn, "Mailbox corruption "
"detected closing conn %p from peer %s\n", conn,
libcfs_nid2str(conn->gnc_peer->gnp_nid));
rx = kgnilnd_alloc_rx();
if (rx == NULL) {
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
kgnilnd_release_msg(conn);
GNIDBG_MSG(D_NETERROR, msg, "Dropping SMSG RX from 0x%p->%s, no RX memory",
conn, libcfs_nid2str(peer->gnp_nid));
RETURN_EXIT;
}
- GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p from %s",
- conn, libcfs_nid2str(peer->gnp_nid));
+ GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p", conn);
timestamp = conn->gnc_last_rx;
- last_seq = conn->gnc_rx_seq;
+ seq = last_seq = atomic_read(&conn->gnc_rx_seq);
+ atomic_inc(&conn->gnc_rx_seq);
conn->gnc_last_rx = jiffies;
/* stash first rx so we can clear out purgatory
if (conn->gnc_first_rx == 0)
conn->gnc_first_rx = jiffies;
- seq = conn->gnc_rx_seq++;
-
/* needs to linger to protect gnc_rx_seq like we do with gnc_tx_seq */
- mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+ kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
kgnilnd_peer_alive(conn->gnc_peer);
rx->grx_msg = msg;
}
if (msg->gnm_connstamp != conn->gnc_peer_connstamp) {
- GNIDBG_MSG(D_NETERROR, msg, "Unexpected connstamp "LPX64"("LPX64
+ GNIDBG_MSG(D_NETERROR, msg, "Unexpected connstamp %#llx(%#llx"
" expected) from %s",
msg->gnm_connstamp, conn->gnc_peer_connstamp,
libcfs_nid2str(peer->gnp_nid));
conn, last_seq,
cfs_duration_sec(now - timestamp),
cfs_duration_sec(now - conn->gnc_last_rx_cq),
- conn->gnc_tx_seq,
+ atomic_read(&conn->gnc_tx_seq),
cfs_duration_sec(now - conn->gnc_last_tx),
cfs_duration_sec(now - conn->gnc_last_tx_cq),
cfs_duration_sec(now - conn->gnc_last_noop_want),
* mapped so we can reset our timers */
dev->gnd_map_attempt = 0;
continue;
+ } else if (rc == -EAGAIN) {
+ spin_lock(&dev->gnd_lock);
+ mod_timer(&dev->gnd_map_timer, dev->gnd_next_map);
+ spin_unlock(&dev->gnd_lock);
+ GOTO(get_out_mapped, rc);
} else if (rc != -ENOMEM) {
/* carp, failure we can't handle */
kgnilnd_tx_done(tx, rc);
} else {
GNIDBG_TX(log_retrans_level, tx,
"transient map failure #%d %d pages/%d bytes phys %u@%u "
- "virt %u@"LPU64" "
+ "virt %u@%llu "
"nq_map %d mdd# %d/%d GART %ld",
dev->gnd_map_attempt, tx->tx_phys_npages, tx->tx_nob,
dev->gnd_map_nphys, dev->gnd_map_physnop * PAGE_SIZE,
* yet. Cycle this conn back through
* the scheduler. */
kgnilnd_schedule_conn(conn);
- } else
- kgnilnd_complete_closed_conn(conn);
-
+ } else {
+ kgnilnd_complete_closed_conn(conn);
+ }
up_write(&dev->gnd_conn_sem);
} else if (unlikely(conn->gnc_state == GNILND_CONN_DESTROY_EP)) {
/* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */
DEFINE_WAIT(wait);
dev = &kgnilnd_data.kgn_devices[(threadno + 1) % kgnilnd_data.kgn_ndevs];
+
cfs_block_allsigs();
/* all gnilnd threads need to run fairly urgently */