X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fmxlnd%2Fmxlnd_cb.c;h=ddc3bed4b30568e7eec0ff74c32e69ae556c01f3;hp=6dee5550b4ab099ec9c9ca10cb8bb3e916046c45;hb=c2d26c52d413e7525fcc419dafbb3e381d4b1505;hpb=f64ddbc08a7fc1c5a1231f469e8024cea3d85900 diff --git a/lnet/klnds/mxlnd/mxlnd_cb.c b/lnet/klnds/mxlnd/mxlnd_cb.c index 6dee555..ddc3bed 100644 --- a/lnet/klnds/mxlnd/mxlnd_cb.c +++ b/lnet/klnds/mxlnd/mxlnd_cb.c @@ -52,11 +52,6 @@ mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa) return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa))); } -inline void mxlnd_noop(char *s, ...) -{ - return; -} - char * mxlnd_ctxstate_to_str(int mxc_state) { @@ -86,10 +81,6 @@ mxlnd_connstatus_to_str(int mxk_status) return "MXLND_CONN_READY"; case MXLND_CONN_INIT: return "MXLND_CONN_INIT"; - case MXLND_CONN_REQ: - return "MXLND_CONN_REQ"; - case MXLND_CONN_ACK: - return "MXLND_CONN_ACK"; case MXLND_CONN_WAIT: return "MXLND_CONN_WAIT"; case MXLND_CONN_DISCONNECT: @@ -150,14 +141,13 @@ mxlnd_lnetmsg_to_str(int type) } static inline u64 -//mxlnd_create_match(u8 msg_type, u8 error, u64 cookie) -mxlnd_create_match(struct kmx_ctx *ctx, u8 error) +mxlnd_create_match(kmx_ctx_t *ctx, u8 error) { u64 type = (u64) ctx->mxc_msg_type; u64 err = (u64) error; u64 match = 0ULL; - LASSERT(ctx->mxc_msg_type != 0); + mxlnd_valid_msg_type(ctx->mxc_msg_type); LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0); match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie; return match; @@ -169,42 +159,34 @@ mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie) *msg_type = (u8) MXLND_MSG_TYPE(match); *error = (u8) MXLND_ERROR_VAL(match); *cookie = match & MXLND_MAX_COOKIE; - LASSERT(*msg_type == MXLND_MSG_EAGER || - *msg_type == MXLND_MSG_ICON_REQ || - *msg_type == MXLND_MSG_CONN_REQ || - *msg_type == MXLND_MSG_ICON_ACK || - *msg_type == MXLND_MSG_CONN_ACK || - *msg_type == MXLND_MSG_BYE || - *msg_type == MXLND_MSG_NOOP || - *msg_type == MXLND_MSG_PUT_REQ || - *msg_type == MXLND_MSG_PUT_ACK || - *msg_type == MXLND_MSG_PUT_DATA || - *msg_type == MXLND_MSG_GET_REQ || - *msg_type == MXLND_MSG_GET_DATA); + mxlnd_valid_msg_type(*msg_type); return; } -struct kmx_ctx * -mxlnd_get_idle_rx(void) +kmx_ctx_t * +mxlnd_get_idle_rx(kmx_conn_t *conn) { - struct list_head *tmp = NULL; - struct kmx_ctx *rx = NULL; + struct list_head *rxs = NULL; + kmx_ctx_t *rx = NULL; + + LASSERT(conn != NULL); - spin_lock(&kmxlnd_data.kmx_rx_idle_lock); + rxs = &conn->mxk_rx_idle; + + spin_lock(&conn->mxk_lock); - if (list_empty (&kmxlnd_data.kmx_rx_idle)) { - spin_unlock(&kmxlnd_data.kmx_rx_idle_lock); + if (list_empty (rxs)) { + spin_unlock(&conn->mxk_lock); return NULL; } - tmp = &kmxlnd_data.kmx_rx_idle; - rx = list_entry (tmp->next, struct kmx_ctx, mxc_list); + rx = list_entry (rxs->next, kmx_ctx_t, mxc_list); list_del_init(&rx->mxc_list); - spin_unlock(&kmxlnd_data.kmx_rx_idle_lock); + spin_unlock(&conn->mxk_lock); #if MXLND_DEBUG if (rx->mxc_get != rx->mxc_put) { - CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put); + CDEBUG(D_NETERROR, "*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put); CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation); CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline); CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state)); @@ -222,56 +204,35 @@ mxlnd_get_idle_rx(void) LASSERT (rx->mxc_state == MXLND_CTX_IDLE); rx->mxc_state = MXLND_CTX_PREP; + rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; return rx; } int -mxlnd_put_idle_rx(struct kmx_ctx *rx) +mxlnd_put_idle_rx(kmx_ctx_t *rx) { - if (rx == NULL) { - CDEBUG(D_NETERROR, "called with NULL pointer\n"); - return -EINVAL; - } else if (rx->mxc_type != MXLND_REQ_RX) { - CDEBUG(D_NETERROR, "called with tx\n"); - return -EINVAL; - } - LASSERT(rx->mxc_get == rx->mxc_put + 1); + kmx_conn_t *conn = rx->mxc_conn; + struct list_head *rxs = &conn->mxk_rx_idle; + + LASSERT(rx->mxc_type == MXLND_REQ_RX); + mxlnd_ctx_init(rx); + rx->mxc_put++; - spin_lock(&kmxlnd_data.kmx_rx_idle_lock); - list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle); - spin_unlock(&kmxlnd_data.kmx_rx_idle_lock); - return 0; -} + LASSERT(rx->mxc_get == rx->mxc_put); -int -mxlnd_reduce_idle_rxs(__u32 count) -{ - __u32 i = 0; - struct kmx_ctx *rx = NULL; - - spin_lock(&kmxlnd_data.kmx_rxs_lock); - for (i = 0; i < count; i++) { - rx = mxlnd_get_idle_rx(); - if (rx != NULL) { - struct list_head *tmp = &rx->mxc_global_list; - list_del_init(tmp); - mxlnd_ctx_free(rx); - } else { - CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count); - break; - } - } - spin_unlock(&kmxlnd_data.kmx_rxs_lock); + spin_lock(&conn->mxk_lock); + list_add(&rx->mxc_list, rxs); + spin_unlock(&conn->mxk_lock); return 0; } -struct kmx_ctx * +kmx_ctx_t * mxlnd_get_idle_tx(void) { - struct list_head *tmp = NULL; - struct kmx_ctx *tx = NULL; + struct list_head *tmp = &kmxlnd_data.kmx_tx_idle; + kmx_ctx_t *tx = NULL; spin_lock(&kmxlnd_data.kmx_tx_idle_lock); @@ -282,7 +243,7 @@ mxlnd_get_idle_tx(void) } tmp = &kmxlnd_data.kmx_tx_idle; - tx = list_entry (tmp->next, struct kmx_ctx, mxc_list); + tx = list_entry (tmp->next, kmx_ctx_t, mxc_list); list_del_init(&tx->mxc_list); /* Allocate a new completion cookie. It might not be needed, @@ -304,61 +265,100 @@ mxlnd_get_idle_tx(void) LASSERT (tx->mxc_lntmsg[1] == NULL); tx->mxc_state = MXLND_CTX_PREP; + tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; return tx; } void -mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye); +mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye); int -mxlnd_put_idle_tx(struct kmx_ctx *tx) +mxlnd_put_idle_tx(kmx_ctx_t *tx) { - //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED); int result = 0; lnet_msg_t *lntmsg[2]; - if (tx == NULL) { - CDEBUG(D_NETERROR, "called with NULL pointer\n"); - return -EINVAL; - } else if (tx->mxc_type != MXLND_REQ_TX) { - CDEBUG(D_NETERROR, "called with rx\n"); - return -EINVAL; - } - if (!(tx->mxc_status.code == MX_STATUS_SUCCESS || - tx->mxc_status.code == MX_STATUS_TRUNCATED)) { - struct kmx_conn *conn = tx->mxc_conn; + LASSERT(tx->mxc_type == MXLND_REQ_TX); + + if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) { + kmx_conn_t *conn = tx->mxc_conn; result = -EIO; + if (tx->mxc_errno != 0) result = tx->mxc_errno; + /* FIXME should we set mx_dis? */ mxlnd_conn_disconnect(conn, 0, 1); } lntmsg[0] = tx->mxc_lntmsg[0]; lntmsg[1] = tx->mxc_lntmsg[1]; - LASSERT(tx->mxc_get == tx->mxc_put + 1); mxlnd_ctx_init(tx); + tx->mxc_put++; + LASSERT(tx->mxc_get == tx->mxc_put); + spin_lock(&kmxlnd_data.kmx_tx_idle_lock); list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle); kmxlnd_data.kmx_tx_used--; spin_unlock(&kmxlnd_data.kmx_tx_idle_lock); + if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result); if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result); return 0; } + +void +mxlnd_connparams_free(kmx_connparams_t *cp) +{ + LASSERT(list_empty(&cp->mxr_list)); + MXLND_FREE(cp, sizeof(*cp)); + return; +} + +int +mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context, + mx_endpoint_addr_t epa, u64 match, u32 length, + kmx_conn_t *conn, kmx_peer_t *peer, void *data) +{ + kmx_connparams_t *c = NULL; + + MXLND_ALLOC(c, sizeof(*c)); + if (!c) return -ENOMEM; + + INIT_LIST_HEAD(&c->mxr_list); + c->mxr_context = context; + c->mxr_epa = epa; + c->mxr_match = match; + c->mxr_nob = length; + c->mxr_conn = conn; + c->mxr_peer = peer; + c->mxr_msg = *((kmx_msg_t *) data); + + *cp = c; + return 0; +} + +static inline void +mxlnd_set_conn_status(kmx_conn_t *conn, int status) +{ + conn->mxk_status = status; + mb(); +} + /** - * Free the conn - * \param conn a kmx_conn pointer + * mxlnd_conn_free_locked - free the conn + * @conn - a kmx_conn pointer * * The calling function should remove the conn from the conns list first - * then destroy it. + * then destroy it. Caller should have write-locked kmx_global_lock. */ void -mxlnd_conn_free(struct kmx_conn *conn) +mxlnd_conn_free_locked(kmx_conn_t *conn) { - struct kmx_peer *peer = conn->mxk_peer; + int valid = !mxlnd_endpoint_addr_null(conn->mxk_epa); + kmx_peer_t *peer = conn->mxk_peer; CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn); LASSERT (list_empty (&conn->mxk_tx_credit_queue) && @@ -368,31 +368,61 @@ mxlnd_conn_free(struct kmx_conn *conn) list_del_init(&conn->mxk_list); if (peer->mxp_conn == conn) { peer->mxp_conn = NULL; - if (!mxlnd_endpoint_addr_null(conn->mxk_epa)) - mx_set_endpoint_addr_context(conn->mxk_epa, - (void *) NULL); + if (valid) { + kmx_conn_t *temp = NULL; + + mx_get_endpoint_addr_context(conn->mxk_epa, + (void **) &temp); + if (conn == temp) { + mx_set_endpoint_addr_context(conn->mxk_epa, + (void *) NULL); + } + } + /* unlink from global list and drop its ref */ + list_del_init(&peer->mxp_list); + mxlnd_peer_decref(peer); + } + } + mxlnd_peer_decref(peer); /* drop conn's ref to peer */ + if (conn->mxk_rx_pages) { + LASSERT (conn->mxk_rxs != NULL); + mxlnd_free_pages(conn->mxk_rx_pages); + } + if (conn->mxk_rxs) { + int i = 0; + kmx_ctx_t *rx = NULL; + + for (i = 0; i < MXLND_RX_MSGS(); i++) { + rx = &conn->mxk_rxs[i]; + if (rx->mxc_seg_list != NULL) { + LASSERT(rx->mxc_nseg > 0); + MXLND_FREE(rx->mxc_seg_list, + rx->mxc_nseg * + sizeof(*rx->mxc_seg_list)); + } } + MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t)); } - mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */ - MXLND_FREE (conn, sizeof (*conn)); + + MXLND_FREE(conn, sizeof (*conn)); return; } -void -mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn) +int +mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn) { - int found = 0; - struct kmx_ctx *ctx = NULL; - struct kmx_ctx *next = NULL; - mx_return_t mxret = MX_SUCCESS; - u32 result = 0; + int found = 0; + int count = 0; + kmx_ctx_t *ctx = NULL; + kmx_ctx_t *next = NULL; + mx_return_t mxret = MX_SUCCESS; + u32 result = 0; do { found = 0; spin_lock(&conn->mxk_lock); list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) { - /* we will delete all including txs */ list_del_init(&ctx->mxc_list); if (ctx->mxc_type == MXLND_REQ_RX) { found = 1; @@ -403,13 +433,16 @@ mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn) CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret); } if (result == 1) { - ctx->mxc_status.code = -ECONNABORTED; + ctx->mxc_errno = -ECONNABORTED; ctx->mxc_state = MXLND_CTX_CANCELED; - /* NOTE this calls lnet_finalize() and - * we cannot hold any locks when calling it. - * It also calls mxlnd_conn_decref(conn) */ spin_unlock(&conn->mxk_lock); - mxlnd_handle_rx_completion(ctx); + spin_lock(&kmxlnd_data.kmx_conn_lock); + /* we may be holding the global lock, + * move to orphan list so that it can free it */ + list_add_tail(&ctx->mxc_list, + &kmxlnd_data.kmx_orphan_msgs); + count++; + spin_unlock(&kmxlnd_data.kmx_conn_lock); spin_lock(&conn->mxk_lock); } break; @@ -419,38 +452,20 @@ mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn) } while (found); - return; + return count; } -/** - * Shutdown a connection - * \param conn a kmx_conn pointer - * \param mx_dis call mx_disconnect() - * \param send_bye send peer a BYE msg - * - * This function sets the status to DISCONNECT, completes queued - * txs with failure, calls mx_disconnect, which will complete - * pending txs and matched rxs with failure. - */ -void -mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye) +int +mxlnd_cancel_queued_txs(kmx_conn_t *conn) { - mx_endpoint_addr_t epa = conn->mxk_epa; + int count = 0; struct list_head *tmp = NULL; - int valid = !mxlnd_endpoint_addr_null(epa); spin_lock(&conn->mxk_lock); - if (conn->mxk_status == MXLND_CONN_DISCONNECT) { - spin_unlock(&conn->mxk_lock); - return; - } - conn->mxk_status = MXLND_CONN_DISCONNECT; - conn->mxk_timeout = 0; - while (!list_empty(&conn->mxk_tx_free_queue) || !list_empty(&conn->mxk_tx_credit_queue)) { - struct kmx_ctx *tx = NULL; + kmx_ctx_t *tx = NULL; if (!list_empty(&conn->mxk_tx_free_queue)) { tmp = &conn->mxk_tx_free_queue; @@ -458,33 +473,78 @@ mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye) tmp = &conn->mxk_tx_credit_queue; } - tx = list_entry(tmp->next, struct kmx_ctx, mxc_list); + tx = list_entry(tmp->next, kmx_ctx_t, mxc_list); list_del_init(&tx->mxc_list); - tx->mxc_status.code = -ECONNABORTED; spin_unlock(&conn->mxk_lock); - mxlnd_put_idle_tx(tx); - mxlnd_conn_decref(conn); /* for this tx */ + tx->mxc_errno = -ECONNABORTED; + tx->mxc_state = MXLND_CTX_CANCELED; + /* move to orphan list and then abort */ + spin_lock(&kmxlnd_data.kmx_conn_lock); + list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs); + spin_unlock(&kmxlnd_data.kmx_conn_lock); + count++; spin_lock(&conn->mxk_lock); } + spin_unlock(&conn->mxk_lock); + + return count; +} + +void +mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie) +{ + u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) | + (((u64) error) << MXLND_ERROR_OFFSET) | cookie; + + mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL, + epa, match, NULL, NULL); + return; +} + +/** + * mxlnd_conn_disconnect - shutdown a connection + * @conn - a kmx_conn pointer + * @mx_dis - call mx_disconnect() + * @send_bye - send peer a BYE msg + * + * This function sets the status to DISCONNECT, completes queued + * txs with failure, calls mx_disconnect, which will complete + * pending txs and matched rxs with failure. + */ +void +mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye) +{ + mx_endpoint_addr_t epa = conn->mxk_epa; + int valid = !mxlnd_endpoint_addr_null(epa); + int count = 0; + spin_lock(&conn->mxk_lock); + if (conn->mxk_status == MXLND_CONN_DISCONNECT) { + spin_unlock(&conn->mxk_lock); + return; + } + mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT); + conn->mxk_timeout = 0; spin_unlock(&conn->mxk_lock); - /* cancel pending rxs */ - mxlnd_conn_cancel_pending_rxs(conn); + count = mxlnd_cancel_queued_txs(conn); + count += mxlnd_conn_cancel_pending_rxs(conn); + + if (count) + up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */ - if (send_bye && valid) { - u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET; + if (send_bye && valid && + conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) { /* send a BYE to the peer */ CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__, libcfs_nid2str(conn->mxk_peer->mxp_nid)); - mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL, - epa, match, NULL, NULL); + mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0); /* wait to allow the peer to ack our message */ mxlnd_sleep(msecs_to_jiffies(20)); } - if (kmxlnd_data.kmx_shutdown != 1) { - unsigned long last_msg = 0; + if (atomic_read(&kmxlnd_data.kmx_shutdown) != 1) { + unsigned long last_msg = 0; /* notify LNET that we are giving up on this peer */ if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) @@ -494,7 +554,8 @@ mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye) lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg); - if (mx_dis && valid) + if (mx_dis && valid && + (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0))) mx_disconnect(kmxlnd_data.kmx_endpt, epa); } mxlnd_conn_decref(conn); /* drop the owning peer's reference */ @@ -503,16 +564,24 @@ mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye) } /** - * Allocate and initialize a new conn struct - * \param connp address of a kmx_conn pointer - * \param peer owning kmx_peer + * mxlnd_conn_alloc - allocate and initialize a new conn struct + * @connp - address of a kmx_conn pointer + * @peer - owning kmx_peer * * Returns 0 on success and -ENOMEM on failure */ int -mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer) +mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer) { - struct kmx_conn *conn = NULL; + int i = 0; + int ret = 0; + int ipage = 0; + int offset = 0; + void *addr = NULL; + kmx_conn_t *conn = NULL; + kmx_pages_t *pages = NULL; + struct page *page = NULL; + kmx_ctx_t *rx = NULL; LASSERT(peer != NULL); @@ -525,19 +594,58 @@ mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer) memset(conn, 0, sizeof(*conn)); - /* conn->mxk_incarnation = 0 - will be set by peer */ - atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer - and one for the caller */ + ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES()); + if (ret != 0) { + CERROR("Can't allocate rx pages\n"); + MXLND_FREE(conn, sizeof(*conn)); + return -ENOMEM; + } + conn->mxk_rx_pages = pages; + + MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t)); + if (conn->mxk_rxs == NULL) { + CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS()); + mxlnd_free_pages(pages); + MXLND_FREE(conn, sizeof(*conn)); + return -ENOMEM; + } + + memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t)); + conn->mxk_peer = peer; - /* mxk_epa - to be set after mx_iconnect() */ INIT_LIST_HEAD(&conn->mxk_list); + INIT_LIST_HEAD(&conn->mxk_zombie); + atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer + and one for the caller */ + if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) { + u64 nic_id = 0ULL; + u32 ep_id = 0; + + /* this is localhost, set the epa and status as up */ + mxlnd_set_conn_status(conn, MXLND_CONN_READY); + conn->mxk_epa = kmxlnd_data.kmx_epa; + mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn); + peer->mxp_reconnect_time = 0; + mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id); + peer->mxp_nic_id = nic_id; + peer->mxp_ep_id = ep_id; + conn->mxk_incarnation = kmxlnd_data.kmx_incarnation; + conn->mxk_timeout = 0; + } else { + /* conn->mxk_incarnation = 0 - will be set by peer */ + /* conn->mxk_sid = 0 - will be set by peer */ + mxlnd_set_conn_status(conn, MXLND_CONN_INIT); + /* mxk_epa - to be set after mx_iconnect() */ + } spin_lock_init(&conn->mxk_lock); /* conn->mxk_timeout = 0 */ - conn->mxk_last_tx = jiffies; - conn->mxk_last_rx = conn->mxk_last_tx; - conn->mxk_credits = *kmxlnd_tunables.kmx_credits; + /* conn->mxk_last_tx = 0 */ + /* conn->mxk_last_rx = 0 */ + INIT_LIST_HEAD(&conn->mxk_rx_idle); + + conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits; /* mxk_outstanding = 0 */ - conn->mxk_status = MXLND_CONN_INIT; + INIT_LIST_HEAD(&conn->mxk_tx_credit_queue); INIT_LIST_HEAD(&conn->mxk_tx_free_queue); /* conn->mxk_ntx_msgs = 0 */ @@ -546,6 +654,37 @@ mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer) /* conn->mxk_data_posted = 0 */ INIT_LIST_HEAD(&conn->mxk_pending); + for (i = 0; i < MXLND_RX_MSGS(); i++) { + + rx = &conn->mxk_rxs[i]; + rx->mxc_type = MXLND_REQ_RX; + INIT_LIST_HEAD(&rx->mxc_list); + + /* map mxc_msg to page */ + page = pages->mxg_pages[ipage]; + addr = page_address(page); + LASSERT(addr != NULL); + rx->mxc_msg = (kmx_msg_t *)(addr + offset); + rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg)); + + rx->mxc_conn = conn; + rx->mxc_peer = peer; + rx->mxc_nid = peer->mxp_nid; + + mxlnd_ctx_init(rx); + + offset += MXLND_MSG_SIZE; + LASSERT (offset <= PAGE_SIZE); + + if (offset == PAGE_SIZE) { + offset = 0; + ipage++; + LASSERT (ipage <= MXLND_TX_MSG_PAGES()); + } + + list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle); + } + *connp = conn; mxlnd_peer_addref(peer); /* add a ref for this conn */ @@ -557,20 +696,22 @@ mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer) } int -mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer) +mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer) { - int ret = 0; - write_lock(&kmxlnd_data.kmx_global_lock); + int ret = 0; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; + + write_lock(g_lock); ret = mxlnd_conn_alloc_locked(connp, peer); - write_unlock(&kmxlnd_data.kmx_global_lock); + write_unlock(g_lock); return ret; } int -mxlnd_q_pending_ctx(struct kmx_ctx *ctx) +mxlnd_q_pending_ctx(kmx_ctx_t *ctx) { int ret = 0; - struct kmx_conn *conn = ctx->mxc_conn; + kmx_conn_t *conn = ctx->mxc_conn; ctx->mxc_state = MXLND_CTX_PENDING; if (conn != NULL) { @@ -590,25 +731,26 @@ mxlnd_q_pending_ctx(struct kmx_ctx *ctx) } int -mxlnd_deq_pending_ctx(struct kmx_ctx *ctx) +mxlnd_deq_pending_ctx(kmx_ctx_t *ctx) { LASSERT(ctx->mxc_state == MXLND_CTX_PENDING || ctx->mxc_state == MXLND_CTX_COMPLETED); if (ctx->mxc_state != MXLND_CTX_PENDING && ctx->mxc_state != MXLND_CTX_COMPLETED) { - CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", + CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", mxlnd_ctxstate_to_str(ctx->mxc_state)); } ctx->mxc_state = MXLND_CTX_COMPLETED; if (!list_empty(&ctx->mxc_list)) { - struct kmx_conn *conn = ctx->mxc_conn; - struct kmx_ctx *next = NULL; + kmx_conn_t *conn = ctx->mxc_conn; + kmx_ctx_t *next = NULL; + LASSERT(conn != NULL); spin_lock(&conn->mxk_lock); list_del_init(&ctx->mxc_list); conn->mxk_timeout = 0; if (!list_empty(&conn->mxk_pending)) { - next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list); + next = list_entry(conn->mxk_pending.next, kmx_ctx_t, mxc_list); conn->mxk_timeout = next->mxc_deadline; } spin_unlock(&conn->mxk_lock); @@ -617,78 +759,95 @@ mxlnd_deq_pending_ctx(struct kmx_ctx *ctx) } /** - * Free the peer - * \param peer a kmx_peer pointer + * mxlnd_peer_free - free the peer + * @peer - a kmx_peer pointer * * The calling function should decrement the rxs, drain the tx queues and * remove the peer from the peers list first then destroy it. */ void -mxlnd_peer_free(struct kmx_peer *peer) +mxlnd_peer_free(kmx_peer_t *peer) { - CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, - peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : ""); + CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid)); LASSERT (atomic_read(&peer->mxp_refcount) == 0); - if (peer == kmxlnd_data.kmx_localhost) - LASSERT(kmxlnd_data.kmx_shutdown); - - if (!list_empty(&peer->mxp_peers)) { + if (!list_empty(&peer->mxp_list)) { /* assume we are locked */ - list_del_init(&peer->mxp_peers); + list_del_init(&peer->mxp_list); } - MXLND_FREE (peer, sizeof (*peer)); + MXLND_FREE(peer, sizeof (*peer)); atomic_dec(&kmxlnd_data.kmx_npeers); return; } -#define MXLND_LOOKUP_COUNT 10 +static int +mxlnd_lookup_mac(u32 ip, u64 *tmp_id) +{ + int ret = -EHOSTUNREACH; + unsigned char *haddr = NULL; + struct net_device *dev = NULL; + struct neighbour *n = NULL; + __be32 dst_ip = htonl(ip); + + dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif); + if (dev == NULL) + return -ENODEV; + + haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */ + + n = neigh_lookup(&arp_tbl, &dst_ip, dev); + if (n) { + n->used = jiffies; + if (n->nud_state & NUD_VALID) { + memcpy(haddr, n->ha, dev->addr_len); + neigh_release(n); + ret = 0; + } + } + + dev_put(dev); + + return ret; +} + /* We only want the MAC address of the peer's Myricom NIC. We * require that each node has the IPoMX interface (myriN) up. * We will not pass any traffic over IPoMX, but it allows us * to get the MAC address. */ static int -mxlnd_ip2nic_id(u32 ip, u64 *nic_id) +mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries) { int ret = 0; int try = 1; int fatal = 0; u64 tmp_id = 0ULL; - unsigned char *haddr = NULL; - struct net_device *dev = NULL; - struct neighbour *n = NULL; cfs_socket_t *sock = NULL; - __be32 dst_ip = htonl(ip); - - dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif); - if (dev == NULL) { - return -ENODEV; - } - - haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */ do { - n = neigh_lookup(&arp_tbl, &dst_ip, dev); - if (n) { - n->used = jiffies; - if (n->nud_state & NUD_VALID) { - memcpy(haddr, n->ha, dev->addr_len); - neigh_release(n); - ret = 0; + CDEBUG(D_NET, "try %d of %d tries\n", try, tries); + ret = mxlnd_lookup_mac(ip, &tmp_id); + if (ret == 0) { + break; + } else { + /* not found, try to connect (force an arp) */ + ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987); + if (ret == -ECONNREFUSED) { + /* peer is there, get the MAC address */ + mxlnd_lookup_mac(ip, &tmp_id); + if (tmp_id != 0ULL) + ret = 0; break; + } else if (ret == -EHOSTUNREACH && try < tries) { + /* add a little backoff */ + CDEBUG(D_NET, "sleeping for %d jiffies\n", HZ/4); + mxlnd_sleep(HZ/4); } } - /* not found, try to connect (force an arp) */ - libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987); - if (!fatal) - libcfs_sock_release(sock); - schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */ - } while (try++ < MXLND_LOOKUP_COUNT); - - dev_put(dev); + } while (try++ < tries); + CDEBUG(D_NET, "done trying. ret = %d\n", ret); if (tmp_id == 0ULL) ret = -EHOSTUNREACH; @@ -701,18 +860,18 @@ mxlnd_ip2nic_id(u32 ip, u64 *nic_id) } /** - * Allocate and initialize a new peer struct - * \param peerp address of a kmx_peer pointer - * \param nid LNET node id + * mxlnd_peer_alloc - allocate and initialize a new peer struct + * @peerp - address of a kmx_peer pointer + * @nid - LNET node id + * * Returns 0 on success and -ENOMEM on failure */ int -mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id) +mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id) { - int i = 0; - int ret = 0; - u32 ip = LNET_NIDADDR(nid); - struct kmx_peer *peer = NULL; + int ret = 0; + u32 ip = LNET_NIDADDR(nid); + kmx_peer_t *peer = NULL; LASSERT (nid != LNET_NID_ANY && nid != 0LL); @@ -725,48 +884,36 @@ mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, memset(peer, 0, sizeof(*peer)); + INIT_LIST_HEAD(&peer->mxp_list); peer->mxp_nid = nid; - /* peer->mxp_incarnation */ + /* peer->mxp_ni unused - may be used for multi-rail */ atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */ - peer->mxp_ip = ip; - peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id; peer->mxp_board = board; + peer->mxp_ep_id = ep_id; peer->mxp_nic_id = nic_id; - if (nic_id == 0ULL) { - ret = mxlnd_ip2nic_id(ip, &nic_id); - if (ret != 0) - CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret); - mx_nic_id_to_board_number(nic_id, &peer->mxp_board); - } - - peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */ - - INIT_LIST_HEAD(&peer->mxp_peers); INIT_LIST_HEAD(&peer->mxp_conns); ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */ if (ret != 0) { mxlnd_peer_decref(peer); return ret; } + INIT_LIST_HEAD(&peer->mxp_tx_queue); - for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) { - struct kmx_ctx *rx = NULL; - ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX); - if (ret != 0) { - mxlnd_reduce_idle_rxs(i); - mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */ - mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */ - mxlnd_peer_decref(peer); - return ret; + if (peer->mxp_nic_id != 0ULL) + nic_id = peer->mxp_nic_id; + + if (nic_id == 0ULL) { + ret = mxlnd_ip2nic_id(ip, &nic_id, 1); + if (ret == 0) { + peer->mxp_nic_id = nic_id; + mx_nic_id_to_board_number(nic_id, &peer->mxp_board); } - spin_lock(&kmxlnd_data.kmx_rxs_lock); - list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs); - spin_unlock(&kmxlnd_data.kmx_rxs_lock); - rx->mxc_put = -1; - mxlnd_put_idle_rx(rx); } + + peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */ + /* peer->mxp_reconnect_time = 0 */ /* peer->mxp_incompatible = 0 */ @@ -774,16 +921,16 @@ mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, return 0; } -static inline struct kmx_peer * +static inline kmx_peer_t * mxlnd_find_peer_by_nid_locked(lnet_nid_t nid) { - int found = 0; - int hash = 0; - struct kmx_peer *peer = NULL; + int found = 0; + int hash = 0; + kmx_peer_t *peer = NULL; hash = mxlnd_nid_to_hash(nid); - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) { + list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) { if (peer->mxp_nid == nid) { found = 1; mxlnd_peer_addref(peer); @@ -793,19 +940,83 @@ mxlnd_find_peer_by_nid_locked(lnet_nid_t nid) return (found ? peer : NULL); } -static inline struct kmx_peer * -mxlnd_find_peer_by_nid(lnet_nid_t nid) +static kmx_peer_t * +mxlnd_find_peer_by_nid(lnet_nid_t nid, int create) { - struct kmx_peer *peer = NULL; + int ret = 0; + int hash = 0; + kmx_peer_t *peer = NULL; + kmx_peer_t *old = NULL; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; + + read_lock(g_lock); + peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */ + + if ((peer && peer->mxp_conn) || /* found peer with conn or */ + (!peer && !create)) { /* did not find peer and do not create one */ + read_unlock(g_lock); + return peer; + } + + read_unlock(g_lock); + + /* if peer but _not_ conn */ + if (peer && !peer->mxp_conn) { + if (create) { + write_lock(g_lock); + if (!peer->mxp_conn) { /* check again */ + /* create the conn */ + ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer); + if (ret != 0) { + /* we tried, return the peer only. + * the caller needs to see if the conn exists */ + CDEBUG(D_NETERROR, "%s: %s could not alloc conn\n", + __func__, libcfs_nid2str(peer->mxp_nid)); + } else { + /* drop extra conn ref */ + mxlnd_conn_decref(peer->mxp_conn); + } + } + write_unlock(g_lock); + } + return peer; + } + + /* peer not found and we need to create one */ + hash = mxlnd_nid_to_hash(nid); + + /* create peer (and conn) */ + /* adds conn ref for peer and one for this function */ + ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board, + *kmxlnd_tunables.kmx_ep_id, 0ULL); + if (ret != 0) /* no memory, peer is NULL */ + return NULL; + + write_lock(g_lock); + + /* look again */ + old = mxlnd_find_peer_by_nid_locked(nid); + if (old) { + /* someone already created one */ + mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */ + mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */ + mxlnd_peer_decref(peer); + peer = old; + } else { + /* no other peer, use this one */ + list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]); + atomic_inc(&kmxlnd_data.kmx_npeers); + mxlnd_peer_addref(peer); + mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */ + } + + write_unlock(g_lock); - read_lock(&kmxlnd_data.kmx_global_lock); - peer = mxlnd_find_peer_by_nid_locked(nid); - read_unlock(&kmxlnd_data.kmx_global_lock); return peer; } static inline int -mxlnd_tx_requires_credit(struct kmx_ctx *tx) +mxlnd_tx_requires_credit(kmx_ctx_t *tx) { return (tx->mxc_msg_type == MXLND_MSG_EAGER || tx->mxc_msg_type == MXLND_MSG_GET_REQ || @@ -814,10 +1025,10 @@ mxlnd_tx_requires_credit(struct kmx_ctx *tx) } /** - * Set type and number of bytes - * \param msg msg pointer - * \param type of message - * \param body_nob bytes in msg body + * mxlnd_init_msg - set type and number of bytes + * @msg - msg pointer + * @type - of message + * @body_nob - bytes in msg body */ static inline void mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob) @@ -827,13 +1038,13 @@ mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob) } static inline void -mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid) +mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid) { int nob = offsetof (kmx_msg_t, mxm_u) + body_nob; - struct kmx_msg *msg = NULL; + kmx_msg_t *msg = NULL; LASSERT (tx != NULL); - LASSERT (nob <= MXLND_EAGER_SIZE); + LASSERT (nob <= MXLND_MSG_SIZE); tx->mxc_nid = nid; /* tx->mxc_peer should have already been set if we know it */ @@ -842,7 +1053,6 @@ mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid) /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */ tx->mxc_seg.segment_length = nob; tx->mxc_pin_type = MX_PIN_PHYSICAL; - //tx->mxc_state = MXLND_CTX_PENDING; msg = tx->mxc_msg; msg->mxm_type = type; @@ -865,13 +1075,13 @@ mxlnd_cksum (void *ptr, int nob) } /** - * Complete msg info - * \param tx msg to send + * mxlnd_pack_msg_locked - complete msg info + * @tx - msg to send */ static inline void -mxlnd_pack_msg(struct kmx_ctx *tx) +mxlnd_pack_msg_locked(kmx_ctx_t *tx) { - struct kmx_msg *msg = tx->mxc_msg; + kmx_msg_t *msg = tx->mxc_msg; /* type and nob should already be set in init_msg() */ msg->mxm_magic = MXLND_MSG_MAGIC; @@ -881,10 +1091,8 @@ mxlnd_pack_msg(struct kmx_ctx *tx) * return credits as well */ if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ && tx->mxc_msg_type != MXLND_MSG_CONN_ACK) { - spin_lock(&tx->mxc_conn->mxk_lock); msg->mxm_credits = tx->mxc_conn->mxk_outstanding; tx->mxc_conn->mxk_outstanding = 0; - spin_unlock(&tx->mxc_conn->mxk_lock); } else { msg->mxm_credits = 0; } @@ -895,7 +1103,6 @@ mxlnd_pack_msg(struct kmx_ctx *tx) msg->mxm_dstnid = tx->mxc_nid; /* if it is a new peer, the dststamp will be 0 */ msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation; - msg->mxm_seq = tx->mxc_cookie; if (*kmxlnd_tunables.kmx_cksum) { msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob); @@ -962,7 +1169,6 @@ mxlnd_unpack_msg(kmx_msg_t *msg, int nob) __swab64s(&msg->mxm_srcstamp); __swab64s(&msg->mxm_dstnid); __swab64s(&msg->mxm_dststamp); - __swab64s(&msg->mxm_seq); } if (msg->mxm_srcnid == LNET_NID_ANY) { @@ -1035,17 +1241,22 @@ mxlnd_unpack_msg(kmx_msg_t *msg, int nob) return 0; } + /** - * Receive a message - * \param lntmsg the LNET msg that this is continuing. If EAGER, then NULL. - * \param length length of incoming message + * mxlnd_recv_msg + * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL. + * @rx + * @msg_type + * @cookie + * @length - length of incoming message + * @pending - add to kmx_pending (0 is NO and 1 is YES) * * The caller gets the rx and sets nid, peer and conn if known. * * Returns 0 on success and -1 on failure */ int -mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length) +mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length) { int ret = 0; mx_return_t mxret = MX_SUCCESS; @@ -1057,7 +1268,6 @@ mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, /* rx->mxc_match may already be set */ /* rx->mxc_seg.segment_ptr is already set */ rx->mxc_seg.segment_length = length; - rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; ret = mxlnd_q_pending_ctx(rx); if (ret == -1) { /* the caller is responsible for calling conn_decref() if needed */ @@ -1076,12 +1286,13 @@ mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, /** - * This is the callback function that will handle unexpected receives - * \param context NULL, ignore - * \param source the peer's mx_endpoint_addr_t - * \param match_value the msg's bit, should be MXLND_MSG_EAGER - * \param length length of incoming message - * \param data_if_available ignore + * mxlnd_unexpected_recv - this is the callback function that will handle + * unexpected receives + * @context - NULL, ignore + * @source - the peer's mx_endpoint_addr_t + * @match_value - the msg's bits, should be MXLND_MSG_EAGER + * @length - length of incoming message + * @data_if_available - used for CONN_[REQ|ACK] * * If it is an eager-sized msg, we will call recv_msg() with the actual * length. If it is a large message, we will call recv_msg() with a @@ -1099,37 +1310,117 @@ mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source, uint64_t match_value, uint32_t length, void *data_if_available) { int ret = 0; - struct kmx_ctx *rx = NULL; + kmx_ctx_t *rx = NULL; mx_ksegment_t seg; u8 msg_type = 0; u8 error = 0; u64 cookie = 0ULL; + kmx_conn_t *conn = NULL; + kmx_peer_t *peer = NULL; + u64 nic_id = 0ULL; + u32 ep_id = 0; + u32 sid = 0; + /* TODO this will change to the net struct */ if (context != NULL) { - CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n"); + CDEBUG(D_NETERROR, "non-NULL context\n"); } #if MXLND_DEBUG - CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length); + CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length); #endif + mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid); mxlnd_parse_match(match_value, &msg_type, &error, &cookie); - if (msg_type == MXLND_MSG_BYE) { - struct kmx_peer *peer = NULL; + read_lock(&kmxlnd_data.kmx_global_lock); + mx_get_endpoint_addr_context(source, (void **) &conn); + if (conn) { + mxlnd_conn_addref(conn); /* add ref for this function */ + peer = conn->mxk_peer; + } + read_unlock(&kmxlnd_data.kmx_global_lock); - mx_get_endpoint_addr_context(source, (void **) &peer); - if (peer && peer->mxp_conn) { + if (msg_type == MXLND_MSG_BYE) { + if (conn) { CDEBUG(D_NET, "peer %s sent BYE msg\n", libcfs_nid2str(peer->mxp_nid)); - mxlnd_conn_disconnect(peer->mxp_conn, 1, 0); + mxlnd_conn_disconnect(conn, 1, 0); + mxlnd_conn_decref(conn); /* drop ref taken above */ + } + return MX_RECV_FINISHED; + } + + if (msg_type == MXLND_MSG_CONN_REQ) { + kmx_connparams_t *cp = NULL; + const int expected = offsetof(kmx_msg_t, mxm_u) + + sizeof(kmx_connreq_msg_t); + + if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */ + if (unlikely(length != expected || !data_if_available)) { + CDEBUG(D_NETERROR, "received invalid CONN_REQ from %llx " + "length=%d (expected %d)\n", nic_id, length, expected); + mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0); + return MX_RECV_FINISHED; + } + + ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length, + conn, peer, data_if_available); + if (unlikely(ret != 0)) { + CDEBUG(D_NETERROR, "unable to alloc CONN_REQ from %llx:%d\n", + nic_id, ep_id); + mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0); + return MX_RECV_FINISHED; + } + spin_lock(&kmxlnd_data.kmx_conn_lock); + list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs); + spin_unlock(&kmxlnd_data.kmx_conn_lock); + up(&kmxlnd_data.kmx_conn_sem); + return MX_RECV_FINISHED; + } + if (msg_type == MXLND_MSG_CONN_ACK) { + kmx_connparams_t *cp = NULL; + const int expected = offsetof(kmx_msg_t, mxm_u) + + sizeof(kmx_connreq_msg_t); + + LASSERT(conn); + if (unlikely(error != 0)) { + CDEBUG(D_NETERROR, "received CONN_ACK from %s " + "with error -%d\n", + libcfs_nid2str(peer->mxp_nid), (int) error); + mxlnd_conn_disconnect(conn, 1, 0); + } else if (unlikely(length != expected || !data_if_available)) { + CDEBUG(D_NETERROR, "received %s CONN_ACK from %s " + "length=%d (expected %d)\n", + data_if_available ? "short" : "missing", + libcfs_nid2str(peer->mxp_nid), length, expected); + mxlnd_conn_disconnect(conn, 1, 1); + } else { + /* peer is ready for messages */ + ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length, + conn, peer, data_if_available); + if (unlikely(ret != 0)) { + CDEBUG(D_NETERROR, "unable to alloc kmx_connparams_t" + " from %llx:%d\n", nic_id, ep_id); + mxlnd_conn_disconnect(conn, 1, 1); + } else { + spin_lock(&kmxlnd_data.kmx_conn_lock); + list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs); + spin_unlock(&kmxlnd_data.kmx_conn_lock); + up(&kmxlnd_data.kmx_conn_sem); + } } + mxlnd_conn_decref(conn); /* drop ref taken above */ return MX_RECV_FINISHED; } - rx = mxlnd_get_idle_rx(); + /* Handle unexpected messages (PUT_REQ and GET_REQ) */ + + LASSERT(peer != NULL && conn != NULL); + + rx = mxlnd_get_idle_rx(conn); if (rx != NULL) { - if (length <= MXLND_EAGER_SIZE) { + if (length <= MXLND_MSG_SIZE) { ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length); } else { CDEBUG(D_NETERROR, "unexpected large receive with " @@ -1139,34 +1430,26 @@ mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source, } if (ret == 0) { - struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; - - /* NOTE to avoid a peer disappearing out from under us, - * read lock the peers lock first */ - read_lock(&kmxlnd_data.kmx_global_lock); - mx_get_endpoint_addr_context(source, (void **) &peer); - if (peer != NULL) { - mxlnd_peer_addref(peer); /* add a ref... */ - conn = peer->mxp_conn; - if (conn) { - mxlnd_conn_addref(conn); /* add ref until rx completed */ - mxlnd_peer_decref(peer); /* and drop peer ref */ - rx->mxc_conn = conn; - } - rx->mxc_peer = peer; - rx->mxc_nid = peer->mxp_nid; - } - read_unlock(&kmxlnd_data.kmx_global_lock); + /* hold conn ref until rx completes */ + rx->mxc_conn = conn; + rx->mxc_peer = peer; + rx->mxc_nid = peer->mxp_nid; } else { CDEBUG(D_NETERROR, "could not post receive\n"); mxlnd_put_idle_rx(rx); } } + /* Encountered error, drop incoming message on the floor */ + /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes + * uses the standard code path and acks the sender normally */ + if (rx == NULL || ret != 0) { + mxlnd_conn_decref(conn); /* drop ref taken above */ if (rx == NULL) { - CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n"); + CDEBUG(D_NETERROR, "no idle rxs available - dropping rx" + " 0x%llx from %s\n", match_value, + libcfs_nid2str(peer->mxp_nid)); } else { /* ret != 0 */ CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n"); @@ -1184,13 +1467,13 @@ mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source, int mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count) { - int i = 0; - int ret = -ENOENT; - struct kmx_peer *peer = NULL; + int i = 0; + int ret = -ENOENT; + kmx_peer_t *peer = NULL; read_lock(&kmxlnd_data.kmx_global_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { + list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) { if (index-- == 0) { *nidp = peer->mxp_nid; *count = atomic_read(&peer->mxp_refcount); @@ -1205,11 +1488,14 @@ mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count) } void -mxlnd_del_peer_locked(struct kmx_peer *peer) +mxlnd_del_peer_locked(kmx_peer_t *peer) { - list_del_init(&peer->mxp_peers); /* remove from the global list */ - if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1); - mxlnd_peer_decref(peer); /* drop global list ref */ + if (peer->mxp_conn) { + mxlnd_conn_disconnect(peer->mxp_conn, 1, 1); + } else { + list_del_init(&peer->mxp_list); /* remove from the global list */ + mxlnd_peer_decref(peer); /* drop global list ref */ + } return; } @@ -1218,19 +1504,16 @@ mxlnd_del_peer(lnet_nid_t nid) { int i = 0; int ret = 0; - struct kmx_peer *peer = NULL; - struct kmx_peer *next = NULL; + kmx_peer_t *peer = NULL; + kmx_peer_t *next = NULL; if (nid != LNET_NID_ANY) { - peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ + peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */ } write_lock(&kmxlnd_data.kmx_global_lock); if (nid != LNET_NID_ANY) { if (peer == NULL) { ret = -ENOENT; - } if (peer == kmxlnd_data.kmx_localhost) { - mxlnd_peer_decref(peer); /* and drops it */ - CERROR("cannot free this host's NID 0x%llx\n", nid); } else { mxlnd_peer_decref(peer); /* and drops it */ mxlnd_del_peer_locked(peer); @@ -1238,9 +1521,8 @@ mxlnd_del_peer(lnet_nid_t nid) } else { /* LNET_NID_ANY */ for (i = 0; i < MXLND_HASH_SIZE; i++) { list_for_each_entry_safe(peer, next, - &kmxlnd_data.kmx_peers[i], mxp_peers) { - if (peer != kmxlnd_data.kmx_localhost) - mxlnd_del_peer_locked(peer); + &kmxlnd_data.kmx_peers[i], mxp_list) { + mxlnd_del_peer_locked(peer); } } } @@ -1249,16 +1531,16 @@ mxlnd_del_peer(lnet_nid_t nid) return ret; } -struct kmx_conn * +kmx_conn_t * mxlnd_get_conn_by_idx(int index) { - int i = 0; - struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; + int i = 0; + kmx_peer_t *peer = NULL; + kmx_conn_t *conn = NULL; read_lock(&kmxlnd_data.kmx_global_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { + list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) { list_for_each_entry(conn, &peer->mxp_conns, mxk_list) { if (index-- > 0) { continue; @@ -1276,12 +1558,10 @@ mxlnd_get_conn_by_idx(int index) } void -mxlnd_close_matching_conns_locked(struct kmx_peer *peer) +mxlnd_close_matching_conns_locked(kmx_peer_t *peer) { - struct kmx_conn *conn = NULL; - struct kmx_conn *next = NULL; - - if (peer == kmxlnd_data.kmx_localhost) return; + kmx_conn_t *conn = NULL; + kmx_conn_t *next = NULL; list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) mxlnd_conn_disconnect(conn, 0, 1); @@ -1294,11 +1574,11 @@ mxlnd_close_matching_conns(lnet_nid_t nid) { int i = 0; int ret = 0; - struct kmx_peer *peer = NULL; + kmx_peer_t *peer = NULL; - read_lock(&kmxlnd_data.kmx_global_lock); + write_lock(&kmxlnd_data.kmx_global_lock); if (nid != LNET_NID_ANY) { - peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ + peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */ if (peer == NULL) { ret = -ENOENT; } else { @@ -1307,22 +1587,20 @@ mxlnd_close_matching_conns(lnet_nid_t nid) } } else { /* LNET_NID_ANY */ for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) + list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) mxlnd_close_matching_conns_locked(peer); } } - read_unlock(&kmxlnd_data.kmx_global_lock); + write_unlock(&kmxlnd_data.kmx_global_lock); return ret; } /** - * Modify MXLND parameters - * \param ni LNET interface handle - * \param cmd command to change - * \param arg the ioctl data - * - * Not implemented yet. + * mxlnd_ctl - modify MXLND parameters + * @ni - LNET interface handle + * @cmd - command to change + * @arg - the ioctl data */ int mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) @@ -1347,7 +1625,7 @@ mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) break; } case IOC_LIBCFS_GET_CONN: { - struct kmx_conn *conn = NULL; + kmx_conn_t *conn = NULL; conn = mxlnd_get_conn_by_idx(data->ioc_count); if (conn == NULL) { @@ -1372,16 +1650,16 @@ mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) } /** - * Add the tx to the global tx queue + * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue + * @tx * * Add the tx to the peer's msg or data queue. The caller has locked the peer. */ void -mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx) +mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx) { - u8 msg_type = tx->mxc_msg_type; - //struct kmx_peer *peer = tx->mxc_peer; - struct kmx_conn *conn = tx->mxc_conn; + u8 msg_type = tx->mxc_msg_type; + kmx_conn_t *conn = tx->mxc_conn; LASSERT (msg_type != 0); LASSERT (tx->mxc_nid != 0); @@ -1415,12 +1693,13 @@ mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx) } /** - * Add the tx to the global tx queue + * mxlnd_peer_queue_tx - add the tx to the global tx queue + * @tx * * Add the tx to the peer's msg or data queue */ static inline void -mxlnd_peer_queue_tx(struct kmx_ctx *tx) +mxlnd_peer_queue_tx(kmx_ctx_t *tx) { LASSERT(tx->mxc_peer != NULL); LASSERT(tx->mxc_conn != NULL); @@ -1432,32 +1711,33 @@ mxlnd_peer_queue_tx(struct kmx_ctx *tx) } /** - * Add the tx to the global tx queue + * mxlnd_queue_tx - add the tx to the global tx queue + * @tx * * Add the tx to the global queue and up the tx_queue_sem */ void -mxlnd_queue_tx(struct kmx_ctx *tx) +mxlnd_queue_tx(kmx_ctx_t *tx) { - struct kmx_peer *peer = tx->mxc_peer; + kmx_peer_t *peer = tx->mxc_peer; LASSERT (tx->mxc_nid != 0); if (peer != NULL) { if (peer->mxp_incompatible && tx->mxc_msg_type != MXLND_MSG_CONN_ACK) { /* let this fail now */ - tx->mxc_status.code = -ECONNABORTED; + tx->mxc_errno = -ECONNABORTED; mxlnd_conn_decref(peer->mxp_conn); mxlnd_put_idle_tx(tx); return; } if (tx->mxc_conn == NULL) { int ret = 0; - struct kmx_conn *conn = NULL; + kmx_conn_t *conn = NULL; ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */ if (ret != 0) { - tx->mxc_status.code = ret; + tx->mxc_errno = ret; mxlnd_put_idle_tx(tx); goto done; } @@ -1478,7 +1758,7 @@ done: } int -mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob) +mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob) { int i = 0; int sum = 0; @@ -1515,7 +1795,7 @@ mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u3 nseg = last_iov - first_iov + 1; LASSERT(nseg > 0); - MXLND_ALLOC (seg, nseg * sizeof(*seg)); + MXLND_ALLOC(seg, nseg * sizeof(*seg)); if (seg == NULL) { CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n"); return -1; @@ -1545,7 +1825,7 @@ mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u3 } int -mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob) +mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob) { int i = 0; int sum = 0; @@ -1567,7 +1847,6 @@ mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u if (!first_found && (sum > offset)) { first_kiov = i; first_kiov_offset = offset - old_sum; - //if (i == 0) first_kiov_offset + kiov[i].kiov_offset; if (i == 0) first_kiov_offset = kiov[i].kiov_offset; first_found = 1; sum = kiov[i].kiov_len - first_kiov_offset; @@ -1585,7 +1864,7 @@ mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u nseg = last_kiov - first_kiov + 1; LASSERT(nseg > 0); - MXLND_ALLOC (seg, nseg * sizeof(*seg)); + MXLND_ALLOC(seg, nseg * sizeof(*seg)); if (seg == NULL) { CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n"); return -1; @@ -1617,7 +1896,7 @@ mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u } void -mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie) +mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie) { LASSERT(type == MXLND_MSG_PUT_ACK); mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid); @@ -1631,23 +1910,28 @@ mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 c /** - * Get tx, map [k]iov, queue tx + * mxlnd_send_data - get tx, map [k]iov, queue tx + * @ni + * @lntmsg + * @peer + * @msg_type + * @cookie * * This setups the DATA send for PUT or GET. * * On success, it queues the tx, on failure it calls lnet_finalize() */ void -mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie) +mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie) { - int ret = 0; - lnet_process_id_t target = lntmsg->msg_target; - unsigned int niov = lntmsg->msg_niov; - struct iovec *iov = lntmsg->msg_iov; - lnet_kiov_t *kiov = lntmsg->msg_kiov; - unsigned int offset = lntmsg->msg_offset; - unsigned int nob = lntmsg->msg_len; - struct kmx_ctx *tx = NULL; + int ret = 0; + lnet_process_id_t target = lntmsg->msg_target; + unsigned int niov = lntmsg->msg_niov; + struct iovec *iov = lntmsg->msg_iov; + lnet_kiov_t *kiov = lntmsg->msg_kiov; + unsigned int offset = lntmsg->msg_offset; + unsigned int nob = lntmsg->msg_len; + kmx_ctx_t *tx = NULL; LASSERT(lntmsg != NULL); LASSERT(peer != NULL); @@ -1667,8 +1951,6 @@ mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg tx->mxc_peer = peer; tx->mxc_conn = peer->mxp_conn; tx->mxc_msg_type = msg_type; - tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; - tx->mxc_state = MXLND_CTX_PENDING; tx->mxc_lntmsg[0] = lntmsg; tx->mxc_cookie = cookie; tx->mxc_match = mxlnd_create_match(tx, 0); @@ -1685,9 +1967,9 @@ mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob); } if (ret != 0) { - CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", + CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", libcfs_nid2str(target.nid)); - tx->mxc_status.code = -EIO; + tx->mxc_errno = -EIO; goto failed_1; } mxlnd_queue_tx(tx); @@ -1705,14 +1987,19 @@ failed_0: } /** - * Map [k]iov, post rx + * mxlnd_recv_data - map [k]iov, post rx + * @ni + * @lntmsg + * @rx + * @msg_type + * @cookie * * This setups the DATA receive for PUT or GET. * * On success, it returns 0, on failure it returns -1 */ int -mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie) +mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie) { int ret = 0; lnet_process_id_t target = lntmsg->msg_target; @@ -1739,7 +2026,6 @@ mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_ty LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */ rx->mxc_msg_type = msg_type; - rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; rx->mxc_state = MXLND_CTX_PENDING; rx->mxc_nid = target.nid; /* if posting a GET_DATA, we may not yet know the peer */ @@ -1792,11 +2078,14 @@ mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_ty } /** - * The LND required send function + * mxlnd_send - the LND required send function + * @ni + * @private + * @lntmsg * * This must not block. Since we may not have a peer struct for the receiver, * it will append send messages on a global tx list. We will then up the - * tx_queued's semaphore to notify it of the new send. + * tx_queued's semaphore to notify it of the new send. */ int mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) @@ -1813,14 +2102,15 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; - struct kmx_ctx *tx = NULL; - struct kmx_msg *txmsg = NULL; - struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */ - struct kmx_ctx *rx_data = NULL; - struct kmx_conn *conn = NULL; + kmx_ctx_t *tx = NULL; + kmx_msg_t *txmsg = NULL; + kmx_ctx_t *rx = (kmx_ctx_t *) private; /* for REPLY */ + kmx_ctx_t *rx_data = NULL; + kmx_conn_t *conn = NULL; int nob = 0; uint32_t length = 0; - struct kmx_peer *peer = NULL; + kmx_peer_t *peer = NULL; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", payload_nob, payload_niov, libcfs_id2str(target)); @@ -1833,26 +2123,41 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* private is used on LNET_GET_REPLY only, NULL for all other cases */ /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ - * to a new peer, use the nid */ - peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ - if (peer != NULL) { - if (unlikely(peer->mxp_incompatible)) { - mxlnd_peer_decref(peer); /* drop ref taken above */ + * to a new peer, so create one if not found */ + peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */ + if (peer == NULL || peer->mxp_conn == NULL) { + /* we could not find it nor could we create one or + * one exists but we cannot create a conn, + * fail this message */ + if (peer) { + /* found peer without conn, drop ref taken above */ + LASSERT(peer->mxp_conn == NULL); + mxlnd_peer_decref(peer); + } + return -ENOMEM; + } + + /* we have a peer with a conn */ + + if (unlikely(peer->mxp_incompatible)) { + mxlnd_peer_decref(peer); /* drop ref taken above */ + } else { + read_lock(g_lock); + conn = peer->mxp_conn; + if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) { + mxlnd_conn_addref(conn); } else { - read_lock(&kmxlnd_data.kmx_global_lock); - conn = peer->mxp_conn; - if (conn) { - mxlnd_conn_addref(conn); - mxlnd_peer_decref(peer); /* drop peer ref taken above */ - } - read_unlock(&kmxlnd_data.kmx_global_lock); + conn = NULL; } + read_unlock(g_lock); + mxlnd_peer_decref(peer); /* drop peer ref taken above */ + if (!conn) + return -ENOTCONN; } + + LASSERT(peer && conn); + CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer); - if (conn == NULL && peer != NULL) { - CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n", - peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type)); - } switch (type) { case LNET_MSG_ACK: @@ -1863,7 +2168,7 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) case LNET_MSG_PUT: /* Is the payload small enough not to need DATA? */ nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]); - if (nob <= MXLND_EAGER_SIZE) + if (nob <= MXLND_MSG_SIZE) break; /* send EAGER */ tx = mxlnd_get_idle_tx(); @@ -1875,9 +2180,8 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) return -ENOMEM; } - /* the peer may be NULL */ tx->mxc_peer = peer; - tx->mxc_conn = conn; /* may be NULL */ + tx->mxc_conn = conn; /* we added a conn ref above */ mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid); txmsg = tx->mxc_msg; @@ -1889,7 +2193,7 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) * we need to determine how much to receive, it will be either * a put_ack or a put_nak. The put_ack is larger, so use it. */ - rx = mxlnd_get_idle_rx(); + rx = mxlnd_get_idle_rx(conn); if (unlikely(rx == NULL)) { CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n", libcfs_nid2str(nid)); @@ -1899,10 +2203,7 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } rx->mxc_nid = nid; rx->mxc_peer = peer; - /* conn may be NULL but unlikely since the first msg is always small */ - /* NOTE no need to lock peer before adding conn ref since we took - * a conn ref for the tx (it cannot be freed between there and here ) */ - if (conn) mxlnd_conn_addref(conn); /* for this rx */ + mxlnd_conn_addref(conn); /* for this rx */ rx->mxc_conn = conn; rx->mxc_msg_type = MXLND_MSG_PUT_ACK; rx->mxc_cookie = tx->mxc_cookie; @@ -1916,10 +2217,8 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) rx->mxc_lntmsg[0] = NULL; mxlnd_put_idle_rx(rx); mxlnd_put_idle_tx(tx); - if (conn) { - mxlnd_conn_decref(conn); /* for the rx... */ - mxlnd_conn_decref(conn); /* and for the tx */ - } + mxlnd_conn_decref(conn); /* for the rx... */ + mxlnd_conn_decref(conn); /* and for the tx */ return -EHOSTUNREACH; } @@ -1932,31 +2231,31 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* is the REPLY message too small for DATA? */ nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]); - if (nob <= MXLND_EAGER_SIZE) + if (nob <= MXLND_MSG_SIZE) break; /* send EAGER */ - /* get tx (we need the cookie) , post rx for incoming DATA, + /* get tx (we need the cookie) , post rx for incoming DATA, * then post GET_REQ tx */ tx = mxlnd_get_idle_tx(); if (unlikely(tx == NULL)) { CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n", libcfs_nid2str(nid)); - if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */ + mxlnd_conn_decref(conn); /* for the ref taken above */ return -ENOMEM; } - rx_data = mxlnd_get_idle_rx(); + rx_data = mxlnd_get_idle_rx(conn); if (unlikely(rx_data == NULL)) { CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n", libcfs_nid2str(nid)); mxlnd_put_idle_tx(tx); - if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */ + mxlnd_conn_decref(conn); /* for the ref taken above */ return -ENOMEM; } rx_data->mxc_peer = peer; /* NOTE no need to lock peer before adding conn ref since we took * a conn ref for the tx (it cannot be freed between there and here ) */ - if (conn) mxlnd_conn_addref(conn); /* for the rx_data */ - rx_data->mxc_conn = conn; /* may be NULL */ + mxlnd_conn_addref(conn); /* for the rx_data */ + rx_data->mxc_conn = conn; ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie); if (unlikely(ret != 0)) { @@ -1964,15 +2263,13 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) libcfs_nid2str(nid)); mxlnd_put_idle_rx(rx_data); mxlnd_put_idle_tx(tx); - if (conn) { - mxlnd_conn_decref(conn); /* for the rx_data... */ - mxlnd_conn_decref(conn); /* and for the tx */ - } + mxlnd_conn_decref(conn); /* for the rx_data... */ + mxlnd_conn_decref(conn); /* and for the tx */ return -EIO; } tx->mxc_peer = peer; - tx->mxc_conn = conn; /* may be NULL */ + tx->mxc_conn = conn; /* conn ref taken above */ mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid); txmsg = tx->mxc_msg; @@ -1985,25 +2282,25 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) default: LBUG(); - if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */ + mxlnd_conn_decref(conn); /* drop ref taken above */ return -EIO; } /* send EAGER */ LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]) - <= MXLND_EAGER_SIZE); + <= MXLND_MSG_SIZE); tx = mxlnd_get_idle_tx(); if (unlikely(tx == NULL)) { CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n", mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid)); - if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */ + mxlnd_conn_decref(conn); /* drop ref taken above */ return -ENOMEM; } tx->mxc_peer = peer; - tx->mxc_conn = conn; /* may be NULL */ + tx->mxc_conn = conn; /* conn ref taken above */ nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]); mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid); @@ -2013,11 +2310,11 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) txmsg->mxm_u.eager.mxem_hdr = *hdr; if (payload_kiov != NULL) - lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg, + lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg, offsetof(kmx_msg_t, mxm_u.eager.mxem_payload), payload_niov, payload_kiov, payload_offset, payload_nob); else - lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg, + lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg, offsetof(kmx_msg_t, mxm_u.eager.mxem_payload), payload_niov, payload_iov, payload_offset, payload_nob); @@ -2027,7 +2324,16 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } /** - * The LND required recv function + * mxlnd_recv - the LND required recv function + * @ni + * @private + * @lntmsg + * @delayed + * @niov + * @kiov + * @offset + * @mlen + * @rlen * * This must not block. */ @@ -2036,26 +2342,26 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen) { - int ret = 0; - int nob = 0; - int len = 0; - struct kmx_ctx *rx = private; - struct kmx_msg *rxmsg = rx->mxc_msg; - lnet_nid_t nid = rx->mxc_nid; - struct kmx_ctx *tx = NULL; - struct kmx_msg *txmsg = NULL; - struct kmx_peer *peer = rx->mxc_peer; - struct kmx_conn *conn = peer->mxp_conn; - u64 cookie = 0ULL; - int msg_type = rxmsg->mxm_type; - int repost = 1; - int credit = 0; - int finalize = 0; + int ret = 0; + int nob = 0; + int len = 0; + kmx_ctx_t *rx = private; + kmx_msg_t *rxmsg = rx->mxc_msg; + lnet_nid_t nid = rx->mxc_nid; + kmx_ctx_t *tx = NULL; + kmx_msg_t *txmsg = NULL; + kmx_peer_t *peer = rx->mxc_peer; + kmx_conn_t *conn = peer->mxp_conn; + u64 cookie = 0ULL; + int msg_type = rxmsg->mxm_type; + int repost = 1; + int credit = 0; + int finalize = 0; LASSERT (mlen <= rlen); /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); - LASSERT (peer != NULL); + LASSERT (peer && conn); /* conn_addref(conn) already taken for the primary rx */ @@ -2072,12 +2378,12 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, if (kiov != NULL) lnet_copy_flat2kiov(niov, kiov, offset, - MXLND_EAGER_SIZE, rxmsg, + MXLND_MSG_SIZE, rxmsg, offsetof(kmx_msg_t, mxm_u.eager.mxem_payload), mlen); else lnet_copy_flat2iov(niov, iov, offset, - MXLND_EAGER_SIZE, rxmsg, + MXLND_MSG_SIZE, rxmsg, offsetof(kmx_msg_t, mxm_u.eager.mxem_payload), mlen); finalize = 1; @@ -2124,12 +2430,12 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, rx->mxc_conn = conn; /* do not take another ref for this rx, it is already taken */ rx->mxc_nid = peer->mxp_nid; - ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, + ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, txmsg->mxm_u.put_ack.mxpam_dst_cookie); if (unlikely(ret != 0)) { /* Notify peer that it's over */ - CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", + CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", libcfs_nid2str(nid), ret); mxlnd_ctx_init(tx); tx->mxc_state = MXLND_CTX_PREP; @@ -2150,20 +2456,23 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, break; case MXLND_MSG_GET_REQ: + cookie = rxmsg->mxm_u.get_req.mxgrm_cookie; + if (likely(lntmsg != NULL)) { mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA, - rx->mxc_msg->mxm_u.get_req.mxgrm_cookie); + cookie); } else { /* GET didn't match anything */ /* The initiator has a rx mapped to [k]iov. We cannot send a nak. * We have to embed the error code in the match bits. * Send the error in bits 52-59 and the cookie in bits 0-51 */ - u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie; - tx = mxlnd_get_idle_tx(); if (unlikely(tx == NULL)) { CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n", libcfs_nid2str(nid)); + /* we can't get a tx, notify the peer that the GET failed */ + mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA, + ENODATA, cookie); ret = -ENOMEM; break; } @@ -2198,7 +2507,7 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, mxlnd_conn_decref(conn); } - if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); + if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); /* we received a credit, see if we can use it to send a msg */ if (credit) mxlnd_check_sends(peer); @@ -2215,8 +2524,8 @@ mxlnd_sleep(unsigned long timeout) } /** - * The generic send queue thread - * \param arg thread id (as a void *) + * mxlnd_tx_queued - the generic send queue thread + * @arg - thread id (as a void *) * * This thread moves send messages from the global tx_queue to the owning * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds @@ -2228,56 +2537,56 @@ mxlnd_tx_queued(void *arg) long id = (long) arg; int ret = 0; int found = 0; - struct kmx_ctx *tx = NULL; - struct kmx_peer *peer = NULL; - struct list_head *tmp_tx = NULL; + kmx_ctx_t *tx = NULL; + kmx_peer_t *peer = NULL; + struct list_head *queue = &kmxlnd_data.kmx_tx_queue; + spinlock_t *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; cfs_daemonize("mxlnd_tx_queued"); - //cfs_block_allsigs(); - while (!kmxlnd_data.kmx_shutdown) { + while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) { ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem); - if (kmxlnd_data.kmx_shutdown) + if (atomic_read(&kmxlnd_data.kmx_shutdown)) break; if (ret != 0) // Should we check for -EINTR? continue; - spin_lock(&kmxlnd_data.kmx_tx_queue_lock); + spin_lock(tx_q_lock); if (list_empty (&kmxlnd_data.kmx_tx_queue)) { - spin_unlock(&kmxlnd_data.kmx_tx_queue_lock); + spin_unlock(tx_q_lock); continue; } - tmp_tx = &kmxlnd_data.kmx_tx_queue; - tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list); + tx = list_entry (queue->next, kmx_ctx_t, mxc_list); list_del_init(&tx->mxc_list); - spin_unlock(&kmxlnd_data.kmx_tx_queue_lock); + spin_unlock(tx_q_lock); found = 0; - peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */ + peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */ if (peer != NULL) { tx->mxc_peer = peer; - write_lock(&kmxlnd_data.kmx_global_lock); + write_lock(g_lock); if (peer->mxp_conn == NULL) { ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer); if (ret != 0) { /* out of memory, give up and fail tx */ - tx->mxc_status.code = -ENOMEM; - write_unlock(&kmxlnd_data.kmx_global_lock); + tx->mxc_errno = -ENOMEM; mxlnd_peer_decref(peer); + write_unlock(g_lock); mxlnd_put_idle_tx(tx); continue; } } tx->mxc_conn = peer->mxp_conn; mxlnd_conn_addref(tx->mxc_conn); /* for this tx */ - write_unlock(&kmxlnd_data.kmx_global_lock); mxlnd_peer_decref(peer); /* drop peer ref taken above */ + write_unlock(g_lock); mxlnd_queue_tx(tx); found = 1; } if (found == 0) { - int hash = 0; - struct kmx_peer *peer = NULL; - struct kmx_peer *old = NULL; + int hash = 0; + kmx_peer_t *peer = NULL; + kmx_peer_t *old = NULL; hash = mxlnd_nid_to_hash(tx->mxc_nid); @@ -2290,7 +2599,7 @@ mxlnd_tx_queued(void *arg) *kmxlnd_tunables.kmx_ep_id, 0ULL); if (ret != 0) { /* finalize message */ - tx->mxc_status.code = ret; + tx->mxc_errno = ret; mxlnd_put_idle_tx(tx); continue; } @@ -2301,30 +2610,34 @@ mxlnd_tx_queued(void *arg) /* add peer to global peer list, but look to see * if someone already created it after we released * the read lock */ - write_lock(&kmxlnd_data.kmx_global_lock); - list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) { - if (old->mxp_nid == peer->mxp_nid) { - /* somebody beat us here, we created a duplicate */ + write_lock(g_lock); + old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid); + if (old) { + /* we have a peer ref on old */ + if (old->mxp_conn) { found = 1; - break; + } else { + /* no conn */ + /* drop our ref taken above... */ + mxlnd_peer_decref(old); + /* and delete it */ + mxlnd_del_peer_locked(old); } } if (found == 0) { - list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]); + list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]); atomic_inc(&kmxlnd_data.kmx_npeers); } else { tx->mxc_peer = old; tx->mxc_conn = old->mxp_conn; - /* FIXME can conn be NULL? */ LASSERT(old->mxp_conn != NULL); mxlnd_conn_addref(old->mxp_conn); - mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1); mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */ mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */ mxlnd_peer_decref(peer); } - write_unlock(&kmxlnd_data.kmx_global_lock); + write_unlock(g_lock); mxlnd_queue_tx(tx); } @@ -2335,12 +2648,12 @@ mxlnd_tx_queued(void *arg) /* When calling this, we must not have the peer lock. */ void -mxlnd_iconnect(struct kmx_peer *peer, u64 mask) +mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type) { mx_return_t mxret = MX_SUCCESS; mx_request_t request; - struct kmx_conn *conn = peer->mxp_conn; - u8 msg_type = (u8) MXLND_MSG_TYPE(mask); + kmx_conn_t *conn = peer->mxp_conn; + u64 match = ((u64) msg_type) << MXLND_MSG_OFFSET; /* NOTE we are holding a conn ref every time we call this function, * we do not need to lock the peer before taking another ref */ @@ -2353,61 +2666,63 @@ mxlnd_iconnect(struct kmx_peer *peer, u64 mask) } if (peer->mxp_nic_id == 0ULL) { - int ret = 0; + int ret = 0; - ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id); + ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid), + &peer->mxp_nic_id, MXLND_LOOKUP_COUNT); if (ret == 0) { mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board); } - if (peer->mxp_nic_id == 0ULL) { + if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) { /* not mapped yet, return */ spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_INIT; + mxlnd_set_conn_status(conn, MXLND_CONN_INIT); spin_unlock(&conn->mxk_lock); - if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) { - /* give up and notify LNET */ - mxlnd_conn_disconnect(conn, 0, 0); - mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this - function... */ - mxlnd_conn_decref(peer->mxp_conn); /* which we no - longer need */ - } - mxlnd_conn_decref(conn); - return; } } + if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) && + conn->mxk_status != MXLND_CONN_DISCONNECT) { + /* give up and notify LNET */ + CDEBUG(D_NET, "timeout trying to connect to %s\n", + libcfs_nid2str(peer->mxp_nid)); + mxlnd_conn_disconnect(conn, 0, 0); + mxlnd_conn_decref(conn); + return; + } + mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id, - peer->mxp_ep_id, MXLND_MSG_MAGIC, mask, + peer->mxp_ep_id, MXLND_MSG_MAGIC, match, (void *) peer, &request); if (unlikely(mxret != MX_SUCCESS)) { spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); spin_unlock(&conn->mxk_lock); CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n", mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid)); mxlnd_conn_decref(conn); } + mx_set_request_timeout(kmxlnd_data.kmx_endpt, request, MXLND_CONNECT_TIMEOUT/HZ*1000); return; } #define MXLND_STATS 0 int -mxlnd_check_sends(struct kmx_peer *peer) +mxlnd_check_sends(kmx_peer_t *peer) { - int ret = 0; - int found = 0; - mx_return_t mxret = MX_SUCCESS; - struct kmx_ctx *tx = NULL; - struct kmx_conn *conn = NULL; - u8 msg_type = 0; - int credit = 0; - int status = 0; - int ntx_posted = 0; - int credits = 0; + int ret = 0; + int found = 0; + mx_return_t mxret = MX_SUCCESS; + kmx_ctx_t *tx = NULL; + kmx_conn_t *conn = NULL; + u8 msg_type = 0; + int credit = 0; + int status = 0; + int ntx_posted = 0; + int credits = 0; #if MXLND_STATS - static unsigned long last = 0; + static unsigned long last = 0; #endif if (unlikely(peer == NULL)) { @@ -2418,13 +2733,20 @@ mxlnd_check_sends(struct kmx_peer *peer) conn = peer->mxp_conn; /* NOTE take a ref for the duration of this function since it is called * when there might not be any queued txs for this peer */ - if (conn) mxlnd_conn_addref(conn); /* for duration of this function */ + if (conn) { + if (conn->mxk_status == MXLND_CONN_DISCONNECT) { + write_unlock(&kmxlnd_data.kmx_global_lock); + return -1; + } + mxlnd_conn_addref(conn); /* for duration of this function */ + } write_unlock(&kmxlnd_data.kmx_global_lock); /* do not add another ref for this tx */ if (conn == NULL) { /* we do not have any conns */ + CDEBUG(D_NETERROR, "peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid)); return -1; } @@ -2439,21 +2761,18 @@ mxlnd_check_sends(struct kmx_peer *peer) } #endif - /* cache peer state for asserts */ spin_lock(&conn->mxk_lock); ntx_posted = conn->mxk_ntx_posted; credits = conn->mxk_credits; - spin_unlock(&conn->mxk_lock); - LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits); + LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits); LASSERT(ntx_posted >= 0); - LASSERT(credits <= *kmxlnd_tunables.kmx_credits); + LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits); LASSERT(credits >= 0); /* check number of queued msgs, ignore data */ - spin_lock(&conn->mxk_lock); - if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) { + if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) { /* check if any txs queued that could return credits... */ if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) { /* if not, send a NOOP */ @@ -2470,28 +2789,22 @@ mxlnd_check_sends(struct kmx_peer *peer) } } } - spin_unlock(&conn->mxk_lock); /* if the peer is not ready, try to connect */ - spin_lock(&conn->mxk_lock); if (unlikely(conn->mxk_status == MXLND_CONN_INIT || - conn->mxk_status == MXLND_CONN_FAIL || - conn->mxk_status == MXLND_CONN_REQ)) { - u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET; + conn->mxk_status == MXLND_CONN_FAIL)) { CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status)); - conn->mxk_status = MXLND_CONN_WAIT; + mxlnd_set_conn_status(conn, MXLND_CONN_WAIT); spin_unlock(&conn->mxk_lock); - mxlnd_iconnect(peer, match); + mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ); goto done; } - spin_unlock(&conn->mxk_lock); - spin_lock(&conn->mxk_lock); while (!list_empty(&conn->mxk_tx_free_queue) || !list_empty(&conn->mxk_tx_credit_queue)) { /* We have something to send. If we have a queued tx that does not - * require a credit (free), choose it since its completion will - * return a credit (here or at the peer), complete a DATA or + * require a credit (free), choose it since its completion will + * return a credit (here or at the peer), complete a DATA or * CONN_REQ or CONN_ACK. */ struct list_head *tmp_tx = NULL; if (!list_empty(&conn->mxk_tx_free_queue)) { @@ -2499,7 +2812,7 @@ mxlnd_check_sends(struct kmx_peer *peer) } else { tmp_tx = &conn->mxk_tx_credit_queue; } - tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list); + tx = list_entry(tmp_tx->next, kmx_ctx_t, mxc_list); msg_type = tx->mxc_msg_type; @@ -2522,7 +2835,7 @@ mxlnd_check_sends(struct kmx_peer *peer) credit = mxlnd_tx_requires_credit(tx); if (credit) { - if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) { + if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) { CDEBUG(D_NET, "%s: posted enough\n", libcfs_nid2str(peer->mxp_nid)); goto done_locked; @@ -2549,11 +2862,14 @@ mxlnd_check_sends(struct kmx_peer *peer) mxlnd_connstatus_to_str(conn->mxk_status), tx->mxc_cookie, mxlnd_msgtype_to_str(tx->mxc_msg_type)); - if (conn->mxk_status == MXLND_CONN_DISCONNECT) { + if (conn->mxk_status == MXLND_CONN_DISCONNECT || + time_after_eq(jiffies, tx->mxc_deadline)) { list_del_init(&tx->mxc_list); - tx->mxc_status.code = -ECONNABORTED; + tx->mxc_errno = -ECONNABORTED; + spin_unlock(&conn->mxk_lock); mxlnd_put_idle_tx(tx); mxlnd_conn_decref(conn); + goto done; } goto done_locked; } @@ -2577,14 +2893,12 @@ mxlnd_check_sends(struct kmx_peer *peer) conn->mxk_incarnation != 0) { tx->mxc_incarnation = conn->mxk_incarnation; } - spin_unlock(&conn->mxk_lock); - /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER - * or (2) there is a non-DATA msg that can return credits in the + /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER + * or (2) there is a non-DATA msg that can return credits in the * queue, then drop this duplicate NOOP */ if (unlikely(msg_type == MXLND_MSG_NOOP)) { - spin_lock(&conn->mxk_lock); - if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) || + if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) || (conn->mxk_ntx_msgs >= 1)) { conn->mxk_credits++; conn->mxk_ntx_posted--; @@ -2597,19 +2911,16 @@ mxlnd_check_sends(struct kmx_peer *peer) found = 1; goto done; } - spin_unlock(&conn->mxk_lock); } found = 1; if (likely((msg_type != MXLND_MSG_PUT_DATA) && (msg_type != MXLND_MSG_GET_DATA))) { - mxlnd_pack_msg(tx); + mxlnd_pack_msg_locked(tx); } - //ret = -ECONNABORTED; mxret = MX_SUCCESS; - spin_lock(&conn->mxk_lock); status = conn->mxk_status; spin_unlock(&conn->mxk_lock); @@ -2621,9 +2932,6 @@ mxlnd_check_sends(struct kmx_peer *peer) msg_type != MXLND_MSG_CONN_ACK) { /* add to the pending list */ ret = mxlnd_q_pending_ctx(tx); - if (ret == -1) { - /* FIXME the conn is disconnected, now what? */ - } } else { /* CONN_REQ/ACK */ tx->mxc_state = MXLND_CTX_PENDING; @@ -2665,6 +2973,7 @@ mxlnd_check_sends(struct kmx_peer *peer) &tx->mxc_mxreq); } } else { + /* ret != 0 */ mxret = MX_CONNECTION_FAILED; } if (likely(mxret == MX_SUCCESS)) { @@ -2673,12 +2982,12 @@ mxlnd_check_sends(struct kmx_peer *peer) CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) " "sending to %s\n", mx_strerror(mxret), (int) mxret, libcfs_nid2str(peer->mxp_nid)); - /* NOTE mx_kisend() only fails if there are not enough + /* NOTE mx_kisend() only fails if there are not enough * resources. Do not change the connection status. */ if (mxret == MX_NO_RESOURCES) { - tx->mxc_status.code = -ENOMEM; + tx->mxc_errno = -ENOMEM; } else { - tx->mxc_status.code = -ECONNABORTED; + tx->mxc_errno = -ECONNABORTED; } if (credit) { spin_lock(&conn->mxk_lock); @@ -2719,20 +3028,21 @@ done: /** - * A tx completed, progress or complete the msg - * \param tx the tx descriptor - * + * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg + * @ctx - the tx descriptor + * * Determine which type of send request it was and start the next step, if needed, * or, if done, signal completion to LNET. After we are done, put back on the * idle tx list. */ void -mxlnd_handle_tx_completion(struct kmx_ctx *tx) +mxlnd_handle_tx_completion(kmx_ctx_t *tx) { - int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS); - struct kmx_msg *msg = tx->mxc_msg; - struct kmx_peer *peer = tx->mxc_peer; - struct kmx_conn *conn = tx->mxc_conn; + int code = tx->mxc_status.code; + int failed = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0); + kmx_msg_t *msg = tx->mxc_msg; + kmx_peer_t *peer = tx->mxc_peer; + kmx_conn_t *conn = tx->mxc_conn; u8 type = tx->mxc_msg_type; int credit = mxlnd_tx_requires_credit(tx); u64 cookie = tx->mxc_cookie; @@ -2740,15 +3050,6 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) CDEBUG(D_NET, "entering %s (0x%llx):\n", mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie); - if (unlikely(conn == NULL)) { - mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer); - conn = peer->mxp_conn; - if (conn != NULL) { - /* do not add a ref for the tx, it was set before sending */ - tx->mxc_conn = conn; - tx->mxc_peer = conn->mxk_peer; - } - } LASSERT (peer != NULL); LASSERT (conn != NULL); @@ -2757,10 +3058,10 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) } if (failed) { - tx->mxc_status.code = -EIO; + if (tx->mxc_errno == 0) tx->mxc_errno = -EIO; } else { spin_lock(&conn->mxk_lock); - conn->mxk_last_tx = jiffies; + conn->mxk_last_tx = cfs_time_current(); /* jiffies */ spin_unlock(&conn->mxk_lock); } @@ -2788,7 +3089,6 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) case MXLND_MSG_PUT_ACK: case MXLND_MSG_GET_REQ: case MXLND_MSG_EAGER: - //case MXLND_MSG_NAK: break; case MXLND_MSG_CONN_ACK: @@ -2798,15 +3098,17 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) } case MXLND_MSG_CONN_REQ: if (failed) { - CDEBUG(D_NETERROR, "handle_tx_completion(): %s " - "failed with %s (%d) to %s\n", + CDEBUG(D_NETERROR, "%s failed with %s (%d) (errno = %d)" + " to %s\n", type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK", - mx_strstatus(tx->mxc_status.code), - tx->mxc_status.code, + mx_strstatus(code), code, tx->mxc_errno, libcfs_nid2str(tx->mxc_nid)); if (!peer->mxp_incompatible) { spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; + if (code == MX_STATUS_BAD_SESSION) + mxlnd_set_conn_status(conn, MXLND_CONN_INIT); + else + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); spin_unlock(&conn->mxk_lock); } } @@ -2825,40 +3127,37 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) spin_unlock(&conn->mxk_lock); } - CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n"); mxlnd_put_idle_tx(tx); mxlnd_conn_decref(conn); mxlnd_check_sends(peer); + CDEBUG(D_NET, "leaving\n"); return; } +/* Handle completion of MSG or DATA rx. + * CONN_REQ and CONN_ACK are handled elsewhere. */ void -mxlnd_handle_rx_completion(struct kmx_ctx *rx) +mxlnd_handle_rx_completion(kmx_ctx_t *rx) { - int ret = 0; - int repost = 1; - int credit = 1; - u32 nob = rx->mxc_status.xfer_length; - u64 bits = rx->mxc_status.match_info; - struct kmx_msg *msg = rx->mxc_msg; - struct kmx_peer *peer = rx->mxc_peer; - struct kmx_conn *conn = rx->mxc_conn; - u8 type = rx->mxc_msg_type; - u64 seq = 0ULL; - lnet_msg_t *lntmsg[2]; - int result = 0; - u64 nic_id = 0ULL; - u32 ep_id = 0; - u32 sid = 0; - int peer_ref = 0; - int conn_ref = 0; - int incompatible = 0; - u64 match = 0ULL; - - /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, - * failed GET reply, CONN_REQ, or a CONN_ACK */ + int ret = 0; + int repost = 1; + int credit = 1; + u32 nob = rx->mxc_status.xfer_length; + u64 bits = rx->mxc_status.match_info; + kmx_msg_t *msg = rx->mxc_msg; + kmx_peer_t *peer = rx->mxc_peer; + kmx_conn_t *conn = rx->mxc_conn; + u8 type = rx->mxc_msg_type; + u64 seq = bits; + lnet_msg_t *lntmsg[2]; + int result = 0; + int peer_ref = 0; + int conn_ref = 0; + + /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, + * failed GET reply */ /* NOTE peer may still be NULL if it is a new peer and * conn may be NULL if this is a re-connect */ @@ -2870,18 +3169,11 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) peer_ref = 1; } else if (peer == NULL && conn != NULL) { /* fatal error */ - CDEBUG(D_NETERROR, "rx has conn but no peer\n"); + CERROR("rx 0x%llx from %s has conn but no peer\n", + bits, libcfs_nid2str(rx->mxc_nid)); LBUG(); } /* else peer and conn == NULL */ -#if 0 - if (peer == NULL || conn == NULL) { - /* if the peer was disconnected, the peer may exist but - * not have any valid conns */ - decref = 0; /* no peer means no ref was taken for this rx */ - } -#endif - if (conn == NULL && peer != NULL) { write_lock(&kmxlnd_data.kmx_global_lock); conn = peer->mxp_conn; @@ -2902,11 +3194,12 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) lntmsg[0] = NULL; lntmsg[1] = NULL; - if (rx->mxc_status.code != MX_STATUS_SUCCESS) { + if (rx->mxc_status.code != MX_STATUS_SUCCESS && + rx->mxc_status.code != MX_STATUS_TRUNCATED) { CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n", libcfs_nid2str(rx->mxc_nid), mx_strstatus(rx->mxc_status.code), - (int) rx->mxc_status.code); + rx->mxc_status.code); credit = 0; goto cleanup; } @@ -2929,11 +3222,11 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */ if (type == MXLND_MSG_PUT_DATA) { - result = rx->mxc_status.code; + /* result = 0; */ lntmsg[0] = rx->mxc_lntmsg[0]; goto cleanup; } else if (type == MXLND_MSG_GET_DATA) { - result = rx->mxc_status.code; + /* result = 0; */ lntmsg[0] = rx->mxc_lntmsg[0]; lntmsg[1] = rx->mxc_lntmsg[1]; goto cleanup; @@ -2947,11 +3240,9 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) } rx->mxc_nob = nob; type = msg->mxm_type; - seq = msg->mxm_seq; - if (type != MXLND_MSG_CONN_REQ && - (rx->mxc_nid != msg->mxm_srcnid || - kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) { + if (rx->mxc_nid != msg->mxm_srcnid || + kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) { CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is " "0x%llx and rx msg dst is 0x%llx)\n", mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid, @@ -2959,48 +3250,35 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) goto cleanup; } - if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) { - if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) || - msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) { - if (conn != NULL) { - CDEBUG(D_NETERROR, "Stale rx from %s with type %s " - "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) " - "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", - libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type), - msg->mxm_srcstamp, conn->mxk_incarnation, - msg->mxm_dststamp, kmxlnd_data.kmx_incarnation); - } else { - CDEBUG(D_NETERROR, "Stale rx from %s with type %s " - "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", - libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type), - msg->mxm_dststamp, kmxlnd_data.kmx_incarnation); - } - credit = 0; - goto cleanup; - } + if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) || + msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) { + CDEBUG(D_NETERROR, "Stale rx from %s with type %s " + "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) " + "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", + libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type), + msg->mxm_srcstamp, conn->mxk_incarnation, + msg->mxm_dststamp, kmxlnd_data.kmx_incarnation); + credit = 0; + goto cleanup; } CDEBUG(D_NET, "Received %s with %d credits\n", mxlnd_msgtype_to_str(type), msg->mxm_credits); - if (msg->mxm_type != MXLND_MSG_CONN_REQ && - msg->mxm_type != MXLND_MSG_CONN_ACK) { - LASSERT(peer != NULL); - LASSERT(conn != NULL); - if (msg->mxm_credits != 0) { - spin_lock(&conn->mxk_lock); - if (msg->mxm_srcstamp == conn->mxk_incarnation) { - if ((conn->mxk_credits + msg->mxm_credits) > - *kmxlnd_tunables.kmx_credits) { - CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n", - conn->mxk_credits, msg->mxm_credits); - } - conn->mxk_credits += msg->mxm_credits; - LASSERT(conn->mxk_credits >= 0); - LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits); + LASSERT(peer != NULL && conn != NULL); + if (msg->mxm_credits != 0) { + spin_lock(&conn->mxk_lock); + if (msg->mxm_srcstamp == conn->mxk_incarnation) { + if ((conn->mxk_credits + msg->mxm_credits) > + *kmxlnd_tunables.kmx_peercredits) { + CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n", + conn->mxk_credits, msg->mxm_credits); } - spin_unlock(&conn->mxk_lock); + conn->mxk_credits += msg->mxm_credits; + LASSERT(conn->mxk_credits >= 0); + LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits); } + spin_unlock(&conn->mxk_lock); } CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq); @@ -3041,161 +3319,6 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) repost = ret < 0; break; - case MXLND_MSG_CONN_REQ: - if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) { - CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n", - libcfs_nid2str(msg->mxm_srcnid), - libcfs_nid2str(msg->mxm_dstnid)); - goto cleanup; - } - if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) { - CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth " - "%d (%d wanted)\n", - libcfs_nid2str(msg->mxm_srcnid), - msg->mxm_u.conn_req.mxcrm_queue_depth, - *kmxlnd_tunables.kmx_credits); - incompatible = 1; - } - if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) { - CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size " - "%d (%d wanted)\n", - libcfs_nid2str(msg->mxm_srcnid), - msg->mxm_u.conn_req.mxcrm_eager_size, - (int) MXLND_EAGER_SIZE); - incompatible = 1; - } - mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid); - if (peer == NULL) { - peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */ - if (peer == NULL) { - int hash = 0; - struct kmx_peer *existing_peer = NULL; - hash = mxlnd_nid_to_hash(msg->mxm_srcnid); - - rx->mxc_nid = msg->mxm_srcnid; - - /* adds conn ref for peer and one for this function */ - ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid, - *kmxlnd_tunables.kmx_board, - *kmxlnd_tunables.kmx_ep_id, 0ULL); - if (ret != 0) { - goto cleanup; - } - peer->mxp_sid = sid; - LASSERT(peer->mxp_ep_id == ep_id); - write_lock(&kmxlnd_data.kmx_global_lock); - existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid); - if (existing_peer) { - mxlnd_conn_decref(peer->mxp_conn); - mxlnd_peer_decref(peer); - peer = existing_peer; - mxlnd_conn_addref(peer->mxp_conn); - } else { - list_add_tail(&peer->mxp_peers, - &kmxlnd_data.kmx_peers[hash]); - atomic_inc(&kmxlnd_data.kmx_npeers); - } - write_unlock(&kmxlnd_data.kmx_global_lock); - } else { - /* FIXME should write lock here */ - ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */ - mxlnd_peer_decref(peer); /* drop ref taken above */ - if (ret != 0) { - CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n"); - goto cleanup; - } - } - conn_ref = 1; /* peer/conn_alloc() added ref for this function */ - conn = peer->mxp_conn; - } else { /* found peer */ - struct kmx_conn *old_conn = conn; - - if (sid != peer->mxp_sid) { - /* do not call mx_disconnect() or send a BYE */ - mxlnd_conn_disconnect(old_conn, 0, 0); - - /* the ref for this rx was taken on the old_conn */ - mxlnd_conn_decref(old_conn); - - /* This allocs a conn, points peer->mxp_conn to this one. - * The old conn is still on the peer->mxp_conns list. - * As the pending requests complete, they will call - * conn_decref() which will eventually free it. */ - ret = mxlnd_conn_alloc(&conn, peer); - if (ret != 0) { - CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n"); - goto cleanup; - } - /* conn_alloc() adds one ref for the peer and one - * for this function */ - conn_ref = 1; - - peer->mxp_sid = sid; - } - } - write_lock(&kmxlnd_data.kmx_global_lock); - peer->mxp_incarnation = msg->mxm_srcstamp; - peer->mxp_incompatible = incompatible; - write_unlock(&kmxlnd_data.kmx_global_lock); - spin_lock(&conn->mxk_lock); - conn->mxk_incarnation = msg->mxm_srcstamp; - conn->mxk_status = MXLND_CONN_WAIT; - spin_unlock(&conn->mxk_lock); - - /* handle_conn_ack() will create the CONN_ACK msg */ - match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET; - mxlnd_iconnect(peer, match); - - break; - - case MXLND_MSG_CONN_ACK: - if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) { - CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " - "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid), - libcfs_nid2str(msg->mxm_dstnid)); - ret = -1; - goto failed; - } - if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) { - CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " - "incompatible queue depth %d (%d wanted)\n", - libcfs_nid2str(msg->mxm_srcnid), - msg->mxm_u.conn_req.mxcrm_queue_depth, - *kmxlnd_tunables.kmx_credits); - spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; - spin_unlock(&conn->mxk_lock); - incompatible = 1; - ret = -1; - } - if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) { - CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " - "incompatible EAGER size %d (%d wanted)\n", - libcfs_nid2str(msg->mxm_srcnid), - msg->mxm_u.conn_req.mxcrm_eager_size, - (int) MXLND_EAGER_SIZE); - spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; - spin_unlock(&conn->mxk_lock); - incompatible = 1; - ret = -1; - } - write_lock(&kmxlnd_data.kmx_global_lock); - peer->mxp_incarnation = msg->mxm_srcstamp; - peer->mxp_incompatible = incompatible; - write_unlock(&kmxlnd_data.kmx_global_lock); - spin_lock(&conn->mxk_lock); - conn->mxk_credits = *kmxlnd_tunables.kmx_credits; - conn->mxk_outstanding = 0; - conn->mxk_incarnation = msg->mxm_srcstamp; - conn->mxk_timeout = 0; - if (!incompatible) { - conn->mxk_status = MXLND_CONN_READY; - } - spin_unlock(&conn->mxk_lock); - if (incompatible) mxlnd_conn_disconnect(conn, 0, 1); - break; - default: CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type, libcfs_nid2str(rx->mxc_nid)); @@ -3203,11 +3326,10 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) break; } -failed: if (ret < 0) { CDEBUG(D_NET, "setting PEER_CONN_FAILED\n"); spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); spin_unlock(&conn->mxk_lock); } @@ -3222,14 +3344,10 @@ cleanup: /* lnet_parse() failed, etc., repost now */ mxlnd_put_idle_rx(rx); if (conn != NULL && credit == 1) { - if (type == MXLND_MSG_PUT_DATA) { - spin_lock(&conn->mxk_lock); - conn->mxk_outstanding++; - spin_unlock(&conn->mxk_lock); - } else if (type != MXLND_MSG_GET_DATA && - (type == MXLND_MSG_EAGER || - type == MXLND_MSG_PUT_REQ || - type == MXLND_MSG_NOOP)) { + if (type == MXLND_MSG_PUT_DATA || + type == MXLND_MSG_EAGER || + type == MXLND_MSG_PUT_REQ || + type == MXLND_MSG_NOOP) { spin_lock(&conn->mxk_lock); conn->mxk_outstanding++; spin_unlock(&conn->mxk_lock); @@ -3253,143 +3371,71 @@ cleanup: return; } - - -void -mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status) -{ - struct kmx_ctx *tx = NULL; - struct kmx_msg *txmsg = NULL; - struct kmx_conn *conn = peer->mxp_conn; - - /* a conn ref was taken when calling mx_iconnect(), - * hold it until CONN_REQ or CONN_ACK completes */ - - CDEBUG(D_NET, "entering\n"); - if (status.code != MX_STATUS_SUCCESS) { - CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n", - mx_strstatus(status.code), status.code, - libcfs_nid2str(peer->mxp_nid)); - spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; - spin_unlock(&conn->mxk_lock); - - if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) { - struct kmx_conn *new_conn = NULL; - CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n"); - /* FIXME write lock here ? */ - mxlnd_conn_disconnect(conn, 0, 0); - mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */ - mxlnd_conn_decref(new_conn); /* which we no longer need */ - peer->mxp_reconnect_time = 0; - } - - mxlnd_conn_decref(conn); - return; - } - - spin_lock(&conn->mxk_lock); - conn->mxk_epa = status.source; - spin_unlock(&conn->mxk_lock); - /* NOTE we are holding a ref on the conn which has a ref on the peer, - * we should not need to lock the peer */ - mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer); - - /* mx_iconnect() succeeded, reset delay to 0 */ - write_lock(&kmxlnd_data.kmx_global_lock); - peer->mxp_reconnect_time = 0; - write_unlock(&kmxlnd_data.kmx_global_lock); - - /* marshal CONN_REQ msg */ - /* we are still using the conn ref from iconnect() - do not take another */ - tx = mxlnd_get_idle_tx(); - if (tx == NULL) { - CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n", - libcfs_nid2str(peer->mxp_nid)); - spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; - spin_unlock(&conn->mxk_lock); - mxlnd_conn_decref(conn); - return; - } - - tx->mxc_peer = peer; - tx->mxc_conn = conn; - mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid); - txmsg = tx->mxc_msg; - txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits; - txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE; - tx->mxc_match = mxlnd_create_match(tx, 0); - - CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n"); - mxlnd_queue_tx(tx); - return; -} - void -mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) +mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status) { - struct kmx_ctx *tx = NULL; - struct kmx_msg *txmsg = NULL; - struct kmx_conn *conn = peer->mxp_conn; + kmx_ctx_t *tx = NULL; + kmx_msg_t *txmsg = NULL; + kmx_conn_t *conn = peer->mxp_conn; u64 nic_id = 0ULL; u32 ep_id = 0; u32 sid = 0; + u8 type = (msg_type == MXLND_MSG_ICON_REQ ? + MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK); - /* a conn ref was taken when calling mx_iconnect(), + /* a conn ref was taken when calling mx_iconnect(), * hold it until CONN_REQ or CONN_ACK completes */ CDEBUG(D_NET, "entering\n"); if (status.code != MX_STATUS_SUCCESS) { - CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) " + int send_bye = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1); + + CDEBUG(D_NETERROR, "mx_iconnect() failed for %s with %s (%d) " "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n", + mxlnd_msgtype_to_str(msg_type), mx_strstatus(status.code), status.code, libcfs_nid2str(peer->mxp_nid), peer->mxp_nid, peer->mxp_nic_id, peer->mxp_ep_id); spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); spin_unlock(&conn->mxk_lock); - if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) { - struct kmx_conn *new_conn = NULL; + if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT)) { CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n"); - /* FIXME write lock here? */ - mxlnd_conn_disconnect(conn, 0, 1); - mxlnd_conn_alloc(&new_conn, peer); /* adds ref for - this function... */ - mxlnd_conn_decref(new_conn); /* which we no longer need */ - peer->mxp_reconnect_time = 0; + mxlnd_conn_disconnect(conn, 0, send_bye); } mxlnd_conn_decref(conn); return; } mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid); + write_lock(&kmxlnd_data.kmx_global_lock); spin_lock(&conn->mxk_lock); conn->mxk_epa = status.source; - if (likely(!peer->mxp_incompatible)) { - conn->mxk_status = MXLND_CONN_READY; + mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn); + if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) { + mxlnd_set_conn_status(conn, MXLND_CONN_READY); } spin_unlock(&conn->mxk_lock); - /* NOTE we are holding a ref on the conn which has a ref on the peer, - * we should not have to lock the peer */ - mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer); + write_unlock(&kmxlnd_data.kmx_global_lock); /* mx_iconnect() succeeded, reset delay to 0 */ write_lock(&kmxlnd_data.kmx_global_lock); peer->mxp_reconnect_time = 0; - peer->mxp_sid = sid; + peer->mxp_conn->mxk_sid = sid; write_unlock(&kmxlnd_data.kmx_global_lock); - /* marshal CONN_ACK msg */ + /* marshal CONN_REQ or CONN_ACK msg */ + /* we are still using the conn ref from iconnect() - do not take another */ tx = mxlnd_get_idle_tx(); if (tx == NULL) { - CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n", - libcfs_nid2str(peer->mxp_nid)); + CDEBUG(D_NETERROR, "Can't obtain %s tx for %s\n", + mxlnd_msgtype_to_str(type), + libcfs_nid2str(peer->mxp_nid)); spin_lock(&conn->mxk_lock); - conn->mxk_status = MXLND_CONN_FAIL; + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); spin_unlock(&conn->mxk_lock); mxlnd_conn_decref(conn); return; @@ -3397,11 +3443,12 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) tx->mxc_peer = peer; tx->mxc_conn = conn; - CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n"); - mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid); + tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT; + CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type)); + mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid); txmsg = tx->mxc_msg; - txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits; - txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE; + txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits; + txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE; tx->mxc_match = mxlnd_create_match(tx, 0); mxlnd_queue_tx(tx); @@ -3409,8 +3456,8 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) } /** - * The MX request completion thread(s) - * \param arg thread id (as a void *) + * mxlnd_request_waitd - the MX request completion thread(s) + * @arg - thread id (as a void *) * * This thread waits for a MX completion and then completes the request. * We will create one thread per CPU. @@ -3423,10 +3470,10 @@ mxlnd_request_waitd(void *arg) __u32 result = 0; mx_return_t mxret = MX_SUCCESS; mx_status_t status; - struct kmx_ctx *ctx = NULL; + kmx_ctx_t *ctx = NULL; enum kmx_req_state req_type = MXLND_REQ_TX; - struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; + kmx_peer_t *peer = NULL; + kmx_conn_t *conn = NULL; #if MXLND_POLLING int count = 0; #endif @@ -3434,13 +3481,12 @@ mxlnd_request_waitd(void *arg) memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id); cfs_daemonize(name); - //cfs_block_allsigs(); memset(&status, 0, sizeof(status)); CDEBUG(D_NET, "%s starting\n", name); - while (!kmxlnd_data.kmx_shutdown) { + while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) { u8 msg_type = 0; mxret = MX_SUCCESS; @@ -3458,7 +3504,7 @@ mxlnd_request_waitd(void *arg) mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, 0ULL, 0ULL, &status, &result); #endif - if (unlikely(kmxlnd_data.kmx_shutdown)) + if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown))) break; if (result != 1) { @@ -3466,6 +3512,11 @@ mxlnd_request_waitd(void *arg) continue; } + CDEBUG(D_NET, "wait_any() returned with %s (%d) with " + "match_info 0x%llx and length %d\n", + mx_strstatus(status.code), status.code, + (u64) status.match_info, status.msg_length); + if (status.code != MX_STATUS_SUCCESS) { CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with " "match_info 0x%llx and length %d\n", @@ -3479,12 +3530,8 @@ mxlnd_request_waitd(void *arg) * check the bit mask for CONN_REQ and CONN_ACK */ if (msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK) { - peer = (struct kmx_peer*) status.context; - if (msg_type == MXLND_MSG_ICON_REQ) { - mxlnd_handle_conn_req(peer, status); - } else { - mxlnd_handle_conn_ack(peer, status); - } + peer = (kmx_peer_t*) status.context; + mxlnd_handle_connect_msg(peer, msg_type, status); continue; } @@ -3493,7 +3540,7 @@ mxlnd_request_waitd(void *arg) /* NOTE: if this is a RX from the unexpected callback, it may * have very little info. If we dropped it in unexpected_recv(), * it will not have a context. If so, ignore it. */ - ctx = (struct kmx_ctx *) status.context; + ctx = (kmx_ctx_t *) status.context; if (ctx != NULL) { req_type = ctx->mxc_type; @@ -3501,7 +3548,7 @@ mxlnd_request_waitd(void *arg) mxlnd_deq_pending_ctx(ctx); /* copy status to ctx->mxc_status */ - memcpy(&ctx->mxc_status, &status, sizeof(status)); + ctx->mxc_status = status; switch (req_type) { case MXLND_REQ_TX: @@ -3516,12 +3563,9 @@ mxlnd_request_waitd(void *arg) break; } - /* FIXME may need to reconsider this */ /* conn is always set except for the first CONN_REQ rx * from a new peer */ - if (!(status.code == MX_STATUS_SUCCESS || - status.code == MX_STATUS_TRUNCATED) && - conn != NULL) { + if (status.code != MX_STATUS_SUCCESS && conn != NULL) { mxlnd_conn_disconnect(conn, 1, 1); } } @@ -3536,18 +3580,19 @@ mxlnd_request_waitd(void *arg) unsigned long mxlnd_check_timeouts(unsigned long now) { - int i = 0; - int disconnect = 0; - unsigned long next = 0; - struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; - - read_lock(&kmxlnd_data.kmx_global_lock); + int i = 0; + int disconnect = 0; + unsigned long next = 0; /* jiffies */ + kmx_peer_t *peer = NULL; + kmx_conn_t *conn = NULL; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; + + read_lock(g_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { + list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) { - if (unlikely(kmxlnd_data.kmx_shutdown)) { - read_unlock(&kmxlnd_data.kmx_global_lock); + if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown))) { + read_unlock(g_lock); return next; } @@ -3558,7 +3603,6 @@ mxlnd_check_timeouts(unsigned long now) continue; } - /* FIXMEis this needed? */ spin_lock(&conn->mxk_lock); /* if nothing pending (timeout == 0) or @@ -3566,7 +3610,6 @@ mxlnd_check_timeouts(unsigned long now) * skip this conn */ if (conn->mxk_timeout == 0 || conn->mxk_status == MXLND_CONN_DISCONNECT) { - /* FIXME is this needed? */ spin_unlock(&conn->mxk_lock); mxlnd_conn_decref(conn); continue; @@ -3576,7 +3619,7 @@ mxlnd_check_timeouts(unsigned long now) * if it is in the future, we will sleep until then. * if it is in the past, then we will sleep one * second and repeat the process. */ - if ((next == 0) || (conn->mxk_timeout < next)) { + if ((next == 0) || (time_before(conn->mxk_timeout, next))) { next = conn->mxk_timeout; } @@ -3593,15 +3636,370 @@ mxlnd_check_timeouts(unsigned long now) mxlnd_conn_decref(conn); } } - read_unlock(&kmxlnd_data.kmx_global_lock); + read_unlock(g_lock); if (next == 0) next = now + MXLND_COMM_TIMEOUT; return next; } +void +mxlnd_passive_connect(kmx_connparams_t *cp) +{ + int ret = 0; + int incompatible = 0; + u64 nic_id = 0ULL; + u32 ep_id = 0; + u32 sid = 0; + int conn_ref = 0; + kmx_msg_t *msg = &cp->mxr_msg; + kmx_peer_t *peer = cp->mxr_peer; + kmx_conn_t *conn = NULL; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; + + mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid); + + ret = mxlnd_unpack_msg(msg, cp->mxr_nob); + if (ret != 0) { + if (peer) { + CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from %s\n", + ret, libcfs_nid2str(peer->mxp_nid)); + } else { + CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from " + "unknown host with nic_id 0x%llx\n", ret, nic_id); + } + goto cleanup; + } + if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) { + CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n", + libcfs_nid2str(msg->mxm_srcnid), + libcfs_nid2str(msg->mxm_dstnid)); + goto cleanup; + } + if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) { + CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth " + "%d (%d wanted)\n", + libcfs_nid2str(msg->mxm_srcnid), + msg->mxm_u.conn_req.mxcrm_queue_depth, + *kmxlnd_tunables.kmx_peercredits); + incompatible = 1; + } + if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) { + CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size " + "%d (%d wanted)\n", + libcfs_nid2str(msg->mxm_srcnid), + msg->mxm_u.conn_req.mxcrm_eager_size, + (int) MXLND_MSG_SIZE); + incompatible = 1; + } + + if (peer == NULL) { + peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */ + if (peer == NULL) { + int hash = 0; + u32 board = 0; + kmx_peer_t *existing_peer = NULL; + + hash = mxlnd_nid_to_hash(msg->mxm_srcnid); + + mx_nic_id_to_board_number(nic_id, &board); + + /* adds conn ref for peer and one for this function */ + ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid, + board, ep_id, 0ULL); + if (ret != 0) { + goto cleanup; + } + peer->mxp_conn->mxk_sid = sid; + LASSERT(peer->mxp_ep_id == ep_id); + write_lock(g_lock); + existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid); + if (existing_peer) { + mxlnd_conn_decref(peer->mxp_conn); + mxlnd_peer_decref(peer); + peer = existing_peer; + mxlnd_conn_addref(peer->mxp_conn); + conn = peer->mxp_conn; + } else { + list_add_tail(&peer->mxp_list, + &kmxlnd_data.kmx_peers[hash]); + atomic_inc(&kmxlnd_data.kmx_npeers); + } + write_unlock(g_lock); + } else { + ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */ + write_lock(g_lock); + mxlnd_peer_decref(peer); /* drop ref taken above */ + write_unlock(g_lock); + if (ret != 0) { + CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n"); + goto cleanup; + } + } + conn_ref = 1; /* peer/conn_alloc() added ref for this function */ + conn = peer->mxp_conn; + } else { /* unexpected handler found peer */ + kmx_conn_t *old_conn = peer->mxp_conn; + + if (sid != peer->mxp_conn->mxk_sid) { + /* do not call mx_disconnect() or send a BYE */ + mxlnd_conn_disconnect(old_conn, 0, 0); + + /* This allocs a conn, points peer->mxp_conn to this one. + * The old conn is still on the peer->mxp_conns list. + * As the pending requests complete, they will call + * conn_decref() which will eventually free it. */ + ret = mxlnd_conn_alloc(&conn, peer); + if (ret != 0) { + CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n"); + goto cleanup; + } + /* conn_alloc() adds one ref for the peer and one + * for this function */ + conn_ref = 1; + + peer->mxp_conn->mxk_sid = sid; + } else { + /* same sid */ + conn = peer->mxp_conn; + } + } + write_lock(g_lock); + peer->mxp_incompatible = incompatible; + write_unlock(g_lock); + spin_lock(&conn->mxk_lock); + conn->mxk_incarnation = msg->mxm_srcstamp; + mxlnd_set_conn_status(conn, MXLND_CONN_WAIT); + spin_unlock(&conn->mxk_lock); + + /* handle_conn_ack() will create the CONN_ACK msg */ + mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK); + +cleanup: + if (conn_ref) mxlnd_conn_decref(conn); + + mxlnd_connparams_free(cp); + return; +} + +void +mxlnd_check_conn_ack(kmx_connparams_t *cp) +{ + int ret = 0; + int incompatible = 0; + u64 nic_id = 0ULL; + u32 ep_id = 0; + u32 sid = 0; + kmx_msg_t *msg = &cp->mxr_msg; + kmx_peer_t *peer = cp->mxr_peer; + kmx_conn_t *conn = cp->mxr_conn; + + mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid); + + ret = mxlnd_unpack_msg(msg, cp->mxr_nob); + if (ret != 0) { + if (peer) { + CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from %s\n", + ret, libcfs_nid2str(peer->mxp_nid)); + } else { + CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from " + "unknown host with nic_id 0x%llx\n", ret, nic_id); + } + ret = -1; + incompatible = 1; + goto failed; + } + if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) { + CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " + "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid), + libcfs_nid2str(msg->mxm_dstnid)); + ret = -1; + goto failed; + } + if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) { + CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " + "incompatible queue depth %d (%d wanted)\n", + libcfs_nid2str(msg->mxm_srcnid), + msg->mxm_u.conn_req.mxcrm_queue_depth, + *kmxlnd_tunables.kmx_peercredits); + incompatible = 1; + ret = -1; + goto failed; + } + if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) { + CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: " + "incompatible EAGER size %d (%d wanted)\n", + libcfs_nid2str(msg->mxm_srcnid), + msg->mxm_u.conn_req.mxcrm_eager_size, + (int) MXLND_MSG_SIZE); + incompatible = 1; + ret = -1; + goto failed; + } + write_lock(&kmxlnd_data.kmx_global_lock); + peer->mxp_incompatible = incompatible; + write_unlock(&kmxlnd_data.kmx_global_lock); + spin_lock(&conn->mxk_lock); + conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits; + conn->mxk_outstanding = 0; + conn->mxk_incarnation = msg->mxm_srcstamp; + conn->mxk_timeout = 0; + if (!incompatible) { + CDEBUG(D_NET, "setting peer %s CONN_READY\n", + libcfs_nid2str(msg->mxm_srcnid)); + mxlnd_set_conn_status(conn, MXLND_CONN_READY); + } + spin_unlock(&conn->mxk_lock); + + if (!incompatible) + mxlnd_check_sends(peer); + +failed: + if (ret < 0) { + spin_lock(&conn->mxk_lock); + mxlnd_set_conn_status(conn, MXLND_CONN_FAIL); + spin_unlock(&conn->mxk_lock); + } + + if (incompatible) mxlnd_conn_disconnect(conn, 0, 0); + + mxlnd_connparams_free(cp); + return; +} + +int +mxlnd_abort_msgs(void) +{ + int count = 0; + struct list_head *orphans = &kmxlnd_data.kmx_orphan_msgs; + spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock; + + /* abort orphans */ + spin_lock(g_conn_lock); + while (!list_empty(orphans)) { + kmx_ctx_t *ctx = NULL; + kmx_conn_t *conn = NULL; + + ctx = list_entry(orphans->next, kmx_ctx_t, mxc_list); + list_del_init(&ctx->mxc_list); + spin_unlock(g_conn_lock); + + ctx->mxc_errno = -ECONNABORTED; + conn = ctx->mxc_conn; + CDEBUG(D_NET, "aborting %s %s %s\n", + mxlnd_msgtype_to_str(ctx->mxc_msg_type), + ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from", + libcfs_nid2str(ctx->mxc_nid)); + if (ctx->mxc_type == MXLND_REQ_TX) { + mxlnd_put_idle_tx(ctx); /* do not hold any locks */ + if (conn) mxlnd_conn_decref(conn); /* for this tx */ + } else { + ctx->mxc_state = MXLND_CTX_CANCELED; + mxlnd_handle_rx_completion(ctx); + } + + count++; + spin_lock(g_conn_lock); + } + spin_unlock(g_conn_lock); + + return count; +} + +int +mxlnd_free_conn_zombies(void) +{ + int count = 0; + struct list_head *zombies = &kmxlnd_data.kmx_conn_zombies; + spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; + + /* cleanup any zombies */ + spin_lock(g_conn_lock); + while (!list_empty(zombies)) { + kmx_conn_t *conn = NULL; + + conn = list_entry(zombies->next, kmx_conn_t, mxk_zombie); + list_del_init(&conn->mxk_zombie); + spin_unlock(g_conn_lock); + + write_lock(g_lock); + mxlnd_conn_free_locked(conn); + write_unlock(g_lock); + + count++; + spin_lock(g_conn_lock); + } + spin_unlock(g_conn_lock); + CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count); + return count; +} + +/** + * mxlnd_connd - handles incoming connection requests + * @arg - thread id (as a void *) + * + * This thread handles incoming connection requests + */ +int +mxlnd_connd(void *arg) +{ + long id = (long) arg; + + cfs_daemonize("mxlnd_connd"); + + CDEBUG(D_NET, "connd starting\n"); + + while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) { + int ret = 0; + kmx_connparams_t *cp = NULL; + spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock; + struct list_head *conn_reqs = &kmxlnd_data.kmx_conn_reqs; + + ret = down_interruptible(&kmxlnd_data.kmx_conn_sem); + + if (atomic_read(&kmxlnd_data.kmx_shutdown)) + break; + + if (ret != 0) + continue; + + ret = mxlnd_abort_msgs(); + ret += mxlnd_free_conn_zombies(); + + spin_lock(g_conn_lock); + if (list_empty(conn_reqs)) { + if (ret == 0) + CDEBUG(D_NETERROR, "connd woke up but did not " + "find a kmx_connparams_t or zombie conn\n"); + spin_unlock(g_conn_lock); + continue; + } + cp = list_entry(conn_reqs->next, kmx_connparams_t, mxr_list); + list_del_init(&cp->mxr_list); + spin_unlock(g_conn_lock); + + switch (MXLND_MSG_TYPE(cp->mxr_match)) { + case MXLND_MSG_CONN_REQ: + /* We have a connection request. Handle it. */ + mxlnd_passive_connect(cp); + break; + case MXLND_MSG_CONN_ACK: + /* The peer is ready for messages */ + mxlnd_check_conn_ack(cp); + break; + } + } + + mxlnd_free_conn_zombies(); + + CDEBUG(D_NET, "connd stopping\n"); + mxlnd_thread_stop(id); + return 0; +} + /** - * Enforces timeouts on messages - * \param arg thread id (as a void *) + * mxlnd_timeoutd - enforces timeouts on messages + * @arg - thread id (as a void *) * * This thread queries each peer for its earliest timeout. If a peer has timed out, * it calls mxlnd_conn_disconnect(). @@ -3611,20 +4009,21 @@ mxlnd_check_timeouts(unsigned long now) int mxlnd_timeoutd(void *arg) { - int i = 0; - long id = (long) arg; - unsigned long now = 0; - unsigned long next = 0; - unsigned long delay = HZ; - struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; + int i = 0; + long id = (long) arg; + unsigned long now = 0; + unsigned long next = 0; + unsigned long delay = HZ; + kmx_peer_t *peer = NULL; + kmx_peer_t *temp = NULL; + kmx_conn_t *conn = NULL; + rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock; cfs_daemonize("mxlnd_timeoutd"); - //cfs_block_allsigs(); CDEBUG(D_NET, "timeoutd starting\n"); - while (!kmxlnd_data.kmx_shutdown) { + while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) { now = jiffies; /* if the next timeout has not arrived, go back to sleep */ @@ -3632,28 +4031,39 @@ mxlnd_timeoutd(void *arg) next = mxlnd_check_timeouts(now); } - read_lock(&kmxlnd_data.kmx_global_lock); + /* try to progress peers' txs */ + write_lock(g_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { - /* FIXME upgrade to write lock? - * is any lock needed? */ - conn = peer->mxp_conn; - if (conn) mxlnd_conn_addref(conn); /* take ref... */ + struct list_head *peers = &kmxlnd_data.kmx_peers[i]; - if (conn == NULL) + /* NOTE we are safe against the removal of peer, but + * not against the removal of temp */ + list_for_each_entry_safe(peer, temp, peers, mxp_list) { + if (atomic_read(&kmxlnd_data.kmx_shutdown)) + break; + mxlnd_peer_addref(peer); /* add ref... */ + conn = peer->mxp_conn; + if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) { + mxlnd_conn_addref(conn); /* take ref... */ + } else { + CDEBUG(D_NET, "ignoring %s\n", + libcfs_nid2str(peer->mxp_nid)); + mxlnd_peer_decref(peer); /* ...to here */ continue; + } - if (conn->mxk_status != MXLND_CONN_DISCONNECT && + if ((conn->mxk_status == MXLND_CONN_READY || + conn->mxk_status == MXLND_CONN_FAIL) && time_after(now, conn->mxk_last_tx + HZ)) { - /* FIXME drop lock or call check_sends_locked */ - read_unlock(&kmxlnd_data.kmx_global_lock); + write_unlock(g_lock); mxlnd_check_sends(peer); - read_lock(&kmxlnd_data.kmx_global_lock); + write_lock(g_lock); } mxlnd_conn_decref(conn); /* until here */ + mxlnd_peer_decref(peer); /* ...to here */ } } - read_unlock(&kmxlnd_data.kmx_global_lock); + write_unlock(g_lock); mxlnd_sleep(delay); }