case LNET_MSG_HELLO:
return "LNET_MSG_HELLO";
default:
+ LBUG();
return "*unknown*";
}
}
* wrap... */
tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
- tx->mxc_cookie = 1;
+ kmxlnd_data.kmx_tx_next_cookie = 1;
}
kmxlnd_data.kmx_tx_used++;
spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
int
mxlnd_put_idle_tx(struct kmx_ctx *tx)
{
- int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
- int result = failed ? -EIO : 0;
+ //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
+ int result = 0;
lnet_msg_t *lntmsg[2];
if (tx == NULL) {
CDEBUG(D_NETERROR, "called with rx\n");
return -EINVAL;
}
+ if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
+ tx->mxc_status.code == MX_STATUS_TRUNCATED))
+ result = -EIO;
lntmsg[0] = tx->mxc_lntmsg[0];
lntmsg[1] = tx->mxc_lntmsg[1];
}
}
mxlnd_conn_decref(conn); /* drop the owning peer's reference */
-
+
return;
}
* Returns 0 on success and -ENOMEM on failure
*/
int
-mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
+mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
{
struct kmx_conn *conn = NULL;
memset(conn, 0, sizeof(*conn));
/* conn->mxk_incarnation = 0 - will be set by peer */
- atomic_set(&conn->mxk_refcount, 1); /* ref for owning peer */
+ atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
+ and one for the caller */
conn->mxk_peer = peer;
/* mxk_epa - to be set after mx_iconnect() */
INIT_LIST_HEAD(&conn->mxk_list);
mxlnd_peer_addref(peer); /* add a ref for this conn */
/* add to front of peer's conns list */
- spin_lock(&peer->mxp_lock);
list_add(&conn->mxk_list, &peer->mxp_conns);
peer->mxp_conn = conn;
- spin_unlock(&peer->mxp_lock);
return 0;
}
+/**
+ * mxlnd_conn_alloc - locking front end for mxlnd_conn_alloc_locked()
+ * @connp: handed through unchanged to mxlnd_conn_alloc_locked()
+ * @peer: peer whose mxp_lock is held around the locked call
+ *
+ * Acquires peer->mxp_lock, calls mxlnd_conn_alloc_locked() and drops the
+ * lock again before returning.
+ *
+ * Returns whatever mxlnd_conn_alloc_locked() returned.
+ *
+ * NOTE(review): everything mxlnd_conn_alloc_locked() does, including its
+ * allocation, now runs under a spinlock - confirm it cannot sleep.
+ */
+int
+mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
+{
+        int rc;
+
+        spin_lock(&peer->mxp_lock);
+        rc = mxlnd_conn_alloc_locked(connp, peer);
+        spin_unlock(&peer->mxp_lock);
+
+        return rc;
+}
int
mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
conn->mxk_timeout = next->mxc_deadline;
}
- spin_unlock(&ctx->mxc_conn->mxk_lock);
+ spin_unlock(&conn->mxk_lock);
}
return 0;
}
peer->mxp_nic_id = nic_id;
} else {
CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
- "with %s\n", mx_strerror(mxret), name);
+ "with %s\n", name, mx_strerror(mxret));
mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
if (mxret == MX_SUCCESS) {
peer->mxp_nic_id = nic_id;
} else {
CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
- "with %s\n", mx_strerror(mxret),
- peer->mxp_host->mxh_hostname);
+ "with %s\n", peer->mxp_host->mxh_hostname,
+ mx_strerror(mxret));
}
}
return;
break;
}
}
- LASSERT(peer->mxp_host != NULL);
+ if (peer->mxp_host == NULL) {
+ CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
+ MXLND_FREE(peer, sizeof(*peer));
+ return -ENXIO;
+ }
peer->mxp_nid = nid;
/* peer->mxp_incarnation */
INIT_LIST_HEAD(&peer->mxp_peers);
spin_lock_init(&peer->mxp_lock);
INIT_LIST_HEAD(&peer->mxp_conns);
- ret = mxlnd_conn_alloc(&peer->mxp_conn, peer);
+ ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
if (ret != 0) {
mxlnd_peer_decref(peer);
return ret;
ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
if (ret != 0) {
mxlnd_reduce_idle_rxs(i);
+ mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
+ mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
mxlnd_peer_decref(peer);
return ret;
}
}
static inline struct kmx_peer *
-mxlnd_find_peer_by_nid(lnet_nid_t nid)
+mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
{
int found = 0;
int hash = 0;
hash = mxlnd_nid_to_hash(nid);
- read_lock(&kmxlnd_data.kmx_peers_lock);
list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
if (peer->mxp_nid == nid) {
found = 1;
+ mxlnd_peer_addref(peer);
break;
}
}
- read_unlock(&kmxlnd_data.kmx_peers_lock);
return (found ? peer : NULL);
}
+/**
+ * mxlnd_find_peer_by_nid - look up a peer by NID under the peers lock
+ * @nid: NID to search for
+ *
+ * Holds kmxlnd_data.kmx_peers_lock for reading while delegating the search
+ * to mxlnd_find_peer_by_nid_locked().
+ *
+ * Returns the result of the locked lookup: the matching peer, or NULL when
+ * none is found. The locked helper calls mxlnd_peer_addref() on a peer it
+ * finds, so a non-NULL result must be balanced with mxlnd_peer_decref().
+ */
+static inline struct kmx_peer *
+mxlnd_find_peer_by_nid(lnet_nid_t nid)
+{
+        struct kmx_peer *match;
+
+        read_lock(&kmxlnd_data.kmx_peers_lock);
+        match = mxlnd_find_peer_by_nid_locked(nid);
+        read_unlock(&kmxlnd_data.kmx_peers_lock);
+
+        return match;
+}
+
static inline int
mxlnd_tx_requires_credit(struct kmx_ctx *tx)
{
{
int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
struct kmx_msg *msg = NULL;
-
+
LASSERT (tx != NULL);
LASSERT (nob <= MXLND_EAGER_SIZE);
return;
}
-static inline __u32
+static inline __u32
mxlnd_cksum (void *ptr, int nob)
{
char *c = ptr;
msg->mxm_credits = 0;
}
/* mxm_nob */
- msg->mxm_cksum = 0;
+ msg->mxm_cksum = 0;
msg->mxm_srcnid = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
msg->mxm_dstnid = tx->mxc_nid;
/* if it is a new peer, the dststamp will be 0 */
- msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
+ msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
msg->mxm_seq = tx->mxc_cookie;
if (*kmxlnd_tunables.kmx_cksum) {
msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
- }
+ }
}
int
{
int ret = 0;
mx_return_t mxret = MX_SUCCESS;
- __u64 mask = 0xF00FFFFFFFFFFFFFLL;
+ uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
rx->mxc_msg_type = msg_type;
rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
ret = mxlnd_q_pending_ctx(rx);
if (ret == -1) {
- /* FIXME the conn is disconnected, now what? */
+ /* the caller is responsible for calling conn_decref() if needed */
return -1;
}
mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
*/
mx_unexp_handler_action_t
mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
- __u64 match_value, __u32 length, void *data_if_available)
+ uint64_t match_value, uint32_t length, void *data_if_available)
{
int ret = 0;
struct kmx_ctx *rx = NULL;
ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
} else {
CDEBUG(D_NETERROR, "unexpected large receive with "
- "match_value=0x%llx length=%d\n",
+ "match_value=0x%llx length=%d\n",
match_value, length);
ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
}
+
if (ret == 0) {
+ struct kmx_peer *peer = NULL;
struct kmx_conn *conn = NULL;
- mx_get_endpoint_addr_context(source, (void **) &conn);
- if (conn != NULL) {
- mxlnd_conn_addref(conn);
- rx->mxc_conn = conn;
- rx->mxc_peer = conn->mxk_peer;
- if (conn->mxk_peer != NULL) {
- rx->mxc_nid = conn->mxk_peer->mxp_nid;
- } else {
- CDEBUG(D_NETERROR, "conn is 0x%p and peer "
- "is NULL\n", conn);
+
+ /* NOTE to avoid a peer disappearing out from under us,
+ * read lock the peers lock first */
+ read_lock(&kmxlnd_data.kmx_peers_lock);
+ mx_get_endpoint_addr_context(source, (void **) &peer);
+ if (peer != NULL) {
+ mxlnd_peer_addref(peer); /* add a ref... */
+ spin_lock(&peer->mxp_lock);
+ conn = peer->mxp_conn;
+ if (conn) {
+ mxlnd_conn_addref(conn); /* add ref until rx completed */
+ mxlnd_peer_decref(peer); /* and drop peer ref */
+ rx->mxc_conn = conn;
}
+ spin_unlock(&peer->mxp_lock);
+ rx->mxc_peer = peer;
+ rx->mxc_nid = peer->mxp_nid;
}
+ read_unlock(&kmxlnd_data.kmx_peers_lock);
} else {
CDEBUG(D_NETERROR, "could not post receive\n");
mxlnd_put_idle_rx(rx);
int i = 0;
int ret = -ENOENT;
struct kmx_peer *peer = NULL;
- struct kmx_conn *conn = NULL;
read_lock(&kmxlnd_data.kmx_peers_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
- conn = peer->mxp_conn;
if (index-- > 0)
continue;
}
}
read_unlock(&kmxlnd_data.kmx_peers_lock);
-
+
return ret;
}
mxlnd_del_peer_locked(struct kmx_peer *peer)
{
list_del_init(&peer->mxp_peers); /* remove from the global list */
- if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 0, 0);
+ if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
mxlnd_peer_decref(peer); /* drop global list ref */
return;
}
struct kmx_peer *next = NULL;
if (nid != LNET_NID_ANY) {
- peer = mxlnd_find_peer_by_nid(nid);
+ peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
}
write_lock(&kmxlnd_data.kmx_peers_lock);
if (nid != LNET_NID_ANY) {
if (peer == NULL) {
ret = -ENOENT;
} else {
+ mxlnd_peer_decref(peer); /* and drops it */
mxlnd_del_peer_locked(peer);
}
} else { /* LNET_NID_ANY */
for (i = 0; i < MXLND_HASH_SIZE; i++) {
- list_for_each_entry_safe(peer, next,
+ list_for_each_entry_safe(peer, next,
&kmxlnd_data.kmx_peers[i], mxp_peers) {
mxlnd_del_peer_locked(peer);
}
read_lock(&kmxlnd_data.kmx_peers_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
+ spin_lock(&peer->mxp_lock);
list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
- if (index-- > 0)
+ if (index-- > 0) {
continue;
+ }
mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
+ spin_unlock(&peer->mxp_lock);
read_unlock(&kmxlnd_data.kmx_peers_lock);
return conn;
}
+ spin_unlock(&peer->mxp_lock);
}
}
read_unlock(&kmxlnd_data.kmx_peers_lock);
-
+
return NULL;
}
struct kmx_conn *conn = NULL;
struct kmx_conn *next = NULL;
+ spin_lock(&peer->mxp_lock);
list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
mxlnd_conn_disconnect(conn, 0 , 0);
}
+ spin_unlock(&peer->mxp_lock);
return;
}
read_lock(&kmxlnd_data.kmx_peers_lock);
if (nid != LNET_NID_ANY) {
- peer = mxlnd_find_peer_by_nid(nid);
+ peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
if (peer == NULL) {
ret = -ENOENT;
} else {
mxlnd_close_matching_conns_locked(peer);
+ mxlnd_peer_decref(peer); /* and drops it here */
}
} else { /* LNET_NID_ANY */
for (i = 0; i < MXLND_HASH_SIZE; i++) {
CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
break;
}
-
+
return ret;
}
void
mxlnd_queue_tx(struct kmx_ctx *tx)
{
- int ret = 0;
struct kmx_peer *peer = tx->mxc_peer;
LASSERT (tx->mxc_nid != 0);
tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
/* let this fail now */
tx->mxc_status.code = -ECONNABORTED;
+ mxlnd_conn_decref(peer->mxp_conn);
mxlnd_put_idle_tx(tx);
return;
}
if (tx->mxc_conn == NULL) {
- mxlnd_conn_alloc(&tx->mxc_conn, peer);
+ int ret = 0;
+ struct kmx_conn *conn = NULL;
+
+ ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
+ if (ret != 0) {
+ tx->mxc_status.code = ret;
+ mxlnd_put_idle_tx(tx);
+ goto done;
+ }
+ tx->mxc_conn = conn;
+ mxlnd_peer_decref(peer); /* and takes it from peer */
}
LASSERT(tx->mxc_conn != NULL);
mxlnd_peer_queue_tx(tx);
- ret = mxlnd_check_sends(peer);
+ mxlnd_check_sends(peer);
} else {
spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
up(&kmxlnd_data.kmx_tx_queue_sem);
}
+done:
return;
}
LASSERT(first_iov >= 0 && last_iov >= first_iov);
nseg = last_iov - first_iov + 1;
LASSERT(nseg > 0);
-
+
MXLND_ALLOC (seg, nseg * sizeof(*seg));
if (seg == NULL) {
CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
nseg = last_kiov - first_kiov + 1;
LASSERT(nseg > 0);
-
+
MXLND_ALLOC (seg, nseg * sizeof(*seg));
if (seg == NULL) {
CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
goto failed_0;
}
tx->mxc_nid = target.nid;
+ /* NOTE called when we have a ref on the conn, get one for this tx */
mxlnd_conn_addref(peer->mxp_conn);
tx->mxc_peer = peer;
tx->mxc_conn = peer->mxp_conn;
return -1;
}
CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
- mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
+ mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
rx->mxc_seg_list, rx->mxc_nseg,
rx->mxc_pin_type, rx->mxc_match,
- 0xF00FFFFFFFFFFFFFLL, (void *) rx,
+ 0xF00FFFFFFFFFFFFFLL, (void *) rx,
&rx->mxc_mxreq);
if (mxret != MX_SUCCESS) {
if (rx->mxc_conn != NULL) {
mxlnd_deq_pending_ctx(rx);
}
- CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
+ CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
(int) mxret, libcfs_nid2str(target.nid));
return -1;
}
struct kmx_ctx *rx_data = NULL;
struct kmx_conn *conn = NULL;
int nob = 0;
- __u32 length = 0;
+ uint32_t length = 0;
struct kmx_peer *peer = NULL;
CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
/* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
* to a new peer, use the nid */
- peer = mxlnd_find_peer_by_nid(nid);
+ peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
if (peer != NULL) {
- conn = peer->mxp_conn;
- if (conn) mxlnd_conn_addref(conn);
+ if (unlikely(peer->mxp_incompatible)) {
+ mxlnd_peer_decref(peer); /* drop ref taken above */
+ } else {
+ spin_lock(&peer->mxp_lock);
+ conn = peer->mxp_conn;
+ if (conn) {
+ mxlnd_conn_addref(conn);
+ mxlnd_peer_decref(peer); /* drop peer ref taken above */
+ }
+ spin_unlock(&peer->mxp_lock);
+ }
}
if (conn == NULL && peer != NULL) {
- CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
- peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" :
- ((type==LNET_MSG_GET) ? "GET" : "Other")));
+ CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
+ peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
}
switch (type) {
rx->mxc_nid = nid;
rx->mxc_peer = peer;
/* conn may be NULL but unlikely since the first msg is always small */
+ /* NOTE no need to lock peer before adding conn ref since we took
+ * a conn ref for the tx (it cannot be freed between there and here ) */
if (conn) mxlnd_conn_addref(conn); /* for this rx */
rx->mxc_conn = conn;
rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
mxlnd_conn_decref(conn); /* for the rx... */
mxlnd_conn_decref(conn); /* and for the tx */
}
- return -ENOMEM;
+ return -EHOSTUNREACH;
}
mxlnd_queue_tx(tx);
return -ENOMEM;
}
rx_data->mxc_peer = peer;
+ /* NOTE no need to lock peer before adding conn ref since we took
+ * a conn ref for the tx (it cannot be freed between there and here ) */
if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
rx_data->mxc_conn = conn; /* may be NULL */
mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
tx->mxc_peer = peer;
tx->mxc_conn = conn;
+ /* no need to lock peer first since we already have a ref */
mxlnd_conn_addref(conn); /* for the tx */
txmsg = tx->mxc_msg;
txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
tx->mxc_nid = nid;
tx->mxc_peer = peer;
tx->mxc_conn = conn;
+ /* no need to lock peer first since we already have a ref */
mxlnd_conn_addref(conn); /* for this tx */
tx->mxc_cookie = cookie;
tx->mxc_match = mxlnd_create_match(tx, ENODATA);
/* we received a credit, see if we can use it to send a msg */
if (credit) mxlnd_check_sends(peer);
-
+
return ret;
}
spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
found = 0;
- peer = mxlnd_find_peer_by_nid(tx->mxc_nid);
+ peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
if (peer != NULL) {
tx->mxc_peer = peer;
+ spin_lock(&peer->mxp_lock);
+ if (peer->mxp_conn == NULL) {
+ ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
+ if (ret != 0) {
+ /* out of memory, give up and fail tx */
+ tx->mxc_status.code = -ENOMEM;
+ spin_unlock(&peer->mxp_lock);
+ mxlnd_peer_decref(peer);
+ mxlnd_put_idle_tx(tx);
+ continue;
+ }
+ }
tx->mxc_conn = peer->mxp_conn;
mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
+ spin_unlock(&peer->mxp_lock);
+ mxlnd_peer_decref(peer); /* drop peer ref taken above */
mxlnd_queue_tx(tx);
found = 1;
}
LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
tx->mxc_msg_type != MXLND_MSG_GET_DATA);
/* create peer */
+ /* adds conn ref for this function */
ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
if (ret != 0) {
/* finalize message */
- tx->mxc_status.code = -ECONNABORTED;
+ tx->mxc_status.code = ret;
mxlnd_put_idle_tx(tx);
continue;
}
tx->mxc_peer = peer;
tx->mxc_conn = peer->mxp_conn;
+ /* this tx will keep the conn ref taken in peer_alloc() */
/* add peer to global peer list, but look to see
* if someone already created it after we released
atomic_inc(&kmxlnd_data.kmx_npeers);
} else {
tx->mxc_peer = old;
+ spin_lock(&old->mxp_lock);
tx->mxc_conn = old->mxp_conn;
+ /* FIXME can conn be NULL? */
+ LASSERT(old->mxp_conn != NULL);
+ mxlnd_conn_addref(old->mxp_conn);
+ spin_unlock(&old->mxp_lock);
mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
+ mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
+ mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
mxlnd_peer_decref(peer);
}
- mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
write_unlock(&kmxlnd_data.kmx_peers_lock);
mxlnd_queue_tx(tx);
mx_request_t request;
struct kmx_conn *conn = peer->mxp_conn;
+ /* NOTE we are holding a conn ref every time we call this function,
+ * we do not need to lock the peer before taking another ref */
mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
LASSERT(mask == MXLND_MASK_ICON_REQ ||
if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
/* give up and notify LNET */
mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&peer->mxp_conn, peer);
+ mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
+ function... */
+ mxlnd_conn_decref(peer->mxp_conn); /* which we no
+ longer need */
}
mxlnd_conn_decref(conn);
return;
}
}
- mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
- peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
+ mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
+ peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
(void *) peer, &request);
if (unlikely(mxret != MX_SUCCESS)) {
spin_lock(&conn->mxk_lock);
LASSERT(peer != NULL);
return -1;
}
+ spin_lock(&peer->mxp_lock);
conn = peer->mxp_conn;
+ /* NOTE take a ref for the duration of this function since it is called
+ * when there might not be any queued txs for this peer */
+ if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
+ spin_unlock(&peer->mxp_lock);
+
/* do not add another ref for this tx */
if (conn == NULL) {
if (time_after(jiffies, last)) {
last = jiffies + HZ;
CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
- "ntx_posted= %d ntx_data= %d data_posted= %d\n",
- mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
+ "ntx_posted= %d ntx_data= %d data_posted= %d\n",
+ mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
conn->mxk_ntx_data, conn->mxk_data_posted);
}
if (credit) {
if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
- CDEBUG(D_NET, "%s: posted enough\n",
+ CDEBUG(D_NET, "%s: posted enough\n",
libcfs_nid2str(peer->mxp_nid));
goto done_locked;
}
-
+
if (conn->mxk_credits == 0) {
- CDEBUG(D_NET, "%s: no credits\n",
+ CDEBUG(D_NET, "%s: no credits\n",
libcfs_nid2str(peer->mxp_nid));
goto done_locked;
}
if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
msg_type == MXLND_MSG_CONN_ACK)) {
CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
- mxlnd_connstatus_to_str(conn->mxk_status),
- tx->mxc_cookie,
+ mxlnd_connstatus_to_str(conn->mxk_status),
+ tx->mxc_cookie,
mxlnd_msgtype_to_str(tx->mxc_msg_type));
if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
list_del_init(&tx->mxc_list);
/* send a msg style tx */
LASSERT(tx->mxc_nseg == 1);
LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
- CDEBUG(D_NET, "sending %s 0x%llx\n",
+ CDEBUG(D_NET, "sending %s 0x%llx\n",
mxlnd_msgtype_to_str(msg_type),
tx->mxc_cookie);
- mxret = mx_kisend(kmxlnd_data.kmx_endpt,
- &tx->mxc_seg,
+ mxret = mx_kisend(kmxlnd_data.kmx_endpt,
+ &tx->mxc_seg,
tx->mxc_nseg,
tx->mxc_pin_type,
- conn->mxk_epa,
- tx->mxc_match,
+ conn->mxk_epa,
+ tx->mxc_match,
(void *) tx,
&tx->mxc_mxreq);
} else {
conn->mxk_ntx_data--;
conn->mxk_data_posted++;
spin_unlock(&conn->mxk_lock);
- CDEBUG(D_NET, "sending %s 0x%llx\n",
- mxlnd_msgtype_to_str(msg_type),
+ CDEBUG(D_NET, "sending %s 0x%llx\n",
+ mxlnd_msgtype_to_str(msg_type),
tx->mxc_cookie);
- mxret = mx_kisend(kmxlnd_data.kmx_endpt,
- tx->mxc_seg_list,
+ mxret = mx_kisend(kmxlnd_data.kmx_endpt,
+ tx->mxc_seg_list,
tx->mxc_nseg,
tx->mxc_pin_type,
- conn->mxk_epa,
- tx->mxc_match,
+ conn->mxk_epa,
+ tx->mxc_match,
(void *) tx,
&tx->mxc_mxreq);
}
ret = 0;
} else {
CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
- "sending to %s\n", mx_strerror(mxret), (int) mxret,
+ "sending to %s\n", mx_strerror(mxret), (int) mxret,
libcfs_nid2str(peer->mxp_nid));
/* NOTE mx_kisend() only fails if there are not enough
* resources. Do not change the connection status. */
conn->mxk_ntx_posted--;
conn->mxk_credits++;
spin_unlock(&conn->mxk_lock);
- } else if (msg_type == MXLND_MSG_PUT_DATA ||
+ } else if (msg_type == MXLND_MSG_PUT_DATA ||
msg_type == MXLND_MSG_GET_DATA) {
spin_lock(&conn->mxk_lock);
conn->mxk_data_posted--;
done_locked:
spin_unlock(&conn->mxk_lock);
done:
+ mxlnd_conn_decref(conn); /* drop ref taken at start of function */
return found;
}
int credit = mxlnd_tx_requires_credit(tx);
u64 cookie = tx->mxc_cookie;
- CDEBUG(D_NET, "entering %s (0x%llx):\n",
+ CDEBUG(D_NET, "entering %s (0x%llx):\n",
mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
if (unlikely(conn == NULL)) {
- mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &conn);
+ mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
+ /* the endpoint context can come back NULL (other users of this call
+  * check for it), so only follow it to the conn when it is set */
+ conn = (peer != NULL) ? peer->mxp_conn : NULL;
if (conn != NULL) {
/* do not add a ref for the tx, it was set before sending */
tx->mxc_conn = conn;
CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
"failed with %s (%d) to %s\n",
type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
- mx_strstatus(tx->mxc_status.code),
+ mx_strstatus(tx->mxc_status.code),
tx->mxc_status.code,
libcfs_nid2str(tx->mxc_nid));
if (!peer->mxp_incompatible) {
int result = 0;
u64 nic_id = 0LL;
u32 ep_id = 0;
- int decref = 1;
+ int peer_ref = 0;
+ int conn_ref = 0;
int incompatible = 0;
/* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
* failed GET reply, CONN_REQ, or a CONN_ACK */
- /* NOTE peer may still be NULL if it is a new peer */
+ /* NOTE peer may still be NULL if it is a new peer and
+ * conn may be NULL if this is a re-connect */
+ if (likely(peer != NULL && conn != NULL)) {
+ /* we have a reference on the conn */
+ conn_ref = 1;
+ } else if (peer != NULL && conn == NULL) {
+ /* we have a reference on the peer */
+ peer_ref = 1;
+ } else if (peer == NULL && conn != NULL) {
+ /* fatal error */
+ CDEBUG(D_NETERROR, "rx has conn but no peer\n");
+ LBUG();
+ } /* else peer and conn == NULL */
+
+#if 0
if (peer == NULL || conn == NULL) {
/* if the peer was disconnected, the peer may exist but
* not have any valid conns */
decref = 0; /* no peer means no ref was taken for this rx */
}
+#endif
if (conn == NULL && peer != NULL) {
+ spin_lock(&peer->mxp_lock);
conn = peer->mxp_conn;
+ if (conn) {
+ mxlnd_conn_addref(conn); /* conn takes ref... */
+ mxlnd_peer_decref(peer); /* from peer */
+ conn_ref = 1;
+ peer_ref = 0;
+ }
+ spin_unlock(&peer->mxp_lock);
rx->mxc_conn = conn;
}
if (nob == 0) {
/* this may be a failed GET reply */
if (type == MXLND_MSG_GET_DATA) {
- bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
+ bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
ret = (u32) (bits>>52);
lntmsg[0] = rx->mxc_lntmsg[0];
result = -ret;
(!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
- "0x%llx and rx msg dst is 0x%llx)\n",
- mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
+ "0x%llx and rx msg dst is 0x%llx)\n",
+ mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
msg->mxm_dstnid);
goto cleanup;
}
if (conn != NULL) {
CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
"(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
- "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
+ "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
- msg->mxm_srcstamp, conn->mxk_incarnation,
+ msg->mxm_srcstamp, conn->mxk_incarnation,
msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
} else {
CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
- "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
+ "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
}
}
}
- CDEBUG(D_NET, "Received %s with %d credits\n",
+ CDEBUG(D_NET, "Received %s with %d credits\n",
mxlnd_msgtype_to_str(type), msg->mxm_credits);
if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
case MXLND_MSG_PUT_ACK: {
u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
if (cookie > MXLND_MAX_COOKIE) {
- CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
+ CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
libcfs_nid2str(rx->mxc_nid));
result = -((cookie >> 52) & 0xff);
lntmsg[0] = rx->mxc_lntmsg[0];
} else {
- mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
- rx->mxc_peer, MXLND_MSG_PUT_DATA,
+ mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
+ rx->mxc_peer, MXLND_MSG_PUT_DATA,
rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
}
/* repost == 1 */
incompatible = 1;
}
if (peer == NULL) {
- peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid);
+ peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
if (peer == NULL) {
- int hash = 0;
+ int hash = 0;
+ struct kmx_peer *existing_peer = NULL;
hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
-
+
mx_decompose_endpoint_addr(rx->mxc_status.source,
&nic_id, &ep_id);
rx->mxc_nid = msg->mxm_srcnid;
-
+
+ /* adds conn ref for peer and one for this function */
ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
if (ret != 0) {
goto cleanup;
}
LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
write_lock(&kmxlnd_data.kmx_peers_lock);
- list_add_tail(&peer->mxp_peers,
- &kmxlnd_data.kmx_peers[hash]);
- write_unlock(&kmxlnd_data.kmx_peers_lock);
- atomic_inc(&kmxlnd_data.kmx_npeers);
+ existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
+ if (existing_peer) {
+         /* someone else inserted this NID between the unlocked
+          * lookup and taking the write lock: discard the peer
+          * allocated above and use the one already listed */
+         /* NOTE(review): mxlnd_peer_alloc() returns with two conn
+          * refs but only one is dropped here, and the peer ref
+          * taken by the _locked lookup is kept - confirm the
+          * reference accounting */
+         mxlnd_conn_decref(peer->mxp_conn);
+         mxlnd_peer_decref(peer);
+         peer = existing_peer;
+         mxlnd_conn_addref(peer->mxp_conn);
+ } else {
+         list_add_tail(&peer->mxp_peers,
+                       &kmxlnd_data.kmx_peers[hash]);
+         atomic_inc(&kmxlnd_data.kmx_npeers);
+ }
+ /* the write lock is taken just before the lookup and must be
+  * released on both paths, not only when a new peer is added */
+ write_unlock(&kmxlnd_data.kmx_peers_lock);
} else {
- ret = mxlnd_conn_alloc(&conn, peer);
+ ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
+ mxlnd_peer_decref(peer); /* drop ref taken above */
if (ret != 0) {
CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
goto cleanup;
}
}
+ conn_ref = 1; /* peer/conn_alloc() added ref for this function */
conn = peer->mxp_conn;
} else {
struct kmx_conn *old_conn = conn;
/* the ref for this rx was taken on the old_conn */
mxlnd_conn_decref(old_conn);
- /* do not decref this conn below */
- decref = 0;
-
/* This allocs a conn, points peer->mxp_conn to this one.
* The old conn is still on the peer->mxp_conns list.
* As the pending requests complete, they will call
CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
goto cleanup;
}
+ /* conn_alloc() adds one ref for the peer and one for this function */
+ conn_ref = 1;
}
spin_lock(&peer->mxp_lock);
peer->mxp_incarnation = msg->mxm_srcstamp;
spin_unlock(&conn->mxk_lock);
}
}
- if (decref) mxlnd_conn_decref(conn);
+ if (conn_ref) mxlnd_conn_decref(conn);
+ LASSERT(peer_ref == 0);
}
if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
}
- if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
- if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
+ if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
+ if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
CDEBUG(D_NET, "entering\n");
if (status.code != MX_STATUS_SUCCESS) {
- CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
- mx_strstatus(status.code), status.code,
+ CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
+ mx_strstatus(status.code), status.code,
libcfs_nid2str(peer->mxp_nid));
spin_lock(&conn->mxk_lock);
conn->mxk_status = MXLND_CONN_FAIL;
struct kmx_conn *new_conn = NULL;
CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&new_conn, peer);
+ mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
+ mxlnd_conn_decref(new_conn); /* which we no longer need */
spin_lock(&peer->mxp_lock);
peer->mxp_reconnect_time = 0;
spin_unlock(&peer->mxp_lock);
spin_lock(&conn->mxk_lock);
conn->mxk_epa = status.source;
spin_unlock(&conn->mxk_lock);
- mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
+ /* NOTE we are holding a ref on the conn which has a ref on the peer,
+ * we should not need to lock the peer */
+ mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
/* mx_iconnect() succeeded, reset delay to 0 */
spin_lock(&peer->mxp_lock);
/* we are still using the conn ref from iconnect() - do not take another */
tx = mxlnd_get_idle_tx();
if (tx == NULL) {
- CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
+ CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
libcfs_nid2str(peer->mxp_nid));
spin_lock(&conn->mxk_lock);
conn->mxk_status = MXLND_CONN_FAIL;
CDEBUG(D_NET, "entering\n");
if (status.code != MX_STATUS_SUCCESS) {
- struct kmx_conn *conn = peer->mxp_conn;
CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
- "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
- mx_strstatus(status.code), status.code,
+ "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
+ mx_strstatus(status.code), status.code,
libcfs_nid2str(peer->mxp_nid),
peer->mxp_nid,
peer->mxp_nic_id,
struct kmx_conn *new_conn = NULL;
CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
mxlnd_conn_disconnect(conn, 0, 1);
- mxlnd_conn_alloc(&new_conn, peer);
+ mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
+ this function... */
+ mxlnd_conn_decref(new_conn); /* which we no longer need */
spin_lock(&peer->mxp_lock);
peer->mxp_reconnect_time = 0;
spin_unlock(&peer->mxp_lock);
conn->mxk_status = MXLND_CONN_READY;
}
spin_unlock(&conn->mxk_lock);
- mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
+ /* NOTE we are holding a ref on the conn which has a ref on the peer,
+ * we should not have to lock the peer */
+ mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
/* mx_iconnect() succeeded, reset delay to 0 */
spin_lock(&peer->mxp_lock);
/* marshal CONN_ACK msg */
tx = mxlnd_get_idle_tx();
if (tx == NULL) {
- CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
+ CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
libcfs_nid2str(peer->mxp_nid));
spin_lock(&conn->mxk_lock);
conn->mxk_status = MXLND_CONN_FAIL;
result = 0;
#if MXLND_POLLING
if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
- mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
+ mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
&status, &result);
} else {
count = 0;
- mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
+ mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
0LL, 0LL, &status, &result);
}
#else
- mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
+ mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
0LL, 0LL, &status, &result);
#endif
if (unlikely(kmxlnd_data.kmx_shutdown))
if (status.code != MX_STATUS_SUCCESS) {
CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
- "match_info 0x%llx and length %d\n",
- mx_strstatus(status.code), status.code,
+ "match_info 0x%llx and length %d\n",
+ mx_strstatus(status.code), status.code,
(u64) status.match_info, status.msg_length);
}
req_type = ctx->mxc_type;
conn = ctx->mxc_conn; /* this may be NULL */
mxlnd_deq_pending_ctx(ctx);
-
+
/* copy status to ctx->mxc_status */
memcpy(&ctx->mxc_status, &status, sizeof(status));
-
+
switch (req_type) {
case MXLND_REQ_TX:
mxlnd_handle_tx_completion(ctx);
LBUG();
break;
}
-
+
+ /* FIXME may need to reconsider this */
/* conn is always set except for the first CONN_REQ rx
* from a new peer */
- if (!(status.code == MX_STATUS_SUCCESS ||
+ if (!(status.code == MX_STATUS_SUCCESS ||
status.code == MX_STATUS_TRUNCATED) &&
conn != NULL) {
mxlnd_conn_disconnect(conn, 1, 1);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
- if (unlikely(kmxlnd_data.kmx_shutdown))
+ if (unlikely(kmxlnd_data.kmx_shutdown)) {
+ read_unlock(&kmxlnd_data.kmx_peers_lock);
return next;
-
+ }
+
+ spin_lock(&peer->mxp_lock);
conn = peer->mxp_conn;
- if (conn == NULL)
+ if (conn) {
+ mxlnd_conn_addref(conn);
+ spin_unlock(&peer->mxp_lock);
+ } else {
+ spin_unlock(&peer->mxp_lock);
continue;
+ }
- mxlnd_conn_addref(conn);
spin_lock(&conn->mxk_lock);
-
+
/* if nothing pending (timeout == 0) or
* if conn is already disconnected,
* skip this conn */
if ((next == 0) || (conn->mxk_timeout < next)) {
next = conn->mxk_timeout;
}
-
+
disconnect = 0;
if (time_after_eq(now, conn->mxk_timeout)) {
read_lock(&kmxlnd_data.kmx_peers_lock);
for (i = 0; i < MXLND_HASH_SIZE; i++) {
list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
+ spin_lock(&peer->mxp_lock);
conn = peer->mxp_conn;
+ if (conn) mxlnd_conn_addref(conn); /* take ref... */
+ spin_unlock(&peer->mxp_lock);
+
if (conn == NULL)
continue;
time_after(now, conn->mxk_last_tx + HZ)) {
mxlnd_check_sends(peer);
}
+ mxlnd_conn_decref(conn); /* until here */
}
}
read_unlock(&kmxlnd_data.kmx_peers_lock);