/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * Copyright (C) 2006 Myricom, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/mxlnd/mxlnd.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 * Author: Scott Atchley <atchley at myri.com>
 */
#include "mxlnd.h"

mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */

inline int
mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
{
        /* if memcmp() == 0, it is NULL */
        return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
}
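/* NOTE MX_EPA_NULL is file-scope and never written, so static storage
 * guarantees it stays all-zero; the memcmp() above is effectively an
 * is-all-zero test on the endpoint address. */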
char *
mxlnd_ctxstate_to_str(int mxc_state)
{
        switch (mxc_state) {
        case MXLND_CTX_INIT:
                return "MXLND_CTX_INIT";
        case MXLND_CTX_IDLE:
                return "MXLND_CTX_IDLE";
        case MXLND_CTX_PREP:
                return "MXLND_CTX_PREP";
        case MXLND_CTX_PENDING:
                return "MXLND_CTX_PENDING";
        case MXLND_CTX_COMPLETED:
                return "MXLND_CTX_COMPLETED";
        case MXLND_CTX_CANCELED:
                return "MXLND_CTX_CANCELED";
        default:
                return "unknown";
        }
}
char *
mxlnd_connstatus_to_str(int mxk_status)
{
        switch (mxk_status) {
        case MXLND_CONN_READY:
                return "MXLND_CONN_READY";
        case MXLND_CONN_INIT:
                return "MXLND_CONN_INIT";
        case MXLND_CONN_WAIT:
                return "MXLND_CONN_WAIT";
        case MXLND_CONN_DISCONNECT:
                return "MXLND_CONN_DISCONNECT";
        case MXLND_CONN_FAIL:
                return "MXLND_CONN_FAIL";
        default:
                return "unknown";
        }
}
char *
mxlnd_msgtype_to_str(int type) {
        switch (type) {
        case MXLND_MSG_EAGER:
                return "MXLND_MSG_EAGER";
        case MXLND_MSG_CONN_REQ:
                return "MXLND_MSG_CONN_REQ";
        case MXLND_MSG_CONN_ACK:
                return "MXLND_MSG_CONN_ACK";
        case MXLND_MSG_BYE:
                return "MXLND_MSG_BYE";
        case MXLND_MSG_NOOP:
                return "MXLND_MSG_NOOP";
        case MXLND_MSG_PUT_REQ:
                return "MXLND_MSG_PUT_REQ";
        case MXLND_MSG_PUT_ACK:
                return "MXLND_MSG_PUT_ACK";
        case MXLND_MSG_PUT_DATA:
                return "MXLND_MSG_PUT_DATA";
        case MXLND_MSG_GET_REQ:
                return "MXLND_MSG_GET_REQ";
        case MXLND_MSG_GET_DATA:
                return "MXLND_MSG_GET_DATA";
        default:
                return "unknown";
        }
}
char *
mxlnd_lnetmsg_to_str(int type)
{
        switch (type) {
        case LNET_MSG_ACK:
                return "LNET_MSG_ACK";
        case LNET_MSG_PUT:
                return "LNET_MSG_PUT";
        case LNET_MSG_GET:
                return "LNET_MSG_GET";
        case LNET_MSG_REPLY:
                return "LNET_MSG_REPLY";
        case LNET_MSG_HELLO:
                return "LNET_MSG_HELLO";
        default:
                return "unknown";
        }
}
static inline u64
mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
{
        u64 type        = (u64) ctx->mxc_msg_type;
        u64 err         = (u64) error;
        u64 match       = 0ULL;

        mxlnd_valid_msg_type(ctx->mxc_msg_type);
        LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
        match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
        return match;
}

static inline void
mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
{
        *msg_type = (u8) MXLND_MSG_TYPE(match);
        *error    = (u8) MXLND_ERROR_VAL(match);
        *cookie   = match & MXLND_MAX_COOKIE;
        mxlnd_valid_msg_type(*msg_type);
        return;
}
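/* NOTE a sketch of the match-bit layout, inferred from the GET NAK
 * comment later in this file (error in bits 52-59, cookie in bits 0-51,
 * which leaves the msg type in the remaining bits 60-63):
 *
 *   63 ... 60 | 59 ... 52 | 51 ......................... 0
 *    msg type |   errno   |            cookie
 *
 * e.g. a PUT_ACK with errno 0 and cookie 0x123 packs to
 * ((u64) MXLND_MSG_PUT_ACK << MXLND_MSG_OFFSET) | 0x123. */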
kmx_ctx_t *
mxlnd_get_idle_rx(kmx_conn_t *conn)
{
        struct list_head        *rxs    = NULL;
        kmx_ctx_t               *rx     = NULL;

        LASSERT(conn != NULL);

        rxs = &conn->mxk_rx_idle;

        spin_lock(&conn->mxk_lock);

        if (list_empty (rxs)) {
                spin_unlock(&conn->mxk_lock);
                return NULL;
        }

        rx = list_entry (rxs->next, kmx_ctx_t, mxc_list);
        list_del_init(&rx->mxc_list);
        spin_unlock(&conn->mxk_lock);

        if (rx->mxc_get != rx->mxc_put) {
                CDEBUG(D_NETERROR, "*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
                CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
                CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
                CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
                CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
                CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
                CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
                CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
                CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
                CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
        }
        LASSERT (rx->mxc_get == rx->mxc_put);

        rx->mxc_get++;

        LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
        rx->mxc_state = MXLND_CTX_PREP;
        rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;

        return rx;
}
int
mxlnd_put_idle_rx(kmx_ctx_t *rx)
{
        kmx_conn_t              *conn   = rx->mxc_conn;
        struct list_head        *rxs    = &conn->mxk_rx_idle;

        LASSERT(rx->mxc_type == MXLND_REQ_RX);

        rx->mxc_put++;
        LASSERT(rx->mxc_get == rx->mxc_put);

        spin_lock(&conn->mxk_lock);
        list_add(&rx->mxc_list, rxs);
        spin_unlock(&conn->mxk_lock);
        return 0;
}
kmx_ctx_t *
mxlnd_get_idle_tx(void)
{
        struct list_head        *tmp    = &kmxlnd_data.kmx_tx_idle;
        kmx_ctx_t               *tx     = NULL;

        spin_lock(&kmxlnd_data.kmx_tx_idle_lock);

        if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
                CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
                spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
                return NULL;
        }

        tmp = &kmxlnd_data.kmx_tx_idle;
        tx = list_entry (tmp->next, kmx_ctx_t, mxc_list);
        list_del_init(&tx->mxc_list);

        /* Allocate a new completion cookie. It might not be needed,
         * but we've got a lock right now and we're unlikely to
         * take it again soon, so allocating here is cheap. */
        tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
        if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
                kmxlnd_data.kmx_tx_next_cookie = 1;
        }
        kmxlnd_data.kmx_tx_used++;
        spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);

        LASSERT (tx->mxc_get == tx->mxc_put);

        tx->mxc_get++;

        LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
        LASSERT (tx->mxc_lntmsg[0] == NULL);
        LASSERT (tx->mxc_lntmsg[1] == NULL);

        tx->mxc_state = MXLND_CTX_PREP;
        tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;

        return tx;
}
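/* NOTE tx cookies count up from 1 and wrap back to 1 once they pass
 * MXLND_MAX_COOKIE (see above), presumably so that 0 never appears as a
 * live cookie in the match bits. */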
void
mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
int
mxlnd_put_idle_tx(kmx_ctx_t *tx)
{
        int             result = 0;
        lnet_msg_t      *lntmsg[2];

        LASSERT(tx->mxc_type == MXLND_REQ_TX);

        if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
                kmx_conn_t      *conn = tx->mxc_conn;

                result = -EIO;
                if (tx->mxc_errno != 0) result = tx->mxc_errno;
                /* FIXME should we set mx_dis? */
                mxlnd_conn_disconnect(conn, 0, 1);
        }

        lntmsg[0] = tx->mxc_lntmsg[0];
        lntmsg[1] = tx->mxc_lntmsg[1];

        tx->mxc_put++;
        LASSERT(tx->mxc_get == tx->mxc_put);

        spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
        list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
        kmxlnd_data.kmx_tx_used--;
        spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
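        /* NOTE the idle lock is dropped before calling lnet_finalize()
         * below; LNET completion handling may do real work and must not
         * run under this spinlock. */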
        if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
        if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
        return 0;
}
void
mxlnd_connparams_free(kmx_connparams_t *cp)
{
        LASSERT(list_empty(&cp->mxr_list));
        MXLND_FREE(cp, sizeof(*cp));
        return;
}

int
mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
                       mx_endpoint_addr_t epa, u64 match, u32 length,
                       kmx_conn_t *conn, kmx_peer_t *peer, void *data)
{
        kmx_connparams_t *c = NULL;

        MXLND_ALLOC(c, sizeof(*c));
        if (!c) return -ENOMEM;

        INIT_LIST_HEAD(&c->mxr_list);
        c->mxr_context = context;
        c->mxr_epa = epa;
        c->mxr_match = match;
        c->mxr_nob = length;
        c->mxr_conn = conn;
        c->mxr_peer = peer;
        c->mxr_msg = *((kmx_msg_t *) data);

        *cp = c;
        return 0;
}

static inline void
mxlnd_set_conn_status(kmx_conn_t *conn, int status)
{
        conn->mxk_status = status;
        mb();
}
/**
 * mxlnd_conn_free_locked - free the conn
 * @conn - a kmx_conn pointer
 *
 * The calling function should remove the conn from the conns list first,
 * then destroy it. Caller should have write-locked kmx_global_lock.
 */
void
mxlnd_conn_free_locked(kmx_conn_t *conn)
{
        int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
        kmx_peer_t      *peer   = conn->mxk_peer;

        CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
        LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
                 list_empty (&conn->mxk_tx_free_queue) &&
                 list_empty (&conn->mxk_pending));

        if (!list_empty(&conn->mxk_list)) {
                list_del_init(&conn->mxk_list);
                if (peer->mxp_conn == conn) {
                        peer->mxp_conn = NULL;
                        if (valid) {
                                kmx_conn_t *temp = NULL;

                                mx_get_endpoint_addr_context(conn->mxk_epa,
                                                             (void **) &temp);
                                if (conn == temp)
                                        mx_set_endpoint_addr_context(conn->mxk_epa,
                                                                     (void *) NULL);
                        }
                        /* unlink from global list and drop its ref */
                        list_del_init(&peer->mxp_list);
                        mxlnd_peer_decref(peer);
                }
        }

        mxlnd_peer_decref(peer); /* drop conn's ref to peer */

        if (conn->mxk_rx_pages) {
                LASSERT (conn->mxk_rxs != NULL);
                mxlnd_free_pages(conn->mxk_rx_pages);
        }

        if (conn->mxk_rxs != NULL) {
                int             i  = 0;
                kmx_ctx_t       *rx = NULL;

                for (i = 0; i < MXLND_RX_MSGS(); i++) {
                        rx = &conn->mxk_rxs[i];
                        if (rx->mxc_seg_list != NULL) {
                                LASSERT(rx->mxc_nseg > 0);
                                MXLND_FREE(rx->mxc_seg_list,
                                           rx->mxc_nseg *
                                           sizeof(*rx->mxc_seg_list));
                        }
                }
                MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
        }

        MXLND_FREE(conn, sizeof (*conn));
        return;
}
int
mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
{
        int             found   = 0;
        int             count   = 0;
        u32             result  = 0;
        kmx_ctx_t       *ctx    = NULL;
        kmx_ctx_t       *next   = NULL;
        mx_return_t     mxret   = MX_SUCCESS;

        do {
                found = 0;
                spin_lock(&conn->mxk_lock);
                list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
                        list_del_init(&ctx->mxc_list);
                        if (ctx->mxc_type == MXLND_REQ_RX) {
                                found = 1;
                                mxret = mx_cancel(kmxlnd_data.kmx_endpt,
                                                  &ctx->mxc_mxreq,
                                                  &result);
                                if (mxret != MX_SUCCESS) {
                                        CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
                                }
                                if (result == 1) {
                                        ctx->mxc_errno = -ECONNABORTED;
                                        ctx->mxc_state = MXLND_CTX_CANCELED;
                                        spin_unlock(&conn->mxk_lock);
                                        spin_lock(&kmxlnd_data.kmx_conn_lock);
                                        /* we may be holding the global lock,
                                         * move to orphan list so that connd can free it */
                                        list_add_tail(&ctx->mxc_list,
                                                      &kmxlnd_data.kmx_orphan_msgs);
                                        count++;
                                        spin_unlock(&kmxlnd_data.kmx_conn_lock);
                                        spin_lock(&conn->mxk_lock);
                                }
                                break;
                        }
                }
                spin_unlock(&conn->mxk_lock);
        } while (found);

        return count;
}
int
mxlnd_cancel_queued_txs(kmx_conn_t *conn)
{
        int                     count   = 0;
        struct list_head        *tmp    = NULL;

        spin_lock(&conn->mxk_lock);
        while (!list_empty(&conn->mxk_tx_free_queue) ||
               !list_empty(&conn->mxk_tx_credit_queue)) {

                kmx_ctx_t *tx = NULL;

                if (!list_empty(&conn->mxk_tx_free_queue)) {
                        tmp = &conn->mxk_tx_free_queue;
                } else {
                        tmp = &conn->mxk_tx_credit_queue;
                }

                tx = list_entry(tmp->next, kmx_ctx_t, mxc_list);
                list_del_init(&tx->mxc_list);
                spin_unlock(&conn->mxk_lock);
                tx->mxc_errno = -ECONNABORTED;
                tx->mxc_state = MXLND_CTX_CANCELED;
                /* move to orphan list and then abort */
                spin_lock(&kmxlnd_data.kmx_conn_lock);
                list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
                spin_unlock(&kmxlnd_data.kmx_conn_lock);
                count++;
                spin_lock(&conn->mxk_lock);
        }
        spin_unlock(&conn->mxk_lock);

        return count;
}
void
mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
{
        u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
                    (((u64) error) << MXLND_ERROR_OFFSET) | cookie;

        mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
                  epa, match, NULL, NULL);
        return;
}
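/* NOTE mx_kisend() above posts a zero-length send: the msg type, error
 * and cookie travel entirely in the 64 match bits, so no payload buffer
 * is pinned or copied (used for BYE and the NAK-style control msgs). */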
/**
 * mxlnd_conn_disconnect - shutdown a connection
 * @conn - a kmx_conn pointer
 * @mx_dis - call mx_disconnect()
 * @send_bye - send peer a BYE msg
 *
 * This function sets the status to DISCONNECT, completes queued
 * txs with failure, and calls mx_disconnect(), which completes
 * pending txs and matched rxs with failure.
 */
void
mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
{
        mx_endpoint_addr_t      epa     = conn->mxk_epa;
        int                     valid   = !mxlnd_endpoint_addr_null(epa);
        int                     count   = 0;

        spin_lock(&conn->mxk_lock);
        if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
                spin_unlock(&conn->mxk_lock);
                return;
        }
        mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
        conn->mxk_timeout = 0;
        spin_unlock(&conn->mxk_lock);

        count = mxlnd_cancel_queued_txs(conn);
        count += mxlnd_conn_cancel_pending_rxs(conn);

        if (count)
                up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */

        if (send_bye && valid &&
            conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
                /* send a BYE to the peer */
                CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
                       libcfs_nid2str(conn->mxk_peer->mxp_nid));
                mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
                /* wait to allow the peer to ack our message */
                mxlnd_sleep(msecs_to_jiffies(20));
        }

        if (atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
                unsigned long last_msg = 0;

                /* notify LNET that we are giving up on this peer */
                if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
                        last_msg = conn->mxk_last_rx;
                else
                        last_msg = conn->mxk_last_tx;

                lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);

                if (mx_dis && valid &&
                    (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa)) != 0))
                        mx_disconnect(kmxlnd_data.kmx_endpt, epa);
        }
        mxlnd_conn_decref(conn); /* drop the owning peer's reference */

        return;
}
/**
 * mxlnd_conn_alloc_locked - allocate and initialize a new conn struct
 * @connp - address of a kmx_conn pointer
 * @peer - owning kmx_peer
 *
 * Returns 0 on success and -ENOMEM on failure
 */
int
mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
{
        int             i       = 0;
        int             ret     = 0;
        int             ipage   = 0;
        int             offset  = 0;
        void            *addr   = NULL;
        kmx_conn_t      *conn   = NULL;
        kmx_pages_t     *pages  = NULL;
        struct page     *page   = NULL;
        kmx_ctx_t       *rx     = NULL;

        LASSERT(peer != NULL);

        MXLND_ALLOC(conn, sizeof (*conn));
        if (conn == NULL) {
                CDEBUG(D_NETERROR, "Cannot allocate conn\n");
                return -ENOMEM;
        }
        CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);

        memset(conn, 0, sizeof(*conn));

        ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
        if (ret != 0) {
                CERROR("Can't allocate rx pages\n");
                MXLND_FREE(conn, sizeof(*conn));
                return -ENOMEM;
        }
        conn->mxk_rx_pages = pages;

        MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
        if (conn->mxk_rxs == NULL) {
                CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
                mxlnd_free_pages(pages);
                MXLND_FREE(conn, sizeof(*conn));
                return -ENOMEM;
        }

        memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));

        conn->mxk_peer = peer;
        INIT_LIST_HEAD(&conn->mxk_list);
        INIT_LIST_HEAD(&conn->mxk_zombie);
        atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer
                                                   and one for the caller */
        if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
                u64     nic_id  = 0ULL;
                u32     ep_id   = 0;

                /* this is localhost, set the epa and status as up */
                mxlnd_set_conn_status(conn, MXLND_CONN_READY);
                conn->mxk_epa = kmxlnd_data.kmx_epa;
                mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
                peer->mxp_reconnect_time = 0;
                mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
                peer->mxp_nic_id = nic_id;
                peer->mxp_ep_id = ep_id;
                conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
                conn->mxk_timeout = 0;
        } else {
                /* conn->mxk_incarnation = 0 - will be set by peer */
                /* conn->mxk_sid = 0 - will be set by peer */
                mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
                /* mxk_epa - to be set after mx_iconnect() */
        }
        spin_lock_init(&conn->mxk_lock);
        /* conn->mxk_timeout = 0 */
        /* conn->mxk_last_tx = 0 */
        /* conn->mxk_last_rx = 0 */
        INIT_LIST_HEAD(&conn->mxk_rx_idle);

        conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
        /* mxk_outstanding = 0 */

        INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
        INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
        /* conn->mxk_ntx_msgs = 0 */
        /* conn->mxk_ntx_data = 0 */
        /* conn->mxk_ntx_posted = 0 */
        /* conn->mxk_data_posted = 0 */
        INIT_LIST_HEAD(&conn->mxk_pending);

        for (i = 0; i < MXLND_RX_MSGS(); i++) {

                rx = &conn->mxk_rxs[i];
                rx->mxc_type = MXLND_REQ_RX;
                INIT_LIST_HEAD(&rx->mxc_list);

                /* map mxc_msg to page */
                page = pages->mxg_pages[ipage];
                addr = page_address(page);
                LASSERT(addr != NULL);
                rx->mxc_msg = (kmx_msg_t *)(addr + offset);
                rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));

                rx->mxc_conn = conn;
                rx->mxc_peer = peer;
                rx->mxc_nid = peer->mxp_nid;

                offset += MXLND_MSG_SIZE;
                LASSERT (offset <= PAGE_SIZE);

                if (offset == PAGE_SIZE) {
                        offset = 0;
                        ipage++;
                        LASSERT (ipage <= MXLND_RX_MSG_PAGES());
                }

                list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
        }

        *connp = conn;

        mxlnd_peer_addref(peer);        /* add a ref for this conn */

        /* add to front of peer's conns list */
        list_add(&conn->mxk_list, &peer->mxp_conns);
        peer->mxp_conn = conn;
        return 0;
}
int
mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
{
        int             ret     = 0;
        rwlock_t        *g_lock = &kmxlnd_data.kmx_global_lock;

        write_lock(g_lock);
        ret = mxlnd_conn_alloc_locked(connp, peer);
        write_unlock(g_lock);
        return ret;
}
int
mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
{
        int             ret     = 0;
        kmx_conn_t      *conn   = ctx->mxc_conn;

        ctx->mxc_state = MXLND_CTX_PENDING;

        spin_lock(&conn->mxk_lock);
        if (conn->mxk_status >= MXLND_CONN_INIT) {
                list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
                if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
                        conn->mxk_timeout = ctx->mxc_deadline;
                }
        } else {
                ctx->mxc_state = MXLND_CTX_COMPLETED;
                ret = -1;
        }
        spin_unlock(&conn->mxk_lock);
        return ret;
}
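/* NOTE conn->mxk_timeout tracks the earliest deadline among the ctxs on
 * mxk_pending: mxlnd_q_pending_ctx() above lowers it when a sooner
 * deadline is queued, and mxlnd_deq_pending_ctx() below recomputes it
 * from the new head of the list. */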
int
mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
{
        LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
                ctx->mxc_state == MXLND_CTX_COMPLETED);
        if (ctx->mxc_state != MXLND_CTX_PENDING &&
            ctx->mxc_state != MXLND_CTX_COMPLETED) {
                CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
                       mxlnd_ctxstate_to_str(ctx->mxc_state));
        }
        ctx->mxc_state = MXLND_CTX_COMPLETED;
        if (!list_empty(&ctx->mxc_list)) {
                kmx_conn_t      *conn = ctx->mxc_conn;
                kmx_ctx_t       *next = NULL;

                LASSERT(conn != NULL);
                spin_lock(&conn->mxk_lock);
                list_del_init(&ctx->mxc_list);
                conn->mxk_timeout = 0;
                if (!list_empty(&conn->mxk_pending)) {
                        next = list_entry(conn->mxk_pending.next, kmx_ctx_t, mxc_list);
                        conn->mxk_timeout = next->mxc_deadline;
                }
                spin_unlock(&conn->mxk_lock);
        }
        return 0;
}
/**
 * mxlnd_peer_free - free the peer
 * @peer - a kmx_peer pointer
 *
 * The calling function should decrement the rxs, drain the tx queues and
 * remove the peer from the peers list first, then destroy it.
 */
void
mxlnd_peer_free(kmx_peer_t *peer)
{
        CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));

        LASSERT (atomic_read(&peer->mxp_refcount) == 0);

        if (!list_empty(&peer->mxp_list)) {
                /* assume we are locked */
                list_del_init(&peer->mxp_list);
        }

        MXLND_FREE(peer, sizeof (*peer));
        atomic_dec(&kmxlnd_data.kmx_npeers);
        return;
}
static int
mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
{
        int                     ret     = -EHOSTUNREACH;
        unsigned char           *haddr  = NULL;
        struct net_device       *dev    = NULL;
        struct neighbour        *n      = NULL;
        __be32                  dst_ip  = htonl(ip);

        dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
        if (dev == NULL)
                return -ENODEV;

        haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */

        n = neigh_lookup(&arp_tbl, &dst_ip, dev);
        if (n) {
                if (n->nud_state & NUD_VALID) {
                        memcpy(haddr, n->ha, dev->addr_len);
                        ret = 0;
                }
                neigh_release(n);
        }

        dev_put(dev);

        return ret;
}
/* We only want the MAC address of the peer's Myricom NIC. We
 * require that each node has the IPoMX interface (myriN) up.
 * We will not pass any traffic over IPoMX, but it allows us
 * to get the MAC address. */
static int
mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
{
        int             ret     = 0;
        int             try     = 1;
        int             fatal   = 0;
        u64             tmp_id  = 0ULL;
        cfs_socket_t    *sock   = NULL;

        do {
                CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
                ret = mxlnd_lookup_mac(ip, &tmp_id);
                if (ret == 0) {
                        break;
                } else {
                        /* not found, try to connect (force an arp) */
                        ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
                        if (ret == -ECONNREFUSED) {
                                /* peer is there, get the MAC address */
                                mxlnd_lookup_mac(ip, &tmp_id);
                                if (tmp_id != 0ULL)
                                        ret = 0;
                                break;
                        } else if (ret == -EHOSTUNREACH && try < tries) {
                                /* add a little backoff */
                                CDEBUG(D_NET, "sleeping for %d jiffies\n", HZ/4);
                                mxlnd_sleep(HZ/4);
                        }
                }
        } while (try++ < tries);
        CDEBUG(D_NET, "done trying. ret = %d\n", ret);

        if (tmp_id == 0ULL)
                ret = -EHOSTUNREACH;
#ifdef __LITTLE_ENDIAN
        *nic_id = ___arch__swab64(tmp_id);
#else
        *nic_id = tmp_id;
#endif
        return ret;
}
/**
 * mxlnd_peer_alloc - allocate and initialize a new peer struct
 * @peerp - address of a kmx_peer pointer
 * @nid - LNET node id
 *
 * Returns 0 on success and -ENOMEM on failure
 */
int
mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
{
        int             ret     = 0;
        u32             ip      = LNET_NIDADDR(nid);
        kmx_peer_t      *peer   = NULL;

        LASSERT (nid != LNET_NID_ANY && nid != 0LL);

        MXLND_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
                return -ENOMEM;
        }
        CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);

        memset(peer, 0, sizeof(*peer));

        INIT_LIST_HEAD(&peer->mxp_list);
        peer->mxp_nid = nid;
        /* peer->mxp_ni unused - may be used for multi-rail */
        atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */

        peer->mxp_board = board;
        peer->mxp_ep_id = ep_id;
        peer->mxp_nic_id = nic_id;

        INIT_LIST_HEAD(&peer->mxp_conns);
        ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
        if (ret != 0) {
                mxlnd_peer_decref(peer);
                return ret;
        }
        INIT_LIST_HEAD(&peer->mxp_tx_queue);

        if (peer->mxp_nic_id != 0ULL)
                nic_id = peer->mxp_nic_id;

        if (nic_id == 0ULL) {
                ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
                if (ret == 0) {
                        peer->mxp_nic_id = nic_id;
                        mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
                }
        }

        peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
        /* peer->mxp_reconnect_time = 0 */
        /* peer->mxp_incompatible = 0 */

        *peerp = peer;
        return 0;
}
static inline kmx_peer_t *
mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
{
        int             found   = 0;
        int             hash    = 0;
        kmx_peer_t      *peer   = NULL;

        hash = mxlnd_nid_to_hash(nid);

        list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
                if (peer->mxp_nid == nid) {
                        found = 1;
                        mxlnd_peer_addref(peer);
                        break;
                }
        }
        return (found ? peer : NULL);
}
static kmx_peer_t *
mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
{
        int             ret     = 0;
        int             hash    = 0;
        kmx_peer_t      *peer   = NULL;
        kmx_peer_t      *old    = NULL;
        rwlock_t        *g_lock = &kmxlnd_data.kmx_global_lock;

        read_lock(g_lock);
        peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */

        if ((peer && peer->mxp_conn) ||         /* found peer with conn or */
            (!peer && !create)) {               /* did not find peer and do not create one */
                read_unlock(g_lock);
                return peer;
        }

        read_unlock(g_lock);

        /* if peer but _not_ conn */
        if (peer && !peer->mxp_conn) {
                if (create) {
                        write_lock(g_lock);
                        if (!peer->mxp_conn) { /* check again */
                                /* create the conn */
                                ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
                                if (ret != 0) {
                                        /* we tried, return the peer only.
                                         * the caller needs to see if the conn exists */
                                        CDEBUG(D_NETERROR, "%s: %s could not alloc conn\n",
                                               __func__, libcfs_nid2str(peer->mxp_nid));
                                } else {
                                        /* drop extra conn ref */
                                        mxlnd_conn_decref(peer->mxp_conn);
                                }
                        }
                        write_unlock(g_lock);
                }
                return peer;
        }

        /* peer not found and we need to create one */
        hash = mxlnd_nid_to_hash(nid);

        /* create peer (and conn) */
        /* adds conn ref for peer and one for this function */
        ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
                               *kmxlnd_tunables.kmx_ep_id, 0ULL);
        if (ret != 0) /* no memory, peer is NULL */
                return NULL;

        write_lock(g_lock);

        old = mxlnd_find_peer_by_nid_locked(nid);
        if (old) {
                /* someone already created one */
                mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
                mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
                mxlnd_peer_decref(peer);
                peer = old;
        } else {
                /* no other peer, use this one */
                list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]);
                atomic_inc(&kmxlnd_data.kmx_npeers);
                mxlnd_peer_addref(peer);
                mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
        }

        write_unlock(g_lock);

        return peer;
}
static inline int
mxlnd_tx_requires_credit(kmx_ctx_t *tx)
{
        return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
                tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
                tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
                tx->mxc_msg_type == MXLND_MSG_NOOP);
}
/**
 * mxlnd_init_msg - set type and number of bytes
 * @msg - msg pointer
 * @type - of message
 * @body_nob - bytes in msg body
 */
static inline void
mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
{
        msg->mxm_type = type;
        msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
}
static inline void
mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
{
        int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
        kmx_msg_t       *msg    = NULL;

        LASSERT (tx != NULL);
        LASSERT (nob <= MXLND_MSG_SIZE);

        tx->mxc_nid = nid;
        /* tx->mxc_peer should have already been set if we know it */
        tx->mxc_msg_type = type;
        tx->mxc_nob = nob;
        /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
        tx->mxc_seg.segment_length = nob;
        tx->mxc_pin_type = MX_PIN_PHYSICAL;

        msg = tx->mxc_msg;
        msg->mxm_type = type;
        msg->mxm_nob  = nob;

        return;
}
static inline __u32
mxlnd_cksum (void *ptr, int nob)
{
        char    *c   = ptr;
        __u32   sum  = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        /* ensure I don't return 0 (== no checksum) */
        return (sum == 0) ? 1 : sum;
}
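/* NOTE the loop above is a rotate-left-by-1-then-add byte checksum:
 * e.g. for the two bytes {0x01, 0x02} it computes
 * rol32(0, 1) + 0x01 = 0x01, then rol32(0x01, 1) + 0x02 = 0x04,
 * while the swapped order {0x02, 0x01} gives 0x05 - unlike a plain
 * byte sum, it is sensitive to byte order. */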
/**
 * mxlnd_pack_msg_locked - complete msg info
 * @tx - the tx whose msg is being finished
 */
static inline void
mxlnd_pack_msg_locked(kmx_ctx_t *tx)
{
        kmx_msg_t       *msg    = tx->mxc_msg;

        /* type and nob should already be set in init_msg() */
        msg->mxm_magic   = MXLND_MSG_MAGIC;
        msg->mxm_version = MXLND_MSG_VERSION;

        /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
         * return credits as well */
        if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
            tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
                msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
                tx->mxc_conn->mxk_outstanding = 0;
        } else {
                msg->mxm_credits = 0;
        }

        msg->mxm_cksum    = 0;
        msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
        msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
        msg->mxm_dstnid   = tx->mxc_nid;
        /* if it is a new peer, the dststamp will be 0 */
        msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;

        if (*kmxlnd_tunables.kmx_cksum) {
                msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
        }
}
int
mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
{
        const int       hdr_size        = offsetof(kmx_msg_t, mxm_u);
        __u32           msg_cksum       = 0;
        int             msg_nob         = 0;
        int             flip            = 0;

        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
                return -EPROTO;
        }

        if (msg->mxm_magic == MXLND_MSG_MAGIC) {
                flip = 0;
        } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
                flip = 1;
        } else {
                CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
                return -EPROTO;
        }

        if (msg->mxm_version !=
            (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
                CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
        if (msg_nob > nob) {
                CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with mxm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
        msg->mxm_cksum = 0;
        if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
                CDEBUG(D_NETERROR, "Bad checksum\n");
                return -EPROTO;
        }
        msg->mxm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->mxm_version);
                CLASSERT (sizeof(msg->mxm_type) == 1);
                CLASSERT (sizeof(msg->mxm_credits) == 1);
                msg->mxm_nob = msg_nob;
                __swab64s(&msg->mxm_srcnid);
                __swab64s(&msg->mxm_srcstamp);
                __swab64s(&msg->mxm_dstnid);
                __swab64s(&msg->mxm_dststamp);
        }

        if (msg->mxm_srcnid == LNET_NID_ANY) {
                CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
                return -EPROTO;
        }

        switch (msg->mxm_type) {
        default:
                CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
                return -EPROTO;

        case MXLND_MSG_NOOP:
                break;

        case MXLND_MSG_EAGER:
                if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
                        CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
                               (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
                        return -EPROTO;
                }
                break;

        case MXLND_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
                        CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
                        return -EPROTO;
                }
                if (flip)
                        __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
                break;

        case MXLND_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
                        CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
                        __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
                }
                break;

        case MXLND_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
                        CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
                        return -EPROTO;
                }
                if (flip)
                        __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
                break;

        case MXLND_MSG_CONN_REQ:
        case MXLND_MSG_CONN_ACK:
                if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
                        CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
                        __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
                }
                break;
        }
        return 0;
}
/**
 * mxlnd_recv_msg - post a receive for an expected msg
 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
 * @rx - the rx ctx to use
 * @msg_type - the type of message expected
 * @cookie - the cookie to match on
 * @length - length of incoming message
 *
 * The caller gets the rx and sets nid, peer and conn if known.
 *
 * Returns 0 on success and -1 on failure
 */
int
mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
{
        int             ret     = 0;
        mx_return_t     mxret   = MX_SUCCESS;
        uint64_t        mask    = ~(MXLND_ERROR_MASK);

        rx->mxc_msg_type = msg_type;
        rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
        rx->mxc_cookie = cookie;
        /* rx->mxc_match may already be set */
        /* rx->mxc_seg.segment_ptr is already set */
        rx->mxc_seg.segment_length = length;
        ret = mxlnd_q_pending_ctx(rx);
        if (ret == -1) {
                /* the caller is responsible for calling conn_decref() if needed */
                return -1;
        }
        mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
                          cookie, mask, (void *) rx, &rx->mxc_mxreq);
        if (mxret != MX_SUCCESS) {
                mxlnd_deq_pending_ctx(rx);
                CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
                       mx_strerror(mxret), (int) mxret);
                return -1;
        }
        return 0;
}
/**
 * mxlnd_unexpected_recv - this is the callback function that will handle
 *                         unexpected receives
 * @context - NULL, ignore
 * @source - the peer's mx_endpoint_addr_t
 * @match_value - the msg's bits, should be MXLND_MSG_EAGER
 * @length - length of incoming message
 * @data_if_available - used for CONN_[REQ|ACK]
 *
 * If it is an eager-sized msg, we will call recv_msg() with the actual
 * length. If it is a large message, we will call recv_msg() with a
 * length of 0 bytes to drop it because we should never have a large,
 * unexpected message.
 *
 * NOTE - The MX library blocks until this function completes. Make it as
 * fast as possible. DO NOT allocate memory which can block!
 *
 * If we cannot get a rx or the conn is closed, drop the message on the floor
 * (i.e. recv 0 bytes and ignore).
 */
mx_unexp_handler_action_t
mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
                      uint64_t match_value, uint32_t length, void *data_if_available)
{
        int                     ret             = 0;
        kmx_ctx_t               *rx             = NULL;
        mx_ksegment_t           seg;
        u8                      msg_type        = 0;
        u8                      error           = 0;
        u64                     cookie          = 0ULL;
        kmx_conn_t              *conn           = NULL;
        kmx_peer_t              *peer           = NULL;
        u64                     nic_id          = 0ULL;
        u32                     ep_id           = 0;
        u32                     sid             = 0;

        /* TODO this will change to the net struct */
        if (context != NULL) {
                CDEBUG(D_NETERROR, "non-NULL context\n");
        }

        CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);

        mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
        mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
        read_lock(&kmxlnd_data.kmx_global_lock);
        mx_get_endpoint_addr_context(source, (void **) &conn);
        if (conn) {
                mxlnd_conn_addref(conn); /* add ref for this function */
                peer = conn->mxk_peer;
        }
        read_unlock(&kmxlnd_data.kmx_global_lock);

        if (msg_type == MXLND_MSG_BYE) {
                if (conn) {
                        CDEBUG(D_NET, "peer %s sent BYE msg\n",
                               libcfs_nid2str(peer->mxp_nid));
                        mxlnd_conn_disconnect(conn, 1, 0);
                        mxlnd_conn_decref(conn); /* drop ref taken above */
                }
                return MX_RECV_FINISHED;
        }

        if (msg_type == MXLND_MSG_CONN_REQ) {
                kmx_connparams_t        *cp = NULL;
                const int       expected = offsetof(kmx_msg_t, mxm_u) +
                                           sizeof(kmx_connreq_msg_t);

                if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
                if (unlikely(length != expected || !data_if_available)) {
                        CDEBUG(D_NETERROR, "received invalid CONN_REQ from %llx "
                               "length=%d (expected %d)\n", nic_id, length, expected);
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
                        return MX_RECV_FINISHED;
                }

                ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                             conn, peer, data_if_available);
                if (unlikely(ret != 0)) {
                        CDEBUG(D_NETERROR, "unable to alloc CONN_REQ from %llx:%d\n",
                               nic_id, ep_id);
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
                        return MX_RECV_FINISHED;
                }
                spin_lock(&kmxlnd_data.kmx_conn_lock);
                list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
                spin_unlock(&kmxlnd_data.kmx_conn_lock);
                up(&kmxlnd_data.kmx_conn_sem);
                return MX_RECV_FINISHED;
        }

        if (msg_type == MXLND_MSG_CONN_ACK) {
                kmx_connparams_t        *cp = NULL;
                const int       expected = offsetof(kmx_msg_t, mxm_u) +
                                           sizeof(kmx_connreq_msg_t);

                LASSERT(conn != NULL);
                if (unlikely(error != 0)) {
                        CDEBUG(D_NETERROR, "received CONN_ACK from %s "
                               "with error %d\n",
                               libcfs_nid2str(peer->mxp_nid), (int) error);
                        mxlnd_conn_disconnect(conn, 1, 0);
                } else if (unlikely(length != expected || !data_if_available)) {
                        CDEBUG(D_NETERROR, "received %s CONN_ACK from %s "
                               "length=%d (expected %d)\n",
                               data_if_available ? "short" : "missing",
                               libcfs_nid2str(peer->mxp_nid), length, expected);
                        mxlnd_conn_disconnect(conn, 1, 1);
                } else {
                        /* peer is ready for messages */
                        ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                                     conn, peer, data_if_available);
                        if (unlikely(ret != 0)) {
                                CDEBUG(D_NETERROR, "unable to alloc kmx_connparams_t"
                                       " from %llx:%d\n", nic_id, ep_id);
                                mxlnd_conn_disconnect(conn, 1, 1);
                        } else {
                                spin_lock(&kmxlnd_data.kmx_conn_lock);
                                list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
                                spin_unlock(&kmxlnd_data.kmx_conn_lock);
                                up(&kmxlnd_data.kmx_conn_sem);
                        }
                }
                mxlnd_conn_decref(conn); /* drop ref taken above */

                return MX_RECV_FINISHED;
        }

        /* Handle unexpected messages (PUT_REQ and GET_REQ) */

        LASSERT(peer != NULL && conn != NULL);

        rx = mxlnd_get_idle_rx(conn);
        if (rx != NULL) {
                if (length <= MXLND_MSG_SIZE) {
                        ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
                } else {
                        CDEBUG(D_NETERROR, "unexpected large receive with "
                               "match_value=0x%llx length=%d\n",
                               match_value, length);
                        ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
                }

                if (ret == 0) {
                        /* hold conn ref until rx completes */
                        rx->mxc_conn = conn;
                        rx->mxc_peer = peer;
                        rx->mxc_nid = peer->mxp_nid;
                } else {
                        CDEBUG(D_NETERROR, "could not post receive\n");
                        mxlnd_put_idle_rx(rx);
                }
        }

        /* Encountered error, drop incoming message on the floor */
        /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
         * uses the standard code path and acks the sender normally */

        if (rx == NULL || ret != 0) {
                mxlnd_conn_decref(conn); /* drop ref taken above */
                if (rx == NULL) {
                        CDEBUG(D_NETERROR, "no idle rxs available - dropping rx"
                               " 0x%llx from %s\n", match_value,
                               libcfs_nid2str(peer->mxp_nid));
                } else {
                        CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
                }
                seg.segment_ptr = 0ULL;
                seg.segment_length = 0;
                mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
                          match_value, ~0ULL, NULL, NULL);
        }

        return MX_RECV_CONTINUE;
}
int
mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
{
        int             i       = 0;
        int             ret     = -ENOENT;
        kmx_peer_t      *peer   = NULL;

        read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {
                        if (index-- == 0) {
                                *nidp = peer->mxp_nid;
                                *count = atomic_read(&peer->mxp_refcount);
                                ret = 0;
                                break;
                        }
                }
        }
        read_unlock(&kmxlnd_data.kmx_global_lock);

        return ret;
}
static void
mxlnd_del_peer_locked(kmx_peer_t *peer)
{
        if (peer->mxp_conn) {
                mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
        } else {
                list_del_init(&peer->mxp_list); /* remove from the global list */
                mxlnd_peer_decref(peer); /* drop global list ref */
        }
        return;
}
int
mxlnd_del_peer(lnet_nid_t nid)
{
        int             i       = 0;
        int             ret     = 0;
        kmx_peer_t      *peer   = NULL;
        kmx_peer_t      *next   = NULL;

        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
        }
        write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                if (peer == NULL) {
                        ret = -ENOENT;
                } else {
                        mxlnd_peer_decref(peer); /* and drops it */
                        mxlnd_del_peer_locked(peer);
                }
        } else { /* LNET_NID_ANY */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        list_for_each_entry_safe(peer, next,
                                                 &kmxlnd_data.kmx_peers[i], mxp_list) {
                                mxlnd_del_peer_locked(peer);
                        }
                }
        }
        write_unlock(&kmxlnd_data.kmx_global_lock);

        return ret;
}
kmx_conn_t *
mxlnd_get_conn_by_idx(int index)
{
        int             i       = 0;
        kmx_peer_t      *peer   = NULL;
        kmx_conn_t      *conn   = NULL;

        read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {
                        list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
                                if (index-- > 0)
                                        continue;

                                mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
                                read_unlock(&kmxlnd_data.kmx_global_lock);
                                return conn;
                        }
                }
        }
        read_unlock(&kmxlnd_data.kmx_global_lock);

        return NULL;
}

static void
mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
{
        kmx_conn_t      *conn   = NULL;
        kmx_conn_t      *next   = NULL;

        list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
                mxlnd_conn_disconnect(conn, 0, 1);

        return;
}

int
mxlnd_close_matching_conns(lnet_nid_t nid)
{
        int             i       = 0;
        int             ret     = 0;
        kmx_peer_t      *peer   = NULL;

        write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
                if (peer == NULL) {
                        ret = -ENOENT;
                } else {
                        mxlnd_close_matching_conns_locked(peer);
                        mxlnd_peer_decref(peer); /* and drops it here */
                }
        } else { /* LNET_NID_ANY */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list)
                                mxlnd_close_matching_conns_locked(peer);
                }
        }
        write_unlock(&kmxlnd_data.kmx_global_lock);

        return ret;
}
/**
 * mxlnd_ctl - modify MXLND parameters
 * @ni - LNET interface handle
 * @cmd - command to change
 * @arg - the ioctl data
 */
int
mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data  = arg;
        int                     ret     = -EINVAL;

        LASSERT (ni == kmxlnd_data.kmx_ni);

        switch (cmd) {
        case IOC_LIBCFS_GET_PEER: {
                lnet_nid_t      nid     = 0;
                int             count   = 0;

                ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
                data->ioc_nid = nid;
                data->ioc_count = count;
                break;
        }
        case IOC_LIBCFS_DEL_PEER: {
                ret = mxlnd_del_peer(data->ioc_nid);
                break;
        }
        case IOC_LIBCFS_GET_CONN: {
                kmx_conn_t      *conn = NULL;

                conn = mxlnd_get_conn_by_idx(data->ioc_count);
                if (conn == NULL) {
                        ret = -ENOENT;
                        break;
                }
                ret = 0;
                data->ioc_nid = conn->mxk_peer->mxp_nid;
                mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
                break;
        }
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                ret = mxlnd_close_matching_conns(data->ioc_nid);
                break;
        }
        default:
                CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
                break;
        }

        return ret;
}
/**
 * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
 * @tx - the tx
 *
 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
 */
void
mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
{
        u8              msg_type        = tx->mxc_msg_type;
        kmx_conn_t      *conn           = tx->mxc_conn;

        LASSERT (msg_type != 0);
        LASSERT (tx->mxc_nid != 0);
        LASSERT (tx->mxc_peer != NULL);
        LASSERT (tx->mxc_conn != NULL);

        tx->mxc_incarnation = conn->mxk_incarnation;

        if (msg_type != MXLND_MSG_PUT_DATA &&
            msg_type != MXLND_MSG_GET_DATA) {

                if (mxlnd_tx_requires_credit(tx)) {
                        list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
                        conn->mxk_ntx_msgs++;
                } else if (msg_type == MXLND_MSG_CONN_REQ ||
                           msg_type == MXLND_MSG_CONN_ACK) {
                        /* put conn msgs at the front of the queue */
                        list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
                } else {
                        /* PUT_ACK, PUT_NAK */
                        list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
                        conn->mxk_ntx_msgs++;
                }
        } else {
                list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
                conn->mxk_ntx_data++;
        }

        return;
}
/**
 * mxlnd_peer_queue_tx - add the tx to the peer's tx queue
 * @tx - the tx
 *
 * Add the tx to the peer's msg or data queue
 */
static inline void
mxlnd_peer_queue_tx(kmx_ctx_t *tx)
{
        LASSERT(tx->mxc_peer != NULL);
        LASSERT(tx->mxc_conn != NULL);
        spin_lock(&tx->mxc_conn->mxk_lock);
        mxlnd_peer_queue_tx_locked(tx);
        spin_unlock(&tx->mxc_conn->mxk_lock);

        return;
}
/**
 * mxlnd_queue_tx - add the tx to the global tx queue
 * @tx - the tx
 *
 * Add the tx to the global queue and up the tx_queue_sem
 */
void
mxlnd_queue_tx(kmx_ctx_t *tx)
{
        kmx_peer_t *peer   = tx->mxc_peer;
        LASSERT (tx->mxc_nid != 0);

        if (peer != NULL) {
                if (peer->mxp_incompatible &&
                    tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
                        /* let this fail now */
                        tx->mxc_errno = -ECONNABORTED;
                        mxlnd_conn_decref(peer->mxp_conn);
                        mxlnd_put_idle_tx(tx);
                        return;
                }

                if (tx->mxc_conn == NULL) {
                        int             ret  = 0;
                        kmx_conn_t      *conn = NULL;

                        ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
                        if (ret != 0) {
                                tx->mxc_errno = ret;
                                mxlnd_put_idle_tx(tx);
                                return;
                        }
                        tx->mxc_conn = conn;
                        mxlnd_peer_decref(peer); /* and takes it from peer */
                }
                LASSERT(tx->mxc_conn != NULL);
                mxlnd_peer_queue_tx(tx);
                mxlnd_check_sends(peer);
        } else {
                spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
                list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
                spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
                up(&kmxlnd_data.kmx_tx_queue_sem);
        }

        return;
}
int
mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
{
        int             i               = 0;
        int             sum             = 0;
        int             old_sum         = 0;
        int             nseg            = 0;
        int             first_iov       = -1;
        int             first_iov_offset = 0;
        int             first_found     = 0;
        int             last_iov        = -1;
        int             last_iov_length = 0;
        mx_ksegment_t   *seg            = NULL;

        if (niov == 0) return 0;
        LASSERT(iov != NULL);

        for (i = 0; i < niov; i++) {
                sum = old_sum + (u32) iov[i].iov_len;
                if (!first_found && (sum > offset)) {
                        first_iov = i;
                        first_iov_offset = offset - old_sum;
                        first_found = 1;
                        sum = (u32) iov[i].iov_len - first_iov_offset;
                }
                if (first_found && sum >= nob) {
                        last_iov = i;
                        last_iov_length = (u32) iov[i].iov_len - (sum - nob);
                        if (first_iov == last_iov) last_iov_length -= first_iov_offset;
                        break;
                }
                old_sum = sum;
        }
        LASSERT(first_iov >= 0 && last_iov >= first_iov);
        nseg = last_iov - first_iov + 1;
        LASSERT(nseg > 0);

        MXLND_ALLOC(seg, nseg * sizeof(*seg));
        if (seg == NULL) {
                CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
                return -ENOMEM;
        }
        memset(seg, 0, nseg * sizeof(*seg));
        ctx->mxc_nseg = nseg;
        sum = 0;

        for (i = 0; i < nseg; i++) {
                seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
                seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
                if (i == 0) {
                        seg[i].segment_ptr += (u64) first_iov_offset;
                        seg[i].segment_length -= (u32) first_iov_offset;
                }
                if (i == (nseg - 1)) {
                        seg[i].segment_length = (u32) last_iov_length;
                }
                sum += seg[i].segment_length;
        }
        ctx->mxc_seg_list = seg;
        ctx->mxc_pin_type = MX_PIN_PHYSICAL;
#ifdef MX_PIN_FULLPAGES
        ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
#endif
        LASSERT(nob == sum);
        return 0;
}
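/* NOTE a worked example of the mapping above, assuming two iovs of 4096
 * bytes each with offset=6000 and nob=1000: the scan picks
 * first_iov = last_iov = 1 with first_iov_offset = 1904, so a single
 * mx_ksegment_t is built pointing 1904 bytes into the second iov with
 * segment_length = 1000. */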
int
mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
{
        int             i               = 0;
        int             sum             = 0;
        int             old_sum         = 0;
        int             nseg            = 0;
        int             first_kiov      = -1;
        int             first_kiov_offset = 0;
        int             first_found     = 0;
        int             last_kiov       = -1;
        int             last_kiov_length = 0;
        mx_ksegment_t   *seg            = NULL;

        if (niov == 0) return 0;
        LASSERT(kiov != NULL);

        for (i = 0; i < niov; i++) {
                sum = old_sum + kiov[i].kiov_len;
                if (i == 0) sum -= kiov[i].kiov_offset;
                if (!first_found && (sum > offset)) {
                        first_kiov = i;
                        first_kiov_offset = offset - old_sum;
                        if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
                        first_found = 1;
                        sum = kiov[i].kiov_len - first_kiov_offset;
                }
                if (first_found && sum >= nob) {
                        last_kiov = i;
                        last_kiov_length = kiov[i].kiov_len - (sum - nob);
                        if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
                        break;
                }
                old_sum = sum;
        }
        LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
        nseg = last_kiov - first_kiov + 1;
        LASSERT(nseg > 0);

        MXLND_ALLOC(seg, nseg * sizeof(*seg));
        if (seg == NULL) {
                CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
                return -ENOMEM;
        }
        memset(seg, 0, nseg * sizeof(*seg));
        ctx->mxc_nseg = nseg;
        sum = 0;

        for (i = 0; i < nseg; i++) {
                seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
                seg[i].segment_length = kiov[first_kiov + i].kiov_len;
                if (i == 0) {
                        seg[i].segment_ptr += (u64) first_kiov_offset;
                        /* we have to add back the original kiov_offset */
                        seg[i].segment_length -= first_kiov_offset +
                                                 kiov[first_kiov].kiov_offset;
                }
                if (i == (nseg - 1)) {
                        seg[i].segment_length = last_kiov_length;
                }
                sum += seg[i].segment_length;
        }
        ctx->mxc_seg_list = seg;
        ctx->mxc_pin_type = MX_PIN_PHYSICAL;
#ifdef MX_PIN_FULLPAGES
        ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
#endif
        LASSERT(nob == sum);
        return 0;
}
void
mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
{
        LASSERT(type == MXLND_MSG_PUT_ACK);
        mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
        tx->mxc_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
        tx->mxc_match = mxlnd_create_match(tx, status);

        mxlnd_queue_tx(tx);
}
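/* NOTE a PUT NAK is simply a PUT_ACK whose mxpam_dst_cookie carries the
 * errno shifted into the error bits (52-59) instead of a real sink
 * cookie; the initiator detects the failure by unpacking those bits. */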
/**
 * mxlnd_send_data - get tx, map [k]iov, queue tx
 * @ni - the LNET instance
 * @lntmsg - the LNET msg
 * @peer - the peer
 * @msg_type - PUT_DATA or GET_DATA
 * @cookie - the matching cookie
 *
 * This sets up the DATA send for PUT or GET.
 *
 * On success, it queues the tx, on failure it calls lnet_finalize()
 */
void
mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
{
        int                     ret     = 0;
        lnet_process_id_t       target  = lntmsg->msg_target;
        unsigned int            niov    = lntmsg->msg_niov;
        struct iovec            *iov    = lntmsg->msg_iov;
        lnet_kiov_t             *kiov   = lntmsg->msg_kiov;
        unsigned int            offset  = lntmsg->msg_offset;
        unsigned int            nob     = lntmsg->msg_len;
        kmx_ctx_t               *tx     = NULL;

        LASSERT(lntmsg != NULL);
        LASSERT(peer != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        LASSERT((cookie >> MXLND_ERROR_OFFSET) == 0);

        tx = mxlnd_get_idle_tx();
        if (tx == NULL) {
                CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
                       msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                       libcfs_nid2str(target.nid));
                goto failed_0;
        }
        tx->mxc_nid = target.nid;
        /* NOTE called when we have a ref on the conn, get one for this tx */
        mxlnd_conn_addref(peer->mxp_conn);
        tx->mxc_peer = peer;
        tx->mxc_conn = peer->mxp_conn;
        tx->mxc_msg_type = msg_type;
        tx->mxc_lntmsg[0] = lntmsg;
        tx->mxc_cookie = cookie;
        tx->mxc_match = mxlnd_create_match(tx, 0);

        /* This sets up the mx_ksegment_t to send the DATA payload */
        if (nob == 0) {
                /* do not setup the segments */
                CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
                       "to %s?\n", libcfs_nid2str(target.nid));
                ret = 0;
        } else if (kiov == NULL) {
                ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
        } else {
                ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
        }
        if (ret != 0) {
                CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
                       libcfs_nid2str(target.nid));
                tx->mxc_errno = -EIO;
                goto failed_1;
        }

        mxlnd_queue_tx(tx);
        return;

failed_1:
        mxlnd_conn_decref(peer->mxp_conn);
        mxlnd_put_idle_tx(tx);
        return;

failed_0:
        CDEBUG(D_NETERROR, "no tx avail\n");
        lnet_finalize(ni, lntmsg, -EIO);
        return;
}
/**
 * mxlnd_recv_data - map [k]iov, post rx
 * @ni - the LNET instance
 * @lntmsg - the LNET msg
 * @rx - the rx ctx
 * @msg_type - PUT_DATA or GET_DATA
 * @cookie - the matching cookie
 *
 * This sets up the DATA receive for PUT or GET.
 *
 * On success, it returns 0, on failure it returns -1
 */
int
mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
{
        int                     ret     = 0;
        lnet_process_id_t       target  = lntmsg->msg_target;
        unsigned int            niov    = lntmsg->msg_niov;
        struct iovec            *iov    = lntmsg->msg_iov;
        lnet_kiov_t             *kiov   = lntmsg->msg_kiov;
        unsigned int            offset  = lntmsg->msg_offset;
        unsigned int            nob     = lntmsg->msg_len;
        mx_return_t             mxret   = MX_SUCCESS;
        u64                     mask    = ~(MXLND_ERROR_MASK);

        /* above assumes MXLND_MSG_PUT_DATA */
        if (msg_type == MXLND_MSG_GET_DATA) {
                niov = lntmsg->msg_md->md_niov;
                iov = lntmsg->msg_md->md_iov.iov;
                kiov = lntmsg->msg_md->md_iov.kiov;
                offset = 0;
                nob = lntmsg->msg_md->md_length;
        }

        LASSERT(lntmsg != NULL);
        LASSERT(rx != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        LASSERT((cookie >> MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */

        rx->mxc_msg_type = msg_type;
        rx->mxc_state = MXLND_CTX_PENDING;
        rx->mxc_nid = target.nid;
        /* if posting a GET_DATA, we may not yet know the peer */
        if (rx->mxc_peer != NULL) {
                rx->mxc_conn = rx->mxc_peer->mxp_conn;
        }
        rx->mxc_lntmsg[0] = lntmsg;
        rx->mxc_cookie = cookie;
        rx->mxc_match = mxlnd_create_match(rx, 0);
        /* This sets up the mx_ksegment_t to receive the DATA payload */
        if (kiov == NULL) {
                ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
        } else {
                ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
        }
        if (msg_type == MXLND_MSG_GET_DATA) {
                rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
                if (rx->mxc_lntmsg[1] == NULL) {
                        CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
                               libcfs_nid2str(target.nid));
                        ret = -1;
                }
        }
        if (ret != 0) {
                CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
                       msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                       libcfs_nid2str(target.nid));
                return -1;
        }
        ret = mxlnd_q_pending_ctx(rx);
        if (ret == -1) {
                return -1;
        }
        CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
        mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
                          rx->mxc_seg_list, rx->mxc_nseg,
                          rx->mxc_pin_type, rx->mxc_match,
                          mask, (void *) rx, &rx->mxc_mxreq);
        if (mxret != MX_SUCCESS) {
                if (rx->mxc_conn != NULL) {
                        mxlnd_deq_pending_ctx(rx);
                }
                CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
                       (int) mxret, libcfs_nid2str(target.nid));
                return -1;
        }

        return 0;
}
/**
 * mxlnd_send - the LND required send function
 * @ni - the LNET instance
 * @private - unused
 * @lntmsg - the LNET msg to send
 *
 * This must not block. Since we may not have a peer struct for the receiver,
 * the send may be appended to the global tx list; we then up the
 * tx_queued thread's semaphore to notify it of the new send.
 */
int
mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        int                     ret             = 0;
        int                     type            = lntmsg->msg_type;
        lnet_hdr_t              *hdr            = &lntmsg->msg_hdr;
        lnet_process_id_t       target          = lntmsg->msg_target;
        lnet_nid_t              nid             = target.nid;
        int                     target_is_router = lntmsg->msg_target_is_router;
        int                     routing         = lntmsg->msg_routing;
        unsigned int            payload_niov    = lntmsg->msg_niov;
        struct iovec            *payload_iov    = lntmsg->msg_iov;
        lnet_kiov_t             *payload_kiov   = lntmsg->msg_kiov;
        unsigned int            payload_offset  = lntmsg->msg_offset;
        unsigned int            payload_nob     = lntmsg->msg_len;
        kmx_ctx_t               *tx             = NULL;
        kmx_msg_t               *txmsg          = NULL;
        kmx_ctx_t               *rx             = (kmx_ctx_t *) private; /* for REPLY */
        kmx_ctx_t               *rx_data        = NULL;
        kmx_conn_t              *conn           = NULL;
        int                     nob             = 0;
        uint32_t                length          = 0;
        kmx_peer_t              *peer           = NULL;
        rwlock_t                *g_lock         = &kmxlnd_data.kmx_global_lock;

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        /* private is used on LNET_GET_REPLY only, NULL for all other cases */

        /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
         * to a new peer, so create one if not found */
        peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
        if (peer == NULL || peer->mxp_conn == NULL) {
                /* we could not find it nor could we create one or
                 * one exists but we cannot create a conn,
                 * fail this message */
                if (peer) {
                        /* found peer without conn, drop ref taken above */
                        LASSERT(peer->mxp_conn == NULL);
                        mxlnd_peer_decref(peer);
                }
                return -ENOMEM;
        }

        /* we have a peer with a conn */

        if (unlikely(peer->mxp_incompatible)) {
                mxlnd_peer_decref(peer); /* drop ref taken above */
        } else {
                read_lock(g_lock);
                conn = peer->mxp_conn;
                if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
                        mxlnd_conn_addref(conn);
                } else {
                        conn = NULL;
                }
                read_unlock(g_lock);
                mxlnd_peer_decref(peer); /* drop peer ref taken above */
                if (!conn)
                        return -ENOTCONN;
        }

        LASSERT(peer && conn);

        CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);

        switch (type) {
        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
                if (nob <= MXLND_MSG_SIZE)
                        break;                  /* send EAGER */

                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
                               libcfs_nid2str(nid));
                        if (conn) mxlnd_conn_decref(conn);
                        return -ENOMEM;
                }

                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* we added a conn ref above */
                mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
                txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the request.
                 * we need to determine how much to receive, it will be either
                 * a put_ack or a put_nak. The put_ack is larger, so use it. */

                rx = mxlnd_get_idle_rx(conn);
                if (unlikely(rx == NULL)) {
                        CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
                               libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
                        return -ENOMEM;
                }
                rx->mxc_nid = nid;
                rx->mxc_peer = peer;
                mxlnd_conn_addref(conn); /* for this rx */
                rx->mxc_conn = conn;
                rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
                rx->mxc_cookie = tx->mxc_cookie;
                rx->mxc_match = mxlnd_create_match(rx, 0);

                length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
                ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
                if (unlikely(ret != 0)) {
                        CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
                               libcfs_nid2str(nid));
                        rx->mxc_lntmsg[0] = NULL;
                        mxlnd_put_idle_rx(rx);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx... */
                        mxlnd_conn_decref(conn); /* and for the tx */
                        return -EHOSTUNREACH;
                }

                mxlnd_queue_tx(tx);
                return 0;

        case LNET_MSG_GET:
                if (routing || target_is_router)
                        break;                  /* send EAGER */

                /* is the REPLY message too small for DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
                if (nob <= MXLND_MSG_SIZE)
                        break;                  /* send EAGER */

                /* get tx (we need the cookie) , post rx for incoming DATA,
                 * then post GET_REQ tx */
                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
                               libcfs_nid2str(nid));
                        mxlnd_conn_decref(conn); /* for the ref taken above */
                        return -ENOMEM;
                }

                rx_data = mxlnd_get_idle_rx(conn);
                if (unlikely(rx_data == NULL)) {
                        CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
                               libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the ref taken above */
                        return -ENOMEM;
                }
                rx_data->mxc_peer = peer;
                /* NOTE no need to lock peer before adding conn ref since we took
                 * a conn ref for the tx (it cannot be freed between there and here) */
                mxlnd_conn_addref(conn); /* for the rx_data */
                rx_data->mxc_conn = conn;

                ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
                if (unlikely(ret != 0)) {
                        CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
                               libcfs_nid2str(nid));
                        mxlnd_put_idle_rx(rx_data);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx_data... */
                        mxlnd_conn_decref(conn); /* and for the tx */
                        return -EHOSTUNREACH;
                }

                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* conn ref taken above */
                mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
                txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                mxlnd_queue_tx(tx);
                return 0;

        default:
                LBUG();
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return -EIO;
        }

        /* send EAGER */

        LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
                 <= MXLND_MSG_SIZE);

        tx = mxlnd_get_idle_tx();
        if (unlikely(tx == NULL)) {
                CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
                       mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return -ENOMEM;
        }

        tx->mxc_peer = peer;
        tx->mxc_conn = conn;
        /* conn ref taken above */
        nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
        mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
        tx->mxc_match = mxlnd_create_match(tx, 0);

        txmsg = tx->mxc_msg;
        txmsg->mxm_u.eager.mxem_hdr = *hdr;

        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
                                    offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                    payload_niov, payload_kiov, payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
                                   offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                   payload_niov, payload_iov, payload_offset, payload_nob);

        tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
        mxlnd_queue_tx(tx);
        return 0;
}
2327 * mxlnd_recv - the LND required recv function
2338 * This must not block.
2341 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2342 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2343 unsigned int offset, unsigned int mlen, unsigned int rlen)
2348 kmx_ctx_t *rx = private;
2349 kmx_msg_t *rxmsg = rx->mxc_msg;
2350 lnet_nid_t nid = rx->mxc_nid;
2351 kmx_ctx_t *tx = NULL;
2352 kmx_msg_t *txmsg = NULL;
2353 kmx_peer_t *peer = rx->mxc_peer;
2354 kmx_conn_t *conn = peer->mxp_conn;
2356 int msg_type = rxmsg->mxm_type;
2361 LASSERT (mlen <= rlen);
2362 /* Either all pages or all vaddrs */
2363 LASSERT (!(kiov != NULL && iov != NULL));
2364 LASSERT (peer && conn);
2366 /* conn_addref(conn) already taken for the primary rx */
        case MXLND_MSG_EAGER:
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
                len = rx->mxc_status.xfer_length;
                if (unlikely(nob > len)) {
                        CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
                               libcfs_nid2str(nid), nob, len);
                        ret = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                MXLND_MSG_SIZE, rxmsg,
                                offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                MXLND_MSG_SIZE, rxmsg,
                                offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                mlen);
                finalize = 1;
                credit = 1;
                break;
        case MXLND_MSG_PUT_REQ:
                /* we are going to reuse the rx, store the needed info */
                cookie = rxmsg->mxm_u.put_req.mxprm_cookie;

                /* get tx, post rx, send PUT_ACK */

                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
                        /* Not replying will break the connection */
                        ret = -ENOMEM;
                        break;
                }

                if (unlikely(mlen == 0)) {
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
                        /* repost the rx */
                        break;
                }

                mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* no need to lock peer first since we already have a ref */
                mxlnd_conn_addref(conn); /* for the tx */
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
                txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
                tx->mxc_cookie = cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the PUT_ACK */
                rx->mxc_state = MXLND_CTX_PREP;
                rx->mxc_peer = peer;
                rx->mxc_conn = conn;
                /* do not take another ref for this rx, it is already taken */
                rx->mxc_nid = peer->mxp_nid;
                ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
                                      txmsg->mxm_u.put_ack.mxpam_dst_cookie);

                if (unlikely(ret != 0)) {
                        /* Notify peer that it's over */
                        CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
                               libcfs_nid2str(nid), ret);
                        tx->mxc_state = MXLND_CTX_PREP;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* finalize = 0, let the PUT_ACK tx finalize this */
                        tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
                        tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
                        /* conn ref already taken above */
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
                        break;
                }

                mxlnd_queue_tx(tx);
                /* do not return a credit until after PUT_DATA returns */
                break;
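        /* NOTE the PUT_ACK above carries our rx cookie in mxpam_dst_cookie;
         * the peer sends PUT_DATA with those match bits, so the reposted rx
         * (reusing this rx) receives the bulk data directly */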
        case MXLND_MSG_GET_REQ:
                cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;

                if (likely(lntmsg != NULL)) {
                        mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
                                        cookie);
                } else {
                        /* GET didn't match anything */
                        /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
                         * We have to embed the error code in the match bits.
                         * Send the error in bits 52-59 and the cookie in bits 0-51 */
                        tx = mxlnd_get_idle_tx();
                        if (unlikely(tx == NULL)) {
                                CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
                                       libcfs_nid2str(nid));
                                /* we can't get a tx, notify the peer that the GET failed */
                                mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
                                                   ENODATA, cookie);
                                break;
                        }

                        tx->mxc_msg_type = MXLND_MSG_GET_DATA;
                        tx->mxc_state = MXLND_CTX_PENDING;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* no need to lock peer first since we already have a ref */
                        mxlnd_conn_addref(conn); /* for this tx */
                        tx->mxc_cookie = cookie;
                        tx->mxc_match = mxlnd_create_match(tx, ENODATA);
                        tx->mxc_pin_type = MX_PIN_PHYSICAL;
                        /* finalize lntmsg after tx completes */
                        mxlnd_queue_tx(tx);
                }
                break;
        }
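        /* NOTE an illustration of the GET NAK match bits used above: with the
         * error in bits 52-59 and the cookie in bits 0-51, a GET whose cookie
         * is 0x1234 failing with ENODATA is matched as
         * (GET_DATA type bits) | ((u64) ENODATA << 52) | 0x1234, which is
         * what mxlnd_create_match(tx, ENODATA) builds for the initiator */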
        /* we received a message, increment peer's outstanding credits */
        if (credit == 1) {
                spin_lock(&conn->mxk_lock);
                conn->mxk_outstanding++;
                spin_unlock(&conn->mxk_lock);
        }

        /* we are done with the rx */
        mxlnd_put_idle_rx(rx);
        mxlnd_conn_decref(conn);

        if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);

        /* we received a credit, see if we can use it to send a msg */
        if (credit) mxlnd_check_sends(peer);

        return ret;
}
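/* NOTE mxk_outstanding counts rxs consumed here but not yet returned to the
 * peer as credits; mxlnd_check_sends() piggybacks them on outgoing msgs and,
 * past the high-water mark, sends an explicit NOOP to return them */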
void
mxlnd_sleep(unsigned long timeout)
{
        set_current_state(TASK_INTERRUPTIBLE);
        schedule_timeout(timeout);
        return;
}
/**
 * mxlnd_tx_queued - the generic send queue thread
 * @arg - thread id (as a void *)
 *
 * This thread moves send messages from the global tx_queue to the owning
 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one
 * and adds it to the global peer list.
 */
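/* NOTE peer creation below is a lookup-then-create: the peer is allocated
 * without the global write lock held, then the hash is re-checked under the
 * lock; if another thread won the race, the loser drops its refs and uses
 * the existing peer */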
int
mxlnd_tx_queued(void *arg)
{
        long id = (long) arg;
        int ret = 0;
        int found = 0;
        kmx_ctx_t *tx = NULL;
        kmx_peer_t *peer = NULL;
        struct list_head *queue = &kmxlnd_data.kmx_tx_queue;
        spinlock_t *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
        rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        cfs_daemonize("mxlnd_tx_queued");

        while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
                ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
                if (atomic_read(&kmxlnd_data.kmx_shutdown))
                        break;
                if (ret != 0) /* should we check for -EINTR? */
                        continue;
                spin_lock(tx_q_lock);
                if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
                        spin_unlock(tx_q_lock);
                        continue;
                }
                tx = list_entry (queue->next, kmx_ctx_t, mxc_list);
                list_del_init(&tx->mxc_list);
                spin_unlock(tx_q_lock);

                found = 0;
                peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
                if (peer != NULL) {
                        tx->mxc_peer = peer;
                        write_lock(g_lock);
                        if (peer->mxp_conn == NULL) {
                                ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
                                if (ret != 0) {
                                        /* out of memory, give up and fail tx */
                                        tx->mxc_errno = -ENOMEM;
                                        mxlnd_peer_decref(peer);
                                        write_unlock(g_lock);
                                        mxlnd_put_idle_tx(tx);
                                        continue;
                                }
                        }
                        tx->mxc_conn = peer->mxp_conn;
                        mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
                        mxlnd_peer_decref(peer); /* drop peer ref taken above */
                        write_unlock(g_lock);
                        mxlnd_queue_tx(tx);
                        found = 1;
                }
                if (found == 0) {
                        int hash = 0;
                        kmx_peer_t *peer = NULL;
                        kmx_peer_t *old = NULL;

                        hash = mxlnd_nid_to_hash(tx->mxc_nid);

                        LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
                                tx->mxc_msg_type != MXLND_MSG_GET_DATA);

                        /* adds conn ref for this function */
                        ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
                                               *kmxlnd_tunables.kmx_board,
                                               *kmxlnd_tunables.kmx_ep_id, 0ULL);
                        if (ret != 0) {
                                /* finalize message */
                                tx->mxc_errno = ret;
                                mxlnd_put_idle_tx(tx);
                                continue;
                        }
                        tx->mxc_peer = peer;
                        tx->mxc_conn = peer->mxp_conn;
                        /* this tx will keep the conn ref taken in peer_alloc() */

                        /* add peer to global peer list, but look to see
                         * if someone already created it after we released
                         * the lock */
                        write_lock(g_lock);
                        old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
                        if (old != NULL) {
                                /* we have a peer ref on old */
                                if (old->mxp_conn) {
                                        found = 1;
                                } else {
                                        /* drop our ref taken above... */
                                        mxlnd_peer_decref(old);
                                        /* and delete the old peer */
                                        mxlnd_del_peer_locked(old);
                                }
                        }
                        if (found == 0) {
                                list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]);
                                atomic_inc(&kmxlnd_data.kmx_npeers);
                        } else {
                                tx->mxc_conn = old->mxp_conn;
                                LASSERT(old->mxp_conn != NULL);
                                mxlnd_conn_addref(old->mxp_conn);
                                mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
                                mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
                                mxlnd_peer_decref(peer);
                        }
                        write_unlock(g_lock);

                        mxlnd_queue_tx(tx);
                }
        }
        mxlnd_thread_stop(id);
        return 0;
}
/* When calling this, we must not have the peer lock. */
void
mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
{
        mx_return_t mxret = MX_SUCCESS;
        mx_request_t request;
        kmx_conn_t *conn = peer->mxp_conn;
        u64 match = ((u64) msg_type) << MXLND_MSG_OFFSET;

        /* NOTE we are holding a conn ref every time we call this function,
         * we do not need to lock the peer before taking another ref */
        mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */

        LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);

        if (peer->mxp_reconnect_time == 0) {
                peer->mxp_reconnect_time = jiffies;
        }

        if (peer->mxp_nic_id == 0ULL) {
                int ret = 0;

                ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
                                      &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
                if (ret == 0)
                        mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
                if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
                        /* not mapped yet, return */
                        spin_lock(&conn->mxk_lock);
                        mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
                        spin_unlock(&conn->mxk_lock);
                        mxlnd_conn_decref(conn);
                        return;
                }
        }

        if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
            conn->mxk_status != MXLND_CONN_DISCONNECT) {
                /* give up and notify LNET */
                CDEBUG(D_NET, "timeout trying to connect to %s\n",
                       libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_disconnect(conn, 0, 0);
                mxlnd_conn_decref(conn);
                return;
        }

        mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
                            peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
                            (void *) peer, &request);
        if (unlikely(mxret != MX_SUCCESS)) {
                spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                spin_unlock(&conn->mxk_lock);
                CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
                       mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_decref(conn);
                return;
        }
        mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
                               MXLND_CONNECT_TIMEOUT / HZ * 1000);
        return;
}
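/* NOTE the connect handshake: mx_iconnect() carries only the msg type
 * (ICON_REQ or ICON_ACK) in the match bits; when it completes,
 * mxlnd_handle_connect_msg() sends the actual CONN_REQ or CONN_ACK msg
 * with our queue depth and EAGER size */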
#define MXLND_STATS 0

void
mxlnd_check_sends(kmx_peer_t *peer)
{
        int ret = 0;
        int found = 0;
        mx_return_t mxret = MX_SUCCESS;
        kmx_ctx_t *tx = NULL;
        kmx_conn_t *conn = NULL;
        u8 msg_type = 0;
        int credit = 0;
        int status = 0;
        int ntx_posted = 0;
        int credits = 0;
#if MXLND_STATS
        static unsigned long last = 0;
#endif

        if (unlikely(peer == NULL)) {
                LASSERT(peer != NULL);
                return;
        }
        write_lock(&kmxlnd_data.kmx_global_lock);
        conn = peer->mxp_conn;
        /* NOTE take a ref for the duration of this function since it is called
         * when there might not be any queued txs for this peer */
        if (conn != NULL) {
                if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
                        write_unlock(&kmxlnd_data.kmx_global_lock);
                        return;
                }
                mxlnd_conn_addref(conn); /* for duration of this function */
        }
        write_unlock(&kmxlnd_data.kmx_global_lock);

        /* do not add another ref for this tx */

        if (conn == NULL) {
                /* we do not have any conns */
                CDEBUG(D_NETERROR, "peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
                return;
        }
#if MXLND_STATS
        if (time_after(jiffies, last)) {
                last = jiffies + HZ;
                CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
                       "ntx_posted= %d ntx_data= %d data_posted= %d\n",
                       mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
                       conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
                       conn->mxk_ntx_data, conn->mxk_data_posted);
        }
#endif
        spin_lock(&conn->mxk_lock);
        ntx_posted = conn->mxk_ntx_posted;
        credits = conn->mxk_credits;

        LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
        LASSERT(ntx_posted >= 0);

        LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
        LASSERT(credits >= 0);

        /* check number of queued msgs, ignore data */
        if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
                /* check if any txs queued that could return credits... */
                if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
                        /* if not, send a NOOP */
                        tx = mxlnd_get_idle_tx();
                        if (likely(tx != NULL)) {
                                tx->mxc_peer = peer;
                                tx->mxc_conn = peer->mxp_conn;
                                mxlnd_conn_addref(conn); /* for this tx */
                                mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
                                tx->mxc_match = mxlnd_create_match(tx, 0);
                                mxlnd_peer_queue_tx_locked(tx);
                                found = 1;
                        }
                }
        }
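        /* NOTE the NOOP above exists only to carry mxm_credits back to the
         * peer; it is sent when outstanding credits reach the high-water
         * mark and no queued msg would otherwise return them */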
        /* if the peer is not ready, try to connect */
        if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
                     conn->mxk_status == MXLND_CONN_FAIL)) {
                CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
                mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
                spin_unlock(&conn->mxk_lock);
                mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
                spin_lock(&conn->mxk_lock);
        }
        while (!list_empty(&conn->mxk_tx_free_queue) ||
               !list_empty(&conn->mxk_tx_credit_queue)) {
                /* We have something to send. If we have a queued tx that does not
                 * require a credit (free), choose it since its completion will
                 * return a credit (here or at the peer), complete a DATA or
                 * CONN_REQ or CONN_ACK. */
                struct list_head *tmp_tx = NULL;

                if (!list_empty(&conn->mxk_tx_free_queue)) {
                        tmp_tx = &conn->mxk_tx_free_queue;
                } else {
                        tmp_tx = &conn->mxk_tx_credit_queue;
                }
                tx = list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);

                msg_type = tx->mxc_msg_type;

                /* don't try to send a rx */
                LASSERT(tx->mxc_type == MXLND_REQ_TX);

                /* ensure that it is a valid msg type */
                LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
                        msg_type == MXLND_MSG_CONN_ACK ||
                        msg_type == MXLND_MSG_NOOP ||
                        msg_type == MXLND_MSG_EAGER ||
                        msg_type == MXLND_MSG_PUT_REQ ||
                        msg_type == MXLND_MSG_PUT_ACK ||
                        msg_type == MXLND_MSG_PUT_DATA ||
                        msg_type == MXLND_MSG_GET_REQ ||
                        msg_type == MXLND_MSG_GET_DATA);
                LASSERT(tx->mxc_peer == peer);
                LASSERT(tx->mxc_nid == peer->mxp_nid);

                credit = mxlnd_tx_requires_credit(tx);
                if (credit) {
                        if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
                                CDEBUG(D_NET, "%s: posted enough\n",
                                       libcfs_nid2str(peer->mxp_nid));
                                break;
                        }

                        if (conn->mxk_credits == 0) {
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(peer->mxp_nid));
                                break;
                        }

                        if (conn->mxk_credits == 1 &&     /* last credit reserved for */
                            conn->mxk_outstanding == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(peer->mxp_nid));
                                break;
                        }
                }
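                /* NOTE the last-credit check above: when we owe the peer
                 * nothing (outstanding == 0), spending our final credit
                 * could leave both sides without a credit to send the msg
                 * that returns credits, so it is held in reserve */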
                if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
                        if (!(msg_type == MXLND_MSG_CONN_REQ ||
                              msg_type == MXLND_MSG_CONN_ACK)) {
                                CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
                                       mxlnd_connstatus_to_str(conn->mxk_status),
                                       tx->mxc_cookie,
                                       mxlnd_msgtype_to_str(tx->mxc_msg_type));
                                if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
                                    time_after_eq(jiffies, tx->mxc_deadline)) {
                                        list_del_init(&tx->mxc_list);
                                        tx->mxc_errno = -ECONNABORTED;
                                        spin_unlock(&conn->mxk_lock);
                                        mxlnd_put_idle_tx(tx);
                                        mxlnd_conn_decref(conn);
                                        spin_lock(&conn->mxk_lock);
                                        continue;
                                }
                                break;
                        }
                }
                list_del_init(&tx->mxc_list);

                /* handle credits, etc now while we have the lock to avoid races */
                if (credit) {
                        conn->mxk_credits--;
                        conn->mxk_ntx_posted++;
                }
                if (msg_type != MXLND_MSG_PUT_DATA &&
                    msg_type != MXLND_MSG_GET_DATA) {
                        if (msg_type != MXLND_MSG_CONN_REQ &&
                            msg_type != MXLND_MSG_CONN_ACK) {
                                conn->mxk_ntx_msgs--;
                        }
                }
                if (tx->mxc_incarnation == 0 &&
                    conn->mxk_incarnation != 0) {
                        tx->mxc_incarnation = conn->mxk_incarnation;
                }

                /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
                 * or (2) there is a non-DATA msg that can return credits in the
                 * queue, then drop this duplicate NOOP */
                if (unlikely(msg_type == MXLND_MSG_NOOP)) {
                        if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
                            (conn->mxk_ntx_msgs >= 1)) {
                                conn->mxk_credits++;
                                conn->mxk_ntx_posted--;
                                spin_unlock(&conn->mxk_lock);
                                /* redundant NOOP */
                                mxlnd_put_idle_tx(tx);
                                mxlnd_conn_decref(conn);
                                CDEBUG(D_NET, "%s: redundant noop\n",
                                       libcfs_nid2str(peer->mxp_nid));
                                spin_lock(&conn->mxk_lock);
                                continue;
                        }
                }
                if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
                           (msg_type != MXLND_MSG_GET_DATA))) {
                        mxlnd_pack_msg_locked(tx);
                }

                status = conn->mxk_status;
                spin_unlock(&conn->mxk_lock);

                if (likely((status == MXLND_CONN_READY) ||
                           (msg_type == MXLND_MSG_CONN_REQ) ||
                           (msg_type == MXLND_MSG_CONN_ACK))) {
                        if (msg_type != MXLND_MSG_CONN_REQ &&
                            msg_type != MXLND_MSG_CONN_ACK) {
                                /* add to the pending list */
                                ret = mxlnd_q_pending_ctx(tx);
                        }
                        tx->mxc_state = MXLND_CTX_PENDING;

                        if (likely(msg_type != MXLND_MSG_PUT_DATA &&
                                   msg_type != MXLND_MSG_GET_DATA)) {
                                /* send a msg style tx */
                                LASSERT(tx->mxc_nseg == 1);
                                LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
                                CDEBUG(D_NET, "sending %s 0x%llx\n",
                                       mxlnd_msgtype_to_str(msg_type),
                                       tx->mxc_cookie);
                                mxret = mx_kisend(kmxlnd_data.kmx_endpt,
                        } else {
                                /* send a DATA tx */
                                spin_lock(&conn->mxk_lock);
                                conn->mxk_ntx_data--;
                                conn->mxk_data_posted++;
                                spin_unlock(&conn->mxk_lock);
                                CDEBUG(D_NET, "sending %s 0x%llx\n",
                                       mxlnd_msgtype_to_str(msg_type),
                                       tx->mxc_cookie);
                                mxret = mx_kisend(kmxlnd_data.kmx_endpt,
                        }
                } else {
                        mxret = MX_CONNECTION_FAILED;
                }
                if (likely(mxret == MX_SUCCESS)) {
                        ret = 0;
                } else {
                        CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
                               "sending to %s\n", mx_strerror(mxret), (int) mxret,
                               libcfs_nid2str(peer->mxp_nid));
                        /* NOTE mx_kisend() only fails if there are not enough
                         * resources. Do not change the connection status. */
                        if (mxret == MX_NO_RESOURCES) {
                                tx->mxc_errno = -ENOMEM;
                        } else {
                                tx->mxc_errno = -ECONNABORTED;
                        }
                        if (credit) {
                                spin_lock(&conn->mxk_lock);
                                conn->mxk_ntx_posted--;
                                conn->mxk_credits++;
                                spin_unlock(&conn->mxk_lock);
                        } else if (msg_type == MXLND_MSG_PUT_DATA ||
                                   msg_type == MXLND_MSG_GET_DATA) {
                                spin_lock(&conn->mxk_lock);
                                conn->mxk_data_posted--;
                                spin_unlock(&conn->mxk_lock);
                        }
                        if (msg_type != MXLND_MSG_PUT_DATA &&
                            msg_type != MXLND_MSG_GET_DATA &&
                            msg_type != MXLND_MSG_CONN_REQ &&
                            msg_type != MXLND_MSG_CONN_ACK) {
                                spin_lock(&conn->mxk_lock);
                                conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
                                spin_unlock(&conn->mxk_lock);
                        }
                        if (msg_type != MXLND_MSG_CONN_REQ &&
                            msg_type != MXLND_MSG_CONN_ACK) {
                                /* remove from the pending list */
                                mxlnd_deq_pending_ctx(tx);
                        }
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn);
                }
                spin_lock(&conn->mxk_lock);
        }
        spin_unlock(&conn->mxk_lock);

        mxlnd_conn_decref(conn); /* drop ref taken at start of function */
        return;
}
/**
 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
 * @ctx - the tx descriptor
 *
 * Determine which type of send request it was and start the next step, if needed,
 * or, if done, signal completion to LNET. After we are done, put back on the
 * idle tx list.
 */
void
mxlnd_handle_tx_completion(kmx_ctx_t *tx)
{
        int code = tx->mxc_status.code;
        int failed = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
        kmx_msg_t *msg = tx->mxc_msg;
        kmx_peer_t *peer = tx->mxc_peer;
        kmx_conn_t *conn = tx->mxc_conn;
        u8 type = tx->mxc_msg_type;
        int credit = mxlnd_tx_requires_credit(tx);
        u64 cookie = tx->mxc_cookie;

        CDEBUG(D_NET, "entering %s (0x%llx):\n",
               mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);

        LASSERT (peer != NULL);
        LASSERT (conn != NULL);

        if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
                LASSERT (type == msg->mxm_type);
        }

        if (failed) {
                if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
        } else {
                spin_lock(&conn->mxk_lock);
                conn->mxk_last_tx = cfs_time_current(); /* jiffies */
                spin_unlock(&conn->mxk_lock);
        }
        switch (type) {
        case MXLND_MSG_GET_DATA:
                spin_lock(&conn->mxk_lock);
                if (conn->mxk_incarnation == tx->mxc_incarnation) {
                        conn->mxk_outstanding++;
                        conn->mxk_data_posted--;
                }
                spin_unlock(&conn->mxk_lock);
                break;

        case MXLND_MSG_PUT_DATA:
                spin_lock(&conn->mxk_lock);
                if (conn->mxk_incarnation == tx->mxc_incarnation) {
                        conn->mxk_data_posted--;
                }
                spin_unlock(&conn->mxk_lock);
                break;

        case MXLND_MSG_NOOP:
        case MXLND_MSG_PUT_REQ:
        case MXLND_MSG_PUT_ACK:
        case MXLND_MSG_GET_REQ:
        case MXLND_MSG_EAGER:
                break;

        case MXLND_MSG_CONN_ACK:
                if (peer->mxp_incompatible) {
                        /* we sent our params, now close this conn */
                        mxlnd_conn_disconnect(conn, 0, 1);
                }
        case MXLND_MSG_CONN_REQ:
                if (failed) {
                        CDEBUG(D_NETERROR, "%s failed with %s (%d) (errno = %d)"
                               " to %s\n",
                               type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
                               mx_strstatus(code), code, tx->mxc_errno,
                               libcfs_nid2str(tx->mxc_nid));
                        if (!peer->mxp_incompatible) {
                                spin_lock(&conn->mxk_lock);
                                if (code == MX_STATUS_BAD_SESSION)
                                        mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
                                else
                                        mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                                spin_unlock(&conn->mxk_lock);
                        }
                }
                break;

        default:
                CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
                break;
        }

        if (credit) {
                spin_lock(&conn->mxk_lock);
                if (conn->mxk_incarnation == tx->mxc_incarnation) {
                        conn->mxk_ntx_posted--;
                }
                spin_unlock(&conn->mxk_lock);
        }

        mxlnd_put_idle_tx(tx);
        mxlnd_conn_decref(conn);

        mxlnd_check_sends(peer);

        CDEBUG(D_NET, "leaving\n");
        return;
}
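/* NOTE the mxk_incarnation == mxc_incarnation checks above: counters are
 * only adjusted when the tx belongs to the conn's current incarnation, so a
 * completion from a torn-down conn cannot corrupt its replacement's
 * credit accounting */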
/* Handle completion of MSG or DATA rx.
 * CONN_REQ and CONN_ACK are handled elsewhere. */
void
mxlnd_handle_rx_completion(kmx_ctx_t *rx)
{
        int ret = 0;
        int credit = 0;
        int conn_ref = 0;
        int peer_ref = 0;
        int result = 0;
        u64 seq = 0ULL;
        u32 nob = rx->mxc_status.xfer_length;
        u64 bits = rx->mxc_status.match_info;
        kmx_msg_t *msg = rx->mxc_msg;
        kmx_peer_t *peer = rx->mxc_peer;
        kmx_conn_t *conn = rx->mxc_conn;
        u8 type = rx->mxc_msg_type;
        lnet_msg_t *lntmsg[2];

        lntmsg[0] = NULL;
        lntmsg[1] = NULL;
        /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
         * failed GET reply */

        /* NOTE peer may still be NULL if it is a new peer and
         * conn may be NULL if this is a re-connect */
        if (likely(peer != NULL && conn != NULL)) {
                /* we have a reference on the conn */
                conn_ref = 1;
        } else if (peer != NULL && conn == NULL) {
                /* we have a reference on the peer */
                peer_ref = 1;
        } else if (peer == NULL && conn != NULL) {
                CERROR("rx 0x%llx from %s has conn but no peer\n",
                       bits, libcfs_nid2str(rx->mxc_nid));
                LBUG();
        } /* else peer and conn == NULL */

        if (conn == NULL && peer != NULL) {
                write_lock(&kmxlnd_data.kmx_global_lock);
                conn = peer->mxp_conn;
                if (conn) {
                        mxlnd_conn_addref(conn); /* conn takes ref... */
                        mxlnd_peer_decref(peer); /* from peer */
                        conn_ref = 1;
                        peer_ref = 0;
                }
                write_unlock(&kmxlnd_data.kmx_global_lock);
                rx->mxc_conn = conn;
        }

        CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
        if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
            rx->mxc_status.code != MX_STATUS_TRUNCATED) {
                CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
                       libcfs_nid2str(rx->mxc_nid),
                       mx_strstatus(rx->mxc_status.code),
                       rx->mxc_status.code);
                credit = 0;
                goto cleanup;
        }

        if (nob == 0) {
                /* this may be a failed GET reply */
                if (type == MXLND_MSG_GET_DATA) {
                        /* get the error (52-59) bits from the match bits */
                        ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
                        lntmsg[0] = rx->mxc_lntmsg[0];
                        result = -ret;
                } else {
                        /* we had a rx complete with 0 bytes (no hdr, nothing) */
                        CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
                               libcfs_nid2str(rx->mxc_nid));
                }
                goto cleanup;
        }

        /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
        if (type == MXLND_MSG_PUT_DATA) {
                lntmsg[0] = rx->mxc_lntmsg[0];
                goto cleanup;
        } else if (type == MXLND_MSG_GET_DATA) {
                lntmsg[0] = rx->mxc_lntmsg[0];
                lntmsg[1] = rx->mxc_lntmsg[1];
                goto cleanup;
        }

        ret = mxlnd_unpack_msg(msg, nob);
        if (ret != 0) {
                CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
                       ret, libcfs_nid2str(rx->mxc_nid));
                goto cleanup;
        }
        type = msg->mxm_type;
        seq = msg->mxm_seq;

        if (rx->mxc_nid != msg->mxm_srcnid ||
            kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
                CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
                       "0x%llx and rx msg dst is 0x%llx)\n",
                       mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
                       msg->mxm_dstnid);
                goto cleanup;
        }

        if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
            msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
                CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
                       "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
                       "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
                       libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
                       msg->mxm_srcstamp, conn->mxk_incarnation,
                       msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
                goto cleanup;
        }

        CDEBUG(D_NET, "Received %s with %d credits\n",
               mxlnd_msgtype_to_str(type), msg->mxm_credits);

        LASSERT(peer != NULL && conn != NULL);
        if (msg->mxm_credits != 0) {
                spin_lock(&conn->mxk_lock);
                if (msg->mxm_srcstamp == conn->mxk_incarnation) {
                        if ((conn->mxk_credits + msg->mxm_credits) >
                             *kmxlnd_tunables.kmx_peercredits) {
                                CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
                                       conn->mxk_credits, msg->mxm_credits);
                        }
                        conn->mxk_credits += msg->mxm_credits;
                        LASSERT(conn->mxk_credits >= 0);
                        LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
                }
                spin_unlock(&conn->mxk_lock);
        }
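        /* NOTE credits above are only accepted when mxm_srcstamp matches the
         * conn's current incarnation, so credits from a previous connection
         * instance are silently dropped */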
        CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
        switch (type) {
        case MXLND_MSG_NOOP:
                break;

        case MXLND_MSG_EAGER:
                ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
                                 msg->mxm_srcnid, rx, 0);
                break;

        case MXLND_MSG_PUT_REQ:
                ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
                                 msg->mxm_srcnid, rx, 1);
                break;

        case MXLND_MSG_PUT_ACK: {
                u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
                if (cookie > MXLND_MAX_COOKIE) {
                        CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
                               libcfs_nid2str(rx->mxc_nid));
                        result = -((u32) MXLND_ERROR_VAL(cookie));
                        lntmsg[0] = rx->mxc_lntmsg[0];
                } else {
                        mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
                                        rx->mxc_peer, MXLND_MSG_PUT_DATA,
                                        rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
                }
                break;
        }
        case MXLND_MSG_GET_REQ:
                ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
                                 msg->mxm_srcnid, rx, 1);
                break;

        default:
                CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
                       libcfs_nid2str(rx->mxc_nid));
                ret = -EPROTO;
                break;
        }
        if (ret < 0) {
                CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
                spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                spin_unlock(&conn->mxk_lock);
        }

cleanup:
        if (conn != NULL) {
                spin_lock(&conn->mxk_lock);
                conn->mxk_last_rx = cfs_time_current(); /* jiffies */
                spin_unlock(&conn->mxk_lock);
        }

        /* lnet_parse() failed, etc., repost now */
        mxlnd_put_idle_rx(rx);
        if (conn != NULL && credit == 1) {
                if (type == MXLND_MSG_PUT_DATA ||
                    type == MXLND_MSG_EAGER ||
                    type == MXLND_MSG_PUT_REQ ||
                    type == MXLND_MSG_NOOP) {
                        spin_lock(&conn->mxk_lock);
                        conn->mxk_outstanding++;
                        spin_unlock(&conn->mxk_lock);
                }
        }
        if (conn_ref) mxlnd_conn_decref(conn);
        LASSERT(peer_ref == 0);

        if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
                CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
        } else {
                CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
        }

        if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
        if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);

        if (conn != NULL && credit == 1) mxlnd_check_sends(peer);

        return;
}
void
mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
{
        kmx_ctx_t *tx = NULL;
        kmx_msg_t *txmsg = NULL;
        kmx_conn_t *conn = peer->mxp_conn;
        u64 nic_id = 0ULL;
        u32 ep_id = 0;
        u32 sid = 0;
        u8 type = (msg_type == MXLND_MSG_ICON_REQ ?
                   MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);

        /* a conn ref was taken when calling mx_iconnect(),
         * hold it until CONN_REQ or CONN_ACK completes */

        CDEBUG(D_NET, "entering\n");
        if (status.code != MX_STATUS_SUCCESS) {
                int send_bye = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);

                CDEBUG(D_NETERROR, "mx_iconnect() failed for %s with %s (%d) "
                       "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
                       mxlnd_msgtype_to_str(msg_type),
                       mx_strstatus(status.code), status.code,
                       libcfs_nid2str(peer->mxp_nid),
                       peer->mxp_nid,
                       peer->mxp_nic_id,
                       peer->mxp_ep_id);
                spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                spin_unlock(&conn->mxk_lock);

                if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT)) {
                        CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
                        mxlnd_conn_disconnect(conn, 0, send_bye);
                }

                mxlnd_conn_decref(conn);
                return;
        }
        mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
        write_lock(&kmxlnd_data.kmx_global_lock);
        spin_lock(&conn->mxk_lock);
        conn->mxk_epa = status.source;
        mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
        if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
                mxlnd_set_conn_status(conn, MXLND_CONN_READY);
        }
        spin_unlock(&conn->mxk_lock);
        write_unlock(&kmxlnd_data.kmx_global_lock);

        /* mx_iconnect() succeeded, reset delay to 0 */
        write_lock(&kmxlnd_data.kmx_global_lock);
        peer->mxp_reconnect_time = 0;
        peer->mxp_conn->mxk_sid = sid;
        write_unlock(&kmxlnd_data.kmx_global_lock);

        /* marshal CONN_REQ or CONN_ACK msg */
        /* we are still using the conn ref from iconnect() - do not take another */
        tx = mxlnd_get_idle_tx();
        if (unlikely(tx == NULL)) {
                CDEBUG(D_NETERROR, "Can't obtain %s tx for %s\n",
                       mxlnd_msgtype_to_str(type),
                       libcfs_nid2str(peer->mxp_nid));
                spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                spin_unlock(&conn->mxk_lock);
                mxlnd_conn_decref(conn);
                return;
        }

        tx->mxc_peer = peer;
        tx->mxc_conn = conn;
        tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
        CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
        mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
        txmsg = tx->mxc_msg;
        txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
        txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
        tx->mxc_match = mxlnd_create_match(tx, 0);

        mxlnd_queue_tx(tx);
        return;
}
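/* NOTE the CONN_REQ/CONN_ACK payload is just mxcrm_queue_depth and
 * mxcrm_eager_size; mxlnd_passive_connect() and mxlnd_check_conn_ack()
 * refuse peers whose values do not match ours exactly */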
/**
 * mxlnd_request_waitd - the MX request completion thread(s)
 * @arg - thread id (as a void *)
 *
 * This thread waits for a MX completion and then completes the request.
 * We will create one thread per CPU.
 */
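/* NOTE thread 0 polls with mx_test_any() for up to kmx_polling iterations
 * before falling back to a blocking mx_wait_any(); the other threads always
 * block, trading some CPU on one thread for lower completion latency */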
int
mxlnd_request_waitd(void *arg)
{
        long id = (long) arg;
        char name[24];
        u32 count = 0;
        mx_return_t mxret = MX_SUCCESS;
        mx_status_t status;
        u32 result = 0;
        kmx_ctx_t *ctx = NULL;
        enum kmx_req_state req_type = MXLND_REQ_TX;
        kmx_peer_t *peer = NULL;
        kmx_conn_t *conn = NULL;
        u8 msg_type = 0;

        memset(name, 0, sizeof(name));
        snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
        cfs_daemonize(name);

        memset(&status, 0, sizeof(status));

        CDEBUG(D_NET, "%s starting\n", name);

        while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
                mxret = MX_SUCCESS;
                result = 0;
                if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
                        mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
                                            &status, &result);
                } else if (id == 0) {
                        count = 0;
                        mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
                                            0ULL, 0ULL, &status, &result);
                } else {
                        mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
                                            0ULL, 0ULL, &status, &result);
                }
                if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown)))
                        break;
                if (result == 0) {
                        /* nothing completed... */
                        continue;
                }

                CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
                       "match_info 0x%llx and length %d\n",
                       mx_strstatus(status.code), status.code,
                       (u64) status.match_info, status.msg_length);

                if (status.code != MX_STATUS_SUCCESS) {
                        CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
                               "match_info 0x%llx and length %d\n",
                               mx_strstatus(status.code), status.code,
                               (u64) status.match_info, status.msg_length);
                }

                msg_type = MXLND_MSG_TYPE(status.match_info);

                /* This may be a mx_iconnect() request completing,
                 * check the bit mask for CONN_REQ and CONN_ACK */
                if (msg_type == MXLND_MSG_ICON_REQ ||
                    msg_type == MXLND_MSG_ICON_ACK) {
                        peer = (kmx_peer_t*) status.context;
                        mxlnd_handle_connect_msg(peer, msg_type, status);
                        continue;
                }

                /* This must be a tx or rx */

                /* NOTE: if this is a RX from the unexpected callback, it may
                 * have very little info. If we dropped it in unexpected_recv(),
                 * it will not have a context. If so, ignore it. */
                ctx = (kmx_ctx_t *) status.context;
                if (ctx != NULL) {
                        req_type = ctx->mxc_type;
                        conn = ctx->mxc_conn; /* this may be NULL */
                        mxlnd_deq_pending_ctx(ctx);

                        /* copy status to ctx->mxc_status */
                        ctx->mxc_status = status;

                        switch (req_type) {
                        case MXLND_REQ_TX:
                                mxlnd_handle_tx_completion(ctx);
                                break;
                        case MXLND_REQ_RX:
                                mxlnd_handle_rx_completion(ctx);
                                break;
                        default:
                                CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
                                break;
                        }

                        /* conn is always set except for the first CONN_REQ rx
                         * from a new peer */
                        if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
                                mxlnd_conn_disconnect(conn, 1, 1);
                        }
                }
                CDEBUG(D_NET, "waitd() completed task\n");
        }
        CDEBUG(D_NET, "%s stopping\n", name);
        mxlnd_thread_stop(id);
        return 0;
}
unsigned long
mxlnd_check_timeouts(unsigned long now)
{
        int i = 0;
        int disconnect = 0;
        unsigned long next = 0; /* jiffies */
        kmx_peer_t *peer = NULL;
        kmx_conn_t *conn = NULL;
        rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        read_lock(g_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {

                        if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown))) {
                                read_unlock(g_lock);
                                return next;
                        }

                        conn = peer->mxp_conn;
                        if (conn) {
                                mxlnd_conn_addref(conn);
                        } else {
                                continue;
                        }

                        spin_lock(&conn->mxk_lock);

                        /* if nothing pending (timeout == 0) or
                         * if conn is already disconnected,
                         * skip this conn */
                        if (conn->mxk_timeout == 0 ||
                            conn->mxk_status == MXLND_CONN_DISCONNECT) {
                                spin_unlock(&conn->mxk_lock);
                                mxlnd_conn_decref(conn);
                                continue;
                        }

                        /* we want to find the timeout that will occur first.
                         * if it is in the future, we will sleep until then.
                         * if it is in the past, then we will sleep one
                         * second and repeat the process. */
                        if ((next == 0) || (time_before(conn->mxk_timeout, next))) {
                                next = conn->mxk_timeout;
                        }

                        disconnect = 0;
                        if (time_after_eq(now, conn->mxk_timeout)) {
                                disconnect = 1;
                        }
                        spin_unlock(&conn->mxk_lock);

                        if (disconnect) {
                                mxlnd_conn_disconnect(conn, 1, 1);
                        }
                        mxlnd_conn_decref(conn);
                }
        }
        read_unlock(g_lock);
        if (next == 0) next = now + MXLND_COMM_TIMEOUT;

        return next;
}
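/* NOTE mxlnd_check_timeouts() returns the earliest pending timeout (in
 * jiffies) so the caller can sleep until then; with nothing pending it
 * returns now + MXLND_COMM_TIMEOUT */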
void
mxlnd_passive_connect(kmx_connparams_t *cp)
{
        int ret = 0;
        int incompatible = 0;
        int conn_ref = 0;
        u64 nic_id = 0ULL;
        u32 ep_id = 0;
        u32 sid = 0;
        kmx_msg_t *msg = &cp->mxr_msg;
        kmx_peer_t *peer = cp->mxr_peer;
        kmx_conn_t *conn = NULL;
        rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);

        ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
        if (ret != 0) {
                if (peer) {
                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from %s\n",
                               ret, libcfs_nid2str(peer->mxp_nid));
                } else {
                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from "
                               "unknown host with nic_id 0x%llx\n", ret, nic_id);
                }
                goto cleanup;
        }
        if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
                CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
                       libcfs_nid2str(msg->mxm_srcnid),
                       libcfs_nid2str(msg->mxm_dstnid));
                goto cleanup;
        }
        if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
                CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
                       "%d (%d wanted)\n",
                       libcfs_nid2str(msg->mxm_srcnid),
                       msg->mxm_u.conn_req.mxcrm_queue_depth,
                       *kmxlnd_tunables.kmx_peercredits);
                incompatible = 1;
        }
        if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
                CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
                       "%d (%d wanted)\n",
                       libcfs_nid2str(msg->mxm_srcnid),
                       msg->mxm_u.conn_req.mxcrm_eager_size,
                       (int) MXLND_MSG_SIZE);
                incompatible = 1;
        }
        peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
        if (peer == NULL) {
                int hash = 0;
                u32 board = 0;
                kmx_peer_t *existing_peer = NULL;

                hash = mxlnd_nid_to_hash(msg->mxm_srcnid);

                mx_nic_id_to_board_number(nic_id, &board);

                /* adds conn ref for peer and one for this function */
                ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
                                       board, ep_id, 0ULL);
                if (ret != 0)
                        goto cleanup;
                peer->mxp_conn->mxk_sid = sid;
                LASSERT(peer->mxp_ep_id == ep_id);
                write_lock(g_lock);
                existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
                if (existing_peer) {
                        mxlnd_conn_decref(peer->mxp_conn);
                        mxlnd_peer_decref(peer);
                        peer = existing_peer;
                        mxlnd_conn_addref(peer->mxp_conn);
                        conn = peer->mxp_conn;
                } else {
                        list_add_tail(&peer->mxp_list,
                                      &kmxlnd_data.kmx_peers[hash]);
                        atomic_inc(&kmxlnd_data.kmx_npeers);
                }
                write_unlock(g_lock);
        } else if (peer->mxp_conn == NULL) {
                write_lock(g_lock);
                ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
                if (ret != 0) {
                        mxlnd_peer_decref(peer); /* drop ref taken above */
                        write_unlock(g_lock);
                        CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
                        goto cleanup;
                }
                write_unlock(g_lock);

                conn_ref = 1; /* peer/conn_alloc() added ref for this function */
                conn = peer->mxp_conn;
        } else { /* unexpected handler found peer */
                kmx_conn_t *old_conn = peer->mxp_conn;

                if (sid != peer->mxp_conn->mxk_sid) {
                        /* do not call mx_disconnect() or send a BYE */
                        mxlnd_conn_disconnect(old_conn, 0, 0);

                        /* This allocs a conn, points peer->mxp_conn to this one.
                         * The old conn is still on the peer->mxp_conns list.
                         * As the pending requests complete, they will call
                         * conn_decref() which will eventually free it. */
                        ret = mxlnd_conn_alloc(&conn, peer);
                        if (ret != 0) {
                                CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
                                goto cleanup;
                        }
                        /* conn_alloc() adds one ref for the peer and one
                         * for this function */
                        conn_ref = 1;

                        peer->mxp_conn->mxk_sid = sid;
                } else {
                        conn = peer->mxp_conn;
                        mxlnd_conn_addref(conn);
                        conn_ref = 1;
                }
        }
        write_lock(g_lock);
        peer->mxp_incompatible = incompatible;
        write_unlock(g_lock);
        spin_lock(&conn->mxk_lock);
        conn->mxk_incarnation = msg->mxm_srcstamp;
        mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
        spin_unlock(&conn->mxk_lock);

        /* handle_conn_ack() will create the CONN_ACK msg */
        mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);

cleanup:
        if (conn_ref) mxlnd_conn_decref(conn);

        mxlnd_connparams_free(cp);
        return;
}
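/* NOTE even when the params are incompatible, we still connect back and send
 * a CONN_ACK (with mxp_incompatible set) so the peer learns of the mismatch;
 * the conn is then torn down when that CONN_ACK tx completes (see
 * mxlnd_handle_tx_completion()) */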
void
mxlnd_check_conn_ack(kmx_connparams_t *cp)
{
        int ret = 0;
        int incompatible = 0;
        u64 nic_id = 0ULL;
        u32 ep_id = 0;
        u32 sid = 0;
        kmx_msg_t *msg = &cp->mxr_msg;
        kmx_peer_t *peer = cp->mxr_peer;
        kmx_conn_t *conn = cp->mxr_conn;

        mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);

        ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
        if (ret != 0) {
                if (peer) {
                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from %s\n",
                               ret, libcfs_nid2str(peer->mxp_nid));
                } else {
                        CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from "
                               "unknown host with nic_id 0x%llx\n", ret, nic_id);
                }
                goto failed;
        }
        if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
                       "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
                       libcfs_nid2str(msg->mxm_dstnid));
                goto failed;
        }
        if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
                       "incompatible queue depth %d (%d wanted)\n",
                       libcfs_nid2str(msg->mxm_srcnid),
                       msg->mxm_u.conn_req.mxcrm_queue_depth,
                       *kmxlnd_tunables.kmx_peercredits);
                incompatible = 1;
                goto failed;
        }
        if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
                CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
                       "incompatible EAGER size %d (%d wanted)\n",
                       libcfs_nid2str(msg->mxm_srcnid),
                       msg->mxm_u.conn_req.mxcrm_eager_size,
                       (int) MXLND_MSG_SIZE);
                incompatible = 1;
                goto failed;
        }

        write_lock(&kmxlnd_data.kmx_global_lock);
        peer->mxp_incompatible = incompatible;
        write_unlock(&kmxlnd_data.kmx_global_lock);
        spin_lock(&conn->mxk_lock);
        conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
        conn->mxk_outstanding = 0;
        conn->mxk_incarnation = msg->mxm_srcstamp;
        conn->mxk_timeout = 0;
        if (!incompatible) {
                CDEBUG(D_NET, "setting peer %s CONN_READY\n",
                       libcfs_nid2str(msg->mxm_srcnid));
                mxlnd_set_conn_status(conn, MXLND_CONN_READY);
        }
        spin_unlock(&conn->mxk_lock);

        if (!incompatible)
                mxlnd_check_sends(peer);

        goto cleanup;

failed:
        spin_lock(&conn->mxk_lock);
        mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
        spin_unlock(&conn->mxk_lock);

cleanup:
        if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);

        mxlnd_connparams_free(cp);
        return;
}
int
mxlnd_abort_msgs(void)
{
        int count = 0;
        struct list_head *orphans = &kmxlnd_data.kmx_orphan_msgs;
        spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;

        spin_lock(g_conn_lock);
        while (!list_empty(orphans)) {
                kmx_ctx_t *ctx = NULL;
                kmx_conn_t *conn = NULL;

                ctx = list_entry(orphans->next, kmx_ctx_t, mxc_list);
                list_del_init(&ctx->mxc_list);
                spin_unlock(g_conn_lock);

                ctx->mxc_errno = -ECONNABORTED;
                conn = ctx->mxc_conn;
                CDEBUG(D_NET, "aborting %s %s %s\n",
                       mxlnd_msgtype_to_str(ctx->mxc_msg_type),
                       ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
                       libcfs_nid2str(ctx->mxc_nid));
                if (ctx->mxc_type == MXLND_REQ_TX) {
                        mxlnd_put_idle_tx(ctx); /* do not hold any locks */
                        if (conn) mxlnd_conn_decref(conn); /* for this tx */
                } else {
                        ctx->mxc_state = MXLND_CTX_CANCELED;
                        mxlnd_handle_rx_completion(ctx);
                }

                count++;
                spin_lock(g_conn_lock);
        }
        spin_unlock(g_conn_lock);

        return count;
}
int
mxlnd_free_conn_zombies(void)
{
        int count = 0;
        struct list_head *zombies = &kmxlnd_data.kmx_conn_zombies;
        spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
        rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        /* cleanup any zombies */
        spin_lock(g_conn_lock);
        while (!list_empty(zombies)) {
                kmx_conn_t *conn = NULL;

                conn = list_entry(zombies->next, kmx_conn_t, mxk_zombie);
                list_del_init(&conn->mxk_zombie);
                spin_unlock(g_conn_lock);

                write_lock(g_lock);
                mxlnd_conn_free_locked(conn);
                write_unlock(g_lock);

                count++;
                spin_lock(g_conn_lock);
        }
        spin_unlock(g_conn_lock);
        CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
        return count;
}
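/* NOTE zombies are unlinked under kmx_conn_lock but freed under the global
 * write lock (mxlnd_conn_free_locked() requires it); dropping the spinlock
 * first avoids holding both locks at once */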
/**
 * mxlnd_connd - handles incoming connection requests
 * @arg - thread id (as a void *)
 *
 * This thread handles incoming connection requests
 */
int
mxlnd_connd(void *arg)
{
        long id = (long) arg;
        int ret = 0;

        cfs_daemonize("mxlnd_connd");

        CDEBUG(D_NET, "connd starting\n");

        while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
                kmx_connparams_t *cp = NULL;
                spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
                struct list_head *conn_reqs = &kmxlnd_data.kmx_conn_reqs;

                ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);

                if (atomic_read(&kmxlnd_data.kmx_shutdown))
                        break;

                if (ret != 0)
                        continue;

                ret = mxlnd_abort_msgs();
                ret += mxlnd_free_conn_zombies();

                spin_lock(g_conn_lock);
                if (list_empty(conn_reqs)) {
                        if (ret == 0)
                                CDEBUG(D_NETERROR, "connd woke up but did not "
                                       "find a kmx_connparams_t or zombie conn\n");
                        spin_unlock(g_conn_lock);
                        continue;
                }
                cp = list_entry(conn_reqs->next, kmx_connparams_t, mxr_list);
                list_del_init(&cp->mxr_list);
                spin_unlock(g_conn_lock);

                switch (MXLND_MSG_TYPE(cp->mxr_match)) {
                case MXLND_MSG_CONN_REQ:
                        /* We have a connection request. Handle it. */
                        mxlnd_passive_connect(cp);
                        break;
                case MXLND_MSG_CONN_ACK:
                        /* The peer is ready for messages */
                        mxlnd_check_conn_ack(cp);
                        break;
                }
        }

        mxlnd_free_conn_zombies();

        CDEBUG(D_NET, "connd stopping\n");
        mxlnd_thread_stop(id);
        return 0;
}
/**
 * mxlnd_timeoutd - enforces timeouts on messages
 * @arg - thread id (as a void *)
 *
 * This thread queries each peer for its earliest timeout. If a peer has
 * timed out, it calls mxlnd_conn_disconnect().
 *
 * After checking for timeouts, try progressing sends (call check_sends()).
 */
int
mxlnd_timeoutd(void *arg)
{
        int i = 0;
        long id = (long) arg;
        unsigned long now = 0;
        unsigned long next = 0;
        unsigned long delay = HZ;
        kmx_peer_t *peer = NULL;
        kmx_peer_t *temp = NULL;
        kmx_conn_t *conn = NULL;
        rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        cfs_daemonize("mxlnd_timeoutd");

        CDEBUG(D_NET, "timeoutd starting\n");

        while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {

                now = jiffies;
                /* if the next timeout has not arrived, go back to sleep */
                if (time_after(now, next)) {
                        next = mxlnd_check_timeouts(now);
                }

                /* try to progress peers' txs */
                write_lock(g_lock);
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        struct list_head *peers = &kmxlnd_data.kmx_peers[i];

                        /* NOTE we are safe against the removal of peer, but
                         * not against the removal of temp */
                        list_for_each_entry_safe(peer, temp, peers, mxp_list) {
                                if (atomic_read(&kmxlnd_data.kmx_shutdown))
                                        break;
                                mxlnd_peer_addref(peer); /* add ref... */
                                conn = peer->mxp_conn;
                                if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
                                        mxlnd_conn_addref(conn); /* take ref... */
                                } else {
                                        CDEBUG(D_NET, "ignoring %s\n",
                                               libcfs_nid2str(peer->mxp_nid));
                                        mxlnd_peer_decref(peer); /* ...to here */
                                        continue;
                                }

                                if ((conn->mxk_status == MXLND_CONN_READY ||
                                     conn->mxk_status == MXLND_CONN_FAIL) &&
                                    time_after(now, conn->mxk_last_tx + HZ)) {
                                        write_unlock(g_lock);
                                        mxlnd_check_sends(peer);
                                        write_lock(g_lock);
                                }
                                mxlnd_conn_decref(conn); /* until here */
                                mxlnd_peer_decref(peer); /* ...to here */
                        }
                }
                write_unlock(g_lock);

                mxlnd_sleep(delay);
        }
        CDEBUG(D_NET, "timeoutd stopping\n");
        mxlnd_thread_stop(id);
        return 0;
}