4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (C) 2006 Myricom, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/mxlnd/mxlnd.c
38 * Author: Eric Barton <eric@bartonsoftware.com>
39 * Author: Scott Atchley <atchley at myri.com>
44 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
49 /* if memcmp() == 0, it is NULL */
50 return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
54 mxlnd_ctxstate_to_str(int mxc_state)
58 return "MXLND_CTX_INIT";
60 return "MXLND_CTX_IDLE";
62 return "MXLND_CTX_PREP";
63 case MXLND_CTX_PENDING:
64 return "MXLND_CTX_PENDING";
65 case MXLND_CTX_COMPLETED:
66 return "MXLND_CTX_COMPLETED";
67 case MXLND_CTX_CANCELED:
68 return "MXLND_CTX_CANCELED";
75 mxlnd_connstatus_to_str(int mxk_status)
78 case MXLND_CONN_READY:
79 return "MXLND_CONN_READY";
81 return "MXLND_CONN_INIT";
83 return "MXLND_CONN_WAIT";
84 case MXLND_CONN_DISCONNECT:
85 return "MXLND_CONN_DISCONNECT";
87 return "MXLND_CONN_FAIL";
94 mxlnd_msgtype_to_str(int type) {
97 return "MXLND_MSG_EAGER";
98 case MXLND_MSG_CONN_REQ:
99 return "MXLND_MSG_CONN_REQ";
100 case MXLND_MSG_CONN_ACK:
101 return "MXLND_MSG_CONN_ACK";
103 return "MXLND_MSG_BYE";
105 return "MXLND_MSG_NOOP";
106 case MXLND_MSG_PUT_REQ:
107 return "MXLND_MSG_PUT_REQ";
108 case MXLND_MSG_PUT_ACK:
109 return "MXLND_MSG_PUT_ACK";
110 case MXLND_MSG_PUT_DATA:
111 return "MXLND_MSG_PUT_DATA";
112 case MXLND_MSG_GET_REQ:
113 return "MXLND_MSG_GET_REQ";
114 case MXLND_MSG_GET_DATA:
115 return "MXLND_MSG_GET_DATA";
122 mxlnd_lnetmsg_to_str(int type)
126 return "LNET_MSG_ACK";
128 return "LNET_MSG_PUT";
130 return "LNET_MSG_GET";
132 return "LNET_MSG_REPLY";
134 return "LNET_MSG_HELLO";
142 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
144 u64 type = (u64) ctx->mxc_msg_type;
145 u64 err = (u64) error;
148 mxlnd_valid_msg_type(ctx->mxc_msg_type);
149 LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
150 match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
155 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
157 *msg_type = (u8) MXLND_MSG_TYPE(match);
158 *error = (u8) MXLND_ERROR_VAL(match);
159 *cookie = match & MXLND_MAX_COOKIE;
160 mxlnd_valid_msg_type(*msg_type);
165 mxlnd_get_idle_rx(kmx_conn_t *conn)
167 cfs_list_t *rxs = NULL;
168 kmx_ctx_t *rx = NULL;
170 LASSERT(conn != NULL);
172 rxs = &conn->mxk_rx_idle;
174 cfs_spin_lock(&conn->mxk_lock);
176 if (cfs_list_empty (rxs)) {
177 cfs_spin_unlock(&conn->mxk_lock);
181 rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
182 cfs_list_del_init(&rx->mxc_list);
183 cfs_spin_unlock(&conn->mxk_lock);
186 if (rx->mxc_get != rx->mxc_put) {
187 CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
188 CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
189 CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
190 CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
191 CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
192 CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
193 CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
194 CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
195 CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
196 CNETERR("*** nob= %d ***\n", rx->mxc_nob);
199 LASSERT (rx->mxc_get == rx->mxc_put);
203 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
204 rx->mxc_state = MXLND_CTX_PREP;
205 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
211 mxlnd_put_idle_rx(kmx_ctx_t *rx)
213 kmx_conn_t *conn = rx->mxc_conn;
214 cfs_list_t *rxs = &conn->mxk_rx_idle;
216 LASSERT(rx->mxc_type == MXLND_REQ_RX);
221 LASSERT(rx->mxc_get == rx->mxc_put);
223 cfs_spin_lock(&conn->mxk_lock);
224 cfs_list_add(&rx->mxc_list, rxs);
225 cfs_spin_unlock(&conn->mxk_lock);
230 mxlnd_get_idle_tx(void)
232 cfs_list_t *tmp = &kmxlnd_data.kmx_tx_idle;
233 kmx_ctx_t *tx = NULL;
235 cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
237 if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
238 CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
239 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
243 tmp = &kmxlnd_data.kmx_tx_idle;
244 tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
245 cfs_list_del_init(&tx->mxc_list);
247 /* Allocate a new completion cookie. It might not be needed,
248 * but we've got a lock right now and we're unlikely to
250 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
251 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
252 kmxlnd_data.kmx_tx_next_cookie = 1;
254 kmxlnd_data.kmx_tx_used++;
255 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
257 LASSERT (tx->mxc_get == tx->mxc_put);
261 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
262 LASSERT (tx->mxc_lntmsg[0] == NULL);
263 LASSERT (tx->mxc_lntmsg[1] == NULL);
265 tx->mxc_state = MXLND_CTX_PREP;
266 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
272 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
275 mxlnd_put_idle_tx(kmx_ctx_t *tx)
278 lnet_msg_t *lntmsg[2];
280 LASSERT(tx->mxc_type == MXLND_REQ_TX);
282 if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
283 kmx_conn_t *conn = tx->mxc_conn;
286 if (tx->mxc_errno != 0) result = tx->mxc_errno;
287 /* FIXME should we set mx_dis? */
288 mxlnd_conn_disconnect(conn, 0, 1);
291 lntmsg[0] = tx->mxc_lntmsg[0];
292 lntmsg[1] = tx->mxc_lntmsg[1];
297 LASSERT(tx->mxc_get == tx->mxc_put);
299 cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
300 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
301 kmxlnd_data.kmx_tx_used--;
302 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
304 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
305 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
311 mxlnd_connparams_free(kmx_connparams_t *cp)
313 LASSERT(cfs_list_empty(&cp->mxr_list));
314 MXLND_FREE(cp, sizeof(*cp));
319 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
320 mx_endpoint_addr_t epa, u64 match, u32 length,
321 kmx_conn_t *conn, kmx_peer_t *peer, void *data)
323 kmx_connparams_t *c = NULL;
325 MXLND_ALLOC(c, sizeof(*c));
326 if (!c) return -ENOMEM;
328 CFS_INIT_LIST_HEAD(&c->mxr_list);
329 c->mxr_context = context;
331 c->mxr_match = match;
335 c->mxr_msg = *((kmx_msg_t *) data);
342 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
344 conn->mxk_status = status;
349 * mxlnd_conn_free_locked - free the conn
350 * @conn - a kmx_conn pointer
352 * The calling function should remove the conn from the conns list first
353 * then destroy it. Caller should have write-locked kmx_global_lock.
356 mxlnd_conn_free_locked(kmx_conn_t *conn)
358 int valid = !mxlnd_endpoint_addr_null(conn->mxk_epa);
359 kmx_peer_t *peer = conn->mxk_peer;
361 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
362 LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
363 cfs_list_empty (&conn->mxk_tx_free_queue) &&
364 cfs_list_empty (&conn->mxk_pending));
365 if (!cfs_list_empty(&conn->mxk_list)) {
366 cfs_list_del_init(&conn->mxk_list);
367 if (peer->mxp_conn == conn) {
368 peer->mxp_conn = NULL;
370 kmx_conn_t *temp = NULL;
372 mx_get_endpoint_addr_context(conn->mxk_epa,
375 mx_set_endpoint_addr_context(conn->mxk_epa,
379 /* unlink from global list and drop its ref */
380 cfs_list_del_init(&peer->mxp_list);
381 mxlnd_peer_decref(peer);
384 mxlnd_peer_decref(peer); /* drop conn's ref to peer */
385 if (conn->mxk_rx_pages) {
386 LASSERT (conn->mxk_rxs != NULL);
387 mxlnd_free_pages(conn->mxk_rx_pages);
391 kmx_ctx_t *rx = NULL;
393 for (i = 0; i < MXLND_RX_MSGS(); i++) {
394 rx = &conn->mxk_rxs[i];
395 if (rx->mxc_seg_list != NULL) {
396 LASSERT(rx->mxc_nseg > 0);
397 MXLND_FREE(rx->mxc_seg_list,
399 sizeof(*rx->mxc_seg_list));
402 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
405 MXLND_FREE(conn, sizeof (*conn));
411 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
415 kmx_ctx_t *ctx = NULL;
416 kmx_ctx_t *next = NULL;
417 mx_return_t mxret = MX_SUCCESS;
422 cfs_spin_lock(&conn->mxk_lock);
423 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
425 cfs_list_del_init(&ctx->mxc_list);
426 if (ctx->mxc_type == MXLND_REQ_RX) {
428 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
431 if (mxret != MX_SUCCESS) {
432 CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
435 ctx->mxc_errno = -ECONNABORTED;
436 ctx->mxc_state = MXLND_CTX_CANCELED;
437 cfs_spin_unlock(&conn->mxk_lock);
438 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
439 /* we may be holding the global lock,
440 * move to orphan list so that it can free it */
441 cfs_list_add_tail(&ctx->mxc_list,
442 &kmxlnd_data.kmx_orphan_msgs);
444 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
445 cfs_spin_lock(&conn->mxk_lock);
450 cfs_spin_unlock(&conn->mxk_lock);
458 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
461 cfs_list_t *tmp = NULL;
463 cfs_spin_lock(&conn->mxk_lock);
464 while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
465 !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
467 kmx_ctx_t *tx = NULL;
469 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
470 tmp = &conn->mxk_tx_free_queue;
472 tmp = &conn->mxk_tx_credit_queue;
475 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
476 cfs_list_del_init(&tx->mxc_list);
477 cfs_spin_unlock(&conn->mxk_lock);
478 tx->mxc_errno = -ECONNABORTED;
479 tx->mxc_state = MXLND_CTX_CANCELED;
480 /* move to orphan list and then abort */
481 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
482 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
483 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
485 cfs_spin_lock(&conn->mxk_lock);
487 cfs_spin_unlock(&conn->mxk_lock);
493 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
495 u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
496 (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
498 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
499 epa, match, NULL, NULL);
504 * mxlnd_conn_disconnect - shutdown a connection
505 * @conn - a kmx_conn pointer
506 * @mx_dis - call mx_disconnect()
507 * @send_bye - send peer a BYE msg
509 * This function sets the status to DISCONNECT, completes queued
510 * txs with failure, calls mx_disconnect, which will complete
511 * pending txs and matched rxs with failure.
514 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
516 mx_endpoint_addr_t epa = conn->mxk_epa;
517 int valid = !mxlnd_endpoint_addr_null(epa);
520 cfs_spin_lock(&conn->mxk_lock);
521 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
522 cfs_spin_unlock(&conn->mxk_lock);
525 mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
526 conn->mxk_timeout = 0;
527 cfs_spin_unlock(&conn->mxk_lock);
529 count = mxlnd_cancel_queued_txs(conn);
530 count += mxlnd_conn_cancel_pending_rxs(conn);
533 cfs_up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */
535 if (send_bye && valid &&
536 conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
537 /* send a BYE to the peer */
538 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
539 libcfs_nid2str(conn->mxk_peer->mxp_nid));
540 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
541 /* wait to allow the peer to ack our message */
542 mxlnd_sleep(msecs_to_jiffies(20));
545 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
546 unsigned long last_msg = 0;
548 /* notify LNET that we are giving up on this peer */
549 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
550 last_msg = conn->mxk_last_rx;
552 last_msg = conn->mxk_last_tx;
554 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
556 if (mx_dis && valid &&
557 (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
558 mx_disconnect(kmxlnd_data.kmx_endpt, epa);
560 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
566 * mxlnd_conn_alloc - allocate and initialize a new conn struct
567 * @connp - address of a kmx_conn pointer
568 * @peer - owning kmx_peer
570 * Returns 0 on success and -ENOMEM on failure
573 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
580 kmx_conn_t *conn = NULL;
581 kmx_pages_t *pages = NULL;
582 struct page *page = NULL;
583 kmx_ctx_t *rx = NULL;
585 LASSERT(peer != NULL);
587 MXLND_ALLOC(conn, sizeof (*conn));
589 CNETERR("Cannot allocate conn\n");
592 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
594 memset(conn, 0, sizeof(*conn));
596 ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
598 CERROR("Can't allocate rx pages\n");
599 MXLND_FREE(conn, sizeof(*conn));
602 conn->mxk_rx_pages = pages;
604 MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
605 if (conn->mxk_rxs == NULL) {
606 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
607 mxlnd_free_pages(pages);
608 MXLND_FREE(conn, sizeof(*conn));
612 memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
614 conn->mxk_peer = peer;
615 CFS_INIT_LIST_HEAD(&conn->mxk_list);
616 CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
617 cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
618 and one for the caller */
619 if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
623 /* this is localhost, set the epa and status as up */
624 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
625 conn->mxk_epa = kmxlnd_data.kmx_epa;
626 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
627 peer->mxp_reconnect_time = 0;
628 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
629 peer->mxp_nic_id = nic_id;
630 peer->mxp_ep_id = ep_id;
631 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
632 conn->mxk_timeout = 0;
634 /* conn->mxk_incarnation = 0 - will be set by peer */
635 /* conn->mxk_sid = 0 - will be set by peer */
636 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
637 /* mxk_epa - to be set after mx_iconnect() */
639 cfs_spin_lock_init(&conn->mxk_lock);
640 /* conn->mxk_timeout = 0 */
641 /* conn->mxk_last_tx = 0 */
642 /* conn->mxk_last_rx = 0 */
643 CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
645 conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
646 /* mxk_outstanding = 0 */
648 CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
649 CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
650 /* conn->mxk_ntx_msgs = 0 */
651 /* conn->mxk_ntx_data = 0 */
652 /* conn->mxk_ntx_posted = 0 */
653 /* conn->mxk_data_posted = 0 */
654 CFS_INIT_LIST_HEAD(&conn->mxk_pending);
656 for (i = 0; i < MXLND_RX_MSGS(); i++) {
658 rx = &conn->mxk_rxs[i];
659 rx->mxc_type = MXLND_REQ_RX;
660 CFS_INIT_LIST_HEAD(&rx->mxc_list);
662 /* map mxc_msg to page */
663 page = pages->mxg_pages[ipage];
664 addr = page_address(page);
665 LASSERT(addr != NULL);
666 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
667 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
671 rx->mxc_nid = peer->mxp_nid;
675 offset += MXLND_MSG_SIZE;
676 LASSERT (offset <= PAGE_SIZE);
678 if (offset == PAGE_SIZE) {
681 LASSERT (ipage <= MXLND_TX_MSG_PAGES());
684 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
689 mxlnd_peer_addref(peer); /* add a ref for this conn */
691 /* add to front of peer's conns list */
692 cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
693 peer->mxp_conn = conn;
698 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
701 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
703 cfs_write_lock(g_lock);
704 ret = mxlnd_conn_alloc_locked(connp, peer);
705 cfs_write_unlock(g_lock);
710 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
713 kmx_conn_t *conn = ctx->mxc_conn;
715 ctx->mxc_state = MXLND_CTX_PENDING;
717 cfs_spin_lock(&conn->mxk_lock);
718 if (conn->mxk_status >= MXLND_CONN_INIT) {
719 cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
720 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
721 conn->mxk_timeout = ctx->mxc_deadline;
724 ctx->mxc_state = MXLND_CTX_COMPLETED;
727 cfs_spin_unlock(&conn->mxk_lock);
733 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
735 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
736 ctx->mxc_state == MXLND_CTX_COMPLETED);
737 if (ctx->mxc_state != MXLND_CTX_PENDING &&
738 ctx->mxc_state != MXLND_CTX_COMPLETED) {
739 CNETERR("deq ctx->mxc_state = %s\n",
740 mxlnd_ctxstate_to_str(ctx->mxc_state));
742 ctx->mxc_state = MXLND_CTX_COMPLETED;
743 if (!cfs_list_empty(&ctx->mxc_list)) {
744 kmx_conn_t *conn = ctx->mxc_conn;
745 kmx_ctx_t *next = NULL;
747 LASSERT(conn != NULL);
748 cfs_spin_lock(&conn->mxk_lock);
749 cfs_list_del_init(&ctx->mxc_list);
750 conn->mxk_timeout = 0;
751 if (!cfs_list_empty(&conn->mxk_pending)) {
752 next = cfs_list_entry(conn->mxk_pending.next,
753 kmx_ctx_t, mxc_list);
754 conn->mxk_timeout = next->mxc_deadline;
756 cfs_spin_unlock(&conn->mxk_lock);
762 * mxlnd_peer_free - free the peer
763 * @peer - a kmx_peer pointer
765 * The calling function should decrement the rxs, drain the tx queues and
766 * remove the peer from the peers list first then destroy it.
769 mxlnd_peer_free(kmx_peer_t *peer)
771 CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
773 LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
775 if (!cfs_list_empty(&peer->mxp_list)) {
776 /* assume we are locked */
777 cfs_list_del_init(&peer->mxp_list);
780 MXLND_FREE(peer, sizeof (*peer));
781 cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
786 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
788 int ret = -EHOSTUNREACH;
789 unsigned char *haddr = NULL;
790 struct net_device *dev = NULL;
791 struct neighbour *n = NULL;
792 __be32 dst_ip = htonl(ip);
794 dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
798 haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
800 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
803 if (n->nud_state & NUD_VALID) {
804 memcpy(haddr, n->ha, dev->addr_len);
816 /* We only want the MAC address of the peer's Myricom NIC. We
817 * require that each node has the IPoMX interface (myriN) up.
818 * We will not pass any traffic over IPoMX, but it allows us
819 * to get the MAC address. */
821 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
827 cfs_socket_t *sock = NULL;
830 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
831 ret = mxlnd_lookup_mac(ip, &tmp_id);
835 /* not found, try to connect (force an arp) */
836 ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
837 if (ret == -ECONNREFUSED) {
838 /* peer is there, get the MAC address */
839 mxlnd_lookup_mac(ip, &tmp_id);
843 } else if (ret == -EHOSTUNREACH && try < tries) {
844 /* add a little backoff */
845 CDEBUG(D_NET, "sleeping for %d jiffies\n",
847 mxlnd_sleep(CFS_HZ/4);
850 } while (try++ < tries);
851 CDEBUG(D_NET, "done trying. ret = %d\n", ret);
855 #ifdef __LITTLE_ENDIAN
856 *nic_id = ___arch__swab64(tmp_id);
864 * mxlnd_peer_alloc - allocate and initialize a new peer struct
865 * @peerp - address of a kmx_peer pointer
866 * @nid - LNET node id
868 * Returns 0 on success and -ENOMEM on failure
871 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
874 u32 ip = LNET_NIDADDR(nid);
875 kmx_peer_t *peer = NULL;
877 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
879 MXLND_ALLOC(peer, sizeof (*peer));
881 CNETERR("Cannot allocate peer for NID 0x%llx\n",
885 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
887 memset(peer, 0, sizeof(*peer));
889 CFS_INIT_LIST_HEAD(&peer->mxp_list);
891 /* peer->mxp_ni unused - may be used for multi-rail */
892 cfs_atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
894 peer->mxp_board = board;
895 peer->mxp_ep_id = ep_id;
896 peer->mxp_nic_id = nic_id;
898 CFS_INIT_LIST_HEAD(&peer->mxp_conns);
899 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
901 mxlnd_peer_decref(peer);
904 CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
906 if (peer->mxp_nic_id != 0ULL)
907 nic_id = peer->mxp_nic_id;
909 if (nic_id == 0ULL) {
910 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
912 peer->mxp_nic_id = nic_id;
913 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
917 peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
919 /* peer->mxp_reconnect_time = 0 */
920 /* peer->mxp_incompatible = 0 */
926 static inline kmx_peer_t *
927 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
931 kmx_peer_t *peer = NULL;
933 hash = mxlnd_nid_to_hash(nid);
935 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
936 if (peer->mxp_nid == nid) {
938 mxlnd_peer_addref(peer);
942 return (found ? peer : NULL);
946 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
950 kmx_peer_t *peer = NULL;
951 kmx_peer_t *old = NULL;
952 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
954 cfs_read_lock(g_lock);
955 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
957 if ((peer && peer->mxp_conn) || /* found peer with conn or */
958 (!peer && !create)) { /* did not find peer and do not create one */
959 cfs_read_unlock(g_lock);
963 cfs_read_unlock(g_lock);
965 /* if peer but _not_ conn */
966 if (peer && !peer->mxp_conn) {
968 cfs_write_lock(g_lock);
969 if (!peer->mxp_conn) { /* check again */
970 /* create the conn */
971 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
973 /* we tried, return the peer only.
974 * the caller needs to see if the conn exists */
975 CNETERR("%s: %s could not alloc conn\n",
976 __func__, libcfs_nid2str(peer->mxp_nid));
978 /* drop extra conn ref */
979 mxlnd_conn_decref(peer->mxp_conn);
982 cfs_write_unlock(g_lock);
987 /* peer not found and we need to create one */
988 hash = mxlnd_nid_to_hash(nid);
990 /* create peer (and conn) */
991 /* adds conn ref for peer and one for this function */
992 ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
993 *kmxlnd_tunables.kmx_ep_id, 0ULL);
994 if (ret != 0) /* no memory, peer is NULL */
997 cfs_write_lock(g_lock);
1000 old = mxlnd_find_peer_by_nid_locked(nid);
1002 /* someone already created one */
1003 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1004 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1005 mxlnd_peer_decref(peer);
1008 /* no other peer, use this one */
1009 cfs_list_add_tail(&peer->mxp_list,
1010 &kmxlnd_data.kmx_peers[hash]);
1011 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
1012 mxlnd_peer_addref(peer);
1013 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1016 cfs_write_unlock(g_lock);
1022 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1024 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1025 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1026 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1027 tx->mxc_msg_type == MXLND_MSG_NOOP);
1031 * mxlnd_init_msg - set type and number of bytes
1032 * @msg - msg pointer
1033 * @type - of message
1034 * @body_nob - bytes in msg body
1037 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1039 msg->mxm_type = type;
1040 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
1044 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1046 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
1047 kmx_msg_t *msg = NULL;
1049 LASSERT (tx != NULL);
1050 LASSERT (nob <= MXLND_MSG_SIZE);
1053 /* tx->mxc_peer should have already been set if we know it */
1054 tx->mxc_msg_type = type;
1056 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1057 tx->mxc_seg.segment_length = nob;
1058 tx->mxc_pin_type = MX_PIN_PHYSICAL;
1061 msg->mxm_type = type;
1068 mxlnd_cksum (void *ptr, int nob)
1074 sum = ((sum << 1) | (sum >> 31)) + *c++;
1076 /* ensure I don't return 0 (== no checksum) */
1077 return (sum == 0) ? 1 : sum;
1081 * mxlnd_pack_msg_locked - complete msg info
1085 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1087 kmx_msg_t *msg = tx->mxc_msg;
1089 /* type and nob should already be set in init_msg() */
1090 msg->mxm_magic = MXLND_MSG_MAGIC;
1091 msg->mxm_version = MXLND_MSG_VERSION;
1093 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1094 * return credits as well */
1095 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1096 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1097 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
1098 tx->mxc_conn->mxk_outstanding = 0;
1100 msg->mxm_credits = 0;
1104 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
1105 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1106 msg->mxm_dstnid = tx->mxc_nid;
1107 /* if it is a new peer, the dststamp will be 0 */
1108 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1110 if (*kmxlnd_tunables.kmx_cksum) {
1111 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1116 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1118 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
1119 __u32 msg_cksum = 0;
1123 /* 6 bytes are enough to have received magic + version */
1125 CNETERR("not enough bytes for magic + hdr: %d\n", nob);
1129 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1131 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1134 CNETERR("Bad magic: %08x\n", msg->mxm_magic);
1138 if (msg->mxm_version !=
1139 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1140 CNETERR("Bad version: %d\n", msg->mxm_version);
1144 if (nob < hdr_size) {
1145 CNETERR("not enough for a header: %d\n", nob);
1149 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1150 if (msg_nob > nob) {
1151 CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
1155 /* checksum must be computed with mxm_cksum zero and BEFORE anything
1157 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1159 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1160 CNETERR("Bad checksum\n");
1163 msg->mxm_cksum = msg_cksum;
1166 /* leave magic unflipped as a clue to peer endianness */
1167 __swab16s(&msg->mxm_version);
1168 CLASSERT (sizeof(msg->mxm_type) == 1);
1169 CLASSERT (sizeof(msg->mxm_credits) == 1);
1170 msg->mxm_nob = msg_nob;
1171 __swab64s(&msg->mxm_srcnid);
1172 __swab64s(&msg->mxm_srcstamp);
1173 __swab64s(&msg->mxm_dstnid);
1174 __swab64s(&msg->mxm_dststamp);
1177 if (msg->mxm_srcnid == LNET_NID_ANY) {
1178 CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1182 switch (msg->mxm_type) {
1184 CNETERR("Unknown message type %x\n", msg->mxm_type);
1187 case MXLND_MSG_NOOP:
1190 case MXLND_MSG_EAGER:
1191 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1192 CNETERR("Short EAGER: %d(%d)\n", msg_nob,
1193 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1198 case MXLND_MSG_PUT_REQ:
1199 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1200 CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
1201 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1205 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1208 case MXLND_MSG_PUT_ACK:
1209 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1210 CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
1211 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1215 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1216 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1220 case MXLND_MSG_GET_REQ:
1221 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1222 CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
1223 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1227 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1231 case MXLND_MSG_CONN_REQ:
1232 case MXLND_MSG_CONN_ACK:
1233 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1234 CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
1235 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1239 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1240 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1250 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1254 * @length - length of incoming message
1255 * @pending - add to kmx_pending (0 is NO and 1 is YES)
1257 * The caller gets the rx and sets nid, peer and conn if known.
1259 * Returns 0 on success and -1 on failure
1262 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1265 mx_return_t mxret = MX_SUCCESS;
1266 uint64_t mask = ~(MXLND_ERROR_MASK);
1268 rx->mxc_msg_type = msg_type;
1269 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1270 rx->mxc_cookie = cookie;
1271 /* rx->mxc_match may already be set */
1272 /* rx->mxc_seg.segment_ptr is already set */
1273 rx->mxc_seg.segment_length = length;
1274 ret = mxlnd_q_pending_ctx(rx);
1276 /* the caller is responsible for calling conn_decref() if needed */
1279 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1280 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1281 if (mxret != MX_SUCCESS) {
1282 mxlnd_deq_pending_ctx(rx);
1283 CNETERR("mx_kirecv() failed with %s (%d)\n",
1284 mx_strerror(mxret), (int) mxret);
1292 * mxlnd_unexpected_recv - this is the callback function that will handle
1293 * unexpected receives
1294 * @context - NULL, ignore
1295 * @source - the peer's mx_endpoint_addr_t
1296 * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1297 * @length - length of incoming message
1298 * @data_if_available - used for CONN_[REQ|ACK]
1300 * If it is an eager-sized msg, we will call recv_msg() with the actual
1301 * length. If it is a large message, we will call recv_msg() with a
1302 * length of 0 bytes to drop it because we should never have a large,
1303 * unexpected message.
1305 * NOTE - The MX library blocks until this function completes. Make it as fast as
1306 * possible. DO NOT allocate memory which can block!
1308 * If we cannot get a rx or the conn is closed, drop the message on the floor
1309 * (i.e. recv 0 bytes and ignore).
/* mxlnd_unexpected_recv - MX unexpected-message callback
 *
 * Invoked by the MX library for a message with no matching posted receive.
 * Decodes the match bits into (msg_type, error, cookie), recovers the conn
 * stashed in the endpoint-address context, and dispatches:
 *   BYE       - disconnect the peer
 *   CONN_REQ  - validate, package into connparams and hand to the connd
 *   CONN_ACK  - validate, package into connparams and hand to the connd
 *   otherwise - grab an idle rx and post the receive (or drop via 0-byte recv)
 *
 * NOTE - The MX library blocks until this function completes, so it must be
 * fast and must not perform blocking allocation. */
mx_unexp_handler_action_t
mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
                      uint64_t match_value, uint32_t length, void *data_if_available)
        kmx_ctx_t *rx = NULL;
        kmx_conn_t *conn = NULL;
        kmx_peer_t *peer = NULL;

        /* TODO this will change to the net struct */
        if (context != NULL) {
                CNETERR("non-NULL context\n");

        CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);

        /* match bits carry msg type (high bits), error code and cookie */
        mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
        mxlnd_parse_match(match_value, &msg_type, &error, &cookie);

        /* the conn pointer is stored in the MX endpoint-address context;
         * take a conn ref for the duration of this handler */
        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        mx_get_endpoint_addr_context(source, (void **) &conn);
        mxlnd_conn_addref(conn); /* add ref for this function */
        peer = conn->mxk_peer;
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);

        if (msg_type == MXLND_MSG_BYE) {
                /* peer is going away - tear the conn down now */
                CDEBUG(D_NET, "peer %s sent BYE msg\n",
                       libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_disconnect(conn, 1, 0);
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return MX_RECV_FINISHED;

        if (msg_type == MXLND_MSG_CONN_REQ) {
                kmx_connparams_t *cp = NULL;
                const int expected = offsetof(kmx_msg_t, mxm_u) +
                                     sizeof(kmx_connreq_msg_t);

                if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
                if (unlikely(length != expected || !data_if_available)) {
                        /* malformed request - NAK the sender with EPROTO */
                        CNETERR("received invalid CONN_REQ from %llx "
                                "length=%d (expected %d)\n", nic_id, length, expected);
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
                        return MX_RECV_FINISHED;

                ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                             conn, peer, data_if_available);
                if (unlikely(ret != 0)) {
                        CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
                        mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
                        return MX_RECV_FINISHED;
                /* queue the request for the connd thread and wake it */
                cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
                cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
                cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
                cfs_up(&kmxlnd_data.kmx_conn_sem);
                return MX_RECV_FINISHED;

        if (msg_type == MXLND_MSG_CONN_ACK) {
                kmx_connparams_t *cp = NULL;
                const int expected = offsetof(kmx_msg_t, mxm_u) +
                                     sizeof(kmx_connreq_msg_t);

                if (unlikely(error != 0)) {
                        /* the peer NAK'd our CONN_REQ (error is in match bits) */
                        CNETERR("received CONN_ACK from %s with error -%d\n",
                                libcfs_nid2str(peer->mxp_nid), (int) error);
                        mxlnd_conn_disconnect(conn, 1, 0);
                } else if (unlikely(length != expected || !data_if_available)) {
                        CNETERR("received %s CONN_ACK from %s "
                                "length=%d (expected %d)\n",
                                data_if_available ? "short" : "missing",
                                libcfs_nid2str(peer->mxp_nid), length, expected);
                        mxlnd_conn_disconnect(conn, 1, 1);
                        /* peer is ready for messages */
                        ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
                                                     conn, peer, data_if_available);
                        if (unlikely(ret != 0)) {
                                CNETERR("unable to alloc kmx_connparams_t"
                                        " from %llx:%d\n", nic_id, ep_id);
                                mxlnd_conn_disconnect(conn, 1, 1);
                                /* hand the ACK to the connd thread as well */
                                cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
                                cfs_list_add_tail(&cp->mxr_list,
                                                  &kmxlnd_data.kmx_conn_reqs);
                                cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
                                cfs_up(&kmxlnd_data.kmx_conn_sem);
                mxlnd_conn_decref(conn); /* drop ref taken above */
                return MX_RECV_FINISHED;

        /* Handle unexpected messages (PUT_REQ and GET_REQ) */

        LASSERT(peer != NULL && conn != NULL);

        rx = mxlnd_get_idle_rx(conn);

        if (length <= MXLND_MSG_SIZE) {
                ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
                /* oversize unexpected message - recv 0 bytes to drop it */
                CNETERR("unexpected large receive with "
                        "match_value=0x%llx length=%d\n",
                        match_value, length);
                ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);

                /* hold conn ref until rx completes */
                rx->mxc_conn = conn;
                rx->mxc_peer = peer;
                rx->mxc_nid = peer->mxp_nid;

                CNETERR("could not post receive\n");
                mxlnd_put_idle_rx(rx);

        /* Encountered error, drop incoming message on the floor */
        /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
         * uses the standard code path and acks the sender normally */

        if (rx == NULL || ret != 0) {
                mxlnd_conn_decref(conn); /* drop ref taken above */
                CNETERR("no idle rxs available - dropping rx"
                        " 0x%llx from %s\n", match_value,
                        libcfs_nid2str(peer->mxp_nid));
                CNETERR("disconnected peer - dropping rx\n");
                /* post a zero-length receive: drains the message and acks
                 * the sender through the normal completion path */
                seg.segment_ptr = 0ULL;
                seg.segment_length = 0;
                mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
                          match_value, ~0ULL, NULL, NULL);

        return MX_RECV_CONTINUE;
/* mxlnd_get_peer_info - return the NID and refcount of the index-th peer
 * @index - zero-based position in the global peer hash (iteration order)
 * @nidp  - set to the peer's NID on success
 * @count - set to the peer's current reference count on success
 *
 * Walks the peer hash under the global read lock.
 * NOTE(review): the index-match test and return paths are elided in this
 * extract - presumably it returns 0 on a hit and an error past the end. */
mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
        kmx_peer_t *peer = NULL;

        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
                                /* found the index-th peer - report it */
                                *nidp = peer->mxp_nid;
                                *count = cfs_atomic_read(&peer->mxp_refcount);
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
/* mxlnd_del_peer_locked - remove a peer; caller holds the global write lock
 *
 * If the peer has a conn, disconnecting it handles the teardown; otherwise
 * the peer is unhashed and its global-list ref dropped directly.
 * NOTE(review): a line between the disconnect and the list_del is elided
 * here - presumably an "} else {" so the two paths are mutually exclusive;
 * confirm against the full source. */
mxlnd_del_peer_locked(kmx_peer_t *peer)
        if (peer->mxp_conn) {
                mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
        cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
        mxlnd_peer_decref(peer); /* drop global list ref */
/* mxlnd_del_peer - delete one peer by NID, or every peer for LNET_NID_ANY
 * @nid - the peer to delete, or LNET_NID_ANY for all
 *
 * For a specific NID the peer is looked up first (taking a ref that the
 * delete path drops); for LNET_NID_ANY the whole hash is swept with the
 * _safe iterator since mxlnd_del_peer_locked() unlinks entries. */
mxlnd_del_peer(lnet_nid_t nid)
        kmx_peer_t *peer = NULL;
        kmx_peer_t *next = NULL;

        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
        cfs_write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                        mxlnd_peer_decref(peer); /* and drops it */
                        mxlnd_del_peer_locked(peer);
        } else { /* LNET_NID_ANY */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        /* _safe: del_peer_locked() removes entries as we go */
                        cfs_list_for_each_entry_safe(peer, next,
                                                     &kmxlnd_data.kmx_peers[i],
                                mxlnd_del_peer_locked(peer);
        cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
/* mxlnd_get_conn_by_idx - return the index-th conn across all peers
 * @index - zero-based position over every peer's conn list
 *
 * Walks peers then each peer's conns under the global read lock. On a hit,
 * a conn ref is taken for the caller (dropped in mxlnd_ctl()). */
mxlnd_get_conn_by_idx(int index)
        kmx_peer_t *peer = NULL;
        kmx_conn_t *conn = NULL;

        cfs_read_lock(&kmxlnd_data.kmx_global_lock);
        for (i = 0; i < MXLND_HASH_SIZE; i++) {
                cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
                        cfs_list_for_each_entry(conn, &peer->mxp_conns,
                                        mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
                                        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
        cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
/* mxlnd_close_matching_conns_locked - disconnect every conn of one peer
 *
 * Caller holds the global write lock. Uses the _safe iterator because
 * disconnect may unlink the conn from mxp_conns. */
mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
        kmx_conn_t *conn = NULL;
        kmx_conn_t *next = NULL;

        cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
                mxlnd_conn_disconnect(conn, 0, 1);
/* mxlnd_close_matching_conns - close conns for one NID or for all peers
 * @nid - target peer, or LNET_NID_ANY to close every peer's conns
 *
 * Takes the global write lock, then delegates per-peer work to
 * mxlnd_close_matching_conns_locked(). */
mxlnd_close_matching_conns(lnet_nid_t nid)
        kmx_peer_t *peer = NULL;

        cfs_write_lock(&kmxlnd_data.kmx_global_lock);
        if (nid != LNET_NID_ANY) {
                peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
                        mxlnd_close_matching_conns_locked(peer);
                        mxlnd_peer_decref(peer); /* and drops it here */
        } else { /* LNET_NID_ANY */
                for (i = 0; i < MXLND_HASH_SIZE; i++) {
                        cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list)
                                mxlnd_close_matching_conns_locked(peer);
        cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
1607 * mxlnd_ctl - modify MXLND parameters
1608 * @ni - LNET interface handle
1609 * @cmd - command to change
1610 * @arg - the ioctl data
/* mxlnd_ctl - handle LNET ioctl commands for this LND
 * @ni  - LNET interface handle (must be our own kmx_ni)
 * @cmd - IOC_LIBCFS_* command
 * @arg - struct libcfs_ioctl_data with command parameters/results
 *
 * Supported: GET_PEER (report nid+refcount by index), DEL_PEER,
 * GET_CONN (report nid of index-th conn), CLOSE_CONNECTION.
 * Unknown commands are logged. */
mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kmxlnd_data.kmx_ni);

        case IOC_LIBCFS_GET_PEER: {
                ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
                data->ioc_nid = nid;
                data->ioc_count = count;
        case IOC_LIBCFS_DEL_PEER: {
                ret = mxlnd_del_peer(data->ioc_nid);
        case IOC_LIBCFS_GET_CONN: {
                kmx_conn_t *conn = NULL;

                conn = mxlnd_get_conn_by_idx(data->ioc_count);
                data->ioc_nid = conn->mxk_peer->mxp_nid;
                mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                ret = mxlnd_close_matching_conns(data->ioc_nid);
        CNETERR("unknown ctl(%d)\n", cmd);
1660 * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1663 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
/* mxlnd_peer_queue_tx_locked - add the tx to its conn's send queues
 * @tx - tx with mxc_peer/mxc_conn set; caller holds the conn lock
 *
 * Credit-bearing msgs go on the credit queue; CONN_REQ/CONN_ACK jump to
 * the front of the free queue so connection setup is not starved; other
 * control msgs (PUT_ACK, PUT_NAK) and DATA go to the back. */
mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
        u8 msg_type = tx->mxc_msg_type;
        kmx_conn_t *conn = tx->mxc_conn;

        LASSERT (msg_type != 0);
        LASSERT (tx->mxc_nid != 0);
        LASSERT (tx->mxc_peer != NULL);
        LASSERT (tx->mxc_conn != NULL);

        /* stamp the tx with the conn generation it was queued under */
        tx->mxc_incarnation = conn->mxk_incarnation;

        if (msg_type != MXLND_MSG_PUT_DATA &&
            msg_type != MXLND_MSG_GET_DATA) {
                if (mxlnd_tx_requires_credit(tx)) {
                        cfs_list_add_tail(&tx->mxc_list,
                                          &conn->mxk_tx_credit_queue);
                        conn->mxk_ntx_msgs++;
                } else if (msg_type == MXLND_MSG_CONN_REQ ||
                           msg_type == MXLND_MSG_CONN_ACK) {
                        /* put conn msgs at the front of the queue */
                        cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
                        /* PUT_ACK, PUT_NAK */
                        cfs_list_add_tail(&tx->mxc_list,
                                          &conn->mxk_tx_free_queue);
                        conn->mxk_ntx_msgs++;
                /* DATA txs do not consume a credit */
                cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
                conn->mxk_ntx_data++;
1705 * mxlnd_peer_queue_tx - add the tx to the global tx queue
1708 * Add the tx to the peer's msg or data queue
1711 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1713 LASSERT(tx->mxc_peer != NULL);
1714 LASSERT(tx->mxc_conn != NULL);
1715 cfs_spin_lock(&tx->mxc_conn->mxk_lock);
1716 mxlnd_peer_queue_tx_locked(tx);
1717 cfs_spin_unlock(&tx->mxc_conn->mxk_lock);
1723 * mxlnd_queue_tx - add the tx to the global tx queue
1726 * Add the tx to the global queue and up the tx_queue_sem
/* mxlnd_queue_tx - route a tx either to its conn or to the global queue
 *
 * Incompatible peers fail the tx immediately (except CONN_ACK, which must
 * still go out). A tx with a conn is queued on the peer and the sender is
 * kicked; a tx without one first gets a conn allocated. Otherwise the tx
 * goes on the global queue and the tx_queued thread is woken. */
mxlnd_queue_tx(kmx_ctx_t *tx)
        kmx_peer_t *peer = tx->mxc_peer;
        LASSERT (tx->mxc_nid != 0);

        if (peer->mxp_incompatible &&
            tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
                /* let this fail now */
                tx->mxc_errno = -ECONNABORTED;
                mxlnd_conn_decref(peer->mxp_conn);
                mxlnd_put_idle_tx(tx);

        if (tx->mxc_conn == NULL) {
                kmx_conn_t *conn = NULL;

                ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
                        /* conn alloc failed - fail the tx with its errno */
                        tx->mxc_errno = ret;
                        mxlnd_put_idle_tx(tx);
                tx->mxc_conn = conn;
                mxlnd_peer_decref(peer); /* and takes it from peer */
                LASSERT(tx->mxc_conn != NULL);
                mxlnd_peer_queue_tx(tx);
                mxlnd_check_sends(peer);
                /* no peer/conn yet - park on the global queue for tx_queued */
                cfs_spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
                cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
                cfs_spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
                cfs_up(&kmxlnd_data.kmx_tx_queue_sem);
/* mxlnd_setup_iov - build an mx_ksegment_t list covering [offset, offset+nob)
 * @ctx    - tx/rx whose mxc_seg_list/mxc_nseg/mxc_pin_type are filled in
 * @niov   - number of entries in @iov
 * @iov    - kernel virtual-address fragments
 * @offset - byte offset into the iov span where the payload starts
 * @nob    - number of bytes to map
 *
 * First pass finds the first/last iov entries that bracket the payload and
 * the partial lengths at each end; then one segment per covered iov entry
 * is allocated and filled with physical addresses for MX_PIN_PHYSICAL. */
mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
        int first_iov_offset = 0;
        int first_found = 0;
        int last_iov_length = 0;
        mx_ksegment_t *seg = NULL;

        if (niov == 0) return 0;
        LASSERT(iov != NULL);

        /* locate the first and last iov entries touched by [offset, offset+nob) */
        for (i = 0; i < niov; i++) {
                sum = old_sum + (u32) iov[i].iov_len;
                if (!first_found && (sum > offset)) {
                        first_iov_offset = offset - old_sum;
                        sum = (u32) iov[i].iov_len - first_iov_offset;
                last_iov_length = (u32) iov[i].iov_len - (sum - nob);
                if (first_iov == last_iov) last_iov_length -= first_iov_offset;
        LASSERT(first_iov >= 0 && last_iov >= first_iov);
        nseg = last_iov - first_iov + 1;

        MXLND_ALLOC(seg, nseg * sizeof(*seg));
        CNETERR("MXLND_ALLOC() failed\n");
        memset(seg, 0, nseg * sizeof(*seg));
        ctx->mxc_nseg = nseg;

        for (i = 0; i < nseg; i++) {
                seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
                seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
                /* first segment starts part-way into its iov entry */
                seg[i].segment_ptr += (u64) first_iov_offset;
                seg[i].segment_length -= (u32) first_iov_offset;
                if (i == (nseg - 1)) {
                        /* last segment may be a partial iov entry */
                        seg[i].segment_length = (u32) last_iov_length;
                sum += seg[i].segment_length;
        ctx->mxc_seg_list = seg;
        ctx->mxc_pin_type = MX_PIN_PHYSICAL;
#ifdef MX_PIN_FULLPAGES
        ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
        /* the segments must cover exactly nob bytes */
        LASSERT(nob == sum);
1837 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1843 int first_kiov = -1;
1844 int first_kiov_offset = 0;
1845 int first_found = 0;
1847 int last_kiov_length = 0;
1848 mx_ksegment_t *seg = NULL;
1850 if (niov == 0) return 0;
1851 LASSERT(kiov != NULL);
1853 for (i = 0; i < niov; i++) {
1854 sum = old_sum + kiov[i].kiov_len;
1855 if (i == 0) sum -= kiov[i].kiov_offset;
1856 if (!first_found && (sum > offset)) {
1858 first_kiov_offset = offset - old_sum;
1859 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1861 sum = kiov[i].kiov_len - first_kiov_offset;
1866 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1867 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1872 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1873 nseg = last_kiov - first_kiov + 1;
1876 MXLND_ALLOC(seg, nseg * sizeof(*seg));
1878 CNETERR("MXLND_ALLOC() failed\n");
1881 memset(seg, 0, niov * sizeof(*seg));
1882 ctx->mxc_nseg = niov;
1884 for (i = 0; i < niov; i++) {
1885 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1886 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1888 seg[i].segment_ptr += (u64) first_kiov_offset;
1889 /* we have to add back the original kiov_offset */
1890 seg[i].segment_length -= first_kiov_offset +
1891 kiov[first_kiov].kiov_offset;
1893 if (i == (nseg - 1)) {
1894 seg[i].segment_length = last_kiov_length;
1896 sum += seg[i].segment_length;
1898 ctx->mxc_seg_list = seg;
1899 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1900 #ifdef MX_PIN_FULLPAGES
1901 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1903 LASSERT(nob == sum);
/* mxlnd_send_nak - send a PUT_ACK carrying an error code (a NAK)
 * @tx     - idle tx to use; mxc_nid must already be set
 * @nid    - destination NID (unused here; the tx's mxc_nid is used)
 * @type   - must be MXLND_MSG_PUT_ACK
 * @status - error code embedded in the dst cookie's error bits
 * @cookie - the originator's cookie, echoed back in src_cookie
 *
 * The error travels in the high (MXLND_ERROR_OFFSET) bits of the dst
 * cookie and of the match bits. */
mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
        LASSERT(type == MXLND_MSG_PUT_ACK);
        mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
        tx->mxc_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
        tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
        tx->mxc_match = mxlnd_create_match(tx, status);
1922 * mxlnd_send_data - get tx, map [k]iov, queue tx
1929 * This setups the DATA send for PUT or GET.
1931 * On success, it queues the tx, on failure it calls lnet_finalize()
/* mxlnd_send_data - get a tx, map the payload [k]iov, and queue the DATA send
 * @ni       - LNET interface
 * @lntmsg   - the LNET message supplying iov/kiov/offset/len
 * @peer     - destination peer (caller holds a conn ref)
 * @msg_type - MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie   - match cookie; its error bits must be clear
 *
 * On success the tx is queued; on failure lnet_finalize() is called with
 * -EIO so LNET sees the message complete. */
mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;
        kmx_ctx_t *tx = NULL;

        LASSERT(lntmsg != NULL);
        LASSERT(peer != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        /* cookie's error bits must be clear */
        LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);

        tx = mxlnd_get_idle_tx();
        CNETERR("Can't allocate %s tx for %s\n",
                msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                libcfs_nid2str(target.nid));

        tx->mxc_nid = target.nid;
        /* NOTE called when we have a ref on the conn, get one for this tx */
        mxlnd_conn_addref(peer->mxp_conn);
        tx->mxc_peer = peer;
        tx->mxc_conn = peer->mxp_conn;
        tx->mxc_msg_type = msg_type;
        tx->mxc_lntmsg[0] = lntmsg;
        tx->mxc_cookie = cookie;
        tx->mxc_match = mxlnd_create_match(tx, 0);

        /* This setups up the mx_ksegment_t to send the DATA payload */
                /* do not setup the segments */
                CNETERR("nob = 0; why didn't we use an EAGER reply "
                        "to %s?\n", libcfs_nid2str(target.nid));
        } else if (kiov == NULL) {
                ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
                ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
        CNETERR("Can't setup send DATA for %s\n",
                libcfs_nid2str(target.nid));
        tx->mxc_errno = -EIO;

        /* failure path: release tx resources and tell LNET */
        mxlnd_conn_decref(peer->mxp_conn);
        mxlnd_put_idle_tx(tx);

        CNETERR("no tx avail\n");
        lnet_finalize(ni, lntmsg, -EIO);
1999 * mxlnd_recv_data - map [k]iov, post rx
2006 * This setups the DATA receive for PUT or GET.
2008 * On success, it returns 0, on failure it returns -1
/* mxlnd_recv_data - map the sink [k]iov and post the DATA receive
 * @ni       - LNET interface
 * @lntmsg   - LNET message describing the sink buffers
 * @rx       - the rx context to post
 * @msg_type - MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie   - match cookie; error bits must be clear
 *
 * Returns 0 on success, -1 on failure. */
mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;
        mx_return_t mxret = MX_SUCCESS;
        /* match on cookie+type but ignore the error bits of the sender */
        u64 mask = ~(MXLND_ERROR_MASK);

        /* above assumes MXLND_MSG_PUT_DATA */
        if (msg_type == MXLND_MSG_GET_DATA) {
                /* for a GET the sink is described by the MD, not the msg */
                niov = lntmsg->msg_md->md_niov;
                iov = lntmsg->msg_md->md_iov.iov;
                kiov = lntmsg->msg_md->md_iov.kiov;
                nob = lntmsg->msg_md->md_length;

        LASSERT(lntmsg != NULL);
        LASSERT(rx != NULL);
        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
        LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */

        rx->mxc_msg_type = msg_type;
        rx->mxc_state = MXLND_CTX_PENDING;
        rx->mxc_nid = target.nid;
        /* if posting a GET_DATA, we may not yet know the peer */
        if (rx->mxc_peer != NULL) {
                rx->mxc_conn = rx->mxc_peer->mxp_conn;
        rx->mxc_lntmsg[0] = lntmsg;
        rx->mxc_cookie = cookie;
        rx->mxc_match = mxlnd_create_match(rx, 0);
        /* This setups up the mx_ksegment_t to receive the DATA payload */
        ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
        ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);

        if (msg_type == MXLND_MSG_GET_DATA) {
                /* the REPLY must be created before the data arrives */
                rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
                if (rx->mxc_lntmsg[1] == NULL) {
                        CNETERR("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(target.nid));

        CNETERR("Can't setup %s rx for %s\n",
                msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
                libcfs_nid2str(target.nid));

        /* track the rx so the timeout thread can see it */
        ret = mxlnd_q_pending_ctx(rx);

        CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
        mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
                          rx->mxc_seg_list, rx->mxc_nseg,
                          rx->mxc_pin_type, rx->mxc_match,
        if (mxret != MX_SUCCESS) {
                if (rx->mxc_conn != NULL) {
                        mxlnd_deq_pending_ctx(rx);
                CNETERR("mx_kirecv() failed with %d for %s\n",
                        (int) mxret, libcfs_nid2str(target.nid));
2090 * mxlnd_send - the LND required send function
2095 * This must not block. Since we may not have a peer struct for the receiver,
2096 * it will append send messages on a global tx list. We will then up the
2097 * tx_queued's semaphore to notify it of the new send.
/* mxlnd_send - the LND required send function
 * @ni      - LNET interface
 * @private - rx context for LNET_MSG_REPLY (GET reply path), else NULL
 * @lntmsg  - the LNET message to send
 *
 * Must not block. Small payloads go as EAGER msgs; large PUTs send a
 * PUT_REQ (with a PUT_ACK rx pre-posted), large GETs pre-post the DATA
 * sink and send a GET_REQ. The tx is finally handed to mxlnd_queue_tx().
 * If no peer exists yet, sends are parked on the global tx queue for the
 * tx_queued thread. */
mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        int type = lntmsg->msg_type;
        lnet_hdr_t *hdr = &lntmsg->msg_hdr;
        lnet_process_id_t target = lntmsg->msg_target;
        lnet_nid_t nid = target.nid;
        int target_is_router = lntmsg->msg_target_is_router;
        int routing = lntmsg->msg_routing;
        unsigned int payload_niov = lntmsg->msg_niov;
        struct iovec *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
        unsigned int payload_offset = lntmsg->msg_offset;
        unsigned int payload_nob = lntmsg->msg_len;
        kmx_ctx_t *tx = NULL;
        kmx_msg_t *txmsg = NULL;
        kmx_ctx_t *rx = (kmx_ctx_t *) private; /* for REPLY */
        kmx_ctx_t *rx_data = NULL;
        kmx_conn_t *conn = NULL;
        uint32_t length = 0;
        kmx_peer_t *peer = NULL;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        /* private is used on LNET_GET_REPLY only, NULL for all other cases */

        /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
         * to a new peer, so create one if not found */
        peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
        if (peer == NULL || peer->mxp_conn == NULL) {
                /* we could not find it nor could we create one or
                 * one exists but we cannot create a conn,
                 * fail this message */
                        /* found peer without conn, drop ref taken above */
                        LASSERT(peer->mxp_conn == NULL);
                        mxlnd_peer_decref(peer);

        /* we have a peer with a conn */

        if (unlikely(peer->mxp_incompatible)) {
                mxlnd_peer_decref(peer); /* drop ref taken above */
                /* grab a conn ref while the global lock pins the conn */
                cfs_read_lock(g_lock);
                conn = peer->mxp_conn;
                if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
                        mxlnd_conn_addref(conn);
                cfs_read_unlock(g_lock);
                mxlnd_peer_decref(peer); /* drop peer ref taken above */

        LASSERT(peer && conn);

        CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);

                /* ACK carries no payload */
                LASSERT (payload_nob == 0);

        case LNET_MSG_REPLY:
                /* Is the payload small enough not to need DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
                if (nob <= MXLND_MSG_SIZE)
                        break; /* send EAGER */

                /* large PUT: send PUT_REQ, expect PUT_ACK back */
                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate %s tx for %s\n",
                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
                                libcfs_nid2str(nid));
                        if (conn) mxlnd_conn_decref(conn);

                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* we added a conn ref above */
                mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
                txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the request.
                 * we need to determine how much to receive, it will be either
                 * a put_ack or a put_nak. The put_ack is larger, so use it. */

                rx = mxlnd_get_idle_rx(conn);
                if (unlikely(rx == NULL)) {
                        CNETERR("Can't allocate rx for PUT_ACK for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */

                rx->mxc_peer = peer;
                mxlnd_conn_addref(conn); /* for this rx */
                rx->mxc_conn = conn;
                rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
                rx->mxc_cookie = tx->mxc_cookie;
                rx->mxc_match = mxlnd_create_match(rx, 0);

                length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
                ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
                if (unlikely(ret != 0)) {
                        CNETERR("recv_msg() failed for PUT_ACK for %s\n",
                                libcfs_nid2str(nid));
                        rx->mxc_lntmsg[0] = NULL;
                        mxlnd_put_idle_rx(rx);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx... */
                        mxlnd_conn_decref(conn); /* and for the tx */
                        return -EHOSTUNREACH;

                /* routed GETs cannot use an RDMA sink on the final peer */
                if (routing || target_is_router)
                        break; /* send EAGER */

                /* is the REPLY message too small for DATA? */
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
                if (nob <= MXLND_MSG_SIZE)
                        break; /* send EAGER */

                /* get tx (we need the cookie) , post rx for incoming DATA,
                 * then post GET_REQ tx */
                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate GET tx for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_conn_decref(conn); /* for the ref taken above */

                rx_data = mxlnd_get_idle_rx(conn);
                if (unlikely(rx_data == NULL)) {
                        CNETERR("Can't allocate DATA rx for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the ref taken above */

                rx_data->mxc_peer = peer;
                /* NOTE no need to lock peer before adding conn ref since we took
                 * a conn ref for the tx (it cannot be freed between there and here ) */
                mxlnd_conn_addref(conn); /* for the rx_data */
                rx_data->mxc_conn = conn;

                ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
                if (unlikely(ret != 0)) {
                        CNETERR("Can't setup GET sink for %s\n",
                                libcfs_nid2str(nid));
                        mxlnd_put_idle_rx(rx_data);
                        mxlnd_put_idle_tx(tx);
                        mxlnd_conn_decref(conn); /* for the rx_data... */
                        mxlnd_conn_decref(conn); /* and for the tx */

                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* conn ref taken above */
                mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
                txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                mxlnd_conn_decref(conn); /* drop ref taken above */

        /* EAGER path: payload must fit in one msg */
        LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])

        tx = mxlnd_get_idle_tx();
        if (unlikely(tx == NULL)) {
                CNETERR("Can't send %s to %s: tx descs exhausted\n",
                        mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
                mxlnd_conn_decref(conn); /* drop ref taken above */

        tx->mxc_peer = peer;
        tx->mxc_conn = conn;
        /* conn ref taken above */
        nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
        mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
        tx->mxc_match = mxlnd_create_match(tx, 0);

        txmsg = tx->mxc_msg;
        txmsg->mxm_u.eager.mxem_hdr = *hdr;

        /* copy the payload inline into the EAGER msg */
        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
                                    offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                    payload_niov, payload_kiov, payload_offset, payload_nob);
                lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
                                   offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                                   payload_niov, payload_iov, payload_offset, payload_nob);

        tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2336 * mxlnd_recv - the LND required recv function
2347 * This must not block.
/* mxlnd_recv - the LND required recv function
 * @ni      - LNET interface
 * @private - the rx context delivered by the event path
 * @lntmsg  - the matched LNET message (NULL if the GET matched nothing)
 * @delayed - unused here (elided)
 * @niov/iov/kiov/offset/mlen/rlen - sink description from LNET
 *
 * Must not block. Dispatches on the received msg type:
 *   EAGER   - copy the inline payload into the sink
 *   PUT_REQ - reuse the rx as the DATA sink and send a PUT_ACK (or NAK)
 *   GET_REQ - send the GET_DATA, or a cookie-encoded NAK if unmatched */
mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
            unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
            unsigned int offset, unsigned int mlen, unsigned int rlen)
        kmx_ctx_t *rx = private;
        kmx_msg_t *rxmsg = rx->mxc_msg;
        lnet_nid_t nid = rx->mxc_nid;
        kmx_ctx_t *tx = NULL;
        kmx_msg_t *txmsg = NULL;
        kmx_peer_t *peer = rx->mxc_peer;
        kmx_conn_t *conn = peer->mxp_conn;
        int msg_type = rxmsg->mxm_type;

        LASSERT (mlen <= rlen);
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));
        LASSERT (peer && conn);

        /* conn_addref(conn) already taken for the primary rx */

        case MXLND_MSG_EAGER:
                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
                len = rx->mxc_status.xfer_length;
                if (unlikely(nob > len)) {
                        /* sender sent fewer bytes than the header claims */
                        CNETERR("Eager message from %s too big: %d(%d)\n",
                                libcfs_nid2str(nid), nob, len);

                /* copy the inline payload straight into the LNET sink */
                lnet_copy_flat2kiov(niov, kiov, offset,
                                    MXLND_MSG_SIZE, rxmsg,
                                    offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
                lnet_copy_flat2iov(niov, iov, offset,
                                   MXLND_MSG_SIZE, rxmsg,
                                   offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),

        case MXLND_MSG_PUT_REQ:
                /* we are going to reuse the rx, store the needed info */
                cookie = rxmsg->mxm_u.put_req.mxprm_cookie;

                /* get tx, post rx, send PUT_ACK */

                tx = mxlnd_get_idle_tx();
                if (unlikely(tx == NULL)) {
                        CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
                        /* Not replying will break the connection */

                if (unlikely(mlen == 0)) {
                        /* zero-length sink: NAK with status 0 so the sender
                         * completes without sending DATA */
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);

                mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
                tx->mxc_peer = peer;
                tx->mxc_conn = conn;
                /* no need to lock peer first since we already have a ref */
                mxlnd_conn_addref(conn); /* for the tx */
                txmsg = tx->mxc_msg;
                txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
                txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
                tx->mxc_cookie = cookie;
                tx->mxc_match = mxlnd_create_match(tx, 0);

                /* we must post a receive _before_ sending the PUT_ACK */

                rx->mxc_state = MXLND_CTX_PREP;
                rx->mxc_peer = peer;
                rx->mxc_conn = conn;
                /* do not take another ref for this rx, it is already taken */
                rx->mxc_nid = peer->mxp_nid;
                ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
                                      txmsg->mxm_u.put_ack.mxpam_dst_cookie);

                if (unlikely(ret != 0)) {
                        /* Notify peer that it's over */
                        CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
                                libcfs_nid2str(nid), ret);

                        tx->mxc_state = MXLND_CTX_PREP;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* finalize = 0, let the PUT_ACK tx finalize this */
                        tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
                        tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
                        /* conn ref already taken above */
                        mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);

                /* do not return a credit until after PUT_DATA returns */

        case MXLND_MSG_GET_REQ:
                cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;

                if (likely(lntmsg != NULL)) {
                        mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
                        /* GET didn't match anything */
                        /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
                         * We have to embed the error code in the match bits.
                         * Send the error in bits 52-59 and the cookie in bits 0-51 */
                        tx = mxlnd_get_idle_tx();
                        if (unlikely(tx == NULL)) {
                                CNETERR("Can't get tx for GET NAK for %s\n",
                                        libcfs_nid2str(nid));
                                /* we can't get a tx, notify the peer that the GET failed */
                                mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,

                        tx->mxc_msg_type = MXLND_MSG_GET_DATA;
                        tx->mxc_state = MXLND_CTX_PENDING;
                        tx->mxc_peer = peer;
                        tx->mxc_conn = conn;
                        /* no need to lock peer first since we already have a ref */
                        mxlnd_conn_addref(conn); /* for this tx */
                        tx->mxc_cookie = cookie;
                        /* ENODATA in the match bits tells the peer no data is coming */
                        tx->mxc_match = mxlnd_create_match(tx, ENODATA);
                        tx->mxc_pin_type = MX_PIN_PHYSICAL;

                        /* finalize lntmsg after tx completes */

        /* we received a message, increment peer's outstanding credits */
        cfs_spin_lock(&conn->mxk_lock);
        conn->mxk_outstanding++;
        cfs_spin_unlock(&conn->mxk_lock);

        /* we are done with the rx */
        mxlnd_put_idle_rx(rx);
        mxlnd_conn_decref(conn);

        if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);

        /* we received a credit, see if we can use it to send a msg */
        if (credit) mxlnd_check_sends(peer);
2528 mxlnd_sleep(unsigned long timeout)
2530 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2531 cfs_schedule_timeout(timeout);
2536 * mxlnd_tx_queued - the generic send queue thread
2537 * @arg - thread id (as a void *)
2539 * This thread moves send messages from the global tx_queue to the owning
2540 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2541 * it to the global peer list.
/* mxlnd_tx_queued - the generic send queue thread
 * @arg - thread id (as a void *)
 *
 * Loops until shutdown: waits on the tx_queue semaphore, pops one tx off
 * the global queue, resolves (or creates) its peer and conn, then queues
 * the tx on the peer via mxlnd_queue_tx()'s helpers. */
mxlnd_tx_queued(void *arg)
        long id = (long) arg;
        kmx_ctx_t *tx = NULL;
        kmx_peer_t *peer = NULL;
        cfs_list_t *queue = &kmxlnd_data.kmx_tx_queue;
        cfs_spinlock_t *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
        cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;

        cfs_daemonize("mxlnd_tx_queued");

        while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
                ret = cfs_down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
                if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
                if (ret != 0) // Should we check for -EINTR?
                cfs_spin_lock(tx_q_lock);
                if (cfs_list_empty (&kmxlnd_data.kmx_tx_queue)) {
                        cfs_spin_unlock(tx_q_lock);
                /* pop the oldest queued tx */
                tx = cfs_list_entry (queue->next, kmx_ctx_t, mxc_list);
                cfs_list_del_init(&tx->mxc_list);
                cfs_spin_unlock(tx_q_lock);

                peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
                        /* known peer - make sure it has a conn */
                        tx->mxc_peer = peer;
                        cfs_write_lock(g_lock);
                        if (peer->mxp_conn == NULL) {
                                ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
                                        /* out of memory, give up and fail tx */
                                        tx->mxc_errno = -ENOMEM;
                                        mxlnd_peer_decref(peer);
                                        cfs_write_unlock(g_lock);
                                        mxlnd_put_idle_tx(tx);
                        tx->mxc_conn = peer->mxp_conn;
                        mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
                        mxlnd_peer_decref(peer); /* drop peer ref taken above */
                        cfs_write_unlock(g_lock);

                        /* unknown peer - allocate one, then re-check the hash
                         * in case another thread raced us */
                        kmx_peer_t *peer = NULL;
                        kmx_peer_t *old = NULL;

                        hash = mxlnd_nid_to_hash(tx->mxc_nid);

                        LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
                                tx->mxc_msg_type != MXLND_MSG_GET_DATA);

                        /* adds conn ref for this function */
                        ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
                                               *kmxlnd_tunables.kmx_board,
                                               *kmxlnd_tunables.kmx_ep_id, 0ULL);
                                /* finalize message */
                                tx->mxc_errno = ret;
                                mxlnd_put_idle_tx(tx);
                        tx->mxc_peer = peer;
                        tx->mxc_conn = peer->mxp_conn;
                        /* this tx will keep the conn ref taken in peer_alloc() */

                        /* add peer to global peer list, but look to see
                         * if someone already created it after we released */
                        cfs_write_lock(g_lock);
                        old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
                                /* we have a peer ref on old */
                                if (old->mxp_conn) {
                                        /* drop our ref taken above... */
                                        mxlnd_peer_decref(old);
                                        mxlnd_del_peer_locked(old);

                                /* no usable duplicate - install our new peer */
                                cfs_list_add_tail(&peer->mxp_list,
                                                  &kmxlnd_data.kmx_peers[hash]);
                                cfs_atomic_inc(&kmxlnd_data.kmx_npeers);

                                /* lost the race - use the existing peer's conn */
                                tx->mxc_conn = old->mxp_conn;
                                LASSERT(old->mxp_conn != NULL);
                                mxlnd_conn_addref(old->mxp_conn);
                                mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
                                mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
                                mxlnd_peer_decref(peer);
                        cfs_write_unlock(g_lock);

        mxlnd_thread_stop(id);
/* mxlnd_iconnect - start an asynchronous MX connect to a peer
 * @peer     - the peer to connect to (caller holds a conn ref)
 * @msg_type - MXLND_MSG_ICON_REQ or MXLND_MSG_ICON_ACK (goes in the
 *             match bits so the completion handler knows the phase)
 *
 * Resolves the peer's NIC id on first use, gives up after
 * MXLND_CONNECT_TIMEOUT, then issues mx_iconnect() with a matching
 * request timeout. When calling this, we must not have the peer lock. */
mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
        mx_return_t mxret = MX_SUCCESS;
        mx_request_t request;
        kmx_conn_t *conn = peer->mxp_conn;
        u64 match = ((u64) msg_type) << MXLND_MSG_OFFSET;

        /* NOTE we are holding a conn ref every time we call this function,
         * we do not need to lock the peer before taking another ref */
        mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */

        LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);

        if (peer->mxp_reconnect_time == 0) {
                /* first attempt - start the connect timeout clock */
                peer->mxp_reconnect_time = jiffies;

        if (peer->mxp_nic_id == 0ULL) {
                /* resolve the peer's MX NIC id from its IP address */
                ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
                                      &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
                mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);

        if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
                /* not mapped yet, return */
                cfs_spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
                cfs_spin_unlock(&conn->mxk_lock);

        if (cfs_time_after(jiffies,
                           peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
            conn->mxk_status != MXLND_CONN_DISCONNECT) {
                /* give up and notify LNET */
                CDEBUG(D_NET, "timeout trying to connect to %s\n",
                       libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_disconnect(conn, 0, 0);
                mxlnd_conn_decref(conn);

        mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
                            peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
                            (void *) peer, &request);
        if (unlikely(mxret != MX_SUCCESS)) {
                cfs_spin_lock(&conn->mxk_lock);
                mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
                cfs_spin_unlock(&conn->mxk_lock);
                CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
                mxlnd_conn_decref(conn);
                /* bound the connect attempt (timeout is jiffies -> ms) */
                mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
                                       MXLND_CONNECT_TIMEOUT/CFS_HZ*1000);
2721 #define MXLND_STATS 0
/* mxlnd_check_sends - drive the send queues for @peer.
 * Takes a conn ref for the duration (the caller may hold none), optionally
 * queues a NOOP to return credits, kicks off a connect if the conn is not
 * ready, then drains the free/credit tx queues subject to the peer-credit
 * flow-control rules, posting each tx with mx_kisend().
 * NOTE(review): interior lines are missing from this listing (several
 * closing braces, goto/return statements and parts of the mx_kisend()
 * argument lists); read against the full source before modifying. */
2724 mxlnd_check_sends(kmx_peer_t *peer)
2728 mx_return_t mxret = MX_SUCCESS;
2729 kmx_ctx_t *tx = NULL;
2730 kmx_conn_t *conn = NULL;
/* rate-limits the debug dump below to roughly once per second */
2737 static unsigned long last = 0;
2740 if (unlikely(peer == NULL)) {
2741 LASSERT(peer != NULL);
2744 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
2745 conn = peer->mxp_conn;
2746 /* NOTE take a ref for the duration of this function since it is called
2747 * when there might not be any queued txs for this peer */
2749 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2750 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2753 mxlnd_conn_addref(conn); /* for duration of this function */
2755 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2757 /* do not add another ref for this tx */
2760 /* we do not have any conns */
2761 CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
/* once-a-second connection statistics dump (debug only) */
2766 if (cfs_time_after(jiffies, last)) {
2767 last = jiffies + CFS_HZ;
2768 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2769 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2770 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2771 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2772 conn->mxk_ntx_data, conn->mxk_data_posted);
2776 cfs_spin_lock(&conn->mxk_lock);
2777 ntx_posted = conn->mxk_ntx_posted;
2778 credits = conn->mxk_credits;
/* both counters must stay within the negotiated peer-credit window */
2780 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2781 LASSERT(ntx_posted >= 0);
2783 LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2784 LASSERT(credits >= 0);
2786 /* check number of queued msgs, ignore data */
2787 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2788 /* check if any txs queued that could return credits... */
2789 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2790 conn->mxk_ntx_msgs == 0) {
2791 /* if not, send a NOOP */
2792 tx = mxlnd_get_idle_tx();
2793 if (likely(tx != NULL)) {
2794 tx->mxc_peer = peer;
2795 tx->mxc_conn = peer->mxp_conn;
2796 mxlnd_conn_addref(conn); /* for this tx */
2797 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2798 tx->mxc_match = mxlnd_create_match(tx, 0);
2799 mxlnd_peer_queue_tx_locked(tx);
2806 /* if the peer is not ready, try to connect */
2807 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2808 conn->mxk_status == MXLND_CONN_FAIL)) {
2809 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2810 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2811 cfs_spin_unlock(&conn->mxk_lock);
2812 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
/* main send loop: drain the no-credit ("free") queue first, then the
 * credit-consuming queue */
2816 while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2817 !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2818 /* We have something to send. If we have a queued tx that does not
2819 * require a credit (free), choose it since its completion will
2820 * return a credit (here or at the peer), complete a DATA or
2821 * CONN_REQ or CONN_ACK. */
2822 cfs_list_t *tmp_tx = NULL;
2823 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2824 tmp_tx = &conn->mxk_tx_free_queue;
2826 tmp_tx = &conn->mxk_tx_credit_queue;
2828 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2830 msg_type = tx->mxc_msg_type;
2832 /* don't try to send a rx */
2833 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2835 /* ensure that it is a valid msg type */
2836 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2837 msg_type == MXLND_MSG_CONN_ACK ||
2838 msg_type == MXLND_MSG_NOOP ||
2839 msg_type == MXLND_MSG_EAGER ||
2840 msg_type == MXLND_MSG_PUT_REQ ||
2841 msg_type == MXLND_MSG_PUT_ACK ||
2842 msg_type == MXLND_MSG_PUT_DATA ||
2843 msg_type == MXLND_MSG_GET_REQ ||
2844 msg_type == MXLND_MSG_GET_DATA);
2845 LASSERT(tx->mxc_peer == peer);
2846 LASSERT(tx->mxc_nid == peer->mxp_nid);
2848 credit = mxlnd_tx_requires_credit(tx);
/* flow-control checks (apply to credit-consuming txs; the break/continue
 * lines between these branches are not visible in this listing) */
2851 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2852 CDEBUG(D_NET, "%s: posted enough\n",
2853 libcfs_nid2str(peer->mxp_nid));
2857 if (conn->mxk_credits == 0) {
2858 CDEBUG(D_NET, "%s: no credits\n",
2859 libcfs_nid2str(peer->mxp_nid));
2863 if (conn->mxk_credits == 1 && /* last credit reserved for */
2864 conn->mxk_outstanding == 0) { /* giving back credits */
2865 CDEBUG(D_NET, "%s: not using last credit\n",
2866 libcfs_nid2str(peer->mxp_nid));
/* conn not READY: only CONN_REQ/CONN_ACK may proceed; abort stale txs */
2871 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2872 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2873 msg_type == MXLND_MSG_CONN_ACK)) {
2874 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2875 mxlnd_connstatus_to_str(conn->mxk_status),
2877 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2878 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2879 cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2880 cfs_list_del_init(&tx->mxc_list);
2881 tx->mxc_errno = -ECONNABORTED;
2882 cfs_spin_unlock(&conn->mxk_lock);
2883 mxlnd_put_idle_tx(tx);
2884 mxlnd_conn_decref(conn);
/* tx accepted for posting: dequeue and account under the conn lock */
2891 cfs_list_del_init(&tx->mxc_list);
2893 /* handle credits, etc now while we have the lock to avoid races */
2895 conn->mxk_credits--;
2896 conn->mxk_ntx_posted++;
2898 if (msg_type != MXLND_MSG_PUT_DATA &&
2899 msg_type != MXLND_MSG_GET_DATA) {
2900 if (msg_type != MXLND_MSG_CONN_REQ &&
2901 msg_type != MXLND_MSG_CONN_ACK) {
2902 conn->mxk_ntx_msgs--;
/* stamp the tx with the conn incarnation so stale completions are ignored */
2905 if (tx->mxc_incarnation == 0 &&
2906 conn->mxk_incarnation != 0) {
2907 tx->mxc_incarnation = conn->mxk_incarnation;
2910 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2911 * or (2) there is a non-DATA msg that can return credits in the
2912 * queue, then drop this duplicate NOOP */
2913 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2914 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2915 (conn->mxk_ntx_msgs >= 1)) {
2916 conn->mxk_credits++;
2917 conn->mxk_ntx_posted--;
2918 cfs_spin_unlock(&conn->mxk_lock);
2919 /* redundant NOOP */
2920 mxlnd_put_idle_tx(tx);
2921 mxlnd_conn_decref(conn);
2922 CDEBUG(D_NET, "%s: redundant noop\n",
2923 libcfs_nid2str(peer->mxp_nid));
2930 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2931 (msg_type != MXLND_MSG_GET_DATA))) {
2932 mxlnd_pack_msg_locked(tx);
2937 status = conn->mxk_status;
2938 cfs_spin_unlock(&conn->mxk_lock);
2940 if (likely((status == MXLND_CONN_READY) ||
2941 (msg_type == MXLND_MSG_CONN_REQ) ||
2942 (msg_type == MXLND_MSG_CONN_ACK))) {
2944 if (msg_type != MXLND_MSG_CONN_REQ &&
2945 msg_type != MXLND_MSG_CONN_ACK) {
2946 /* add to the pending list */
2947 ret = mxlnd_q_pending_ctx(tx);
2950 tx->mxc_state = MXLND_CTX_PENDING;
2954 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2955 msg_type != MXLND_MSG_GET_DATA)) {
2956 /* send a msg style tx */
2957 LASSERT(tx->mxc_nseg == 1);
2958 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2959 CDEBUG(D_NET, "sending %s 0x%llx\n",
2960 mxlnd_msgtype_to_str(msg_type),
/* (remaining mx_kisend() arguments not visible in this listing) */
2962 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2971 /* send a DATA tx */
2972 cfs_spin_lock(&conn->mxk_lock);
2973 conn->mxk_ntx_data--;
2974 conn->mxk_data_posted++;
2975 cfs_spin_unlock(&conn->mxk_lock);
2976 CDEBUG(D_NET, "sending %s 0x%llx\n",
2977 mxlnd_msgtype_to_str(msg_type),
2979 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
/* conn was not usable: treat as a failed send below */
2990 mxret = MX_CONNECTION_FAILED;
2992 if (likely(mxret == MX_SUCCESS)) {
2995 CNETERR("mx_kisend() failed with %s (%d) "
2996 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2997 libcfs_nid2str(peer->mxp_nid));
2998 /* NOTE mx_kisend() only fails if there are not enough
2999 * resources. Do not change the connection status. */
3000 if (mxret == MX_NO_RESOURCES) {
3001 tx->mxc_errno = -ENOMEM;
3003 tx->mxc_errno = -ECONNABORTED;
/* roll back the accounting done above for the failed post */
3006 cfs_spin_lock(&conn->mxk_lock);
3007 conn->mxk_ntx_posted--;
3008 conn->mxk_credits++;
3009 cfs_spin_unlock(&conn->mxk_lock);
3010 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3011 msg_type == MXLND_MSG_GET_DATA) {
3012 cfs_spin_lock(&conn->mxk_lock);
3013 conn->mxk_data_posted--;
3014 cfs_spin_unlock(&conn->mxk_lock);
/* return the credits this msg was carrying back to "outstanding" */
3016 if (msg_type != MXLND_MSG_PUT_DATA &&
3017 msg_type != MXLND_MSG_GET_DATA &&
3018 msg_type != MXLND_MSG_CONN_REQ &&
3019 msg_type != MXLND_MSG_CONN_ACK) {
3020 cfs_spin_lock(&conn->mxk_lock);
3021 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
3022 cfs_spin_unlock(&conn->mxk_lock);
3024 if (msg_type != MXLND_MSG_CONN_REQ &&
3025 msg_type != MXLND_MSG_CONN_ACK) {
3026 /* remove from the pending list */
3027 mxlnd_deq_pending_ctx(tx);
3029 mxlnd_put_idle_tx(tx);
3030 mxlnd_conn_decref(conn);
3033 cfs_spin_lock(&conn->mxk_lock);
3036 cfs_spin_unlock(&conn->mxk_lock);
3038 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3044 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3045 * @ctx - the tx descriptor
3047 * Determine which type of send request it was and start the next step, if needed,
3048 * or, if done, signal completion to LNET. After we are done, put back on the
/* NOTE(review): interior lines are missing from this listing (the opening
 * of the comment, switch header, break statements and some braces). */
3052 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3054 int code = tx->mxc_status.code;
/* failed if MX reported an error OR an errno was set earlier in the path */
3055 int failed = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3056 kmx_msg_t *msg = tx->mxc_msg;
3057 kmx_peer_t *peer = tx->mxc_peer;
3058 kmx_conn_t *conn = tx->mxc_conn;
3059 u8 type = tx->mxc_msg_type;
3060 int credit = mxlnd_tx_requires_credit(tx);
3061 u64 cookie = tx->mxc_cookie;
3063 CDEBUG(D_NET, "entering %s (0x%llx):\n",
3064 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3066 LASSERT (peer != NULL);
3067 LASSERT (conn != NULL);
/* DATA txs have no eager message buffer, so only check msg type otherwise */
3069 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3070 LASSERT (type == msg->mxm_type);
/* ensure a failed tx always carries a non-zero errno for the finalizer */
3074 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3076 cfs_spin_lock(&conn->mxk_lock);
3077 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3078 cfs_spin_unlock(&conn->mxk_lock);
/* per-type cleanup; counters are only adjusted if the completion belongs
 * to the current conn incarnation (stale completions are ignored) */
3083 case MXLND_MSG_GET_DATA:
3084 cfs_spin_lock(&conn->mxk_lock);
3085 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3086 conn->mxk_outstanding++;
3087 conn->mxk_data_posted--;
3089 cfs_spin_unlock(&conn->mxk_lock);
3092 case MXLND_MSG_PUT_DATA:
3093 cfs_spin_lock(&conn->mxk_lock);
3094 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3095 conn->mxk_data_posted--;
3097 cfs_spin_unlock(&conn->mxk_lock);
3100 case MXLND_MSG_NOOP:
3101 case MXLND_MSG_PUT_REQ:
3102 case MXLND_MSG_PUT_ACK:
3103 case MXLND_MSG_GET_REQ:
3104 case MXLND_MSG_EAGER:
3107 case MXLND_MSG_CONN_ACK:
3108 if (peer->mxp_incompatible) {
3109 /* we sent our params, now close this conn */
3110 mxlnd_conn_disconnect(conn, 0, 1);
3112 case MXLND_MSG_CONN_REQ:
3114 CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
3115 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3116 mx_strstatus(code), code, tx->mxc_errno,
3117 libcfs_nid2str(tx->mxc_nid));
3118 if (!peer->mxp_incompatible) {
3119 cfs_spin_lock(&conn->mxk_lock);
/* BAD_SESSION means the peer restarted: start over from INIT */
3120 if (code == MX_STATUS_BAD_SESSION)
3121 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
3123 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3124 cfs_spin_unlock(&conn->mxk_lock);
3130 CNETERR("Unknown msg type of %d\n", type);
/* credit-consuming tx completed: release its posted slot */
3135 cfs_spin_lock(&conn->mxk_lock);
3136 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3137 conn->mxk_ntx_posted--;
3139 cfs_spin_unlock(&conn->mxk_lock);
3142 mxlnd_put_idle_tx(tx);
3143 mxlnd_conn_decref(conn);
/* the freed credit/slot may allow more sends — kick the send path */
3145 mxlnd_check_sends(peer);
3147 CDEBUG(D_NET, "leaving\n");
3151 /* Handle completion of MSG or DATA rx.
3152 * CONN_REQ and CONN_ACK are handled elsewhere. */
/* NOTE(review): interior lines are missing from this listing (variable
 * declarations like ret/result/credit/conn_ref/peer_ref/seq, goto labels,
 * break statements and some braces); read against the full source. */
3154 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3159 u32 nob = rx->mxc_status.xfer_length;
3160 u64 bits = rx->mxc_status.match_info;
3161 kmx_msg_t *msg = rx->mxc_msg;
3162 kmx_peer_t *peer = rx->mxc_peer;
3163 kmx_conn_t *conn = rx->mxc_conn;
3164 u8 type = rx->mxc_msg_type;
3166 lnet_msg_t *lntmsg[2];
3171 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3172 * failed GET reply */
3174 /* NOTE peer may still be NULL if it is a new peer and
3175 * conn may be NULL if this is a re-connect */
3176 if (likely(peer != NULL && conn != NULL)) {
3177 /* we have a reference on the conn */
3179 } else if (peer != NULL && conn == NULL) {
3180 /* we have a reference on the peer */
3182 } else if (peer == NULL && conn != NULL) {
3184 CERROR("rx 0x%llx from %s has conn but no peer\n",
3185 bits, libcfs_nid2str(rx->mxc_nid));
3187 } /* else peer and conn == NULL */
/* peer known but no conn yet: bind the rx to the peer's current conn,
 * converting the peer ref we hold into a conn ref */
3189 if (conn == NULL && peer != NULL) {
3190 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3191 conn = peer->mxp_conn;
3193 mxlnd_conn_addref(conn); /* conn takes ref... */
3194 mxlnd_peer_decref(peer); /* from peer */
3198 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3199 rx->mxc_conn = conn;
3203 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
/* TRUNCATED is tolerated here; any other MX error fails the rx */
3209 if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3210 rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3211 CNETERR("rx from %s failed with %s (%d)\n",
3212 libcfs_nid2str(rx->mxc_nid),
3213 mx_strstatus(rx->mxc_status.code),
3214 rx->mxc_status.code);
3220 /* this may be a failed GET reply */
3221 if (type == MXLND_MSG_GET_DATA) {
3222 /* get the error (52-59) bits from the match bits */
3223 ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3224 lntmsg[0] = rx->mxc_lntmsg[0];
3228 /* we had a rx complete with 0 bytes (no hdr, nothing) */
3229 CNETERR("rx from %s returned with 0 bytes\n",
3230 libcfs_nid2str(rx->mxc_nid));
3235 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3236 if (type == MXLND_MSG_PUT_DATA) {
3238 lntmsg[0] = rx->mxc_lntmsg[0];
3240 } else if (type == MXLND_MSG_GET_DATA) {
3242 lntmsg[0] = rx->mxc_lntmsg[0];
3243 lntmsg[1] = rx->mxc_lntmsg[1];
/* message-style rx: validate the wire message before dispatching */
3247 ret = mxlnd_unpack_msg(msg, nob);
3249 CNETERR("Error %d unpacking rx from %s\n",
3250 ret, libcfs_nid2str(rx->mxc_nid));
3254 type = msg->mxm_type;
3256 if (rx->mxc_nid != msg->mxm_srcnid ||
3257 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3258 CNETERR("rx with mismatched NID (type %s) (my nid is "
3259 "0x%llx and rx msg dst is 0x%llx)\n",
3260 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* reject messages from a previous incarnation of the conn or of us */
3265 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3266 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3267 CNETERR("Stale rx from %s with type %s "
3268 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3269 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3270 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3271 msg->mxm_srcstamp, conn->mxk_incarnation,
3272 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3277 CDEBUG(D_NET, "Received %s with %d credits\n",
3278 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* absorb the credits the peer returned with this message */
3280 LASSERT(peer != NULL && conn != NULL);
3281 if (msg->mxm_credits != 0) {
3282 cfs_spin_lock(&conn->mxk_lock);
3283 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3284 if ((conn->mxk_credits + msg->mxm_credits) >
3285 *kmxlnd_tunables.kmx_peercredits) {
3286 CNETERR("mxk_credits %d mxm_credits %d\n",
3287 conn->mxk_credits, msg->mxm_credits);
3289 conn->mxk_credits += msg->mxm_credits;
3290 LASSERT(conn->mxk_credits >= 0);
3291 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3293 cfs_spin_unlock(&conn->mxk_lock);
3296 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
/* dispatch by message type; EAGER/PUT_REQ/GET_REQ are handed to LNET */
3298 case MXLND_MSG_NOOP:
3301 case MXLND_MSG_EAGER:
3302 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3303 msg->mxm_srcnid, rx, 0);
3307 case MXLND_MSG_PUT_REQ:
3308 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3309 msg->mxm_srcnid, rx, 1);
3313 case MXLND_MSG_PUT_ACK: {
3314 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
/* a cookie above MXLND_MAX_COOKIE encodes a NAK with an error value */
3315 if (cookie > MXLND_MAX_COOKIE) {
3316 CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3317 libcfs_nid2str(rx->mxc_nid));
3318 result = -((u32) MXLND_ERROR_VAL(cookie));
3319 lntmsg[0] = rx->mxc_lntmsg[0];
3321 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3322 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3323 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3328 case MXLND_MSG_GET_REQ:
3329 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3330 msg->mxm_srcnid, rx, 1);
3335 CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
3336 libcfs_nid2str(rx->mxc_nid));
/* an error occurred above: mark the conn failed */
3342 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3343 cfs_spin_lock(&conn->mxk_lock);
3344 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3345 cfs_spin_unlock(&conn->mxk_lock);
3350 cfs_spin_lock(&conn->mxk_lock);
3351 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3352 cfs_spin_unlock(&conn->mxk_lock);
3356 /* lnet_parse() failed, etc., repost now */
3357 mxlnd_put_idle_rx(rx);
/* this rx consumed a credit the peer will want back: count it outstanding */
3358 if (conn != NULL && credit == 1) {
3359 if (type == MXLND_MSG_PUT_DATA ||
3360 type == MXLND_MSG_EAGER ||
3361 type == MXLND_MSG_PUT_REQ ||
3362 type == MXLND_MSG_NOOP) {
3363 cfs_spin_lock(&conn->mxk_lock);
3364 conn->mxk_outstanding++;
3365 cfs_spin_unlock(&conn->mxk_lock);
3368 if (conn_ref) mxlnd_conn_decref(conn);
3369 LASSERT(peer_ref == 0);
3372 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3373 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3375 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* finalize any LNET messages attached to this rx with the result code */
3378 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3379 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3381 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/* mxlnd_handle_connect_msg - completion handler for mx_iconnect().
 * On failure, marks the conn FAIL (and disconnects on timeout).  On success,
 * records the peer's endpoint address/session and marshals the follow-up
 * CONN_REQ or CONN_ACK message.  Consumes the conn ref taken in
 * mxlnd_iconnect() on every visible error path; otherwise the ref is carried
 * by the CONN_REQ/CONN_ACK tx.
 * NOTE(review): interior lines are missing from this listing (local
 * declarations, returns and the function tail). */
3387 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3389 kmx_ctx_t *tx = NULL;
3390 kmx_msg_t *txmsg = NULL;
3391 kmx_conn_t *conn = peer->mxp_conn;
/* map the iconnect completion type to the wire message we must now send */
3395 u8 type = (msg_type == MXLND_MSG_ICON_REQ ?
3396 MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3398 /* a conn ref was taken when calling mx_iconnect(),
3399 * hold it until CONN_REQ or CONN_ACK completes */
3401 CDEBUG(D_NET, "entering\n");
3402 if (status.code != MX_STATUS_SUCCESS) {
/* only the passive (ACK) side sends a BYE on failure */
3403 int send_bye = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3405 CNETERR("mx_iconnect() failed for %s with %s (%d) "
3406 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3407 mxlnd_msgtype_to_str(msg_type),
3408 mx_strstatus(status.code), status.code,
3409 libcfs_nid2str(peer->mxp_nid),
3413 cfs_spin_lock(&conn->mxk_lock);
3414 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3415 cfs_spin_unlock(&conn->mxk_lock);
3417 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3418 MXLND_CONNECT_TIMEOUT)) {
3419 CNETERR("timeout, calling conn_disconnect()\n");
3420 mxlnd_conn_disconnect(conn, 0, send_bye);
3423 mxlnd_conn_decref(conn);
/* success: record the peer's endpoint address and hang the conn off it so
 * the unexpected-recv callback can find it */
3426 mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3427 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3428 cfs_spin_lock(&conn->mxk_lock);
3429 conn->mxk_epa = status.source;
3430 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3431 if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3432 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3434 cfs_spin_unlock(&conn->mxk_lock);
3435 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3437 /* mx_iconnect() succeeded, reset delay to 0 */
3438 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3439 peer->mxp_reconnect_time = 0;
3440 peer->mxp_conn->mxk_sid = sid;
3441 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3443 /* marshal CONN_REQ or CONN_ACK msg */
3444 /* we are still using the conn ref from iconnect() - do not take another */
3445 tx = mxlnd_get_idle_tx();
3447 CNETERR("Can't obtain %s tx for %s\n",
3448 mxlnd_msgtype_to_str(type),
3449 libcfs_nid2str(peer->mxp_nid));
3450 cfs_spin_lock(&conn->mxk_lock);
3451 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3452 cfs_spin_unlock(&conn->mxk_lock);
3453 mxlnd_conn_decref(conn);
3457 tx->mxc_peer = peer;
3458 tx->mxc_conn = conn;
3459 tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3460 CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3461 mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3462 txmsg = tx->mxc_msg;
/* advertise our negotiable parameters to the peer */
3463 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3464 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3465 tx->mxc_match = mxlnd_create_match(tx, 0);
3472 * mxlnd_request_waitd - the MX request completion thread(s)
3473 * @arg - thread id (as a void *)
3475 * This thread waits for a MX completion and then completes the request.
3476 * We will create one thread per CPU.
/* NOTE(review): interior lines are missing from this listing (local
 * declarations like status/result/count, continue statements, switch
 * header and some braces). */
3479 mxlnd_request_waitd(void *arg)
3481 long id = (long) arg;
3484 mx_return_t mxret = MX_SUCCESS;
3486 kmx_ctx_t *ctx = NULL;
3487 enum kmx_req_state req_type = MXLND_REQ_TX;
3488 kmx_peer_t *peer = NULL;
3489 kmx_conn_t *conn = NULL;
3494 memset(name, 0, sizeof(name));
3495 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3496 cfs_daemonize(name);
3498 memset(&status, 0, sizeof(status));
3500 CDEBUG(D_NET, "%s starting\n", name);
3502 while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
/* thread 0 optionally busy-polls (mx_test_any) for kmx_polling iterations
 * before falling back to a blocking wait; other threads always block */
3508 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3509 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3513 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3514 0ULL, 0ULL, &status, &result);
3517 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3518 0ULL, 0ULL, &status, &result);
3520 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
3524 /* nothing completed... */
3528 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3529 "match_info 0x%llx and length %d\n",
3530 mx_strstatus(status.code), status.code,
3531 (u64) status.match_info, status.msg_length);
3533 if (status.code != MX_STATUS_SUCCESS) {
3534 CNETERR("wait_any() failed with %s (%d) with "
3535 "match_info 0x%llx and length %d\n",
3536 mx_strstatus(status.code), status.code,
3537 (u64) status.match_info, status.msg_length);
3540 msg_type = MXLND_MSG_TYPE(status.match_info);
3542 /* This may be a mx_iconnect() request completing,
3543 * check the bit mask for CONN_REQ and CONN_ACK */
3544 if (msg_type == MXLND_MSG_ICON_REQ ||
3545 msg_type == MXLND_MSG_ICON_ACK) {
/* for iconnect completions the context is the peer, not a ctx */
3546 peer = (kmx_peer_t*) status.context;
3547 mxlnd_handle_connect_msg(peer, msg_type, status);
3551 /* This must be a tx or rx */
3553 /* NOTE: if this is a RX from the unexpected callback, it may
3554 * have very little info. If we dropped it in unexpected_recv(),
3555 * it will not have a context. If so, ignore it. */
3556 ctx = (kmx_ctx_t *) status.context;
3559 req_type = ctx->mxc_type;
3560 conn = ctx->mxc_conn; /* this may be NULL */
3561 mxlnd_deq_pending_ctx(ctx);
3563 /* copy status to ctx->mxc_status */
3564 ctx->mxc_status = status;
3568 mxlnd_handle_tx_completion(ctx);
3571 mxlnd_handle_rx_completion(ctx);
3574 CNETERR("Unknown ctx type %d\n", req_type);
3579 /* conn is always set except for the first CONN_REQ rx
3580 * from a new peer */
3581 if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3582 mxlnd_conn_disconnect(conn, 1, 1);
3585 CDEBUG(D_NET, "waitd() completed task\n");
3587 CDEBUG(D_NET, "%s stopping\n", name);
3588 mxlnd_thread_stop(id);
/* mxlnd_check_timeouts - scan all peers for conns whose pending traffic has
 * timed out relative to @now (jiffies), disconnecting expired conns.
 * Returns the earliest future timeout so the caller knows how long to sleep
 * (defaults to now + MXLND_COMM_TIMEOUT when nothing is pending).
 * NOTE(review): interior lines are missing from this listing (the loop
 * variable declarations, continue statements, closing braces and the final
 * return). */
3594 mxlnd_check_timeouts(unsigned long now)
3598 unsigned long next = 0; /* jiffies */
3599 kmx_peer_t *peer = NULL;
3600 kmx_conn_t *conn = NULL;
3601 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
3603 cfs_read_lock(g_lock);
3604 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3605 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
/* bail out promptly on module shutdown */
3608 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3609 cfs_read_unlock(g_lock);
3613 conn = peer->mxp_conn;
3615 mxlnd_conn_addref(conn);
3620 cfs_spin_lock(&conn->mxk_lock);
3622 /* if nothing pending (timeout == 0) or
3623 * if conn is already disconnected,
3625 if (conn->mxk_timeout == 0 ||
3626 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3627 cfs_spin_unlock(&conn->mxk_lock);
3628 mxlnd_conn_decref(conn);
3632 /* we want to find the timeout that will occur first.
3633 * if it is in the future, we will sleep until then.
3634 * if it is in the past, then we will sleep one
3635 * second and repeat the process. */
3637 (cfs_time_before(conn->mxk_timeout, next))) {
3638 next = conn->mxk_timeout;
3643 if (cfs_time_aftereq(now, conn->mxk_timeout)) {
3646 cfs_spin_unlock(&conn->mxk_lock);
/* expired: tear the conn down (notify peer and LNET) */
3649 mxlnd_conn_disconnect(conn, 1, 1);
3651 mxlnd_conn_decref(conn);
3654 cfs_read_unlock(g_lock);
3655 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
/* mxlnd_passive_connect - handle an incoming CONN_REQ carried in @cp.
 * Validates the peer's parameters (dst nid, queue depth, eager size),
 * finds or creates the peer/conn, then starts the reverse connect
 * (ICON_ACK) which will lead to our CONN_ACK.  Frees @cp before returning.
 * NOTE(review): interior lines are missing from this listing (local
 * declarations like conn_ref/hash/board, goto targets, else branches and
 * several braces). */
3661 mxlnd_passive_connect(kmx_connparams_t *cp)
3664 int incompatible = 0;
3669 kmx_msg_t *msg = &cp->mxr_msg;
3670 kmx_peer_t *peer = cp->mxr_peer;
3671 kmx_conn_t *conn = NULL;
3672 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
3674 mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3676 ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3679 CNETERR("Error %d unpacking CONN_REQ from %s\n",
3680 ret, libcfs_nid2str(peer->mxp_nid));
3682 CNETERR("Error %d unpacking CONN_REQ from "
3683 "unknown host with nic_id 0x%llx\n", ret, nic_id);
3687 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3688 CNETERR("Can't accept %s: bad dst nid %s\n",
3689 libcfs_nid2str(msg->mxm_srcnid),
3690 libcfs_nid2str(msg->mxm_dstnid));
/* parameter mismatches are reported but the conn is only marked
 * incompatible; the handshake continues so the peer learns our params */
3693 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3694 CNETERR("Can't accept %s: incompatible queue depth "
3696 libcfs_nid2str(msg->mxm_srcnid),
3697 msg->mxm_u.conn_req.mxcrm_queue_depth,
3698 *kmxlnd_tunables.kmx_peercredits);
3701 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3702 CNETERR("Can't accept %s: incompatible EAGER size "
3704 libcfs_nid2str(msg->mxm_srcnid),
3705 msg->mxm_u.conn_req.mxcrm_eager_size,
3706 (int) MXLND_MSG_SIZE);
3711 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
/* no peer yet: allocate one, then re-check the table under the write lock
 * in case another thread raced us */
3715 kmx_peer_t *existing_peer = NULL;
3717 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3719 mx_nic_id_to_board_number(nic_id, &board);
3721 /* adds conn ref for peer and one for this function */
3722 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3723 board, ep_id, 0ULL);
3727 peer->mxp_conn->mxk_sid = sid;
3728 LASSERT(peer->mxp_ep_id == ep_id);
3729 cfs_write_lock(g_lock);
3730 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3731 if (existing_peer) {
/* lost the race: drop our new peer/conn and adopt the existing one */
3732 mxlnd_conn_decref(peer->mxp_conn);
3733 mxlnd_peer_decref(peer);
3734 peer = existing_peer;
3735 mxlnd_conn_addref(peer->mxp_conn);
3736 conn = peer->mxp_conn;
3738 cfs_list_add_tail(&peer->mxp_list,
3739 &kmxlnd_data.kmx_peers[hash]);
3740 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
3742 cfs_write_unlock(g_lock);
/* peer exists but has no conn: allocate one for it */
3744 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3745 cfs_write_lock(g_lock);
3746 mxlnd_peer_decref(peer); /* drop ref taken above */
3747 cfs_write_unlock(g_lock);
3749 CNETERR("Cannot allocate mxp_conn\n");
3753 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3754 conn = peer->mxp_conn;
3755 } else { /* unexpected handler found peer */
3756 kmx_conn_t *old_conn = peer->mxp_conn;
/* a different session id means the peer restarted: retire the old conn */
3758 if (sid != peer->mxp_conn->mxk_sid) {
3759 /* do not call mx_disconnect() or send a BYE */
3760 mxlnd_conn_disconnect(old_conn, 0, 0);
3762 /* This allocs a conn, points peer->mxp_conn to this one.
3763 * The old conn is still on the peer->mxp_conns list.
3764 * As the pending requests complete, they will call
3765 * conn_decref() which will eventually free it. */
3766 ret = mxlnd_conn_alloc(&conn, peer);
3768 CNETERR("Cannot allocate peer->mxp_conn\n");
3771 /* conn_alloc() adds one ref for the peer and one
3772 * for this function */
3775 peer->mxp_conn->mxk_sid = sid;
3778 conn = peer->mxp_conn;
/* record the peer's incarnation and move the conn to WAIT before starting
 * the reverse connect */
3781 cfs_write_lock(g_lock);
3782 peer->mxp_incompatible = incompatible;
3783 cfs_write_unlock(g_lock);
3784 cfs_spin_lock(&conn->mxk_lock);
3785 conn->mxk_incarnation = msg->mxm_srcstamp;
3786 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3787 cfs_spin_unlock(&conn->mxk_lock);
3789 /* handle_conn_ack() will create the CONN_ACK msg */
3790 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3793 if (conn_ref) mxlnd_conn_decref(conn);
3795 mxlnd_connparams_free(cp);
/* mxlnd_check_conn_ack - handle the peer's CONN_ACK carried in @cp.
 * Validates the acknowledged parameters; on success resets the credit
 * state, adopts the peer's incarnation and marks the conn READY, then
 * kicks the send path.  On failure marks the conn FAIL (and disconnects
 * if the parameters were incompatible).  Frees @cp before returning.
 * NOTE(review): interior lines are missing from this listing (goto
 * targets, else branches and the function tail). */
3800 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3803 int incompatible = 0;
3807 kmx_msg_t *msg = &cp->mxr_msg;
3808 kmx_peer_t *peer = cp->mxr_peer;
3809 kmx_conn_t *conn = cp->mxr_conn;
3811 mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3813 ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3816 CNETERR("Error %d unpacking CONN_ACK from %s\n",
3817 ret, libcfs_nid2str(peer->mxp_nid));
3819 CNETERR("Error %d unpacking CONN_ACK from "
3820 "unknown host with nic_id 0x%llx\n", ret, nic_id);
3826 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3827 CNETERR("Can't accept CONN_ACK from %s: "
3828 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3829 libcfs_nid2str(msg->mxm_dstnid));
/* both sides must agree on queue depth and eager size */
3833 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3834 CNETERR("Can't accept CONN_ACK from %s: "
3835 "incompatible queue depth %d (%d wanted)\n",
3836 libcfs_nid2str(msg->mxm_srcnid),
3837 msg->mxm_u.conn_req.mxcrm_queue_depth,
3838 *kmxlnd_tunables.kmx_peercredits);
3843 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3844 CNETERR("Can't accept CONN_ACK from %s: "
3845 "incompatible EAGER size %d (%d wanted)\n",
3846 libcfs_nid2str(msg->mxm_srcnid),
3847 msg->mxm_u.conn_req.mxcrm_eager_size,
3848 (int) MXLND_MSG_SIZE);
/* handshake accepted: reset credits/counters and bring the conn up */
3853 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3854 peer->mxp_incompatible = incompatible;
3855 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3856 cfs_spin_lock(&conn->mxk_lock);
3857 conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3858 conn->mxk_outstanding = 0;
3859 conn->mxk_incarnation = msg->mxm_srcstamp;
3860 conn->mxk_timeout = 0;
3861 if (!incompatible) {
3862 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3863 libcfs_nid2str(msg->mxm_srcnid));
3864 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3866 cfs_spin_unlock(&conn->mxk_lock);
/* conn is now READY: flush any txs that queued up while connecting */
3869 mxlnd_check_sends(peer);
/* failure path: mark the conn failed */
3873 cfs_spin_lock(&conn->mxk_lock);
3874 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3875 cfs_spin_unlock(&conn->mxk_lock);
3878 if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3880 mxlnd_connparams_free(cp);
/* mxlnd_abort_msgs - fail every ctx parked on the global orphan list.
 * Each orphan is completed with -ECONNABORTED: txs go back to the idle
 * pool (dropping their conn ref), rxs are run through the normal rx
 * completion path in CANCELED state.  The orphan-list lock is dropped
 * around the per-ctx work and retaken for the next iteration.
 * NOTE(review): a few lines (e.g. the count increment and return) are
 * missing from this listing. */
3885 mxlnd_abort_msgs(void)
3888 cfs_list_t *orphans = &kmxlnd_data.kmx_orphan_msgs;
3889 cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
3892 cfs_spin_lock(g_conn_lock);
3893 while (!cfs_list_empty(orphans)) {
3894 kmx_ctx_t *ctx = NULL;
3895 kmx_conn_t *conn = NULL;
3897 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3898 cfs_list_del_init(&ctx->mxc_list);
/* drop the lock before completing the ctx (completion may sleep/lock) */
3899 cfs_spin_unlock(g_conn_lock);
3901 ctx->mxc_errno = -ECONNABORTED;
3902 conn = ctx->mxc_conn;
3903 CDEBUG(D_NET, "aborting %s %s %s\n",
3904 mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3905 ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3906 libcfs_nid2str(ctx->mxc_nid));
3907 if (ctx->mxc_type == MXLND_REQ_TX) {
3908 mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3909 if (conn) mxlnd_conn_decref(conn); /* for this tx */
3911 ctx->mxc_state = MXLND_CTX_CANCELED;
3912 mxlnd_handle_rx_completion(ctx);
3916 cfs_spin_lock(g_conn_lock);
3918 cfs_spin_unlock(g_conn_lock);
3924 mxlnd_free_conn_zombies(void)
3927 cfs_list_t *zombies = &kmxlnd_data.kmx_conn_zombies;
3928 cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
3929 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
3931 /* cleanup any zombies */
3932 cfs_spin_lock(g_conn_lock);
3933 while (!cfs_list_empty(zombies)) {
3934 kmx_conn_t *conn = NULL;
3936 conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3937 cfs_list_del_init(&conn->mxk_zombie);
3938 cfs_spin_unlock(g_conn_lock);
3940 cfs_write_lock(g_lock);
3941 mxlnd_conn_free_locked(conn);
3942 cfs_write_unlock(g_lock);
3945 cfs_spin_lock(g_conn_lock);
3947 cfs_spin_unlock(g_conn_lock);
3948 CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3953 * mxlnd_connd - handles incoming connection requests
3954 * @arg - thread id (as a void *)
3956 * This thread handles incoming connection requests
3959 mxlnd_connd(void *arg)
3961 long id = (long) arg;
3963 cfs_daemonize("mxlnd_connd");
3965 CDEBUG(D_NET, "connd starting\n");
3967 while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3969 kmx_connparams_t *cp = NULL;
3970 cfs_spinlock_t *g_conn_lock = &kmxlnd_data.kmx_conn_lock;
3971 cfs_list_t *conn_reqs = &kmxlnd_data.kmx_conn_reqs;
3973 ret = cfs_down_interruptible(&kmxlnd_data.kmx_conn_sem);
3975 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
3981 ret = mxlnd_abort_msgs();
3982 ret += mxlnd_free_conn_zombies();
3984 cfs_spin_lock(g_conn_lock);
3985 if (cfs_list_empty(conn_reqs)) {
3987 CNETERR("connd woke up but did not "
3988 "find a kmx_connparams_t or zombie conn\n");
3989 cfs_spin_unlock(g_conn_lock);
3992 cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
3994 cfs_list_del_init(&cp->mxr_list);
3995 cfs_spin_unlock(g_conn_lock);
3997 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
3998 case MXLND_MSG_CONN_REQ:
3999 /* We have a connection request. Handle it. */
4000 mxlnd_passive_connect(cp);
4002 case MXLND_MSG_CONN_ACK:
4003 /* The peer is ready for messages */
4004 mxlnd_check_conn_ack(cp);
4009 mxlnd_free_conn_zombies();
4011 CDEBUG(D_NET, "connd stopping\n");
4012 mxlnd_thread_stop(id);
4017 * mxlnd_timeoutd - enforces timeouts on messages
4018 * @arg - thread id (as a void *)
4020 * This thread queries each peer for its earliest timeout. If a peer has timed out,
4021 * it calls mxlnd_conn_disconnect().
4023 * After checking for timeouts, try progressing sends (call check_sends()).
4026 mxlnd_timeoutd(void *arg)
4029 long id = (long) arg;
4030 unsigned long now = 0;
4031 unsigned long next = 0;
4032 unsigned long delay = CFS_HZ;
4033 kmx_peer_t *peer = NULL;
4034 kmx_peer_t *temp = NULL;
4035 kmx_conn_t *conn = NULL;
4036 cfs_rwlock_t *g_lock = &kmxlnd_data.kmx_global_lock;
4038 cfs_daemonize("mxlnd_timeoutd");
4040 CDEBUG(D_NET, "timeoutd starting\n");
4042 while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
4045 /* if the next timeout has not arrived, go back to sleep */
4046 if (cfs_time_after(now, next)) {
4047 next = mxlnd_check_timeouts(now);
4050 /* try to progress peers' txs */
4051 cfs_write_lock(g_lock);
4052 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4053 cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
4055 /* NOTE we are safe against the removal of peer, but
4056 * not against the removal of temp */
4057 cfs_list_for_each_entry_safe(peer, temp, peers,
4059 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
4061 mxlnd_peer_addref(peer); /* add ref... */
4062 conn = peer->mxp_conn;
4063 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4064 mxlnd_conn_addref(conn); /* take ref... */
4066 CDEBUG(D_NET, "ignoring %s\n",
4067 libcfs_nid2str(peer->mxp_nid));
4068 mxlnd_peer_decref(peer); /* ...to here */
4072 if ((conn->mxk_status == MXLND_CONN_READY ||
4073 conn->mxk_status == MXLND_CONN_FAIL) &&
4077 cfs_write_unlock(g_lock);
4078 mxlnd_check_sends(peer);
4079 cfs_write_lock(g_lock);
4081 mxlnd_conn_decref(conn); /* until here */
4082 mxlnd_peer_decref(peer); /* ...to here */
4085 cfs_write_unlock(g_lock);
4089 CDEBUG(D_NET, "timeoutd stopping\n");
4090 mxlnd_thread_stop(id);