From 79068d88c9fed52a926811c084b2f98096c7a4ec Mon Sep 17 00:00:00 2001 From: isaac Date: Wed, 26 Mar 2008 07:48:45 +0000 Subject: [PATCH] - mxlnd updates from upstream. --- lnet/klnds/mxlnd/README | 6 +- lnet/klnds/mxlnd/mxlnd.c | 67 +++--- lnet/klnds/mxlnd/mxlnd.h | 9 +- lnet/klnds/mxlnd/mxlnd_cb.c | 449 +++++++++++++++++++++++++------------ lnet/klnds/mxlnd/mxlnd_modparams.c | 15 +- 5 files changed, 362 insertions(+), 184 deletions(-) diff --git a/lnet/klnds/mxlnd/README b/lnet/klnds/mxlnd/README index cc87e7a..eb79608 100644 --- a/lnet/klnds/mxlnd/README +++ b/lnet/klnds/mxlnd/README @@ -149,9 +149,9 @@ send email to help@myri.com. 2. Multi-homing -At this time, the MXLND cannot drive more than one interface at a time. Thus, -a single Lustre router cannot route between two MX-10G, between two MX-2G, or -between MX-10G and MX-2G fabrics. +At this time, the MXLND does not support more than one interface at a time. +Thus, a single Lustre router cannot route between two MX-10G, between two +MX-2G, or between MX-10G and MX-2G fabrics. 3. MX endpoint collision diff --git a/lnet/klnds/mxlnd/mxlnd.c b/lnet/klnds/mxlnd/mxlnd.c index 34dbf5e..141ad20 100644 --- a/lnet/klnds/mxlnd/mxlnd.c +++ b/lnet/klnds/mxlnd/mxlnd.c @@ -122,11 +122,9 @@ mxlnd_ctx_init(struct kmx_ctx *ctx) ctx->mxc_state = MXLND_CTX_IDLE; /* ignore mxc_global_list */ if (ctx->mxc_list.next != NULL && !list_empty(&ctx->mxc_list)) { - if (ctx->mxc_peer != NULL) - spin_lock(&ctx->mxc_lock); + if (ctx->mxc_peer != NULL) spin_lock(&ctx->mxc_lock); list_del_init(&ctx->mxc_list); - if (ctx->mxc_peer != NULL) - spin_unlock(&ctx->mxc_lock); + if (ctx->mxc_peer != NULL) spin_unlock(&ctx->mxc_lock); } /* ignore mxc_rx_list */ /* ignore mxc_lock */ @@ -388,6 +386,10 @@ mxlnd_parse_line(char *line) host->mxh_addr = ((ip[0]<<24)|(ip[1]<<16)|(ip[2]<<8)|ip[3]); len = strlen(hostname); MXLND_ALLOC(host->mxh_hostname, len + 1); + if (host->mxh_hostname == NULL) { + mxlnd_host_free(host); + return -ENOMEM; + } memset(host->mxh_hostname, 0, len + 1); strncpy(host->mxh_hostname, hostname, len); host->mxh_board = board; @@ -433,6 +435,7 @@ mxlnd_parse_hosts(char *filename) s32 allocd = 0; loff_t offset = 0; struct file *filp = NULL; + struct inode *inode = NULL; char *buf = NULL; s32 buf_off = 0; char *sep = NULL; @@ -446,7 +449,13 @@ mxlnd_parse_hosts(char *filename) return -1; } - size = (s32) cfs_filp_size(filp); + inode = filp->f_dentry->d_inode; + if (!S_ISREG(inode->i_mode)) { + CERROR("%s is not a regular file\n", filename); + return -1; + } + + size = (s32) inode->i_size; if (size < MXLND_BUFSIZE) bufsize = size; allocd = bufsize; MXLND_ALLOC(buf, allocd + 1); @@ -608,8 +617,8 @@ mxlnd_thread_start(int (*fn)(void *arg), void *arg) init_completion(&kmxlnd_data.kmx_completions[i]); pid = kernel_thread (fn, arg, 0); - if (pid <= 0) { - CERROR("mx_thread_start() failed with %d\n", pid); + if (pid < 0) { + CERROR("kernel_thread() failed with %d\n", pid); atomic_dec(&kmxlnd_data.kmx_nthreads); } return pid; @@ -638,7 +647,8 @@ mxlnd_thread_stop(long id) void mxlnd_shutdown (lnet_ni_t *ni) { - int i = 0; + int i = 0; + int nthreads = 2 + *kmxlnd_tunables.kmx_n_waitd; LASSERT (ni == kmxlnd_data.kmx_ni); LASSERT (ni->ni_data == &kmxlnd_data); @@ -666,7 +676,7 @@ mxlnd_shutdown (lnet_ni_t *ni) CDEBUG(D_NET, "waiting on threads\n"); /* wait for threads to complete */ - for (i = 0; i < MXLND_NCOMPLETIONS; i++) { + for (i = 0; i < nthreads; i++) { wait_for_completion(&kmxlnd_data.kmx_completions[i]); } LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0); @@ -745,9 +755,10 @@ mxlnd_shutdown (lnet_ni_t *ni) int mxlnd_startup (lnet_ni_t *ni) { - int i = 0; - int ret = 0; - struct timeval tv; + int i = 0; + int ret = 0; + int nthreads = 2; /* for timeoutd and tx_queued */ + struct timeval tv; LASSERT (ni->ni_lnd == &the_kmxlnd); @@ -762,6 +773,8 @@ mxlnd_startup (lnet_ni_t *ni) /* reserve 1/2 of tx for connect request messages */ ni->ni_maxtxcredits = *kmxlnd_tunables.kmx_ntx / 2; ni->ni_peertxcredits = *kmxlnd_tunables.kmx_credits; + if (ni->ni_maxtxcredits < ni->ni_peertxcredits) + ni->ni_maxtxcredits = ni->ni_peertxcredits; PORTAL_MODULE_USE; memset (&kmxlnd_data, 0, sizeof (kmxlnd_data)); @@ -799,7 +812,7 @@ mxlnd_startup (lnet_ni_t *ni) spin_lock_init (&kmxlnd_data.kmx_rxs_lock); INIT_LIST_HEAD (&kmxlnd_data.kmx_rx_idle); spin_lock_init (&kmxlnd_data.kmx_rx_idle_lock); - + kmxlnd_data.kmx_init = MXLND_INIT_DATA; /*****************************************************/ @@ -830,20 +843,17 @@ mxlnd_startup (lnet_ni_t *ni) /* start threads */ + nthreads += *kmxlnd_tunables.kmx_n_waitd; MXLND_ALLOC (kmxlnd_data.kmx_completions, - MXLND_NCOMPLETIONS * sizeof(struct completion)); + nthreads * sizeof(struct completion)); if (kmxlnd_data.kmx_completions == NULL) { - CERROR("failed to alloc kmxlnd_data.kmx_completions"); + CERROR("failed to alloc kmxlnd_data.kmx_completions\n"); goto failed; } memset(kmxlnd_data.kmx_completions, 0, - MXLND_NCOMPLETIONS * sizeof(struct completion)); + nthreads * sizeof(struct completion)); { - int i = 0; - if (MXLND_N_SCHED > *kmxlnd_tunables.kmx_n_waitd) { - *kmxlnd_tunables.kmx_n_waitd = MXLND_N_SCHED; - } CDEBUG(D_NET, "using %d %s in mx_wait_any()\n", *kmxlnd_tunables.kmx_n_waitd, *kmxlnd_tunables.kmx_n_waitd == 1 ? "thread" : "threads"); @@ -852,6 +862,8 @@ mxlnd_startup (lnet_ni_t *ni) ret = mxlnd_thread_start(mxlnd_request_waitd, (void*)((long)i)); if (ret < 0) { CERROR("Starting mxlnd_request_waitd[%d] failed with %d\n", i, ret); + kmxlnd_data.kmx_shutdown = 1; + mx_wakeup(kmxlnd_data.kmx_endpt); for (--i; i >= 0; i--) { wait_for_completion(&kmxlnd_data.kmx_completions[i]); } @@ -865,6 +877,8 @@ mxlnd_startup (lnet_ni_t *ni) ret = mxlnd_thread_start(mxlnd_tx_queued, (void*)((long)i++)); if (ret < 0) { CERROR("Starting mxlnd_tx_queued failed with %d\n", ret); + kmxlnd_data.kmx_shutdown = 1; + mx_wakeup(kmxlnd_data.kmx_endpt); for (--i; i >= 0; i--) { wait_for_completion(&kmxlnd_data.kmx_completions[i]); } @@ -876,6 +890,9 @@ mxlnd_startup (lnet_ni_t *ni) ret = mxlnd_thread_start(mxlnd_timeoutd, (void*)((long)i++)); if (ret < 0) { CERROR("Starting mxlnd_timeoutd failed with %d\n", ret); + kmxlnd_data.kmx_shutdown = 1; + mx_wakeup(kmxlnd_data.kmx_endpt); + up(&kmxlnd_data.kmx_tx_queue_sem); for (--i; i >= 0; i--) { wait_for_completion(&kmxlnd_data.kmx_completions[i]); } @@ -888,27 +905,27 @@ mxlnd_startup (lnet_ni_t *ni) kmxlnd_data.kmx_init = MXLND_INIT_THREADS; /*****************************************************/ - + kmxlnd_data.kmx_init = MXLND_INIT_ALL; CDEBUG(D_MALLOC, "startup complete (kmx_mem_used %ld)\n", kmxlnd_data.kmx_mem_used); - + return 0; failed: CERROR("mxlnd_startup failed\n"); - mxlnd_shutdown (ni); + mxlnd_shutdown(ni); return (-ENETDOWN); } static int mxlnd_init(void) { lnet_register_lnd(&the_kmxlnd); - return 0; + return 0; } static void mxlnd_exit(void) { lnet_unregister_lnd(&the_kmxlnd); - return; + return; } module_init(mxlnd_init); diff --git a/lnet/klnds/mxlnd/mxlnd.h b/lnet/klnds/mxlnd/mxlnd.h index 0ffa5f8..4071903 100644 --- a/lnet/klnds/mxlnd/mxlnd.h +++ b/lnet/klnds/mxlnd/mxlnd.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -107,7 +108,7 @@ #define MXLND_MX_EP_ID 3 /* MX endpoint ID */ #define MXLND_COMM_TIMEOUT (20 * HZ) /* timeout for send/recv (jiffies) */ #define MXLND_WAIT_TIMEOUT HZ /* timeout for wait (jiffies) */ -#define MXLND_POLLING 0 /* poll iterations before blocking */ +#define MXLND_POLLING 1000 /* poll iterations before blocking */ #define MXLND_MAX_PEERS 1024 /* number of nodes talking to me */ #define MXLND_EAGER_NUM MXLND_MAX_PEERS /* number of pre-posted receives */ #define MXLND_EAGER_SIZE PAGE_SIZE /* pre-posted eager message size */ @@ -371,9 +372,9 @@ extern lnet_nid_t mxlnd_nic_id2nid(lnet_ni_t *ni, u64 nic_id); extern u64 mxlnd_nid2nic_id(lnet_nid_t nid); /* in mxlnd_cb.c */ -void mxlnd_eager_recv(void *context, __u64 match_value, __u32 length); +void mxlnd_eager_recv(void *context, uint64_t match_value, uint32_t length); extern mx_unexp_handler_action_t mxlnd_unexpected_recv(void *context, - mx_endpoint_addr_t source, __u64 match_value, __u64 length, + mx_endpoint_addr_t source, uint64_t match_value, uint32_t length, void *data_if_available); extern void mxlnd_peer_free(struct kmx_peer *peer); extern void mxlnd_conn_free(struct kmx_conn *conn); @@ -389,6 +390,7 @@ extern int mxlnd_connd(void *arg); #define mxlnd_peer_addref(peer) \ do { \ + LASSERT(peer != NULL); \ LASSERT(atomic_read(&(peer)->mxp_refcount) > 0); \ atomic_inc(&(peer)->mxp_refcount); \ } while (0) @@ -403,6 +405,7 @@ do { \ #define mxlnd_conn_addref(conn) \ do { \ + LASSERT(conn != NULL); \ LASSERT(atomic_read(&(conn)->mxk_refcount) > 0); \ atomic_inc(&(conn)->mxk_refcount); \ } while (0) diff --git a/lnet/klnds/mxlnd/mxlnd_cb.c b/lnet/klnds/mxlnd/mxlnd_cb.c index ee38da7..535f547 100644 --- a/lnet/klnds/mxlnd/mxlnd_cb.c +++ b/lnet/klnds/mxlnd/mxlnd_cb.c @@ -114,6 +114,7 @@ mxlnd_lnetmsg_to_str(int type) case LNET_MSG_HELLO: return "LNET_MSG_HELLO"; default: + LBUG(); return "*unknown*"; } } @@ -258,7 +259,7 @@ mxlnd_get_idle_tx(void) * wrap... */ tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++; if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) { - tx->mxc_cookie = 1; + kmxlnd_data.kmx_tx_next_cookie = 1; } kmxlnd_data.kmx_tx_used++; spin_unlock(&kmxlnd_data.kmx_tx_idle_lock); @@ -279,8 +280,8 @@ mxlnd_get_idle_tx(void) int mxlnd_put_idle_tx(struct kmx_ctx *tx) { - int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED); - int result = failed ? -EIO : 0; + //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED); + int result = 0; lnet_msg_t *lntmsg[2]; if (tx == NULL) { @@ -290,6 +291,9 @@ mxlnd_put_idle_tx(struct kmx_ctx *tx) CDEBUG(D_NETERROR, "called with rx\n"); return -EINVAL; } + if (!(tx->mxc_status.code == MX_STATUS_SUCCESS || + tx->mxc_status.code == MX_STATUS_TRUNCATED)) + result = -EIO; lntmsg[0] = tx->mxc_lntmsg[0]; lntmsg[1] = tx->mxc_lntmsg[1]; @@ -449,7 +453,7 @@ mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify) } } mxlnd_conn_decref(conn); /* drop the owning peer's reference */ - + return; } @@ -461,7 +465,7 @@ mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify) * Returns 0 on success and -ENOMEM on failure */ int -mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer) +mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer) { struct kmx_conn *conn = NULL; @@ -477,7 +481,8 @@ mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer) memset(conn, 0, sizeof(*conn)); /* conn->mxk_incarnation = 0 - will be set by peer */ - atomic_set(&conn->mxk_refcount, 1); /* ref for owning peer */ + atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer + and one for the caller */ conn->mxk_peer = peer; /* mxk_epa - to be set after mx_iconnect() */ INIT_LIST_HEAD(&conn->mxk_list); @@ -501,13 +506,20 @@ mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer) mxlnd_peer_addref(peer); /* add a ref for this conn */ /* add to front of peer's conns list */ - spin_lock(&peer->mxp_lock); list_add(&conn->mxk_list, &peer->mxp_conns); peer->mxp_conn = conn; - spin_unlock(&peer->mxp_lock); return 0; } +int +mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer) +{ + int ret = 0; + spin_lock(&peer->mxp_lock); + ret = mxlnd_conn_alloc_locked(connp, peer); + spin_unlock(&peer->mxp_lock); + return ret; +} int mxlnd_q_pending_ctx(struct kmx_ctx *ctx) @@ -554,7 +566,7 @@ mxlnd_deq_pending_ctx(struct kmx_ctx *ctx) next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list); conn->mxk_timeout = next->mxc_deadline; } - spin_unlock(&ctx->mxc_conn->mxk_lock); + spin_unlock(&conn->mxk_lock); } return 0; } @@ -602,14 +614,14 @@ mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer) peer->mxp_nic_id = nic_id; } else { CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s " - "with %s\n", mx_strerror(mxret), name); + "with %s\n", name, mx_strerror(mxret)); mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id); if (mxret == MX_SUCCESS) { peer->mxp_nic_id = nic_id; } else { CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s " - "with %s\n", mx_strerror(mxret), - peer->mxp_host->mxh_hostname); + "with %s\n", peer->mxp_host->mxh_hostname, + mx_strerror(mxret)); } } return; @@ -651,7 +663,11 @@ mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid) break; } } - LASSERT(peer->mxp_host != NULL); + if (peer->mxp_host == NULL) { + CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid); + MXLND_FREE(peer, sizeof(*peer)); + return -ENXIO; + } peer->mxp_nid = nid; /* peer->mxp_incarnation */ @@ -661,7 +677,7 @@ mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid) INIT_LIST_HEAD(&peer->mxp_peers); spin_lock_init(&peer->mxp_lock); INIT_LIST_HEAD(&peer->mxp_conns); - ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); + ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */ if (ret != 0) { mxlnd_peer_decref(peer); return ret; @@ -672,6 +688,8 @@ mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid) ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX); if (ret != 0) { mxlnd_reduce_idle_rxs(i); + mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */ + mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */ mxlnd_peer_decref(peer); return ret; } @@ -702,7 +720,7 @@ mxlnd_nid_to_hash(lnet_nid_t nid) } static inline struct kmx_peer * -mxlnd_find_peer_by_nid(lnet_nid_t nid) +mxlnd_find_peer_by_nid_locked(lnet_nid_t nid) { int found = 0; int hash = 0; @@ -710,17 +728,27 @@ mxlnd_find_peer_by_nid(lnet_nid_t nid) hash = mxlnd_nid_to_hash(nid); - read_lock(&kmxlnd_data.kmx_peers_lock); list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) { if (peer->mxp_nid == nid) { found = 1; + mxlnd_peer_addref(peer); break; } } - read_unlock(&kmxlnd_data.kmx_peers_lock); return (found ? peer : NULL); } +static inline struct kmx_peer * +mxlnd_find_peer_by_nid(lnet_nid_t nid) +{ + struct kmx_peer *peer = NULL; + + read_lock(&kmxlnd_data.kmx_peers_lock); + peer = mxlnd_find_peer_by_nid_locked(nid); + read_unlock(&kmxlnd_data.kmx_peers_lock); + return peer; +} + static inline int mxlnd_tx_requires_credit(struct kmx_ctx *tx) { @@ -748,7 +776,7 @@ mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid) { int nob = offsetof (kmx_msg_t, mxm_u) + body_nob; struct kmx_msg *msg = NULL; - + LASSERT (tx != NULL); LASSERT (nob <= MXLND_EAGER_SIZE); @@ -768,7 +796,7 @@ mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid) return; } -static inline __u32 +static inline __u32 mxlnd_cksum (void *ptr, int nob) { char *c = ptr; @@ -806,17 +834,17 @@ mxlnd_pack_msg(struct kmx_ctx *tx) msg->mxm_credits = 0; } /* mxm_nob */ - msg->mxm_cksum = 0; + msg->mxm_cksum = 0; msg->mxm_srcnid = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid); msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation; msg->mxm_dstnid = tx->mxc_nid; /* if it is a new peer, the dststamp will be 0 */ - msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation; + msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation; msg->mxm_seq = tx->mxc_cookie; if (*kmxlnd_tunables.kmx_cksum) { msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob); - } + } } int @@ -970,7 +998,7 @@ mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, { int ret = 0; mx_return_t mxret = MX_SUCCESS; - __u64 mask = 0xF00FFFFFFFFFFFFFLL; + uint64_t mask = 0xF00FFFFFFFFFFFFFLL; rx->mxc_msg_type = msg_type; rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */ @@ -981,7 +1009,7 @@ mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; ret = mxlnd_q_pending_ctx(rx); if (ret == -1) { - /* FIXME the conn is disconnected, now what? */ + /* the caller is responsible for calling conn_decref() if needed */ return -1; } mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL, @@ -1018,7 +1046,7 @@ mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, */ mx_unexp_handler_action_t mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source, - __u64 match_value, __u32 length, void *data_if_available) + uint64_t match_value, uint32_t length, void *data_if_available) { int ret = 0; struct kmx_ctx *rx = NULL; @@ -1042,24 +1070,33 @@ mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source, ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length); } else { CDEBUG(D_NETERROR, "unexpected large receive with " - "match_value=0x%llx length=%d\n", + "match_value=0x%llx length=%d\n", match_value, length); ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0); } + if (ret == 0) { + struct kmx_peer *peer = NULL; struct kmx_conn *conn = NULL; - mx_get_endpoint_addr_context(source, (void **) &conn); - if (conn != NULL) { - mxlnd_conn_addref(conn); - rx->mxc_conn = conn; - rx->mxc_peer = conn->mxk_peer; - if (conn->mxk_peer != NULL) { - rx->mxc_nid = conn->mxk_peer->mxp_nid; - } else { - CDEBUG(D_NETERROR, "conn is 0x%p and peer " - "is NULL\n", conn); + + /* NOTE to avoid a peer disappearing out from under us, + * read lock the peers lock first */ + read_lock(&kmxlnd_data.kmx_peers_lock); + mx_get_endpoint_addr_context(source, (void **) &peer); + if (peer != NULL) { + mxlnd_peer_addref(peer); /* add a ref... */ + spin_lock(&peer->mxp_lock); + conn = peer->mxp_conn; + if (conn) { + mxlnd_conn_addref(conn); /* add ref until rx completed */ + mxlnd_peer_decref(peer); /* and drop peer ref */ + rx->mxc_conn = conn; } + spin_unlock(&peer->mxp_lock); + rx->mxc_peer = peer; + rx->mxc_nid = peer->mxp_nid; } + read_unlock(&kmxlnd_data.kmx_peers_lock); } else { CDEBUG(D_NETERROR, "could not post receive\n"); mxlnd_put_idle_rx(rx); @@ -1089,12 +1126,10 @@ mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count) int i = 0; int ret = -ENOENT; struct kmx_peer *peer = NULL; - struct kmx_conn *conn = NULL; read_lock(&kmxlnd_data.kmx_peers_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { - conn = peer->mxp_conn; if (index-- > 0) continue; @@ -1105,7 +1140,7 @@ mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count) } } read_unlock(&kmxlnd_data.kmx_peers_lock); - + return ret; } @@ -1113,7 +1148,7 @@ void mxlnd_del_peer_locked(struct kmx_peer *peer) { list_del_init(&peer->mxp_peers); /* remove from the global list */ - if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 0, 0); + if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0); mxlnd_peer_decref(peer); /* drop global list ref */ return; } @@ -1127,18 +1162,19 @@ mxlnd_del_peer(lnet_nid_t nid) struct kmx_peer *next = NULL; if (nid != LNET_NID_ANY) { - peer = mxlnd_find_peer_by_nid(nid); + peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ } write_lock(&kmxlnd_data.kmx_peers_lock); if (nid != LNET_NID_ANY) { if (peer == NULL) { ret = -ENOENT; } else { + mxlnd_peer_decref(peer); /* and drops it */ mxlnd_del_peer_locked(peer); } } else { /* LNET_NID_ANY */ for (i = 0; i < MXLND_HASH_SIZE; i++) { - list_for_each_entry_safe(peer, next, + list_for_each_entry_safe(peer, next, &kmxlnd_data.kmx_peers[i], mxp_peers) { mxlnd_del_peer_locked(peer); } @@ -1159,18 +1195,22 @@ mxlnd_get_conn_by_idx(int index) read_lock(&kmxlnd_data.kmx_peers_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { + spin_lock(&peer->mxp_lock); list_for_each_entry(conn, &peer->mxp_conns, mxk_list) { - if (index-- > 0) + if (index-- > 0) { continue; + } mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */ + spin_unlock(&peer->mxp_lock); read_unlock(&kmxlnd_data.kmx_peers_lock); return conn; } + spin_unlock(&peer->mxp_lock); } } read_unlock(&kmxlnd_data.kmx_peers_lock); - + return NULL; } @@ -1180,9 +1220,11 @@ mxlnd_close_matching_conns_locked(struct kmx_peer *peer) struct kmx_conn *conn = NULL; struct kmx_conn *next = NULL; + spin_lock(&peer->mxp_lock); list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) { mxlnd_conn_disconnect(conn, 0 , 0); } + spin_unlock(&peer->mxp_lock); return; } @@ -1195,11 +1237,12 @@ mxlnd_close_matching_conns(lnet_nid_t nid) read_lock(&kmxlnd_data.kmx_peers_lock); if (nid != LNET_NID_ANY) { - peer = mxlnd_find_peer_by_nid(nid); + peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ if (peer == NULL) { ret = -ENOENT; } else { mxlnd_close_matching_conns_locked(peer); + mxlnd_peer_decref(peer); /* and drops it here */ } } else { /* LNET_NID_ANY */ for (i = 0; i < MXLND_HASH_SIZE; i++) { @@ -1263,7 +1306,7 @@ mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd); break; } - + return ret; } @@ -1338,7 +1381,6 @@ mxlnd_peer_queue_tx(struct kmx_ctx *tx) void mxlnd_queue_tx(struct kmx_ctx *tx) { - int ret = 0; struct kmx_peer *peer = tx->mxc_peer; LASSERT (tx->mxc_nid != 0); @@ -1347,21 +1389,33 @@ mxlnd_queue_tx(struct kmx_ctx *tx) tx->mxc_msg_type != MXLND_MSG_CONN_ACK) { /* let this fail now */ tx->mxc_status.code = -ECONNABORTED; + mxlnd_conn_decref(peer->mxp_conn); mxlnd_put_idle_tx(tx); return; } if (tx->mxc_conn == NULL) { - mxlnd_conn_alloc(&tx->mxc_conn, peer); + int ret = 0; + struct kmx_conn *conn = NULL; + + ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */ + if (ret != 0) { + tx->mxc_status.code = ret; + mxlnd_put_idle_tx(tx); + goto done; + } + tx->mxc_conn = conn; + mxlnd_peer_decref(peer); /* and takes it from peer */ } LASSERT(tx->mxc_conn != NULL); mxlnd_peer_queue_tx(tx); - ret = mxlnd_check_sends(peer); + mxlnd_check_sends(peer); } else { spin_lock(&kmxlnd_data.kmx_tx_queue_lock); list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue); spin_unlock(&kmxlnd_data.kmx_tx_queue_lock); up(&kmxlnd_data.kmx_tx_queue_sem); } +done: return; } @@ -1402,7 +1456,7 @@ mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u3 LASSERT(first_iov >= 0 && last_iov >= first_iov); nseg = last_iov - first_iov + 1; LASSERT(nseg > 0); - + MXLND_ALLOC (seg, nseg * sizeof(*seg)); if (seg == NULL) { CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n"); @@ -1472,7 +1526,7 @@ mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u LASSERT(first_kiov >= 0 && last_kiov >= first_kiov); nseg = last_kiov - first_kiov + 1; LASSERT(nseg > 0); - + MXLND_ALLOC (seg, nseg * sizeof(*seg)); if (seg == NULL) { CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n"); @@ -1555,6 +1609,7 @@ mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg goto failed_0; } tx->mxc_nid = target.nid; + /* NOTE called when we have a ref on the conn, get one for this tx */ mxlnd_conn_addref(peer->mxp_conn); tx->mxc_peer = peer; tx->mxc_conn = peer->mxp_conn; @@ -1670,16 +1725,16 @@ mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_ty return -1; } CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie); - mxret = mx_kirecv(kmxlnd_data.kmx_endpt, + mxret = mx_kirecv(kmxlnd_data.kmx_endpt, rx->mxc_seg_list, rx->mxc_nseg, rx->mxc_pin_type, rx->mxc_match, - 0xF00FFFFFFFFFFFFFLL, (void *) rx, + 0xF00FFFFFFFFFFFFFLL, (void *) rx, &rx->mxc_mxreq); if (mxret != MX_SUCCESS) { if (rx->mxc_conn != NULL) { mxlnd_deq_pending_ctx(rx); } - CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n", + CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n", (int) mxret, libcfs_nid2str(target.nid)); return -1; } @@ -1718,7 +1773,7 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) struct kmx_ctx *rx_data = NULL; struct kmx_conn *conn = NULL; int nob = 0; - __u32 length = 0; + uint32_t length = 0; struct kmx_peer *peer = NULL; CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", @@ -1733,15 +1788,23 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ * to a new peer, use the nid */ - peer = mxlnd_find_peer_by_nid(nid); + peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */ if (peer != NULL) { - conn = peer->mxp_conn; - if (conn) mxlnd_conn_addref(conn); + if (unlikely(peer->mxp_incompatible)) { + mxlnd_peer_decref(peer); /* drop ref taken above */ + } else { + spin_lock(&peer->mxp_lock); + conn = peer->mxp_conn; + if (conn) { + mxlnd_conn_addref(conn); + mxlnd_peer_decref(peer); /* drop peer ref taken above */ + } + spin_unlock(&peer->mxp_lock); + } } if (conn == NULL && peer != NULL) { - CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n", - peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" : - ((type==LNET_MSG_GET) ? "GET" : "Other"))); + CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n", + peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type)); } switch (type) { @@ -1790,6 +1853,8 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) rx->mxc_nid = nid; rx->mxc_peer = peer; /* conn may be NULL but unlikely since the first msg is always small */ + /* NOTE no need to lock peer before adding conn ref since we took + * a conn ref for the tx (it cannot be freed between there and here ) */ if (conn) mxlnd_conn_addref(conn); /* for this rx */ rx->mxc_conn = conn; rx->mxc_msg_type = MXLND_MSG_PUT_ACK; @@ -1808,7 +1873,7 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) mxlnd_conn_decref(conn); /* for the rx... */ mxlnd_conn_decref(conn); /* and for the tx */ } - return -ENOMEM; + return -EHOSTUNREACH; } mxlnd_queue_tx(tx); @@ -1841,6 +1906,8 @@ mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) return -ENOMEM; } rx_data->mxc_peer = peer; + /* NOTE no need to lock peer before adding conn ref since we took + * a conn ref for the tx (it cannot be freed between there and here ) */ if (conn) mxlnd_conn_addref(conn); /* for the rx_data */ rx_data->mxc_conn = conn; /* may be NULL */ @@ -2004,6 +2071,7 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid); tx->mxc_peer = peer; tx->mxc_conn = conn; + /* no need to lock peer first since we already have a ref */ mxlnd_conn_addref(conn); /* for the tx */ txmsg = tx->mxc_msg; txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie; @@ -2066,6 +2134,7 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, tx->mxc_nid = nid; tx->mxc_peer = peer; tx->mxc_conn = conn; + /* no need to lock peer first since we already have a ref */ mxlnd_conn_addref(conn); /* for this tx */ tx->mxc_cookie = cookie; tx->mxc_match = mxlnd_create_match(tx, ENODATA); @@ -2095,7 +2164,7 @@ mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, /* we received a credit, see if we can use it to send a msg */ if (credit) mxlnd_check_sends(peer); - + return ret; } @@ -2145,11 +2214,25 @@ mxlnd_tx_queued(void *arg) spin_unlock(&kmxlnd_data.kmx_tx_queue_lock); found = 0; - peer = mxlnd_find_peer_by_nid(tx->mxc_nid); + peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */ if (peer != NULL) { tx->mxc_peer = peer; + spin_lock(&peer->mxp_lock); + if (peer->mxp_conn == NULL) { + ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer); + if (ret != 0) { + /* out of memory, give up and fail tx */ + tx->mxc_status.code = -ENOMEM; + spin_unlock(&peer->mxp_lock); + mxlnd_peer_decref(peer); + mxlnd_put_idle_tx(tx); + continue; + } + } tx->mxc_conn = peer->mxp_conn; mxlnd_conn_addref(tx->mxc_conn); /* for this tx */ + spin_unlock(&peer->mxp_lock); + mxlnd_peer_decref(peer); /* drop peer ref taken above */ mxlnd_queue_tx(tx); found = 1; } @@ -2163,15 +2246,17 @@ mxlnd_tx_queued(void *arg) LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA && tx->mxc_msg_type != MXLND_MSG_GET_DATA); /* create peer */ + /* adds conn ref for this function */ ret = mxlnd_peer_alloc(&peer, tx->mxc_nid); if (ret != 0) { /* finalize message */ - tx->mxc_status.code = -ECONNABORTED; + tx->mxc_status.code = ret; mxlnd_put_idle_tx(tx); continue; } tx->mxc_peer = peer; tx->mxc_conn = peer->mxp_conn; + /* this tx will keep the conn ref taken in peer_alloc() */ /* add peer to global peer list, but look to see * if someone already created it after we released @@ -2190,11 +2275,17 @@ mxlnd_tx_queued(void *arg) atomic_inc(&kmxlnd_data.kmx_npeers); } else { tx->mxc_peer = old; + spin_lock(&old->mxp_lock); tx->mxc_conn = old->mxp_conn; + /* FIXME can conn be NULL? */ + LASSERT(old->mxp_conn != NULL); + mxlnd_conn_addref(old->mxp_conn); + spin_unlock(&old->mxp_lock); mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1); + mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */ + mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */ mxlnd_peer_decref(peer); } - mxlnd_conn_addref(tx->mxc_conn); /* for this tx */ write_unlock(&kmxlnd_data.kmx_peers_lock); mxlnd_queue_tx(tx); @@ -2212,6 +2303,8 @@ mxlnd_iconnect(struct kmx_peer *peer, u64 mask) mx_request_t request; struct kmx_conn *conn = peer->mxp_conn; + /* NOTE we are holding a conn ref every time we call this function, + * we do not need to lock the peer before taking another ref */ mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */ LASSERT(mask == MXLND_MASK_ICON_REQ || @@ -2231,15 +2324,18 @@ mxlnd_iconnect(struct kmx_peer *peer, u64 mask) if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) { /* give up and notify LNET */ mxlnd_conn_disconnect(conn, 0, 1); - mxlnd_conn_alloc(&peer->mxp_conn, peer); + mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this + function... */ + mxlnd_conn_decref(peer->mxp_conn); /* which we no + longer need */ } mxlnd_conn_decref(conn); return; } } - mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id, - peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask, + mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id, + peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask, (void *) peer, &request); if (unlikely(mxret != MX_SUCCESS)) { spin_lock(&conn->mxk_lock); @@ -2275,7 +2371,13 @@ mxlnd_check_sends(struct kmx_peer *peer) LASSERT(peer != NULL); return -1; } + spin_lock(&peer->mxp_lock); conn = peer->mxp_conn; + /* NOTE take a ref for the duration of this function since it is called + * when there might not be any queued txs for this peer */ + if (conn) mxlnd_conn_addref(conn); /* for duration of this function */ + spin_unlock(&peer->mxp_lock); + /* do not add another ref for this tx */ if (conn == NULL) { @@ -2287,8 +2389,8 @@ mxlnd_check_sends(struct kmx_peer *peer) if (time_after(jiffies, last)) { last = jiffies + HZ; CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d " - "ntx_posted= %d ntx_data= %d data_posted= %d\n", - mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits, + "ntx_posted= %d ntx_data= %d data_posted= %d\n", + mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits, conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted, conn->mxk_ntx_data, conn->mxk_data_posted); } @@ -2377,13 +2479,13 @@ mxlnd_check_sends(struct kmx_peer *peer) if (credit) { if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) { - CDEBUG(D_NET, "%s: posted enough\n", + CDEBUG(D_NET, "%s: posted enough\n", libcfs_nid2str(peer->mxp_nid)); goto done_locked; } - + if (conn->mxk_credits == 0) { - CDEBUG(D_NET, "%s: no credits\n", + CDEBUG(D_NET, "%s: no credits\n", libcfs_nid2str(peer->mxp_nid)); goto done_locked; } @@ -2400,8 +2502,8 @@ mxlnd_check_sends(struct kmx_peer *peer) if ( ! (msg_type == MXLND_MSG_CONN_REQ || msg_type == MXLND_MSG_CONN_ACK)) { CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n", - mxlnd_connstatus_to_str(conn->mxk_status), - tx->mxc_cookie, + mxlnd_connstatus_to_str(conn->mxk_status), + tx->mxc_cookie, mxlnd_msgtype_to_str(tx->mxc_msg_type)); if (conn->mxk_status == MXLND_CONN_DISCONNECT) { list_del_init(&tx->mxc_list); @@ -2489,15 +2591,15 @@ mxlnd_check_sends(struct kmx_peer *peer) /* send a msg style tx */ LASSERT(tx->mxc_nseg == 1); LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL); - CDEBUG(D_NET, "sending %s 0x%llx\n", + CDEBUG(D_NET, "sending %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), tx->mxc_cookie); - mxret = mx_kisend(kmxlnd_data.kmx_endpt, - &tx->mxc_seg, + mxret = mx_kisend(kmxlnd_data.kmx_endpt, + &tx->mxc_seg, tx->mxc_nseg, tx->mxc_pin_type, - conn->mxk_epa, - tx->mxc_match, + conn->mxk_epa, + tx->mxc_match, (void *) tx, &tx->mxc_mxreq); } else { @@ -2506,15 +2608,15 @@ mxlnd_check_sends(struct kmx_peer *peer) conn->mxk_ntx_data--; conn->mxk_data_posted++; spin_unlock(&conn->mxk_lock); - CDEBUG(D_NET, "sending %s 0x%llx\n", - mxlnd_msgtype_to_str(msg_type), + CDEBUG(D_NET, "sending %s 0x%llx\n", + mxlnd_msgtype_to_str(msg_type), tx->mxc_cookie); - mxret = mx_kisend(kmxlnd_data.kmx_endpt, - tx->mxc_seg_list, + mxret = mx_kisend(kmxlnd_data.kmx_endpt, + tx->mxc_seg_list, tx->mxc_nseg, tx->mxc_pin_type, - conn->mxk_epa, - tx->mxc_match, + conn->mxk_epa, + tx->mxc_match, (void *) tx, &tx->mxc_mxreq); } @@ -2525,7 +2627,7 @@ mxlnd_check_sends(struct kmx_peer *peer) ret = 0; } else { CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) " - "sending to %s\n", mx_strerror(mxret), (int) mxret, + "sending to %s\n", mx_strerror(mxret), (int) mxret, libcfs_nid2str(peer->mxp_nid)); /* NOTE mx_kisend() only fails if there are not enough * resources. Do not change the connection status. */ @@ -2539,7 +2641,7 @@ mxlnd_check_sends(struct kmx_peer *peer) conn->mxk_ntx_posted--; conn->mxk_credits++; spin_unlock(&conn->mxk_lock); - } else if (msg_type == MXLND_MSG_PUT_DATA || + } else if (msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA) { spin_lock(&conn->mxk_lock); conn->mxk_data_posted--; @@ -2567,6 +2669,7 @@ mxlnd_check_sends(struct kmx_peer *peer) done_locked: spin_unlock(&conn->mxk_lock); done: + mxlnd_conn_decref(conn); /* drop ref taken at start of function */ return found; } @@ -2590,11 +2693,12 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) int credit = mxlnd_tx_requires_credit(tx); u64 cookie = tx->mxc_cookie; - CDEBUG(D_NET, "entering %s (0x%llx):\n", + CDEBUG(D_NET, "entering %s (0x%llx):\n", mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie); if (unlikely(conn == NULL)) { - mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &conn); + mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer); + conn = peer->mxp_conn; if (conn != NULL) { /* do not add a ref for the tx, it was set before sending */ tx->mxc_conn = conn; @@ -2653,7 +2757,7 @@ mxlnd_handle_tx_completion(struct kmx_ctx *tx) CDEBUG(D_NETERROR, "handle_tx_completion(): %s " "failed with %s (%d) to %s\n", type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK", - mx_strstatus(tx->mxc_status.code), + mx_strstatus(tx->mxc_status.code), tx->mxc_status.code, libcfs_nid2str(tx->mxc_nid)); if (!peer->mxp_incompatible) { @@ -2703,21 +2807,45 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) int result = 0; u64 nic_id = 0LL; u32 ep_id = 0; - int decref = 1; + int peer_ref = 0; + int conn_ref = 0; int incompatible = 0; /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, * failed GET reply, CONN_REQ, or a CONN_ACK */ - /* NOTE peer may still be NULL if it is a new peer */ + /* NOTE peer may still be NULL if it is a new peer and + * conn may be NULL if this is a re-connect */ + if (likely(peer != NULL && conn != NULL)) { + /* we have a reference on the conn */ + conn_ref = 1; + } else if (peer != NULL && conn == NULL) { + /* we have a reference on the peer */ + peer_ref = 1; + } else if (peer == NULL && conn != NULL) { + /* fatal error */ + CDEBUG(D_NETERROR, "rx has conn but no peer\n"); + LBUG(); + } /* else peer and conn == NULL */ + +#if 0 if (peer == NULL || conn == NULL) { /* if the peer was disconnected, the peer may exist but * not have any valid conns */ decref = 0; /* no peer means no ref was taken for this rx */ } +#endif if (conn == NULL && peer != NULL) { + spin_lock(&peer->mxp_lock); conn = peer->mxp_conn; + if (conn) { + mxlnd_conn_addref(conn); /* conn takes ref... */ + mxlnd_peer_decref(peer); /* from peer */ + conn_ref = 1; + peer_ref = 0; + } + spin_unlock(&peer->mxp_lock); rx->mxc_conn = conn; } @@ -2740,7 +2868,7 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) if (nob == 0) { /* this may be a failed GET reply */ if (type == MXLND_MSG_GET_DATA) { - bits = rx->mxc_status.match_info & 0x0FF0000000000000LL; + bits = rx->mxc_status.match_info & 0x0FF0000000000000LL; ret = (u32) (bits>>52); lntmsg[0] = rx->mxc_lntmsg[0]; result = -ret; @@ -2779,8 +2907,8 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) || !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) { CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is " - "0x%llx and rx msg dst is 0x%llx)\n", - mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid, + "0x%llx and rx msg dst is 0x%llx)\n", + mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid); goto cleanup; } @@ -2791,13 +2919,13 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) if (conn != NULL) { CDEBUG(D_NETERROR, "Stale rx from %s with type %s " "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) " - "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", + "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type), - msg->mxm_srcstamp, conn->mxk_incarnation, + msg->mxm_srcstamp, conn->mxk_incarnation, msg->mxm_dststamp, kmxlnd_data.kmx_incarnation); } else { CDEBUG(D_NETERROR, "Stale rx from %s with type %s " - "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", + "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type), msg->mxm_dststamp, kmxlnd_data.kmx_incarnation); } @@ -2806,7 +2934,7 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) } } - CDEBUG(D_NET, "Received %s with %d credits\n", + CDEBUG(D_NET, "Received %s with %d credits\n", mxlnd_msgtype_to_str(type), msg->mxm_credits); if (msg->mxm_type != MXLND_MSG_CONN_REQ && @@ -2849,13 +2977,13 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) case MXLND_MSG_PUT_ACK: { u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie; if (cookie > MXLND_MAX_COOKIE) { - CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type, + CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type, libcfs_nid2str(rx->mxc_nid)); result = -((cookie >> 52) & 0xff); lntmsg[0] = rx->mxc_lntmsg[0]; } else { - mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0], - rx->mxc_peer, MXLND_MSG_PUT_DATA, + mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0], + rx->mxc_peer, MXLND_MSG_PUT_DATA, rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie); } /* repost == 1 */ @@ -2891,32 +3019,44 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) incompatible = 1; } if (peer == NULL) { - peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); + peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */ if (peer == NULL) { - int hash = 0; + int hash = 0; + struct kmx_peer *existing_peer = NULL; hash = mxlnd_nid_to_hash(msg->mxm_srcnid); - + mx_decompose_endpoint_addr(rx->mxc_status.source, &nic_id, &ep_id); rx->mxc_nid = msg->mxm_srcnid; - + + /* adds conn ref for peer and one for this function */ ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid); if (ret != 0) { goto cleanup; } LASSERT(peer->mxp_host->mxh_ep_id == ep_id); write_lock(&kmxlnd_data.kmx_peers_lock); - list_add_tail(&peer->mxp_peers, - &kmxlnd_data.kmx_peers[hash]); - write_unlock(&kmxlnd_data.kmx_peers_lock); - atomic_inc(&kmxlnd_data.kmx_npeers); + existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid); + if (existing_peer) { + mxlnd_conn_decref(peer->mxp_conn); + mxlnd_peer_decref(peer); + peer = existing_peer; + mxlnd_conn_addref(peer->mxp_conn); + } else { + list_add_tail(&peer->mxp_peers, + &kmxlnd_data.kmx_peers[hash]); + write_unlock(&kmxlnd_data.kmx_peers_lock); + atomic_inc(&kmxlnd_data.kmx_npeers); + } } else { - ret = mxlnd_conn_alloc(&conn, peer); + ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */ + mxlnd_peer_decref(peer); /* drop ref taken above */ if (ret != 0) { CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n"); goto cleanup; } } + conn_ref = 1; /* peer/conn_alloc() added ref for this function */ conn = peer->mxp_conn; } else { struct kmx_conn *old_conn = conn; @@ -2927,9 +3067,6 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) /* the ref for this rx was taken on the old_conn */ mxlnd_conn_decref(old_conn); - /* do not decref this conn below */ - decref = 0; - /* This allocs a conn, points peer->mxp_conn to this one. * The old conn is still on the peer->mxp_conns list. * As the pending requests complete, they will call @@ -2939,6 +3076,8 @@ mxlnd_handle_rx_completion(struct kmx_ctx *rx) CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n"); goto cleanup; } + /* conn_alloc() adds one ref for the peer and one for this function */ + conn_ref = 1; } spin_lock(&peer->mxp_lock); peer->mxp_incarnation = msg->mxm_srcstamp; @@ -3041,7 +3180,8 @@ cleanup: spin_unlock(&conn->mxk_lock); } } - if (decref) mxlnd_conn_decref(conn); + if (conn_ref) mxlnd_conn_decref(conn); + LASSERT(peer_ref == 0); } if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) { @@ -3050,8 +3190,8 @@ cleanup: CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq); } - if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result); - if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result); + if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result); + if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result); if (conn != NULL && credit == 1) mxlnd_check_sends(peer); @@ -3072,8 +3212,8 @@ mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status) CDEBUG(D_NET, "entering\n"); if (status.code != MX_STATUS_SUCCESS) { - CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n", - mx_strstatus(status.code), status.code, + CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n", + mx_strstatus(status.code), status.code, libcfs_nid2str(peer->mxp_nid)); spin_lock(&conn->mxk_lock); conn->mxk_status = MXLND_CONN_FAIL; @@ -3083,7 +3223,8 @@ mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status) struct kmx_conn *new_conn = NULL; CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n"); mxlnd_conn_disconnect(conn, 0, 1); - mxlnd_conn_alloc(&new_conn, peer); + mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */ + mxlnd_conn_decref(new_conn); /* which we no longer need */ spin_lock(&peer->mxp_lock); peer->mxp_reconnect_time = 0; spin_unlock(&peer->mxp_lock); @@ -3096,7 +3237,9 @@ mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status) spin_lock(&conn->mxk_lock); conn->mxk_epa = status.source; spin_unlock(&conn->mxk_lock); - mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn); + /* NOTE we are holding a ref on the conn which has a ref on the peer, + * we should not need to lock the peer */ + mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer); /* mx_iconnect() succeeded, reset delay to 0 */ spin_lock(&peer->mxp_lock); @@ -3107,7 +3250,7 @@ mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status) /* we are still using the conn ref from iconnect() - do not take another */ tx = mxlnd_get_idle_tx(); if (tx == NULL) { - CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n", + CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n", libcfs_nid2str(peer->mxp_nid)); spin_lock(&conn->mxk_lock); conn->mxk_status = MXLND_CONN_FAIL; @@ -3141,10 +3284,9 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) CDEBUG(D_NET, "entering\n"); if (status.code != MX_STATUS_SUCCESS) { - struct kmx_conn *conn = peer->mxp_conn; CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) " - "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n", - mx_strstatus(status.code), status.code, + "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n", + mx_strstatus(status.code), status.code, libcfs_nid2str(peer->mxp_nid), peer->mxp_nid, peer->mxp_nic_id, @@ -3157,7 +3299,9 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) struct kmx_conn *new_conn = NULL; CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n"); mxlnd_conn_disconnect(conn, 0, 1); - mxlnd_conn_alloc(&new_conn, peer); + mxlnd_conn_alloc(&new_conn, peer); /* adds ref for + this function... */ + mxlnd_conn_decref(new_conn); /* which we no longer need */ spin_lock(&peer->mxp_lock); peer->mxp_reconnect_time = 0; spin_unlock(&peer->mxp_lock); @@ -3172,7 +3316,9 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) conn->mxk_status = MXLND_CONN_READY; } spin_unlock(&conn->mxk_lock); - mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn); + /* NOTE we are holding a ref on the conn which has a ref on the peer, + * we should not have to lock the peer */ + mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer); /* mx_iconnect() succeeded, reset delay to 0 */ spin_lock(&peer->mxp_lock); @@ -3182,7 +3328,7 @@ mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status) /* marshal CONN_ACK msg */ tx = mxlnd_get_idle_tx(); if (tx == NULL) { - CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n", + CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n", libcfs_nid2str(peer->mxp_nid)); spin_lock(&conn->mxk_lock); conn->mxk_status = MXLND_CONN_FAIL; @@ -3241,15 +3387,15 @@ mxlnd_request_waitd(void *arg) result = 0; #if MXLND_POLLING if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) { - mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL, + mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL, &status, &result); } else { count = 0; - mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, + mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, 0LL, 0LL, &status, &result); } #else - mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, + mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, 0LL, 0LL, &status, &result); #endif if (unlikely(kmxlnd_data.kmx_shutdown)) @@ -3262,8 +3408,8 @@ mxlnd_request_waitd(void *arg) if (status.code != MX_STATUS_SUCCESS) { CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with " - "match_info 0x%llx and length %d\n", - mx_strstatus(status.code), status.code, + "match_info 0x%llx and length %d\n", + mx_strstatus(status.code), status.code, (u64) status.match_info, status.msg_length); } @@ -3291,10 +3437,10 @@ mxlnd_request_waitd(void *arg) req_type = ctx->mxc_type; conn = ctx->mxc_conn; /* this may be NULL */ mxlnd_deq_pending_ctx(ctx); - + /* copy status to ctx->mxc_status */ memcpy(&ctx->mxc_status, &status, sizeof(status)); - + switch (req_type) { case MXLND_REQ_TX: mxlnd_handle_tx_completion(ctx); @@ -3307,10 +3453,11 @@ mxlnd_request_waitd(void *arg) LBUG(); break; } - + + /* FIXME may need to reconsider this */ /* conn is always set except for the first CONN_REQ rx * from a new peer */ - if (!(status.code == MX_STATUS_SUCCESS || + if (!(status.code == MX_STATUS_SUCCESS || status.code == MX_STATUS_TRUNCATED) && conn != NULL) { mxlnd_conn_disconnect(conn, 1, 1); @@ -3337,16 +3484,23 @@ mxlnd_check_timeouts(unsigned long now) for (i = 0; i < MXLND_HASH_SIZE; i++) { list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { - if (unlikely(kmxlnd_data.kmx_shutdown)) + if (unlikely(kmxlnd_data.kmx_shutdown)) { + read_unlock(&kmxlnd_data.kmx_peers_lock); return next; - + } + + spin_lock(&peer->mxp_lock); conn = peer->mxp_conn; - if (conn == NULL) + if (conn) { + mxlnd_conn_addref(conn); + spin_unlock(&peer->mxp_lock); + } else { + spin_unlock(&peer->mxp_lock); continue; + } - mxlnd_conn_addref(conn); spin_lock(&conn->mxk_lock); - + /* if nothing pending (timeout == 0) or * if conn is already disconnected, * skip this conn */ @@ -3364,7 +3518,7 @@ mxlnd_check_timeouts(unsigned long now) if ((next == 0) || (conn->mxk_timeout < next)) { next = conn->mxk_timeout; } - + disconnect = 0; if (time_after_eq(now, conn->mxk_timeout)) { @@ -3420,7 +3574,11 @@ mxlnd_timeoutd(void *arg) read_lock(&kmxlnd_data.kmx_peers_lock); for (i = 0; i < MXLND_HASH_SIZE; i++) { list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) { + spin_lock(&peer->mxp_lock); conn = peer->mxp_conn; + if (conn) mxlnd_conn_addref(conn); /* take ref... */ + spin_unlock(&peer->mxp_lock); + if (conn == NULL) continue; @@ -3428,6 +3586,7 @@ mxlnd_timeoutd(void *arg) time_after(now, conn->mxk_last_tx + HZ)) { mxlnd_check_sends(peer); } + mxlnd_conn_decref(conn); /* until here */ } } read_unlock(&kmxlnd_data.kmx_peers_lock); diff --git a/lnet/klnds/mxlnd/mxlnd_modparams.c b/lnet/klnds/mxlnd/mxlnd_modparams.c index 37d77f1..d36a188 100644 --- a/lnet/klnds/mxlnd/mxlnd_modparams.c +++ b/lnet/klnds/mxlnd/mxlnd_modparams.c @@ -30,7 +30,7 @@ CFS_MODULE_PARM(n_waitd, "i", int, 0444, static int max_peers = MXLND_MAX_PEERS; CFS_MODULE_PARM(max_peers, "i", int, 0444, - "maximum number of peers that may connect"); + "maximum number of peers that may connect"); static int cksum = MXLND_CKSUM; CFS_MODULE_PARM(cksum, "i", int, 0644, @@ -38,27 +38,26 @@ CFS_MODULE_PARM(cksum, "i", int, 0644, static int ntx = MXLND_NTX; CFS_MODULE_PARM(ntx, "i", int, 0444, - "# of total tx message descriptors"); + "# of total tx message descriptors"); static int credits = MXLND_MSG_QUEUE_DEPTH; CFS_MODULE_PARM(credits, "i", int, 0444, - "# concurrent sends"); + "# concurrent sends"); static int board = MXLND_MX_BOARD; CFS_MODULE_PARM(board, "i", int, 0444, - "index value of the Myrinet board (NIC)"); + "index value of the Myrinet board (NIC)"); static int ep_id = MXLND_MX_EP_ID; -CFS_MODULE_PARM(ep_id, "i", int, 0444, - "MX endpoint ID"); +CFS_MODULE_PARM(ep_id, "i", int, 0444, "MX endpoint ID"); static int polling = MXLND_POLLING; CFS_MODULE_PARM(polling, "i", int, 0444, - "Use 0 to block (wait). A value > 0 will poll that many times before blocking"); + "Use 0 to block (wait). A value > 0 will poll that many times before blocking"); static char *hosts = NULL; CFS_MODULE_PARM(hosts, "s", charp, 0444, - "IP-to-hostname resolution file"); + "IP-to-hostname resolution file"); kmx_tunables_t kmxlnd_tunables = { .kmx_n_waitd = &n_waitd, -- 1.8.3.1