1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (C) 2006 Myricom, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lnet/klnds/mxlnd/mxlnd.c
40 * Author: Eric Barton <eric@bartonsoftware.com>
41 * Author: Scott Atchley <atchley at myri.com>
/* Zero-initialized (BSS) endpoint address used as a "null" sentinel by
 * mxlnd_endpoint_addr_null() below.
 * NOTE(review): this dump is missing lines — embedded original line
 * numbers are non-contiguous; do not treat this text as compilable. */
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
/* Return nonzero when @epa byte-compares equal to the zeroed
 * MX_EPA_NULL sentinel (i.e. the endpoint address is unset).
 * NOTE(review): return type and braces are missing from this dump. */
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
51 /* if memcmp() == 0, it is NULL */
52 return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
/* Map a MXLND_CTX_* context-state value to its name for logging.
 * NOTE(review): the switch header, several case labels and the default
 * branch are missing from this dump. */
56 mxlnd_ctxstate_to_str(int mxc_state)
60 return "MXLND_CTX_INIT";
62 return "MXLND_CTX_IDLE";
64 return "MXLND_CTX_PREP";
65 case MXLND_CTX_PENDING:
66 return "MXLND_CTX_PENDING";
67 case MXLND_CTX_COMPLETED:
68 return "MXLND_CTX_COMPLETED";
69 case MXLND_CTX_CANCELED:
70 return "MXLND_CTX_CANCELED";
/* Map a MXLND_CONN_* connection-status value to its name for logging.
 * NOTE(review): switch header and some case labels missing in dump. */
