+/**
+ * mxlnd_passive_connect - handle an incoming CONN_REQ
+ * @cp - the received connection parameters
+ *
+ * Validates the CONN_REQ, finds or creates the matching peer and conn,
+ * and initiates the CONN_ACK reply. Frees @cp before returning.
+ */
+void
+mxlnd_passive_connect(kmx_connparams_t *cp)
+{
+        int             ret             = 0;
+        int             incompatible    = 0;
+        u64             nic_id          = 0ULL;
+        u32             ep_id           = 0;
+        u32             sid             = 0;
+        int             conn_ref        = 0;
+        kmx_msg_t       *msg            = &cp->mxr_msg;
+        kmx_peer_t      *peer           = cp->mxr_peer;
+        kmx_conn_t      *conn           = NULL;
+        rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
+
+        mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
+
+        ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
+        if (ret != 0) {
+                if (peer) {
+                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from %s\n",
+                               ret, libcfs_nid2str(peer->mxp_nid));
+                } else {
+                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from "
+                               "unknown host with nic_id 0x%llx\n", ret, nic_id);
+                }
+                goto cleanup;
+        }
+        if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
+                CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       libcfs_nid2str(msg->mxm_dstnid));
+                goto cleanup;
+        }
+        if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
+                CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
+                       "%d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_queue_depth,
+                       *kmxlnd_tunables.kmx_peercredits);
+                incompatible = 1;
+        }
+        if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
+                CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
+                       "%d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_eager_size,
+                       (int) MXLND_MSG_SIZE);
+                incompatible = 1;
+        }
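+        /* an incompatible CONN_REQ is not rejected outright: we still set
+         * up the conn and send the CONN_ACK carrying our parameters so the
+         * peer can detect the mismatch (see mxlnd_check_conn_ack()); the
+         * result is recorded in mxp_incompatible below */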
+
+        if (peer == NULL) {
+                peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
+                if (peer == NULL) {
+                        int             hash    = 0;
+                        u32             board   = 0;
+                        kmx_peer_t      *existing_peer = NULL;
+
+                        hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
+
+                        mx_nic_id_to_board_number(nic_id, &board);
+
+                        /* adds conn ref for peer and one for this function */
+                        ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
+                                               board, ep_id, 0ULL);
+                        if (ret != 0) {
+                                goto cleanup;
+                        }
+                        peer->mxp_conn->mxk_sid = sid;
+                        LASSERT(peer->mxp_ep_id == ep_id);
+                        write_lock(g_lock);
+                        existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
+                        if (existing_peer) {
+                                mxlnd_conn_decref(peer->mxp_conn);
+                                mxlnd_peer_decref(peer);
+                                peer = existing_peer;
+                                mxlnd_conn_addref(peer->mxp_conn);
+                                conn = peer->mxp_conn;
+                        } else {
+                                list_add_tail(&peer->mxp_list,
+                                              &kmxlnd_data.kmx_peers[hash]);
+                                atomic_inc(&kmxlnd_data.kmx_npeers);
+                        }
+                        write_unlock(g_lock);
+                } else {
+                        ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
+                        write_lock(g_lock);
+                        mxlnd_peer_decref(peer); /* drop ref taken above */
+                        write_unlock(g_lock);
+                        if (ret != 0) {
+                                CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
+                                goto cleanup;
+                        }
+                }
+                conn_ref = 1; /* peer/conn_alloc() added ref for this function */
+                conn = peer->mxp_conn;
+        } else { /* unexpected handler found peer */
+                kmx_conn_t      *old_conn       = peer->mxp_conn;
+
+                if (sid != peer->mxp_conn->mxk_sid) {
+                        /* do not call mx_disconnect() or send a BYE */
+                        mxlnd_conn_disconnect(old_conn, 0, 0);
+
+                        /* This allocates a conn and points peer->mxp_conn at it.
+                         * The old conn is still on the peer->mxp_conns list.
+                         * As the pending requests complete, they will call
+                         * conn_decref() which will eventually free it. */
+                        ret = mxlnd_conn_alloc(&conn, peer);
+                        if (ret != 0) {
+                                CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
+                                goto cleanup;
+                        }
+                        /* conn_alloc() adds one ref for the peer and one
+                         * for this function */
+                        conn_ref = 1;
+
+                        peer->mxp_conn->mxk_sid = sid;
+                } else {
+                        /* same sid */
+                        conn = peer->mxp_conn;
+                }
+        }
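+
+        /* by here, conn_ref == 1 means this function owns a conn ref that
+         * cleanup must drop; in the same-sid case above no extra ref was
+         * taken, so conn_ref stays 0 */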
+        write_lock(g_lock);
+        peer->mxp_incompatible = incompatible;
+        write_unlock(g_lock);
+        spin_lock(&conn->mxk_lock);
+        conn->mxk_incarnation = msg->mxm_srcstamp;
+        mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
+        spin_unlock(&conn->mxk_lock);
+
+        /* handle_conn_ack() will create the CONN_ACK msg */
+        mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
+
+cleanup:
+        if (conn_ref) mxlnd_conn_decref(conn);
+
+        mxlnd_connparams_free(cp);
+        return;
+}
+
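+/**
+ * mxlnd_check_conn_ack - handle an incoming CONN_ACK
+ * @cp - the received connection parameters
+ *
+ * Validates the CONN_ACK against our NID and tunables. On success the
+ * conn is given its initial credits and marked CONN_READY; on any
+ * failure it is marked CONN_FAIL, and an incompatible peer is
+ * disconnected. Frees @cp before returning.
+ */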
+void
+mxlnd_check_conn_ack(kmx_connparams_t *cp)
+{
+        int             ret             = 0;
+        int             incompatible    = 0;
+        u64             nic_id          = 0ULL;
+        u32             ep_id           = 0;
+        u32             sid             = 0;
+        kmx_msg_t       *msg            = &cp->mxr_msg;
+        kmx_peer_t      *peer           = cp->mxr_peer;
+        kmx_conn_t      *conn           = cp->mxr_conn;
+
+        mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
+
+        ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
+        if (ret != 0) {
+                if (peer) {
+                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from %s\n",
+                               ret, libcfs_nid2str(peer->mxp_nid));
+                } else {
+                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from "
+                               "unknown host with nic_id 0x%llx\n", ret, nic_id);
+                }
+                ret = -1;
+                incompatible = 1;
+                goto failed;
+        }
+        if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
+                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
+                       "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
+                       libcfs_nid2str(msg->mxm_dstnid));
+                ret = -1;
+                goto failed;
+        }
+        if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
+                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
+                       "incompatible queue depth %d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_queue_depth,
+                       *kmxlnd_tunables.kmx_peercredits);
+                incompatible = 1;
+                ret = -1;
+                goto failed;
+        }
+        if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
+                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
+                       "incompatible EAGER size %d (%d wanted)\n",
+                       libcfs_nid2str(msg->mxm_srcnid),
+                       msg->mxm_u.conn_req.mxcrm_eager_size,
+                       (int) MXLND_MSG_SIZE);
+                incompatible = 1;
+                ret = -1;
+                goto failed;
+        }
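+
+        /* the CONN_ACK is valid: give the conn a full set of credits and
+         * adopt the peer's incarnation before marking it ready */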
+        write_lock(&kmxlnd_data.kmx_global_lock);
+        peer->mxp_incompatible = incompatible;
+        write_unlock(&kmxlnd_data.kmx_global_lock);
+        spin_lock(&conn->mxk_lock);
+        conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
+        conn->mxk_outstanding = 0;
+        conn->mxk_incarnation = msg->mxm_srcstamp;
+        conn->mxk_timeout = 0;
+        if (!incompatible) {
+                CDEBUG(D_NET, "setting peer %s CONN_READY\n",
+                       libcfs_nid2str(msg->mxm_srcnid));
+                mxlnd_set_conn_status(conn, MXLND_CONN_READY);
+        }
+        spin_unlock(&conn->mxk_lock);
+
+        if (!incompatible)
+                mxlnd_check_sends(peer);
+
+failed:
+        if (ret < 0) {
+                spin_lock(&conn->mxk_lock);
+                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
+                spin_unlock(&conn->mxk_lock);
+        }
+
+        if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
+
+        mxlnd_connparams_free(cp);
+        return;
+}
+
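+/**
+ * mxlnd_abort_msgs - complete all orphaned messages with an error
+ *
+ * Fails each tx or rx on the orphan list with -ECONNABORTED. The conn
+ * lock is dropped around each completion since the completion paths
+ * must not be called with locks held.
+ *
+ * Returns the number of messages aborted.
+ */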
+int
+mxlnd_abort_msgs(void)
+{
+        int                     count           = 0;
+        struct list_head        *orphans        = &kmxlnd_data.kmx_orphan_msgs;
+        spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
+
+        /* abort orphans */
+        spin_lock(g_conn_lock);
+        while (!list_empty(orphans)) {
+                kmx_ctx_t       *ctx    = NULL;
+                kmx_conn_t      *conn   = NULL;
+
+                ctx = list_entry(orphans->next, kmx_ctx_t, mxc_list);
+                list_del_init(&ctx->mxc_list);
+                spin_unlock(g_conn_lock);
+
+                ctx->mxc_errno = -ECONNABORTED;
+                conn = ctx->mxc_conn;
+                CDEBUG(D_NET, "aborting %s %s %s\n",
+                       mxlnd_msgtype_to_str(ctx->mxc_msg_type),
+                       ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
+                       libcfs_nid2str(ctx->mxc_nid));
+                if (ctx->mxc_type == MXLND_REQ_TX) {
+                        mxlnd_put_idle_tx(ctx); /* do not hold any locks */
+                        if (conn) mxlnd_conn_decref(conn); /* for this tx */
+                } else {
+                        ctx->mxc_state = MXLND_CTX_CANCELED;
+                        mxlnd_handle_rx_completion(ctx);
+                }
+
+                count++;
+                spin_lock(g_conn_lock);
+        }
+        spin_unlock(g_conn_lock);
+
+        return count;
+}
+
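+/**
+ * mxlnd_free_conn_zombies - free conns queued for cleanup
+ *
+ * Pops each conn off the zombie list and frees it while holding the
+ * global write lock.
+ *
+ * Returns the number of conns freed.
+ */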
+int
+mxlnd_free_conn_zombies(void)
+{
+        int                     count           = 0;
+        struct list_head        *zombies        = &kmxlnd_data.kmx_conn_zombies;
+        spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
+        rwlock_t                *g_lock         = &kmxlnd_data.kmx_global_lock;
+
+        /* cleanup any zombies */
+        spin_lock(g_conn_lock);
+        while (!list_empty(zombies)) {
+                kmx_conn_t      *conn   = NULL;
+
+                conn = list_entry(zombies->next, kmx_conn_t, mxk_zombie);
+                list_del_init(&conn->mxk_zombie);
+                spin_unlock(g_conn_lock);
+
+                write_lock(g_lock);
+                mxlnd_conn_free_locked(conn);
+                write_unlock(g_lock);
+
+                count++;
+                spin_lock(g_conn_lock);
+        }
+        spin_unlock(g_conn_lock);
+        CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
+        return count;
+}
+
+/**
+ * mxlnd_connd - handles connection management
+ * @arg - thread id (as a void *)
+ *
+ * This thread handles incoming connection requests and acks, aborts
+ * orphaned messages, and frees zombie connections.
+ */
+int
+mxlnd_connd(void *arg)
+{
+        long    id = (long) arg;
+
+        cfs_daemonize("mxlnd_connd");
+
+        CDEBUG(D_NET, "connd starting\n");
+
+        while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
+                int                     ret             = 0;
+                kmx_connparams_t        *cp             = NULL;
+                spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
+                struct list_head        *conn_reqs      = &kmxlnd_data.kmx_conn_reqs;
+
+                ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
+
+                if (atomic_read(&kmxlnd_data.kmx_shutdown))
+                        break;
+
+                if (ret != 0)
+                        continue;
+
+                ret = mxlnd_abort_msgs();
+                ret += mxlnd_free_conn_zombies();
+
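+                /* ret now counts the aborts and frees done on this pass; if
+                 * it is zero and no conn request is queued below, the
+                 * wakeup was spurious */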
+                spin_lock(g_conn_lock);
+                if (list_empty(conn_reqs)) {
+                        if (ret == 0)
+                                CDEBUG(D_NETERROR, "connd woke up but did not "
+                                       "find a kmx_connparams_t or zombie conn\n");
+                        spin_unlock(g_conn_lock);
+                        continue;
+                }
+                cp = list_entry(conn_reqs->next, kmx_connparams_t, mxr_list);
+                list_del_init(&cp->mxr_list);
+                spin_unlock(g_conn_lock);
+
+                switch (MXLND_MSG_TYPE(cp->mxr_match)) {
+                case MXLND_MSG_CONN_REQ:
+                        /* We have a connection request. Handle it. */
+                        mxlnd_passive_connect(cp);
+                        break;
+                case MXLND_MSG_CONN_ACK:
+                        /* The peer is ready for messages */
+                        mxlnd_check_conn_ack(cp);
+                        break;
+                }
+        }
+
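+        /* a final sweep to free any conns queued during shutdown */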
+        mxlnd_free_conn_zombies();
+
+        CDEBUG(D_NET, "connd stopping\n");
+        mxlnd_thread_stop(id);
+        return 0;
+}
+