X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fgnilnd%2Fgnilnd_cb.c;h=d9839cef372b48ee182a58449296885ec2340566;hp=56be88a64357476e45b2e7950ca16cd439998e16;hb=2c800c7ad0102407ea89af4d2a98ba761b957d4a;hpb=4d381ef9f179b21217c237ad1cc83055a2448550 diff --git a/lnet/klnds/gnilnd/gnilnd_cb.c b/lnet/klnds/gnilnd/gnilnd_cb.c index 56be88a..d9839ce 100644 --- a/lnet/klnds/gnilnd/gnilnd_cb.c +++ b/lnet/klnds/gnilnd/gnilnd_cb.c @@ -4,6 +4,7 @@ * Copyright (C) 2009-2012 Cray, Inc. * * Derived from work by Eric Barton + * Author: James Shimek * Author: Nic Henke * * This file is part of Lustre, http://www.lustre.org. @@ -23,6 +24,7 @@ * */ +#include #include #include "gnilnd.h" @@ -117,8 +119,11 @@ kgnilnd_device_callback(__u32 devid, __u64 arg) * < 0 : do not reschedule under any circumstances * == 0: reschedule if someone marked him WANTS_SCHED * > 0 : force a reschedule */ +/* Return code 0 means it did not schedule the conn, 1 + * means it successfully scheduled the conn. + */ -void +int kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent) { int conn_sched; @@ -136,19 +141,28 @@ kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent) if (sched_intent >= 0) { if ((sched_intent > 0 || (conn_sched == GNILND_CONN_WANTS_SCHED))) { - kgnilnd_schedule_conn(conn); + return kgnilnd_schedule_conn_refheld(conn, 1); } } + return 0; } -void -kgnilnd_schedule_conn(kgn_conn_t *conn) +/* Return of 0 for conn not scheduled, 1 returned if conn was scheduled or marked + * as scheduled */ + +int +_kgnilnd_schedule_conn(kgn_conn_t *conn, const char *caller, int line, int refheld) { kgn_device_t *dev = conn->gnc_device; int sched; + int rc; sched = xchg(&conn->gnc_scheduled, GNILND_CONN_WANTS_SCHED); - + /* we only care about the last person who marked want_sched since they + * are most likely the culprit + */ + memcpy(conn->gnc_sched_caller, caller, sizeof(conn->gnc_sched_caller)); + conn->gnc_sched_line = line; /* if we are IDLE, add to list - only one guy sees IDLE and "wins" * the chance to put it onto gnd_ready_conns. * otherwise, leave marked as WANTS_SCHED and the thread that "owns" @@ -158,25 +172,33 @@ kgnilnd_schedule_conn(kgn_conn_t *conn) if (sched == GNILND_CONN_IDLE) { /* if the conn is already scheduled, we've already requested * the scheduler thread wakeup */ - kgnilnd_conn_addref(conn); /* +1 ref for scheduler */ - + if (!refheld) { + /* Add a reference to the conn if we are not holding a reference + * already from the exisiting scheduler. We now use the same + * reference if we need to reschedule a conn while in a scheduler + * thread. + */ + kgnilnd_conn_addref(conn); + } LASSERTF(list_empty(&conn->gnc_schedlist), "conn %p already sched state %d\n", conn, sched); - CDEBUG(D_INFO, "scheduling conn 0x%p\n", conn); + CDEBUG(D_INFO, "scheduling conn 0x%p caller %s:%d\n", conn, caller, line); spin_lock(&dev->gnd_lock); list_add_tail(&conn->gnc_schedlist, &dev->gnd_ready_conns); spin_unlock(&dev->gnd_lock); set_mb(conn->gnc_last_sched_ask, jiffies); - + rc = 1; } else { - CDEBUG(D_INFO, "not scheduling conn 0x%p: %d\n", conn, sched); + CDEBUG(D_INFO, "not scheduling conn 0x%p: %d caller %s:%d\n", conn, sched, caller, line); + rc = 0; } /* make sure thread(s) going to process conns - but let it make * separate decision from conn schedule */ kgnilnd_schedule_device(dev); + return rc; } void @@ -208,27 +230,33 @@ kgnilnd_free_tx(kgn_tx_t *tx) /* we only allocate this if we need to */ if (tx->tx_phys != NULL) { - cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys); + kmem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys); CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n", LNET_MAX_IOV * sizeof(gni_mem_segment_t), tx->tx_phys); } + + /* Only free the buffer if we used it */ + if (tx->tx_buffer_copy != NULL) { + vfree(tx->tx_buffer_copy); + tx->tx_buffer_copy = NULL; + CDEBUG(D_MALLOC, "vfreed buffer2\n"); + } #if 0 KGNILND_POISON(tx, 0x5a, sizeof(kgn_tx_t)); #endif - cfs_mem_cache_free(kgnilnd_data.kgn_tx_cache, tx); - CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n", - sizeof(*tx), tx); + CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n", sizeof(*tx), tx); + kmem_cache_free(kgnilnd_data.kgn_tx_cache, tx); } kgn_tx_t * -kgnilnd_alloc_tx(void) +kgnilnd_alloc_tx (void) { - kgn_tx_t *tx = NULL; + kgn_tx_t *tx = NULL; if (CFS_FAIL_CHECK(CFS_FAIL_GNI_ALLOC_TX)) return tx; - tx = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_cache, CFS_ALLOC_ATOMIC); + tx = kmem_cache_alloc(kgnilnd_data.kgn_tx_cache, GFP_ATOMIC); if (tx == NULL) { CERROR("failed to allocate tx\n"); return NULL; @@ -416,14 +444,40 @@ kgnilnd_new_tx_msg(int type, lnet_nid_t source) } static void -kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t source) { +kgnilnd_nak_rdma(kgn_conn_t *conn, int rx_type, int error, __u64 cookie, lnet_nid_t source) { kgn_tx_t *tx; + int nak_type; + + switch (rx_type) { + case GNILND_MSG_GET_REQ: + case GNILND_MSG_GET_DONE: + nak_type = GNILND_MSG_GET_NAK; + break; + case GNILND_MSG_PUT_REQ: + case GNILND_MSG_PUT_ACK: + case GNILND_MSG_PUT_DONE: + nak_type = GNILND_MSG_PUT_NAK; + break; + case GNILND_MSG_PUT_REQ_REV: + case GNILND_MSG_PUT_DONE_REV: + nak_type = GNILND_MSG_PUT_NAK_REV; + break; + case GNILND_MSG_GET_REQ_REV: + case GNILND_MSG_GET_ACK_REV: + case GNILND_MSG_GET_DONE_REV: + nak_type = GNILND_MSG_GET_NAK_REV; + break; + default: + CERROR("invalid msg type %s (%d)\n", + kgnilnd_msgtype2str(rx_type), rx_type); + LBUG(); + } /* only allow NAK on error and truncate to zero */ LASSERTF(error <= 0, "error %d conn 0x%p, cookie "LPU64"\n", error, conn, cookie); - tx = kgnilnd_new_tx_msg(type, source); + tx = kgnilnd_new_tx_msg(nak_type, source); if (tx == NULL) { CNETERR("can't get TX to NAK RDMA to %s\n", libcfs_nid2str(conn->gnc_peer->gnp_nid)); @@ -436,9 +490,9 @@ kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t } int -kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *iov, - lnet_kiov_t *kiov, unsigned int offset, unsigned int nob) - +kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, + struct kvec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int nob) { kgn_msg_t *msg = &tx->tx_msg; int i; @@ -447,13 +501,19 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io * gni_smsg_send to send that as the payload */ LASSERT(tx->tx_buftype == GNILND_BUF_NONE); - LASSERT(nob >= 0); if (nob == 0) { tx->tx_buffer = NULL; } else if (kiov != NULL) { + + if ((niov > 0) && unlikely(niov > (nob/PAGE_SIZE))) { + niov = ((nob + offset + kiov->kiov_offset + PAGE_SIZE - 1) / + PAGE_SIZE); + } + LASSERTF(niov > 0 && niov < GNILND_MAX_IMMEDIATE/PAGE_SIZE, - "bad niov %d\n", niov); + "bad niov %d msg %p kiov %p iov %p offset %d nob%d\n", + niov, msg, kiov, iov, offset, nob); while (offset >= kiov->kiov_len) { offset -= kiov->kiov_len; @@ -469,7 +529,7 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io * than kiov_len, we will also have a whole at the end of that page * which isn't allowed */ if ((kiov[i].kiov_offset != 0 && i > 0) || - (kiov[i].kiov_offset + kiov[i].kiov_len != CFS_PAGE_SIZE && i < niov - 1)) { + (kiov[i].kiov_offset + kiov[i].kiov_len != PAGE_SIZE && i < niov - 1)) { CNETERR("Can't make payload contiguous in I/O VM:" "page %d, offset %u, nob %u, kiov_offset %u kiov_len %u \n", i, offset, nob, kiov->kiov_offset, kiov->kiov_len); @@ -546,7 +606,7 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io int kgnilnd_setup_virt_buffer(kgn_tx_t *tx, - unsigned int niov, struct iovec *iov, + unsigned int niov, struct kvec *iov, unsigned int offset, unsigned int nob) { @@ -577,8 +637,8 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov, unsigned int offset, unsigned int nob) { gni_mem_segment_t *phys; - int rc = 0; - unsigned int fraglen; + int rc = 0; + unsigned int fraglen; GNIDBG_TX(D_NET, tx, "niov %d kiov 0x%p offset %u nob %u", nkiov, kiov, offset, nob); @@ -587,8 +647,8 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov, LASSERT(tx->tx_buftype == GNILND_BUF_NONE); /* only allocate this if we are going to use it */ - tx->tx_phys = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache, - CFS_ALLOC_ATOMIC); + tx->tx_phys = kmem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache, + GFP_ATOMIC); if (tx->tx_phys == NULL) { CERROR("failed to allocate tx_phys\n"); rc = -ENOMEM; @@ -658,7 +718,7 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov, "nkiov %u offset %u\n", kiov->kiov_page, kiov->kiov_offset, kiov->kiov_len, nob, nkiov, offset); - phys->address = lnet_page2phys(kiov->kiov_page); + phys->address = page_to_phys(kiov->kiov_page); phys++; kiov++; nkiov--; @@ -676,7 +736,7 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov, error: if (tx->tx_phys != NULL) { - cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys); + kmem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys); CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n", sizeof(*tx->tx_phys), tx->tx_phys); tx->tx_phys = NULL; @@ -686,12 +746,14 @@ error: static inline int kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov, - struct iovec *iov, lnet_kiov_t *kiov, + struct kvec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int nob) { int rc; - LASSERT((iov == NULL) != (kiov == NULL)); + LASSERTF((iov == NULL) != (kiov == NULL), "iov 0x%p, kiov 0x%p, tx 0x%p," + " offset %d, nob %d, niov %d\n" + , iov, kiov, tx, offset, nob, niov); if (kiov != NULL) { rc = kgnilnd_setup_phys_buffer(tx, niov, kiov, offset, nob); @@ -701,9 +763,20 @@ kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov, return rc; } +/* kgnilnd_parse_lnet_rdma() + * lntmsg - message passed in from lnet. + * niov, kiov, offset - see lnd_t in lib-types.h for descriptions. + * nob - actual number of bytes to in this message. + * put_len - It is possible for PUTs to have a different length than the + * length stored in lntmsg->msg_len since LNET can adjust this + * length based on it's buffer size and offset. + * lnet_try_match_md() sets the mlength that we use to do the RDMA + * transfer. + */ static void -kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *offset, - unsigned int *nob, lnet_kiov_t **kiov) +kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, + unsigned int *offset, unsigned int *nob, + lnet_kiov_t **kiov, int put_len) { /* GETs are weird, see kgnilnd_send */ if (lntmsg->msg_type == LNET_MSG_GET) { @@ -718,13 +791,13 @@ kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *of } else { *kiov = lntmsg->msg_kiov; *niov = lntmsg->msg_niov; - *nob = lntmsg->msg_len; + *nob = put_len; *offset = lntmsg->msg_offset; } } static inline void -kgnilnd_compute_rdma_cksum(kgn_tx_t *tx) +kgnilnd_compute_rdma_cksum(kgn_tx_t *tx, int put_len) { unsigned int niov, offset, nob; lnet_kiov_t *kiov; @@ -732,10 +805,18 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx) int dump_cksum = (*kgnilnd_tunables.kgn_checksum_dump > 1); GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) || - (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE)), + (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE) || + (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV)), "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type)); - + if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV)) { + tx->tx_msg.gnm_payload_cksum = 0; + return; + } if (*kgnilnd_tunables.kgn_checksum < 3) { tx->tx_msg.gnm_payload_cksum = 0; return; @@ -743,7 +824,8 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx) GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL); - kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov); + kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, + put_len); if (kiov != NULL) { tx->tx_msg.gnm_payload_cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, dump_cksum); @@ -759,8 +841,13 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx) } } +/* kgnilnd_verify_rdma_cksum() + * tx - PUT_DONE/GET_DONE matched tx. + * rx_cksum - received checksum to compare against. + * put_len - see kgnilnd_parse_lnet_rdma comments. + */ static inline int -kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum) +kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum, int put_len) { int rc = 0; __u16 cksum; @@ -771,9 +858,18 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum) /* we can only match certain requests */ GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) || - (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK)), + (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK) || + (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV)), "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type)); + if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) || + (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV)) { + return 0; + } + if (rx_cksum == 0) { if (*kgnilnd_tunables.kgn_checksum >= 3) { GNIDBG_MSG(D_WARNING, &tx->tx_msg, @@ -784,7 +880,7 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum) GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL); - kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov); + kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, put_len); if (kiov != NULL) { cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, 0); @@ -1001,14 +1097,17 @@ kgnilnd_map_buffer(kgn_tx_t *tx) void kgnilnd_add_purgatory_tx(kgn_tx_t *tx) { - kgn_conn_t *conn = tx->tx_conn; - kgn_mdd_purgatory_t *gmp; + kgn_conn_t *conn = tx->tx_conn; + kgn_mdd_purgatory_t *gmp; LIBCFS_ALLOC(gmp, sizeof(*gmp)); LASSERTF(gmp != NULL, "couldn't allocate MDD purgatory member;" " asserting to avoid data corruption\n"); + if (tx->tx_buffer_copy) + gmp->gmp_map_key = tx->tx_buffer_copy_map_key; + else + gmp->gmp_map_key = tx->tx_map_key; - gmp->gmp_map_key = tx->tx_map_key; atomic_inc(&conn->gnc_device->gnd_n_mdd_held); /* ensure that we don't have a blank purgatory - indicating the @@ -1066,6 +1165,7 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error) * verified peer notification - the theory is that * a TX error can be communicated in all other cases */ if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED && + error != -GNILND_NOPURG && kgnilnd_check_purgatory_conn(tx->tx_conn)) { kgnilnd_add_purgatory_tx(tx); @@ -1079,10 +1179,15 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error) tx->tx_conn->gnc_device, hold_timeout, tx->tx_map_key.qword1, tx->tx_map_key.qword2); } - - rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout); - - LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc); + if (tx->tx_buffer_copy != NULL) { + rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_buffer_copy_map_key, hold_timeout); + LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc); + rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, 0); + LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc); + } else { + rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout); + LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc); + } tx->tx_buftype--; kgnilnd_mem_del_map_list(dev, tx); @@ -1112,7 +1217,7 @@ kgnilnd_tx_done(kgn_tx_t *tx, int completion) libcfs_nid2str(conn->gnc_peer->gnp_nid) : "", tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx, kgnilnd_tx_state2str(tx->tx_list_state), - cfs_duration_sec((long)jiffies - tx->tx_qtime)); + cfs_duration_sec((unsigned long)jiffies - tx->tx_qtime)); } /* The error codes determine if we hold onto the MDD */ @@ -1159,6 +1264,7 @@ kgnilnd_tx_done(kgn_tx_t *tx, int completion) /* warning - we should hold no locks here - calling lnet_finalize * could free up lnet credits, resulting in a call chain back into * the LND via kgnilnd_send and friends */ + lnet_finalize(ni, lntmsg0, status0); if (lntmsg1 != NULL) { @@ -1229,7 +1335,7 @@ search_again: * if we are sending to the same node faster than 256000/sec. * To help guard against this, we OR in the tx_seq - that is 32 bits */ - tx->tx_id.txe_chips = (__u32)(jiffies | conn->gnc_tx_seq); + tx->tx_id.txe_chips = (__u32)(jiffies | atomic_read(&conn->gnc_tx_seq)); GNIDBG_TX(D_NET, tx, "set cookie/id/bits", NULL); @@ -1331,7 +1437,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, * close message. */ if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) != 0 && msg->gnm_type != GNILND_MSG_CLOSE) { - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); /* Return -ETIME, we are closing the connection already so we dont want to * have this tx hit the wire. The tx will be killed by the calling function. * Once the EP is marked dirty the close message will be the last @@ -1349,11 +1456,13 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, } if (time_after_eq(now, newest_last_rx + GNILND_TIMEOUTRX(timeout))) { - GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, "Cant send to %s after timeout lapse of %lu; TO %lu", + GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, + "Cant send to %s after timeout lapse of %lu; TO %lu\n", libcfs_nid2str(conn->gnc_peer->gnp_nid), cfs_duration_sec(now - newest_last_rx), cfs_duration_sec(GNILND_TIMEOUTRX(timeout))); - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); return -ETIME; } @@ -1364,7 +1473,7 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, */ msg->gnm_connstamp = conn->gnc_my_connstamp; msg->gnm_payload_len = immediatenob; - msg->gnm_seq = conn->gnc_tx_seq; + msg->gnm_seq = atomic_read(&conn->gnc_tx_seq); /* always init here - kgn_checksum is a /sys module tunable * and can be flipped at any point, even between msg init and sending */ @@ -1388,14 +1497,15 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, if (unlikely(tx->tx_state & GNILND_TX_FAIL_SMSG)) { rrc = cfs_fail_val ? cfs_fail_val : GNI_RC_NOT_DONE; } else { - rrc = kgnilnd_smsg_send(conn->gnc_ephandle, - msg, sizeof(*msg), immediate, immediatenob, - tx->tx_id.txe_smsg_id); + rrc = kgnilnd_smsg_send(conn->gnc_ephandle, + msg, sizeof(*msg), immediate, + immediatenob, + tx->tx_id.txe_smsg_id); } switch (rrc) { case GNI_RC_SUCCESS: - conn->gnc_tx_seq++; + atomic_inc(&conn->gnc_tx_seq); conn->gnc_last_tx = jiffies; /* no locking here as LIVE isn't a list */ kgnilnd_tx_add_state_locked(tx, NULL, conn, GNILND_TX_LIVE_FMAQ, 1); @@ -1409,7 +1519,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, /* serialize with seeing CQ events for completion on this, as well as * tx_seq */ - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); atomic_inc(&conn->gnc_device->gnd_short_ntx); atomic64_add(immediatenob, &conn->gnc_device->gnd_short_txbytes); @@ -1421,8 +1532,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, /* XXX Nic: We need to figure out how to track this * - there are bound to be good reasons for it, * but we want to know when it happens */ - - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); /* We'll handle this error inline - makes the calling logic much more * clean */ @@ -1459,7 +1570,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, } default: /* handle bad retcode gracefully */ - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); return -EIO; } } @@ -1474,7 +1586,8 @@ kgnilnd_sendmsg(kgn_tx_t *tx, void *immediate, unsigned int immediatenob, int rc; timestamp = jiffies; - mutex_lock(&dev->gnd_cq_mutex); + kgnilnd_gl_mutex_lock(&dev->gnd_cq_mutex); + kgnilnd_conn_mutex_lock(&tx->tx_conn->gnc_smsg_mutex); /* delay in jiffies - we are really concerned only with things that * result in a schedule() or really holding this off for long times . * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */ @@ -1519,7 +1632,8 @@ kgnilnd_sendmsg_trylock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob rc = 0; } else { atomic_inc(&conn->gnc_device->gnd_fast_try); - rc = mutex_trylock(&conn->gnc_device->gnd_cq_mutex); + rc = kgnilnd_trylock(&conn->gnc_device->gnd_cq_mutex, + &conn->gnc_smsg_mutex); } if (!rc) { rc = -EAGAIN; @@ -1599,7 +1713,7 @@ kgnilnd_queue_rdma(kgn_conn_t *conn, kgn_tx_t *tx) void kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx) { - int rc; + int rc = 0; int add_tail = 1; /* set the tx_id here, we delay it until we have an actual conn @@ -1627,6 +1741,8 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx) switch (tx->tx_msg.gnm_type) { case GNILND_MSG_PUT_ACK: case GNILND_MSG_GET_REQ: + case GNILND_MSG_PUT_REQ_REV: + case GNILND_MSG_GET_ACK_REV: /* hijacking time! If this messages will authorize our peer to * send his dirty little bytes in an RDMA, we need to get permission */ kgnilnd_queue_rdma(conn, tx); @@ -1638,17 +1754,8 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx) if (rc >= 0) { /* it was sent, break out of switch to avoid default case of queueing */ break; - } else if (rc == -EAGAIN) { - /* needs to queue to try again, so fall through to default case */ - } else { - /* bail: it wasnt sent and we didn't get EAGAIN indicating - * we should retrans - We do not close the conn due to locking - * we let the reaper thread take care of it. There are no hard - * errors from send_msg that would require close to be called - */ - kgnilnd_tx_done(tx, rc); - break; } + /* needs to queue to try again, so fall through to default case */ case GNILND_MSG_NOOP: /* Just make sure this goes out first for this conn */ add_tail = 0; @@ -1669,6 +1776,7 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target) kgn_peer_t *new_peer = NULL; kgn_conn_t *conn = NULL; int rc; + int node_state; ENTRY; @@ -1695,6 +1803,13 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target) read_unlock(&kgnilnd_data.kgn_peer_conn_lock); RETURN_EXIT; } + + /* don't create a connection if the peer is marked down */ + if (peer->gnp_down == GNILND_RCA_NODE_DOWN) { + read_unlock(&kgnilnd_data.kgn_peer_conn_lock); + rc = -ENETRESET; + GOTO(no_peer, rc); + } } /* creating peer or conn; I'll need a write lock... */ @@ -1702,6 +1817,8 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target) CFS_RACE(CFS_FAIL_GNI_FIND_TARGET); + node_state = kgnilnd_get_node_state(LNET_NIDADDR(target->nid)); + /* NB - this will not block during normal operations - * the only writer of this is in the startup/shutdown path. */ rc = down_read_trylock(&kgnilnd_data.kgn_net_rw_sem); @@ -1713,7 +1830,7 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target) /* ignore previous peer entirely - we cycled the lock, so we * will create new peer and at worst drop it if peer is still * in the tables */ - rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net); + rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net, node_state); if (rc != 0) { up_read(&kgnilnd_data.kgn_net_rw_sem); GOTO(no_peer, rc); @@ -1726,7 +1843,20 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target) * if we don't find it, add our new one to the list */ kgnilnd_add_peer_locked(target->nid, new_peer, &peer); + /* don't create a connection if the peer is not up */ + if (peer->gnp_down != GNILND_RCA_NODE_UP) { + write_unlock(&kgnilnd_data.kgn_peer_conn_lock); + rc = -ENETRESET; + GOTO(no_peer, rc); + } + conn = kgnilnd_find_or_create_conn_locked(peer); + + if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DGRAM_DROP_TX)) { + write_unlock(&kgnilnd_data.kgn_peer_conn_lock); + GOTO(no_peer, rc); + } + if (conn != NULL) { /* oh hey, found a conn now... magical */ kgnilnd_queue_tx(conn, tx); @@ -1742,14 +1872,18 @@ no_peer: RETURN_EXIT; } -void +int kgnilnd_rdma(kgn_tx_t *tx, int type, kgn_rdma_desc_t *sink, unsigned int nob, __u64 cookie) { kgn_conn_t *conn = tx->tx_conn; unsigned long timestamp; + gni_post_type_t post_type; gni_return_t rrc; - + int rc = 0; + unsigned int desc_nob = nob; + void *desc_buffer = tx->tx_buffer; + gni_mem_handle_t desc_map_key = tx->tx_map_key; LASSERTF(kgnilnd_tx_mapped(tx), "unmapped tx %p\n", tx); LASSERTF(conn != NULL, @@ -1761,45 +1895,110 @@ kgnilnd_rdma(kgn_tx_t *tx, int type, "nob %d > tx(%p)->tx_nob %d\n", nob, tx, tx->tx_nob); + switch (type) { + case GNILND_MSG_GET_DONE: + case GNILND_MSG_PUT_DONE: + post_type = GNI_POST_RDMA_PUT; + break; + case GNILND_MSG_GET_DONE_REV: + case GNILND_MSG_PUT_DONE_REV: + post_type = GNI_POST_RDMA_GET; + break; + default: + CERROR("invalid msg type %s (%d)\n", + kgnilnd_msgtype2str(type), type); + LBUG(); + } + if (post_type == GNI_POST_RDMA_GET) { + /* Check for remote buffer / local buffer / length alignment. All must be 4 byte + * aligned. If the local buffer is not aligned correctly using the copy buffer + * will fix that issue. If length is misaligned copy buffer will also fix the issue, we end + * up transferring extra bytes into the buffer but only copy the correct nob into the original + * buffer. Remote offset correction is done through a combination of adjusting the offset, + * making sure the length and addr are aligned and copying the data into the correct location + * once the transfer has completed. + */ + if ((((__u64)((unsigned long)tx->tx_buffer)) & 3) || + (sink->gnrd_addr & 3) || + (nob & 3)) { + + tx->tx_offset = ((__u64)((unsigned long)sink->gnrd_addr)) & 3; + if (tx->tx_offset) + kgnilnd_admin_addref(kgnilnd_data.kgn_rev_offset); + + if ((nob + tx->tx_offset) & 3) { + desc_nob = ((nob + tx->tx_offset) + (4 - ((nob + tx->tx_offset) & 3))); + kgnilnd_admin_addref(kgnilnd_data.kgn_rev_length); + } else { + desc_nob = (nob + tx->tx_offset); + } + + if (tx->tx_buffer_copy == NULL) { + /* Allocate the largest copy buffer we will need, this will prevent us from overwriting data + * and require at most we allocate a few extra bytes. */ + tx->tx_buffer_copy = vmalloc(desc_nob); + + if (!tx->tx_buffer_copy) { + /* allocation of buffer failed nak the rdma */ + kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid); + kgnilnd_tx_done(tx, -EFAULT); + return 0; + } + kgnilnd_admin_addref(kgnilnd_data.kgn_rev_copy_buff); + rc = kgnilnd_mem_register(conn->gnc_device->gnd_handle, (__u64)tx->tx_buffer_copy, desc_nob, NULL, GNI_MEM_READWRITE, &tx->tx_buffer_copy_map_key); + if (rc != GNI_RC_SUCCESS) { + /* Registration Failed nak rdma and kill the tx. */ + vfree(tx->tx_buffer_copy); + tx->tx_buffer_copy = NULL; + kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid); + kgnilnd_tx_done(tx, -EFAULT); + return 0; + } + } + desc_map_key = tx->tx_buffer_copy_map_key; + desc_buffer = tx->tx_buffer_copy; + } + } + memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc)); tx->tx_rdma_desc.post_id = tx->tx_id.txe_cookie; - tx->tx_rdma_desc.type = GNI_POST_RDMA_PUT; + tx->tx_rdma_desc.type = post_type; tx->tx_rdma_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT; - tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)tx->tx_buffer); - tx->tx_rdma_desc.local_mem_hndl = tx->tx_map_key; - tx->tx_rdma_desc.remote_addr = sink->gnrd_addr; + tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)desc_buffer); + tx->tx_rdma_desc.local_mem_hndl = desc_map_key; + tx->tx_rdma_desc.remote_addr = sink->gnrd_addr - tx->tx_offset; tx->tx_rdma_desc.remote_mem_hndl = sink->gnrd_key; - tx->tx_rdma_desc.length = nob; - if (!*kgnilnd_tunables.kgn_bte_hash) - tx->tx_rdma_desc.dlvr_mode |= GNI_DLVMODE_NO_HASH; - if (!*kgnilnd_tunables.kgn_bte_adapt) - tx->tx_rdma_desc.dlvr_mode |= (GNI_DLVMODE_NO_ADAPT | GNI_DLVMODE_NO_RADAPT); - + tx->tx_rdma_desc.length = desc_nob; + tx->tx_nob_rdma = nob; + if (*kgnilnd_tunables.kgn_bte_dlvr_mode) + tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_dlvr_mode; /* prep final completion message */ kgnilnd_init_msg(&tx->tx_msg, type, tx->tx_msg.gnm_srcnid); tx->tx_msg.gnm_u.completion.gncm_cookie = cookie; /* send actual size RDMA'd in retval */ tx->tx_msg.gnm_u.completion.gncm_retval = nob; - kgnilnd_compute_rdma_cksum(tx); + kgnilnd_compute_rdma_cksum(tx, nob); if (nob == 0) { kgnilnd_queue_tx(conn, tx); - return; + return 0; } /* Don't lie (CLOSE == RDMA idle) */ LASSERTF(!conn->gnc_close_sent, "tx %p on conn %p after close sent %d\n", tx, conn, conn->gnc_close_sent); - GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x", - type, tx->tx_rdma_desc.dlvr_mode); + GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x conn %p dlvr_mode " + "0x%x cookie:"LPX64, + type, conn, tx->tx_rdma_desc.dlvr_mode, cookie); /* set CQ dedicated for RDMA */ tx->tx_rdma_desc.src_cq_hndl = conn->gnc_device->gnd_snd_rdma_cqh; timestamp = jiffies; - mutex_lock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex); + kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex); /* delay in jiffies - we are really concerned only with things that * result in a schedule() or really holding this off for long times . * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */ @@ -1807,25 +2006,43 @@ kgnilnd_rdma(kgn_tx_t *tx, int type, rrc = kgnilnd_post_rdma(conn->gnc_ephandle, &tx->tx_rdma_desc); + if (rrc == GNI_RC_ERROR_RESOURCE) { + kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_unmap_buffer(tx, 0); + + if (tx->tx_buffer_copy != NULL) { + vfree(tx->tx_buffer_copy); + tx->tx_buffer_copy = NULL; + } + + spin_lock(&tx->tx_conn->gnc_device->gnd_lock); + kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, + GNILND_TX_MAPQ, 0); + spin_unlock(&tx->tx_conn->gnc_device->gnd_lock); + kgnilnd_schedule_device(tx->tx_conn->gnc_device); + return -EAGAIN; + } + spin_lock(&conn->gnc_list_lock); kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, GNILND_TX_LIVE_RDMAQ, 1); tx->tx_qtime = jiffies; spin_unlock(&conn->gnc_list_lock); - - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex); /* XXX Nic: is this a place we should handle more errors for * robustness sake */ LASSERT(rrc == GNI_RC_SUCCESS); - + return 0; } kgn_rx_t * kgnilnd_alloc_rx(void) { - kgn_rx_t *rx; + kgn_rx_t *rx; - rx = cfs_mem_cache_alloc(kgnilnd_data.kgn_rx_cache, CFS_ALLOC_ATOMIC); + rx = kmem_cache_alloc(kgnilnd_data.kgn_rx_cache, GFP_ATOMIC); if (rx == NULL) { CERROR("failed to allocate rx\n"); return NULL; @@ -1848,14 +2065,14 @@ kgnilnd_release_msg(kgn_conn_t *conn) CDEBUG(D_NET, "consuming %p\n", conn); timestamp = jiffies; - mutex_lock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex); /* delay in jiffies - we are really concerned only with things that * result in a schedule() or really holding this off for long times . * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */ conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp; rrc = kgnilnd_smsg_release(conn->gnc_ephandle); - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); LASSERTF(rrc == GNI_RC_SUCCESS, "bad rrc %d\n", rrc); GNIDBG_SMSG_CREDS(D_NET, conn); @@ -1872,6 +2089,7 @@ kgnilnd_consume_rx(kgn_rx_t *rx) /* if we are eager, free the cache alloc'd msg */ if (unlikely(rx->grx_eager)) { LIBCFS_FREE(rxmsg, sizeof(*rxmsg) + *kgnilnd_tunables.kgn_max_immediate); + atomic_dec(&kgnilnd_data.kgn_neager_allocs); /* release ref from eager_recv */ kgnilnd_conn_decref(conn); @@ -1880,7 +2098,7 @@ kgnilnd_consume_rx(kgn_rx_t *rx) kgnilnd_release_msg(conn); } - cfs_mem_cache_free(kgnilnd_data.kgn_rx_cache, rx); + kmem_cache_free(kgnilnd_data.kgn_rx_cache, rx); CDEBUG(D_MALLOC, "slab-freed 'rx': %lu at %p.\n", sizeof(*rx), rx); @@ -1896,7 +2114,7 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) int target_is_router = lntmsg->msg_target_is_router; int routing = lntmsg->msg_routing; unsigned int niov = lntmsg->msg_niov; - struct iovec *iov = lntmsg->msg_iov; + struct kvec *iov = lntmsg->msg_iov; lnet_kiov_t *kiov = lntmsg->msg_kiov; unsigned int offset = lntmsg->msg_offset; unsigned int nob = lntmsg->msg_len; @@ -1905,6 +2123,7 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) kgn_tx_t *tx; int rc = 0; int mpflag = 0; + int reverse_rdma_flag = *kgnilnd_tunables.kgn_reverse_rdma; /* NB 'private' is different depending on what we're sending.... */ LASSERT(!in_interrupt()); @@ -1951,12 +2170,15 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) if (lntmsg->msg_md->md_length <= *kgnilnd_tunables.kgn_max_immediate) break; - tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid); + if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0) + tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid); + else + tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ_REV, ni->ni_nid); + if (tx == NULL) { rc = -ENOMEM; goto out; } - /* slightly different options as we might actually have a GET with a * MD_KIOV set but a non-NULL md_iov.iov */ if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) @@ -1984,11 +2206,14 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } tx->tx_lntmsg[0] = lntmsg; - tx->tx_msg.gnm_u.get.gngm_hdr = *hdr; + if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0) + tx->tx_msg.gnm_u.get.gngm_hdr = *hdr; + else + tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr; + /* rest of tx_msg is setup just before it is sent */ kgnilnd_launch_tx(tx, net, &target); goto out; - case LNET_MSG_REPLY: case LNET_MSG_PUT: /* to save on MDDs, we'll handle short kiov by vmap'ing @@ -1996,7 +2221,11 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) if (nob <= *kgnilnd_tunables.kgn_max_immediate) break; - tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid); + if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0) + tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid); + else + tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ_REV, ni->ni_nid); + if (tx == NULL) { rc = -ENOMEM; goto out; @@ -2010,7 +2239,11 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } tx->tx_lntmsg[0] = lntmsg; - tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr; + if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0) + tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr; + else + tx->tx_msg.gnm_u.get.gngm_hdr = *hdr; + /* rest of tx_msg is setup just before it is sent */ kgnilnd_launch_tx(tx, net, &target); goto out; @@ -2045,19 +2278,35 @@ out: } void -kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg) +kgnilnd_setup_rdma(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg, int mlen) { kgn_conn_t *conn = rx->grx_conn; kgn_msg_t *rxmsg = rx->grx_msg; unsigned int niov = lntmsg->msg_niov; - struct iovec *iov = lntmsg->msg_iov; + struct kvec *iov = lntmsg->msg_iov; lnet_kiov_t *kiov = lntmsg->msg_kiov; unsigned int offset = lntmsg->msg_offset; unsigned int nob = lntmsg->msg_len; + int done_type; kgn_tx_t *tx; int rc = 0; - tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_DONE, ni->ni_nid); + switch (rxmsg->gnm_type) { + case GNILND_MSG_PUT_REQ_REV: + done_type = GNILND_MSG_PUT_DONE_REV; + nob = mlen; + break; + case GNILND_MSG_GET_REQ: + done_type = GNILND_MSG_GET_DONE; + break; + default: + CERROR("invalid msg type %s (%d)\n", + kgnilnd_msgtype2str(rxmsg->gnm_type), + rxmsg->gnm_type); + LBUG(); + } + + tx = kgnilnd_new_tx_msg(done_type, ni->ni_nid); if (tx == NULL) goto failed_0; @@ -2084,7 +2333,7 @@ kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg) failed_1: kgnilnd_tx_done(tx, rc); - kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); + kgnilnd_nak_rdma(conn, done_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); failed_0: lnet_finalize(ni, lntmsg, rc); } @@ -2097,6 +2346,8 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, kgn_conn_t *conn = rx->grx_conn; kgn_msg_t *rxmsg = rx->grx_msg; kgn_msg_t *eagermsg = NULL; + kgn_peer_t *peer = NULL; + kgn_conn_t *found_conn = NULL; GNIDBG_MSG(D_NET, rxmsg, "eager recv for conn %p, rxmsg %p, lntmsg %p", conn, rxmsg, lntmsg); @@ -2106,11 +2357,56 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, rxmsg->gnm_payload_len); return -EPROTO; } + /* Grab a read lock so the connection doesnt disappear on us + * while we look it up + */ + read_lock(&kgnilnd_data.kgn_peer_conn_lock); + + peer = kgnilnd_find_peer_locked(rxmsg->gnm_srcnid); + if (peer != NULL) + found_conn = kgnilnd_find_conn_locked(peer); + + + /* Verify the connection found is the same one that the message + * is supposed to be using, if it is not output an error message + * and return. + */ + if (!peer || !found_conn + || found_conn->gnc_peer_connstamp != rxmsg->gnm_connstamp) { + read_unlock(&kgnilnd_data.kgn_peer_conn_lock); + CERROR("Couldnt find matching peer %p or conn %p / %p\n", + peer, conn, found_conn); + if (found_conn) { + CERROR("Unexpected connstamp "LPX64"("LPX64" expected)" + " from %s", rxmsg->gnm_connstamp, + found_conn->gnc_peer_connstamp, + libcfs_nid2str(peer->gnp_nid)); + } + return -ENOTCONN; + } + + /* add conn ref to ensure it doesn't go away until all eager + * messages processed */ + kgnilnd_conn_addref(conn); + + /* Now that we have verified the connection is valid and added a + * reference we can remove the read_lock on the peer_conn_lock */ + read_unlock(&kgnilnd_data.kgn_peer_conn_lock); /* we have no credits or buffers for this message, so copy it * somewhere for a later kgnilnd_recv */ + if (atomic_read(&kgnilnd_data.kgn_neager_allocs) >= + *kgnilnd_tunables.kgn_eager_credits) { + CERROR("Out of eager credits to %s\n", + libcfs_nid2str(conn->gnc_peer->gnp_nid)); + return -ENOMEM; + } + + atomic_inc(&kgnilnd_data.kgn_neager_allocs); + LIBCFS_ALLOC(eagermsg, sizeof(*eagermsg) + *kgnilnd_tunables.kgn_max_immediate); if (eagermsg == NULL) { + kgnilnd_conn_decref(conn); CERROR("couldn't allocate eager rx message for conn %p to %s\n", conn, libcfs_nid2str(conn->gnc_peer->gnp_nid)); return -ENOMEM; @@ -2124,9 +2420,6 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, /* stash this for lnet_finalize on cancel-on-conn-close */ rx->grx_lntmsg = lntmsg; - /* add conn ref to ensure it doesn't go away until all eager messages processed */ - kgnilnd_conn_addref(conn); - /* keep the same rx_t, it just has a new grx_msg now */ *new_private = private; @@ -2139,7 +2432,7 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, unsigned int niov, - struct iovec *iov, lnet_kiov_t *kiov, + struct kvec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen) { kgn_rx_t *rx = private; @@ -2175,6 +2468,9 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, switch (rxmsg->gnm_type) { default: + GNIDBG_MSG(D_NETERROR, rxmsg, "conn %p, rx %p, rxmsg %p, lntmsg %p" + " niov=%d kiov=%p iov=%p offset=%d mlen=%d rlen=%d", + conn, rx, rxmsg, lntmsg, niov, kiov, iov, offset, mlen, rlen); LBUG(); case GNILND_MSG_IMMEDIATE: @@ -2250,7 +2546,7 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, /* only error if lntmsg == NULL, otherwise we are just * short circuiting the rdma process of 0 bytes */ - kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, lntmsg == NULL ? -ENOENT : 0, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); @@ -2281,7 +2577,7 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen; tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */ - + tx->tx_qtime = jiffies; /* we only queue from kgnilnd_recv - we might get called from other contexts * and we don't want to block the mutex in those cases */ @@ -2295,20 +2591,116 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, nak_put_req: /* make sure we send an error back when the PUT fails */ - kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); kgnilnd_tx_done(tx, rc); kgnilnd_consume_rx(rx); /* return magic LNet network error */ RETURN(-EIO); + case GNILND_MSG_GET_REQ_REV: + /* LNET wants to truncate or drop transaction, sending NAK */ + if (mlen == 0) { + kgnilnd_consume_rx(rx); + lnet_finalize(ni, lntmsg, 0); + + /* only error if lntmsg == NULL, otherwise we are just + * short circuiting the rdma process of 0 bytes */ + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, + lntmsg == NULL ? -ENOENT : 0, + rxmsg->gnm_u.get.gngm_cookie, + ni->ni_nid); + RETURN(0); + } + /* lntmsg can be null when parsing a LNET_GET */ + if (lntmsg != NULL) { + /* sending ACK with sink buff. info */ + tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_ACK_REV, ni->ni_nid); + if (tx == NULL) { + kgnilnd_consume_rx(rx); + RETURN(-ENOMEM); + } + + rc = kgnilnd_set_tx_id(tx, conn); + if (rc != 0) + GOTO(nak_get_req_rev, rc); + + + rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen); + if (rc != 0) + GOTO(nak_get_req_rev, rc); + + tx->tx_msg.gnm_u.putack.gnpam_src_cookie = + rxmsg->gnm_u.putreq.gnprm_cookie; + tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie; + tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr = + (__u64)((unsigned long)tx->tx_buffer); + tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen; + + tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */ + + /* we only queue from kgnilnd_recv - we might get called from other contexts + * and we don't want to block the mutex in those cases */ + + spin_lock(&tx->tx_conn->gnc_device->gnd_lock); + kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1); + spin_unlock(&tx->tx_conn->gnc_device->gnd_lock); + kgnilnd_schedule_device(tx->tx_conn->gnc_device); + } else { + /* No match */ + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, + -ENOENT, + rxmsg->gnm_u.get.gngm_cookie, + ni->ni_nid); + } + + kgnilnd_consume_rx(rx); + RETURN(0); + +nak_get_req_rev: + /* make sure we send an error back when the GET fails */ + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); + kgnilnd_tx_done(tx, rc); + kgnilnd_consume_rx(rx); + + /* return magic LNet network error */ + RETURN(-EIO); + + + case GNILND_MSG_PUT_REQ_REV: + /* LNET wants to truncate or drop transaction, sending NAK */ + if (mlen == 0) { + kgnilnd_consume_rx(rx); + lnet_finalize(ni, lntmsg, 0); + + /* only error if lntmsg == NULL, otherwise we are just + * short circuiting the rdma process of 0 bytes */ + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, + lntmsg == NULL ? -ENOENT : 0, + rxmsg->gnm_u.get.gngm_cookie, + ni->ni_nid); + RETURN(0); + } + + if (lntmsg != NULL) { + /* Matched! */ + kgnilnd_setup_rdma(ni, rx, lntmsg, mlen); + } else { + /* No match */ + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, + -ENOENT, + rxmsg->gnm_u.get.gngm_cookie, + ni->ni_nid); + } + kgnilnd_consume_rx(rx); + RETURN(0); case GNILND_MSG_GET_REQ: if (lntmsg != NULL) { /* Matched! */ - kgnilnd_reply(ni, rx, lntmsg); + kgnilnd_setup_rdma(ni, rx, lntmsg, mlen); } else { /* No match */ - kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, + kgnilnd_nak_rdma(conn, rxmsg->gnm_type, -ENOENT, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid); @@ -2352,7 +2744,13 @@ kgnilnd_check_conn_timeouts_locked(kgn_conn_t *conn) newest_last_rx = GNILND_LASTRX(conn); if (time_after_eq(now, newest_last_rx + timeout)) { - GNIDBG_CONN(D_CONSOLE|D_NETERROR, conn, "No gnilnd traffic received from %s for %lu " + uint32_t level = D_CONSOLE|D_NETERROR; + + if (conn->gnc_peer->gnp_down == GNILND_RCA_NODE_DOWN) { + level = D_NET; + } + GNIDBG_CONN(level, conn, + "No gnilnd traffic received from %s for %lu " "seconds, terminating connection. Is node down? ", libcfs_nid2str(conn->gnc_peer->gnp_nid), cfs_duration_sec(now - newest_last_rx)); @@ -2401,8 +2799,10 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie, int rc = 0; int count = 0; int reconnect; + int to_reconn; short releaseconn = 0; unsigned long first_rx = 0; + int purgatory_conn_cnt = 0; CDEBUG(D_NET, "checking peer 0x%p->%s for timeouts; interval %lus\n", peer, libcfs_nid2str(peer->gnp_nid), @@ -2465,15 +2865,22 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie, /* Don't reconnect if we are still trying to clear out old conns. * This prevents us sending traffic on the new mbox before ensuring we are done * with the old one */ - reconnect = (atomic_read(&peer->gnp_dirty_eps) == 0); + reconnect = (peer->gnp_down == GNILND_RCA_NODE_UP) && + (atomic_read(&peer->gnp_dirty_eps) == 0); + + /* fast reconnect after a timeout */ + to_reconn = !conn && + (peer->gnp_last_errno == -ETIMEDOUT) && + *kgnilnd_tunables.kgn_fast_reconn; /* if we are not connected and there are tx on the gnp_tx_queue waiting * to be sent, we'll check the reconnect interval and fire up a new * connection request */ - if ((peer->gnp_connecting == GNILND_PEER_IDLE) && + if (reconnect && + (peer->gnp_connecting == GNILND_PEER_IDLE) && (time_after_eq(jiffies, peer->gnp_reconnect_time)) && - !list_empty(&peer->gnp_tx_queue) && reconnect) { + (!list_empty(&peer->gnp_tx_queue) || to_reconn)) { CDEBUG(D_NET, "starting connect to %s\n", libcfs_nid2str(peer->gnp_nid)); @@ -2501,8 +2908,8 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie, */ if (first_rx && time_after(jiffies, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout))) { - CDEBUG(D_NET,"We can release conn %p from purgatory %lu\n", - conn, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout)); + CDEBUG(D_INFO, "We can release peer %s conn's from purgatory %lu\n", + libcfs_nid2str(peer->gnp_nid), first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout)); releaseconn = 1; } @@ -2541,6 +2948,30 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie, cfs_duration_sec(waiting)); kgnilnd_detach_purgatory_locked(conn, souls); + } else { + purgatory_conn_cnt++; + } + } + } + + /* If we have too many connections in purgatory we could run out of + * resources. Limit the number of connections to a tunable number, + * clean up to the minimum all in one fell swoop... there are + * situations where dvs will retry tx's and we can eat up several + * hundread connection requests at once. + */ + if (purgatory_conn_cnt > *kgnilnd_tunables.kgn_max_purgatory) { + list_for_each_entry_safe(conn, connN, &peer->gnp_conns, + gnc_list) { + if (conn->gnc_in_purgatory && + conn->gnc_state == GNILND_CONN_DONE) { + CDEBUG(D_NET, "Dropping Held resource due to" + " resource limits being hit\n"); + kgnilnd_detach_purgatory_locked(conn, souls); + + if (purgatory_conn_cnt-- < + *kgnilnd_tunables.kgn_max_purgatory) + break; } } } @@ -2609,7 +3040,6 @@ kgnilnd_reaper(void *arg) struct timer_list timer; DEFINE_WAIT(wait); - cfs_daemonize("kgnilnd_rpr"); cfs_block_allsigs(); /* all gnilnd threads need to run fairly urgently */ @@ -2643,7 +3073,7 @@ kgnilnd_reaper(void *arg) next_check_time); mod_timer(&timer, (long) jiffies + timeout); - /* check flag variables before comitting */ + /* check flag variables before committing */ if (!kgnilnd_data.kgn_shutdown && !kgnilnd_data.kgn_quiesce_trigger) { CDEBUG(D_INFO, "schedule timeout %ld (%lu sec)\n", @@ -2696,6 +3126,25 @@ kgnilnd_reaper(void *arg) } int +kgnilnd_recv_bte_get(kgn_tx_t *tx) { + unsigned niov, offset, nob; + lnet_kiov_t *kiov; + lnet_msg_t *lntmsg = tx->tx_lntmsg[0]; + kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, tx->tx_nob_rdma); + + if (kiov != NULL) { + lnet_copy_flat2kiov( + niov, kiov, offset, + nob, + tx->tx_buffer_copy + tx->tx_offset, 0, nob); + } else { + memcpy(tx->tx_buffer, tx->tx_buffer_copy + tx->tx_offset, nob); + } + return 0; +} + + +int kgnilnd_check_rdma_cq(kgn_device_t *dev) { gni_return_t rrc; @@ -2707,6 +3156,9 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) long num_processed = 0; kgn_conn_t *conn = NULL; kgn_tx_t *tx = NULL; + kgn_rdma_desc_t *rdesc; + unsigned int rnob; + __u64 rcookie; for (;;) { /* make sure we don't keep looping if we need to reset */ @@ -2729,7 +3181,7 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) } if (rrc == GNI_RC_NOT_DONE) { - mutex_unlock(&dev->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex); CDEBUG(D_INFO, "SEND RDMA CQ %d empty processed %ld\n", dev->gnd_id, num_processed); return num_processed; @@ -2746,7 +3198,7 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) rrc = kgnilnd_get_completed(dev->gnd_snd_rdma_cqh, event_data, &desc); - mutex_unlock(&dev->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex); /* XXX Nic: Need better error handling here... */ LASSERTF((rrc == GNI_RC_SUCCESS) || @@ -2764,17 +3216,44 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) } GNITX_ASSERTF(tx, tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE || - tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE, + tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE || + tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV || + tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV, "tx %p with type %d\n", tx, tx->tx_msg.gnm_type); GNIDBG_TX(D_NET, tx, "RDMA completion for %d bytes", tx->tx_nob); + if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) { + lnet_set_reply_msg_len(NULL, tx->tx_lntmsg[1], + tx->tx_msg.gnm_u.completion.gncm_retval); + } + + rc = 0; + if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV && desc->status == GNI_RC_SUCCESS) { + if (tx->tx_buffer_copy != NULL) + kgnilnd_recv_bte_get(tx); + rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_putinfo.gnpam_payload_cksum, tx->tx_nob_rdma); + } + + if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV && desc->status == GNI_RC_SUCCESS) { + if (tx->tx_buffer_copy != NULL) + kgnilnd_recv_bte_get(tx); + rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_getinfo.gngm_payload_cksum, tx->tx_nob_rdma); + } + /* remove from rdmaq */ + kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex); spin_lock(&conn->gnc_list_lock); kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD); spin_unlock(&conn->gnc_list_lock); + kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex); - if (likely(desc->status == GNI_RC_SUCCESS)) { + if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RDMA_CQ_ERROR)) { + event_data = 1LL << 48; + rc = 1; + } + + if (likely(desc->status == GNI_RC_SUCCESS) && rc == 0) { atomic_inc(&dev->gnd_rdma_ntx); atomic64_add(tx->tx_nob, &dev->gnd_rdma_txbytes); /* transaction succeeded, add into fmaq */ @@ -2782,7 +3261,9 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) kgnilnd_peer_alive(conn->gnc_peer); /* drop ref from kgnilnd_validate_tx_ev_id */ + kgnilnd_admin_decref(conn->gnc_tx_in_use); kgnilnd_conn_decref(conn); + continue; } @@ -2804,35 +3285,34 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev) GNIDBG_TX(D_NETERROR, tx, "RDMA %s error (%s)", should_retry ? "transient" : "unrecoverable", err_str); - if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) { - if (should_retry) { - kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE, - &tx->tx_putinfo.gnpam_desc, - tx->tx_putinfo.gnpam_desc.gnrd_nob, - tx->tx_putinfo.gnpam_dst_cookie); - } else { - kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, - -EFAULT, - tx->tx_putinfo.gnpam_dst_cookie, - tx->tx_msg.gnm_srcnid); - kgnilnd_tx_done(tx, -EFAULT); - } + if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE || + tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) { + rdesc = &tx->tx_putinfo.gnpam_desc; + rnob = tx->tx_putinfo.gnpam_desc.gnrd_nob; + rcookie = tx->tx_putinfo.gnpam_dst_cookie; } else { - if (should_retry) { - kgnilnd_rdma(tx, GNILND_MSG_GET_DONE, - &tx->tx_getinfo.gngm_desc, - tx->tx_lntmsg[0]->msg_len, - tx->tx_getinfo.gngm_cookie); - } else { - kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, - -EFAULT, - tx->tx_getinfo.gngm_cookie, - tx->tx_msg.gnm_srcnid); - kgnilnd_tx_done(tx, -EFAULT); - } + rdesc = &tx->tx_getinfo.gngm_desc; + rnob = tx->tx_lntmsg[0]->msg_len; + rcookie = tx->tx_getinfo.gngm_cookie; + } + + if (should_retry) { + kgnilnd_rdma(tx, + tx->tx_msg.gnm_type, + rdesc, + rnob, rcookie); + } else { + kgnilnd_nak_rdma(conn, + tx->tx_msg.gnm_type, + -EFAULT, + rcookie, + tx->tx_msg.gnm_srcnid); + kgnilnd_tx_done(tx, -GNILND_NOPURG); + kgnilnd_close_conn(conn, -ECOMM); } /* drop ref from kgnilnd_validate_tx_ev_id */ + kgnilnd_admin_decref(conn->gnc_tx_in_use); kgnilnd_conn_decref(conn); } } @@ -2862,7 +3342,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev) } rrc = kgnilnd_cq_get_event(dev->gnd_snd_fma_cqh, &event_data); - mutex_unlock(&dev->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex); if (rrc == GNI_RC_NOT_DONE) { CDEBUG(D_INFO, @@ -2928,6 +3408,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev) } /* lock tx_list_state and tx_state */ + kgnilnd_conn_mutex_lock(&conn->gnc_smsg_mutex); spin_lock(&tx->tx_conn->gnc_list_lock); GNITX_ASSERTF(tx, tx->tx_list_state == GNILND_TX_LIVE_FMAQ, @@ -2948,6 +3429,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev) saw_reply = !(tx->tx_state & GNILND_TX_WAITING_REPLY); spin_unlock(&tx->tx_conn->gnc_list_lock); + kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex); if (queued_fma) { CDEBUG(D_NET, "scheduling conn 0x%p->%s for fmaq\n", @@ -2985,6 +3467,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev) } /* drop ref from kgnilnd_validate_tx_ev_id */ + kgnilnd_admin_decref(conn->gnc_tx_in_use); kgnilnd_conn_decref(conn); /* if we are waiting for a REPLY, we'll handle the tx then */ @@ -3015,7 +3498,7 @@ kgnilnd_check_fma_rcv_cq(kgn_device_t *dev) return 1; } rrc = kgnilnd_cq_get_event(dev->gnd_rcv_fma_cqh, &event_data); - mutex_unlock(&dev->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex); if (rrc == GNI_RC_NOT_DONE) { CDEBUG(D_INFO, "SMSG RX CQ %d empty data "LPX64" " @@ -3126,7 +3609,8 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full) rc = kgnilnd_map_buffer(tx); } - /* rc should be 0 if we mapped succesfully here, if non-zero we are queueing */ + /* rc should be 0 if we mapped successfully here, if non-zero + * we are queueing */ if (rc != 0) { /* if try_map_if_full set, they handle requeuing */ if (unlikely(try_map_if_full)) { @@ -3150,7 +3634,12 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full) * remote node where the RDMA will be started * Special case -EAGAIN logic - this should just queued as if the mapping couldn't * be satisified. The rest of the errors are "hard" errors that require - * upper layers to handle themselves */ + * upper layers to handle themselves. + * If kgnilnd_post_rdma returns a resource error, kgnilnd_rdma will put + * the tx back on the TX_MAPQ. When this tx is pulled back off the MAPQ, + * it's gnm_type will now be GNILND_MSG_PUT_DONE or + * GNILND_MSG_GET_DONE_REV. + */ case GNILND_MSG_GET_REQ: tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key; tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie; @@ -3174,18 +3663,59 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full) break; /* PUT_REQ and GET_DONE are where we do the actual RDMA */ + case GNILND_MSG_PUT_DONE: case GNILND_MSG_PUT_REQ: - kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE, + rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE, &tx->tx_putinfo.gnpam_desc, tx->tx_putinfo.gnpam_desc.gnrd_nob, tx->tx_putinfo.gnpam_dst_cookie); + RETURN(try_map_if_full ? rc : 0); break; case GNILND_MSG_GET_DONE: - kgnilnd_rdma(tx, GNILND_MSG_GET_DONE, + rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE, &tx->tx_getinfo.gngm_desc, tx->tx_lntmsg[0]->msg_len, tx->tx_getinfo.gngm_cookie); + RETURN(try_map_if_full ? rc : 0); + break; + case GNILND_MSG_PUT_REQ_REV: + tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key; + tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie; + tx->tx_msg.gnm_u.get.gngm_desc.gnrd_addr = (__u64)((unsigned long)tx->tx_buffer); + tx->tx_msg.gnm_u.get.gngm_desc.gnrd_nob = tx->tx_nob; + tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY; + kgnilnd_compute_rdma_cksum(tx, tx->tx_nob); + tx->tx_msg.gnm_u.get.gngm_payload_cksum = tx->tx_msg.gnm_payload_cksum; + rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ); + break; + case GNILND_MSG_PUT_DONE_REV: + rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE_REV, + &tx->tx_getinfo.gngm_desc, + tx->tx_nob, + tx->tx_getinfo.gngm_cookie); + RETURN(try_map_if_full ? rc : 0); + break; + case GNILND_MSG_GET_ACK_REV: + tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_key = tx->tx_map_key; + tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY; + /* LNET_GETS are a special case for parse */ + kgnilnd_compute_rdma_cksum(tx, tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob); + tx->tx_msg.gnm_u.putack.gnpam_payload_cksum = tx->tx_msg.gnm_payload_cksum; + + if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PUT_ACK_AGAIN)) + tx->tx_state |= GNILND_TX_FAIL_SMSG; + + /* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */ + rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ); + break; + case GNILND_MSG_GET_DONE_REV: + case GNILND_MSG_GET_REQ_REV: + rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE_REV, + &tx->tx_putinfo.gnpam_desc, + tx->tx_putinfo.gnpam_desc.gnrd_nob, + tx->tx_putinfo.gnpam_dst_cookie); + RETURN(try_map_if_full ? rc : 0); break; } @@ -3289,15 +3819,22 @@ kgnilnd_process_fmaq(kgn_conn_t *conn) case GNILND_MSG_GET_DONE: case GNILND_MSG_PUT_DONE: + case GNILND_MSG_PUT_DONE_REV: + case GNILND_MSG_GET_DONE_REV: case GNILND_MSG_PUT_NAK: case GNILND_MSG_GET_NAK: + case GNILND_MSG_GET_NAK_REV: + case GNILND_MSG_PUT_NAK_REV: tx->tx_state = GNILND_TX_WAITING_COMPLETION; break; case GNILND_MSG_PUT_REQ: + case GNILND_MSG_GET_REQ_REV: tx->tx_msg.gnm_u.putreq.gnprm_cookie = tx->tx_id.txe_cookie; case GNILND_MSG_PUT_ACK: + case GNILND_MSG_PUT_REQ_REV: + case GNILND_MSG_GET_ACK_REV: case GNILND_MSG_GET_REQ: /* This is really only to handle the retransmit of SMSG once these * two messages are setup in send_mapped_tx */ @@ -3486,6 +4023,13 @@ kgnilnd_complete_tx(kgn_tx_t *tx, int rc) { int complete = 0; kgn_conn_t *conn = tx->tx_conn; + __u64 nob = tx->tx_nob; + __u32 physnop = tx->tx_phys_npages; + int id = tx->tx_id.txe_smsg_id; + int buftype = tx->tx_buftype; + gni_mem_handle_t hndl; + hndl.qword1 = tx->tx_map_key.qword1; + hndl.qword2 = tx->tx_map_key.qword2; spin_lock(&conn->gnc_list_lock); @@ -3495,6 +4039,22 @@ kgnilnd_complete_tx(kgn_tx_t *tx, int rc) tx->tx_rc = rc; tx->tx_state &= ~GNILND_TX_WAITING_REPLY; + if (rc == -EFAULT) { + CDEBUG(D_NETERROR, "Error %d TX data: TX %p tx_id %x nob %16"LPF64"u physnop %8d buffertype %#8x MemHandle "LPX64"."LPX64"x\n", + rc, tx, id, nob, physnop, buftype, hndl.qword1, hndl.qword2); + + if(*kgnilnd_tunables.kgn_efault_lbug) { + GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg, + "error %d on tx 0x%p->%s id %u/%d state %s age %ds", + rc, tx, conn ? + libcfs_nid2str(conn->gnc_peer->gnp_nid) : "", + tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx, + kgnilnd_tx_state2str(tx->tx_list_state), + cfs_duration_sec((unsigned long) jiffies - tx->tx_qtime)); + LBUG(); + } + } + if (!(tx->tx_state & GNILND_TX_WAITING_COMPLETION)) { kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD); /* sample under lock as follow on steps require gnc_list_lock @@ -3518,7 +4078,9 @@ kgnilnd_finalize_rx_done(kgn_tx_t *tx, kgn_msg_t *msg) atomic_inc(&conn->gnc_device->gnd_rdma_nrx); atomic64_add(tx->tx_nob, &conn->gnc_device->gnd_rdma_rxbytes); - rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum); + /* the gncm_retval is passed in for PUTs */ + rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum, + msg->gnm_u.completion.gncm_retval); kgnilnd_complete_tx(tx, rc); } @@ -3539,7 +4101,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) int repost = 1, saw_complete; unsigned long timestamp, newest_last_rx, timeout; int last_seq; - void *memory = NULL; ENTRY; /* Short circuit if the ep_handle is null. @@ -3549,7 +4110,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) RETURN_EXIT; timestamp = jiffies; - mutex_lock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex); /* delay in jiffies - we are really concerned only with things that * result in a schedule() or really holding this off for long times . * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */ @@ -3578,7 +4139,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) libcfs_nid2str(conn->gnc_peer->gnp_nid), cfs_duration_sec(timestamp - newest_last_rx), cfs_duration_sec(GNILND_TIMEOUTRX(timeout))); - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); rc = -ETIME; kgnilnd_close_conn(conn, rc); RETURN_EXIT; @@ -3587,40 +4148,49 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) rrc = kgnilnd_smsg_getnext(conn->gnc_ephandle, &prefix); if (rrc == GNI_RC_NOT_DONE) { - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); - CDEBUG(D_INFO, "SMSG RX empty\n"); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + CDEBUG(D_INFO, "SMSG RX empty conn 0x%p\n", conn); RETURN_EXIT; } + /* Instead of asserting when we get mailbox corruption lets attempt to + * close the conn and recover. We can put the conn/mailbox into + * purgatory and let purgatory deal with the problem. If we see + * this NETTERROR reported on production systems in large amounts + * we will need to revisit the state machine to see if we can tighten + * it up further to improve data protection. + */ + if (rrc == GNI_RC_INVALID_STATE) { - LIBCFS_ALLOC(memory, conn->gnpr_smsg_attr.buff_size); - if (memory == NULL) { - memory = (void *)0xdeadbeef; - } else { - memcpy(memory, conn->gnpr_smsg_attr.msg_buffer + conn->gnpr_smsg_attr.mbox_offset, conn->gnpr_smsg_attr.buff_size); - } + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + GNIDBG_CONN(D_NETERROR | D_CONSOLE, conn, "Mailbox corruption " + "detected closing conn %p from peer %s\n", conn, + libcfs_nid2str(conn->gnc_peer->gnp_nid)); + rc = -EIO; + kgnilnd_close_conn(conn, rc); + RETURN_EXIT; } LASSERTF(rrc == GNI_RC_SUCCESS, - "bad rc %d on conn %p from peer %s mailbox copy %p\n", - rrc, conn, libcfs_nid2str(peer->gnp_nid), memory); + "bad rc %d on conn %p from peer %s\n", + rrc, conn, libcfs_nid2str(peer->gnp_nid)); msg = (kgn_msg_t *)prefix; rx = kgnilnd_alloc_rx(); if (rx == NULL) { - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); kgnilnd_release_msg(conn); GNIDBG_MSG(D_NETERROR, msg, "Dropping SMSG RX from 0x%p->%s, no RX memory", conn, libcfs_nid2str(peer->gnp_nid)); RETURN_EXIT; } - GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p from %s", - conn, libcfs_nid2str(peer->gnp_nid)); + GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p", conn); timestamp = conn->gnc_last_rx; - last_seq = conn->gnc_rx_seq; + seq = last_seq = atomic_read(&conn->gnc_rx_seq); + atomic_inc(&conn->gnc_rx_seq); conn->gnc_last_rx = jiffies; /* stash first rx so we can clear out purgatory @@ -3628,10 +4198,8 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) if (conn->gnc_first_rx == 0) conn->gnc_first_rx = jiffies; - seq = conn->gnc_rx_seq++; - /* needs to linger to protect gnc_rx_seq like we do with gnc_tx_seq */ - mutex_unlock(&conn->gnc_device->gnd_cq_mutex); + kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex); kgnilnd_peer_alive(conn->gnc_peer); rx->grx_msg = msg; @@ -3688,10 +4256,12 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) /* NB message type checked below; NOT here... */ switch (msg->gnm_type) { + case GNILND_MSG_GET_ACK_REV: case GNILND_MSG_PUT_ACK: kgnilnd_swab_rdma_desc(&msg->gnm_u.putack.gnpam_desc); break; + case GNILND_MSG_PUT_REQ_REV: case GNILND_MSG_GET_REQ: kgnilnd_swab_rdma_desc(&msg->gnm_u.get.gngm_desc); break; @@ -3753,7 +4323,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) conn, last_seq, cfs_duration_sec(now - timestamp), cfs_duration_sec(now - conn->gnc_last_rx_cq), - conn->gnc_tx_seq, + atomic_read(&conn->gnc_tx_seq), cfs_duration_sec(now - conn->gnc_last_tx), cfs_duration_sec(now - conn->gnc_last_tx_cq), cfs_duration_sec(now - conn->gnc_last_noop_want), @@ -3795,13 +4365,20 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) msg->gnm_srcnid, rx, 0); repost = rc < 0; break; - + case GNILND_MSG_GET_REQ_REV: case GNILND_MSG_PUT_REQ: rc = lnet_parse(net->gnn_ni, &msg->gnm_u.putreq.gnprm_hdr, msg->gnm_srcnid, rx, 1); repost = rc < 0; break; + case GNILND_MSG_GET_NAK_REV: + tx = kgnilnd_match_reply_either(conn, GNILND_MSG_GET_REQ_REV, GNILND_MSG_GET_ACK_REV, + msg->gnm_u.completion.gncm_cookie); + if (tx == NULL) + break; + kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval); + break; case GNILND_MSG_PUT_NAK: tx = kgnilnd_match_reply_either(conn, GNILND_MSG_PUT_REQ, GNILND_MSG_PUT_ACK, msg->gnm_u.completion.gncm_cookie); @@ -3810,7 +4387,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval); break; - case GNILND_MSG_PUT_ACK: tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ, msg->gnm_u.putack.gnpam_src_cookie); @@ -3848,7 +4424,42 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) kgnilnd_tx_done(tx, rc); } break; + case GNILND_MSG_GET_ACK_REV: + tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_REQ_REV, + msg->gnm_u.putack.gnpam_src_cookie); + if (tx == NULL) + break; + + /* store putack data for later: deferred rdma or re-try */ + tx->tx_putinfo = msg->gnm_u.putack; + saw_complete = 0; + spin_lock(&tx->tx_conn->gnc_list_lock); + + GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_REPLY, + "not waiting for reply", NULL); + + tx->tx_state &= ~GNILND_TX_WAITING_REPLY; + + if (likely(!(tx->tx_state & GNILND_TX_WAITING_COMPLETION))) { + kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD); + /* sample under lock as follow on steps require gnc_list_lock + * - or call kgnilnd_tx_done which requires no locks held over + * call to lnet_finalize */ + saw_complete = 1; + } else { + /* cannot launch rdma if still waiting for fma-msg completion */ + CDEBUG(D_NET, "tx 0x%p type 0x%02x will need to " + "wait for SMSG completion\n", tx, tx->tx_msg.gnm_type); + tx->tx_state |= GNILND_TX_PENDING_RDMA; + } + spin_unlock(&tx->tx_conn->gnc_list_lock); + if (saw_complete) { + rc = kgnilnd_send_mapped_tx(tx, 0); + if (rc < 0) + kgnilnd_tx_done(tx, rc); + } + break; case GNILND_MSG_PUT_DONE: tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_ACK, msg->gnm_u.completion.gncm_cookie); @@ -3861,7 +4472,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) kgnilnd_finalize_rx_done(tx, msg); break; - + case GNILND_MSG_PUT_REQ_REV: case GNILND_MSG_GET_REQ: rc = lnet_parse(net->gnn_ni, &msg->gnm_u.get.gngm_hdr, msg->gnm_srcnid, rx, 1); @@ -3896,6 +4507,45 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn) kgnilnd_finalize_rx_done(tx, msg); break; + case GNILND_MSG_GET_DONE_REV: + tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_ACK_REV, + msg->gnm_u.completion.gncm_cookie); + if (tx == NULL) + break; + + GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED || + tx->tx_buftype == GNILND_BUF_VIRT_MAPPED, + "bad tx buftype %d", tx->tx_buftype); + + kgnilnd_finalize_rx_done(tx, msg); + break; + + case GNILND_MSG_PUT_DONE_REV: + tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV, + msg->gnm_u.completion.gncm_cookie); + + if (tx == NULL) + break; + + GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED || + tx->tx_buftype == GNILND_BUF_VIRT_MAPPED, + "bad tx buftype %d", tx->tx_buftype); + + kgnilnd_finalize_rx_done(tx, msg); + break; + case GNILND_MSG_PUT_NAK_REV: + tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV, + msg->gnm_u.completion.gncm_cookie); + + if (tx == NULL) + break; + + GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED || + tx->tx_buftype == GNILND_BUF_VIRT_MAPPED, + "bad tx buftype %d", tx->tx_buftype); + + kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval); + break; } out: @@ -4031,7 +4681,10 @@ kgnilnd_send_conn_close(kgn_conn_t *conn) } } + /* When changing gnc_state we need to take the kgn_peer_conn_lock */ + write_lock(&kgnilnd_data.kgn_peer_conn_lock); conn->gnc_state = GNILND_CONN_CLOSED; + write_unlock(&kgnilnd_data.kgn_peer_conn_lock); /* mark this conn as CLOSED now that we processed it * do after TX, so we can use CLOSING in asserts */ @@ -4053,13 +4706,15 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev) int found_work = 0; int rc = 0; kgn_tx_t *tx; - int max_retrans = *kgnilnd_tunables.kgn_max_retransmits; + int fast_remaps = GNILND_FAST_MAPPING_TRY; int log_retrans, log_retrans_level; static int last_map_version; ENTRY; spin_lock(&dev->gnd_lock); if (list_empty(&dev->gnd_map_tx)) { + /* if the list is empty make sure we dont have a timer running */ + del_singleshot_timer_sync(&dev->gnd_map_timer); spin_unlock(&dev->gnd_lock); RETURN(0); } @@ -4070,13 +4725,23 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev) * backing off until our map version changes - indicating we unmapped * something */ tx = list_first_entry(&dev->gnd_map_tx, kgn_tx_t, tx_list); - if ((tx->tx_retrans > (max_retrans / 4)) && - (last_map_version == dev->gnd_map_version)) { + if (likely(dev->gnd_map_attempt == 0) || + time_after_eq(jiffies, dev->gnd_next_map) || + last_map_version != dev->gnd_map_version) { + + /* if this is our first attempt at mapping set last mapped to current + * jiffies so we can timeout our attempt correctly. + */ + if (dev->gnd_map_attempt == 0) + dev->gnd_last_map = jiffies; + } else { GNIDBG_TX(D_NET, tx, "waiting for mapping event event to retry", NULL); spin_unlock(&dev->gnd_lock); RETURN(0); } + /* delete the previous timer if it exists */ + del_singleshot_timer_sync(&dev->gnd_map_timer); /* stash the last map version to let us know when a good one was seen */ last_map_version = dev->gnd_map_version; @@ -4116,28 +4781,64 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev) * this function is called again - we operate on a copy of the original * list and not the live list */ spin_lock(&dev->gnd_lock); + /* reset map attempts back to zero we successfully + * mapped so we can reset our timers */ + dev->gnd_map_attempt = 0; continue; + } else if (rc == -EAGAIN) { + spin_lock(&dev->gnd_lock); + mod_timer(&dev->gnd_map_timer, dev->gnd_next_map); + spin_unlock(&dev->gnd_lock); + GOTO(get_out_mapped, rc); } else if (rc != -ENOMEM) { /* carp, failure we can't handle */ kgnilnd_tx_done(tx, rc); spin_lock(&dev->gnd_lock); + /* reset map attempts back to zero we dont know what happened but it + * wasnt a failed mapping + */ + dev->gnd_map_attempt = 0; continue; } - /* time to handle the retry cases.. */ - tx->tx_retrans++; - if (tx->tx_retrans == 1) - tx->tx_qtime = jiffies; + /* time to handle the retry cases.. lock so we dont have 2 threads + * mucking with gnd_map_attempt, or gnd_next_map at the same time. + */ + spin_lock(&dev->gnd_lock); + dev->gnd_map_attempt++; + if (dev->gnd_map_attempt < fast_remaps) { + /* do nothing we just want it to go as fast as possible. + * just set gnd_next_map to current jiffies so it will process + * as fast as possible. + */ + dev->gnd_next_map = jiffies; + } else { + /* Retry based on GNILND_MAP_RETRY_RATE */ + dev->gnd_next_map = jiffies + GNILND_MAP_RETRY_RATE; + } - /* only log occasionally once we've retried max / 2 */ - log_retrans = (tx->tx_retrans >= (max_retrans / 2)) && - ((tx->tx_retrans % 32) == 0); + /* only log occasionally once we've retried fast_remaps */ + log_retrans = (dev->gnd_map_attempt >= fast_remaps) && + ((dev->gnd_map_attempt % fast_remaps) == 0); log_retrans_level = log_retrans ? D_NETERROR : D_NET; /* make sure we are not off in the weeds with this tx */ - if (tx->tx_retrans > *kgnilnd_tunables.kgn_max_retransmits) { + if (time_after(jiffies, dev->gnd_last_map + GNILND_MAP_TIMEOUT)) { GNIDBG_TX(D_NETERROR, tx, "giving up on TX, too many retries", NULL); + spin_unlock(&dev->gnd_lock); + if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ || + tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ_REV) { + kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, + -ENOMEM, + tx->tx_putinfo.gnpam_dst_cookie, + tx->tx_msg.gnm_srcnid); + } else { + kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, + -ENOMEM, + tx->tx_getinfo.gngm_cookie, + tx->tx_msg.gnm_srcnid); + } kgnilnd_tx_done(tx, -ENOMEM); GOTO(get_out_mapped, rc); } else { @@ -4145,7 +4846,7 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev) "transient map failure #%d %d pages/%d bytes phys %u@%u " "virt %u@"LPU64" " "nq_map %d mdd# %d/%d GART %ld", - tx->tx_retrans, tx->tx_phys_npages, tx->tx_nob, + dev->gnd_map_attempt, tx->tx_phys_npages, tx->tx_nob, dev->gnd_map_nphys, dev->gnd_map_physnop * PAGE_SIZE, dev->gnd_map_nvirt, dev->gnd_map_virtnob, atomic_read(&dev->gnd_nq_map), @@ -4154,7 +4855,8 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev) } /* we need to stop processing the rest of the list, so add it back in */ - spin_lock(&dev->gnd_lock); + /* set timer to wake device when we need to schedule this tx */ + mod_timer(&dev->gnd_map_timer, dev->gnd_next_map); kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 0); spin_unlock(&dev->gnd_lock); GOTO(get_out_mapped, rc); @@ -4165,16 +4867,20 @@ get_out_mapped: } int -kgnilnd_process_conns(kgn_device_t *dev) +kgnilnd_process_conns(kgn_device_t *dev, unsigned long deadline) { int found_work = 0; int conn_sched; int intent = 0; + int error_inject = 0; + int rc = 0; kgn_conn_t *conn; spin_lock(&dev->gnd_lock); - while (!list_empty(&dev->gnd_ready_conns)) { + while (!list_empty(&dev->gnd_ready_conns) && time_before(jiffies, deadline)) { dev->gnd_sched_alive = jiffies; + error_inject = 0; + rc = 0; if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) { /* break with lock held */ @@ -4201,10 +4907,16 @@ kgnilnd_process_conns(kgn_device_t *dev) if (kgnilnd_check_conn_fail_loc(dev, conn, &intent)) { /* based on intent see if we should run again. */ - kgnilnd_schedule_process_conn(conn, intent); - + rc = kgnilnd_schedule_process_conn(conn, intent); + error_inject = 1; /* drop ref from gnd_ready_conns */ + if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) { + down_write(&dev->gnd_conn_sem); + kgnilnd_conn_decref(conn); + up_write(&dev->gnd_conn_sem); + } else if (rc != 1) { kgnilnd_conn_decref(conn); + } /* clear this so that scheduler thread doesn't spin */ found_work = 0; /* break with lock held... */ @@ -4213,30 +4925,60 @@ kgnilnd_process_conns(kgn_device_t *dev) } if (unlikely(conn->gnc_state == GNILND_CONN_CLOSED)) { + down_write(&dev->gnd_conn_sem); + /* CONN_CLOSED set in procces_fmaq when CLOSE is sent */ - kgnilnd_complete_closed_conn(conn); + if (unlikely(atomic_read(&conn->gnc_tx_in_use))) { + /* If there are tx's currently in use in another + * thread we dont want to complete the close + * yet. Cycle this conn back through + * the scheduler. */ + kgnilnd_schedule_conn(conn); + } else { + kgnilnd_complete_closed_conn(conn); + } + up_write(&dev->gnd_conn_sem); } else if (unlikely(conn->gnc_state == GNILND_CONN_DESTROY_EP)) { /* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */ /* serialize SMSG CQs with ep_bind and smsg_release */ + down_write(&dev->gnd_conn_sem); kgnilnd_destroy_conn_ep(conn); + up_write(&dev->gnd_conn_sem); } else if (unlikely(conn->gnc_state == GNILND_CONN_CLOSING)) { /* if we need to do some CLOSE sending, etc done here do it */ + down_write(&dev->gnd_conn_sem); kgnilnd_send_conn_close(conn); kgnilnd_check_fma_rx(conn); + up_write(&dev->gnd_conn_sem); } else if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) == 0) { /* start moving traffic if the old conns are cleared out */ + down_read(&dev->gnd_conn_sem); kgnilnd_check_fma_rx(conn); kgnilnd_process_fmaq(conn); + up_read(&dev->gnd_conn_sem); } - kgnilnd_schedule_process_conn(conn, 0); + rc = kgnilnd_schedule_process_conn(conn, 0); /* drop ref from gnd_ready_conns */ + if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) { + down_write(&dev->gnd_conn_sem); + kgnilnd_conn_decref(conn); + up_write(&dev->gnd_conn_sem); + } else if (rc != 1) { kgnilnd_conn_decref(conn); + } /* check list again with lock held */ spin_lock(&dev->gnd_lock); } + + /* If we are short circuiting due to timing we want to be scheduled + * as soon as possible. + */ + if (!list_empty(&dev->gnd_ready_conns) && !error_inject) + found_work++; + spin_unlock(&dev->gnd_lock); RETURN(found_work); @@ -4246,20 +4988,18 @@ int kgnilnd_scheduler(void *arg) { int threadno = (long)arg; - kgn_device_t *dev; - char name[16]; - int busy_loops = 0; + kgn_device_t *dev; + int busy_loops = 0; + unsigned long deadline = 0; DEFINE_WAIT(wait); dev = &kgnilnd_data.kgn_devices[(threadno + 1) % kgnilnd_data.kgn_ndevs]; - snprintf(name, sizeof(name), "kgnilnd_sd_%02d", threadno); - cfs_daemonize(name); cfs_block_allsigs(); /* all gnilnd threads need to run fairly urgently */ - set_user_nice(current, *kgnilnd_tunables.kgn_nice); - + set_user_nice(current, *kgnilnd_tunables.kgn_sched_nice); + deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout); while (!kgnilnd_data.kgn_shutdown) { int found_work = 0; /* Safe: kgn_shutdown only set when quiescent */ @@ -4273,12 +5013,15 @@ kgnilnd_scheduler(void *arg) /* tracking for when thread goes AWOL */ dev->gnd_sched_alive = jiffies; + CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_SCHED_DEADLINE, + (*kgnilnd_tunables.kgn_sched_timeout + 1)); /* let folks know we are up and kicking * - they can use this for latency savings, etc * - only change if IRQ, if IDLE leave alone as that * schedule_device calls to put us back to IRQ */ (void)cmpxchg(&dev->gnd_ready, GNILND_DEV_IRQ, GNILND_DEV_LOOP); + down_read(&dev->gnd_conn_sem); /* always check these - they are super low cost */ found_work += kgnilnd_check_fma_send_cq(dev); found_work += kgnilnd_check_fma_rcv_cq(dev); @@ -4299,21 +5042,23 @@ kgnilnd_scheduler(void *arg) * transistion * ...should.... */ + up_read(&dev->gnd_conn_sem); + /* process all conns ready now */ - found_work += kgnilnd_process_conns(dev); + found_work += kgnilnd_process_conns(dev, deadline); /* do an eager check to avoid the IRQ disabling in * prepare_to_wait and friends */ - if (found_work && busy_loops++ < *kgnilnd_tunables.kgn_loops) { + if (found_work && + (busy_loops++ < *kgnilnd_tunables.kgn_loops) && + time_before(jiffies, deadline)) { found_work = 0; if ((busy_loops % 10) == 0) { /* tickle heartbeat and watchdog to ensure our * piggishness doesn't turn into heartbeat failure */ touch_nmi_watchdog(); - if (kgnilnd_hssops.hb_to_l0 != NULL) { - kgnilnd_hssops.hb_to_l0(); - } + kgnilnd_hw_hb(); } continue; } @@ -4332,7 +5077,8 @@ kgnilnd_scheduler(void *arg) found_work += xchg(&dev->gnd_ready, GNILND_DEV_IDLE); - if (busy_loops >= *kgnilnd_tunables.kgn_loops) { + if ((busy_loops >= *kgnilnd_tunables.kgn_loops) || + time_after_eq(jiffies, deadline)) { CDEBUG(D_INFO, "yeilding: found_work %d busy_loops %d\n", found_work, busy_loops); @@ -4346,8 +5092,10 @@ kgnilnd_scheduler(void *arg) * again. yield() ensures we wake up without another * waitq poke in that case */ atomic_inc(&dev->gnd_n_yield); + kgnilnd_data.kgn_last_condresched = jiffies; yield(); CDEBUG(D_INFO, "awake after yeild\n"); + deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout); } else if (found_work == GNILND_DEV_IDLE) { /* busy_loops is low and there is nothing to do, * go to sleep and wait for a waitq poke */ @@ -4355,8 +5103,10 @@ kgnilnd_scheduler(void *arg) "scheduling: found_work %d busy_loops %d\n", found_work, busy_loops); atomic_inc(&dev->gnd_n_schedule); + kgnilnd_data.kgn_last_scheduled = jiffies; schedule(); CDEBUG(D_INFO, "awake after schedule\n"); + deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout); } finish_wait(&dev->gnd_waitq, &wait); }