77 mxlnd_connstatus_to_str(int mxk_status)
80 case MXLND_CONN_READY:
81 return "MXLND_CONN_READY";
83 return "MXLND_CONN_INIT";
85 return "MXLND_CONN_WAIT";
86 case MXLND_CONN_DISCONNECT:
87 return "MXLND_CONN_DISCONNECT";
89 return "MXLND_CONN_FAIL";
/* Map a MXLND_MSG_* wire message type to its name for logging. */
96 mxlnd_msgtype_to_str(int type) {
99 return "MXLND_MSG_EAGER";
100 case MXLND_MSG_CONN_REQ:
101 return "MXLND_MSG_CONN_REQ";
102 case MXLND_MSG_CONN_ACK:
103 return "MXLND_MSG_CONN_ACK";
105 return "MXLND_MSG_BYE";
107 return "MXLND_MSG_NOOP";
108 case MXLND_MSG_PUT_REQ:
109 return "MXLND_MSG_PUT_REQ";
110 case MXLND_MSG_PUT_ACK:
111 return "MXLND_MSG_PUT_ACK";
112 case MXLND_MSG_PUT_DATA:
113 return "MXLND_MSG_PUT_DATA";
114 case MXLND_MSG_GET_REQ:
115 return "MXLND_MSG_GET_REQ";
116 case MXLND_MSG_GET_DATA:
117 return "MXLND_MSG_GET_DATA";
/* Map an LNET_MSG_* message type to its name for logging. */
124 mxlnd_lnetmsg_to_str(int type)
128 return "LNET_MSG_ACK";
130 return "LNET_MSG_PUT";
132 return "LNET_MSG_GET";
134 return "LNET_MSG_REPLY";
136 return "LNET_MSG_HELLO";
/* Build the 64-bit MX match value for @ctx: msg type in the high bits
 * (MXLND_MSG_OFFSET), then the error code (MXLND_ERROR_OFFSET), with
 * the completion cookie in the low bits. The LASSERT guarantees the
 * cookie cannot spill into the error/type fields. */
144 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
146 u64 type = (u64) ctx->mxc_msg_type;
147 u64 err = (u64) error;
150 mxlnd_valid_msg_type(ctx->mxc_msg_type);
151 LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
152 match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
/* Inverse of mxlnd_create_match(): split a 64-bit match value into its
 * message type, error code and cookie components. */
157 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
159 *msg_type = (u8) MXLND_MSG_TYPE(match);
160 *error = (u8) MXLND_ERROR_VAL(match);
161 *cookie = match & MXLND_MAX_COOKIE;
162 mxlnd_valid_msg_type(*msg_type);
/* Pop one rx context off @conn's idle list (under mxk_lock) and mark
 * it MXLND_CTX_PREP with a fresh communication deadline. The early
 * return for an empty idle list is truncated in this dump (presumably
 * returns NULL — TODO confirm). The CNETERR block dumps the rx when
 * its get/put use counters disagree, i.e. accounting is corrupt. */
167 mxlnd_get_idle_rx(kmx_conn_t *conn)
169 cfs_list_t *rxs = NULL;
170 kmx_ctx_t *rx = NULL;
172 LASSERT(conn != NULL);
174 rxs = &conn->mxk_rx_idle;
176 cfs_spin_lock(&conn->mxk_lock);
178 if (cfs_list_empty (rxs)) {
179 cfs_spin_unlock(&conn->mxk_lock);
183 rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
184 cfs_list_del_init(&rx->mxc_list);
185 cfs_spin_unlock(&conn->mxk_lock);
188 if (rx->mxc_get != rx->mxc_put) {
189 CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
190 CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
191 CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
192 CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
193 CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
194 CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
195 CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
196 CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
197 CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
198 CNETERR("*** nob= %d ***\n", rx->mxc_nob);
201 LASSERT (rx->mxc_get == rx->mxc_put);
205 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
206 rx->mxc_state = MXLND_CTX_PREP;
207 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
/* Return @rx to its owning connection's idle list (under mxk_lock).
 * The rx must be a receive context with balanced get/put counters. */
213 mxlnd_put_idle_rx(kmx_ctx_t *rx)
215 kmx_conn_t *conn = rx->mxc_conn;
216 cfs_list_t *rxs = &conn->mxk_rx_idle;
218 LASSERT(rx->mxc_type == MXLND_REQ_RX);
223 LASSERT(rx->mxc_get == rx->mxc_put);
225 cfs_spin_lock(&conn->mxk_lock);
226 cfs_list_add(&rx->mxc_list, rxs);
227 cfs_spin_unlock(&conn->mxk_lock);
/* Pop a tx context off the global idle list, assign it the next
 * completion cookie (wrapping back to 1 once MXLND_MAX_COOKIE is
 * exceeded), bump the in-use count, and mark it MXLND_CTX_PREP with a
 * fresh deadline. On an empty list it logs the in-use count; the early
 * return is truncated in this dump (presumably returns NULL — TODO
 * confirm). */
232 mxlnd_get_idle_tx(void)
234 cfs_list_t *tmp = &kmxlnd_data.kmx_tx_idle;
235 kmx_ctx_t *tx = NULL;
237 cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
239 if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
240 CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
241 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
245 tmp = &kmxlnd_data.kmx_tx_idle;
246 tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
247 cfs_list_del_init(&tx->mxc_list);
249 /* Allocate a new completion cookie. It might not be needed,
250 * but we've got a lock right now and we're unlikely to
252 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
253 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
254 kmxlnd_data.kmx_tx_next_cookie = 1;
256 kmxlnd_data.kmx_tx_used++;
257 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
259 LASSERT (tx->mxc_get == tx->mxc_put);
263 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
264 LASSERT (tx->mxc_lntmsg[0] == NULL);
265 LASSERT (tx->mxc_lntmsg[1] == NULL);
267 tx->mxc_state = MXLND_CTX_PREP;
268 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
/* Forward declaration — mxlnd_put_idle_tx() below disconnects on tx
 * failure; the definition appears later in this file. */
274 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
/* Return @tx to the global idle list and finalize any attached LNET
 * messages. If the MX status or the tx's errno indicates failure, the
 * owning connection is disconnected first. lnet_finalize() is called
 * only after the tx is back on the idle list and the lock is dropped.
 * NOTE(review): the declarations/initialization of `result` and the
 * clearing of mxc_lntmsg[] are truncated in this dump. */
277 mxlnd_put_idle_tx(kmx_ctx_t *tx)
280 lnet_msg_t *lntmsg[2];
282 LASSERT(tx->mxc_type == MXLND_REQ_TX);
284 if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
285 kmx_conn_t *conn = tx->mxc_conn;
288 if (tx->mxc_errno != 0) result = tx->mxc_errno;
289 /* FIXME should we set mx_dis? */
290 mxlnd_conn_disconnect(conn, 0, 1);
293 lntmsg[0] = tx->mxc_lntmsg[0];
294 lntmsg[1] = tx->mxc_lntmsg[1];
299 LASSERT(tx->mxc_get == tx->mxc_put);
301 cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
302 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
303 kmxlnd_data.kmx_tx_used--;
304 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
306 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
307 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
/* Free a connection-parameters struct; it must already be unlinked
 * from any list. */
313 mxlnd_connparams_free(kmx_connparams_t *cp)
315 LASSERT(cfs_list_empty(&cp->mxr_list));
316 MXLND_FREE(cp, sizeof(*cp));
/* Allocate a kmx_connparams_t and capture the context of an incoming
 * CONN_REQ/CONN_ACK (source epa, match bits, length, conn/peer, and a
 * copy of the wire message in @data). Returns -ENOMEM on allocation
 * failure. NOTE(review): several field assignments (epa, length, conn,
 * peer, and the *cp out-assignment) are truncated in this dump. */
321 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
322 mx_endpoint_addr_t epa, u64 match, u32 length,
323 kmx_conn_t *conn, kmx_peer_t *peer, void *data)
325 kmx_connparams_t *c = NULL;
327 MXLND_ALLOC(c, sizeof(*c));
328 if (!c) return -ENOMEM;
330 CFS_INIT_LIST_HEAD(&c->mxr_list);
331 c->mxr_context = context;
333 c->mxr_match = match;
337 c->mxr_msg = *((kmx_msg_t *) data);
/* Set the connection status; callers are expected to hold mxk_lock. */
344 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
346 conn->mxk_status = status;
/**
351 * mxlnd_conn_free_locked - free the conn
352 * @conn - a kmx_conn pointer
354 * The calling function should remove the conn from the conns list first
355 * then destroy it. Caller should have write-locked kmx_global_lock.
 *
 * Also clears the MX endpoint-address context if it still points at
 * this conn, drops the peer refs this conn holds, and frees the rx
 * descriptors, their segment lists and the rx message pages.
 * NOTE(review): dump is missing lines here (e.g. the conditional
 * around the epa context reset and the peer-unlink path).
 */
358 mxlnd_conn_free_locked(kmx_conn_t *conn)
360 int valid = !mxlnd_endpoint_addr_null(conn->mxk_epa);
361 kmx_peer_t *peer = conn->mxk_peer;
363 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364 LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
365 cfs_list_empty (&conn->mxk_tx_free_queue) &&
366 cfs_list_empty (&conn->mxk_pending));
367 if (!cfs_list_empty(&conn->mxk_list)) {
368 cfs_list_del_init(&conn->mxk_list);
369 if (peer->mxp_conn == conn) {
370 peer->mxp_conn = NULL;
372 kmx_conn_t *temp = NULL;
374 mx_get_endpoint_addr_context(conn->mxk_epa,
377 mx_set_endpoint_addr_context(conn->mxk_epa,
381 /* unlink from global list and drop its ref */
382 cfs_list_del_init(&peer->mxp_list);
383 mxlnd_peer_decref(peer);
386 mxlnd_peer_decref(peer); /* drop conn's ref to peer */
387 if (conn->mxk_rx_pages) {
388 LASSERT (conn->mxk_rxs != NULL);
389 mxlnd_free_pages(conn->mxk_rx_pages);
393 kmx_ctx_t *rx = NULL;
395 for (i = 0; i < MXLND_RX_MSGS(); i++) {
396 rx = &conn->mxk_rxs[i];
397 if (rx->mxc_seg_list != NULL) {
398 LASSERT(rx->mxc_nseg > 0);
399 MXLND_FREE(rx->mxc_seg_list,
401 sizeof(*rx->mxc_seg_list));
404 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
407 MXLND_FREE(conn, sizeof (*conn));
/* Cancel every pending rx on @conn: mx_cancel() the posted receive,
 * mark it canceled with -ECONNABORTED, and move it to the global
 * orphan list for the connd to complete. mxk_lock is dropped before
 * taking kmx_conn_lock and re-taken afterwards — the list walk must
 * use the _safe iterator because of this lock juggling. */
413 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
417 kmx_ctx_t *ctx = NULL;
418 kmx_ctx_t *next = NULL;
419 mx_return_t mxret = MX_SUCCESS;
424 cfs_spin_lock(&conn->mxk_lock);
425 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
427 cfs_list_del_init(&ctx->mxc_list);
428 if (ctx->mxc_type == MXLND_REQ_RX) {
430 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
433 if (mxret != MX_SUCCESS) {
434 CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
437 ctx->mxc_errno = -ECONNABORTED;
438 ctx->mxc_state = MXLND_CTX_CANCELED;
439 cfs_spin_unlock(&conn->mxk_lock);
440 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
441 /* we may be holding the global lock,
442 * move to orphan list so that it can free it */
443 cfs_list_add_tail(&ctx->mxc_list,
444 &kmxlnd_data.kmx_orphan_msgs);
446 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
447 cfs_spin_lock(&conn->mxk_lock);
452 cfs_spin_unlock(&conn->mxk_lock);
/* Drain both tx queues (free and credit) of @conn, marking each tx
 * canceled with -ECONNABORTED and moving it to the global orphan list
 * for the connd to abort. The free queue is drained before the credit
 * queue; mxk_lock is dropped around the kmx_conn_lock acquisition. */
460 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
463 cfs_list_t *tmp = NULL;
465 cfs_spin_lock(&conn->mxk_lock);
466 while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
467 !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
469 kmx_ctx_t *tx = NULL;
471 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
472 tmp = &conn->mxk_tx_free_queue;
474 tmp = &conn->mxk_tx_credit_queue;
477 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
478 cfs_list_del_init(&tx->mxc_list);
479 cfs_spin_unlock(&conn->mxk_lock);
480 tx->mxc_errno = -ECONNABORTED;
481 tx->mxc_state = MXLND_CTX_CANCELED;
482 /* move to orphan list and then abort */
483 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
484 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
485 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
487 cfs_spin_lock(&conn->mxk_lock);
489 cfs_spin_unlock(&conn->mxk_lock);
/* Send a zero-byte control message to @epa; all the information
 * (msg type, error, cookie) is carried in the 64-bit match bits. */
495 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
497 u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
498 (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
500 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
501 epa, match, NULL, NULL);
/**
506 * mxlnd_conn_disconnect - shutdown a connection
507 * @conn - a kmx_conn pointer
508 * @mx_dis - call mx_disconnect()
509 * @send_bye - send peer a BYE msg
511 * This function sets the status to DISCONNECT, completes queued
512 * txs with failure, calls mx_disconnect, which will complete
513 * pending txs and matched rxs with failure.
 *
 * Idempotent: returns early (under mxk_lock) if the conn is already
 * in MXLND_CONN_DISCONNECT. Notifies LNET the peer is down unless we
 * are shutting down, then drops the owning peer's conn reference.
 */
516 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
518 mx_endpoint_addr_t epa = conn->mxk_epa;
519 int valid = !mxlnd_endpoint_addr_null(epa);
522 cfs_spin_lock(&conn->mxk_lock);
523 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
524 cfs_spin_unlock(&conn->mxk_lock);
527 mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
528 conn->mxk_timeout = 0;
529 cfs_spin_unlock(&conn->mxk_lock);
531 count = mxlnd_cancel_queued_txs(conn);
532 count += mxlnd_conn_cancel_pending_rxs(conn);
535 cfs_up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */
537 if (send_bye && valid &&
538 conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
539 /* send a BYE to the peer */
540 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
541 libcfs_nid2str(conn->mxk_peer->mxp_nid));
542 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
543 /* wait to allow the peer to ack our message */
544 mxlnd_sleep(msecs_to_jiffies(20));
547 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
548 unsigned long last_msg = 0;
550 /* notify LNET that we are giving up on this peer */
551 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
552 last_msg = conn->mxk_last_rx;
554 last_msg = conn->mxk_last_tx;
556 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
558 if (mx_dis && valid &&
/* BUG(review): the `!= 0` is INSIDE the sizeof() argument, so the
 * length passed to memcmp() is `sizeof(epa) != 0`, i.e. 1 — only the
 * first byte of the endpoint address is compared. Intended:
 * memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa)) != 0 */
559 (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
560 mx_disconnect(kmxlnd_data.kmx_endpt, epa);
562 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
/**
568 * mxlnd_conn_alloc - allocate and initialize a new conn struct
569 * @connp - address of a kmx_conn pointer
570 * @peer - owning kmx_peer
572 * Returns 0 on success and -ENOMEM on failure
 *
 * Allocates the conn, its rx message pages and rx descriptor array,
 * maps each rx's message buffer onto the pages, and links the conn to
 * the front of @peer's conn list. The self-connection (peer nid ==
 * local nid) is set READY immediately with the local epa. The caller
 * must hold kmx_global_lock for writing. The conn starts with two
 * refs: one for the owning peer and one for the caller.
 */
575 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
582 kmx_conn_t *conn = NULL;
583 kmx_pages_t *pages = NULL;
584 struct page *page = NULL;
585 kmx_ctx_t *rx = NULL;
587 LASSERT(peer != NULL);
589 MXLND_ALLOC(conn, sizeof (*conn));
591 CNETERR("Cannot allocate conn\n");
594 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
596 memset(conn, 0, sizeof(*conn));
598 ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
600 CERROR("Can't allocate rx pages\n");
601 MXLND_FREE(conn, sizeof(*conn));
604 conn->mxk_rx_pages = pages;
606 MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
607 if (conn->mxk_rxs == NULL) {
608 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
609 mxlnd_free_pages(pages);
610 MXLND_FREE(conn, sizeof(*conn));
614 memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
616 conn->mxk_peer = peer;
617 CFS_INIT_LIST_HEAD(&conn->mxk_list);
618 CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
619 cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
620 and one for the caller */
621 if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
625 /* this is localhost, set the epa and status as up */
626 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
627 conn->mxk_epa = kmxlnd_data.kmx_epa;
628 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
629 peer->mxp_reconnect_time = 0;
630 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
631 peer->mxp_nic_id = nic_id;
632 peer->mxp_ep_id = ep_id;
633 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
634 conn->mxk_timeout = 0;
636 /* conn->mxk_incarnation = 0 - will be set by peer */
637 /* conn->mxk_sid = 0 - will be set by peer */
638 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
639 /* mxk_epa - to be set after mx_iconnect() */
641 cfs_spin_lock_init(&conn->mxk_lock);
642 /* conn->mxk_timeout = 0 */
643 /* conn->mxk_last_tx = 0 */
644 /* conn->mxk_last_rx = 0 */
645 CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
647 conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
648 /* mxk_outstanding = 0 */
650 CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
651 CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
652 /* conn->mxk_ntx_msgs = 0 */
653 /* conn->mxk_ntx_data = 0 */
654 /* conn->mxk_ntx_posted = 0 */
655 /* conn->mxk_data_posted = 0 */
656 CFS_INIT_LIST_HEAD(&conn->mxk_pending);
658 for (i = 0; i < MXLND_RX_MSGS(); i++) {
660 rx = &conn->mxk_rxs[i];
661 rx->mxc_type = MXLND_REQ_RX;
662 CFS_INIT_LIST_HEAD(&rx->mxc_list);
664 /* map mxc_msg to page */
665 page = pages->mxg_pages[ipage];
666 addr = page_address(page);
667 LASSERT(addr != NULL);
668 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
669 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
673 rx->mxc_nid = peer->mxp_nid;
677 offset += MXLND_MSG_SIZE;
678 LASSERT (offset <= PAGE_SIZE);
680 if (offset == PAGE_SIZE) {
/* NOTE(review): TX page-count macro asserted inside the RX setup
 * loop — looks like MXLND_RX_MSG_PAGES() was intended; confirm. */
683 LASSERT (ipage <= MXLND_TX_MSG_PAGES());
686 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
691 mxlnd_peer_addref(peer); /* add a ref for this conn */
693 /* add to front of peer's conns list */
694 cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
695 peer->mxp_conn = conn;
/* Locking wrapper: take kmx_global_lock for writing around
 * mxlnd_conn_alloc_locked(). */
700 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
703 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
705 cfs_write_lock(g_lock);
706 ret = mxlnd_conn_alloc_locked(connp, peer);
707 cfs_write_unlock(g_lock);
/* Queue @ctx on its conn's pending list and pull the conn timeout in
 * to the earliest outstanding deadline. If the conn status has already
 * dropped below INIT (truncated branch here), the ctx is immediately
 * marked COMPLETED instead of being queued. */
712 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
715 kmx_conn_t *conn = ctx->mxc_conn;
717 ctx->mxc_state = MXLND_CTX_PENDING;
719 cfs_spin_lock(&conn->mxk_lock);
720 if (conn->mxk_status >= MXLND_CONN_INIT) {
721 cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
722 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
723 conn->mxk_timeout = ctx->mxc_deadline;
726 ctx->mxc_state = MXLND_CTX_COMPLETED;
729 cfs_spin_unlock(&conn->mxk_lock);
/* Remove @ctx from its conn's pending list, mark it COMPLETED, and
 * recompute the conn timeout from the new head of the pending list
 * (0 when the list is now empty). Logs, but tolerates, an unexpected
 * ctx state. */
735 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
737 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
738 ctx->mxc_state == MXLND_CTX_COMPLETED);
739 if (ctx->mxc_state != MXLND_CTX_PENDING &&
740 ctx->mxc_state != MXLND_CTX_COMPLETED) {
741 CNETERR("deq ctx->mxc_state = %s\n",
742 mxlnd_ctxstate_to_str(ctx->mxc_state));
744 ctx->mxc_state = MXLND_CTX_COMPLETED;
745 if (!cfs_list_empty(&ctx->mxc_list)) {
746 kmx_conn_t *conn = ctx->mxc_conn;
747 kmx_ctx_t *next = NULL;
749 LASSERT(conn != NULL);
750 cfs_spin_lock(&conn->mxk_lock);
751 cfs_list_del_init(&ctx->mxc_list);
752 conn->mxk_timeout = 0;
753 if (!cfs_list_empty(&conn->mxk_pending)) {
754 next = cfs_list_entry(conn->mxk_pending.next,
755 kmx_ctx_t, mxc_list);
756 conn->mxk_timeout = next->mxc_deadline;
758 cfs_spin_unlock(&conn->mxk_lock);
/**
764 * mxlnd_peer_free - free the peer
765 * @peer - a kmx_peer pointer
767 * The calling function should decrement the rxs, drain the tx queues and
768 * remove the peer from the peers list first then destroy it.
 *
 * Refcount must already be zero; unlinks from the hash list if still
 * linked (caller is assumed to hold the appropriate lock) and drops
 * the global peer count.
 */
771 mxlnd_peer_free(kmx_peer_t *peer)
773 CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
775 LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
777 if (!cfs_list_empty(&peer->mxp_list)) {
778 /* assume we are locked */
779 cfs_list_del_init(&peer->mxp_list);
782 MXLND_FREE(peer, sizeof (*peer));
783 cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
/* Look up the MAC address for IPv4 address @ip (host order) via the
 * ARP table of the configured IPoMX interface, copying it into bytes
 * 2..7 of *tmp_id. Returns -EHOSTUNREACH when the device or a valid
 * neighbour entry is not found (the success-path ret assignment and
 * the dev/neigh refcount releases are truncated in this dump). */
790 int ret = -EHOSTUNREACH;
791 unsigned char *haddr = NULL;
792 struct net_device *dev = NULL;
793 struct neighbour *n = NULL;
794 __be32 dst_ip = htonl(ip);
796 dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
800 haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
802 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
805 if (n->nud_state & NUD_VALID) {
806 memcpy(haddr, n->ha, dev->addr_len);
818 /* We only want the MAC address of the peer's Myricom NIC. We
819 * require that each node has the IPoMX interface (myriN) up.
820 * We will not pass any traffic over IPoMX, but it allows us
821 * to get the MAC address. */
/* Resolve @ip to a 64-bit MX nic_id, retrying up to @tries times.
 * When the ARP entry is missing, a TCP connect to port 987 is issued
 * purely to force an ARP exchange: -ECONNREFUSED proves the host is
 * up, after which the MAC lookup is retried; -EHOSTUNREACH backs off
 * ~250ms between tries. The nic_id is byte-swapped on little-endian
 * hosts (MX nic_ids are big-endian MACs). */
823 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
829 cfs_socket_t *sock = NULL;
832 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
833 ret = mxlnd_lookup_mac(ip, &tmp_id);
837 /* not found, try to connect (force an arp) */
838 ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
839 if (ret == -ECONNREFUSED) {
840 /* peer is there, get the MAC address */
841 mxlnd_lookup_mac(ip, &tmp_id);
845 } else if (ret == -EHOSTUNREACH && try < tries) {
846 /* add a little backoff */
847 CDEBUG(D_NET, "sleeping for %d jiffies\n",
849 mxlnd_sleep(CFS_HZ/4);
852 } while (try++ < tries);
853 CDEBUG(D_NET, "done trying. ret = %d\n", ret);
857 #ifdef __LITTLE_ENDIAN
858 *nic_id = ___arch__swab64(tmp_id);
/**
866 * mxlnd_peer_alloc - allocate and initialize a new peer struct
867 * @peerp - address of a kmx_peer pointer
868 * @nid - LNET node id
870 * Returns 0 on success and -ENOMEM on failure
 *
 * Also allocates the peer's initial conn (which takes a second conn
 * ref for the caller) and resolves the MX nic_id from the NID's IP
 * when one was not supplied. mxp_nic_id may remain 0 if resolution
 * fails; the *peerp out-assignment is truncated in this dump.
 */
873 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
876 u32 ip = LNET_NIDADDR(nid);
877 kmx_peer_t *peer = NULL;
879 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
881 MXLND_ALLOC(peer, sizeof (*peer));
883 CNETERR("Cannot allocate peer for NID 0x%llx\n",
887 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
889 memset(peer, 0, sizeof(*peer));
891 CFS_INIT_LIST_HEAD(&peer->mxp_list);
893 /* peer->mxp_ni unused - may be used for multi-rail */
894 cfs_atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
896 peer->mxp_board = board;
897 peer->mxp_ep_id = ep_id;
898 peer->mxp_nic_id = nic_id;
900 CFS_INIT_LIST_HEAD(&peer->mxp_conns);
901 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
903 mxlnd_peer_decref(peer);
906 CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
908 if (peer->mxp_nic_id != 0ULL)
909 nic_id = peer->mxp_nic_id;
911 if (nic_id == 0ULL) {
912 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
914 peer->mxp_nic_id = nic_id;
915 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
919 peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
921 /* peer->mxp_reconnect_time = 0 */
922 /* peer->mxp_incompatible = 0 */
/* Hash-lookup a peer by NID; caller holds kmx_global_lock. Takes a
 * peer ref on success; returns NULL when not found. */
928 static inline kmx_peer_t *
929 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
933 kmx_peer_t *peer = NULL;
935 hash = mxlnd_nid_to_hash(nid);
937 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
938 if (peer->mxp_nid == nid) {
940 mxlnd_peer_addref(peer);
944 return (found ? peer : NULL);
/* Find the peer for @nid, optionally creating it (and its conn) when
 * @create is set. Fast path under the read lock; upgrade to the write
 * lock to (a) attach a conn to a conn-less peer or (b) insert a newly
 * allocated peer, re-checking for a racing creator after the upgrade
 * and discarding the duplicate if one appeared. Returns the peer with
 * a reference held (NULL on lookup miss without @create, or on OOM). */
948 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
952 kmx_peer_t *peer = NULL;
953 kmx_peer_t *old = NULL;
954 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
956 cfs_read_lock(g_lock);
957 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
959 if ((peer && peer->mxp_conn) || /* found peer with conn or */
960 (!peer && !create)) { /* did not find peer and do not create one */
961 cfs_read_unlock(g_lock);
965 cfs_read_unlock(g_lock);
967 /* if peer but _not_ conn */
968 if (peer && !peer->mxp_conn) {
970 cfs_write_lock(g_lock);
971 if (!peer->mxp_conn) { /* check again */
972 /* create the conn */
973 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
975 /* we tried, return the peer only.
976 * the caller needs to see if the conn exists */
977 CNETERR("%s: %s could not alloc conn\n",
978 __func__, libcfs_nid2str(peer->mxp_nid));
980 /* drop extra conn ref */
981 mxlnd_conn_decref(peer->mxp_conn);
984 cfs_write_unlock(g_lock);
989 /* peer not found and we need to create one */
990 hash = mxlnd_nid_to_hash(nid);
992 /* create peer (and conn) */
993 /* adds conn ref for peer and one for this function */
994 ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
995 *kmxlnd_tunables.kmx_ep_id, 0ULL);
996 if (ret != 0) /* no memory, peer is NULL */
999 cfs_write_lock(g_lock);
1002 old = mxlnd_find_peer_by_nid_locked(nid);
1004 /* someone already created one */
1005 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1006 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1007 mxlnd_peer_decref(peer);
1010 /* no other peer, use this one */
1011 cfs_list_add_tail(&peer->mxp_list,
1012 &kmxlnd_data.kmx_peers[hash]);
1013 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
1014 mxlnd_peer_addref(peer);
1015 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1018 cfs_write_unlock(g_lock);
/* True for message types that consume a peer credit when sent
 * (EAGER, GET_REQ, PUT_REQ, NOOP). */
1024 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1026 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1027 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1028 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1029 tx->mxc_msg_type == MXLND_MSG_NOOP);
/**
1033 * mxlnd_init_msg - set type and number of bytes
1034 * @msg - msg pointer
1035 * @type - of message
1036 * @body_nob - bytes in msg body
 *
 * Total nob = header (up to mxm_u) + body.
 */
1039 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1041 msg->mxm_type = type;
1042 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/* Prepare @tx's wire message and MX segment for sending: record the
 * destination nid and msg type, set the segment length to header +
 * body, and use physical pinning. The msg must fit in one
 * MXLND_MSG_SIZE buffer. (Some assignments, e.g. mxc_nid and the msg
 * pointer setup, are truncated in this dump.) */
1046 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1048 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
1049 kmx_msg_t *msg = NULL;
1051 LASSERT (tx != NULL);
1052 LASSERT (nob <= MXLND_MSG_SIZE);
1055 /* tx->mxc_peer should have already been set if we know it */
1056 tx->mxc_msg_type = type;
1058 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1059 tx->mxc_seg.segment_length = nob;
1060 tx->mxc_pin_type = MX_PIN_PHYSICAL;
1063 msg->mxm_type = type;
/* Simple rotate-and-add checksum over @nob bytes at @ptr. Never
 * returns 0, since 0 on the wire means "no checksum". */
1070 mxlnd_cksum (void *ptr, int nob)
1076 sum = ((sum << 1) | (sum >> 31)) + *c++;
1078 /* ensure I don't return 0 (== no checksum) */
1079 return (sum == 0) ? 1 : sum;
/**
1083 * mxlnd_pack_msg_locked - complete msg info
 *
 * Fill in the wire-header fields of @tx's message: magic, version,
 * returned credits (piggy-backed on every type except CONN_REQ and
 * CONN_ACK — clearing mxk_outstanding requires the conn lock), source
 * and destination nids/stamps, and an optional checksum when the
 * kmx_cksum tunable is enabled.
 */
1087 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1089 kmx_msg_t *msg = tx->mxc_msg;
1091 /* type and nob should already be set in init_msg() */
1092 msg->mxm_magic = MXLND_MSG_MAGIC;
1093 msg->mxm_version = MXLND_MSG_VERSION;
1095 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1096 * return credits as well */
1097 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1098 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1099 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
1100 tx->mxc_conn->mxk_outstanding = 0;
1102 msg->mxm_credits = 0;
1106 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
1107 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1108 msg->mxm_dstnid = tx->mxc_nid;
1109 /* if it is a new peer, the dststamp will be 0 */
1110 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1112 if (*kmxlnd_tunables.kmx_cksum) {
1113 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* Validate and byte-swap an incoming wire message of @nob bytes:
 * check magic (detecting a byte-flipped peer), version, minimum
 * sizes, total nob, and checksum (computed with mxm_cksum zeroed —
 * the zero/restore steps are truncated in this dump), then swab the
 * header and per-type payload fields when the peer's endianness
 * differs. The magic itself is left unflipped as an endianness clue.
 * Returns -EPROTO on any validation failure (return statements are
 * truncated here — presumably; TODO confirm). */
1118 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1120 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
1121 __u32 msg_cksum = 0;
1125 /* 6 bytes are enough to have received magic + version */
1127 CNETERR("not enough bytes for magic + hdr: %d\n", nob);
1131 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1133 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1136 CNETERR("Bad magic: %08x\n", msg->mxm_magic);
1140 if (msg->mxm_version !=
1141 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1142 CNETERR("Bad version: %d\n", msg->mxm_version);
1146 if (nob < hdr_size) {
1147 CNETERR("not enough for a header: %d\n", nob);
1151 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1152 if (msg_nob > nob) {
1153 CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
1157 /* checksum must be computed with mxm_cksum zero and BEFORE anything
1159 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1161 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1162 CNETERR("Bad checksum\n");
1165 msg->mxm_cksum = msg_cksum;
1168 /* leave magic unflipped as a clue to peer endianness */
1169 __swab16s(&msg->mxm_version);
1170 CLASSERT (sizeof(msg->mxm_type) == 1);
1171 CLASSERT (sizeof(msg->mxm_credits) == 1);
1172 msg->mxm_nob = msg_nob;
1173 __swab64s(&msg->mxm_srcnid);
1174 __swab64s(&msg->mxm_srcstamp);
1175 __swab64s(&msg->mxm_dstnid);
1176 __swab64s(&msg->mxm_dststamp);
1179 if (msg->mxm_srcnid == LNET_NID_ANY) {
1180 CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1184 switch (msg->mxm_type) {
1186 CNETERR("Unknown message type %x\n", msg->mxm_type);
1189 case MXLND_MSG_NOOP:
1192 case MXLND_MSG_EAGER:
1193 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1194 CNETERR("Short EAGER: %d(%d)\n", msg_nob,
1195 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1200 case MXLND_MSG_PUT_REQ:
1201 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1202 CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
1203 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1207 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1210 case MXLND_MSG_PUT_ACK:
1211 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1212 CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
1213 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1217 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1218 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1222 case MXLND_MSG_GET_REQ:
1223 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1224 CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
1225 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1229 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1233 case MXLND_MSG_CONN_REQ:
1234 case MXLND_MSG_CONN_ACK:
1235 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1236 CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
1237 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1241 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1242 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
/**
 * mxlnd_recv_msg - post an MX receive for an expected message
1252 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1256 * @length - length of incoming message
1257 * @pending - add to kmx_pending (0 is NO and 1 is YES)
1259 * The caller gets the rx and sets nid, peer and conn if known.
1261 * Returns 0 on success and -1 on failure
 *
 * Queues the rx as pending, then posts mx_kirecv() matching on
 * @cookie with the error bits masked out; dequeues the rx again if
 * the post fails.
 */
1264 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1267 mx_return_t mxret = MX_SUCCESS;
1268 uint64_t mask = ~(MXLND_ERROR_MASK);
1270 rx->mxc_msg_type = msg_type;
1271 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1272 rx->mxc_cookie = cookie;
1273 /* rx->mxc_match may already be set */
1274 /* rx->mxc_seg.segment_ptr is already set */
1275 rx->mxc_seg.segment_length = length;
1276 ret = mxlnd_q_pending_ctx(rx);
1278 /* the caller is responsible for calling conn_decref() if needed */
1281 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1282 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1283 if (mxret != MX_SUCCESS) {
1284 mxlnd_deq_pending_ctx(rx);
1285 CNETERR("mx_kirecv() failed with %s (%d)\n",
1286 mx_strerror(mxret), (int) mxret);
1294 * mxlnd_unexpected_recv - this is the callback function that will handle
1295 * unexpected receives
1296 * @context - NULL, ignore
1297 * @source - the peer's mx_endpoint_addr_t
1298 * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1299 * @length - length of incoming message
1300 * @data_if_available - used for CONN_[REQ|ACK]
1302 * If it is an eager-sized msg, we will call recv_msg() with the actual
/**
 * mxlnd_unexpected_recv - MX unexpected-message callback
 * @context: endpoint context; expected to be NULL (see TODO below)
 * @source: MX endpoint address of the sender
 * @match_value: 64-bit match bits encoding msg type, error and cookie
 * @length: length of the unexpected message in bytes
 * @data_if_available: pointer to the message payload, if MX has it
 *
 * Called for messages with no posted receive. Small messages get a
 * receive posted for their length; a large unexpected message is dropped
 * by receiving 0 bytes, because we should never have a large, unexpected
 * message.
 *
 * NOTE - The MX library blocks until this function completes. Make it as
 * fast as possible. DO NOT allocate memory which can block!
 *
 * If we cannot get a rx or the conn is closed, drop the message on the
 * floor (i.e. recv 0 bytes and ignore).
 *
 * NOTE(review): this extraction appears to have lost interior lines
 * (declarations of nic_id/ep_id/sid/msg_type/error/cookie/seg/ret and
 * several closing braces); documented from the visible code only.
 */
mx_unexp_handler_action_t
mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
                      uint64_t match_value, uint32_t length, void *data_if_available)
        kmx_ctx_t *rx = NULL;
        kmx_conn_t *conn = NULL;
        kmx_peer_t *peer = NULL;

        /* TODO this will change to the net struct */
        if (context != NULL) {
                CNETERR("non-NULL context\n");

        CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);

        /* decode sender identity and the message type/error/cookie
         * packed into the match bits */
        mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
        mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        mx_get_endpoint_addr_context(source, (void **) &conn);
        mxlnd_conn_addref(conn); /* add ref for this function */
        peer = conn->mxk_peer;
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);

        /* peer is shutting the connection down */
        if (msg_type == MXLND_MSG_BYE) {
                CDEBUG(D_NET, "peer %s sent BYE msg\n",
                       libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_disconnect(conn, 1, 0);
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return MX_RECV_FINISHED;

        /* connection handshake: queue the request for the connd thread */
        if (msg_type == MXLND_MSG_CONN_REQ) {
                kmx_connparams_t *cp = NULL;
                const int expected = offsetof(kmx_msg_t, mxm_u) +
                                     sizeof(kmx_connreq_msg_t);

                if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
                if (unlikely(length != expected || !data_if_available)) {
                        /* malformed request: NAK with EPROTO */
                        CNETERR("received invalid CONN_REQ from %llx "
                                "length=%d (expected %d)\n", nic_id, length, expected);
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
                        return MX_RECV_FINISHED;

                ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                             conn, peer, data_if_available);
                if (unlikely(ret != 0)) {
                        CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
                        return MX_RECV_FINISHED;

                /* hand off to the conn thread via kmx_conn_reqs */
                cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
                cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
                cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
                cfs_up(&kmxlnd_data.kmx_conn_sem);
                return MX_RECV_FINISHED;

        if (msg_type == MXLND_MSG_CONN_ACK) {
                kmx_connparams_t *cp = NULL;
                const int expected = offsetof(kmx_msg_t, mxm_u) +
                                     sizeof(kmx_connreq_msg_t);

                if (unlikely(error != 0)) {
                        /* remote rejected our CONN_REQ */
                        CNETERR("received CONN_ACK from %s with error -%d\n",
                                libcfs_nid2str(peer->mxp_nid), (int) error);
                        mxlnd_conn_disconnect(conn, 1, 0);
                } else if (unlikely(length != expected || !data_if_available)) {
                        CNETERR("received %s CONN_ACK from %s "
                                "length=%d (expected %d)\n",
                                data_if_available ? "short" : "missing",
                                libcfs_nid2str(peer->mxp_nid), length, expected);
                        mxlnd_conn_disconnect(conn, 1, 1);
                        /* peer is ready for messages */
                        ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                                     conn, peer, data_if_available);
                        if (unlikely(ret != 0)) {
                                CNETERR("unable to alloc kmx_connparams_t"
                                        " from %llx:%d\n", nic_id, ep_id);
                                mxlnd_conn_disconnect(conn, 1, 1);
                        cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
                        cfs_list_add_tail(&cp->mxr_list,
                                          &kmxlnd_data.kmx_conn_reqs);
                        cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
                        cfs_up(&kmxlnd_data.kmx_conn_sem);
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return MX_RECV_FINISHED;

        /* Handle unexpected messages (PUT_REQ and GET_REQ) */
        LASSERT(peer != NULL && conn != NULL);

        rx = mxlnd_get_idle_rx(conn);
        if (length <= MXLND_MSG_SIZE) {
                /* small message: receive it in full */
                ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
                /* large unexpected message: drop it by receiving 0 bytes */
                CNETERR("unexpected large receive with "
                        "match_value=0x%llx length=%d\n",
                        match_value, length);
                ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);

        /* hold conn ref until rx completes */
        rx->mxc_conn = conn;
        rx->mxc_peer = peer;
        rx->mxc_nid = peer->mxp_nid;
        CNETERR("could not post receive\n");
        mxlnd_put_idle_rx(rx);

        /* Encountered error, drop incoming message on the floor */
        /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
         * uses the standard code path and acks the sender normally */
        if (rx == NULL || ret != 0) {
                mxlnd_conn_decref(conn); /* drop ref taken above */
                CNETERR("no idle rxs available - dropping rx"
                        " 0x%llx from %s\n", match_value,
                        libcfs_nid2str(peer->mxp_nid));
                CNETERR("disconnected peer - dropping rx\n");
                /* zero-length sink receive to consume and discard the data */
                seg.segment_ptr = 0ULL;
                seg.segment_length = 0;
                mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
                          match_value, ~0ULL, NULL, NULL);

        return MX_RECV_CONTINUE;
/**
 * mxlnd_get_peer_info - look up the index-th peer in the global hash
 * @index: ordinal position of the peer to report
 * @nidp: out - the peer's NID
 * @count: out - the peer's current reference count
 *
 * Walks the peer hash under the global read lock.
 * NOTE(review): the index-match test and return statements appear to
 * have been dropped by this extraction.
 */
mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
        kmx_peer_t *peer = NULL;

        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
                        *nidp = peer->mxp_nid;
                        *count = cfs_atomic_read(&peer->mxp_refcount);
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
/**
 * mxlnd_del_peer_locked - disconnect and unlink a peer
 * @peer: the peer to remove
 *
 * Caller holds the global write lock. Disconnects the peer's conn (if
 * any), removes the peer from the global list and drops the list's ref.
 */
mxlnd_del_peer_locked(kmx_peer_t *peer)
        if (peer->mxp_conn) {
                mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
        cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
        mxlnd_peer_decref(peer); /* drop global list ref */
/**
 * mxlnd_del_peer - remove one peer (by NID) or all peers
 * @nid: the NID to remove, or LNET_NID_ANY for every peer
 *
 * Takes the global write lock and delegates to mxlnd_del_peer_locked().
 */
mxlnd_del_peer(lnet_nid_t nid)
        kmx_peer_t *peer = NULL;
        kmx_peer_t *next = NULL;

        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
        cfs_write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                mxlnd_peer_decref(peer); /* and drops it */
                mxlnd_del_peer_locked(peer);
        } else { /* LNET_NID_ANY */
                /* _safe variant: del_peer_locked unlinks entries as we walk */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        cfs_list_for_each_entry_safe(peer, next,
                                                     &kmxlnd_data.kmx_peers[i],
                                mxlnd_del_peer_locked(peer);
        cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
/**
 * mxlnd_get_conn_by_idx - return the index-th conn across all peers
 * @index: ordinal position of the conn to return
 *
 * Walks every peer's conn list under the global read lock. On a match,
 * a conn ref is taken for the caller (dropped in mxlnd_ctl()).
 * NOTE(review): the index-match test appears to have been dropped by
 * this extraction.
 */
mxlnd_get_conn_by_idx(int index)
        kmx_peer_t *peer = NULL;
        kmx_conn_t *conn = NULL;

        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
                        cfs_list_for_each_entry(conn, &peer->mxp_conns,
                                mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
                                cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
/**
 * mxlnd_close_matching_conns_locked - disconnect all of a peer's conns
 * @peer: the peer whose conns are closed
 *
 * Caller holds the global write lock. Uses the _safe iterator because
 * mxlnd_conn_disconnect() may unlink the conn from the list.
 */
mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
        kmx_conn_t *conn = NULL;
        kmx_conn_t *next = NULL;

        cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
                mxlnd_conn_disconnect(conn, 0, 1);
/**
 * mxlnd_close_matching_conns - close conns of one peer or of all peers
 * @nid: the NID whose conns to close, or LNET_NID_ANY for all
 *
 * Takes the global write lock and disconnects via
 * mxlnd_close_matching_conns_locked().
 */
mxlnd_close_matching_conns(lnet_nid_t nid)
        kmx_peer_t *peer = NULL;

        cfs_write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
                mxlnd_close_matching_conns_locked(peer);
                mxlnd_peer_decref(peer); /* and drops it here */
        } else { /* LNET_NID_ANY */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list)
                                mxlnd_close_matching_conns_locked(peer);
        cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
/**
 * mxlnd_ctl - modify MXLND parameters
 * @ni - LNET interface handle
 * @cmd - command to change
 * @arg - the ioctl data (a struct libcfs_ioctl_data)
 *
 * Dispatches the libcfs ioctls this LND supports: peer/conn queries and
 * deletions. NOTE(review): the switch header and per-case break/return
 * lines appear to have been dropped by this extraction.
 */
mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kmxlnd_data.kmx_ni);

        case IOC_LIBCFS_GET_PEER: {
                ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
                data->ioc_nid = nid;
                data->ioc_count = count;
        case IOC_LIBCFS_DEL_PEER: {
                ret = mxlnd_del_peer(data->ioc_nid);
        case IOC_LIBCFS_GET_CONN: {
                kmx_conn_t *conn = NULL;
                conn = mxlnd_get_conn_by_idx(data->ioc_count);
                data->ioc_nid = conn->mxk_peer->mxp_nid;
                mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                ret = mxlnd_close_matching_conns(data->ioc_nid);
        CNETERR("unknown ctl(%d)\n", cmd);
/**
 * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
 * @tx: the send context to queue
 *
 * Add the tx to the peer's msg or data queue. The caller has locked the
 * peer. Credit-consuming msgs go on mxk_tx_credit_queue; CONN_REQ/ACK
 * jump the free queue; DATA txs go on the free queue as data.
 */
mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
        u8 msg_type = tx->mxc_msg_type;
        kmx_conn_t *conn = tx->mxc_conn;

        LASSERT (msg_type != 0);
        LASSERT (tx->mxc_nid != 0);
        LASSERT (tx->mxc_peer != NULL);
        LASSERT (tx->mxc_conn != NULL);

        /* tag the tx with the conn generation it was queued under */
        tx->mxc_incarnation = conn->mxk_incarnation;

        if (msg_type != MXLND_MSG_PUT_DATA &&
            msg_type != MXLND_MSG_GET_DATA) {
                /* msg style tx */
                if (mxlnd_tx_requires_credit(tx)) {
                        cfs_list_add_tail(&tx->mxc_list,
                                          &conn->mxk_tx_credit_queue);
                        conn->mxk_ntx_msgs++;
                } else if (msg_type == MXLND_MSG_CONN_REQ ||
                           msg_type == MXLND_MSG_CONN_ACK) {
                        /* put conn msgs at the front of the queue */
                        cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
                        /* PUT_ACK, PUT_NAK */
                        cfs_list_add_tail(&tx->mxc_list,
                                          &conn->mxk_tx_free_queue);
                        conn->mxk_ntx_msgs++;
                /* data style tx */
                cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
                conn->mxk_ntx_data++;
/**
 * mxlnd_peer_queue_tx - lock the conn, then queue the tx on the peer
 * @tx: the send context to queue
 *
 * Add the tx to the peer's msg or data queue under the conn's spinlock.
 */
mxlnd_peer_queue_tx(kmx_ctx_t *tx)
        LASSERT(tx->mxc_peer != NULL);
        LASSERT(tx->mxc_conn != NULL);
        cfs_spin_lock(&tx->mxc_conn->mxk_lock);
        mxlnd_peer_queue_tx_locked(tx);
        cfs_spin_unlock(&tx->mxc_conn->mxk_lock);
/**
 * mxlnd_queue_tx - add the tx to the global tx queue
 * @tx: the send context to queue
 *
 * If the peer is known (and compatible), queue the tx on the peer and
 * kick the sender; otherwise add the tx to the global queue and up the
 * tx_queue_sem so the tx_queued thread resolves the peer.
 */
mxlnd_queue_tx(kmx_ctx_t *tx)
        kmx_peer_t *peer = tx->mxc_peer;
        LASSERT (tx->mxc_nid != 0);

        if (peer->mxp_incompatible &&
            tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
                /* let this fail now */
                tx->mxc_errno = -ECONNABORTED;
                mxlnd_conn_decref(peer->mxp_conn);
                mxlnd_put_idle_tx(tx);

        if (tx->mxc_conn == NULL) {
                kmx_conn_t *conn = NULL;
                ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
                /* on failure, fail the tx with the alloc error */
                tx->mxc_errno = ret;
                mxlnd_put_idle_tx(tx);
                tx->mxc_conn = conn;
                mxlnd_peer_decref(peer); /* and takes it from peer */
        LASSERT(tx->mxc_conn != NULL);
        mxlnd_peer_queue_tx(tx);
        mxlnd_check_sends(peer);

        /* no peer yet: park on the global queue for mxlnd_tx_queued() */
        cfs_spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
        cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
        cfs_spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
        cfs_up(&kmxlnd_data.kmx_tx_queue_sem);
/**
 * mxlnd_setup_iov - build the MX segment list from a virtual iovec
 * @ctx: tx or rx context to attach the segment list to
 * @niov: number of iovec entries
 * @iov: the iovec array (kernel virtual addresses)
 * @offset: byte offset into the iovec where the payload starts
 * @nob: number of bytes of payload
 *
 * Finds the first/last iovec entries covering [offset, offset+nob),
 * allocates nseg mx_ksegment_t entries, and fills them with the
 * physical address/length of each fragment (first and last entries are
 * trimmed to the offset/length). Sets mxc_seg_list/mxc_nseg/mxc_pin_type.
 */
mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
        int first_iov_offset = 0;
        int first_found = 0;
        int last_iov_length = 0;
        mx_ksegment_t *seg = NULL;

        if (niov == 0) return 0;
        LASSERT(iov != NULL);

        /* locate the first and last fragments touched by the transfer */
        for (i = 0; i < niov; i++) {
                sum = old_sum + (u32) iov[i].iov_len;
                if (!first_found && (sum > offset)) {
                        first_iov_offset = offset - old_sum;
                        sum = (u32) iov[i].iov_len - first_iov_offset;
                last_iov_length = (u32) iov[i].iov_len - (sum - nob);
                if (first_iov == last_iov) last_iov_length -= first_iov_offset;
        LASSERT(first_iov >= 0 && last_iov >= first_iov);
        nseg = last_iov - first_iov + 1;
        MXLND_ALLOC(seg, nseg * sizeof(*seg));
                CNETERR("MXLND_ALLOC() failed\n");
        memset(seg, 0, nseg * sizeof(*seg));
        ctx->mxc_nseg = nseg;
        for (i = 0; i < nseg; i++) {
                seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
                seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
                /* trim the first fragment to the requested offset */
                seg[i].segment_ptr += (u64) first_iov_offset;
                seg[i].segment_length -= (u32) first_iov_offset;
                if (i == (nseg - 1)) {
                        /* trim the last fragment to the requested length */
                        seg[i].segment_length = (u32) last_iov_length;
                sum += seg[i].segment_length;
        ctx->mxc_seg_list = seg;
        ctx->mxc_pin_type = MX_PIN_PHYSICAL;
#ifdef MX_PIN_FULLPAGES
        ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
        /* the segments must cover exactly nob bytes */
        LASSERT(nob == sum);
1839 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1845 int first_kiov = -1;
1846 int first_kiov_offset = 0;
1847 int first_found = 0;
1849 int last_kiov_length = 0;
1850 mx_ksegment_t *seg = NULL;
1852 if (niov == 0) return 0;
1853 LASSERT(kiov != NULL);
1855 for (i = 0; i < niov; i++) {
1856 sum = old_sum + kiov[i].kiov_len;
1857 if (i == 0) sum -= kiov[i].kiov_offset;
1858 if (!first_found && (sum > offset)) {
1860 first_kiov_offset = offset - old_sum;
1861 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1863 sum = kiov[i].kiov_len - first_kiov_offset;
1868 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1869 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1874 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1875 nseg = last_kiov - first_kiov + 1;
1878 MXLND_ALLOC(seg, nseg * sizeof(*seg));
1880 CNETERR("MXLND_ALLOC() failed\n");
1883 memset(seg, 0, niov * sizeof(*seg));
1884 ctx->mxc_nseg = niov;
1886 for (i = 0; i < niov; i++) {
1887 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1888 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1890 seg[i].segment_ptr += (u64) first_kiov_offset;
1891 /* we have to add back the original kiov_offset */
1892 seg[i].segment_length -= first_kiov_offset +
1893 kiov[first_kiov].kiov_offset;
1895 if (i == (nseg - 1)) {
1896 seg[i].segment_length = last_kiov_length;
1898 sum += seg[i].segment_length;
1900 ctx->mxc_seg_list = seg;
1901 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1902 #ifdef MX_PIN_FULLPAGES
1903 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1905 LASSERT(nob == sum);
/**
 * mxlnd_send_nak - send a PUT_ACK carrying an error status (a NAK)
 * @tx: idle tx to use for the NAK
 * @nid: destination NID (NOTE: init_tx_msg below uses tx->mxc_nid)
 * @type: must be MXLND_MSG_PUT_ACK (a NAK is an ACK with an error)
 * @status: error code to embed in the dst cookie / match bits
 * @cookie: the originator's cookie being NAK'd
 */
mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
        LASSERT(type == MXLND_MSG_PUT_ACK);
        mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
        tx->mxc_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
        tx->mxc_match = mxlnd_create_match(tx, status);
/**
 * mxlnd_send_data - get tx, map [k]iov, queue tx
 * @ni: LNET interface handle
 * @lntmsg: the LNET message carrying the payload
 * @peer: destination peer (caller holds a conn ref)
 * @msg_type: MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie: match cookie (top MXLND_ERROR_OFFSET bits must be clear)
 *
 * This setups the DATA send for PUT or GET.
 * On success, it queues the tx, on failure it calls lnet_finalize().
 */
mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;
        kmx_ctx_t *tx = NULL;

        LASSERT(lntmsg != NULL);
        LASSERT(peer != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);

        tx = mxlnd_get_idle_tx();
        CNETERR("Can't allocate %s tx for %s\n",
                msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                libcfs_nid2str(target.nid));
        tx->mxc_nid = target.nid;
        /* NOTE called when we have a ref on the conn, get one for this tx */
        mxlnd_conn_addref(peer->mxp_conn);
        tx->mxc_peer = peer;
        tx->mxc_conn = peer->mxp_conn;
        tx->mxc_msg_type = msg_type;
        tx->mxc_lntmsg[0] = lntmsg;
        tx->mxc_cookie = cookie;
        tx->mxc_match = mxlnd_create_match(tx, 0);

        /* This setups up the mx_ksegment_t to send the DATA payload */
        /* do not setup the segments */
        CNETERR("nob = 0; why didn't we use an EAGER reply "
                "to %s?\n", libcfs_nid2str(target.nid));
        } else if (kiov == NULL) {
                ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
                ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
        CNETERR("Can't setup send DATA for %s\n",
                libcfs_nid2str(target.nid));
        tx->mxc_errno = -EIO;

        /* failure cleanup: release tx's conn ref and the tx itself */
        mxlnd_conn_decref(peer->mxp_conn);
        mxlnd_put_idle_tx(tx);

        CNETERR("no tx avail\n");
        lnet_finalize(ni, lntmsg, -EIO);
/**
 * mxlnd_recv_data - map [k]iov, post rx
 * @ni: LNET interface handle
 * @lntmsg: the LNET message describing the sink buffer
 * @rx: the receive context to use
 * @msg_type: MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie: match cookie (top MXLND_ERROR_OFFSET bits must be clear)
 *
 * This setups the DATA receive for PUT or GET.
 * On success, it returns 0, on failure it returns -1.
 */
mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;
        mx_return_t mxret = MX_SUCCESS;
        u64 mask = ~(MXLND_ERROR_MASK);

        /* above assumes MXLND_MSG_PUT_DATA; for GET the sink is the MD */
        if (msg_type == MXLND_MSG_GET_DATA) {
                niov = lntmsg->msg_md->md_niov;
                iov = lntmsg->msg_md->md_iov.iov;
                kiov = lntmsg->msg_md->md_iov.kiov;
                nob = lntmsg->msg_md->md_length;

        LASSERT(lntmsg != NULL);
        LASSERT(rx != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */

        rx->mxc_msg_type = msg_type;
        rx->mxc_state = MXLND_CTX_PENDING;
        rx->mxc_nid = target.nid;
        /* if posting a GET_DATA, we may not yet know the peer */
        if (rx->mxc_peer != NULL) {
                rx->mxc_conn = rx->mxc_peer->mxp_conn;
        rx->mxc_lntmsg[0] = lntmsg;
        rx->mxc_cookie = cookie;
        rx->mxc_match = mxlnd_create_match(rx, 0);
        /* This setups up the mx_ksegment_t to receive the DATA payload */
        ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
        ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
        if (msg_type == MXLND_MSG_GET_DATA) {
                /* create the REPLY lntmsg now; finalized when DATA arrives */
                rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
                if (rx->mxc_lntmsg[1] == NULL) {
                        CNETERR("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(target.nid));
        CNETERR("Can't setup %s rx for %s\n",
                msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                libcfs_nid2str(target.nid));
        ret = mxlnd_q_pending_ctx(rx);
        CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
        mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
                          rx->mxc_seg_list, rx->mxc_nseg,
                          rx->mxc_pin_type, rx->mxc_match,
        if (mxret != MX_SUCCESS) {
                /* undo the pending-queue insertion on post failure */
                if (rx->mxc_conn != NULL) {
                        mxlnd_deq_pending_ctx(rx);
                CNETERR("mx_kirecv() failed with %d for %s\n",
                        (int) mxret, libcfs_nid2str(target.nid));
/**
 * mxlnd_send - the LND required send function
 * @ni: LNET interface handle
 * @private: rx context for LNET_MSG_REPLY (GET reply), NULL otherwise
 * @lntmsg: the LNET message to send
 *
 * This must not block. Since we may not have a peer struct for the receiver,
 * it will append send messages on a global tx list. We will then up the
 * tx_queued's semaphore to notify it of the new send.
 *
 * NOTE(review): this extraction appears to have lost interior lines
 * (the switch on 'type', several closing braces and returns);
 * documented from the visible code only.
 */
mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        int type = lntmsg->msg_type;
        lnet_hdr_t *hdr = &lntmsg->msg_hdr;
        lnet_process_id_t target = lntmsg->msg_target;
        lnet_nid_t nid = target.nid;
        int target_is_router = lntmsg->msg_target_is_router;
        int routing = lntmsg->msg_routing;
        unsigned int payload_niov = lntmsg->msg_niov;
        struct iovec *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
        unsigned int payload_offset = lntmsg->msg_offset;
        unsigned int payload_nob = lntmsg->msg_len;
        kmx_ctx_t *tx = NULL;
        kmx_msg_t *txmsg = NULL;
        kmx_ctx_t *rx = (kmx_ctx_t *) private; /* for REPLY */
        kmx_ctx_t *rx_data = NULL;
        kmx_conn_t *conn = NULL;
        uint32_t length = 0;
        kmx_peer_t *peer = NULL;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        /* private is used on LNET_GET_REPLY only, NULL for all other cases */

        /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
         * to a new peer, so create one if not found */
        peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
        if (peer == NULL || peer->mxp_conn == NULL) {
                /* we could not find it nor could we create one or
                 * one exists but we cannot create a conn,
                 * fail this message */
                /* found peer without conn, drop ref taken above */
                LASSERT(peer->mxp_conn == NULL);
                mxlnd_peer_decref(peer);

        /* we have a peer with a conn */

        if (unlikely(peer->mxp_incompatible)) {
                mxlnd_peer_decref(peer); /* drop ref taken above */
                /* take a conn ref unless the conn is being torn down */
                cfs_read_lock(g_lock);
                conn = peer->mxp_conn;
                if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
                        mxlnd_conn_addref(conn);
                cfs_read_unlock(g_lock);
                mxlnd_peer_decref(peer); /* drop peer ref taken above */

        LASSERT(peer && conn);

        CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);

                LASSERT (payload_nob == 0);
        case LNET_MSG_REPLY:
                /* Is the payload small enough not to need DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
                if (nob <= MXLND_MSG_SIZE)
                        break;                  /* send EAGER */

                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate %s tx for %s\n",
                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
                                libcfs_nid2str(nid));
                        if (conn) mxlnd_conn_decref(conn);
                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* we added a conn ref above */
                mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
                txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the request.
                 * we need to determine how much to receive, it will be either
                 * a put_ack or a put_nak. The put_ack is larger, so use it. */

                rx = mxlnd_get_idle_rx(conn);
                if (unlikely(rx == NULL)) {
                        CNETERR("Can't allocate rx for PUT_ACK for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
                rx->mxc_peer = peer;
                mxlnd_conn_addref(conn); /* for this rx */
                rx->mxc_conn = conn;
                rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
                rx->mxc_cookie = tx->mxc_cookie;
                rx->mxc_match = mxlnd_create_match(rx, 0);

                length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
                ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
                if (unlikely(ret != 0)) {
                        CNETERR("recv_msg() failed for PUT_ACK for %s\n",
                                libcfs_nid2str(nid));
                        rx->mxc_lntmsg[0] = NULL;
                        mxlnd_put_idle_rx(rx);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx... */
                        mxlnd_conn_decref(conn); /* and for the tx */
                        return -EHOSTUNREACH;

                /* routed GETs are sent EAGER; see LNET routing */
                if (routing || target_is_router)
                        break;                  /* send EAGER */

                /* is the REPLY message too small for DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
                if (nob <= MXLND_MSG_SIZE)
                        break;                  /* send EAGER */

                /* get tx (we need the cookie) , post rx for incoming DATA,
                 * then post GET_REQ tx */
                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate GET tx for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_conn_decref(conn); /* for the ref taken above */
                rx_data = mxlnd_get_idle_rx(conn);
                if (unlikely(rx_data == NULL)) {
                        CNETERR("Can't allocate DATA rx for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the ref taken above */
                rx_data->mxc_peer = peer;
                /* NOTE no need to lock peer before adding conn ref since we took
                 * a conn ref for the tx (it cannot be freed between there and here ) */
                mxlnd_conn_addref(conn); /* for the rx_data */
                rx_data->mxc_conn = conn;

                ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
                if (unlikely(ret != 0)) {
                        CNETERR("Can't setup GET sink for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_rx(rx_data);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx_data... */
                        mxlnd_conn_decref(conn); /* and for the tx */

                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* conn ref taken above */
                mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
                txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                mxlnd_conn_decref(conn); /* drop ref taken above */

        /* send EAGER: the whole payload fits in one message */
        LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])

        tx = mxlnd_get_idle_tx();
        if (unlikely(tx == NULL)) {
                CNETERR("Can't send %s to %s: tx descs exhausted\n",
                        mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
                mxlnd_conn_decref(conn); /* drop ref taken above */

        tx->mxc_peer = peer;
        tx->mxc_conn = conn;
        /* conn ref taken above */
        nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
        mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
        tx->mxc_match = mxlnd_create_match(tx, 0);

        txmsg = tx->mxc_msg;
        txmsg->mxm_u.eager.mxem_hdr = *hdr;

        /* copy the payload inline into the eager message */
        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
                        offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                        payload_niov, payload_kiov, payload_offset, payload_nob);
                lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
                        offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                        payload_niov, payload_iov, payload_offset, payload_nob);

        tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
/**
 * mxlnd_recv - the LND required recv function
 * @ni: LNET interface handle
 * @private: the kmx_ctx_t rx that carried the message header
 * @lntmsg: the matched LNET message (NULL if GET matched nothing)
 * @delayed: unused here (LNET delayed-receive flag)
 * @niov/@iov/@kiov/@offset/@mlen/@rlen: sink buffer description
 *
 * This must not block. Dispatches on the received message type:
 * EAGER payloads are copied out; PUT_REQ re-posts the rx for PUT_DATA
 * and sends a PUT_ACK; GET_REQ triggers the DATA send (or a NAK encoded
 * in the match bits).
 *
 * NOTE(review): this extraction appears to have lost interior lines
 * (the switch on msg_type, several closing braces/breaks and the
 * declarations of nob/len/cookie/ret/credit/finalize); documented from
 * the visible code only.
 */
mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
            unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
            unsigned int offset, unsigned int mlen, unsigned int rlen)
        kmx_ctx_t *rx = private;
        kmx_msg_t *rxmsg = rx->mxc_msg;
        lnet_nid_t nid = rx->mxc_nid;
        kmx_ctx_t *tx = NULL;
        kmx_msg_t *txmsg = NULL;
        kmx_peer_t *peer = rx->mxc_peer;
        kmx_conn_t *conn = peer->mxp_conn;
        int msg_type = rxmsg->mxm_type;

        LASSERT (mlen <= rlen);
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));
        LASSERT (peer && conn);

        /* conn_addref(conn) already taken for the primary rx */

        case MXLND_MSG_EAGER:
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
                len = rx->mxc_status.xfer_length;
                if (unlikely(nob > len)) {
                        CNETERR("Eager message from %s too big: %d(%d)\n",
                                libcfs_nid2str(nid), nob, len);

                /* copy the inline payload into the caller's sink */
                lnet_copy_flat2kiov(niov, kiov, offset,
                        MXLND_MSG_SIZE, rxmsg,
                        offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                lnet_copy_flat2iov(niov, iov, offset,
                        MXLND_MSG_SIZE, rxmsg,
                        offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),

        case MXLND_MSG_PUT_REQ:
                /* we are going to reuse the rx, store the needed info */
                cookie = rxmsg->mxm_u.put_req.mxprm_cookie;

                /* get tx, post rx, send PUT_ACK */

                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
                        /* Not replying will break the connection */

                if (unlikely(mlen == 0)) {
                        /* zero-length sink: ACK with no DATA expected */
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);

                mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* no need to lock peer first since we already have a ref */
                mxlnd_conn_addref(conn); /* for the tx */
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
                txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
                tx->mxc_cookie = cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the PUT_ACK */
                rx->mxc_state = MXLND_CTX_PREP;
                rx->mxc_peer = peer;
                rx->mxc_conn = conn;
                /* do not take another ref for this rx, it is already taken */
                rx->mxc_nid = peer->mxp_nid;
                ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
                                      txmsg->mxm_u.put_ack.mxpam_dst_cookie);

                if (unlikely(ret != 0)) {
                        /* Notify peer that it's over */
                        CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
                                libcfs_nid2str(nid), ret);
                        tx->mxc_state = MXLND_CTX_PREP;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* finalize = 0, let the PUT_ACK tx finalize this */
                        tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
                        tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
                        /* conn ref already taken above */
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);

                /* do not return a credit until after PUT_DATA returns */

        case MXLND_MSG_GET_REQ:
                cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
                if (likely(lntmsg != NULL)) {
                        mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
                        /* GET didn't match anything */
                        /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
                         * We have to embed the error code in the match bits.
                         * Send the error in bits 52-59 and the cookie in bits 0-51 */
                        tx = mxlnd_get_idle_tx();
                        if (unlikely(tx == NULL)) {
                                CNETERR("Can't get tx for GET NAK for %s\n",
                                        libcfs_nid2str(nid));
                                /* we can't get a tx, notify the peer that the GET failed */
                                mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
                        tx->mxc_msg_type = MXLND_MSG_GET_DATA;
                        tx->mxc_state = MXLND_CTX_PENDING;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* no need to lock peer first since we already have a ref */
                        mxlnd_conn_addref(conn); /* for this tx */
                        tx->mxc_cookie = cookie;
                        tx->mxc_match = mxlnd_create_match(tx, ENODATA);
                        tx->mxc_pin_type = MX_PIN_PHYSICAL;
                        /* finalize lntmsg after tx completes */

        /* we received a message, increment peer's outstanding credits */
        cfs_spin_lock(&conn->mxk_lock);
        conn->mxk_outstanding++;
        cfs_spin_unlock(&conn->mxk_lock);

        /* we are done with the rx */
        mxlnd_put_idle_rx(rx);
        mxlnd_conn_decref(conn);

        if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
        /* we received a credit, see if we can use it to send a msg */
        if (credit) mxlnd_check_sends(peer);
/* mxlnd_sleep - sleep interruptibly for 'timeout' jiffies */
mxlnd_sleep(unsigned long timeout)
        cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
        cfs_schedule_timeout(timeout);
/**
 * mxlnd_tx_queued - the generic send queue thread
 * @arg - thread id (as a void *)
 *
 * This thread moves send messages from the global tx_queue to the owning
 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
 * it to the global peer list.
 *
 * NOTE(review): this extraction appears to have lost interior lines
 * (loop continues, the "peer not found" branch header and closing
 * braces); documented from the visible code only.
 */
mxlnd_tx_queued(void *arg)
        long id = (long) arg;
        kmx_ctx_t *tx = NULL;
        kmx_peer_t *peer = NULL;
        cfs_list_t *queue = &kmxlnd_data.kmx_tx_queue;
        cfs_spinlock_t *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        cfs_daemonize("mxlnd_tx_queued");

        while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
                /* wait for mxlnd_queue_tx() to signal new work */
                ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
                if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
                if (ret != 0) // Should we check for -EINTR?
                cfs_spin_lock(tx_q_lock);
                if (cfs_list_empty (&kmxlnd_data.kmx_tx_queue)) {
                        cfs_spin_unlock(tx_q_lock);
                /* pop the first queued tx */
                tx = cfs_list_entry (queue->next, kmx_ctx_t, mxc_list);
                cfs_list_del_init(&tx->mxc_list);
                cfs_spin_unlock(tx_q_lock);

                peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
                tx->mxc_peer = peer;
                cfs_write_lock(g_lock);
                if (peer->mxp_conn == NULL) {
                        ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
                                /* out of memory, give up and fail tx */
                                tx->mxc_errno = -ENOMEM;
                                mxlnd_peer_decref(peer);
                                cfs_write_unlock(g_lock);
                                mxlnd_put_idle_tx(tx);
                tx->mxc_conn = peer->mxp_conn;
                mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
                mxlnd_peer_decref(peer); /* drop peer ref taken above */
                cfs_write_unlock(g_lock);

                /* peer not found: create one (and its conn), then try to
                 * insert it into the global hash */
                        kmx_peer_t *peer = NULL;
                        kmx_peer_t *old = NULL;
                        hash = mxlnd_nid_to_hash(tx->mxc_nid);

                        LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
                                tx->mxc_msg_type != MXLND_MSG_GET_DATA);
                        /* adds conn ref for this function */
                        ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
                                        *kmxlnd_tunables.kmx_board,
                                        *kmxlnd_tunables.kmx_ep_id, 0ULL);
                                /* finalize message */
                                tx->mxc_errno = ret;
                                mxlnd_put_idle_tx(tx);
                        tx->mxc_peer = peer;
                        tx->mxc_conn = peer->mxp_conn;
                        /* this tx will keep the conn ref taken in peer_alloc() */

                        /* add peer to global peer list, but look to see
                         * if someone already created it after we released
                        cfs_write_lock(g_lock);
                        old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
                                /* we have a peer ref on old */
                                if (old->mxp_conn) {
                                        /* drop our ref taken above... */
                                        mxlnd_peer_decref(old);
                                        mxlnd_del_peer_locked(old);

                                /* no conflict: insert our new peer */
                                cfs_list_add_tail(&peer->mxp_list,
                                                  &kmxlnd_data.kmx_peers[hash]);
                                cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
                                /* raced: adopt the existing peer's conn and
                                 * discard the one we just allocated */
                                tx->mxc_conn = old->mxp_conn;
                                LASSERT(old->mxp_conn != NULL);
                                mxlnd_conn_addref(old->mxp_conn);
                                mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
                                mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
                                mxlnd_peer_decref(peer);
                        cfs_write_unlock(g_lock);

        mxlnd_thread_stop(id);
/**
 * mxlnd_iconnect - start an asynchronous MX connect to a peer
 * @peer: the peer to connect to
 * @msg_type: MXLND_MSG_ICON_REQ or MXLND_MSG_ICON_ACK
 *
 * When calling this, we must not have the peer lock.
 * Resolves the peer's NIC id if needed, gives up after
 * MXLND_CONNECT_TIMEOUT, then issues mx_iconnect() with the msg type
 * encoded in the match bits.
 */
mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
        mx_return_t mxret = MX_SUCCESS;
        mx_request_t request;
        kmx_conn_t *conn = peer->mxp_conn;
        u64 match = ((u64) msg_type) << MXLND_MSG_OFFSET;

        /* NOTE we are holding a conn ref every time we call this function,
         * we do not need to lock the peer before taking another ref */
        mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */

        LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);

        if (peer->mxp_reconnect_time == 0) {
                peer->mxp_reconnect_time = jiffies;

        if (peer->mxp_nic_id == 0ULL) {
                /* resolve IP -> MX NIC id before we can connect */
                ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
                                      &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
                mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
                if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
                        /* not mapped yet, return */
                        cfs_spin_lock(&conn->mxk_lock);
                        mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
                        cfs_spin_unlock(&conn->mxk_lock);

        if (cfs_time_after(jiffies,
                           peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
            conn->mxk_status != MXLND_CONN_DISCONNECT) {
                /* give up and notify LNET */
                CDEBUG(D_NET, "timeout trying to connect to %s\n",
                       libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_disconnect(conn, 0, 0);
                mxlnd_conn_decref(conn);

        mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
                            peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
                            (void *) peer, &request);
        if (unlikely(mxret != MX_SUCCESS)) {
                cfs_spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                cfs_spin_unlock(&conn->mxk_lock);
                CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_decref(conn);
        /* bound the connect attempt; timeout argument is in milliseconds */
        mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
                               MXLND_CONNECT_TIMEOUT/CFS_HZ*1000);
2723 #define MXLND_STATS 0
/* NOTE(review): this listing has gaps in its embedded numbering; several
 * statements (gotos, braces, some mx_kisend() arguments) are not visible.
 * Comments below describe only the visible code.
 *
 * mxlnd_check_sends() - progress the send queues for @peer.
 * Sends a NOOP to return credits when outstanding hits the highwater mark,
 * kicks off a connect if the conn is INIT/FAIL, then drains the free and
 * credit tx queues while credits and posted slots allow, calling
 * mx_kisend() for each tx. Holds a conn ref for the duration. */
2726 mxlnd_check_sends(kmx_peer_t *peer)
2730 mx_return_t mxret = MX_SUCCESS;
2731 kmx_ctx_t *tx = NULL;
2732 kmx_conn_t *conn = NULL;
2739 static unsigned long last = 0;
2742 if (unlikely(peer == NULL)) {
2743 LASSERT(peer != NULL);
2746 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
2747 conn = peer->mxp_conn;
2748 /* NOTE take a ref for the duration of this function since it is called
2749 * when there might not be any queued txs for this peer */
2751 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2752 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2755 mxlnd_conn_addref(conn); /* for duration of this function */
2757 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2759 /* do not add another ref for this tx */
2762 /* we do not have any conns */
2763 CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
/* rate-limited (once per second) debug dump of the conn's credit state;
 * 'last' is static, so this is shared across all peers */
2768 if (cfs_time_after(jiffies, last)) {
2769 last = jiffies + CFS_HZ;
2770 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2771 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2772 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2773 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2774 conn->mxk_ntx_data, conn->mxk_data_posted);
2778 cfs_spin_lock(&conn->mxk_lock);
2779 ntx_posted = conn->mxk_ntx_posted;
2780 credits = conn->mxk_credits;
2782 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2783 LASSERT(ntx_posted >= 0);
2785 LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2786 LASSERT(credits >= 0);
2788 /* check number of queued msgs, ignore data */
2789 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2790 /* check if any txs queued that could return credits... */
2791 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2792 conn->mxk_ntx_msgs == 0) {
2793 /* if not, send a NOOP */
2794 tx = mxlnd_get_idle_tx();
2795 if (likely(tx != NULL)) {
2796 tx->mxc_peer = peer;
2797 tx->mxc_conn = peer->mxp_conn;
2798 mxlnd_conn_addref(conn); /* for this tx */
2799 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2800 tx->mxc_match = mxlnd_create_match(tx, 0);
2801 mxlnd_peer_queue_tx_locked(tx);
2808 /* if the peer is not ready, try to connect */
2809 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2810 conn->mxk_status == MXLND_CONN_FAIL)) {
2811 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2812 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2813 cfs_spin_unlock(&conn->mxk_lock);
2814 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
/* main drain loop: keep sending while either tx queue is non-empty */
2818 while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2819 !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2820 /* We have something to send. If we have a queued tx that does not
2821 * require a credit (free), choose it since its completion will
2822 * return a credit (here or at the peer), complete a DATA or
2823 * CONN_REQ or CONN_ACK. */
2824 cfs_list_t *tmp_tx = NULL;
2825 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2826 tmp_tx = &conn->mxk_tx_free_queue;
2828 tmp_tx = &conn->mxk_tx_credit_queue;
2830 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2832 msg_type = tx->mxc_msg_type;
2834 /* don't try to send a rx */
2835 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2837 /* ensure that it is a valid msg type */
2838 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2839 msg_type == MXLND_MSG_CONN_ACK ||
2840 msg_type == MXLND_MSG_NOOP ||
2841 msg_type == MXLND_MSG_EAGER ||
2842 msg_type == MXLND_MSG_PUT_REQ ||
2843 msg_type == MXLND_MSG_PUT_ACK ||
2844 msg_type == MXLND_MSG_PUT_DATA ||
2845 msg_type == MXLND_MSG_GET_REQ ||
2846 msg_type == MXLND_MSG_GET_DATA);
2847 LASSERT(tx->mxc_peer == peer);
2848 LASSERT(tx->mxc_nid == peer->mxp_nid);
2850 credit = mxlnd_tx_requires_credit(tx);
/* flow-control gates: stop draining when the send window is full,
 * when we have no credits, or to reserve the last credit so the peer
 * can always be sent a credit-returning message */
2853 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2854 CDEBUG(D_NET, "%s: posted enough\n",
2855 libcfs_nid2str(peer->mxp_nid));
2859 if (conn->mxk_credits == 0) {
2860 CDEBUG(D_NET, "%s: no credits\n",
2861 libcfs_nid2str(peer->mxp_nid));
2865 if (conn->mxk_credits == 1 && /* last credit reserved for */
2866 conn->mxk_outstanding == 0) { /* giving back credits */
2867 CDEBUG(D_NET, "%s: not using last credit\n",
2868 libcfs_nid2str(peer->mxp_nid));
/* only CONN_REQ/CONN_ACK may be sent on a not-READY conn; other txs
 * are failed here once the conn is DISCONNECT or the tx deadline hit */
2873 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2874 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2875 msg_type == MXLND_MSG_CONN_ACK)) {
2876 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2877 mxlnd_connstatus_to_str(conn->mxk_status),
2879 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2880 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2881 cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2882 cfs_list_del_init(&tx->mxc_list);
2883 tx->mxc_errno = -ECONNABORTED;
2884 cfs_spin_unlock(&conn->mxk_lock);
2885 mxlnd_put_idle_tx(tx);
2886 mxlnd_conn_decref(conn);
2893 cfs_list_del_init(&tx->mxc_list);
2895 /* handle credits, etc now while we have the lock to avoid races */
2897 conn->mxk_credits--;
2898 conn->mxk_ntx_posted++;
2900 if (msg_type != MXLND_MSG_PUT_DATA &&
2901 msg_type != MXLND_MSG_GET_DATA) {
2902 if (msg_type != MXLND_MSG_CONN_REQ &&
2903 msg_type != MXLND_MSG_CONN_ACK) {
2904 conn->mxk_ntx_msgs--;
/* stamp the tx with the conn's incarnation the first time through */
2907 if (tx->mxc_incarnation == 0 &&
2908 conn->mxk_incarnation != 0) {
2909 tx->mxc_incarnation = conn->mxk_incarnation;
2912 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2913 * or (2) there is a non-DATA msg that can return credits in the
2914 * queue, then drop this duplicate NOOP */
2915 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2916 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2917 (conn->mxk_ntx_msgs >= 1)) {
2918 conn->mxk_credits++;
2919 conn->mxk_ntx_posted--;
2920 cfs_spin_unlock(&conn->mxk_lock);
2921 /* redundant NOOP */
2922 mxlnd_put_idle_tx(tx);
2923 mxlnd_conn_decref(conn);
2924 CDEBUG(D_NET, "%s: redundant noop\n",
2925 libcfs_nid2str(peer->mxp_nid));
/* MSG-style txs carry a kmx_msg_t header that must be packed (credits,
 * checksum, stamps) while still holding the conn lock */
2932 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2933 (msg_type != MXLND_MSG_GET_DATA))) {
2934 mxlnd_pack_msg_locked(tx);
2939 status = conn->mxk_status;
2940 cfs_spin_unlock(&conn->mxk_lock);
2942 if (likely((status == MXLND_CONN_READY) ||
2943 (msg_type == MXLND_MSG_CONN_REQ) ||
2944 (msg_type == MXLND_MSG_CONN_ACK))) {
2946 if (msg_type != MXLND_MSG_CONN_REQ &&
2947 msg_type != MXLND_MSG_CONN_ACK) {
2948 /* add to the pending list */
2949 ret = mxlnd_q_pending_ctx(tx);
2952 tx->mxc_state = MXLND_CTX_PENDING;
2956 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2957 msg_type != MXLND_MSG_GET_DATA)) {
2958 /* send a msg style tx */
2959 LASSERT(tx->mxc_nseg == 1);
2960 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2961 CDEBUG(D_NET, "sending %s 0x%llx\n",
2962 mxlnd_msgtype_to_str(msg_type),
2964 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2973 /* send a DATA tx */
2974 cfs_spin_lock(&conn->mxk_lock);
2975 conn->mxk_ntx_data--;
2976 conn->mxk_data_posted++;
2977 cfs_spin_unlock(&conn->mxk_lock);
2978 CDEBUG(D_NET, "sending %s 0x%llx\n",
2979 mxlnd_msgtype_to_str(msg_type),
2981 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
/* conn not READY and not a connect msg: treat as a failed send below */
2992 mxret = MX_CONNECTION_FAILED;
2994 if (likely(mxret == MX_SUCCESS)) {
2997 CNETERR("mx_kisend() failed with %s (%d) "
2998 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2999 libcfs_nid2str(peer->mxp_nid));
3000 /* NOTE mx_kisend() only fails if there are not enough
3001 * resources. Do not change the connection status. */
3002 if (mxret == MX_NO_RESOURCES) {
3003 tx->mxc_errno = -ENOMEM;
3005 tx->mxc_errno = -ECONNABORTED;
/* failed send: roll back the credit accounting done above */
3008 cfs_spin_lock(&conn->mxk_lock);
3009 conn->mxk_ntx_posted--;
3010 conn->mxk_credits++;
3011 cfs_spin_unlock(&conn->mxk_lock);
3012 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3013 msg_type == MXLND_MSG_GET_DATA) {
3014 cfs_spin_lock(&conn->mxk_lock);
3015 conn->mxk_data_posted--;
3016 cfs_spin_unlock(&conn->mxk_lock);
/* give back the credits this un-sent msg would have returned */
3018 if (msg_type != MXLND_MSG_PUT_DATA &&
3019 msg_type != MXLND_MSG_GET_DATA &&
3020 msg_type != MXLND_MSG_CONN_REQ &&
3021 msg_type != MXLND_MSG_CONN_ACK) {
3022 cfs_spin_lock(&conn->mxk_lock);
3023 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
3024 cfs_spin_unlock(&conn->mxk_lock);
3026 if (msg_type != MXLND_MSG_CONN_REQ &&
3027 msg_type != MXLND_MSG_CONN_ACK) {
3028 /* remove from the pending list */
3029 mxlnd_deq_pending_ctx(tx);
3031 mxlnd_put_idle_tx(tx);
3032 mxlnd_conn_decref(conn);
3035 cfs_spin_lock(&conn->mxk_lock);
3038 cfs_spin_unlock(&conn->mxk_lock);
3040 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3046 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3047 * @ctx - the tx descriptor
3049 * Determine which type of send request it was and start the next step, if needed,
3050 * or, if done, signal completion to LNET. After we are done, put back on the
/* NOTE(review): this listing has gaps in its embedded numbering (e.g. the
 * switch statement's opening and some braces are not visible); comments
 * below describe only the visible code. */
3054 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3056 int code = tx->mxc_status.code;
3057 int failed = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3058 kmx_msg_t *msg = tx->mxc_msg;
3059 kmx_peer_t *peer = tx->mxc_peer;
3060 kmx_conn_t *conn = tx->mxc_conn;
3061 u8 type = tx->mxc_msg_type;
3062 int credit = mxlnd_tx_requires_credit(tx);
3063 u64 cookie = tx->mxc_cookie;
3065 CDEBUG(D_NET, "entering %s (0x%llx):\n",
3066 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3068 LASSERT (peer != NULL);
3069 LASSERT (conn != NULL);
3071 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3072 LASSERT (type == msg->mxm_type);
/* a failed tx with no recorded errno defaults to -EIO */
3076 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3078 cfs_spin_lock(&conn->mxk_lock);
3079 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3080 cfs_spin_unlock(&conn->mxk_lock);
/* per-type accounting; DATA completions only adjust counters if the conn
 * has not been reincarnated since the tx was posted */
3085 case MXLND_MSG_GET_DATA:
3086 cfs_spin_lock(&conn->mxk_lock);
3087 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3088 conn->mxk_outstanding++;
3089 conn->mxk_data_posted--;
3091 cfs_spin_unlock(&conn->mxk_lock);
3094 case MXLND_MSG_PUT_DATA:
3095 cfs_spin_lock(&conn->mxk_lock);
3096 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3097 conn->mxk_data_posted--;
3099 cfs_spin_unlock(&conn->mxk_lock);
3102 case MXLND_MSG_NOOP:
3103 case MXLND_MSG_PUT_REQ:
3104 case MXLND_MSG_PUT_ACK:
3105 case MXLND_MSG_GET_REQ:
3106 case MXLND_MSG_EAGER:
3109 case MXLND_MSG_CONN_ACK:
3110 if (peer->mxp_incompatible) {
3111 /* we sent our params, now close this conn */
3112 mxlnd_conn_disconnect(conn, 0, 1);
/* CONN_REQ shares the failure path below with CONN_ACK (fallthrough) */
3114 case MXLND_MSG_CONN_REQ:
3116 CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
3117 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3118 mx_strstatus(code), code, tx->mxc_errno,
3119 libcfs_nid2str(tx->mxc_nid));
3120 if (!peer->mxp_incompatible) {
3121 cfs_spin_lock(&conn->mxk_lock);
/* BAD_SESSION means the peer restarted: go back to INIT so the
 * next check_sends() reconnects from scratch */
3122 if (code == MX_STATUS_BAD_SESSION)
3123 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
3125 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3126 cfs_spin_unlock(&conn->mxk_lock);
3132 CNETERR("Unknown msg type of %d\n", type);
/* release the posted slot held by this tx (if same incarnation) */
3137 cfs_spin_lock(&conn->mxk_lock);
3138 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3139 conn->mxk_ntx_posted--;
3141 cfs_spin_unlock(&conn->mxk_lock);
3144 mxlnd_put_idle_tx(tx);
3145 mxlnd_conn_decref(conn);
/* tx completion may have freed credits/slots: try to make progress */
3147 mxlnd_check_sends(peer);
3149 CDEBUG(D_NET, "leaving\n");
3153 /* Handle completion of MSG or DATA rx.
3154 * CONN_REQ and CONN_ACK are handled elsewhere. */
/* NOTE(review): this listing has gaps in its embedded numbering (several
 * declarations, labels and gotos are not visible); comments below
 * describe only the visible code. Unpacks the rx, credits the conn,
 * dispatches PUT/GET/EAGER messages into lnet_parse(), then reposts the
 * rx and finalizes any pending lnet msgs. */
3156 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3161 u32 nob = rx->mxc_status.xfer_length;
3162 u64 bits = rx->mxc_status.match_info;
3163 kmx_msg_t *msg = rx->mxc_msg;
3164 kmx_peer_t *peer = rx->mxc_peer;
3165 kmx_conn_t *conn = rx->mxc_conn;
3166 u8 type = rx->mxc_msg_type;
3168 lnet_msg_t *lntmsg[2];
3173 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3174 * failed GET reply */
3176 /* NOTE peer may still be NULL if it is a new peer and
3177 * conn may be NULL if this is a re-connect */
3178 if (likely(peer != NULL && conn != NULL)) {
3179 /* we have a reference on the conn */
3181 } else if (peer != NULL && conn == NULL) {
3182 /* we have a reference on the peer */
3184 } else if (peer == NULL && conn != NULL) {
3186 CERROR("rx 0x%llx from %s has conn but no peer\n",
3187 bits, libcfs_nid2str(rx->mxc_nid));
3189 } /* else peer and conn == NULL */
/* peer known but no conn on the rx: adopt the peer's current conn,
 * converting our peer ref into a conn ref */
3191 if (conn == NULL && peer != NULL) {
3192 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3193 conn = peer->mxp_conn;
3195 mxlnd_conn_addref(conn); /* conn takes ref... */
3196 mxlnd_peer_decref(peer); /* from peer */
3200 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3201 rx->mxc_conn = conn;
3205 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3211 if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3212 rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3213 CNETERR("rx from %s failed with %s (%d)\n",
3214 libcfs_nid2str(rx->mxc_nid),
3215 mx_strstatus(rx->mxc_status.code),
3216 rx->mxc_status.code);
3222 /* this may be a failed GET reply */
3223 if (type == MXLND_MSG_GET_DATA) {
3224 /* get the error (52-59) bits from the match bits */
3225 ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3226 lntmsg[0] = rx->mxc_lntmsg[0];
3230 /* we had a rx complete with 0 bytes (no hdr, nothing) */
3231 CNETERR("rx from %s returned with 0 bytes\n",
3232 libcfs_nid2str(rx->mxc_nid));
3237 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3238 if (type == MXLND_MSG_PUT_DATA) {
3240 lntmsg[0] = rx->mxc_lntmsg[0];
3242 } else if (type == MXLND_MSG_GET_DATA) {
3244 lntmsg[0] = rx->mxc_lntmsg[0];
3245 lntmsg[1] = rx->mxc_lntmsg[1];
/* MSG-style rx: validate header, NIDs, and incarnation stamps */
3249 ret = mxlnd_unpack_msg(msg, nob);
3251 CNETERR("Error %d unpacking rx from %s\n",
3252 ret, libcfs_nid2str(rx->mxc_nid));
3256 type = msg->mxm_type;
3258 if (rx->mxc_nid != msg->mxm_srcnid ||
3259 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3260 CNETERR("rx with mismatched NID (type %s) (my nid is "
3261 "0x%llx and rx msg dst is 0x%llx)\n",
3262 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3267 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3268 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3269 CNETERR("Stale rx from %s with type %s "
3270 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3271 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3272 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3273 msg->mxm_srcstamp, conn->mxk_incarnation,
3274 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3279 CDEBUG(D_NET, "Received %s with %d credits\n",
3280 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* absorb any credits the peer is returning (same incarnation only) */
3282 LASSERT(peer != NULL && conn != NULL);
3283 if (msg->mxm_credits != 0) {
3284 cfs_spin_lock(&conn->mxk_lock);
3285 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3286 if ((conn->mxk_credits + msg->mxm_credits) >
3287 *kmxlnd_tunables.kmx_peercredits) {
3288 CNETERR("mxk_credits %d mxm_credits %d\n",
3289 conn->mxk_credits, msg->mxm_credits);
3291 conn->mxk_credits += msg->mxm_credits;
3292 LASSERT(conn->mxk_credits >= 0);
3293 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3295 cfs_spin_unlock(&conn->mxk_lock);
3298 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
/* dispatch by message type; lnet_parse() hands the payload to LNET */
3300 case MXLND_MSG_NOOP:
3303 case MXLND_MSG_EAGER:
3304 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3305 msg->mxm_srcnid, rx, 0);
3309 case MXLND_MSG_PUT_REQ:
3310 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3311 msg->mxm_srcnid, rx, 1);
3315 case MXLND_MSG_PUT_ACK: {
3316 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
/* cookie above the valid range encodes a NAK with an error value */
3317 if (cookie > MXLND_MAX_COOKIE) {
3318 CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3319 libcfs_nid2str(rx->mxc_nid));
3320 result = -((u32) MXLND_ERROR_VAL(cookie));
3321 lntmsg[0] = rx->mxc_lntmsg[0];
3323 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3324 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3325 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3330 case MXLND_MSG_GET_REQ:
3331 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3332 msg->mxm_srcnid, rx, 1);
3337 CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
3338 libcfs_nid2str(rx->mxc_nid));
3344 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3345 cfs_spin_lock(&conn->mxk_lock);
3346 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3347 cfs_spin_unlock(&conn->mxk_lock);
3352 cfs_spin_lock(&conn->mxk_lock);
3353 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3354 cfs_spin_unlock(&conn->mxk_lock);
3358 /* lnet_parse() failed, etc., repost now */
3359 mxlnd_put_idle_rx(rx);
/* this rx consumed a peer credit; bump outstanding so a credit is
 * eventually returned to the peer */
3360 if (conn != NULL && credit == 1) {
3361 if (type == MXLND_MSG_PUT_DATA ||
3362 type == MXLND_MSG_EAGER ||
3363 type == MXLND_MSG_PUT_REQ ||
3364 type == MXLND_MSG_NOOP) {
3365 cfs_spin_lock(&conn->mxk_lock);
3366 conn->mxk_outstanding++;
3367 cfs_spin_unlock(&conn->mxk_lock);
3370 if (conn_ref) mxlnd_conn_decref(conn);
3371 LASSERT(peer_ref == 0);
3374 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3375 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3377 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* finalize outside all locks; result carries any NAK/error code */
3380 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3381 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3383 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/* NOTE(review): this listing has gaps in its embedded numbering; the tail
 * of this function (queueing/sending the tx) is not visible here.
 *
 * mxlnd_handle_connect_msg() - completion handler for mx_iconnect().
 * On success, records the peer's endpoint address and session id, then
 * marshals the follow-up CONN_REQ (for ICON_REQ) or CONN_ACK (for
 * ICON_ACK). On failure, marks the conn FAIL and may disconnect after
 * MXLND_CONNECT_TIMEOUT. Consumes the conn ref taken by mxlnd_iconnect()
 * on the error paths. */
3389 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3391 kmx_ctx_t *tx = NULL;
3392 kmx_msg_t *txmsg = NULL;
3393 kmx_conn_t *conn = peer->mxp_conn;
3397 u8 type = (msg_type == MXLND_MSG_ICON_REQ ?
3398 MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3400 /* a conn ref was taken when calling mx_iconnect(),
3401 * hold it until CONN_REQ or CONN_ACK completes */
3403 CDEBUG(D_NET, "entering\n");
3404 if (status.code != MX_STATUS_SUCCESS) {
/* only an ICON_ACK failure warrants a BYE; presumably because the
 * passive side already has state for us — TODO confirm */
3405 int send_bye = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3407 CNETERR("mx_iconnect() failed for %s with %s (%d) "
3408 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3409 mxlnd_msgtype_to_str(msg_type),
3410 mx_strstatus(status.code), status.code,
3411 libcfs_nid2str(peer->mxp_nid),
3415 cfs_spin_lock(&conn->mxk_lock);
3416 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3417 cfs_spin_unlock(&conn->mxk_lock);
3419 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3420 MXLND_CONNECT_TIMEOUT)) {
3421 CNETERR("timeout, calling conn_disconnect()\n");
3422 mxlnd_conn_disconnect(conn, 0, send_bye);
3425 mxlnd_conn_decref(conn);
/* success: bind the MX endpoint address to this conn so unexpected
 * receives can be routed back to it */
3428 mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3429 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3430 cfs_spin_lock(&conn->mxk_lock);
3431 conn->mxk_epa = status.source;
3432 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3433 if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3434 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3436 cfs_spin_unlock(&conn->mxk_lock);
3437 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3439 /* mx_iconnect() succeeded, reset delay to 0 */
3440 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3441 peer->mxp_reconnect_time = 0;
3442 peer->mxp_conn->mxk_sid = sid;
3443 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3445 /* marshal CONN_REQ or CONN_ACK msg */
3446 /* we are still using the conn ref from iconnect() - do not take another */
3447 tx = mxlnd_get_idle_tx();
3449 CNETERR("Can't obtain %s tx for %s\n",
3450 mxlnd_msgtype_to_str(type),
3451 libcfs_nid2str(peer->mxp_nid));
3452 cfs_spin_lock(&conn->mxk_lock);
3453 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3454 cfs_spin_unlock(&conn->mxk_lock);
3455 mxlnd_conn_decref(conn);
3459 tx->mxc_peer = peer;
3460 tx->mxc_conn = conn;
3461 tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3462 CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3463 mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3464 txmsg = tx->mxc_msg;
/* advertise our credit count and eager buffer size to the peer */
3465 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3466 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3467 tx->mxc_match = mxlnd_create_match(tx, 0);
3474 * mxlnd_request_waitd - the MX request completion thread(s)
3475 * @arg - thread id (as a void *)
3477 * This thread waits for a MX completion and then completes the request.
3478 * We will create one thread per CPU.
/* NOTE(review): this listing has gaps in its embedded numbering (some
 * declarations, the switch opening and 'continue'/brace lines are not
 * visible); comments below describe only the visible code. */
3481 mxlnd_request_waitd(void *arg)
3483 long id = (long) arg;
3486 mx_return_t mxret = MX_SUCCESS;
3488 kmx_ctx_t *ctx = NULL;
3489 enum kmx_req_state req_type = MXLND_REQ_TX;
3490 kmx_peer_t *peer = NULL;
3491 kmx_conn_t *conn = NULL;
3496 memset(name, 0, sizeof(name));
3497 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3498 cfs_daemonize(name);
3500 memset(&status, 0, sizeof(status));
3502 CDEBUG(D_NET, "%s starting\n", name);
3504 while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
/* thread 0 busy-polls with mx_test_any() up to kmx_polling iterations
 * before falling back to a blocking mx_wait_any(); other threads
 * always block with a timeout */
3510 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3511 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3515 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3516 0ULL, 0ULL, &status, &result);
3519 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3520 0ULL, 0ULL, &status, &result);
3522 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
3526 /* nothing completed... */
3530 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3531 "match_info 0x%llx and length %d\n",
3532 mx_strstatus(status.code), status.code,
3533 (u64) status.match_info, status.msg_length);
3535 if (status.code != MX_STATUS_SUCCESS) {
3536 CNETERR("wait_any() failed with %s (%d) with "
3537 "match_info 0x%llx and length %d\n",
3538 mx_strstatus(status.code), status.code,
3539 (u64) status.match_info, status.msg_length);
3542 msg_type = MXLND_MSG_TYPE(status.match_info);
3544 /* This may be a mx_iconnect() request completing,
3545 * check the bit mask for CONN_REQ and CONN_ACK */
3546 if (msg_type == MXLND_MSG_ICON_REQ ||
3547 msg_type == MXLND_MSG_ICON_ACK) {
3548 peer = (kmx_peer_t*) status.context;
3549 mxlnd_handle_connect_msg(peer, msg_type, status);
3553 /* This must be a tx or rx */
3555 /* NOTE: if this is a RX from the unexpected callback, it may
3556 * have very little info. If we dropped it in unexpected_recv(),
3557 * it will not have a context. If so, ignore it. */
3558 ctx = (kmx_ctx_t *) status.context;
3561 req_type = ctx->mxc_type;
3562 conn = ctx->mxc_conn; /* this may be NULL */
3563 mxlnd_deq_pending_ctx(ctx);
3565 /* copy status to ctx->mxc_status */
3566 ctx->mxc_status = status;
/* dispatch the completion to the tx or rx handler */
3570 mxlnd_handle_tx_completion(ctx);
3573 mxlnd_handle_rx_completion(ctx);
3576 CNETERR("Unknown ctx type %d\n", req_type);
3581 /* conn is always set except for the first CONN_REQ rx
3582 * from a new peer */
3583 if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3584 mxlnd_conn_disconnect(conn, 1, 1);
3587 CDEBUG(D_NET, "waitd() completed task\n");
3589 CDEBUG(D_NET, "%s stopping\n", name);
3590 mxlnd_thread_stop(id);
/* NOTE(review): this listing has gaps in its embedded numbering; the
 * final return and some braces are not visible. Comments cover only the
 * visible code.
 *
 * mxlnd_check_timeouts() - scan every peer's conn for expired timeouts.
 * @now: current time in jiffies.
 * Disconnects conns whose mxk_timeout has passed and tracks the earliest
 * future timeout, which the caller presumably sleeps until — TODO confirm
 * against the caller of this function. */
3596 mxlnd_check_timeouts(unsigned long now)
3600 unsigned long next = 0; /* jiffies */
3601 kmx_peer_t *peer = NULL;
3602 kmx_conn_t *conn = NULL;
3603 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
3605 cfs_read_lock(g_lock);
3606 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3607 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
/* bail out quickly on shutdown, dropping the read lock */
3610 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3611 cfs_read_unlock(g_lock);
3615 conn = peer->mxp_conn;
3617 mxlnd_conn_addref(conn);
3622 cfs_spin_lock(&conn->mxk_lock);
3624 /* if nothing pending (timeout == 0) or
3625 * if conn is already disconnected,
3627 if (conn->mxk_timeout == 0 ||
3628 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3629 cfs_spin_unlock(&conn->mxk_lock);
3630 mxlnd_conn_decref(conn);
3634 /* we want to find the timeout that will occur first.
3635 * if it is in the future, we will sleep until then.
3636 * if it is in the past, then we will sleep one
3637 * second and repeat the process. */
3639 (cfs_time_before(conn->mxk_timeout, next))) {
3640 next = conn->mxk_timeout;
3645 if (cfs_time_aftereq(now, conn->mxk_timeout)) {
3648 cfs_spin_unlock(&conn->mxk_lock);
/* expired: tear the conn down (notify LNET, send BYE) */
3651 mxlnd_conn_disconnect(conn, 1, 1);
3653 mxlnd_conn_decref(conn);
3656 cfs_read_unlock(g_lock);
3657 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
/* NOTE(review): this listing has gaps in its embedded numbering (error
 * gotos, some braces and the incompatible-handling lines are not
 * visible); comments cover only the visible code.
 *
 * mxlnd_passive_connect() - handle an incoming CONN_REQ from @cp.
 * Validates the peer's parameters (dst nid, queue depth, eager size),
 * finds or allocates the peer and a conn for this session, then replies
 * by initiating ICON_ACK. Frees @cp before returning. */
3663 mxlnd_passive_connect(kmx_connparams_t *cp)
3666 int incompatible = 0;
3671 kmx_msg_t *msg = &cp->mxr_msg;
3672 kmx_peer_t *peer = cp->mxr_peer;
3673 kmx_conn_t *conn = NULL;
3674 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
3676 mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3678 ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3681 CNETERR("Error %d unpacking CONN_REQ from %s\n",
3682 ret, libcfs_nid2str(peer->mxp_nid));
3684 CNETERR("Error %d unpacking CONN_REQ from "
3685 "unknown host with nic_id 0x%llx\n", ret, nic_id);
3689 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3690 CNETERR("Can't accept %s: bad dst nid %s\n",
3691 libcfs_nid2str(msg->mxm_srcnid),
3692 libcfs_nid2str(msg->mxm_dstnid));
/* parameter mismatches presumably set 'incompatible' on elided lines;
 * the conn is still set up so our params can be sent back */
3695 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3696 CNETERR("Can't accept %s: incompatible queue depth "
3698 libcfs_nid2str(msg->mxm_srcnid),
3699 msg->mxm_u.conn_req.mxcrm_queue_depth,
3700 *kmxlnd_tunables.kmx_peercredits);
3703 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3704 CNETERR("Can't accept %s: incompatible EAGER size "
3706 libcfs_nid2str(msg->mxm_srcnid),
3707 msg->mxm_u.conn_req.mxcrm_eager_size,
3708 (int) MXLND_MSG_SIZE);
3713 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
/* no such peer yet: allocate one, then re-check under the write lock in
 * case another thread raced us and inserted it first */
3717 kmx_peer_t *existing_peer = NULL;
3719 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3721 mx_nic_id_to_board_number(nic_id, &board);
3723 /* adds conn ref for peer and one for this function */
3724 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3725 board, ep_id, 0ULL);
3729 peer->mxp_conn->mxk_sid = sid;
3730 LASSERT(peer->mxp_ep_id == ep_id);
3731 cfs_write_lock(g_lock);
3732 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3733 if (existing_peer) {
3734 mxlnd_conn_decref(peer->mxp_conn);
3735 mxlnd_peer_decref(peer);
3736 peer = existing_peer;
3737 mxlnd_conn_addref(peer->mxp_conn);
3738 conn = peer->mxp_conn;
3740 cfs_list_add_tail(&peer->mxp_list,
3741 &kmxlnd_data.kmx_peers[hash]);
3742 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
3744 cfs_write_unlock(g_lock);
/* known peer but no conn (branch guard elided in this listing) */
3746 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3747 cfs_write_lock(g_lock);
3748 mxlnd_peer_decref(peer); /* drop ref taken above */
3749 cfs_write_unlock(g_lock);
3751 CNETERR("Cannot allocate mxp_conn\n");
3755 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3756 conn = peer->mxp_conn;
3757 } else { /* unexpected handler found peer */
3758 kmx_conn_t *old_conn = peer->mxp_conn;
/* a new session id means the peer restarted: retire the old conn and
 * allocate a fresh one for this session */
3760 if (sid != peer->mxp_conn->mxk_sid) {
3761 /* do not call mx_disconnect() or send a BYE */
3762 mxlnd_conn_disconnect(old_conn, 0, 0);
3764 /* This allocs a conn, points peer->mxp_conn to this one.
3765 * The old conn is still on the peer->mxp_conns list.
3766 * As the pending requests complete, they will call
3767 * conn_decref() which will eventually free it. */
3768 ret = mxlnd_conn_alloc(&conn, peer);
3770 CNETERR("Cannot allocate peer->mxp_conn\n");
3773 /* conn_alloc() adds one ref for the peer and one
3774 * for this function */
3777 peer->mxp_conn->mxk_sid = sid;
3780 conn = peer->mxp_conn;
3783 cfs_write_lock(g_lock);
3784 peer->mxp_incompatible = incompatible;
3785 cfs_write_unlock(g_lock);
3786 cfs_spin_lock(&conn->mxk_lock);
3787 conn->mxk_incarnation = msg->mxm_srcstamp;
3788 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3789 cfs_spin_unlock(&conn->mxk_lock);
3791 /* handle_conn_ack() will create the CONN_ACK msg */
3792 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3795 if (conn_ref) mxlnd_conn_decref(conn);
3797 mxlnd_connparams_free(cp);
/* NOTE(review): this listing has gaps in its embedded numbering (gotos
 * to the failure label and 'incompatible = 1' assignments appear to be
 * elided); comments cover only the visible code.
 *
 * mxlnd_check_conn_ack() - validate an incoming CONN_ACK in @cp.
 * Checks dst nid, queue depth and eager size against our settings; on
 * success resets the conn's credits/incarnation and marks it READY,
 * then kicks mxlnd_check_sends(). Frees @cp before returning. */
3802 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3805 int incompatible = 0;
3809 kmx_msg_t *msg = &cp->mxr_msg;
3810 kmx_peer_t *peer = cp->mxr_peer;
3811 kmx_conn_t *conn = cp->mxr_conn;
3813 mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3815 ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3818 CNETERR("Error %d unpacking CONN_ACK from %s\n",
3819 ret, libcfs_nid2str(peer->mxp_nid));
3821 CNETERR("Error %d unpacking CONN_ACK from "
3822 "unknown host with nic_id 0x%llx\n", ret, nic_id);
3828 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3829 CNETERR("Can't accept CONN_ACK from %s: "
3830 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3831 libcfs_nid2str(msg->mxm_dstnid));
3835 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3836 CNETERR("Can't accept CONN_ACK from %s: "
3837 "incompatible queue depth %d (%d wanted)\n",
3838 libcfs_nid2str(msg->mxm_srcnid),
3839 msg->mxm_u.conn_req.mxcrm_queue_depth,
3840 *kmxlnd_tunables.kmx_peercredits);
3845 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3846 CNETERR("Can't accept CONN_ACK from %s: "
3847 "incompatible EAGER size %d (%d wanted)\n",
3848 libcfs_nid2str(msg->mxm_srcnid),
3849 msg->mxm_u.conn_req.mxcrm_eager_size,
3850 (int) MXLND_MSG_SIZE);
/* handshake accepted: reset the conn's credit state for the new
 * incarnation and (if compatible) mark it READY */
3855 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3856 peer->mxp_incompatible = incompatible;
3857 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3858 cfs_spin_lock(&conn->mxk_lock);
3859 conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3860 conn->mxk_outstanding = 0;
3861 conn->mxk_incarnation = msg->mxm_srcstamp;
3862 conn->mxk_timeout = 0;
3863 if (!incompatible) {
3864 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3865 libcfs_nid2str(msg->mxm_srcnid));
3866 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3868 cfs_spin_unlock(&conn->mxk_lock);
/* conn is usable now; push out any txs queued while connecting */
3871 mxlnd_check_sends(peer);
/* failure path: mark the conn FAIL; an incompatible peer's conn is
 * quietly disconnected without a BYE */
3875 cfs_spin_lock(&conn->mxk_lock);
3876 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3877 cfs_spin_unlock(&conn->mxk_lock);
3880 if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3882 mxlnd_connparams_free(cp);
/* mxlnd_abort_msgs() - fail every ctx parked on the global orphan list.
 * Each orphan is completed with -ECONNABORTED: txs go back to the idle
 * pool (dropping their conn ref), rxs are pushed through the normal rx
 * completion path in the CANCELED state. Returns the count of aborted
 * ctxs (return statement elided from this listing — TODO confirm). */
3887 mxlnd_abort_msgs(void)
3890 cfs_list_t *orphans = &kmxlnd_data.kmx_orphan_msgs;
3891 cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
3894 cfs_spin_lock(g_conn_lock);
3895 while (!cfs_list_empty(orphans)) {
3896 kmx_ctx_t *ctx = NULL;
3897 kmx_conn_t *conn = NULL;
3899 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3900 cfs_list_del_init(&ctx->mxc_list);
/* drop the lock before completing the ctx: put_idle_tx() and
 * handle_rx_completion() must not be called with locks held */
3901 cfs_spin_unlock(g_conn_lock);
3903 ctx->mxc_errno = -ECONNABORTED;
3904 conn = ctx->mxc_conn;
3905 CDEBUG(D_NET, "aborting %s %s %s\n",
3906 mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3907 ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3908 libcfs_nid2str(ctx->mxc_nid));
3909 if (ctx->mxc_type == MXLND_REQ_TX) {
3910 mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3911 if (conn) mxlnd_conn_decref(conn); /* for this tx */
3913 ctx->mxc_state = MXLND_CTX_CANCELED;
3914 mxlnd_handle_rx_completion(ctx);
3918 cfs_spin_lock(g_conn_lock);
3920 cfs_spin_unlock(g_conn_lock);
/* mxlnd_free_conn_zombies - free all conns queued on the zombie list
 *
 * Pops each conn off kmxlnd_data.kmx_conn_zombies under kmx_conn_lock,
 * then drops that spinlock and takes the global rwlock for write around
 * mxlnd_conn_free_locked() (the _locked suffix indicates it expects the
 * write lock to be held).
 *
 * NOTE(review): 'count' used in the final CDEBUG is declared and
 * incremented on lines elided from this excerpt; presumably it is also
 * the function's return value -- confirm against the full file.
 */
mxlnd_free_conn_zombies(void)
        cfs_list_t *zombies = &kmxlnd_data.kmx_conn_zombies;
        cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
        /* cleanup any zombies */
        cfs_spin_lock(g_conn_lock);
        while (!cfs_list_empty(zombies)) {
                kmx_conn_t *conn = NULL;
                /* unlink under the spinlock, free under the rwlock --
                 * the spinlock must not be held across the free */
                conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
                cfs_list_del_init(&conn->mxk_zombie);
                cfs_spin_unlock(g_conn_lock);
                cfs_write_lock(g_lock);
                mxlnd_conn_free_locked(conn);
                cfs_write_unlock(g_lock);
                /* re-take the spinlock before re-testing the list */
                cfs_spin_lock(g_conn_lock);
        cfs_spin_unlock(g_conn_lock);
        CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
/**
 * mxlnd_connd - handles incoming connection requests
 * @arg - thread id (as a void *)
 *
 * This thread handles incoming connection requests: it sleeps on
 * kmx_conn_sem, and on each wakeup aborts orphaned msgs, frees zombie
 * conns, and dispatches one queued kmx_connparams_t (CONN_REQ or
 * CONN_ACK) from kmx_conn_reqs.
 */
mxlnd_connd(void *arg)
        long id = (long) arg;
        cfs_daemonize("mxlnd_connd");
        CDEBUG(D_NET, "connd starting\n");
        while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
                kmx_connparams_t *cp = NULL;
                cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
                cfs_list_t *conn_reqs = &kmxlnd_data.kmx_conn_reqs;
                /* block until another thread posts work (or a signal);
                 * NOTE(review): 'ret' is declared on a line elided from
                 * this excerpt, as is the handling of a non-zero return */
                ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
                /* re-check shutdown after sleeping on the semaphore */
                if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
                /* the wakeup may have been for orphans/zombies only */
                ret = mxlnd_abort_msgs();
                ret += mxlnd_free_conn_zombies();
                cfs_spin_lock(g_conn_lock);
                if (cfs_list_empty(conn_reqs)) {
                        CNETERR("connd woke up but did not "
                                "find a kmx_connparams_t or zombie conn\n");
                        cfs_spin_unlock(g_conn_lock);
                /* dequeue the oldest connection request; NOTE(review):
                 * the continuation of this cfs_list_entry() call (the
                 * mxr_list member argument) is elided from this excerpt */
                cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
                cfs_list_del_init(&cp->mxr_list);
                cfs_spin_unlock(g_conn_lock);
                /* NOTE(review): per-case 'break's are elided below */
                switch (MXLND_MSG_TYPE(cp->mxr_match)) {
                case MXLND_MSG_CONN_REQ:
                        /* We have a connection request. Handle it. */
                        mxlnd_passive_connect(cp);
                case MXLND_MSG_CONN_ACK:
                        /* The peer is ready for messages */
                        mxlnd_check_conn_ack(cp);
        /* final sweep on shutdown so no zombie conns leak */
        mxlnd_free_conn_zombies();
        CDEBUG(D_NET, "connd stopping\n");
        mxlnd_thread_stop(id);
/**
 * mxlnd_timeoutd - enforces timeouts on messages
 * @arg - thread id (as a void *)
 *
 * This thread queries each peer for its earliest timeout. If a peer has
 * timed out, it calls mxlnd_conn_disconnect().
 *
 * After checking for timeouts, try progressing sends (call check_sends()).
 */
mxlnd_timeoutd(void *arg)
        long id = (long) arg;
        unsigned long now = 0;
        unsigned long next = 0;
        unsigned long delay = CFS_HZ;   /* sleep granularity; NOTE(review):
                                         * the sleep using it is on a line
                                         * elided from this excerpt */
        kmx_peer_t *peer = NULL;
        kmx_peer_t *temp = NULL;
        kmx_conn_t *conn = NULL;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
        cfs_daemonize("mxlnd_timeoutd");
        CDEBUG(D_NET, "timeoutd starting\n");
        while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
                /* NOTE(review): the update of 'now' (and the declaration
                 * of loop index 'i') fall on elided lines -- confirm */
                /* if the next timeout has not arrived, go back to sleep */
                if (cfs_time_after(now, next)) {
                        next = mxlnd_check_timeouts(now);
                /* try to progress peers' txs */
                cfs_write_lock(g_lock);
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
                        /* NOTE we are safe against the removal of peer, but
                         * not against the removal of temp */
                        cfs_list_for_each_entry_safe(peer, temp, peers,
                                /* bail out mid-scan once shutdown begins */
                                if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
                                mxlnd_peer_addref(peer); /* add ref... */
                                conn = peer->mxp_conn;
                                if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
                                        mxlnd_conn_addref(conn); /* take ref... */
                                /* no usable conn: skip this peer */
                                CDEBUG(D_NET, "ignoring %s\n",
                                       libcfs_nid2str(peer->mxp_nid));
                                mxlnd_peer_decref(peer); /* ...to here */
                                /* only poke conns that are READY or FAIL;
                                 * NOTE(review): the remainder of this
                                 * condition is on elided lines */
                                if ((conn->mxk_status == MXLND_CONN_READY ||
                                    conn->mxk_status == MXLND_CONN_FAIL) &&
                                        /* drop the write lock across
                                         * check_sends(): it takes its own
                                         * locks and may block */
                                        cfs_write_unlock(g_lock);
                                        mxlnd_check_sends(peer);
                                        cfs_write_lock(g_lock);
                                mxlnd_conn_decref(conn); /* until here */
                                mxlnd_peer_decref(peer); /* ...to here */
                        cfs_write_unlock(g_lock);
        CDEBUG(D_NET, "timeoutd stopping\n");
        mxlnd_thread_stop(id);