-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
+ * Copyright (c) 2012, Intel Corporation.
+ *
* Copyright (C) 2006 Myricom, Inc.
*/
/*
#include "mxlnd.h"
-inline void mxlnd_noop(char *s, ...)
+mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
+
+inline int
+mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
{
- return;
+ /* if memcmp() == 0, it is NULL */
+ return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
}
char *
return "MXLND_CONN_READY";
case MXLND_CONN_INIT:
return "MXLND_CONN_INIT";
- case MXLND_CONN_REQ:
- return "MXLND_CONN_REQ";
- case MXLND_CONN_ACK:
- return "MXLND_CONN_ACK";
case MXLND_CONN_WAIT:
return "MXLND_CONN_WAIT";
case MXLND_CONN_DISCONNECT:
return "MXLND_MSG_CONN_REQ";
case MXLND_MSG_CONN_ACK:
return "MXLND_MSG_CONN_ACK";
+ case MXLND_MSG_BYE:
+ return "MXLND_MSG_BYE";
case MXLND_MSG_NOOP:
return "MXLND_MSG_NOOP";
case MXLND_MSG_PUT_REQ:
}
static inline u64
-//mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
-mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
+mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
{
u64 type = (u64) ctx->mxc_msg_type;
u64 err = (u64) error;
- u64 match = 0LL;
+ u64 match = 0ULL;
- LASSERT(ctx->mxc_msg_type != 0);
- LASSERT(ctx->mxc_cookie >> 52 == 0);
- match = (type << 60) | (err << 52) | ctx->mxc_cookie;
+ mxlnd_valid_msg_type(ctx->mxc_msg_type);
+ LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
+ match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
return match;
}
static inline void
mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
{
- *msg_type = (u8) (match >> 60);
- *error = (u8) ((match >> 52) & 0xFF);
- *cookie = match & 0xFFFFFFFFFFFFFLL;
- LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
- match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
- *msg_type == MXLND_MSG_EAGER ||
- *msg_type == MXLND_MSG_CONN_REQ ||
- *msg_type == MXLND_MSG_CONN_ACK ||
- *msg_type == MXLND_MSG_NOOP ||
- *msg_type == MXLND_MSG_PUT_REQ ||
- *msg_type == MXLND_MSG_PUT_ACK ||
- *msg_type == MXLND_MSG_PUT_DATA ||
- *msg_type == MXLND_MSG_GET_REQ ||
- *msg_type == MXLND_MSG_GET_DATA);
+ *msg_type = (u8) MXLND_MSG_TYPE(match);
+ *error = (u8) MXLND_ERROR_VAL(match);
+ *cookie = match & MXLND_MAX_COOKIE;
+ mxlnd_valid_msg_type(*msg_type);
return;
}
-struct kmx_ctx *
-mxlnd_get_idle_rx(void)
+kmx_ctx_t *
+mxlnd_get_idle_rx(kmx_conn_t *conn)
{
- struct list_head *tmp = NULL;
- struct kmx_ctx *rx = NULL;
+ cfs_list_t *rxs = NULL;
+ kmx_ctx_t *rx = NULL;
- spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
+ LASSERT(conn != NULL);
- if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
- spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
- return NULL;
- }
+ rxs = &conn->mxk_rx_idle;
+
+ spin_lock(&conn->mxk_lock);
- tmp = &kmxlnd_data.kmx_rx_idle;
- rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
- list_del_init(&rx->mxc_list);
- spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
+ if (cfs_list_empty (rxs)) {
+ spin_unlock(&conn->mxk_lock);
+ return NULL;
+ }
+
+ rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
+ cfs_list_del_init(&rx->mxc_list);
+ spin_unlock(&conn->mxk_lock);
#if MXLND_DEBUG
if (rx->mxc_get != rx->mxc_put) {
- CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
- CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
- CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
- CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
- CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
- CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
- CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
- CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
- CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
- CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
+ CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
+ CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
+ CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
+ CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
+ CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
+ CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
+ CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
+ CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
+ CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
+ CNETERR("*** nob= %d ***\n", rx->mxc_nob);
}
#endif
LASSERT (rx->mxc_get == rx->mxc_put);
LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
rx->mxc_state = MXLND_CTX_PREP;
+ rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
return rx;
}
int
-mxlnd_put_idle_rx(struct kmx_ctx *rx)
+mxlnd_put_idle_rx(kmx_ctx_t *rx)
{
- if (rx == NULL) {
- CDEBUG(D_NETERROR, "called with NULL pointer\n");
- return -EINVAL;
- } else if (rx->mxc_type != MXLND_REQ_RX) {
- CDEBUG(D_NETERROR, "called with tx\n");
- return -EINVAL;
- }
- LASSERT(rx->mxc_get == rx->mxc_put + 1);
+ kmx_conn_t *conn = rx->mxc_conn;
+ cfs_list_t *rxs = &conn->mxk_rx_idle;
+
+ LASSERT(rx->mxc_type == MXLND_REQ_RX);
+
mxlnd_ctx_init(rx);
+
rx->mxc_put++;
- spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
- list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
- spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
- return 0;
-}
+ LASSERT(rx->mxc_get == rx->mxc_put);
-int
-mxlnd_reduce_idle_rxs(__u32 count)
-{
- __u32 i = 0;
- struct kmx_ctx *rx = NULL;
-
- spin_lock(&kmxlnd_data.kmx_rxs_lock);
- for (i = 0; i < count; i++) {
- rx = mxlnd_get_idle_rx();
- if (rx != NULL) {
- struct list_head *tmp = &rx->mxc_global_list;
- list_del_init(tmp);
- mxlnd_ctx_free(rx);
- } else {
- CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
- break;
- }
- }
- spin_unlock(&kmxlnd_data.kmx_rxs_lock);
- return 0;
+ spin_lock(&conn->mxk_lock);
+ cfs_list_add(&rx->mxc_list, rxs);
+ spin_unlock(&conn->mxk_lock);
+ return 0;
}
-struct kmx_ctx *
+kmx_ctx_t *
mxlnd_get_idle_tx(void)
{
- struct list_head *tmp = NULL;
- struct kmx_ctx *tx = NULL;
+ cfs_list_t *tmp = &kmxlnd_data.kmx_tx_idle;
+ kmx_ctx_t *tx = NULL;
- spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
+ spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
- if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
- CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
- spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
- return NULL;
- }
+ if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
+ CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
+ spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
+ return NULL;
+ }
tmp = &kmxlnd_data.kmx_tx_idle;
- tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
- list_del_init(&tx->mxc_list);
+ tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
+ cfs_list_del_init(&tx->mxc_list);
/* Allocate a new completion cookie. It might not be needed,
* but we've got a lock right now and we're unlikely to
kmxlnd_data.kmx_tx_next_cookie = 1;
}
kmxlnd_data.kmx_tx_used++;
- spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
+ spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
LASSERT (tx->mxc_get == tx->mxc_put);
LASSERT (tx->mxc_lntmsg[1] == NULL);
tx->mxc_state = MXLND_CTX_PREP;
+ tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
return tx;
}
+void
+mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
+
int
-mxlnd_put_idle_tx(struct kmx_ctx *tx)
+mxlnd_put_idle_tx(kmx_ctx_t *tx)
{
- //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
int result = 0;
lnet_msg_t *lntmsg[2];
- if (tx == NULL) {
- CDEBUG(D_NETERROR, "called with NULL pointer\n");
- return -EINVAL;
- } else if (tx->mxc_type != MXLND_REQ_TX) {
- CDEBUG(D_NETERROR, "called with rx\n");
- return -EINVAL;
- }
- if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
- tx->mxc_status.code == MX_STATUS_TRUNCATED))
+ LASSERT(tx->mxc_type == MXLND_REQ_TX);
+
+ if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
+ kmx_conn_t *conn = tx->mxc_conn;
+
result = -EIO;
+ if (tx->mxc_errno != 0) result = tx->mxc_errno;
+ /* FIXME should we set mx_dis? */
+ mxlnd_conn_disconnect(conn, 0, 1);
+ }
lntmsg[0] = tx->mxc_lntmsg[0];
lntmsg[1] = tx->mxc_lntmsg[1];
- LASSERT(tx->mxc_get == tx->mxc_put + 1);
mxlnd_ctx_init(tx);
+
tx->mxc_put++;
- spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
- list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
- kmxlnd_data.kmx_tx_used--;
- spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
- if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
- if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
+ LASSERT(tx->mxc_get == tx->mxc_put);
+
+ spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
+ cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
+ kmxlnd_data.kmx_tx_used--;
+ spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
+
+ if (lntmsg[0] != NULL)
+ lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
+ if (lntmsg[1] != NULL)
+ lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
+ return 0;
+}
+
+
+void
+mxlnd_connparams_free(kmx_connparams_t *cp)
+{
+ LASSERT(cfs_list_empty(&cp->mxr_list));
+ MXLND_FREE(cp, sizeof(*cp));
+ return;
+}
+
+int
+mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
+ mx_endpoint_addr_t epa, u64 match, u32 length,
+ kmx_conn_t *conn, kmx_peer_t *peer, void *data)
+{
+ kmx_connparams_t *c = NULL;
+
+ MXLND_ALLOC(c, sizeof(*c));
+ if (!c) return -ENOMEM;
+
+ CFS_INIT_LIST_HEAD(&c->mxr_list);
+ c->mxr_context = context;
+ c->mxr_epa = epa;
+ c->mxr_match = match;
+ c->mxr_nob = length;
+ c->mxr_conn = conn;
+ c->mxr_peer = peer;
+ c->mxr_msg = *((kmx_msg_t *) data);
+
+ *cp = c;
return 0;
}
+static inline void
+mxlnd_set_conn_status(kmx_conn_t *conn, int status)
+{
+ conn->mxk_status = status;
+ cfs_mb();
+}
+
/**
- * mxlnd_conn_free - free the conn
+ * mxlnd_conn_free_locked - free the conn
* @conn - a kmx_conn pointer
*
* The calling function should remove the conn from the conns list first
- * then destroy it.
+ * then destroy it. Caller should have write-locked kmx_global_lock.
*/
void
-mxlnd_conn_free(struct kmx_conn *conn)
+mxlnd_conn_free_locked(kmx_conn_t *conn)
{
- struct kmx_peer *peer = conn->mxk_peer;
+ int valid = !mxlnd_endpoint_addr_null(conn->mxk_epa);
+ kmx_peer_t *peer = conn->mxk_peer;
CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
- LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
- list_empty (&conn->mxk_tx_free_queue) &&
- list_empty (&conn->mxk_pending));
- if (!list_empty(&conn->mxk_list)) {
- spin_lock(&peer->mxp_lock);
- list_del_init(&conn->mxk_list);
+ LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
+ cfs_list_empty (&conn->mxk_tx_free_queue) &&
+ cfs_list_empty (&conn->mxk_pending));
+ if (!cfs_list_empty(&conn->mxk_list)) {
+ cfs_list_del_init(&conn->mxk_list);
if (peer->mxp_conn == conn) {
peer->mxp_conn = NULL;
- if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
- mx_set_endpoint_addr_context(conn->mxk_epa,
- (void *) NULL);
+ if (valid) {
+ kmx_conn_t *temp = NULL;
+
+ mx_get_endpoint_addr_context(conn->mxk_epa,
+ (void **) &temp);
+ if (conn == temp) {
+ mx_set_endpoint_addr_context(conn->mxk_epa,
+ (void *) NULL);
+ }
+ }
+ /* unlink from global list and drop its ref */
+ cfs_list_del_init(&peer->mxp_list);
+ mxlnd_peer_decref(peer);
+ }
+ }
+ mxlnd_peer_decref(peer); /* drop conn's ref to peer */
+ if (conn->mxk_rx_pages) {
+ LASSERT (conn->mxk_rxs != NULL);
+ mxlnd_free_pages(conn->mxk_rx_pages);
+ }
+ if (conn->mxk_rxs) {
+ int i = 0;
+ kmx_ctx_t *rx = NULL;
+
+ for (i = 0; i < MXLND_RX_MSGS(); i++) {
+ rx = &conn->mxk_rxs[i];
+ if (rx->mxc_seg_list != NULL) {
+ LASSERT(rx->mxc_nseg > 0);
+ MXLND_FREE(rx->mxc_seg_list,
+ rx->mxc_nseg *
+ sizeof(*rx->mxc_seg_list));
}
}
- spin_unlock(&peer->mxp_lock);
+ MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
}
- mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
- MXLND_FREE (conn, sizeof (*conn));
+
+ MXLND_FREE(conn, sizeof (*conn));
return;
}
-void
-mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
+int
+mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
{
- int found = 0;
- struct kmx_ctx *ctx = NULL;
- struct kmx_ctx *next = NULL;
- mx_return_t mxret = MX_SUCCESS;
- u32 result = 0;
+ int found = 0;
+ int count = 0;
+ kmx_ctx_t *ctx = NULL;
+ kmx_ctx_t *next = NULL;
+ mx_return_t mxret = MX_SUCCESS;
+ u32 result = 0;
do {
found = 0;
- spin_lock(&conn->mxk_lock);
- list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
- /* we will delete all including txs */
- list_del_init(&ctx->mxc_list);
+ spin_lock(&conn->mxk_lock);
+ cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
+ mxc_list) {
+ cfs_list_del_init(&ctx->mxc_list);
if (ctx->mxc_type == MXLND_REQ_RX) {
found = 1;
mxret = mx_cancel(kmxlnd_data.kmx_endpt,
&ctx->mxc_mxreq,
&result);
if (mxret != MX_SUCCESS) {
- CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
+ CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
}
if (result == 1) {
- ctx->mxc_status.code = -ECONNABORTED;
+ ctx->mxc_errno = -ECONNABORTED;
ctx->mxc_state = MXLND_CTX_CANCELED;
- /* NOTE this calls lnet_finalize() and
- * we cannot hold any locks when calling it.
- * It also calls mxlnd_conn_decref(conn) */
- spin_unlock(&conn->mxk_lock);
- mxlnd_handle_rx_completion(ctx);
- spin_lock(&conn->mxk_lock);
- }
- break;
- }
+ spin_unlock(&conn->mxk_lock);
+ spin_lock(&kmxlnd_data.kmx_conn_lock);
+ /* we may be holding the global lock,
+ * move to orphan list so that it can free it */
+ cfs_list_add_tail(&ctx->mxc_list,
+ &kmxlnd_data.kmx_orphan_msgs);
+ count++;
+ spin_unlock(&kmxlnd_data.kmx_conn_lock);
+ spin_lock(&conn->mxk_lock);
+ }
+ break;
+ }
+ }
+ spin_unlock(&conn->mxk_lock);
+ } while (found);
+
+ return count;
+}
+
+int
+mxlnd_cancel_queued_txs(kmx_conn_t *conn)
+{
+ int count = 0;
+ cfs_list_t *tmp = NULL;
+
+ spin_lock(&conn->mxk_lock);
+ while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
+ !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
+
+ kmx_ctx_t *tx = NULL;
+
+ if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
+ tmp = &conn->mxk_tx_free_queue;
+ } else {
+ tmp = &conn->mxk_tx_credit_queue;
}
- spin_unlock(&conn->mxk_lock);
- }
- while (found);
+ tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
+ cfs_list_del_init(&tx->mxc_list);
+ spin_unlock(&conn->mxk_lock);
+ tx->mxc_errno = -ECONNABORTED;
+ tx->mxc_state = MXLND_CTX_CANCELED;
+ /* move to orphan list and then abort */
+ spin_lock(&kmxlnd_data.kmx_conn_lock);
+ cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
+ spin_unlock(&kmxlnd_data.kmx_conn_lock);
+ count++;
+ spin_lock(&conn->mxk_lock);
+ }
+ spin_unlock(&conn->mxk_lock);
+
+ return count;
+}
+
+void
+mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
+{
+ u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
+ (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
+
+ mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
+ epa, match, NULL, NULL);
return;
}
/**
* mxlnd_conn_disconnect - shutdown a connection
* @conn - a kmx_conn pointer
+ * @mx_dis - call mx_disconnect()
+ * @send_bye - send peer a BYE msg
*
* This function sets the status to DISCONNECT, completes queued
* txs with failure, calls mx_disconnect, which will complete
* pending txs and matched rxs with failure.
*/
void
-mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
+mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
{
- struct list_head *tmp = NULL;
-
- spin_lock(&conn->mxk_lock);
- if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
- spin_unlock(&conn->mxk_lock);
- return;
- }
- conn->mxk_status = MXLND_CONN_DISCONNECT;
- conn->mxk_timeout = 0;
-
- while (!list_empty(&conn->mxk_tx_free_queue) ||
- !list_empty(&conn->mxk_tx_credit_queue)) {
-
- struct kmx_ctx *tx = NULL;
-
- if (!list_empty(&conn->mxk_tx_free_queue)) {
- tmp = &conn->mxk_tx_free_queue;
- } else {
- tmp = &conn->mxk_tx_credit_queue;
- }
-
- tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
- list_del_init(&tx->mxc_list);
- tx->mxc_status.code = -ECONNABORTED;
- spin_unlock(&conn->mxk_lock);
- mxlnd_put_idle_tx(tx);
- mxlnd_conn_decref(conn); /* for this tx */
- spin_lock(&conn->mxk_lock);
- }
-
- spin_unlock(&conn->mxk_lock);
-
- /* cancel pending rxs */
- mxlnd_conn_cancel_pending_rxs(conn);
-
- if (kmxlnd_data.kmx_shutdown != 1) {
-
- if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
+ mx_endpoint_addr_t epa = conn->mxk_epa;
+ int valid = !mxlnd_endpoint_addr_null(epa);
+ int count = 0;
+
+ spin_lock(&conn->mxk_lock);
+ if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
+ spin_unlock(&conn->mxk_lock);
+ return;
+ }
+ mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
+ conn->mxk_timeout = 0;
+ spin_unlock(&conn->mxk_lock);
+
+ count = mxlnd_cancel_queued_txs(conn);
+ count += mxlnd_conn_cancel_pending_rxs(conn);
+
+ if (count) /* let connd call kmxlnd_abort_msgs() */
+ up(&kmxlnd_data.kmx_conn_sem);
+
+ if (send_bye && valid &&
+ conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
+ /* send a BYE to the peer */
+ CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
+ libcfs_nid2str(conn->mxk_peer->mxp_nid));
+ mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
+ /* wait to allow the peer to ack our message */
+ mxlnd_sleep(msecs_to_jiffies(20));
+ }
+
+ if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
+ unsigned long last_msg = 0;
+
+ /* notify LNET that we are giving up on this peer */
+ if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
+ last_msg = conn->mxk_last_rx;
+ else
+ last_msg = conn->mxk_last_tx;
- if (notify) {
- time_t last_alive = 0;
- unsigned long last_msg = 0;
+ lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
- /* notify LNET that we are giving up on this peer */
- if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
- last_msg = conn->mxk_last_rx;
- } else {
- last_msg = conn->mxk_last_tx;
- }
- last_alive = cfs_time_current_sec() -
- cfs_duration_sec(cfs_time_current() - last_msg);
- lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
- }
+ if (mx_dis && valid &&
+ (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa)) != 0))
+ mx_disconnect(kmxlnd_data.kmx_endpt, epa);
}
mxlnd_conn_decref(conn); /* drop the owning peer's reference */
* Returns 0 on success and -ENOMEM on failure
*/
int
-mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
+mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
{
- struct kmx_conn *conn = NULL;
+ int i = 0;
+ int ret = 0;
+ int ipage = 0;
+ int offset = 0;
+ void *addr = NULL;
+ kmx_conn_t *conn = NULL;
+ kmx_pages_t *pages = NULL;
+ struct page *page = NULL;
+ kmx_ctx_t *rx = NULL;
LASSERT(peer != NULL);
MXLND_ALLOC(conn, sizeof (*conn));
if (conn == NULL) {
- CDEBUG(D_NETERROR, "Cannot allocate conn\n");
+ CNETERR("Cannot allocate conn\n");
return -ENOMEM;
}
CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
memset(conn, 0, sizeof(*conn));
- /* conn->mxk_incarnation = 0 - will be set by peer */
- atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
- and one for the caller */
+ ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
+ if (ret != 0) {
+ CERROR("Can't allocate rx pages\n");
+ MXLND_FREE(conn, sizeof(*conn));
+ return -ENOMEM;
+ }
+ conn->mxk_rx_pages = pages;
+
+ MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
+ if (conn->mxk_rxs == NULL) {
+ CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
+ mxlnd_free_pages(pages);
+ MXLND_FREE(conn, sizeof(*conn));
+ return -ENOMEM;
+ }
+
+ memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
+
conn->mxk_peer = peer;
- /* mxk_epa - to be set after mx_iconnect() */
- INIT_LIST_HEAD(&conn->mxk_list);
- spin_lock_init(&conn->mxk_lock);
+ CFS_INIT_LIST_HEAD(&conn->mxk_list);
+ CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
+ cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
+ and one for the caller */
+ if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
+ u64 nic_id = 0ULL;
+ u32 ep_id = 0;
+
+ /* this is localhost, set the epa and status as up */
+ mxlnd_set_conn_status(conn, MXLND_CONN_READY);
+ conn->mxk_epa = kmxlnd_data.kmx_epa;
+ mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
+ peer->mxp_reconnect_time = 0;
+ mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
+ peer->mxp_nic_id = nic_id;
+ peer->mxp_ep_id = ep_id;
+ conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
+ conn->mxk_timeout = 0;
+ } else {
+ /* conn->mxk_incarnation = 0 - will be set by peer */
+ /* conn->mxk_sid = 0 - will be set by peer */
+ mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
+ /* mxk_epa - to be set after mx_iconnect() */
+ }
+ spin_lock_init(&conn->mxk_lock);
/* conn->mxk_timeout = 0 */
- conn->mxk_last_tx = jiffies;
- conn->mxk_last_rx = conn->mxk_last_tx;
- conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
+ /* conn->mxk_last_tx = 0 */
+ /* conn->mxk_last_rx = 0 */
+ CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
+
+ conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
/* mxk_outstanding = 0 */
- conn->mxk_status = MXLND_CONN_INIT;
- INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
- INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
+
+ CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
+ CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
/* conn->mxk_ntx_msgs = 0 */
/* conn->mxk_ntx_data = 0 */
/* conn->mxk_ntx_posted = 0 */
/* conn->mxk_data_posted = 0 */
- INIT_LIST_HEAD(&conn->mxk_pending);
+ CFS_INIT_LIST_HEAD(&conn->mxk_pending);
+
+ for (i = 0; i < MXLND_RX_MSGS(); i++) {
+
+ rx = &conn->mxk_rxs[i];
+ rx->mxc_type = MXLND_REQ_RX;
+ CFS_INIT_LIST_HEAD(&rx->mxc_list);
+
+ /* map mxc_msg to page */
+ page = pages->mxg_pages[ipage];
+ addr = page_address(page);
+ LASSERT(addr != NULL);
+ rx->mxc_msg = (kmx_msg_t *)(addr + offset);
+ rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
+
+ rx->mxc_conn = conn;
+ rx->mxc_peer = peer;
+ rx->mxc_nid = peer->mxp_nid;
+
+ mxlnd_ctx_init(rx);
+
+ offset += MXLND_MSG_SIZE;
+ LASSERT (offset <= PAGE_SIZE);
+
+ if (offset == PAGE_SIZE) {
+ offset = 0;
+ ipage++;
+ LASSERT (ipage <= MXLND_TX_MSG_PAGES());
+ }
+
+ cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
+ }
*connp = conn;
mxlnd_peer_addref(peer); /* add a ref for this conn */
/* add to front of peer's conns list */
- list_add(&conn->mxk_list, &peer->mxp_conns);
+ cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
peer->mxp_conn = conn;
return 0;
}
int
-mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
+mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
{
- int ret = 0;
- spin_lock(&peer->mxp_lock);
+ int ret = 0;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
+
+ write_lock(g_lock);
ret = mxlnd_conn_alloc_locked(connp, peer);
- spin_unlock(&peer->mxp_lock);
+ write_unlock(g_lock);
return ret;
}
int
-mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
+mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
{
- int ret = 0;
- struct kmx_conn *conn = ctx->mxc_conn;
+ int ret = 0;
+ kmx_conn_t *conn = ctx->mxc_conn;
- ctx->mxc_state = MXLND_CTX_PENDING;
- if (conn != NULL) {
- spin_lock(&conn->mxk_lock);
+ ctx->mxc_state = MXLND_CTX_PENDING;
+ if (conn != NULL) {
+ spin_lock(&conn->mxk_lock);
if (conn->mxk_status >= MXLND_CONN_INIT) {
- list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
+ cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
conn->mxk_timeout = ctx->mxc_deadline;
}
ctx->mxc_state = MXLND_CTX_COMPLETED;
ret = -1;
}
- spin_unlock(&conn->mxk_lock);
- }
- return ret;
+ spin_unlock(&conn->mxk_lock);
+ }
+ return ret;
}
int
-mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
+mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
{
LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
ctx->mxc_state == MXLND_CTX_COMPLETED);
if (ctx->mxc_state != MXLND_CTX_PENDING &&
ctx->mxc_state != MXLND_CTX_COMPLETED) {
- CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
- mxlnd_ctxstate_to_str(ctx->mxc_state));
+ CNETERR("deq ctx->mxc_state = %s\n",
+ mxlnd_ctxstate_to_str(ctx->mxc_state));
}
ctx->mxc_state = MXLND_CTX_COMPLETED;
- if (!list_empty(&ctx->mxc_list)) {
- struct kmx_conn *conn = ctx->mxc_conn;
- struct kmx_ctx *next = NULL;
+ if (!cfs_list_empty(&ctx->mxc_list)) {
+ kmx_conn_t *conn = ctx->mxc_conn;
+ kmx_ctx_t *next = NULL;
+
LASSERT(conn != NULL);
- spin_lock(&conn->mxk_lock);
- list_del_init(&ctx->mxc_list);
+ spin_lock(&conn->mxk_lock);
+ cfs_list_del_init(&ctx->mxc_list);
conn->mxk_timeout = 0;
- if (!list_empty(&conn->mxk_pending)) {
- next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
+ if (!cfs_list_empty(&conn->mxk_pending)) {
+ next = cfs_list_entry(conn->mxk_pending.next,
+ kmx_ctx_t, mxc_list);
conn->mxk_timeout = next->mxc_deadline;
}
- spin_unlock(&conn->mxk_lock);
- }
- return 0;
+ spin_unlock(&conn->mxk_lock);
+ }
+ return 0;
}
/**
* remove the peer from the peers list first then destroy it.
*/
void
-mxlnd_peer_free(struct kmx_peer *peer)
+mxlnd_peer_free(kmx_peer_t *peer)
{
- CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
+ CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
- LASSERT (atomic_read(&peer->mxp_refcount) == 0);
+ LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
- if (peer->mxp_host != NULL) {
- spin_lock(&peer->mxp_host->mxh_lock);
- peer->mxp_host->mxh_peer = NULL;
- spin_unlock(&peer->mxp_host->mxh_lock);
- }
- if (!list_empty(&peer->mxp_peers)) {
+ if (!cfs_list_empty(&peer->mxp_list)) {
/* assume we are locked */
- list_del_init(&peer->mxp_peers);
+ cfs_list_del_init(&peer->mxp_list);
}
- MXLND_FREE (peer, sizeof (*peer));
- atomic_dec(&kmxlnd_data.kmx_npeers);
+ MXLND_FREE(peer, sizeof (*peer));
+ cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
return;
}
-void
-mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
+static int
+mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
{
- u64 nic_id = 0LL;
- char name[MX_MAX_HOSTNAME_LEN + 1];
- mx_return_t mxret = MX_SUCCESS;
-
- memset(name, 0, sizeof(name));
- snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
- mxret = mx_hostname_to_nic_id(name, &nic_id);
- if (mxret == MX_SUCCESS) {
- peer->mxp_nic_id = nic_id;
- } else {
- CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
- "with %s\n", name, mx_strerror(mxret));
- mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
- if (mxret == MX_SUCCESS) {
- peer->mxp_nic_id = nic_id;
- } else {
- CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
- "with %s\n", peer->mxp_host->mxh_hostname,
- mx_strerror(mxret));
+ int ret = -EHOSTUNREACH;
+ unsigned char *haddr = NULL;
+ struct net_device *dev = NULL;
+ struct neighbour *n = NULL;
+ __be32 dst_ip = htonl(ip);
+
+ dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
+ if (dev == NULL)
+ return -ENODEV;
+
+ haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
+
+ n = neigh_lookup(&arp_tbl, &dst_ip, dev);
+ if (n) {
+ n->used = jiffies;
+ if (n->nud_state & NUD_VALID) {
+ memcpy(haddr, n->ha, dev->addr_len);
+ neigh_release(n);
+ ret = 0;
}
}
- return;
+
+ dev_put(dev);
+
+ return ret;
+}
+
+
+/* We only want the MAC address of the peer's Myricom NIC. We
+ * require that each node has the IPoMX interface (myriN) up.
+ * We will not pass any traffic over IPoMX, but it allows us
+ * to get the MAC address. */
+static int
+mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
+{
+ int ret = 0;
+ int try = 1;
+ int fatal = 0;
+ u64 tmp_id = 0ULL;
+ cfs_socket_t *sock = NULL;
+
+ do {
+ CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
+ ret = mxlnd_lookup_mac(ip, &tmp_id);
+ if (ret == 0) {
+ break;
+ } else {
+ /* not found, try to connect (force an arp) */
+ ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
+ if (ret == -ECONNREFUSED) {
+ /* peer is there, get the MAC address */
+ mxlnd_lookup_mac(ip, &tmp_id);
+ if (tmp_id != 0ULL)
+ ret = 0;
+ break;
+ } else if (ret == -EHOSTUNREACH && try < tries) {
+ /* add a little backoff */
+ CDEBUG(D_NET, "sleeping for %d jiffies\n",
+ HZ/4);
+ mxlnd_sleep(HZ/4);
+ }
+ }
+ } while (try++ < tries);
+ CDEBUG(D_NET, "done trying. ret = %d\n", ret);
+
+ if (tmp_id == 0ULL)
+ ret = -EHOSTUNREACH;
+#ifdef __LITTLE_ENDIAN
+ *nic_id = ___arch__swab64(tmp_id);
+#else
+ *nic_id = tmp_id;
+#endif
+ return ret;
}
/**
* Returns 0 on success and -ENOMEM on failure
*/
int
-mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
+mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
{
- int i = 0;
- int ret = 0;
- u32 addr = LNET_NIDADDR(nid);
- struct kmx_peer *peer = NULL;
- struct kmx_host *host = NULL;
+ int ret = 0;
+ u32 ip = LNET_NIDADDR(nid);
+ kmx_peer_t *peer = NULL;
LASSERT (nid != LNET_NID_ANY && nid != 0LL);
MXLND_ALLOC(peer, sizeof (*peer));
if (peer == NULL) {
- CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
+ CNETERR("Cannot allocate peer for NID 0x%llx\n",
+ nid);
return -ENOMEM;
}
CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
memset(peer, 0, sizeof(*peer));
- list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
- if (addr == host->mxh_addr) {
- peer->mxp_host = host;
- spin_lock(&host->mxh_lock);
- host->mxh_peer = peer;
- spin_unlock(&host->mxh_lock);
- break;
- }
- }
- if (peer->mxp_host == NULL) {
- CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
- MXLND_FREE(peer, sizeof(*peer));
- return -ENXIO;
- }
-
+ CFS_INIT_LIST_HEAD(&peer->mxp_list);
peer->mxp_nid = nid;
- /* peer->mxp_incarnation */
- atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
- mxlnd_peer_hostname_to_nic_id(peer);
+ /* peer->mxp_ni unused - may be used for multi-rail */
+ cfs_atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
- INIT_LIST_HEAD(&peer->mxp_peers);
- spin_lock_init(&peer->mxp_lock);
- INIT_LIST_HEAD(&peer->mxp_conns);
+ peer->mxp_board = board;
+ peer->mxp_ep_id = ep_id;
+ peer->mxp_nic_id = nic_id;
+
+ CFS_INIT_LIST_HEAD(&peer->mxp_conns);
ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
if (ret != 0) {
mxlnd_peer_decref(peer);
return ret;
}
+ CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
- for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
- struct kmx_ctx *rx = NULL;
- ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
- if (ret != 0) {
- mxlnd_reduce_idle_rxs(i);
- mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
- mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
- mxlnd_peer_decref(peer);
- return ret;
+ if (peer->mxp_nic_id != 0ULL)
+ nic_id = peer->mxp_nic_id;
+
+ if (nic_id == 0ULL) {
+ ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
+ if (ret == 0) {
+ peer->mxp_nic_id = nic_id;
+ mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
}
- spin_lock(&kmxlnd_data.kmx_rxs_lock);
- list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
- spin_unlock(&kmxlnd_data.kmx_rxs_lock);
- rx->mxc_put = -1;
- mxlnd_put_idle_rx(rx);
}
+
+ peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
+
/* peer->mxp_reconnect_time = 0 */
/* peer->mxp_incompatible = 0 */
return 0;
}
-/**
- * mxlnd_nid_to_hash - hash the nid
- * @nid - msg pointer
- *
- * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
- */
-static inline int
-mxlnd_nid_to_hash(lnet_nid_t nid)
-{
- return (nid & MXLND_HASH_MASK) ^
- ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
-}
-
-static inline struct kmx_peer *
+static inline kmx_peer_t *
mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
{
- int found = 0;
- int hash = 0;
- struct kmx_peer *peer = NULL;
+ int found = 0;
+ int hash = 0;
+ kmx_peer_t *peer = NULL;
hash = mxlnd_nid_to_hash(nid);
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
+ cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
if (peer->mxp_nid == nid) {
found = 1;
mxlnd_peer_addref(peer);
return (found ? peer : NULL);
}
-static inline struct kmx_peer *
-mxlnd_find_peer_by_nid(lnet_nid_t nid)
+static kmx_peer_t *
+mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
{
- struct kmx_peer *peer = NULL;
+ int ret = 0;
+ int hash = 0;
+ kmx_peer_t *peer = NULL;
+ kmx_peer_t *old = NULL;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
+
+ read_lock(g_lock);
+ peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
+
+ if ((peer && peer->mxp_conn) || /* found peer with conn or */
+ (!peer && !create)) { /* did not find peer and do not create one */
+ read_unlock(g_lock);
+ return peer;
+ }
+
+ read_unlock(g_lock);
+
+ /* if peer but _not_ conn */
+ if (peer && !peer->mxp_conn) {
+ if (create) {
+ write_lock(g_lock);
+ if (!peer->mxp_conn) { /* check again */
+ /* create the conn */
+ ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
+ if (ret != 0) {
+ /* we tried, return the peer only.
+ * the caller needs to see if the conn exists */
+ CNETERR("%s: %s could not alloc conn\n",
+ __func__, libcfs_nid2str(peer->mxp_nid));
+ } else {
+ /* drop extra conn ref */
+ mxlnd_conn_decref(peer->mxp_conn);
+ }
+ }
+ write_unlock(g_lock);
+ }
+ return peer;
+ }
+
+ /* peer not found and we need to create one */
+ hash = mxlnd_nid_to_hash(nid);
+
+ /* create peer (and conn) */
+ /* adds conn ref for peer and one for this function */
+ ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
+ *kmxlnd_tunables.kmx_ep_id, 0ULL);
+ if (ret != 0) /* no memory, peer is NULL */
+ return NULL;
+
+ write_lock(g_lock);
+
+ /* look again */
+ old = mxlnd_find_peer_by_nid_locked(nid);
+ if (old) {
+ /* someone already created one */
+ mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
+ mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
+ mxlnd_peer_decref(peer);
+ peer = old;
+ } else {
+ /* no other peer, use this one */
+ cfs_list_add_tail(&peer->mxp_list,
+ &kmxlnd_data.kmx_peers[hash]);
+ cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
+ mxlnd_peer_addref(peer);
+ mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
+ }
+
+ write_unlock(g_lock);
- read_lock(&kmxlnd_data.kmx_peers_lock);
- peer = mxlnd_find_peer_by_nid_locked(nid);
- read_unlock(&kmxlnd_data.kmx_peers_lock);
return peer;
}
static inline int
-mxlnd_tx_requires_credit(struct kmx_ctx *tx)
+mxlnd_tx_requires_credit(kmx_ctx_t *tx)
{
return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
}
static inline void
-mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
+mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
{
int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
- struct kmx_msg *msg = NULL;
+ kmx_msg_t *msg = NULL;
LASSERT (tx != NULL);
- LASSERT (nob <= MXLND_EAGER_SIZE);
+ LASSERT (nob <= MXLND_MSG_SIZE);
tx->mxc_nid = nid;
/* tx->mxc_peer should have already been set if we know it */
/* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
tx->mxc_seg.segment_length = nob;
tx->mxc_pin_type = MX_PIN_PHYSICAL;
- //tx->mxc_state = MXLND_CTX_PENDING;
msg = tx->mxc_msg;
msg->mxm_type = type;
}
/**
- * mxlnd_pack_msg - complete msg info
+ * mxlnd_pack_msg_locked - complete msg info
* @tx - msg to send
*/
static inline void
-mxlnd_pack_msg(struct kmx_ctx *tx)
+mxlnd_pack_msg_locked(kmx_ctx_t *tx)
{
- struct kmx_msg *msg = tx->mxc_msg;
+ kmx_msg_t *msg = tx->mxc_msg;
/* type and nob should already be set in init_msg() */
msg->mxm_magic = MXLND_MSG_MAGIC;
* return credits as well */
if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
- spin_lock(&tx->mxc_conn->mxk_lock);
msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
tx->mxc_conn->mxk_outstanding = 0;
- spin_unlock(&tx->mxc_conn->mxk_lock);
} else {
msg->mxm_credits = 0;
}
msg->mxm_dstnid = tx->mxc_nid;
/* if it is a new peer, the dststamp will be 0 */
msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
- msg->mxm_seq = tx->mxc_cookie;
if (*kmxlnd_tunables.kmx_cksum) {
msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* 6 bytes are enough to have received magic + version */
if (nob < 6) {
- CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
+ CNETERR("not enough bytes for magic + hdr: %d\n", nob);
return -EPROTO;
}
} else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
flip = 1;
} else {
- CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
+ CNETERR("Bad magic: %08x\n", msg->mxm_magic);
return -EPROTO;
}
if (msg->mxm_version !=
(flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
- CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
+ CNETERR("Bad version: %d\n", msg->mxm_version);
return -EPROTO;
}
if (nob < hdr_size) {
- CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
+ CNETERR("not enough for a header: %d\n", nob);
return -EPROTO;
}
msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
if (msg_nob > nob) {
- CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
+ CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
return -EPROTO;
}
msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
msg->mxm_cksum = 0;
if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
- CDEBUG(D_NETERROR, "Bad checksum\n");
+ CNETERR("Bad checksum\n");
return -EPROTO;
}
msg->mxm_cksum = msg_cksum;
__swab64s(&msg->mxm_srcstamp);
__swab64s(&msg->mxm_dstnid);
__swab64s(&msg->mxm_dststamp);
- __swab64s(&msg->mxm_seq);
}
if (msg->mxm_srcnid == LNET_NID_ANY) {
- CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
+ CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
return -EPROTO;
}
switch (msg->mxm_type) {
default:
- CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
+ CNETERR("Unknown message type %x\n", msg->mxm_type);
return -EPROTO;
case MXLND_MSG_NOOP:
case MXLND_MSG_EAGER:
if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
- CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
+ CNETERR("Short EAGER: %d(%d)\n", msg_nob,
(int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
return -EPROTO;
}
case MXLND_MSG_PUT_REQ:
if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
- CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
+ CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
(int)(hdr_size + sizeof(msg->mxm_u.put_req)));
return -EPROTO;
}
case MXLND_MSG_PUT_ACK:
if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
- CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
+ CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
(int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
return -EPROTO;
}
case MXLND_MSG_GET_REQ:
if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
- CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
+ CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
(int)(hdr_size + sizeof(msg->mxm_u.get_req)));
return -EPROTO;
}
case MXLND_MSG_CONN_REQ:
case MXLND_MSG_CONN_ACK:
if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
- CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
+ CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
(int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
return -EPROTO;
}
return 0;
}
+
/**
* mxlnd_recv_msg
* @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
* Returns 0 on success and -1 on failure
*/
int
-mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
+mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
{
int ret = 0;
mx_return_t mxret = MX_SUCCESS;
- uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
+ uint64_t mask = ~(MXLND_ERROR_MASK);
rx->mxc_msg_type = msg_type;
rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
/* rx->mxc_match may already be set */
/* rx->mxc_seg.segment_ptr is already set */
rx->mxc_seg.segment_length = length;
- rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
ret = mxlnd_q_pending_ctx(rx);
if (ret == -1) {
/* the caller is responsible for calling conn_decref() if needed */
cookie, mask, (void *) rx, &rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
mxlnd_deq_pending_ctx(rx);
- CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
- mx_strerror(mxret), (int) mxret);
+ CNETERR("mx_kirecv() failed with %s (%d)\n",
+ mx_strerror(mxret), (int) mxret);
return -1;
}
return 0;
/**
- * mxlnd_unexpected_recv - this is the callback function that will handle
+ * mxlnd_unexpected_recv - this is the callback function that will handle
* unexpected receives
* @context - NULL, ignore
* @source - the peer's mx_endpoint_addr_t
- * @match_value - the msg's bit, should be MXLND_MASK_EAGER
+ * @match_value - the msg's bits, should be MXLND_MSG_EAGER
* @length - length of incoming message
- * @data_if_available - ignore
+ * @data_if_available - used for CONN_[REQ|ACK]
*
* If it is an eager-sized msg, we will call recv_msg() with the actual
* length. If it is a large message, we will call recv_msg() with a
uint64_t match_value, uint32_t length, void *data_if_available)
{
int ret = 0;
- struct kmx_ctx *rx = NULL;
+ kmx_ctx_t *rx = NULL;
mx_ksegment_t seg;
u8 msg_type = 0;
u8 error = 0;
- u64 cookie = 0LL;
-
+ u64 cookie = 0ULL;
+ kmx_conn_t *conn = NULL;
+ kmx_peer_t *peer = NULL;
+ u64 nic_id = 0ULL;
+ u32 ep_id = 0;
+ u32 sid = 0;
+
+ /* TODO this will change to the net struct */
if (context != NULL) {
- CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
+ CNETERR("non-NULL context\n");
}
#if MXLND_DEBUG
- CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
+ CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
#endif
- rx = mxlnd_get_idle_rx();
+ mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
+ mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
+ read_lock(&kmxlnd_data.kmx_global_lock);
+ mx_get_endpoint_addr_context(source, (void **) &conn);
+ if (conn) {
+ mxlnd_conn_addref(conn); /* add ref for this function */
+ peer = conn->mxk_peer;
+ }
+ read_unlock(&kmxlnd_data.kmx_global_lock);
+
+ if (msg_type == MXLND_MSG_BYE) {
+ if (conn) {
+ CDEBUG(D_NET, "peer %s sent BYE msg\n",
+ libcfs_nid2str(peer->mxp_nid));
+ mxlnd_conn_disconnect(conn, 1, 0);
+ mxlnd_conn_decref(conn); /* drop ref taken above */
+ }
+ return MX_RECV_FINISHED;
+ }
+
+ if (msg_type == MXLND_MSG_CONN_REQ) {
+ kmx_connparams_t *cp = NULL;
+ const int expected = offsetof(kmx_msg_t, mxm_u) +
+ sizeof(kmx_connreq_msg_t);
+
+ if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
+ if (unlikely(length != expected || !data_if_available)) {
+ CNETERR("received invalid CONN_REQ from %llx "
+ "length=%d (expected %d)\n", nic_id, length, expected);
+ mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
+ return MX_RECV_FINISHED;
+ }
+
+ ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
+ conn, peer, data_if_available);
+ if (unlikely(ret != 0)) {
+ CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
+ nic_id, ep_id);
+ mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
+ return MX_RECV_FINISHED;
+ }
+ spin_lock(&kmxlnd_data.kmx_conn_lock);
+ cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
+ spin_unlock(&kmxlnd_data.kmx_conn_lock);
+ up(&kmxlnd_data.kmx_conn_sem);
+ return MX_RECV_FINISHED;
+ }
+ if (msg_type == MXLND_MSG_CONN_ACK) {
+ kmx_connparams_t *cp = NULL;
+ const int expected = offsetof(kmx_msg_t, mxm_u) +
+ sizeof(kmx_connreq_msg_t);
+
+ LASSERT(conn);
+ if (unlikely(error != 0)) {
+ CNETERR("received CONN_ACK from %s with error -%d\n",
+ libcfs_nid2str(peer->mxp_nid), (int) error);
+ mxlnd_conn_disconnect(conn, 1, 0);
+ } else if (unlikely(length != expected || !data_if_available)) {
+ CNETERR("received %s CONN_ACK from %s "
+ "length=%d (expected %d)\n",
+ data_if_available ? "short" : "missing",
+ libcfs_nid2str(peer->mxp_nid), length, expected);
+ mxlnd_conn_disconnect(conn, 1, 1);
+ } else {
+ /* peer is ready for messages */
+ ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
+ conn, peer, data_if_available);
+ if (unlikely(ret != 0)) {
+ CNETERR("unable to alloc kmx_connparams_t"
+ " from %llx:%d\n", nic_id, ep_id);
+ mxlnd_conn_disconnect(conn, 1, 1);
+ } else {
+ spin_lock(&kmxlnd_data.kmx_conn_lock);
+ cfs_list_add_tail(&cp->mxr_list,
+ &kmxlnd_data.kmx_conn_reqs);
+ spin_unlock(&kmxlnd_data.kmx_conn_lock);
+ up(&kmxlnd_data.kmx_conn_sem);
+ }
+ }
+ mxlnd_conn_decref(conn); /* drop ref taken above */
+
+ return MX_RECV_FINISHED;
+ }
+
+ /* Handle unexpected messages (PUT_REQ and GET_REQ) */
+
+ LASSERT(peer != NULL && conn != NULL);
+
+ rx = mxlnd_get_idle_rx(conn);
if (rx != NULL) {
- mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
- if (length <= MXLND_EAGER_SIZE) {
+ if (length <= MXLND_MSG_SIZE) {
ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
} else {
- CDEBUG(D_NETERROR, "unexpected large receive with "
- "match_value=0x%llx length=%d\n",
- match_value, length);
+ CNETERR("unexpected large receive with "
+ "match_value=0x%llx length=%d\n",
+ match_value, length);
ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
}
if (ret == 0) {
- struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
-
- /* NOTE to avoid a peer disappearing out from under us,
- * read lock the peers lock first */
- read_lock(&kmxlnd_data.kmx_peers_lock);
- mx_get_endpoint_addr_context(source, (void **) &peer);
- if (peer != NULL) {
- mxlnd_peer_addref(peer); /* add a ref... */
- spin_lock(&peer->mxp_lock);
- conn = peer->mxp_conn;
- if (conn) {
- mxlnd_conn_addref(conn); /* add ref until rx completed */
- mxlnd_peer_decref(peer); /* and drop peer ref */
- rx->mxc_conn = conn;
- }
- spin_unlock(&peer->mxp_lock);
- rx->mxc_peer = peer;
- rx->mxc_nid = peer->mxp_nid;
- }
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ /* hold conn ref until rx completes */
+ rx->mxc_conn = conn;
+ rx->mxc_peer = peer;
+ rx->mxc_nid = peer->mxp_nid;
} else {
- CDEBUG(D_NETERROR, "could not post receive\n");
+ CNETERR("could not post receive\n");
mxlnd_put_idle_rx(rx);
}
}
+ /* Encountered error, drop incoming message on the floor */
+ /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
+ * uses the standard code path and acks the sender normally */
+
if (rx == NULL || ret != 0) {
+ mxlnd_conn_decref(conn); /* drop ref taken above */
if (rx == NULL) {
- CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
+ CNETERR("no idle rxs available - dropping rx"
+ " 0x%llx from %s\n", match_value,
+ libcfs_nid2str(peer->mxp_nid));
} else {
/* ret != 0 */
- CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
+ CNETERR("disconnected peer - dropping rx\n");
}
- seg.segment_ptr = 0LL;
+ seg.segment_ptr = 0ULL;
seg.segment_length = 0;
mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
- match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
+ match_value, ~0ULL, NULL, NULL);
}
return MX_RECV_CONTINUE;
int
mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
{
- int i = 0;
- int ret = -ENOENT;
- struct kmx_peer *peer = NULL;
+ int i = 0;
+ int ret = -ENOENT;
+ kmx_peer_t *peer = NULL;
- read_lock(&kmxlnd_data.kmx_peers_lock);
+ read_lock(&kmxlnd_data.kmx_global_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
- if (index-- > 0)
- continue;
-
- *nidp = peer->mxp_nid;
- *count = atomic_read(&peer->mxp_refcount);
- ret = 0;
- break;
+ cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
+ mxp_list) {
+ if (index-- == 0) {
+ *nidp = peer->mxp_nid;
+ *count = cfs_atomic_read(&peer->mxp_refcount);
+ ret = 0;
+ break;
+ }
}
}
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ read_unlock(&kmxlnd_data.kmx_global_lock);
return ret;
}
void
-mxlnd_del_peer_locked(struct kmx_peer *peer)
+mxlnd_del_peer_locked(kmx_peer_t *peer)
{
- list_del_init(&peer->mxp_peers); /* remove from the global list */
- if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
- mxlnd_peer_decref(peer); /* drop global list ref */
+ if (peer->mxp_conn) {
+ mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
+ } else {
+ cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
+ mxlnd_peer_decref(peer); /* drop global list ref */
+ }
return;
}
{
int i = 0;
int ret = 0;
- struct kmx_peer *peer = NULL;
- struct kmx_peer *next = NULL;
+ kmx_peer_t *peer = NULL;
+ kmx_peer_t *next = NULL;
if (nid != LNET_NID_ANY) {
- peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
+ peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
}
- write_lock(&kmxlnd_data.kmx_peers_lock);
+ write_lock(&kmxlnd_data.kmx_global_lock);
if (nid != LNET_NID_ANY) {
if (peer == NULL) {
ret = -ENOENT;
}
} else { /* LNET_NID_ANY */
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry_safe(peer, next,
- &kmxlnd_data.kmx_peers[i], mxp_peers) {
+ cfs_list_for_each_entry_safe(peer, next,
+ &kmxlnd_data.kmx_peers[i],
+ mxp_list) {
mxlnd_del_peer_locked(peer);
}
}
}
- write_unlock(&kmxlnd_data.kmx_peers_lock);
+ write_unlock(&kmxlnd_data.kmx_global_lock);
return ret;
}
-struct kmx_conn *
+kmx_conn_t *
mxlnd_get_conn_by_idx(int index)
{
- int i = 0;
- struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
+ int i = 0;
+ kmx_peer_t *peer = NULL;
+ kmx_conn_t *conn = NULL;
- read_lock(&kmxlnd_data.kmx_peers_lock);
+ read_lock(&kmxlnd_data.kmx_global_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
- spin_lock(&peer->mxp_lock);
- list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
+ cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
+ mxp_list) {
+ cfs_list_for_each_entry(conn, &peer->mxp_conns,
+ mxk_list) {
if (index-- > 0) {
continue;
}
mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
- spin_unlock(&peer->mxp_lock);
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ read_unlock(&kmxlnd_data.kmx_global_lock);
return conn;
}
- spin_unlock(&peer->mxp_lock);
}
}
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ read_unlock(&kmxlnd_data.kmx_global_lock);
return NULL;
}
void
-mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
+mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
{
- struct kmx_conn *conn = NULL;
- struct kmx_conn *next = NULL;
+ kmx_conn_t *conn = NULL;
+ kmx_conn_t *next = NULL;
+
+ cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
+ mxlnd_conn_disconnect(conn, 0, 1);
- spin_lock(&peer->mxp_lock);
- list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
- mxlnd_conn_disconnect(conn, 0 , 0);
- }
- spin_unlock(&peer->mxp_lock);
return;
}
{
int i = 0;
int ret = 0;
- struct kmx_peer *peer = NULL;
+ kmx_peer_t *peer = NULL;
- read_lock(&kmxlnd_data.kmx_peers_lock);
+ write_lock(&kmxlnd_data.kmx_global_lock);
if (nid != LNET_NID_ANY) {
- peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
+ peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
if (peer == NULL) {
ret = -ENOENT;
} else {
}
} else { /* LNET_NID_ANY */
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
+ cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list)
mxlnd_close_matching_conns_locked(peer);
}
}
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ write_unlock(&kmxlnd_data.kmx_global_lock);
return ret;
}
* @ni - LNET interface handle
* @cmd - command to change
* @arg - the ioctl data
- *
- * Not implemented yet.
*/
int
mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
break;
}
case IOC_LIBCFS_GET_CONN: {
- struct kmx_conn *conn = NULL;
+ kmx_conn_t *conn = NULL;
conn = mxlnd_get_conn_by_idx(data->ioc_count);
if (conn == NULL) {
break;
}
default:
- CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
+ CNETERR("unknown ctl(%d)\n", cmd);
break;
}
}
/**
- * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
+ * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
* @tx
*
* Add the tx to the peer's msg or data queue. The caller has locked the peer.
*/
void
-mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
+mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
{
- u8 msg_type = tx->mxc_msg_type;
- //struct kmx_peer *peer = tx->mxc_peer;
- struct kmx_conn *conn = tx->mxc_conn;
+ u8 msg_type = tx->mxc_msg_type;
+ kmx_conn_t *conn = tx->mxc_conn;
LASSERT (msg_type != 0);
LASSERT (tx->mxc_nid != 0);
msg_type != MXLND_MSG_GET_DATA) {
/* msg style tx */
if (mxlnd_tx_requires_credit(tx)) {
- list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
+ cfs_list_add_tail(&tx->mxc_list,
+ &conn->mxk_tx_credit_queue);
conn->mxk_ntx_msgs++;
} else if (msg_type == MXLND_MSG_CONN_REQ ||
msg_type == MXLND_MSG_CONN_ACK) {
/* put conn msgs at the front of the queue */
- list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
+ cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
} else {
/* PUT_ACK, PUT_NAK */
- list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
+ cfs_list_add_tail(&tx->mxc_list,
+ &conn->mxk_tx_free_queue);
conn->mxk_ntx_msgs++;
}
} else {
/* data style tx */
- list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
+ cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
conn->mxk_ntx_data++;
}
* Add the tx to the peer's msg or data queue
*/
static inline void
-mxlnd_peer_queue_tx(struct kmx_ctx *tx)
+mxlnd_peer_queue_tx(kmx_ctx_t *tx)
{
- LASSERT(tx->mxc_peer != NULL);
- LASSERT(tx->mxc_conn != NULL);
- spin_lock(&tx->mxc_conn->mxk_lock);
- mxlnd_peer_queue_tx_locked(tx);
- spin_unlock(&tx->mxc_conn->mxk_lock);
+ LASSERT(tx->mxc_peer != NULL);
+ LASSERT(tx->mxc_conn != NULL);
+ spin_lock(&tx->mxc_conn->mxk_lock);
+ mxlnd_peer_queue_tx_locked(tx);
+ spin_unlock(&tx->mxc_conn->mxk_lock);
- return;
+ return;
}
/**
* Add the tx to the global queue and up the tx_queue_sem
*/
void
-mxlnd_queue_tx(struct kmx_ctx *tx)
+mxlnd_queue_tx(kmx_ctx_t *tx)
{
- struct kmx_peer *peer = tx->mxc_peer;
+ kmx_peer_t *peer = tx->mxc_peer;
LASSERT (tx->mxc_nid != 0);
if (peer != NULL) {
if (peer->mxp_incompatible &&
tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
/* let this fail now */
- tx->mxc_status.code = -ECONNABORTED;
+ tx->mxc_errno = -ECONNABORTED;
mxlnd_conn_decref(peer->mxp_conn);
mxlnd_put_idle_tx(tx);
return;
}
if (tx->mxc_conn == NULL) {
int ret = 0;
- struct kmx_conn *conn = NULL;
+ kmx_conn_t *conn = NULL;
ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
if (ret != 0) {
- tx->mxc_status.code = ret;
+ tx->mxc_errno = ret;
mxlnd_put_idle_tx(tx);
goto done;
}
mxlnd_peer_queue_tx(tx);
mxlnd_check_sends(peer);
} else {
- spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
- list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
- spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
- up(&kmxlnd_data.kmx_tx_queue_sem);
- }
+ spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
+ cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
+ spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
+ up(&kmxlnd_data.kmx_tx_queue_sem);
+ }
done:
- return;
+ return;
}
int
-mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
+mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
{
int i = 0;
int sum = 0;
nseg = last_iov - first_iov + 1;
LASSERT(nseg > 0);
- MXLND_ALLOC (seg, nseg * sizeof(*seg));
+ MXLND_ALLOC(seg, nseg * sizeof(*seg));
if (seg == NULL) {
- CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
+ CNETERR("MXLND_ALLOC() failed\n");
return -1;
}
memset(seg, 0, nseg * sizeof(*seg));
}
int
-mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
+mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
{
int i = 0;
int sum = 0;
if (!first_found && (sum > offset)) {
first_kiov = i;
first_kiov_offset = offset - old_sum;
- //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
first_found = 1;
sum = kiov[i].kiov_len - first_kiov_offset;
nseg = last_kiov - first_kiov + 1;
LASSERT(nseg > 0);
- MXLND_ALLOC (seg, nseg * sizeof(*seg));
+ MXLND_ALLOC(seg, nseg * sizeof(*seg));
if (seg == NULL) {
- CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
+ CNETERR("MXLND_ALLOC() failed\n");
return -1;
}
memset(seg, 0, niov * sizeof(*seg));
}
void
-mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
+mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
{
LASSERT(type == MXLND_MSG_PUT_ACK);
mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
tx->mxc_cookie = cookie;
tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
- tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
+ tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
tx->mxc_match = mxlnd_create_match(tx, status);
mxlnd_queue_tx(tx);
* On success, it queues the tx, on failure it calls lnet_finalize()
*/
void
-mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
+mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
{
- int ret = 0;
- lnet_process_id_t target = lntmsg->msg_target;
- unsigned int niov = lntmsg->msg_niov;
- struct iovec *iov = lntmsg->msg_iov;
- lnet_kiov_t *kiov = lntmsg->msg_kiov;
- unsigned int offset = lntmsg->msg_offset;
- unsigned int nob = lntmsg->msg_len;
- struct kmx_ctx *tx = NULL;
+ int ret = 0;
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kmx_ctx_t *tx = NULL;
LASSERT(lntmsg != NULL);
LASSERT(peer != NULL);
LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
- LASSERT((cookie>>52) == 0);
+ LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
tx = mxlnd_get_idle_tx();
if (tx == NULL) {
- CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
+ CNETERR("Can't allocate %s tx for %s\n",
msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
libcfs_nid2str(target.nid));
goto failed_0;
tx->mxc_peer = peer;
tx->mxc_conn = peer->mxp_conn;
tx->mxc_msg_type = msg_type;
- tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
- tx->mxc_state = MXLND_CTX_PENDING;
tx->mxc_lntmsg[0] = lntmsg;
tx->mxc_cookie = cookie;
tx->mxc_match = mxlnd_create_match(tx, 0);
/* This setups up the mx_ksegment_t to send the DATA payload */
if (nob == 0) {
/* do not setup the segments */
- CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
- "to %s?\n", libcfs_nid2str(target.nid));
+ CNETERR("nob = 0; why didn't we use an EAGER reply "
+ "to %s?\n", libcfs_nid2str(target.nid));
ret = 0;
} else if (kiov == NULL) {
ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
}
if (ret != 0) {
- CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
- libcfs_nid2str(target.nid));
- tx->mxc_status.code = -EIO;
+ CNETERR("Can't setup send DATA for %s\n",
+ libcfs_nid2str(target.nid));
+ tx->mxc_errno = -EIO;
goto failed_1;
}
mxlnd_queue_tx(tx);
return;
failed_0:
- CDEBUG(D_NETERROR, "no tx avail\n");
+ CNETERR("no tx avail\n");
lnet_finalize(ni, lntmsg, -EIO);
return;
}
* On success, it returns 0, on failure it returns -1
*/
int
-mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
+mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
{
- int ret = 0;
- lnet_process_id_t target = lntmsg->msg_target;
- unsigned int niov = lntmsg->msg_niov;
- struct iovec *iov = lntmsg->msg_iov;
- lnet_kiov_t *kiov = lntmsg->msg_kiov;
- unsigned int offset = lntmsg->msg_offset;
- unsigned int nob = lntmsg->msg_len;
- mx_return_t mxret = MX_SUCCESS;
+ int ret = 0;
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ mx_return_t mxret = MX_SUCCESS;
+ u64 mask = ~(MXLND_ERROR_MASK);
/* above assumes MXLND_MSG_PUT_DATA */
if (msg_type == MXLND_MSG_GET_DATA) {
LASSERT(lntmsg != NULL);
LASSERT(rx != NULL);
LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
- LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
+ LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
rx->mxc_msg_type = msg_type;
- rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
rx->mxc_state = MXLND_CTX_PENDING;
rx->mxc_nid = target.nid;
/* if posting a GET_DATA, we may not yet know the peer */
if (msg_type == MXLND_MSG_GET_DATA) {
rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
if (rx->mxc_lntmsg[1] == NULL) {
- CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
- libcfs_nid2str(target.nid));
+ CNETERR("Can't create reply for GET -> %s\n",
+ libcfs_nid2str(target.nid));
ret = -1;
}
}
if (ret != 0) {
- CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
+ CNETERR("Can't setup %s rx for %s\n",
msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
libcfs_nid2str(target.nid));
return -1;
mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
rx->mxc_seg_list, rx->mxc_nseg,
rx->mxc_pin_type, rx->mxc_match,
- 0xF00FFFFFFFFFFFFFLL, (void *) rx,
+ mask, (void *) rx,
&rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
if (rx->mxc_conn != NULL) {
mxlnd_deq_pending_ctx(rx);
}
- CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
- (int) mxret, libcfs_nid2str(target.nid));
+ CNETERR("mx_kirecv() failed with %d for %s\n",
+ (int) mxret, libcfs_nid2str(target.nid));
return -1;
}
*
* This must not block. Since we may not have a peer struct for the receiver,
* it will append send messages on a global tx list. We will then up the
- * tx_queued's semaphore to notify it of the new send.
+ * tx_queued's semaphore to notify it of the new send.
*/
int
mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
unsigned int payload_offset = lntmsg->msg_offset;
unsigned int payload_nob = lntmsg->msg_len;
- struct kmx_ctx *tx = NULL;
- struct kmx_msg *txmsg = NULL;
- struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
- struct kmx_ctx *rx_data = NULL;
- struct kmx_conn *conn = NULL;
+ kmx_ctx_t *tx = NULL;
+ kmx_msg_t *txmsg = NULL;
+ kmx_ctx_t *rx = (kmx_ctx_t *) private; /* for REPLY */
+ kmx_ctx_t *rx_data = NULL;
+ kmx_conn_t *conn = NULL;
int nob = 0;
uint32_t length = 0;
- struct kmx_peer *peer = NULL;
+ kmx_peer_t *peer = NULL;
+ rwlock_t *g_lock =&kmxlnd_data.kmx_global_lock;
CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
payload_nob, payload_niov, libcfs_id2str(target));
/* private is used on LNET_GET_REPLY only, NULL for all other cases */
/* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
- * to a new peer, use the nid */
- peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
- if (peer != NULL) {
- if (unlikely(peer->mxp_incompatible)) {
- mxlnd_peer_decref(peer); /* drop ref taken above */
- } else {
- spin_lock(&peer->mxp_lock);
- conn = peer->mxp_conn;
- if (conn) {
- mxlnd_conn_addref(conn);
- mxlnd_peer_decref(peer); /* drop peer ref taken above */
- }
- spin_unlock(&peer->mxp_lock);
+ * to a new peer, so create one if not found */
+ peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
+ if (peer == NULL || peer->mxp_conn == NULL) {
+ /* we could not find it nor could we create one or
+ * one exists but we cannot create a conn,
+ * fail this message */
+ if (peer) {
+ /* found peer without conn, drop ref taken above */
+ LASSERT(peer->mxp_conn == NULL);
+ mxlnd_peer_decref(peer);
}
+ return -ENOMEM;
}
- if (conn == NULL && peer != NULL) {
- CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
- peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
+
+ /* we have a peer with a conn */
+
+ if (unlikely(peer->mxp_incompatible)) {
+ mxlnd_peer_decref(peer); /* drop ref taken above */
+ } else {
+ read_lock(g_lock);
+ conn = peer->mxp_conn;
+ if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT)
+ mxlnd_conn_addref(conn);
+ else
+ conn = NULL;
+ read_unlock(g_lock);
+ mxlnd_peer_decref(peer); /* drop peer ref taken above */
+ if (!conn)
+ return -ENOTCONN;
}
+ LASSERT(peer && conn);
+
+ CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
+
switch (type) {
case LNET_MSG_ACK:
LASSERT (payload_nob == 0);
case LNET_MSG_PUT:
/* Is the payload small enough not to need DATA? */
nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
- if (nob <= MXLND_EAGER_SIZE)
+ if (nob <= MXLND_MSG_SIZE)
break; /* send EAGER */
tx = mxlnd_get_idle_tx();
if (unlikely(tx == NULL)) {
- CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
- type == LNET_MSG_PUT ? "PUT" : "REPLY",
- libcfs_nid2str(nid));
+ CNETERR("Can't allocate %s tx for %s\n",
+ type == LNET_MSG_PUT ? "PUT" : "REPLY",
+ libcfs_nid2str(nid));
if (conn) mxlnd_conn_decref(conn);
return -ENOMEM;
}
- /* the peer may be NULL */
tx->mxc_peer = peer;
- tx->mxc_conn = conn; /* may be NULL */
+ tx->mxc_conn = conn;
/* we added a conn ref above */
mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
txmsg = tx->mxc_msg;
* we need to determine how much to receive, it will be either
* a put_ack or a put_nak. The put_ack is larger, so use it. */
- rx = mxlnd_get_idle_rx();
+ rx = mxlnd_get_idle_rx(conn);
if (unlikely(rx == NULL)) {
- CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
- libcfs_nid2str(nid));
+ CNETERR("Can't allocate rx for PUT_ACK for %s\n",
+ libcfs_nid2str(nid));
mxlnd_put_idle_tx(tx);
if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
return -ENOMEM;
}
rx->mxc_nid = nid;
rx->mxc_peer = peer;
- /* conn may be NULL but unlikely since the first msg is always small */
- /* NOTE no need to lock peer before adding conn ref since we took
- * a conn ref for the tx (it cannot be freed between there and here ) */
- if (conn) mxlnd_conn_addref(conn); /* for this rx */
+ mxlnd_conn_addref(conn); /* for this rx */
rx->mxc_conn = conn;
rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
rx->mxc_cookie = tx->mxc_cookie;
length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
if (unlikely(ret != 0)) {
- CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
+ CNETERR("recv_msg() failed for PUT_ACK for %s\n",
libcfs_nid2str(nid));
rx->mxc_lntmsg[0] = NULL;
mxlnd_put_idle_rx(rx);
mxlnd_put_idle_tx(tx);
- if (conn) {
- mxlnd_conn_decref(conn); /* for the rx... */
- mxlnd_conn_decref(conn); /* and for the tx */
- }
+ mxlnd_conn_decref(conn); /* for the rx... */
+ mxlnd_conn_decref(conn); /* and for the tx */
return -EHOSTUNREACH;
}
/* is the REPLY message too small for DATA? */
nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
- if (nob <= MXLND_EAGER_SIZE)
+ if (nob <= MXLND_MSG_SIZE)
break; /* send EAGER */
- /* get tx (we need the cookie) , post rx for incoming DATA,
+ /* get tx (we need the cookie), post rx for incoming DATA,
* then post GET_REQ tx */
tx = mxlnd_get_idle_tx();
if (unlikely(tx == NULL)) {
- CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
- libcfs_nid2str(nid));
- if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
+ CNETERR("Can't allocate GET tx for %s\n",
+ libcfs_nid2str(nid));
+ mxlnd_conn_decref(conn); /* for the ref taken above */
return -ENOMEM;
}
- rx_data = mxlnd_get_idle_rx();
+ rx_data = mxlnd_get_idle_rx(conn);
if (unlikely(rx_data == NULL)) {
- CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
- libcfs_nid2str(nid));
+ CNETERR("Can't allocate DATA rx for %s\n",
+ libcfs_nid2str(nid));
mxlnd_put_idle_tx(tx);
- if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
+ mxlnd_conn_decref(conn); /* for the ref taken above */
return -ENOMEM;
}
rx_data->mxc_peer = peer;
/* NOTE no need to lock peer before adding conn ref since we took
* a conn ref for the tx (it cannot be freed between there and here ) */
- if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
- rx_data->mxc_conn = conn; /* may be NULL */
+ mxlnd_conn_addref(conn); /* for the rx_data */
+ rx_data->mxc_conn = conn;
ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
if (unlikely(ret != 0)) {
- CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
- libcfs_nid2str(nid));
+ CNETERR("Can't setup GET sink for %s\n",
+ libcfs_nid2str(nid));
mxlnd_put_idle_rx(rx_data);
mxlnd_put_idle_tx(tx);
- if (conn) {
- mxlnd_conn_decref(conn); /* for the rx_data... */
- mxlnd_conn_decref(conn); /* and for the tx */
- }
+ mxlnd_conn_decref(conn); /* for the rx_data... */
+ mxlnd_conn_decref(conn); /* and for the tx */
return -EIO;
}
tx->mxc_peer = peer;
- tx->mxc_conn = conn; /* may be NULL */
+ tx->mxc_conn = conn;
/* conn ref taken above */
mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
txmsg = tx->mxc_msg;
default:
LBUG();
- if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
+ mxlnd_conn_decref(conn); /* drop ref taken above */
return -EIO;
}
/* send EAGER */
LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
- <= MXLND_EAGER_SIZE);
+ <= MXLND_MSG_SIZE);
tx = mxlnd_get_idle_tx();
if (unlikely(tx == NULL)) {
- CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
- mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
- if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
+ CNETERR("Can't send %s to %s: tx descs exhausted\n",
+ mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
+ mxlnd_conn_decref(conn); /* drop ref taken above */
return -ENOMEM;
}
tx->mxc_peer = peer;
- tx->mxc_conn = conn; /* may be NULL */
+ tx->mxc_conn = conn;
/* conn ref taken above */
nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
txmsg->mxm_u.eager.mxem_hdr = *hdr;
if (payload_kiov != NULL)
- lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
+ lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
payload_niov, payload_kiov, payload_offset, payload_nob);
else
- lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
+ lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
payload_niov, payload_iov, payload_offset, payload_nob);
unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
unsigned int offset, unsigned int mlen, unsigned int rlen)
{
- int ret = 0;
- int nob = 0;
- int len = 0;
- struct kmx_ctx *rx = private;
- struct kmx_msg *rxmsg = rx->mxc_msg;
- lnet_nid_t nid = rx->mxc_nid;
- struct kmx_ctx *tx = NULL;
- struct kmx_msg *txmsg = NULL;
- struct kmx_peer *peer = rx->mxc_peer;
- struct kmx_conn *conn = peer->mxp_conn;
- u64 cookie = 0LL;
- int msg_type = rxmsg->mxm_type;
- int repost = 1;
- int credit = 0;
- int finalize = 0;
+ int ret = 0;
+ int nob = 0;
+ int len = 0;
+ kmx_ctx_t *rx = private;
+ kmx_msg_t *rxmsg = rx->mxc_msg;
+ lnet_nid_t nid = rx->mxc_nid;
+ kmx_ctx_t *tx = NULL;
+ kmx_msg_t *txmsg = NULL;
+ kmx_peer_t *peer = rx->mxc_peer;
+ kmx_conn_t *conn = peer->mxp_conn;
+ u64 cookie = 0ULL;
+ int msg_type = rxmsg->mxm_type;
+ int repost = 1;
+ int credit = 0;
+ int finalize = 0;
LASSERT (mlen <= rlen);
/* Either all pages or all vaddrs */
LASSERT (!(kiov != NULL && iov != NULL));
- LASSERT (peer != NULL);
+ LASSERT (peer && conn);
/* conn_addref(conn) already taken for the primary rx */
nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
len = rx->mxc_status.xfer_length;
if (unlikely(nob > len)) {
- CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
- libcfs_nid2str(nid), nob, len);
+ CNETERR("Eager message from %s too big: %d(%d)\n",
+ libcfs_nid2str(nid), nob, len);
ret = -EPROTO;
break;
}
if (kiov != NULL)
lnet_copy_flat2kiov(niov, kiov, offset,
- MXLND_EAGER_SIZE, rxmsg,
+ MXLND_MSG_SIZE, rxmsg,
offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
mlen);
else
lnet_copy_flat2iov(niov, iov, offset,
- MXLND_EAGER_SIZE, rxmsg,
+ MXLND_MSG_SIZE, rxmsg,
offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
mlen);
finalize = 1;
tx = mxlnd_get_idle_tx();
if (unlikely(tx == NULL)) {
- CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
+ CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
/* Not replying will break the connection */
ret = -ENOMEM;
break;
rx->mxc_conn = conn;
/* do not take another ref for this rx, it is already taken */
rx->mxc_nid = peer->mxp_nid;
- ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
+ ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
txmsg->mxm_u.put_ack.mxpam_dst_cookie);
if (unlikely(ret != 0)) {
/* Notify peer that it's over */
- CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
- libcfs_nid2str(nid), ret);
+ CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
+ libcfs_nid2str(nid), ret);
mxlnd_ctx_init(tx);
tx->mxc_state = MXLND_CTX_PREP;
tx->mxc_peer = peer;
break;
case MXLND_MSG_GET_REQ:
+ cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
+
if (likely(lntmsg != NULL)) {
mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
- rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
+ cookie);
} else {
/* GET didn't match anything */
/* The initiator has a rx mapped to [k]iov. We cannot send a nak.
* We have to embed the error code in the match bits.
* Send the error in bits 52-59 and the cookie in bits 0-51 */
- u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
-
tx = mxlnd_get_idle_tx();
if (unlikely(tx == NULL)) {
- CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
- libcfs_nid2str(nid));
+ CNETERR("Can't get tx for GET NAK for %s\n",
+ libcfs_nid2str(nid));
+ /* we can't get a tx, notify the peer that the GET failed */
+ mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
+ ENODATA, cookie);
ret = -ENOMEM;
break;
}
if (repost) {
/* we received a message, increment peer's outstanding credits */
- if (credit == 1) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_outstanding++;
- spin_unlock(&conn->mxk_lock);
- }
+ if (credit == 1) {
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_outstanding++;
+ spin_unlock(&conn->mxk_lock);
+ }
/* we are done with the rx */
mxlnd_put_idle_rx(rx);
mxlnd_conn_decref(conn);
}
- if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
+ if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
/* we received a credit, see if we can use it to send a msg */
if (credit) mxlnd_check_sends(peer);
void
mxlnd_sleep(unsigned long timeout)
{
- set_current_state(TASK_INTERRUPTIBLE);
- schedule_timeout(timeout);
+ cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
+ cfs_schedule_timeout(timeout);
return;
}
long id = (long) arg;
int ret = 0;
int found = 0;
- struct kmx_ctx *tx = NULL;
- struct kmx_peer *peer = NULL;
- struct list_head *tmp_tx = NULL;
-
- cfs_daemonize("mxlnd_tx_queued");
- //cfs_block_allsigs();
-
- while (!kmxlnd_data.kmx_shutdown) {
- ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
- if (kmxlnd_data.kmx_shutdown)
- break;
- if (ret != 0) // Should we check for -EINTR?
- continue;
- spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
- if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
- spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
- continue;
- }
- tmp_tx = &kmxlnd_data.kmx_tx_queue;
- tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
- list_del_init(&tx->mxc_list);
- spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
-
- found = 0;
- peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
- if (peer != NULL) {
- tx->mxc_peer = peer;
- spin_lock(&peer->mxp_lock);
- if (peer->mxp_conn == NULL) {
- ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
- if (ret != 0) {
- /* out of memory, give up and fail tx */
- tx->mxc_status.code = -ENOMEM;
- spin_unlock(&peer->mxp_lock);
- mxlnd_peer_decref(peer);
- mxlnd_put_idle_tx(tx);
- continue;
- }
- }
- tx->mxc_conn = peer->mxp_conn;
- mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
- spin_unlock(&peer->mxp_lock);
- mxlnd_peer_decref(peer); /* drop peer ref taken above */
+ kmx_ctx_t *tx = NULL;
+ kmx_peer_t *peer = NULL;
+ cfs_list_t *queue = &kmxlnd_data.kmx_tx_queue;
+ spinlock_t *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
+
+ while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
+ ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
+ if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
+ break;
+ if (ret != 0) /* Should we check for -EINTR? */
+ continue;
+ spin_lock(tx_q_lock);
+ if (cfs_list_empty(&kmxlnd_data.kmx_tx_queue)) {
+ spin_unlock(tx_q_lock);
+ continue;
+ }
+ tx = cfs_list_entry(queue->next, kmx_ctx_t, mxc_list);
+ cfs_list_del_init(&tx->mxc_list);
+ spin_unlock(tx_q_lock);
+
+ found = 0;
+ peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds ref*/
+ if (peer != NULL) {
+ tx->mxc_peer = peer;
+ write_lock(g_lock);
+ if (peer->mxp_conn == NULL) {
+ ret = mxlnd_conn_alloc_locked(&peer->mxp_conn,
+ peer);
+ if (ret != 0) {
+ /* out of memory: give up, fail tx */
+ tx->mxc_errno = -ENOMEM;
+ mxlnd_peer_decref(peer);
+ write_unlock(g_lock);
+ mxlnd_put_idle_tx(tx);
+ continue;
+ }
+ }
+ tx->mxc_conn = peer->mxp_conn;
+ mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
+ mxlnd_peer_decref(peer); /* drop peer ref taken above */
+ write_unlock(g_lock);
mxlnd_queue_tx(tx);
found = 1;
}
if (found == 0) {
- int hash = 0;
- struct kmx_peer *peer = NULL;
- struct kmx_peer *old = NULL;
+ int hash = 0;
+ kmx_peer_t *peer = NULL;
+ kmx_peer_t *old = NULL;
hash = mxlnd_nid_to_hash(tx->mxc_nid);
tx->mxc_msg_type != MXLND_MSG_GET_DATA);
/* create peer */
/* adds conn ref for this function */
- ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
+ ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
+ *kmxlnd_tunables.kmx_board,
+ *kmxlnd_tunables.kmx_ep_id, 0ULL);
if (ret != 0) {
/* finalize message */
- tx->mxc_status.code = ret;
+ tx->mxc_errno = ret;
mxlnd_put_idle_tx(tx);
continue;
}
/* add peer to global peer list, but look to see
* if someone already created it after we released
* the read lock */
- write_lock(&kmxlnd_data.kmx_peers_lock);
- list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
- if (old->mxp_nid == peer->mxp_nid) {
- /* somebody beat us here, we created a duplicate */
+ write_lock(g_lock);
+ old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
+ if (old) {
+ /* we have a peer ref on old */
+ if (old->mxp_conn) {
found = 1;
- break;
+ } else {
+ /* no conn */
+ /* drop our ref taken above... */
+ mxlnd_peer_decref(old);
+ /* and delete it */
+ mxlnd_del_peer_locked(old);
}
}
if (found == 0) {
- list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
- atomic_inc(&kmxlnd_data.kmx_npeers);
+ cfs_list_add_tail(&peer->mxp_list,
+ &kmxlnd_data.kmx_peers[hash]);
+ cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
} else {
tx->mxc_peer = old;
- spin_lock(&old->mxp_lock);
tx->mxc_conn = old->mxp_conn;
- /* FIXME can conn be NULL? */
LASSERT(old->mxp_conn != NULL);
mxlnd_conn_addref(old->mxp_conn);
- spin_unlock(&old->mxp_lock);
- mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
mxlnd_peer_decref(peer);
}
- write_unlock(&kmxlnd_data.kmx_peers_lock);
+ write_unlock(g_lock);
mxlnd_queue_tx(tx);
}
/* When calling this, we must not have the peer lock. */
void
-mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
+mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
{
- mx_return_t mxret = MX_SUCCESS;
- mx_request_t request;
- struct kmx_conn *conn = peer->mxp_conn;
+ mx_return_t mxret = MX_SUCCESS;
+ mx_request_t request;
+ kmx_conn_t *conn = peer->mxp_conn;
+ u64 match = ((u64) msg_type) << MXLND_MSG_OFFSET;
/* NOTE we are holding a conn ref every time we call this function,
* we do not need to lock the peer before taking another ref */
mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
- LASSERT(mask == MXLND_MASK_ICON_REQ ||
- mask == MXLND_MASK_ICON_ACK);
+ LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
if (peer->mxp_reconnect_time == 0) {
peer->mxp_reconnect_time = jiffies;
}
- if (peer->mxp_nic_id == 0LL) {
- mxlnd_peer_hostname_to_nic_id(peer);
- if (peer->mxp_nic_id == 0LL) {
- /* not mapped yet, return */
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_INIT;
- spin_unlock(&conn->mxk_lock);
- if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
- /* give up and notify LNET */
- mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
- function... */
- mxlnd_conn_decref(peer->mxp_conn); /* which we no
- longer need */
- }
- mxlnd_conn_decref(conn);
- return;
+ if (peer->mxp_nic_id == 0ULL) {
+ int ret = 0;
+
+ ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
+ &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
+ if (ret == 0) {
+ mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
}
+ if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
+ /* not mapped yet, return */
+ spin_lock(&conn->mxk_lock);
+ mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
+ spin_unlock(&conn->mxk_lock);
+ }
+ }
+
+ if (cfs_time_after(jiffies,
+ peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
+ conn->mxk_status != MXLND_CONN_DISCONNECT) {
+ /* give up and notify LNET */
+ CDEBUG(D_NET, "timeout trying to connect to %s\n",
+ libcfs_nid2str(peer->mxp_nid));
+ mxlnd_conn_disconnect(conn, 0, 0);
+ mxlnd_conn_decref(conn);
+ return;
}
mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
- peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
+ peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
(void *) peer, &request);
if (unlikely(mxret != MX_SUCCESS)) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
- CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
+ spin_lock(&conn->mxk_lock);
+ mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+ spin_unlock(&conn->mxk_lock);
+ CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
mxlnd_conn_decref(conn);
}
- return;
+ mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
+ MXLND_CONNECT_TIMEOUT/HZ*1000);
+ return;
}
#define MXLND_STATS 0
int
-mxlnd_check_sends(struct kmx_peer *peer)
+mxlnd_check_sends(kmx_peer_t *peer)
{
- int ret = 0;
- int found = 0;
- mx_return_t mxret = MX_SUCCESS;
- struct kmx_ctx *tx = NULL;
- struct kmx_conn *conn = NULL;
- u8 msg_type = 0;
- int credit = 0;
- int status = 0;
- int ntx_posted = 0;
- int credits = 0;
+ int ret = 0;
+ int found = 0;
+ mx_return_t mxret = MX_SUCCESS;
+ kmx_ctx_t *tx = NULL;
+ kmx_conn_t *conn = NULL;
+ u8 msg_type = 0;
+ int credit = 0;
+ int status = 0;
+ int ntx_posted = 0;
+ int credits = 0;
#if MXLND_STATS
- static unsigned long last = 0;
+ static unsigned long last = 0;
#endif
if (unlikely(peer == NULL)) {
LASSERT(peer != NULL);
return -1;
}
- spin_lock(&peer->mxp_lock);
- conn = peer->mxp_conn;
- /* NOTE take a ref for the duration of this function since it is called
- * when there might not be any queued txs for this peer */
- if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
- spin_unlock(&peer->mxp_lock);
+ write_lock(&kmxlnd_data.kmx_global_lock);
+ conn = peer->mxp_conn;
+ /* NOTE take a ref for the duration of this function since it is
+ * called when there might not be any queued txs for this peer */
+ if (conn) {
+ if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
+ write_unlock(&kmxlnd_data.kmx_global_lock);
+ return -1;
+ }
+ mxlnd_conn_addref(conn); /* for duration of this function */
+ }
+ write_unlock(&kmxlnd_data.kmx_global_lock);
/* do not add another ref for this tx */
if (conn == NULL) {
/* we do not have any conns */
+ CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
return -1;
}
#if MXLND_STATS
- if (time_after(jiffies, last)) {
- last = jiffies + HZ;
- CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
- "ntx_posted= %d ntx_data= %d data_posted= %d\n",
- mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
- conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
- conn->mxk_ntx_data, conn->mxk_data_posted);
- }
+ if (cfs_time_after(jiffies, last)) {
+ last = jiffies + HZ;
+ CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
+ "ntx_posted= %d ntx_data= %d data_posted= %d\n",
+ mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
+ conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
+ conn->mxk_ntx_data, conn->mxk_data_posted);
+ }
#endif
- /* cache peer state for asserts */
- spin_lock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
ntx_posted = conn->mxk_ntx_posted;
credits = conn->mxk_credits;
- spin_unlock(&conn->mxk_lock);
- LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
+ LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
LASSERT(ntx_posted >= 0);
- LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
+ LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
LASSERT(credits >= 0);
/* check number of queued msgs, ignore data */
- spin_lock(&conn->mxk_lock);
- if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
+ if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
/* check if any txs queued that could return credits... */
- if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
+ if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
+ conn->mxk_ntx_msgs == 0) {
/* if not, send a NOOP */
tx = mxlnd_get_idle_tx();
if (likely(tx != NULL)) {
}
}
}
- spin_unlock(&conn->mxk_lock);
/* if the peer is not ready, try to connect */
- spin_lock(&conn->mxk_lock);
if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
- conn->mxk_status == MXLND_CONN_FAIL ||
- conn->mxk_status == MXLND_CONN_REQ)) {
+ conn->mxk_status == MXLND_CONN_FAIL)) {
CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
- conn->mxk_status = MXLND_CONN_WAIT;
- spin_unlock(&conn->mxk_lock);
- mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
+ mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
+ spin_unlock(&conn->mxk_lock);
+ mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
goto done;
}
- spin_unlock(&conn->mxk_lock);
- spin_lock(&conn->mxk_lock);
- while (!list_empty(&conn->mxk_tx_free_queue) ||
- !list_empty(&conn->mxk_tx_credit_queue)) {
+ while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
+ !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
/* We have something to send. If we have a queued tx that does not
- * require a credit (free), choose it since its completion will
- * return a credit (here or at the peer), complete a DATA or
+ * require a credit (free), choose it since its completion will
+ * return a credit (here or at the peer), complete a DATA or
* CONN_REQ or CONN_ACK. */
- struct list_head *tmp_tx = NULL;
- if (!list_empty(&conn->mxk_tx_free_queue)) {
+ cfs_list_t *tmp_tx = NULL;
+ if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
tmp_tx = &conn->mxk_tx_free_queue;
} else {
tmp_tx = &conn->mxk_tx_credit_queue;
}
- tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
+ tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
msg_type = tx->mxc_msg_type;
credit = mxlnd_tx_requires_credit(tx);
if (credit) {
- if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
+ if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
CDEBUG(D_NET, "%s: posted enough\n",
libcfs_nid2str(peer->mxp_nid));
goto done_locked;
mxlnd_connstatus_to_str(conn->mxk_status),
tx->mxc_cookie,
mxlnd_msgtype_to_str(tx->mxc_msg_type));
- if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
- list_del_init(&tx->mxc_list);
- tx->mxc_status.code = -ECONNABORTED;
+ if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
+ cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
+ cfs_list_del_init(&tx->mxc_list);
+ tx->mxc_errno = -ECONNABORTED;
+ spin_unlock(&conn->mxk_lock);
mxlnd_put_idle_tx(tx);
mxlnd_conn_decref(conn);
+ goto done;
}
goto done_locked;
}
}
- list_del_init(&tx->mxc_list);
+ cfs_list_del_init(&tx->mxc_list);
/* handle credits, etc now while we have the lock to avoid races */
if (credit) {
conn->mxk_incarnation != 0) {
tx->mxc_incarnation = conn->mxk_incarnation;
}
- spin_unlock(&conn->mxk_lock);
- /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
- * or (2) there is a non-DATA msg that can return credits in the
+ /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
+ * or (2) there is a non-DATA msg that can return credits in the
* queue, then drop this duplicate NOOP */
if (unlikely(msg_type == MXLND_MSG_NOOP)) {
- spin_lock(&conn->mxk_lock);
- if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
+ if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
(conn->mxk_ntx_msgs >= 1)) {
conn->mxk_credits++;
conn->mxk_ntx_posted--;
- spin_unlock(&conn->mxk_lock);
+ spin_unlock(&conn->mxk_lock);
/* redundant NOOP */
mxlnd_put_idle_tx(tx);
mxlnd_conn_decref(conn);
found = 1;
goto done;
}
- spin_unlock(&conn->mxk_lock);
}
found = 1;
if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
(msg_type != MXLND_MSG_GET_DATA))) {
- mxlnd_pack_msg(tx);
+ mxlnd_pack_msg_locked(tx);
}
- //ret = -ECONNABORTED;
mxret = MX_SUCCESS;
- spin_lock(&conn->mxk_lock);
status = conn->mxk_status;
- spin_unlock(&conn->mxk_lock);
+ spin_unlock(&conn->mxk_lock);
if (likely((status == MXLND_CONN_READY) ||
(msg_type == MXLND_MSG_CONN_REQ) ||
msg_type != MXLND_MSG_CONN_ACK) {
/* add to the pending list */
ret = mxlnd_q_pending_ctx(tx);
- if (ret == -1) {
- /* FIXME the conn is disconnected, now what? */
- }
} else {
/* CONN_REQ/ACK */
tx->mxc_state = MXLND_CTX_PENDING;
&tx->mxc_mxreq);
} else {
/* send a DATA tx */
- spin_lock(&conn->mxk_lock);
- conn->mxk_ntx_data--;
- conn->mxk_data_posted++;
- spin_unlock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_ntx_data--;
+ conn->mxk_data_posted++;
+ spin_unlock(&conn->mxk_lock);
CDEBUG(D_NET, "sending %s 0x%llx\n",
mxlnd_msgtype_to_str(msg_type),
tx->mxc_cookie);
&tx->mxc_mxreq);
}
} else {
+ /* ret != 0 */
mxret = MX_CONNECTION_FAILED;
}
if (likely(mxret == MX_SUCCESS)) {
ret = 0;
} else {
- CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
- "sending to %s\n", mx_strerror(mxret), (int) mxret,
+ CNETERR("mx_kisend() failed with %s (%d) "
+ "sending to %s\n", mx_strerror(mxret), (int) mxret,
libcfs_nid2str(peer->mxp_nid));
- /* NOTE mx_kisend() only fails if there are not enough
+ /* NOTE mx_kisend() only fails if there are not enough
* resources. Do not change the connection status. */
if (mxret == MX_NO_RESOURCES) {
- tx->mxc_status.code = -ENOMEM;
+ tx->mxc_errno = -ENOMEM;
} else {
- tx->mxc_status.code = -ECONNABORTED;
+ tx->mxc_errno = -ECONNABORTED;
}
if (credit) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_ntx_posted--;
- conn->mxk_credits++;
- spin_unlock(&conn->mxk_lock);
- } else if (msg_type == MXLND_MSG_PUT_DATA ||
- msg_type == MXLND_MSG_GET_DATA) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_data_posted--;
- spin_unlock(&conn->mxk_lock);
- }
- if (msg_type != MXLND_MSG_PUT_DATA &&
- msg_type != MXLND_MSG_GET_DATA &&
- msg_type != MXLND_MSG_CONN_REQ &&
- msg_type != MXLND_MSG_CONN_ACK) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
- spin_unlock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_ntx_posted--;
+ conn->mxk_credits++;
+ spin_unlock(&conn->mxk_lock);
+ } else if (msg_type == MXLND_MSG_PUT_DATA ||
+ msg_type == MXLND_MSG_GET_DATA) {
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_data_posted--;
+ spin_unlock(&conn->mxk_lock);
+ }
+ if (msg_type != MXLND_MSG_PUT_DATA &&
+ msg_type != MXLND_MSG_GET_DATA &&
+ msg_type != MXLND_MSG_CONN_REQ &&
+ msg_type != MXLND_MSG_CONN_ACK) {
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_outstanding +=
+ tx->mxc_msg->mxm_credits;
+ spin_unlock(&conn->mxk_lock);
}
if (msg_type != MXLND_MSG_CONN_REQ &&
msg_type != MXLND_MSG_CONN_ACK) {
mxlnd_conn_decref(conn);
}
}
- spin_lock(&conn->mxk_lock);
- }
+ spin_lock(&conn->mxk_lock);
+ }
done_locked:
- spin_unlock(&conn->mxk_lock);
+ spin_unlock(&conn->mxk_lock);
done:
- mxlnd_conn_decref(conn); /* drop ref taken at start of function */
- return found;
+ mxlnd_conn_decref(conn); /* drop ref taken at start of function */
+ return found;
}
* idle tx list.
*/
void
-mxlnd_handle_tx_completion(struct kmx_ctx *tx)
+mxlnd_handle_tx_completion(kmx_ctx_t *tx)
{
- int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
- struct kmx_msg *msg = tx->mxc_msg;
- struct kmx_peer *peer = tx->mxc_peer;
- struct kmx_conn *conn = tx->mxc_conn;
+ int code = tx->mxc_status.code;
+ int failed = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
+ kmx_msg_t *msg = tx->mxc_msg;
+ kmx_peer_t *peer = tx->mxc_peer;
+ kmx_conn_t *conn = tx->mxc_conn;
u8 type = tx->mxc_msg_type;
int credit = mxlnd_tx_requires_credit(tx);
u64 cookie = tx->mxc_cookie;
CDEBUG(D_NET, "entering %s (0x%llx):\n",
mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
- if (unlikely(conn == NULL)) {
- mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
- conn = peer->mxp_conn;
- if (conn != NULL) {
- /* do not add a ref for the tx, it was set before sending */
- tx->mxc_conn = conn;
- tx->mxc_peer = conn->mxk_peer;
- }
- }
LASSERT (peer != NULL);
LASSERT (conn != NULL);
}
if (failed) {
- tx->mxc_status.code = -EIO;
+ if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
} else {
- spin_lock(&conn->mxk_lock);
- conn->mxk_last_tx = jiffies;
- spin_unlock(&conn->mxk_lock);
- }
-
- switch (type) {
-
- case MXLND_MSG_GET_DATA:
- spin_lock(&conn->mxk_lock);
- if (conn->mxk_incarnation == tx->mxc_incarnation) {
- conn->mxk_outstanding++;
- conn->mxk_data_posted--;
- }
- spin_unlock(&conn->mxk_lock);
- break;
-
- case MXLND_MSG_PUT_DATA:
- spin_lock(&conn->mxk_lock);
- if (conn->mxk_incarnation == tx->mxc_incarnation) {
- conn->mxk_data_posted--;
- }
- spin_unlock(&conn->mxk_lock);
- break;
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_last_tx = cfs_time_current(); /* jiffies */
+ spin_unlock(&conn->mxk_lock);
+ }
+
+ switch (type) {
+
+ case MXLND_MSG_GET_DATA:
+ spin_lock(&conn->mxk_lock);
+ if (conn->mxk_incarnation == tx->mxc_incarnation) {
+ conn->mxk_outstanding++;
+ conn->mxk_data_posted--;
+ }
+ spin_unlock(&conn->mxk_lock);
+ break;
+
+ case MXLND_MSG_PUT_DATA:
+ spin_lock(&conn->mxk_lock);
+ if (conn->mxk_incarnation == tx->mxc_incarnation) {
+ conn->mxk_data_posted--;
+ }
+ spin_unlock(&conn->mxk_lock);
+ break;
case MXLND_MSG_NOOP:
case MXLND_MSG_PUT_REQ:
case MXLND_MSG_PUT_ACK:
case MXLND_MSG_GET_REQ:
case MXLND_MSG_EAGER:
- //case MXLND_MSG_NAK:
break;
case MXLND_MSG_CONN_ACK:
}
case MXLND_MSG_CONN_REQ:
if (failed) {
- CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
- "failed with %s (%d) to %s\n",
+ CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
- mx_strstatus(tx->mxc_status.code),
- tx->mxc_status.code,
+ mx_strstatus(code), code, tx->mxc_errno,
libcfs_nid2str(tx->mxc_nid));
if (!peer->mxp_incompatible) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
+ if (code == MX_STATUS_BAD_SESSION)
+ mxlnd_set_conn_status(conn,
+ MXLND_CONN_INIT);
+ else
+ mxlnd_set_conn_status(conn,
+ MXLND_CONN_FAIL);
+ spin_unlock(&conn->mxk_lock);
}
}
break;
default:
- CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
+ CNETERR("Unknown msg type of %d\n", type);
LBUG();
}
if (credit) {
- spin_lock(&conn->mxk_lock);
- if (conn->mxk_incarnation == tx->mxc_incarnation) {
- conn->mxk_ntx_posted--;
- }
- spin_unlock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
+ if (conn->mxk_incarnation == tx->mxc_incarnation) {
+ conn->mxk_ntx_posted--;
+ }
+ spin_unlock(&conn->mxk_lock);
}
- CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
mxlnd_put_idle_tx(tx);
mxlnd_conn_decref(conn);
mxlnd_check_sends(peer);
+ CDEBUG(D_NET, "leaving\n");
return;
}
+/* Handle completion of MSG or DATA rx.
+ * CONN_REQ and CONN_ACK are handled elsewhere. */
void
-mxlnd_handle_rx_completion(struct kmx_ctx *rx)
+mxlnd_handle_rx_completion(kmx_ctx_t *rx)
{
- int ret = 0;
- int repost = 1;
- int credit = 1;
- u32 nob = rx->mxc_status.xfer_length;
- u64 bits = rx->mxc_status.match_info;
- struct kmx_msg *msg = rx->mxc_msg;
- struct kmx_peer *peer = rx->mxc_peer;
- struct kmx_conn *conn = rx->mxc_conn;
- u8 type = rx->mxc_msg_type;
- u64 seq = 0LL;
- lnet_msg_t *lntmsg[2];
- int result = 0;
- u64 nic_id = 0LL;
- u32 ep_id = 0;
- int peer_ref = 0;
- int conn_ref = 0;
- int incompatible = 0;
-
- /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
- * failed GET reply, CONN_REQ, or a CONN_ACK */
+ int ret = 0;
+ int repost = 1;
+ int credit = 1;
+ u32 nob = rx->mxc_status.xfer_length;
+ u64 bits = rx->mxc_status.match_info;
+ kmx_msg_t *msg = rx->mxc_msg;
+ kmx_peer_t *peer = rx->mxc_peer;
+ kmx_conn_t *conn = rx->mxc_conn;
+ u8 type = rx->mxc_msg_type;
+ u64 seq = bits;
+ lnet_msg_t *lntmsg[2];
+ int result = 0;
+ int peer_ref = 0;
+ int conn_ref = 0;
+
+ /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
+ * failed GET reply */
/* NOTE peer may still be NULL if it is a new peer and
* conn may be NULL if this is a re-connect */
peer_ref = 1;
} else if (peer == NULL && conn != NULL) {
/* fatal error */
- CDEBUG(D_NETERROR, "rx has conn but no peer\n");
+ CERROR("rx 0x%llx from %s has conn but no peer\n",
+ bits, libcfs_nid2str(rx->mxc_nid));
LBUG();
} /* else peer and conn == NULL */
-#if 0
- if (peer == NULL || conn == NULL) {
- /* if the peer was disconnected, the peer may exist but
- * not have any valid conns */
- decref = 0; /* no peer means no ref was taken for this rx */
- }
-#endif
-
if (conn == NULL && peer != NULL) {
- spin_lock(&peer->mxp_lock);
+ write_lock(&kmxlnd_data.kmx_global_lock);
conn = peer->mxp_conn;
if (conn) {
mxlnd_conn_addref(conn); /* conn takes ref... */
conn_ref = 1;
peer_ref = 0;
}
- spin_unlock(&peer->mxp_lock);
+ write_unlock(&kmxlnd_data.kmx_global_lock);
rx->mxc_conn = conn;
}
lntmsg[0] = NULL;
lntmsg[1] = NULL;
- if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
- CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
- libcfs_nid2str(rx->mxc_nid),
- mx_strstatus(rx->mxc_status.code),
- (int) rx->mxc_status.code);
+ if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
+ rx->mxc_status.code != MX_STATUS_TRUNCATED) {
+ CNETERR("rx from %s failed with %s (%d)\n",
+ libcfs_nid2str(rx->mxc_nid),
+ mx_strstatus(rx->mxc_status.code),
+ rx->mxc_status.code);
credit = 0;
goto cleanup;
}
if (nob == 0) {
/* this may be a failed GET reply */
if (type == MXLND_MSG_GET_DATA) {
- bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
- ret = (u32) (bits>>52);
+ /* get the error (52-59) bits from the match bits */
+ ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
lntmsg[0] = rx->mxc_lntmsg[0];
result = -ret;
goto cleanup;
} else {
/* we had a rx complete with 0 bytes (no hdr, nothing) */
- CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
- libcfs_nid2str(rx->mxc_nid));
+ CNETERR("rx from %s returned with 0 bytes\n",
+ libcfs_nid2str(rx->mxc_nid));
goto cleanup;
}
}
/* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
if (type == MXLND_MSG_PUT_DATA) {
- result = rx->mxc_status.code;
+ /* result = 0; */
lntmsg[0] = rx->mxc_lntmsg[0];
goto cleanup;
} else if (type == MXLND_MSG_GET_DATA) {
- result = rx->mxc_status.code;
+ /* result = 0; */
lntmsg[0] = rx->mxc_lntmsg[0];
lntmsg[1] = rx->mxc_lntmsg[1];
goto cleanup;
ret = mxlnd_unpack_msg(msg, nob);
if (ret != 0) {
- CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
- ret, libcfs_nid2str(rx->mxc_nid));
+ CNETERR("Error %d unpacking rx from %s\n",
+ ret, libcfs_nid2str(rx->mxc_nid));
goto cleanup;
}
rx->mxc_nob = nob;
type = msg->mxm_type;
- seq = msg->mxm_seq;
- if (type != MXLND_MSG_CONN_REQ &&
- (rx->mxc_nid != msg->mxm_srcnid ||
- kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
- CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
+ if (rx->mxc_nid != msg->mxm_srcnid ||
+ kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
+ CNETERR("rx with mismatched NID (type %s) (my nid is "
"0x%llx and rx msg dst is 0x%llx)\n",
mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
msg->mxm_dstnid);
goto cleanup;
}
- if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
- if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
- msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
- if (conn != NULL) {
- CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
- "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
- "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
- libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
- msg->mxm_srcstamp, conn->mxk_incarnation,
- msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
- } else {
- CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
- "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
- libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
- msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
- }
- credit = 0;
- goto cleanup;
- }
+       /* NOTE: conn may still be NULL here (e.g. first CONN_REQ from a
+        * new peer); only the dststamp half of the test can fire then, so
+        * guard the conn dereference in the error message. */
+       if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
+           msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
+               CNETERR("Stale rx from %s with type %s "
+                       "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
+                       "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
+                       libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
+                       msg->mxm_srcstamp,
+                       conn != NULL ? conn->mxk_incarnation : 0LL,
+                       msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
+               credit = 0;
+               goto cleanup;
+       }
CDEBUG(D_NET, "Received %s with %d credits\n",
mxlnd_msgtype_to_str(type), msg->mxm_credits);
- if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
- msg->mxm_type != MXLND_MSG_CONN_ACK) {
- LASSERT(peer != NULL);
- LASSERT(conn != NULL);
- if (msg->mxm_credits != 0) {
- spin_lock(&conn->mxk_lock);
- if (msg->mxm_srcstamp == conn->mxk_incarnation) {
- if ((conn->mxk_credits + msg->mxm_credits) >
- *kmxlnd_tunables.kmx_credits) {
- CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
- conn->mxk_credits, msg->mxm_credits);
- }
- conn->mxk_credits += msg->mxm_credits;
- LASSERT(conn->mxk_credits >= 0);
- LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
+ LASSERT(peer != NULL && conn != NULL);
+ if (msg->mxm_credits != 0) {
+ spin_lock(&conn->mxk_lock);
+ if (msg->mxm_srcstamp == conn->mxk_incarnation) {
+ if ((conn->mxk_credits + msg->mxm_credits) >
+ *kmxlnd_tunables.kmx_peercredits) {
+ CNETERR("mxk_credits %d mxm_credits %d\n",
+ conn->mxk_credits, msg->mxm_credits);
}
- spin_unlock(&conn->mxk_lock);
+ conn->mxk_credits += msg->mxm_credits;
+ LASSERT(conn->mxk_credits >= 0);
+ LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
}
+ spin_unlock(&conn->mxk_lock);
}
CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
case MXLND_MSG_PUT_ACK: {
u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
if (cookie > MXLND_MAX_COOKIE) {
- CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
- libcfs_nid2str(rx->mxc_nid));
- result = -((cookie >> 52) & 0xff);
+ CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
+ libcfs_nid2str(rx->mxc_nid));
+ result = -((u32) MXLND_ERROR_VAL(cookie));
lntmsg[0] = rx->mxc_lntmsg[0];
} else {
mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
repost = ret < 0;
break;
- case MXLND_MSG_CONN_REQ:
- if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
- CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
- libcfs_nid2str(msg->mxm_srcnid),
- libcfs_nid2str(msg->mxm_dstnid));
- goto cleanup;
- }
- if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
- CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
- "%d (%d wanted)\n",
- libcfs_nid2str(msg->mxm_srcnid),
- msg->mxm_u.conn_req.mxcrm_queue_depth,
- *kmxlnd_tunables.kmx_credits);
- incompatible = 1;
- }
- if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
- CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
- "%d (%d wanted)\n",
- libcfs_nid2str(msg->mxm_srcnid),
- msg->mxm_u.conn_req.mxcrm_eager_size,
- (int) MXLND_EAGER_SIZE);
- incompatible = 1;
- }
- if (peer == NULL) {
- peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
- if (peer == NULL) {
- int hash = 0;
- struct kmx_peer *existing_peer = NULL;
- hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
-
- mx_decompose_endpoint_addr(rx->mxc_status.source,
- &nic_id, &ep_id);
- rx->mxc_nid = msg->mxm_srcnid;
-
- /* adds conn ref for peer and one for this function */
- ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
- if (ret != 0) {
- goto cleanup;
- }
- LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
- write_lock(&kmxlnd_data.kmx_peers_lock);
- existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
- if (existing_peer) {
- mxlnd_conn_decref(peer->mxp_conn);
- mxlnd_peer_decref(peer);
- peer = existing_peer;
- mxlnd_conn_addref(peer->mxp_conn);
- } else {
- list_add_tail(&peer->mxp_peers,
- &kmxlnd_data.kmx_peers[hash]);
- write_unlock(&kmxlnd_data.kmx_peers_lock);
- atomic_inc(&kmxlnd_data.kmx_npeers);
- }
- } else {
- ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
- mxlnd_peer_decref(peer); /* drop ref taken above */
- if (ret != 0) {
- CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
- goto cleanup;
- }
- }
- conn_ref = 1; /* peer/conn_alloc() added ref for this function */
- conn = peer->mxp_conn;
- } else {
- struct kmx_conn *old_conn = conn;
-
- /* do not call mx_disconnect() */
- mxlnd_conn_disconnect(old_conn, 0, 0);
-
- /* the ref for this rx was taken on the old_conn */
- mxlnd_conn_decref(old_conn);
-
- /* This allocs a conn, points peer->mxp_conn to this one.
- * The old conn is still on the peer->mxp_conns list.
- * As the pending requests complete, they will call
- * conn_decref() which will eventually free it. */
- ret = mxlnd_conn_alloc(&conn, peer);
- if (ret != 0) {
- CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
- goto cleanup;
- }
- /* conn_alloc() adds one ref for the peer and one for this function */
- conn_ref = 1;
- }
- spin_lock(&peer->mxp_lock);
- peer->mxp_incarnation = msg->mxm_srcstamp;
- peer->mxp_incompatible = incompatible;
- spin_unlock(&peer->mxp_lock);
- spin_lock(&conn->mxk_lock);
- conn->mxk_incarnation = msg->mxm_srcstamp;
- conn->mxk_status = MXLND_CONN_WAIT;
- spin_unlock(&conn->mxk_lock);
-
- /* handle_conn_ack() will create the CONN_ACK msg */
- mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
-
- break;
-
- case MXLND_MSG_CONN_ACK:
- if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
- CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
- "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
- libcfs_nid2str(msg->mxm_dstnid));
- ret = -1;
- goto failed;
- }
- if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
- CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
- "incompatible queue depth %d (%d wanted)\n",
- libcfs_nid2str(msg->mxm_srcnid),
- msg->mxm_u.conn_req.mxcrm_queue_depth,
- *kmxlnd_tunables.kmx_credits);
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
- incompatible = 1;
- ret = -1;
- }
- if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
- CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
- "incompatible EAGER size %d (%d wanted)\n",
- libcfs_nid2str(msg->mxm_srcnid),
- msg->mxm_u.conn_req.mxcrm_eager_size,
- (int) MXLND_EAGER_SIZE);
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
- incompatible = 1;
- ret = -1;
- }
- spin_lock(&peer->mxp_lock);
- peer->mxp_incarnation = msg->mxm_srcstamp;
- peer->mxp_incompatible = incompatible;
- spin_unlock(&peer->mxp_lock);
- spin_lock(&conn->mxk_lock);
- conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
- conn->mxk_outstanding = 0;
- conn->mxk_incarnation = msg->mxm_srcstamp;
- conn->mxk_timeout = 0;
- if (!incompatible) {
- conn->mxk_status = MXLND_CONN_READY;
- }
- spin_unlock(&conn->mxk_lock);
- if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
- break;
-
default:
- CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
- libcfs_nid2str(rx->mxc_nid));
+ CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
+ libcfs_nid2str(rx->mxc_nid));
ret = -EPROTO;
break;
}
-failed:
if (ret < 0) {
- MXLND_PRINT("setting PEER_CONN_FAILED\n");
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
- }
+ CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
+ spin_lock(&conn->mxk_lock);
+ mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+ spin_unlock(&conn->mxk_lock);
+ }
cleanup:
- if (conn != NULL) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_last_rx = cfs_time_current(); /* jiffies */
- spin_unlock(&conn->mxk_lock);
+ if (conn != NULL) {
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_last_rx = cfs_time_current(); /* jiffies */
+ spin_unlock(&conn->mxk_lock);
}
if (repost) {
/* lnet_parse() failed, etc., repost now */
mxlnd_put_idle_rx(rx);
if (conn != NULL && credit == 1) {
- if (type == MXLND_MSG_PUT_DATA) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_outstanding++;
- spin_unlock(&conn->mxk_lock);
- } else if (type != MXLND_MSG_GET_DATA &&
- (type == MXLND_MSG_EAGER ||
- type == MXLND_MSG_PUT_REQ ||
- type == MXLND_MSG_NOOP)) {
- spin_lock(&conn->mxk_lock);
- conn->mxk_outstanding++;
- spin_unlock(&conn->mxk_lock);
+ if (type == MXLND_MSG_PUT_DATA ||
+ type == MXLND_MSG_EAGER ||
+ type == MXLND_MSG_PUT_REQ ||
+ type == MXLND_MSG_NOOP) {
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_outstanding++;
+ spin_unlock(&conn->mxk_lock);
}
}
if (conn_ref) mxlnd_conn_decref(conn);
return;
}
-
-
void
-mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
+mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
{
- struct kmx_ctx *tx = NULL;
- struct kmx_msg *txmsg = NULL;
- struct kmx_conn *conn = peer->mxp_conn;
-
- /* a conn ref was taken when calling mx_iconnect(),
+ kmx_ctx_t *tx = NULL;
+ kmx_msg_t *txmsg = NULL;
+ kmx_conn_t *conn = peer->mxp_conn;
+ u64 nic_id = 0ULL;
+ u32 ep_id = 0;
+ u32 sid = 0;
+ u8 type = (msg_type == MXLND_MSG_ICON_REQ ?
+ MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
+
+ /* a conn ref was taken when calling mx_iconnect(),
* hold it until CONN_REQ or CONN_ACK completes */
CDEBUG(D_NET, "entering\n");
if (status.code != MX_STATUS_SUCCESS) {
- CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
- mx_strstatus(status.code), status.code,
- libcfs_nid2str(peer->mxp_nid));
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
-
- if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
- struct kmx_conn *new_conn = NULL;
- CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
- mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
- mxlnd_conn_decref(new_conn); /* which we no longer need */
- spin_lock(&peer->mxp_lock);
- peer->mxp_reconnect_time = 0;
- spin_unlock(&peer->mxp_lock);
- }
-
- mxlnd_conn_decref(conn);
- return;
- }
-
- spin_lock(&conn->mxk_lock);
- conn->mxk_epa = status.source;
- spin_unlock(&conn->mxk_lock);
- /* NOTE we are holding a ref on the conn which has a ref on the peer,
- * we should not need to lock the peer */
- mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
-
- /* mx_iconnect() succeeded, reset delay to 0 */
- spin_lock(&peer->mxp_lock);
- peer->mxp_reconnect_time = 0;
- spin_unlock(&peer->mxp_lock);
+ int send_bye = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
- /* marshal CONN_REQ msg */
- /* we are still using the conn ref from iconnect() - do not take another */
- tx = mxlnd_get_idle_tx();
- if (tx == NULL) {
- CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
- libcfs_nid2str(peer->mxp_nid));
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
- mxlnd_conn_decref(conn);
- return;
- }
-
- tx->mxc_peer = peer;
- tx->mxc_conn = conn;
- mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
- txmsg = tx->mxc_msg;
- txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
- txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
- tx->mxc_match = mxlnd_create_match(tx, 0);
-
- CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
- mxlnd_queue_tx(tx);
- return;
-}
-
-void
-mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
-{
- struct kmx_ctx *tx = NULL;
- struct kmx_msg *txmsg = NULL;
- struct kmx_conn *conn = peer->mxp_conn;
-
- /* a conn ref was taken when calling mx_iconnect(),
- * hold it until CONN_REQ or CONN_ACK completes */
-
- CDEBUG(D_NET, "entering\n");
- if (status.code != MX_STATUS_SUCCESS) {
- CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
- "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
+ CNETERR("mx_iconnect() failed for %s with %s (%d) "
+ "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
+ mxlnd_msgtype_to_str(msg_type),
mx_strstatus(status.code), status.code,
libcfs_nid2str(peer->mxp_nid),
peer->mxp_nid,
peer->mxp_nic_id,
- peer->mxp_host->mxh_ep_id);
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
-
- if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
- struct kmx_conn *new_conn = NULL;
- CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
- mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
- this function... */
- mxlnd_conn_decref(new_conn); /* which we no longer need */
- spin_lock(&peer->mxp_lock);
- peer->mxp_reconnect_time = 0;
- spin_unlock(&peer->mxp_lock);
+ peer->mxp_ep_id);
+ spin_lock(&conn->mxk_lock);
+ mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+ spin_unlock(&conn->mxk_lock);
+
+ if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
+ MXLND_CONNECT_TIMEOUT)) {
+ CNETERR("timeout, calling conn_disconnect()\n");
+ mxlnd_conn_disconnect(conn, 0, send_bye);
}
mxlnd_conn_decref(conn);
return;
}
- spin_lock(&conn->mxk_lock);
- conn->mxk_epa = status.source;
- if (likely(!peer->mxp_incompatible)) {
- conn->mxk_status = MXLND_CONN_READY;
- }
- spin_unlock(&conn->mxk_lock);
- /* NOTE we are holding a ref on the conn which has a ref on the peer,
- * we should not have to lock the peer */
- mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
-
- /* mx_iconnect() succeeded, reset delay to 0 */
- spin_lock(&peer->mxp_lock);
- peer->mxp_reconnect_time = 0;
- spin_unlock(&peer->mxp_lock);
-
- /* marshal CONN_ACK msg */
+ mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
+ write_lock(&kmxlnd_data.kmx_global_lock);
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_epa = status.source;
+ mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
+ if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
+ mxlnd_set_conn_status(conn, MXLND_CONN_READY);
+ }
+ spin_unlock(&conn->mxk_lock);
+ write_unlock(&kmxlnd_data.kmx_global_lock);
+
+ /* mx_iconnect() succeeded, reset delay to 0 */
+ write_lock(&kmxlnd_data.kmx_global_lock);
+ peer->mxp_reconnect_time = 0;
+ peer->mxp_conn->mxk_sid = sid;
+ write_unlock(&kmxlnd_data.kmx_global_lock);
+
+ /* marshal CONN_REQ or CONN_ACK msg */
+ /* we are still using the conn ref from iconnect() - do not take another */
tx = mxlnd_get_idle_tx();
if (tx == NULL) {
- CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
- libcfs_nid2str(peer->mxp_nid));
- spin_lock(&conn->mxk_lock);
- conn->mxk_status = MXLND_CONN_FAIL;
- spin_unlock(&conn->mxk_lock);
+ CNETERR("Can't obtain %s tx for %s\n",
+ mxlnd_msgtype_to_str(type),
+ libcfs_nid2str(peer->mxp_nid));
+ spin_lock(&conn->mxk_lock);
+ mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+ spin_unlock(&conn->mxk_lock);
mxlnd_conn_decref(conn);
return;
}
tx->mxc_peer = peer;
tx->mxc_conn = conn;
- CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
- mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
+ tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
+ CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
+ mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
txmsg = tx->mxc_msg;
- txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
- txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
+ txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
+ txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
tx->mxc_match = mxlnd_create_match(tx, 0);
mxlnd_queue_tx(tx);
mxlnd_request_waitd(void *arg)
{
long id = (long) arg;
- char name[24];
__u32 result = 0;
mx_return_t mxret = MX_SUCCESS;
mx_status_t status;
- struct kmx_ctx *ctx = NULL;
+ kmx_ctx_t *ctx = NULL;
enum kmx_req_state req_type = MXLND_REQ_TX;
- struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
+ kmx_peer_t *peer = NULL;
+ kmx_conn_t *conn = NULL;
#if MXLND_POLLING
int count = 0;
#endif
- memset(name, 0, sizeof(name));
- snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
- cfs_daemonize(name);
- //cfs_block_allsigs();
-
memset(&status, 0, sizeof(status));
CDEBUG(D_NET, "%s starting\n", name);
- while (!kmxlnd_data.kmx_shutdown) {
+ while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
+ u8 msg_type = 0;
+
mxret = MX_SUCCESS;
result = 0;
#if MXLND_POLLING
if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
- mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
+ mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
&status, &result);
} else {
count = 0;
mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
- 0LL, 0LL, &status, &result);
+ 0ULL, 0ULL, &status, &result);
}
#else
mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
- 0LL, 0LL, &status, &result);
+ 0ULL, 0ULL, &status, &result);
#endif
- if (unlikely(kmxlnd_data.kmx_shutdown))
+ if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
break;
if (result != 1) {
continue;
}
+ CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
+ "match_info 0x%llx and length %d\n",
+ mx_strstatus(status.code), status.code,
+ (u64) status.match_info, status.msg_length);
+
if (status.code != MX_STATUS_SUCCESS) {
- CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
- "match_info 0x%llx and length %d\n",
- mx_strstatus(status.code), status.code,
- (u64) status.match_info, status.msg_length);
+ CNETERR("wait_any() failed with %s (%d) with "
+ "match_info 0x%llx and length %d\n",
+ mx_strstatus(status.code), status.code,
+ (u64) status.match_info, status.msg_length);
}
+ msg_type = MXLND_MSG_TYPE(status.match_info);
+
/* This may be a mx_iconnect() request completing,
* check the bit mask for CONN_REQ and CONN_ACK */
- if (status.match_info == MXLND_MASK_ICON_REQ ||
- status.match_info == MXLND_MASK_ICON_ACK) {
- peer = (struct kmx_peer*) status.context;
- if (status.match_info == MXLND_MASK_ICON_REQ) {
- mxlnd_handle_conn_req(peer, status);
- } else {
- mxlnd_handle_conn_ack(peer, status);
- }
+ if (msg_type == MXLND_MSG_ICON_REQ ||
+ msg_type == MXLND_MSG_ICON_ACK) {
+ peer = (kmx_peer_t*) status.context;
+ mxlnd_handle_connect_msg(peer, msg_type, status);
continue;
}
/* NOTE: if this is a RX from the unexpected callback, it may
* have very little info. If we dropped it in unexpected_recv(),
* it will not have a context. If so, ignore it. */
- ctx = (struct kmx_ctx *) status.context;
+ ctx = (kmx_ctx_t *) status.context;
if (ctx != NULL) {
req_type = ctx->mxc_type;
mxlnd_deq_pending_ctx(ctx);
/* copy status to ctx->mxc_status */
- memcpy(&ctx->mxc_status, &status, sizeof(status));
+ ctx->mxc_status = status;
switch (req_type) {
case MXLND_REQ_TX:
mxlnd_handle_rx_completion(ctx);
break;
default:
- CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
+ CNETERR("Unknown ctx type %d\n", req_type);
LBUG();
break;
}
- /* FIXME may need to reconsider this */
/* conn is always set except for the first CONN_REQ rx
* from a new peer */
- if (!(status.code == MX_STATUS_SUCCESS ||
- status.code == MX_STATUS_TRUNCATED) &&
- conn != NULL) {
+ if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
mxlnd_conn_disconnect(conn, 1, 1);
}
}
unsigned long
mxlnd_check_timeouts(unsigned long now)
{
- int i = 0;
- int disconnect = 0;
- unsigned long next = 0;
- struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
-
- read_lock(&kmxlnd_data.kmx_peers_lock);
- for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
-
- if (unlikely(kmxlnd_data.kmx_shutdown)) {
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+ int i = 0;
+ int disconnect = 0;
+ unsigned long next = 0; /* jiffies */
+ kmx_peer_t *peer = NULL;
+ kmx_conn_t *conn = NULL;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
+
+ read_lock(g_lock);
+ for (i = 0; i < MXLND_HASH_SIZE; i++) {
+ cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
+ mxp_list) {
+
+ if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
+ read_unlock(g_lock);
return next;
}
- spin_lock(&peer->mxp_lock);
conn = peer->mxp_conn;
if (conn) {
mxlnd_conn_addref(conn);
- spin_unlock(&peer->mxp_lock);
} else {
- spin_unlock(&peer->mxp_lock);
continue;
}
- spin_lock(&conn->mxk_lock);
+ spin_lock(&conn->mxk_lock);
- /* if nothing pending (timeout == 0) or
- * if conn is already disconnected,
- * skip this conn */
- if (conn->mxk_timeout == 0 ||
- conn->mxk_status == MXLND_CONN_DISCONNECT) {
- spin_unlock(&conn->mxk_lock);
+ /* if nothing pending (timeout == 0) or
+ * if conn is already disconnected,
+ * skip this conn */
+ if (conn->mxk_timeout == 0 ||
+ conn->mxk_status == MXLND_CONN_DISCONNECT) {
+ spin_unlock(&conn->mxk_lock);
mxlnd_conn_decref(conn);
continue;
}
* if it is in the future, we will sleep until then.
* if it is in the past, then we will sleep one
* second and repeat the process. */
- if ((next == 0) || (conn->mxk_timeout < next)) {
+ if ((next == 0) ||
+ (cfs_time_before(conn->mxk_timeout, next))) {
next = conn->mxk_timeout;
}
disconnect = 0;
- if (time_after_eq(now, conn->mxk_timeout)) {
- disconnect = 1;
+ if (cfs_time_aftereq(now, conn->mxk_timeout))
+ disconnect = 1;
+ spin_unlock(&conn->mxk_lock);
+
+ if (disconnect)
+ mxlnd_conn_disconnect(conn, 1, 1);
+ mxlnd_conn_decref(conn);
+ }
+ }
+ read_unlock(g_lock);
+ if (next == 0)
+ next = now + MXLND_COMM_TIMEOUT;
+
+ return next;
+}
+
+void
+mxlnd_passive_connect(kmx_connparams_t *cp)
+{
+ int ret = 0;
+ int incompatible = 0;
+ u64 nic_id = 0ULL;
+ u32 ep_id = 0;
+ u32 sid = 0;
+ int conn_ref = 0;
+ kmx_msg_t *msg = &cp->mxr_msg;
+ kmx_peer_t *peer = cp->mxr_peer;
+ kmx_conn_t *conn = NULL;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
+
+ mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
+
+ ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
+ if (ret != 0) {
+ if (peer) {
+ CNETERR("Error %d unpacking CONN_REQ from %s\n",
+ ret, libcfs_nid2str(peer->mxp_nid));
+ } else {
+ CNETERR("Error %d unpacking CONN_REQ from "
+ "unknown host with nic_id 0x%llx\n", ret, nic_id);
+ }
+ goto cleanup;
+ }
+ if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
+ CNETERR("Can't accept %s: bad dst nid %s\n",
+ libcfs_nid2str(msg->mxm_srcnid),
+ libcfs_nid2str(msg->mxm_dstnid));
+ goto cleanup;
+ }
+ if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
+ CNETERR("Can't accept %s: incompatible queue depth "
+ "%d (%d wanted)\n",
+ libcfs_nid2str(msg->mxm_srcnid),
+ msg->mxm_u.conn_req.mxcrm_queue_depth,
+ *kmxlnd_tunables.kmx_peercredits);
+ incompatible = 1;
+ }
+ if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
+ CNETERR("Can't accept %s: incompatible EAGER size "
+ "%d (%d wanted)\n",
+ libcfs_nid2str(msg->mxm_srcnid),
+ msg->mxm_u.conn_req.mxcrm_eager_size,
+ (int) MXLND_MSG_SIZE);
+ incompatible = 1;
+ }
+
+ if (peer == NULL) {
+ peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
+ if (peer == NULL) {
+ int hash = 0;
+ u32 board = 0;
+ kmx_peer_t *existing_peer = NULL;
+
+ hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
+
+ mx_nic_id_to_board_number(nic_id, &board);
+
+ /* adds conn ref for peer and one for this function */
+ ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
+ board, ep_id, 0ULL);
+ if (ret != 0) {
+ goto cleanup;
+ }
+ peer->mxp_conn->mxk_sid = sid;
+ LASSERT(peer->mxp_ep_id == ep_id);
+ write_lock(g_lock);
+ existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
+ if (existing_peer) {
+ mxlnd_conn_decref(peer->mxp_conn);
+ mxlnd_peer_decref(peer);
+ peer = existing_peer;
+ mxlnd_conn_addref(peer->mxp_conn);
+ conn = peer->mxp_conn;
+ } else {
+ cfs_list_add_tail(&peer->mxp_list,
+ &kmxlnd_data.kmx_peers[hash]);
+ cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
}
- spin_unlock(&conn->mxk_lock);
+ write_unlock(g_lock);
+ } else {
+ ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
+ write_lock(g_lock);
+ mxlnd_peer_decref(peer); /* drop ref taken above */
+ write_unlock(g_lock);
+ if (ret != 0) {
+ CNETERR("Cannot allocate mxp_conn\n");
+ goto cleanup;
+ }
+ }
+ conn_ref = 1; /* peer/conn_alloc() added ref for this function */
+ conn = peer->mxp_conn;
+ } else { /* unexpected handler found peer */
+ kmx_conn_t *old_conn = peer->mxp_conn;
- if (disconnect) {
- mxlnd_conn_disconnect(conn, 1, 1);
+ if (sid != peer->mxp_conn->mxk_sid) {
+ /* do not call mx_disconnect() or send a BYE */
+ mxlnd_conn_disconnect(old_conn, 0, 0);
+
+ /* This allocs a conn, points peer->mxp_conn to this one.
+ * The old conn is still on the peer->mxp_conns list.
+ * As the pending requests complete, they will call
+ * conn_decref() which will eventually free it. */
+ ret = mxlnd_conn_alloc(&conn, peer);
+ if (ret != 0) {
+ CNETERR("Cannot allocate peer->mxp_conn\n");
+ goto cleanup;
}
- mxlnd_conn_decref(conn);
+ /* conn_alloc() adds one ref for the peer and one
+ * for this function */
+ conn_ref = 1;
+
+ peer->mxp_conn->mxk_sid = sid;
+ } else {
+ /* same sid */
+ conn = peer->mxp_conn;
}
}
- read_unlock(&kmxlnd_data.kmx_peers_lock);
- if (next == 0) next = now + MXLND_COMM_TIMEOUT;
+ write_lock(g_lock);
+ peer->mxp_incompatible = incompatible;
+ write_unlock(g_lock);
+ spin_lock(&conn->mxk_lock);
+ conn->mxk_incarnation = msg->mxm_srcstamp;
+ mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
+ spin_unlock(&conn->mxk_lock);
+
+ /* handle_conn_ack() will create the CONN_ACK msg */
+ mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
- return next;
+cleanup:
+ if (conn_ref) mxlnd_conn_decref(conn);
+
+ mxlnd_connparams_free(cp);
+ return;
+}
+
+/* Handle a CONN_ACK queued for the connd thread.
+ *
+ * Validates the peer's reply to our CONN_REQ (unpack, dst nid, queue
+ * depth, EAGER size). On success the conn's credits are reset and it is
+ * marked MXLND_CONN_READY and pending sends are kicked; on any failure
+ * it is marked MXLND_CONN_FAIL and, if the parameters are incompatible,
+ * disconnected. Always consumes (frees) @cp.
+ *
+ * NOTE(review): cp->mxr_conn is assumed non-NULL on the success paths;
+ * on the unpack-failure path it may not be set, so the failure handling
+ * below guards the conn dereference. */
+void
+mxlnd_check_conn_ack(kmx_connparams_t *cp)
+{
+       int             ret             = 0;
+       int             incompatible    = 0;
+       u64             nic_id          = 0ULL;
+       u32             ep_id           = 0;
+       u32             sid             = 0;
+       kmx_msg_t       *msg            = &cp->mxr_msg;
+       kmx_peer_t      *peer           = cp->mxr_peer;
+       kmx_conn_t      *conn           = cp->mxr_conn;
+
+       mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
+
+       ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
+       if (ret != 0) {
+               if (peer) {
+                       CNETERR("Error %d unpacking CONN_ACK from %s\n",
+                               ret, libcfs_nid2str(peer->mxp_nid));
+               } else {
+                       CNETERR("Error %d unpacking CONN_ACK from "
+                               "unknown host with nic_id 0x%llx\n", ret, nic_id);
+               }
+               ret = -1;
+               incompatible = 1;
+               goto failed;
+       }
+       if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
+               CNETERR("Can't accept CONN_ACK from %s: "
+                       "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
+                       libcfs_nid2str(msg->mxm_dstnid));
+               ret = -1;
+               goto failed;
+       }
+       if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
+               CNETERR("Can't accept CONN_ACK from %s: "
+                       "incompatible queue depth %d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_queue_depth,
+                       *kmxlnd_tunables.kmx_peercredits);
+               incompatible = 1;
+               ret = -1;
+               goto failed;
+       }
+       if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
+               CNETERR("Can't accept CONN_ACK from %s: "
+                       "incompatible EAGER size %d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_eager_size,
+                       (int) MXLND_MSG_SIZE);
+               incompatible = 1;
+               ret = -1;
+               goto failed;
+       }
+       write_lock(&kmxlnd_data.kmx_global_lock);
+       peer->mxp_incompatible = incompatible;
+       write_unlock(&kmxlnd_data.kmx_global_lock);
+       spin_lock(&conn->mxk_lock);
+       conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
+       conn->mxk_outstanding = 0;
+       conn->mxk_incarnation = msg->mxm_srcstamp;
+       conn->mxk_timeout = 0;
+       if (!incompatible) {
+               CDEBUG(D_NET, "setting peer %s CONN_READY\n",
+                      libcfs_nid2str(msg->mxm_srcnid));
+               mxlnd_set_conn_status(conn, MXLND_CONN_READY);
+       }
+       spin_unlock(&conn->mxk_lock);
+
+       if (!incompatible)
+               mxlnd_check_sends(peer);
+
+failed:
+       if (ret < 0 && conn != NULL) {
+               spin_lock(&conn->mxk_lock);
+               mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+               spin_unlock(&conn->mxk_lock);
+       }
+
+       if (incompatible && conn != NULL)
+               mxlnd_conn_disconnect(conn, 0, 0);
+
+       mxlnd_connparams_free(cp);
+       return;
+}
+
+/* Fail every message on the global orphan list.
+ *
+ * Orphans are ctxs on kmxlnd_data.kmx_orphan_msgs — presumably queued
+ * when their conn was torn down before they completed (TODO confirm
+ * against the producers of that list). Each is completed with
+ * -ECONNABORTED: a TX is returned to the idle pool and the conn ref it
+ * held is dropped; an RX is marked MXLND_CTX_CANCELED and run through
+ * the normal rx completion path.
+ *
+ * Returns the number of ctxs aborted. */
+int
+mxlnd_abort_msgs(void)
+{
+       int                     count           = 0;
+       cfs_list_t              *orphans        = &kmxlnd_data.kmx_orphan_msgs;
+       spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
+
+       /* abort orphans */
+       spin_lock(g_conn_lock);
+       while (!cfs_list_empty(orphans)) {
+               kmx_ctx_t       *ctx    = NULL;
+               kmx_conn_t      *conn   = NULL;
+
+               ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
+               cfs_list_del_init(&ctx->mxc_list);
+               /* drop the list lock while completing this ctx — the
+                * completion paths below must not run under it */
+               spin_unlock(g_conn_lock);
+
+               ctx->mxc_errno = -ECONNABORTED;
+               conn = ctx->mxc_conn;
+               CDEBUG(D_NET, "aborting %s %s %s\n",
+                      mxlnd_msgtype_to_str(ctx->mxc_msg_type),
+                      ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
+                      libcfs_nid2str(ctx->mxc_nid));
+               if (ctx->mxc_type == MXLND_REQ_TX) {
+                       mxlnd_put_idle_tx(ctx); /* do not hold any locks */
+                       if (conn) mxlnd_conn_decref(conn); /* for this tx */
+               } else {
+                       ctx->mxc_state = MXLND_CTX_CANCELED;
+                       mxlnd_handle_rx_completion(ctx);
+               }
+
+               count++;
+               spin_lock(g_conn_lock);
+       }
+       spin_unlock(g_conn_lock);
+
+       return count;
+}
+
+/* Free all conns queued on the global zombie list.
+ *
+ * Zombie conns sit on kmxlnd_data.kmx_conn_zombies (linked via
+ * mxk_zombie) awaiting deferred destruction. Each is freed with
+ * mxlnd_conn_free_locked() under the global write lock.
+ *
+ * Returns the number of conns freed. */
+int
+mxlnd_free_conn_zombies(void)
+{
+       int             count           = 0;
+       cfs_list_t      *zombies        = &kmxlnd_data.kmx_conn_zombies;
+       spinlock_t      *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
+       rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
+
+       /* cleanup any zombies */
+       spin_lock(g_conn_lock);
+       while (!cfs_list_empty(zombies)) {
+               kmx_conn_t      *conn   = NULL;
+
+               conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
+               cfs_list_del_init(&conn->mxk_zombie);
+               /* drop the spinlock before taking the global rwlock —
+                * presumably to respect lock ordering (TODO confirm) */
+               spin_unlock(g_conn_lock);
+
+               write_lock(g_lock);
+               mxlnd_conn_free_locked(conn);
+               write_unlock(g_lock);
+
+               count++;
+               spin_lock(g_conn_lock);
+       }
+       spin_unlock(g_conn_lock);
+       CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
+       return count;
+}
+
+/**
+ * mxlnd_connd - handles incoming connection requests
+ * @arg - thread id (as a void *)
+ *
+ * Sleeps on kmx_conn_sem; on each wakeup it aborts orphaned messages,
+ * frees zombie conns, then dequeues one kmx_connparams_t from
+ * kmx_conn_reqs and dispatches it by message type. Runs until
+ * kmx_shutdown is set, then drains remaining zombies and exits.
+ */
+int
+mxlnd_connd(void *arg)
+{
+       long                    id      = (long) arg;
+
+       CDEBUG(D_NET, "connd starting\n");
+
+       while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
+               int ret = 0;
+               kmx_connparams_t *cp = NULL;
+               spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
+               cfs_list_t *conn_reqs = &kmxlnd_data.kmx_conn_reqs;
+
+               ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
+
+               if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
+                       break;
+
+               if (ret != 0)
+                       continue;
+
+               ret = mxlnd_abort_msgs();
+               ret += mxlnd_free_conn_zombies();
+
+               spin_lock(g_conn_lock);
+               if (cfs_list_empty(conn_reqs)) {
+                       if (ret == 0)
+                               CNETERR("connd woke up but did not find a "
+                                       "kmx_connparams_t or zombie conn\n");
+                       spin_unlock(g_conn_lock);
+                       continue;
+               }
+               cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
+                                   mxr_list);
+               cfs_list_del_init(&cp->mxr_list);
+               spin_unlock(g_conn_lock);
+
+               switch (MXLND_MSG_TYPE(cp->mxr_match)) {
+               case MXLND_MSG_CONN_REQ:
+                       /* We have a connection request. Handle it. */
+                       mxlnd_passive_connect(cp);
+                       break;
+               case MXLND_MSG_CONN_ACK:
+                       /* The peer is ready for messages */
+                       mxlnd_check_conn_ack(cp);
+                       break;
+               default:
+                       /* defensive: without this case an unexpected msg
+                        * type would leak cp (never freed) */
+                       CNETERR("unexpected msg type 0x%x on conn_reqs list\n",
+                               (u32) MXLND_MSG_TYPE(cp->mxr_match));
+                       mxlnd_connparams_free(cp);
+                       break;
+               }
+       }
+
+       mxlnd_free_conn_zombies();
+
+       CDEBUG(D_NET, "connd stopping\n");
+       mxlnd_thread_stop(id);
+       return 0;
}
/**
int
mxlnd_timeoutd(void *arg)
{
- int i = 0;
- long id = (long) arg;
- unsigned long now = 0;
- unsigned long next = 0;
- unsigned long delay = HZ;
- struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
-
- cfs_daemonize("mxlnd_timeoutd");
- //cfs_block_allsigs();
+ int i = 0;
+ long id = (long) arg;
+ unsigned long now = 0;
+ unsigned long next = 0;
+ unsigned long delay = HZ;
+ kmx_peer_t *peer = NULL;
+ kmx_peer_t *temp = NULL;
+ kmx_conn_t *conn = NULL;
+ rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
CDEBUG(D_NET, "timeoutd starting\n");
- while (!kmxlnd_data.kmx_shutdown) {
+ while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
now = jiffies;
/* if the next timeout has not arrived, go back to sleep */
- if (time_after(now, next)) {
+ if (cfs_time_after(now, next)) {
next = mxlnd_check_timeouts(now);
}
- read_lock(&kmxlnd_data.kmx_peers_lock);
+ /* try to progress peers' txs */
+ write_lock(g_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
- spin_lock(&peer->mxp_lock);
- conn = peer->mxp_conn;
- if (conn) mxlnd_conn_addref(conn); /* take ref... */
- spin_unlock(&peer->mxp_lock);
+ cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
- if (conn == NULL)
+ /* NOTE we are safe against the removal of peer, but
+ * not against the removal of temp */
+ cfs_list_for_each_entry_safe(peer, temp, peers,
+ mxp_list) {
+ if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
+ break;
+ mxlnd_peer_addref(peer); /* add ref... */
+ conn = peer->mxp_conn;
+ if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
+ mxlnd_conn_addref(conn); /* take ref... */
+ } else {
+ CDEBUG(D_NET, "ignoring %s\n",
+ libcfs_nid2str(peer->mxp_nid));
+ mxlnd_peer_decref(peer); /* ...to here */
continue;
-
- if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
- time_after(now, conn->mxk_last_tx + HZ)) {
- mxlnd_check_sends(peer);
}
- mxlnd_conn_decref(conn); /* until here */
- }
- }
- read_unlock(&kmxlnd_data.kmx_peers_lock);
+
+ if ((conn->mxk_status == MXLND_CONN_READY ||
+ conn->mxk_status == MXLND_CONN_FAIL) &&
+ cfs_time_after(now,
+ conn->mxk_last_tx +
+ HZ)) {
+ write_unlock(g_lock);
+ mxlnd_check_sends(peer);
+ write_lock(g_lock);
+ }
+ mxlnd_conn_decref(conn); /* until here */
+ mxlnd_peer_decref(peer); /* ...to here */
+ }
+ }
+ write_unlock(g_lock);
mxlnd_sleep(delay);
}