1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
32 * Copyright (C) 2006 Myricom, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lnet/klnds/mxlnd/mxlnd.c
40 * Author: Eric Barton <eric@bartonsoftware.com>
41 * Author: Scott Atchley <atchley at myri.com>
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
51 /* if memcmp() == 0, it is NULL */
52 return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
55 inline void mxlnd_noop(char *s, ...)
61 mxlnd_ctxstate_to_str(int mxc_state)
65 return "MXLND_CTX_INIT";
67 return "MXLND_CTX_IDLE";
69 return "MXLND_CTX_PREP";
70 case MXLND_CTX_PENDING:
71 return "MXLND_CTX_PENDING";
72 case MXLND_CTX_COMPLETED:
73 return "MXLND_CTX_COMPLETED";
74 case MXLND_CTX_CANCELED:
75 return "MXLND_CTX_CANCELED";
82 mxlnd_connstatus_to_str(int mxk_status)
85 case MXLND_CONN_READY:
86 return "MXLND_CONN_READY";
88 return "MXLND_CONN_INIT";
90 return "MXLND_CONN_REQ";
92 return "MXLND_CONN_ACK";
94 return "MXLND_CONN_WAIT";
95 case MXLND_CONN_DISCONNECT:
96 return "MXLND_CONN_DISCONNECT";
98 return "MXLND_CONN_FAIL";
105 mxlnd_msgtype_to_str(int type) {
107 case MXLND_MSG_EAGER:
108 return "MXLND_MSG_EAGER";
109 case MXLND_MSG_CONN_REQ:
110 return "MXLND_MSG_CONN_REQ";
111 case MXLND_MSG_CONN_ACK:
112 return "MXLND_MSG_CONN_ACK";
114 return "MXLND_MSG_BYE";
116 return "MXLND_MSG_NOOP";
117 case MXLND_MSG_PUT_REQ:
118 return "MXLND_MSG_PUT_REQ";
119 case MXLND_MSG_PUT_ACK:
120 return "MXLND_MSG_PUT_ACK";
121 case MXLND_MSG_PUT_DATA:
122 return "MXLND_MSG_PUT_DATA";
123 case MXLND_MSG_GET_REQ:
124 return "MXLND_MSG_GET_REQ";
125 case MXLND_MSG_GET_DATA:
126 return "MXLND_MSG_GET_DATA";
133 mxlnd_lnetmsg_to_str(int type)
137 return "LNET_MSG_ACK";
139 return "LNET_MSG_PUT";
141 return "LNET_MSG_GET";
143 return "LNET_MSG_REPLY";
145 return "LNET_MSG_HELLO";
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
156 u64 type = (u64) ctx->mxc_msg_type;
157 u64 err = (u64) error;
160 LASSERT(ctx->mxc_msg_type != 0);
161 LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162 match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
169 *msg_type = (u8) MXLND_MSG_TYPE(match);
170 *error = (u8) MXLND_ERROR_VAL(match);
171 *cookie = match & MXLND_MAX_COOKIE;
172 LASSERT(*msg_type == MXLND_MSG_EAGER ||
173 *msg_type == MXLND_MSG_ICON_REQ ||
174 *msg_type == MXLND_MSG_CONN_REQ ||
175 *msg_type == MXLND_MSG_ICON_ACK ||
176 *msg_type == MXLND_MSG_CONN_ACK ||
177 *msg_type == MXLND_MSG_BYE ||
178 *msg_type == MXLND_MSG_NOOP ||
179 *msg_type == MXLND_MSG_PUT_REQ ||
180 *msg_type == MXLND_MSG_PUT_ACK ||
181 *msg_type == MXLND_MSG_PUT_DATA ||
182 *msg_type == MXLND_MSG_GET_REQ ||
183 *msg_type == MXLND_MSG_GET_DATA);
188 mxlnd_get_idle_rx(void)
190 struct list_head *tmp = NULL;
191 struct kmx_ctx *rx = NULL;
193 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
195 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
200 tmp = &kmxlnd_data.kmx_rx_idle;
201 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202 list_del_init(&rx->mxc_list);
203 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
206 if (rx->mxc_get != rx->mxc_put) {
207 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
219 LASSERT (rx->mxc_get == rx->mxc_put);
223 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224 rx->mxc_state = MXLND_CTX_PREP;
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
233 CDEBUG(D_NETERROR, "called with NULL pointer\n");
235 } else if (rx->mxc_type != MXLND_REQ_RX) {
236 CDEBUG(D_NETERROR, "called with tx\n");
239 LASSERT(rx->mxc_get == rx->mxc_put + 1);
242 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/*
 * Release up to @count rxs: pop them from the idle list and remove them
 * from the global rx list under kmx_rxs_lock — presumably freeing each
 * one; TODO confirm, the loop interior is elided here.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing.
 */
249 mxlnd_reduce_idle_rxs(__u32 count)
252 struct kmx_ctx *rx = NULL;
254 spin_lock(&kmxlnd_data.kmx_rxs_lock);
255 for (i = 0; i < count; i++) {
256 rx = mxlnd_get_idle_rx();
258 struct list_head *tmp = &rx->mxc_global_list;
/* logged when fewer idle rxs were available than requested */
262 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
266 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
271 mxlnd_get_idle_tx(void)
273 struct list_head *tmp = NULL;
274 struct kmx_ctx *tx = NULL;
276 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
278 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
284 tmp = &kmxlnd_data.kmx_tx_idle;
285 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286 list_del_init(&tx->mxc_list);
288 /* Allocate a new completion cookie. It might not be needed,
289 * but we've got a lock right now and we're unlikely to
291 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293 kmxlnd_data.kmx_tx_next_cookie = 1;
295 kmxlnd_data.kmx_tx_used++;
296 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
298 LASSERT (tx->mxc_get == tx->mxc_put);
302 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303 LASSERT (tx->mxc_lntmsg[0] == NULL);
304 LASSERT (tx->mxc_lntmsg[1] == NULL);
306 tx->mxc_state = MXLND_CTX_PREP;
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
/*
 * Return a tx context to the idle list, then finalize its LNET message
 * attachments outside the lock.  On a failed MX status the whole conn
 * is torn down.  The `result` passed to lnet_finalize() is computed on
 * elided lines — TODO confirm its derivation from mxc_status.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing.
 */
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
317 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
319 lnet_msg_t *lntmsg[2];
322 CDEBUG(D_NETERROR, "called with NULL pointer\n");
324 } else if (tx->mxc_type != MXLND_REQ_TX) {
325 CDEBUG(D_NETERROR, "called with rx\n");
/* any status other than SUCCESS/TRUNCATED aborts the connection */
328 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329 tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330 struct kmx_conn *conn = tx->mxc_conn;
333 mxlnd_conn_disconnect(conn, 0, 1);
/* stash the lnet msgs so we can finalize after dropping the lock */
336 lntmsg[0] = tx->mxc_lntmsg[0];
337 lntmsg[1] = tx->mxc_lntmsg[1];
339 LASSERT(tx->mxc_get == tx->mxc_put + 1);
342 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344 kmxlnd_data.kmx_tx_used--;
345 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
/* lnet_finalize() may re-enter the LND; must not hold locks here */
346 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
353 * \param conn a kmx_conn pointer
355 * The calling function should remove the conn from the conns list first
/*
 * Free a conn: unlink it from its peer, clear the peer's cached conn
 * pointer and the MX endpoint context if this was the active conn,
 * drop the conn's reference on the peer, and release the memory.  All
 * queues must already be drained (asserted below).
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (incl. the mx_set_endpoint_addr_context() arguments);
 * code kept byte-identical.  Reconcile against the complete source.
 */
359 mxlnd_conn_free(struct kmx_conn *conn)
361 struct kmx_peer *peer = conn->mxk_peer;
363 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365 list_empty (&conn->mxk_tx_free_queue) &&
366 list_empty (&conn->mxk_pending));
367 if (!list_empty(&conn->mxk_list)) {
368 list_del_init(&conn->mxk_list);
369 if (peer->mxp_conn == conn) {
370 peer->mxp_conn = NULL;
/* presumably resets the endpoint context to NULL — TODO confirm */
371 if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372 mx_set_endpoint_addr_context(conn->mxk_epa,
376 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377 MXLND_FREE (conn, sizeof (*conn));
/*
 * Walk the conn's pending list, cancel each outstanding rx with
 * mx_cancel(), mark it canceled/aborted, and complete it via
 * mxlnd_handle_rx_completion() with the lock dropped (the completion
 * path calls lnet_finalize() and mxlnd_conn_decref()).
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (incl. the mx_cancel() request argument); code kept
 * byte-identical.  Reconcile against the complete source before editing.
 */
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
386 struct kmx_ctx *ctx = NULL;
387 struct kmx_ctx *next = NULL;
388 mx_return_t mxret = MX_SUCCESS;
393 spin_lock(&conn->mxk_lock);
394 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395 /* we will delete all including txs */
396 list_del_init(&ctx->mxc_list);
397 if (ctx->mxc_type == MXLND_REQ_RX) {
399 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
402 if (mxret != MX_SUCCESS) {
403 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
406 ctx->mxc_status.code = -ECONNABORTED;
407 ctx->mxc_state = MXLND_CTX_CANCELED;
408 /* NOTE this calls lnet_finalize() and
409 * we cannot hold any locks when calling it.
410 * It also calls mxlnd_conn_decref(conn) */
411 spin_unlock(&conn->mxk_lock);
412 mxlnd_handle_rx_completion(ctx);
413 spin_lock(&conn->mxk_lock);
418 spin_unlock(&conn->mxk_lock);
426 * Shutdown a connection
427 * \param conn a kmx_conn pointer
428 * \param mx_dis call mx_disconnect()
429 * \param send_bye send peer a BYE msg
431 * This function sets the status to DISCONNECT, completes queued
432 * txs with failure, calls mx_disconnect, which will complete
433 * pending txs and matched rxs with failure.
/*
 * Tear down a connection: mark it DISCONNECT (idempotent — returns
 * early if already so), fail all queued txs with -ECONNABORTED, cancel
 * pending rxs, optionally send a BYE to the peer, notify LNET that the
 * peer is down (unless shutting down), call mx_disconnect(), and drop
 * the owning peer's conn reference.  Locks are dropped around
 * mxlnd_put_idle_tx() because it may call lnet_finalize().
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing (e.g. the condition guarding
 * mx_disconnect() at original line 498 is not fully visible).
 */
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
438 mx_endpoint_addr_t epa = conn->mxk_epa;
439 struct list_head *tmp = NULL;
440 int valid = !mxlnd_endpoint_addr_null(epa);
442 spin_lock(&conn->mxk_lock);
443 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444 spin_unlock(&conn->mxk_lock);
447 conn->mxk_status = MXLND_CONN_DISCONNECT;
448 conn->mxk_timeout = 0;
/* drain both tx queues, failing each tx */
450 while (!list_empty(&conn->mxk_tx_free_queue) ||
451 !list_empty(&conn->mxk_tx_credit_queue)) {
453 struct kmx_ctx *tx = NULL;
455 if (!list_empty(&conn->mxk_tx_free_queue)) {
456 tmp = &conn->mxk_tx_free_queue;
458 tmp = &conn->mxk_tx_credit_queue;
461 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462 list_del_init(&tx->mxc_list);
463 tx->mxc_status.code = -ECONNABORTED;
464 spin_unlock(&conn->mxk_lock);
465 mxlnd_put_idle_tx(tx);
466 mxlnd_conn_decref(conn); /* for this tx */
467 spin_lock(&conn->mxk_lock);
470 spin_unlock(&conn->mxk_lock);
472 /* cancel pending rxs */
473 mxlnd_conn_cancel_pending_rxs(conn);
475 if (send_bye && valid) {
476 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477 /* send a BYE to the peer */
478 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481 epa, match, NULL, NULL);
482 /* wait to allow the peer to ack our message */
483 mxlnd_sleep(msecs_to_jiffies(20));
486 if (kmxlnd_data.kmx_shutdown != 1) {
487 unsigned long last_msg = 0;
489 /* notify LNET that we are giving up on this peer */
490 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
491 last_msg = conn->mxk_last_rx;
493 last_msg = conn->mxk_last_tx;
495 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
498 mx_disconnect(kmxlnd_data.kmx_endpt, epa);
500 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
506 * Allocate and initialize a new conn struct
507 * \param connp address of a kmx_conn pointer
508 * \param peer owning kmx_peer
510 * Returns 0 on success and -ENOMEM on failure
/*
 * Allocate and initialize a conn for @peer with two refs (one for the
 * owning peer, one for the caller), then link it at the front of the
 * peer's conns list and make it the peer's active conn.  Caller holds
 * the global write lock.  Returns 0 or -ENOMEM (the failure return is
 * on an elided line — TODO confirm).
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing.
 */
513 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
515 struct kmx_conn *conn = NULL;
517 LASSERT(peer != NULL);
519 MXLND_ALLOC(conn, sizeof (*conn));
521 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
524 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
526 memset(conn, 0, sizeof(*conn));
528 /* conn->mxk_incarnation = 0 - will be set by peer */
529 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
530 and one for the caller */
531 conn->mxk_peer = peer;
532 /* mxk_epa - to be set after mx_iconnect() */
533 INIT_LIST_HEAD(&conn->mxk_list);
534 spin_lock_init(&conn->mxk_lock);
535 /* conn->mxk_timeout = 0 */
536 conn->mxk_last_tx = jiffies;
537 conn->mxk_last_rx = conn->mxk_last_tx;
538 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
539 /* mxk_outstanding = 0 */
540 conn->mxk_status = MXLND_CONN_INIT;
541 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
542 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
543 /* conn->mxk_ntx_msgs = 0 */
544 /* conn->mxk_ntx_data = 0 */
545 /* conn->mxk_ntx_posted = 0 */
546 /* conn->mxk_data_posted = 0 */
547 INIT_LIST_HEAD(&conn->mxk_pending);
551 mxlnd_peer_addref(peer); /* add a ref for this conn */
553 /* add to front of peer's conns list */
554 list_add(&conn->mxk_list, &peer->mxp_conns);
555 peer->mxp_conn = conn;
560 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
563 write_lock(&kmxlnd_data.kmx_global_lock);
564 ret = mxlnd_conn_alloc_locked(connp, peer);
565 write_unlock(&kmxlnd_data.kmx_global_lock);
570 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
573 struct kmx_conn *conn = ctx->mxc_conn;
575 ctx->mxc_state = MXLND_CTX_PENDING;
577 spin_lock(&conn->mxk_lock);
578 if (conn->mxk_status >= MXLND_CONN_INIT) {
579 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
580 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
581 conn->mxk_timeout = ctx->mxc_deadline;
584 ctx->mxc_state = MXLND_CTX_COMPLETED;
587 spin_unlock(&conn->mxk_lock);
/*
 * Remove @ctx from its conn's pending list and mark it COMPLETED.  The
 * conn's timeout is reset to the deadline of the next pending ctx (or 0
 * if none remain).
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing.
 */
593 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
595 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
596 ctx->mxc_state == MXLND_CTX_COMPLETED);
597 if (ctx->mxc_state != MXLND_CTX_PENDING &&
598 ctx->mxc_state != MXLND_CTX_COMPLETED) {
599 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
600 mxlnd_ctxstate_to_str(ctx->mxc_state));
602 ctx->mxc_state = MXLND_CTX_COMPLETED;
603 if (!list_empty(&ctx->mxc_list)) {
604 struct kmx_conn *conn = ctx->mxc_conn;
605 struct kmx_ctx *next = NULL;
606 LASSERT(conn != NULL);
607 spin_lock(&conn->mxk_lock);
608 list_del_init(&ctx->mxc_list);
609 conn->mxk_timeout = 0;
/* refresh timeout from the new head of the pending list */
610 if (!list_empty(&conn->mxk_pending)) {
611 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
612 conn->mxk_timeout = next->mxc_deadline;
614 spin_unlock(&conn->mxk_lock);
621 * \param peer a kmx_peer pointer
623 * The calling function should decrement the rxs, drain the tx queues and
624 * remove the peer from the peers list first then destroy it.
627 mxlnd_peer_free(struct kmx_peer *peer)
629 CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
630 peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
632 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
634 if (peer == kmxlnd_data.kmx_localhost)
635 LASSERT(kmxlnd_data.kmx_shutdown);
637 if (!list_empty(&peer->mxp_peers)) {
638 /* assume we are locked */
639 list_del_init(&peer->mxp_peers);
642 MXLND_FREE (peer, sizeof (*peer));
643 atomic_dec(&kmxlnd_data.kmx_npeers);
647 #define MXLND_LOOKUP_COUNT 10
649 /* We only want the MAC address of the peer's Myricom NIC. We
650 * require that each node has the IPoMX interface (myriN) up.
651 * We will not pass any traffic over IPoMX, but it allows us
652 * to get the MAC address. */
/*
 * Resolve the Myricom NIC id (MAC-derived) for @ip by consulting the
 * ARP table for the IPoMX interface; if no valid neighbour entry
 * exists, force an ARP by attempting a TCP connect, backing off up to
 * MXLND_LOOKUP_COUNT tries.  The resulting 6-byte MAC lands in the low
 * bytes of a u64 and is byte-swapped on little-endian hosts.
 * NOTE(review): the embedded line numbers jump — significant lines were
 * elided from this listing (retry-loop structure, neigh_release, return
 * paths); code kept byte-identical.  Reconcile against the complete
 * source before editing.
 */
654 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
660 unsigned char *haddr = NULL;
661 struct net_device *dev = NULL;
662 struct neighbour *n = NULL;
663 cfs_socket_t *sock = NULL;
664 __be32 dst_ip = htonl(ip);
666 dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
671 haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
674 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
677 if (n->nud_state & NUD_VALID) {
678 memcpy(haddr, n->ha, dev->addr_len);
684 /* not found, try to connect (force an arp) */
685 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
687 libcfs_sock_release(sock);
688 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
689 } while (try++ < MXLND_LOOKUP_COUNT);
695 #ifdef __LITTLE_ENDIAN
696 *nic_id = ___arch__swab64(tmp_id);
704 * Allocate and initialize a new peer struct
705 * \param peerp address of a kmx_peer pointer
706 * \param nid LNET node id
707 * Returns 0 on success and -ENOMEM on failure
/*
 * Allocate and initialize a peer for @nid: resolve its NIC id if not
 * supplied, allocate its initial conn (which takes a second conn ref),
 * and pre-allocate per-credit rx contexts onto the global rx list and
 * the idle rx list.  On rx allocation failure all partially-created
 * state is unwound.  Returns 0 on success, -ENOMEM on failure.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (error returns, *peerp assignment); code kept
 * byte-identical.  Reconcile against the complete source before editing.
 */
710 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
714 u32 ip = LNET_NIDADDR(nid);
715 struct kmx_peer *peer = NULL;
717 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
719 MXLND_ALLOC(peer, sizeof (*peer));
721 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
724 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
726 memset(peer, 0, sizeof(*peer));
729 /* peer->mxp_incarnation */
730 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
733 peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
734 peer->mxp_board = board;
735 peer->mxp_nic_id = nic_id;
/* lazily resolve the NIC id via ARP when the caller did not supply one */
737 if (nic_id == 0ULL) {
738 ret = mxlnd_ip2nic_id(ip, &nic_id);
740 CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
741 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
744 peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
746 INIT_LIST_HEAD(&peer->mxp_peers);
747 INIT_LIST_HEAD(&peer->mxp_conns);
748 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
750 mxlnd_peer_decref(peer);
/* pre-allocate this peer's share of rx contexts */
754 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
755 struct kmx_ctx *rx = NULL;
756 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
758 mxlnd_reduce_idle_rxs(i);
759 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
760 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
761 mxlnd_peer_decref(peer);
764 spin_lock(&kmxlnd_data.kmx_rxs_lock);
765 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
766 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
768 mxlnd_put_idle_rx(rx);
770 /* peer->mxp_reconnect_time = 0 */
771 /* peer->mxp_incompatible = 0 */
777 static inline struct kmx_peer *
778 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
782 struct kmx_peer *peer = NULL;
784 hash = mxlnd_nid_to_hash(nid);
786 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
787 if (peer->mxp_nid == nid) {
789 mxlnd_peer_addref(peer);
793 return (found ? peer : NULL);
796 static inline struct kmx_peer *
797 mxlnd_find_peer_by_nid(lnet_nid_t nid)
799 struct kmx_peer *peer = NULL;
801 read_lock(&kmxlnd_data.kmx_global_lock);
802 peer = mxlnd_find_peer_by_nid_locked(nid);
803 read_unlock(&kmxlnd_data.kmx_global_lock);
808 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
810 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
811 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
812 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
813 tx->mxc_msg_type == MXLND_MSG_NOOP);
817 * Set type and number of bytes
818 * \param msg msg pointer
819 * \param type of message
820 * \param body_nob bytes in msg body
823 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
825 msg->mxm_type = type;
826 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/*
 * Prepare a tx's message header and MX segment for sending: record the
 * message type on the tx, set the segment length to header + body, and
 * select physical pinning.  The msg pointer assignment and nob setup
 * occur on elided lines — TODO confirm against the complete source.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.
 */
830 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
832 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
833 struct kmx_msg *msg = NULL;
835 LASSERT (tx != NULL);
836 LASSERT (nob <= MXLND_EAGER_SIZE);
839 /* tx->mxc_peer should have already been set if we know it */
840 tx->mxc_msg_type = type;
842 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
843 tx->mxc_seg.segment_length = nob;
844 tx->mxc_pin_type = MX_PIN_PHYSICAL;
845 //tx->mxc_state = MXLND_CTX_PENDING;
848 msg->mxm_type = type;
855 mxlnd_cksum (void *ptr, int nob)
861 sum = ((sum << 1) | (sum >> 31)) + *c++;
863 /* ensure I don't return 0 (== no checksum) */
864 return (sum == 0) ? 1 : sum;
869 * \param tx msg to send
/*
 * Fill in the wire header of a tx's message: magic, version,
 * piggy-backed credits (for all types except CONN_REQ/CONN_ACK),
 * source/destination NIDs and incarnation stamps, sequence number, and
 * (if the cksum tunable is set) the message checksum.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (e.g. zeroing mxm_cksum before computing it); code kept
 * byte-identical.  Reconcile against the complete source before editing.
 */
872 mxlnd_pack_msg(struct kmx_ctx *tx)
874 struct kmx_msg *msg = tx->mxc_msg;
876 /* type and nob should already be set in init_msg() */
877 msg->mxm_magic = MXLND_MSG_MAGIC;
878 msg->mxm_version = MXLND_MSG_VERSION;
880 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
881 * return credits as well */
882 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
883 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
/* drain this conn's accumulated credits into the outgoing msg */
884 spin_lock(&tx->mxc_conn->mxk_lock);
885 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
886 tx->mxc_conn->mxk_outstanding = 0;
887 spin_unlock(&tx->mxc_conn->mxk_lock);
889 msg->mxm_credits = 0;
893 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
894 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
895 msg->mxm_dstnid = tx->mxc_nid;
896 /* if it is a new peer, the dststamp will be 0 */
897 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
898 msg->mxm_seq = tx->mxc_cookie;
900 if (*kmxlnd_tunables.kmx_cksum) {
901 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/*
 * Validate and (if the sender had opposite endianness) byte-swap an
 * incoming wire message in place: check magic, version, length and
 * checksum, swab the common header fields, then perform per-type size
 * checks and swab the type-specific payload fields.  Error paths
 * (elided here) presumably return -EPROTO — TODO confirm.
 * NOTE(review): the embedded line numbers jump — many lines were elided
 * from this listing (returns, closing braces, cksum zeroing); code kept
 * byte-identical.  Reconcile against the complete source before editing.
 */
906 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
908 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
913 /* 6 bytes are enough to have received magic + version */
915 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
/* detect sender endianness from the magic */
919 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
921 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
924 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
928 if (msg->mxm_version !=
929 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
930 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
934 if (nob < hdr_size) {
935 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
939 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
941 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
945 /* checksum must be computed with mxm_cksum zero and BEFORE anything
947 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
/* cksum 0 on the wire means the sender did not checksum */
949 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
950 CDEBUG(D_NETERROR, "Bad checksum\n");
953 msg->mxm_cksum = msg_cksum;
956 /* leave magic unflipped as a clue to peer endianness */
957 __swab16s(&msg->mxm_version);
958 CLASSERT (sizeof(msg->mxm_type) == 1);
959 CLASSERT (sizeof(msg->mxm_credits) == 1);
960 msg->mxm_nob = msg_nob;
961 __swab64s(&msg->mxm_srcnid);
962 __swab64s(&msg->mxm_srcstamp);
963 __swab64s(&msg->mxm_dstnid);
964 __swab64s(&msg->mxm_dststamp);
965 __swab64s(&msg->mxm_seq);
968 if (msg->mxm_srcnid == LNET_NID_ANY) {
969 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
/* per-type payload validation and swabbing */
973 switch (msg->mxm_type) {
975 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
981 case MXLND_MSG_EAGER:
982 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
983 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
984 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
989 case MXLND_MSG_PUT_REQ:
990 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
991 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
992 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
996 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
999 case MXLND_MSG_PUT_ACK:
1000 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1001 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1002 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1006 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1007 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1011 case MXLND_MSG_GET_REQ:
1012 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1013 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1014 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1018 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1022 case MXLND_MSG_CONN_REQ:
1023 case MXLND_MSG_CONN_ACK:
1024 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1025 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1026 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1030 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1031 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1040 * \param lntmsg the LNET msg that this is continuing. If EAGER, then NULL.
1041 * \param length length of incoming message
1043 * The caller gets the rx and sets nid, peer and conn if known.
1045 * Returns 0 on success and -1 on failure
/*
 * Post an MX receive for @rx: record the message type/cookie/lnet msg,
 * set the segment length, queue the rx on its conn's pending list, and
 * hand it to mx_kirecv() (matching on cookie with the error bits
 * masked).  On mx_kirecv() failure the rx is dequeued again.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (returns after q_pending failure and at the end); code
 * kept byte-identical.  Reconcile against the complete source.
 */
1048 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1051 mx_return_t mxret = MX_SUCCESS;
1052 uint64_t mask = ~(MXLND_ERROR_MASK);
1054 rx->mxc_msg_type = msg_type;
1055 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1056 rx->mxc_cookie = cookie;
1057 /* rx->mxc_match may already be set */
1058 /* rx->mxc_seg.segment_ptr is already set */
1059 rx->mxc_seg.segment_length = length;
1060 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1061 ret = mxlnd_q_pending_ctx(rx);
1063 /* the caller is responsible for calling conn_decref() if needed */
1066 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1067 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1068 if (mxret != MX_SUCCESS) {
1069 mxlnd_deq_pending_ctx(rx);
1070 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1071 mx_strerror(mxret), (int) mxret);
1079 * This is the callback function that will handle unexpected receives
1080 * \param context NULL, ignore
1081 * \param source the peer's mx_endpoint_addr_t
1082 * \param match_value the msg's bit, should be MXLND_MSG_EAGER
1083 * \param length length of incoming message
1084 * \param data_if_available ignore
1086 * If it is an eager-sized msg, we will call recv_msg() with the actual
1087 * length. If it is a large message, we will call recv_msg() with a
1088 * length of 0 bytes to drop it because we should never have a large,
1089 * unexpected message.
1091 * NOTE - The MX library blocks until this function completes. Make it as fast as
1092 * possible. DO NOT allocate memory which can block!
1094 * If we cannot get a rx or the conn is closed, drop the message on the floor
1095 * (i.e. recv 0 bytes and ignore).
/*
 * MX unexpected-receive callback.  Runs with the MX library blocked, so
 * it must be fast and must not allocate blocking memory.  BYE messages
 * disconnect the sending peer's conn and finish the receive; otherwise
 * an idle rx is attached to the peer/conn (refs taken under the global
 * read lock) and a receive is posted — oversized messages are received
 * with length 0 to drop them.  If no rx is available or posting fails,
 * a zero-length receive drops the message on the floor.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (locals, closing braces, NULL-peer branches); code kept
 * byte-identical.  Reconcile against the complete source before editing.
 */
1097 mx_unexp_handler_action_t
1098 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1099 uint64_t match_value, uint32_t length, void *data_if_available)
1102 struct kmx_ctx *rx = NULL;
1108 if (context != NULL) {
1109 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1113 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1116 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1117 if (msg_type == MXLND_MSG_BYE) {
1118 struct kmx_peer *peer = NULL;
1120 mx_get_endpoint_addr_context(source, (void **) &peer);
1121 if (peer && peer->mxp_conn) {
1122 CDEBUG(D_NET, "peer %s sent BYE msg\n",
1123 libcfs_nid2str(peer->mxp_nid));
1124 mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1127 return MX_RECV_FINISHED;
1130 rx = mxlnd_get_idle_rx();
1132 if (length <= MXLND_EAGER_SIZE) {
1133 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1135 CDEBUG(D_NETERROR, "unexpected large receive with "
1136 "match_value=0x%llx length=%d\n",
1137 match_value, length);
/* oversized: post a 0-byte recv so the message is consumed/dropped */
1138 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1142 struct kmx_peer *peer = NULL;
1143 struct kmx_conn *conn = NULL;
1145 /* NOTE to avoid a peer disappearing out from under us,
1146 * read lock the peers lock first */
1147 read_lock(&kmxlnd_data.kmx_global_lock);
1148 mx_get_endpoint_addr_context(source, (void **) &peer);
1150 mxlnd_peer_addref(peer); /* add a ref... */
1151 conn = peer->mxp_conn;
1153 mxlnd_conn_addref(conn); /* add ref until rx completed */
1154 mxlnd_peer_decref(peer); /* and drop peer ref */
1155 rx->mxc_conn = conn;
1157 rx->mxc_peer = peer;
1158 rx->mxc_nid = peer->mxp_nid;
1160 read_unlock(&kmxlnd_data.kmx_global_lock);
1162 CDEBUG(D_NETERROR, "could not post receive\n");
1163 mxlnd_put_idle_rx(rx);
/* no rx available or posting failed: drop via a 0-byte receive */
1167 if (rx == NULL || ret != 0) {
1169 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1172 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1174 seg.segment_ptr = 0ULL;
1175 seg.segment_length = 0;
1176 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1177 match_value, ~0ULL, NULL, NULL);
1180 return MX_RECV_CONTINUE;
/*
 * IOC_LIBCFS_GET_PEER helper: walk the peer hash under the global read
 * lock and report the NID and refcount of the @index-th peer.  The
 * index comparison and return value live on elided lines — presumably
 * returns 0 when found, -ENOENT otherwise; TODO confirm.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.
 */
1185 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1189 struct kmx_peer *peer = NULL;
1191 read_lock(&kmxlnd_data.kmx_global_lock);
1192 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1193 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1195 *nidp = peer->mxp_nid;
1196 *count = atomic_read(&peer->mxp_refcount);
1202 read_unlock(&kmxlnd_data.kmx_global_lock);
1208 mxlnd_del_peer_locked(struct kmx_peer *peer)
1210 list_del_init(&peer->mxp_peers); /* remove from the global list */
1211 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1212 mxlnd_peer_decref(peer); /* drop global list ref */
/*
 * IOC_LIBCFS_DEL_PEER helper: delete one peer by NID, or every peer
 * (except localhost) when @nid == LNET_NID_ANY.  The ret/-ENOENT and
 * -EBUSY style error paths are on elided lines — TODO confirm the
 * return values against the complete source.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing (note the suspicious `} if (` at original line 1231,
 * likely an elided `else`); code kept byte-identical.
 */
1217 mxlnd_del_peer(lnet_nid_t nid)
1221 struct kmx_peer *peer = NULL;
1222 struct kmx_peer *next = NULL;
1224 if (nid != LNET_NID_ANY) {
1225 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1227 write_lock(&kmxlnd_data.kmx_global_lock);
1228 if (nid != LNET_NID_ANY) {
1231 } if (peer == kmxlnd_data.kmx_localhost) {
1232 mxlnd_peer_decref(peer); /* and drops it */
1233 CERROR("cannot free this host's NID 0x%llx\n", nid);
1235 mxlnd_peer_decref(peer); /* and drops it */
1236 mxlnd_del_peer_locked(peer);
1238 } else { /* LNET_NID_ANY */
1239 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1240 list_for_each_entry_safe(peer, next,
1241 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1242 if (peer != kmxlnd_data.kmx_localhost)
1243 mxlnd_del_peer_locked(peer);
1247 write_unlock(&kmxlnd_data.kmx_global_lock);
/*
 * IOC_LIBCFS_GET_CONN helper: walk every peer's conns under the global
 * read lock and return the @index-th conn with a reference added (the
 * caller, mxlnd_ctl(), drops it).  The index comparison and final
 * return are on elided lines — presumably returns NULL when the index
 * is out of range; TODO confirm.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.
 */
1253 mxlnd_get_conn_by_idx(int index)
1256 struct kmx_peer *peer = NULL;
1257 struct kmx_conn *conn = NULL;
1259 read_lock(&kmxlnd_data.kmx_global_lock);
1260 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1261 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1262 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1267 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1268 read_unlock(&kmxlnd_data.kmx_global_lock);
1273 read_unlock(&kmxlnd_data.kmx_global_lock);
1279 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1281 struct kmx_conn *conn = NULL;
1282 struct kmx_conn *next = NULL;
1284 if (peer == kmxlnd_data.kmx_localhost) return;
1286 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1287 mxlnd_conn_disconnect(conn, 0, 1);
/*
 * IOC_LIBCFS_CLOSE_CONNECTION helper: close the conns of one peer by
 * NID, or of every peer when @nid == LNET_NID_ANY, under the global
 * read lock.  The NULL-peer error path and return value are on elided
 * lines — presumably -ENOENT when the NID is unknown; TODO confirm.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.
 */
1293 mxlnd_close_matching_conns(lnet_nid_t nid)
1297 struct kmx_peer *peer = NULL;
1299 read_lock(&kmxlnd_data.kmx_global_lock);
1300 if (nid != LNET_NID_ANY) {
1301 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1305 mxlnd_close_matching_conns_locked(peer);
1306 mxlnd_peer_decref(peer); /* and drops it here */
1308 } else { /* LNET_NID_ANY */
1309 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1310 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1311 mxlnd_close_matching_conns_locked(peer);
1314 read_unlock(&kmxlnd_data.kmx_global_lock);
1320 * Modify MXLND parameters
1321 * \param ni LNET interface handle
1322 * \param cmd command to change
1323 * \param arg the ioctl data
1325 * Not implemented yet.
/*
 * LND ioctl dispatcher: GET_PEER, DEL_PEER, GET_CONN and
 * CLOSE_CONNECTION are implemented; anything else logs and (on elided
 * lines) presumably returns -EINVAL.  The function's tail (final
 * return) lies beyond this listing.
 * NOTE(review): the embedded line numbers jump — lines were elided from
 * this listing; code kept byte-identical.  Reconcile against the
 * complete source before editing.
 */
1328 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1330 struct libcfs_ioctl_data *data = arg;
1333 LASSERT (ni == kmxlnd_data.kmx_ni);
1336 case IOC_LIBCFS_GET_PEER: {
1340 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1341 data->ioc_nid = nid;
1342 data->ioc_count = count;
1345 case IOC_LIBCFS_DEL_PEER: {
1346 ret = mxlnd_del_peer(data->ioc_nid);
1349 case IOC_LIBCFS_GET_CONN: {
1350 struct kmx_conn *conn = NULL;
1352 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1357 data->ioc_nid = conn->mxk_peer->mxp_nid;
1358 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1362 case IOC_LIBCFS_CLOSE_CONNECTION: {
1363 ret = mxlnd_close_matching_conns(data->ioc_nid);
1367 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1375 * Add the tx to the global tx queue
1377 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
/* Queue @tx on its conn's msg or data queue; caller holds the conn's
 * mxk_lock (see mxlnd_peer_queue_tx()).  Credit-consuming msgs go on
 * mxk_tx_credit_queue; everything else on mxk_tx_free_queue.
 * NOTE(review): elided listing — braces and the trailing else-branch
 * structure are partially missing from this chunk. */
1380 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1382         u8 msg_type = tx->mxc_msg_type;
1383         //struct kmx_peer *peer = tx->mxc_peer;
1384         struct kmx_conn *conn = tx->mxc_conn;
1386         LASSERT (msg_type != 0);
1387         LASSERT (tx->mxc_nid != 0);
1388         LASSERT (tx->mxc_peer != NULL);
1389         LASSERT (tx->mxc_conn != NULL);
/* stamp the tx with the conn generation so stale txs can be detected */
1391         tx->mxc_incarnation = conn->mxk_incarnation;
1393         if (msg_type != MXLND_MSG_PUT_DATA &&
1394             msg_type != MXLND_MSG_GET_DATA) {
1396                 if (mxlnd_tx_requires_credit(tx)) {
1397                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1398                         conn->mxk_ntx_msgs++;
1399                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1400                            msg_type == MXLND_MSG_CONN_ACK) {
1401                         /* put conn msgs at the front of the queue */
1402                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1404                         /* PUT_ACK, PUT_NAK */
1405                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1406                         conn->mxk_ntx_msgs++;
/* DATA msgs (PUT_DATA/GET_DATA): no credit needed, counted separately */
1410                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1411                 conn->mxk_ntx_data++;
1418 * Add the tx to the global tx queue
1420 * Add the tx to the peer's msg or data queue
/* Locking wrapper: take the conn's mxk_lock around
 * mxlnd_peer_queue_tx_locked().  tx must already have peer and conn set. */
1423 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1425         LASSERT(tx->mxc_peer != NULL);
1426         LASSERT(tx->mxc_conn != NULL);
1427         spin_lock(&tx->mxc_conn->mxk_lock);
1428         mxlnd_peer_queue_tx_locked(tx);
1429         spin_unlock(&tx->mxc_conn->mxk_lock);
1435 * Add the tx to the global tx queue
1437 * Add the tx to the global queue and up the tx_queue_sem
/* Queue @tx for sending.  If the peer/conn is known, queue on the conn and
 * kick mxlnd_check_sends(); otherwise append to the global kmx_tx_queue and
 * up() the semaphore so the mxlnd_tx_queued thread resolves the peer.
 * NOTE(review): elided listing — the `peer != NULL` guard, ret checks and
 * returns between the visible lines are missing from this chunk. */
1440 mxlnd_queue_tx(struct kmx_ctx *tx)
1442         struct kmx_peer *peer = tx->mxc_peer;
1443         LASSERT (tx->mxc_nid != 0);
/* incompatible peer: fail everything except the CONN_ACK we owe it */
1446                 if (peer->mxp_incompatible &&
1447                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1448                         /* let this fail now */
1449                         tx->mxc_status.code = -ECONNABORTED;
1450                         mxlnd_conn_decref(peer->mxp_conn);
1451                         mxlnd_put_idle_tx(tx);
1454                 if (tx->mxc_conn == NULL) {
1456                         struct kmx_conn *conn = NULL;
1458                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1460                                 tx->mxc_status.code = ret;
1461                                 mxlnd_put_idle_tx(tx);
1464                         tx->mxc_conn = conn;
1465                         mxlnd_peer_decref(peer); /* and takes it from peer */
1467                 LASSERT(tx->mxc_conn != NULL);
1468                 mxlnd_peer_queue_tx(tx);
1469                 mxlnd_check_sends(peer);
/* peer unknown: hand off to the tx_queued thread via the global queue */
1471         spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1472         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1473         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1474         up(&kmxlnd_data.kmx_tx_queue_sem);
/* Build ctx's mx_ksegment_t list covering [offset, offset+nob) of an iovec
 * array.  First pass finds the first/last iov touched and the partial
 * lengths; second pass fills the physical-address segment list.
 * Note this uses `nseg` (not `niov`) for alloc/memset/fill — the kiov
 * variant below does not; see FIXME there.
 * NOTE(review): elided listing — declarations of i/sum/old_sum/first_iov/
 * last_iov/nseg, the ENOMEM return and loop braces are not visible here. */
1481 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1488         int first_iov_offset = 0;
1489         int first_found = 0;
1491         int last_iov_length = 0;
1492         mx_ksegment_t *seg = NULL;
1494         if (niov == 0) return 0;
1495         LASSERT(iov != NULL);
/* pass 1: locate first/last iov and partial offsets/lengths */
1497         for (i = 0; i < niov; i++) {
1498                 sum = old_sum + (u32) iov[i].iov_len;
1499                 if (!first_found && (sum > offset)) {
1501                         first_iov_offset = offset - old_sum;
1503                         sum = (u32) iov[i].iov_len - first_iov_offset;
1508                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1509                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1514         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1515         nseg = last_iov - first_iov + 1;
1518         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1520                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1523         memset(seg, 0, nseg * sizeof(*seg));
1524         ctx->mxc_nseg = nseg;
/* pass 2: fill segments with physical addresses; trim first/last */
1526         for (i = 0; i < nseg; i++) {
1527                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1528                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1530                         seg[i].segment_ptr += (u64) first_iov_offset;
1531                         seg[i].segment_length -= (u32) first_iov_offset;
1533                 if (i == (nseg - 1)) {
1534                         seg[i].segment_length = (u32) last_iov_length;
1536                 sum += seg[i].segment_length;
1538         ctx->mxc_seg_list = seg;
1539         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1540 #ifdef MX_PIN_FULLPAGES
1541         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segments must cover exactly nob bytes */
1543         LASSERT(nob == sum);
/* kiov (page-based) analogue of mxlnd_setup_iov(): build ctx's segment list
 * covering [offset, offset+nob) of a lnet_kiov_t array.
 * NOTE(review): elided listing — declarations, the ENOMEM return and loop
 * braces are not visible in this chunk. */
1548 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1554         int first_kiov = -1;
1555         int first_kiov_offset = 0;
1556         int first_found = 0;
1558         int last_kiov_length = 0;
1559         mx_ksegment_t *seg = NULL;
1561         if (niov == 0) return 0;
1562         LASSERT(kiov != NULL);
/* pass 1: locate first/last kiov and partial offsets/lengths */
1564         for (i = 0; i < niov; i++) {
1565                 sum = old_sum + kiov[i].kiov_len;
1566                 if (i == 0) sum -= kiov[i].kiov_offset;
1567                 if (!first_found && (sum > offset)) {
1569                         first_kiov_offset = offset - old_sum;
1570                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1571                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1573                         sum = kiov[i].kiov_len - first_kiov_offset;
1578                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1579                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1584         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1585         nseg = last_kiov - first_kiov + 1;
/* seg holds nseg entries (the kiovs actually spanned) ... */
1588         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1590                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
/* FIXME(review): `niov` here should almost certainly be `nseg` — seg was
 * allocated with nseg entries just above, so memset'ing niov entries
 * writes past the allocation whenever niov > nseg (heap overflow).
 * Compare mxlnd_setup_iov(), which uses nseg in all three places. */
1593         memset(seg, 0, niov * sizeof(*seg));
/* FIXME(review): mxc_nseg should be nseg, not niov (see above) */
1594         ctx->mxc_nseg = niov;
/* FIXME(review): loop bound should be nseg — with niov it indexes both
 * seg[i] and kiov[first_kiov + i] out of range when niov > nseg */
1596         for (i = 0; i < niov; i++) {
1597                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1598                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1600                         seg[i].segment_ptr += (u64) first_kiov_offset;
1601                         /* we have to add back the original kiov_offset */
1602                         seg[i].segment_length -= first_kiov_offset +
1603                                                  kiov[first_kiov].kiov_offset;
/* NOTE(review): first_kiov_offset already equals kiov[0].kiov_offset when
 * first_kiov == 0 (line 1571), so subtracting both may double-count —
 * verify against the full source */
1605                 if (i == (nseg - 1)) {
1606                         seg[i].segment_length = last_kiov_length;
1608                 sum += seg[i].segment_length;
1610         ctx->mxc_seg_list = seg;
1611         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1612 #ifdef MX_PIN_FULLPAGES
1613         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segments must cover exactly nob bytes */
1615         LASSERT(nob == sum);
/* Send a PUT NAK: a PUT_ACK whose dst cookie carries the error code in the
 * high (MXLND_ERROR_OFFSET) bits instead of a valid cookie.
 * NOTE(review): elided listing — the body after mxlnd_create_match() (the
 * actual queueing/posting of the tx) is not visible in this chunk. */
1620 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1622         LASSERT(type == MXLND_MSG_PUT_ACK);
/* NOTE(review): nid parameter unused in the visible lines — tx->mxc_nid is
 * used instead; confirm against the full source */
1623         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1624         tx->mxc_cookie = cookie;
1625         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1626         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1627         tx->mxc_match = mxlnd_create_match(tx, status);
1634 * Get tx, map [k]iov, queue tx
1636 * This setups the DATA send for PUT or GET.
1638 * On success, it queues the tx, on failure it calls lnet_finalize()
/* Set up and queue the bulk DATA send for a PUT or GET: grab an idle tx,
 * take a conn ref for it, map the lnet [k]iov into MX segments, then queue.
 * On failure, finalizes @lntmsg with -EIO.
 * NOTE(review): elided listing — the success-path queueing, some gotos and
 * the failed/early-out labels are only partially visible in this chunk. */
1641 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1644         lnet_process_id_t target = lntmsg->msg_target;
1645         unsigned int niov = lntmsg->msg_niov;
1646         struct iovec *iov = lntmsg->msg_iov;
1647         lnet_kiov_t *kiov = lntmsg->msg_kiov;
1648         unsigned int offset = lntmsg->msg_offset;
1649         unsigned int nob = lntmsg->msg_len;
1650         struct kmx_ctx *tx = NULL;
1652         LASSERT(lntmsg != NULL);
1653         LASSERT(peer != NULL);
1654         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* the cookie's error bits must be clear — they are reserved for status */
1655         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1657         tx = mxlnd_get_idle_tx();
1659                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1660                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1661                         libcfs_nid2str(target.nid));
1664         tx->mxc_nid = target.nid;
1665         /* NOTE called when we have a ref on the conn, get one for this tx */
1666         mxlnd_conn_addref(peer->mxp_conn);
1667         tx->mxc_peer = peer;
1668         tx->mxc_conn = peer->mxp_conn;
1669         tx->mxc_msg_type = msg_type;
1670         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1671         tx->mxc_state = MXLND_CTX_PENDING;
1672         tx->mxc_lntmsg[0] = lntmsg;
1673         tx->mxc_cookie = cookie;
1674         tx->mxc_match = mxlnd_create_match(tx, 0);
1676         /* This setups up the mx_ksegment_t to send the DATA payload */
1678                 /* do not setup the segments */
1679                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1680                                    "to %s?\n", libcfs_nid2str(target.nid));
1682         } else if (kiov == NULL) {
1683                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1685                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1688                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1689                                    libcfs_nid2str(target.nid));
1690                 tx->mxc_status.code = -EIO;
/* failure cleanup: drop the tx's conn ref and recycle the tx */
1697         mxlnd_conn_decref(peer->mxp_conn);
1698         mxlnd_put_idle_tx(tx);
1702         CDEBUG(D_NETERROR, "no tx avail\n");
1703         lnet_finalize(ni, lntmsg, -EIO);
1708 * Map [k]iov, post rx
1710 * This setups the DATA receive for PUT or GET.
1712 * On success, it returns 0, on failure it returns -1
/* Map the lnet [k]iov and post the MX receive for an incoming PUT_DATA or
 * GET_DATA.  For GET_DATA, the buffers come from the MD and a reply msg is
 * created.  Returns 0 on success, -1 on failure (per the header comment).
 * NOTE(review): elided listing — several error gotos/returns and the
 * kirecv argument tail are not visible in this chunk. */
1715 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1718         lnet_process_id_t target = lntmsg->msg_target;
1719         unsigned int niov = lntmsg->msg_niov;
1720         struct iovec *iov = lntmsg->msg_iov;
1721         lnet_kiov_t *kiov = lntmsg->msg_kiov;
1722         unsigned int offset = lntmsg->msg_offset;
1723         unsigned int nob = lntmsg->msg_len;
1724         mx_return_t mxret = MX_SUCCESS;
/* match mask keeps cookie bits, ignores the error bits */
1725         u64 mask = ~(MXLND_ERROR_MASK);
1727         /* above assumes MXLND_MSG_PUT_DATA */
1728         if (msg_type == MXLND_MSG_GET_DATA) {
1729                 niov = lntmsg->msg_md->md_niov;
1730                 iov = lntmsg->msg_md->md_iov.iov;
1731                 kiov = lntmsg->msg_md->md_iov.kiov;
1733                 nob = lntmsg->msg_md->md_length;
1736         LASSERT(lntmsg != NULL);
1737         LASSERT(rx != NULL);
1738         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1739         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1741         rx->mxc_msg_type = msg_type;
1742         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1743         rx->mxc_state = MXLND_CTX_PENDING;
1744         rx->mxc_nid = target.nid;
1745         /* if posting a GET_DATA, we may not yet know the peer */
1746         if (rx->mxc_peer != NULL) {
1747                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1749         rx->mxc_lntmsg[0] = lntmsg;
1750         rx->mxc_cookie = cookie;
1751         rx->mxc_match = mxlnd_create_match(rx, 0);
1752         /* This setups up the mx_ksegment_t to receive the DATA payload */
1754                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1756                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1758         if (msg_type == MXLND_MSG_GET_DATA) {
1759                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1760                 if (rx->mxc_lntmsg[1] == NULL) {
1761                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1762                                            libcfs_nid2str(target.nid));
1767                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1768                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1769                        libcfs_nid2str(target.nid));
/* track this rx so the timeout/connd threads can find it */
1772         ret = mxlnd_q_pending_ctx(rx);
1776         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1777         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1778                           rx->mxc_seg_list, rx->mxc_nseg,
1779                           rx->mxc_pin_type, rx->mxc_match,
1782         if (mxret != MX_SUCCESS) {
1783                 if (rx->mxc_conn != NULL) {
1784                         mxlnd_deq_pending_ctx(rx);
1786                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1787                                    (int) mxret, libcfs_nid2str(target.nid));
1795 * The LND required send function
1797 * This must not block. Since we may not have a peer struct for the receiver,
1798 * it will append send messages on a global tx list. We will then up the
1799 * tx_queued's semaphore to notify it of the new send.
/* LND send entry point (must not block).  Looks up the peer by nid, takes a
 * conn ref for the duration, then dispatches on the LNET message type:
 * PUT/REPLY use PUT_REQ + pre-posted PUT_ACK rx (unless small enough for
 * EAGER); GET uses GET_REQ + pre-posted DATA rx (unless EAGER-sized or
 * routed); everything that falls through is sent as an EAGER message.
 * NOTE(review): elided listing — the switch header, several case labels,
 * gotos, returns and closing braces are not visible in this chunk. */
1802 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1805         int type = lntmsg->msg_type;
1806         lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1807         lnet_process_id_t target = lntmsg->msg_target;
1808         lnet_nid_t nid = target.nid;
1809         int target_is_router = lntmsg->msg_target_is_router;
1810         int routing = lntmsg->msg_routing;
1811         unsigned int payload_niov = lntmsg->msg_niov;
1812         struct iovec *payload_iov = lntmsg->msg_iov;
1813         lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1814         unsigned int payload_offset = lntmsg->msg_offset;
1815         unsigned int payload_nob = lntmsg->msg_len;
1816         struct kmx_ctx *tx = NULL;
1817         struct kmx_msg *txmsg = NULL;
1818         struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1819         struct kmx_ctx *rx_data = NULL;
1820         struct kmx_conn *conn = NULL;
1822         uint32_t length = 0;
1823         struct kmx_peer *peer = NULL;
1825         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1826                        payload_nob, payload_niov, libcfs_id2str(target));
1828         LASSERT (payload_nob == 0 || payload_niov > 0);
1829         LASSERT (payload_niov <= LNET_MAX_IOV);
1830         /* payload is either all vaddrs or all pages */
1831         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1833         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1835         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1836          * to a new peer, use the nid */
1837         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1839                 if (unlikely(peer->mxp_incompatible)) {
1840                         mxlnd_peer_decref(peer); /* drop ref taken above */
/* swap the peer ref for a conn ref under the global lock */
1842                         read_lock(&kmxlnd_data.kmx_global_lock);
1843                         conn = peer->mxp_conn;
1845                                 mxlnd_conn_addref(conn);
1846                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
1848                         read_unlock(&kmxlnd_data.kmx_global_lock);
1851         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1852         if (conn == NULL && peer != NULL) {
1853                 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1854                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1859                 LASSERT (payload_nob == 0);
1862         case LNET_MSG_REPLY:
1864                 /* Is the payload small enough not to need DATA? */
1865                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1866                 if (nob <= MXLND_EAGER_SIZE)
1867                         break;  /* send EAGER */
1869                 tx = mxlnd_get_idle_tx();
1870                 if (unlikely(tx == NULL)) {
1871                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1872                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1873                                libcfs_nid2str(nid));
1874                         if (conn) mxlnd_conn_decref(conn);
1878                 /* the peer may be NULL */
1879                 tx->mxc_peer = peer;
1880                 tx->mxc_conn = conn; /* may be NULL */
1881                 /* we added a conn ref above */
1882                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1883                 txmsg = tx->mxc_msg;
1884                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1885                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1886                 tx->mxc_match = mxlnd_create_match(tx, 0);
1888                 /* we must post a receive _before_ sending the request.
1889                  * we need to determine how much to receive, it will be either
1890                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1892                 rx = mxlnd_get_idle_rx();
1893                 if (unlikely(rx == NULL)) {
1894                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1895                                            libcfs_nid2str(nid));
1896                         mxlnd_put_idle_tx(tx);
1897                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1901                 rx->mxc_peer = peer;
1902                 /* conn may be NULL but unlikely since the first msg is always small */
1903                 /* NOTE no need to lock peer before adding conn ref since we took
1904                  * a conn ref for the tx (it cannot be freed between there and here ) */
1905                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1906                 rx->mxc_conn = conn;
1907                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1908                 rx->mxc_cookie = tx->mxc_cookie;
1909                 rx->mxc_match = mxlnd_create_match(rx, 0);
1911                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1912                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1913                 if (unlikely(ret != 0)) {
1914                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1915                                            libcfs_nid2str(nid));
1916                         rx->mxc_lntmsg[0] = NULL;
1917                         mxlnd_put_idle_rx(rx);
1918                         mxlnd_put_idle_tx(tx);
1920                                 mxlnd_conn_decref(conn); /* for the rx... */
1921                                 mxlnd_conn_decref(conn); /* and for the tx */
1923                         return -EHOSTUNREACH;
/* LNET_MSG_GET: */
1930                 if (routing || target_is_router)
1931                         break;  /* send EAGER */
1933                 /* is the REPLY message too small for DATA? */
1934                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1935                 if (nob <= MXLND_EAGER_SIZE)
1936                         break;  /* send EAGER */
1938                 /* get tx (we need the cookie) , post rx for incoming DATA,
1939                  * then post GET_REQ tx */
1940                 tx = mxlnd_get_idle_tx();
1941                 if (unlikely(tx == NULL)) {
1942                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1943                                            libcfs_nid2str(nid));
1944                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1947                 rx_data = mxlnd_get_idle_rx();
1948                 if (unlikely(rx_data == NULL)) {
1949                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1950                                            libcfs_nid2str(nid));
1951                         mxlnd_put_idle_tx(tx);
1952                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1955                 rx_data->mxc_peer = peer;
1956                 /* NOTE no need to lock peer before adding conn ref since we took
1957                  * a conn ref for the tx (it cannot be freed between there and here ) */
1958                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1959                 rx_data->mxc_conn = conn; /* may be NULL */
1961                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1962                 if (unlikely(ret != 0)) {
1963                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1964                                            libcfs_nid2str(nid));
1965                         mxlnd_put_idle_rx(rx_data);
1966                         mxlnd_put_idle_tx(tx);
1968                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1969                                 mxlnd_conn_decref(conn); /* and for the tx */
1974                 tx->mxc_peer = peer;
1975                 tx->mxc_conn = conn; /* may be NULL */
1976                 /* conn ref taken above */
1977                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1978                 txmsg = tx->mxc_msg;
1979                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1980                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1981                 tx->mxc_match = mxlnd_create_match(tx, 0);
1988                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER fall-through: payload must fit in an eager message */
1994         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1995                 <= MXLND_EAGER_SIZE);
1997         tx = mxlnd_get_idle_tx();
1998         if (unlikely(tx == NULL)) {
1999                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2000                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2001                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2005         tx->mxc_peer = peer;
2006         tx->mxc_conn = conn; /* may be NULL */
2007         /* conn ref taken above */
2008         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2009         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2010         tx->mxc_match = mxlnd_create_match(tx, 0);
2012         txmsg = tx->mxc_msg;
2013         txmsg->mxm_u.eager.mxem_hdr = *hdr;
/* copy the payload inline into the eager message */
2015         if (payload_kiov != NULL)
2016                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2017                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2018                             payload_niov, payload_kiov, payload_offset, payload_nob);
2020                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2021                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2022                             payload_niov, payload_iov, payload_offset, payload_nob);
2024         tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2030 * The LND required recv function
2032 * This must not block.
/* LND recv entry point (must not block).  @private is the kmx_ctx rx that
 * matched the incoming message.  EAGER payloads are copied straight into
 * the lnet [k]iov; PUT_REQ re-uses the rx to post the PUT_DATA sink and
 * sends a PUT_ACK (or NAK); GET_REQ triggers the DATA send (or a NAK
 * encoded in the match bits).  Returns a credit to the peer at the end.
 * NOTE(review): elided listing — case/brace structure, some declarations
 * (nob/len/cookie/credit/finalize/ret) and returns are not visible here. */
2035 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2036             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2037             unsigned int offset, unsigned int mlen, unsigned int rlen)
2042         struct kmx_ctx *rx = private;
2043         struct kmx_msg *rxmsg = rx->mxc_msg;
2044         lnet_nid_t nid = rx->mxc_nid;
2045         struct kmx_ctx *tx = NULL;
2046         struct kmx_msg *txmsg = NULL;
2047         struct kmx_peer *peer = rx->mxc_peer;
2048         struct kmx_conn *conn = peer->mxp_conn;
2050         int msg_type = rxmsg->mxm_type;
2055         LASSERT (mlen <= rlen);
2056         /* Either all pages or all vaddrs */
2057         LASSERT (!(kiov != NULL && iov != NULL));
2058         LASSERT (peer != NULL);
2060         /* conn_addref(conn) already taken for the primary rx */
2063         case MXLND_MSG_EAGER:
2064                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2065                 len = rx->mxc_status.xfer_length;
2066                 if (unlikely(nob > len)) {
2067                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2068                                            libcfs_nid2str(nid), nob, len);
/* copy the inline payload out to the receiver's buffers */
2074                         lnet_copy_flat2kiov(niov, kiov, offset,
2075                                 MXLND_EAGER_SIZE, rxmsg,
2076                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2079                         lnet_copy_flat2iov(niov, iov, offset,
2080                                 MXLND_EAGER_SIZE, rxmsg,
2081                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2087         case MXLND_MSG_PUT_REQ:
2088                 /* we are going to reuse the rx, store the needed info */
2089                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2091                 /* get tx, post rx, send PUT_ACK */
2093                 tx = mxlnd_get_idle_tx();
2094                 if (unlikely(tx == NULL)) {
2095                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2096                         /* Not replying will break the connection */
/* zero-length sink: NAK the PUT instead of posting a DATA rx */
2100                 if (unlikely(mlen == 0)) {
2102                         tx->mxc_peer = peer;
2103                         tx->mxc_conn = conn;
2104                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2109                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2110                 tx->mxc_peer = peer;
2111                 tx->mxc_conn = conn;
2112                 /* no need to lock peer first since we already have a ref */
2113                 mxlnd_conn_addref(conn); /* for the tx */
2114                 txmsg = tx->mxc_msg;
2115                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2116                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2117                 tx->mxc_cookie = cookie;
2118                 tx->mxc_match = mxlnd_create_match(tx, 0);
2120                 /* we must post a receive _before_ sending the PUT_ACK */
2122                 rx->mxc_state = MXLND_CTX_PREP;
2123                 rx->mxc_peer = peer;
2124                 rx->mxc_conn = conn;
2125                 /* do not take another ref for this rx, it is already taken */
2126                 rx->mxc_nid = peer->mxp_nid;
2127                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2128                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2130                 if (unlikely(ret != 0)) {
2131                         /* Notify peer that it's over */
2132                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2133                                            libcfs_nid2str(nid), ret);
2135                         tx->mxc_state = MXLND_CTX_PREP;
2136                         tx->mxc_peer = peer;
2137                         tx->mxc_conn = conn;
2138                         /* finalize = 0, let the PUT_ACK tx finalize this */
2139                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2140                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2141                         /* conn ref already taken above */
2142                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2148                 /* do not return a credit until after PUT_DATA returns */
2152         case MXLND_MSG_GET_REQ:
2153                 if (likely(lntmsg != NULL)) {
2154                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2155                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2157                         /* GET didn't match anything */
2158                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2159                          * We have to embed the error code in the match bits.
2160                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2161                         u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2163                         tx = mxlnd_get_idle_tx();
2164                         if (unlikely(tx == NULL)) {
2165                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2166                                                    libcfs_nid2str(nid));
2170                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2171                         tx->mxc_state = MXLND_CTX_PENDING;
2173                         tx->mxc_peer = peer;
2174                         tx->mxc_conn = conn;
2175                         /* no need to lock peer first since we already have a ref */
2176                         mxlnd_conn_addref(conn); /* for this tx */
2177                         tx->mxc_cookie = cookie;
/* ENODATA in the match bits tells the initiator the GET matched nothing */
2178                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2179                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2182                 /* finalize lntmsg after tx completes */
2190         /* we received a message, increment peer's outstanding credits */
2192                 spin_lock(&conn->mxk_lock);
2193                 conn->mxk_outstanding++;
2194                 spin_unlock(&conn->mxk_lock);
2196         /* we are done with the rx */
2197                 mxlnd_put_idle_rx(rx);
2198                 mxlnd_conn_decref(conn);
2201         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2203         /* we received a credit, see if we can use it to send a msg */
2204         if (credit) mxlnd_check_sends(peer);
/* Interruptible sleep for @timeout jiffies (thin schedule_timeout wrapper). */
2210 mxlnd_sleep(unsigned long timeout)
2212         set_current_state(TASK_INTERRUPTIBLE);
2213         schedule_timeout(timeout);
2218 * The generic send queue thread
2219 * \param arg thread id (as a void *)
2221 * This thread moves send messages from the global tx_queue to the owning
2222 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2223 * it to the global peer list.
/* Kernel thread: drain the global kmx_tx_queue.  For each tx, resolve the
 * peer by nid (allocating peer+conn when unknown, with a duplicate check
 * under the global write lock), attach tx to the conn, then (in elided
 * code) queue it and kick the sends.
 * NOTE(review): elided listing — loop braces, the peer==NULL branch header,
 * continues and the final queue/check_sends calls are not visible here. */
2226 mxlnd_tx_queued(void *arg)
2228         long id = (long) arg;
2231         struct kmx_ctx *tx = NULL;
2232         struct kmx_peer *peer = NULL;
2233         struct list_head *tmp_tx = NULL;
2235         cfs_daemonize("mxlnd_tx_queued");
2236         //cfs_block_allsigs();
2238         while (!kmxlnd_data.kmx_shutdown) {
2239                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2240                 if (kmxlnd_data.kmx_shutdown)
2242                 if (ret != 0) // Should we check for -EINTR?
2244                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2245                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2246                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2249                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2250                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2251                 list_del_init(&tx->mxc_list);
2252                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2255                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known peer: make sure it has a conn, then take a conn ref for the tx */
2257                         tx->mxc_peer = peer;
2258                         write_lock(&kmxlnd_data.kmx_global_lock);
2259                         if (peer->mxp_conn == NULL) {
2260                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2262                                         /* out of memory, give up and fail tx */
2263                                         tx->mxc_status.code = -ENOMEM;
2264                                         write_unlock(&kmxlnd_data.kmx_global_lock);
2265                                         mxlnd_peer_decref(peer);
2266                                         mxlnd_put_idle_tx(tx);
2270                         tx->mxc_conn = peer->mxp_conn;
2271                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2272                         write_unlock(&kmxlnd_data.kmx_global_lock);
2273                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown peer: allocate a new one and race-check against the hash table */
2279                         struct kmx_peer *peer = NULL;
2280                         struct kmx_peer *old = NULL;
2282                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2284                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2285                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2287                         /* adds conn ref for this function */
2288                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2289                                         *kmxlnd_tunables.kmx_board,
2290                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2292                                 /* finalize message */
2293                                 tx->mxc_status.code = ret;
2294                                 mxlnd_put_idle_tx(tx);
2297                         tx->mxc_peer = peer;
2298                         tx->mxc_conn = peer->mxp_conn;
2299                         /* this tx will keep the conn ref taken in peer_alloc() */
2301                         /* add peer to global peer list, but look to see
2302                          * if someone already created it after we released
2304                         write_lock(&kmxlnd_data.kmx_global_lock);
2305                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2306                                 if (old->mxp_nid == peer->mxp_nid) {
2307                                         /* somebody beat us here, we created a duplicate */
2314                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2315                                 atomic_inc(&kmxlnd_data.kmx_npeers);
/* lost the race: switch the tx to the winner's conn, discard our peer */
2318                                 tx->mxc_conn = old->mxp_conn;
2319                                 /* FIXME can conn be NULL? */
2320                                 LASSERT(old->mxp_conn != NULL);
2321                                 mxlnd_conn_addref(old->mxp_conn);
2322                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2323                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2324                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2325                                 mxlnd_peer_decref(peer);
2327                         write_unlock(&kmxlnd_data.kmx_global_lock);
2332         mxlnd_thread_stop(id);
2336 /* When calling this, we must not have the peer lock. */
/* Start a non-blocking MX connect (CONN_REQ or CONN_ACK handshake, chosen
 * by the msg type encoded in @mask).  Resolves the peer's nic_id from its
 * IP if not yet known; gives up and tears the conn down after
 * MXLND_WAIT_TIMEOUT.  Caller must NOT hold the peer lock (see comment).
 * NOTE(review): elided listing — some braces, returns and intermediate
 * lines are not visible in this chunk. */
2338 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2340         mx_return_t mxret = MX_SUCCESS;
2341         mx_request_t request;
2342         struct kmx_conn *conn = peer->mxp_conn;
2343         u8 msg_type = (u8) MXLND_MSG_TYPE(mask);
2345         /* NOTE we are holding a conn ref every time we call this function,
2346          * we do not need to lock the peer before taking another ref */
2347         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2349         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2351         if (peer->mxp_reconnect_time == 0) {
2352                 peer->mxp_reconnect_time = jiffies;
/* nic_id unknown: try to resolve it from the peer's IP */
2355         if (peer->mxp_nic_id == 0ULL) {
2358                 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2360                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2362                 if (peer->mxp_nic_id == 0ULL) {
2363                         /* not mapped yet, return */
2364                         spin_lock(&conn->mxk_lock);
2365                         conn->mxk_status = MXLND_CONN_INIT;
2366                         spin_unlock(&conn->mxk_lock);
2367                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2368                                 /* give up and notify LNET */
2369                                 mxlnd_conn_disconnect(conn, 0, 0);
2370                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2372                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2375                         mxlnd_conn_decref(conn);
2380         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2381                             peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2382                             (void *) peer, &request);
2383         if (unlikely(mxret != MX_SUCCESS)) {
2384                 spin_lock(&conn->mxk_lock);
2385                 conn->mxk_status = MXLND_CONN_FAIL;
2386                 spin_unlock(&conn->mxk_lock);
2387                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2388                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2389                 mxlnd_conn_decref(conn);
2394 #define MXLND_STATS 0
2397 mxlnd_check_sends(struct kmx_peer *peer)
2401 mx_return_t mxret = MX_SUCCESS;
2402 struct kmx_ctx *tx = NULL;
2403 struct kmx_conn *conn = NULL;
2410 static unsigned long last = 0;
2413 if (unlikely(peer == NULL)) {
2414 LASSERT(peer != NULL);
2417 write_lock(&kmxlnd_data.kmx_global_lock);
2418 conn = peer->mxp_conn;
2419 /* NOTE take a ref for the duration of this function since it is called
2420 * when there might not be any queued txs for this peer */
2421 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2422 write_unlock(&kmxlnd_data.kmx_global_lock);
2424 /* do not add another ref for this tx */
2427 /* we do not have any conns */
2432 if (time_after(jiffies, last)) {
2433 last = jiffies + HZ;
2434 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2435 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2436 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2437 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2438 conn->mxk_ntx_data, conn->mxk_data_posted);
2442 /* cache peer state for asserts */
2443 spin_lock(&conn->mxk_lock);
2444 ntx_posted = conn->mxk_ntx_posted;
2445 credits = conn->mxk_credits;
2446 spin_unlock(&conn->mxk_lock);
2448 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2449 LASSERT(ntx_posted >= 0);
2451 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2452 LASSERT(credits >= 0);
2454 /* check number of queued msgs, ignore data */
2455 spin_lock(&conn->mxk_lock);
2456 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2457 /* check if any txs queued that could return credits... */
2458 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2459 /* if not, send a NOOP */
2460 tx = mxlnd_get_idle_tx();
2461 if (likely(tx != NULL)) {
2462 tx->mxc_peer = peer;
2463 tx->mxc_conn = peer->mxp_conn;
2464 mxlnd_conn_addref(conn); /* for this tx */
2465 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2466 tx->mxc_match = mxlnd_create_match(tx, 0);
2467 mxlnd_peer_queue_tx_locked(tx);
2473 spin_unlock(&conn->mxk_lock);
2475 /* if the peer is not ready, try to connect */
2476 spin_lock(&conn->mxk_lock);
2477 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2478 conn->mxk_status == MXLND_CONN_FAIL ||
2479 conn->mxk_status == MXLND_CONN_REQ)) {
2480 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2481 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2482 conn->mxk_status = MXLND_CONN_WAIT;
2483 spin_unlock(&conn->mxk_lock);
2484 mxlnd_iconnect(peer, match);
2487 spin_unlock(&conn->mxk_lock);
2489 spin_lock(&conn->mxk_lock);
2490 while (!list_empty(&conn->mxk_tx_free_queue) ||
2491 !list_empty(&conn->mxk_tx_credit_queue)) {
2492 /* We have something to send. If we have a queued tx that does not
2493 * require a credit (free), choose it since its completion will
2494 * return a credit (here or at the peer), complete a DATA or
2495 * CONN_REQ or CONN_ACK. */
2496 struct list_head *tmp_tx = NULL;
2497 if (!list_empty(&conn->mxk_tx_free_queue)) {
2498 tmp_tx = &conn->mxk_tx_free_queue;
2500 tmp_tx = &conn->mxk_tx_credit_queue;
2502 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2504 msg_type = tx->mxc_msg_type;
2506 /* don't try to send a rx */
2507 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2509 /* ensure that it is a valid msg type */
2510 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2511 msg_type == MXLND_MSG_CONN_ACK ||
2512 msg_type == MXLND_MSG_NOOP ||
2513 msg_type == MXLND_MSG_EAGER ||
2514 msg_type == MXLND_MSG_PUT_REQ ||
2515 msg_type == MXLND_MSG_PUT_ACK ||
2516 msg_type == MXLND_MSG_PUT_DATA ||
2517 msg_type == MXLND_MSG_GET_REQ ||
2518 msg_type == MXLND_MSG_GET_DATA);
2519 LASSERT(tx->mxc_peer == peer);
2520 LASSERT(tx->mxc_nid == peer->mxp_nid);
2522 credit = mxlnd_tx_requires_credit(tx);
2525 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2526 CDEBUG(D_NET, "%s: posted enough\n",
2527 libcfs_nid2str(peer->mxp_nid));
2531 if (conn->mxk_credits == 0) {
2532 CDEBUG(D_NET, "%s: no credits\n",
2533 libcfs_nid2str(peer->mxp_nid));
2537 if (conn->mxk_credits == 1 && /* last credit reserved for */
2538 conn->mxk_outstanding == 0) { /* giving back credits */
2539 CDEBUG(D_NET, "%s: not using last credit\n",
2540 libcfs_nid2str(peer->mxp_nid));
2545 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2546 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2547 msg_type == MXLND_MSG_CONN_ACK)) {
2548 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2549 mxlnd_connstatus_to_str(conn->mxk_status),
2551 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2552 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2553 list_del_init(&tx->mxc_list);
2554 tx->mxc_status.code = -ECONNABORTED;
2555 mxlnd_put_idle_tx(tx);
2556 mxlnd_conn_decref(conn);
2562 list_del_init(&tx->mxc_list);
2564 /* handle credits, etc now while we have the lock to avoid races */
2566 conn->mxk_credits--;
2567 conn->mxk_ntx_posted++;
2569 if (msg_type != MXLND_MSG_PUT_DATA &&
2570 msg_type != MXLND_MSG_GET_DATA) {
2571 if (msg_type != MXLND_MSG_CONN_REQ &&
2572 msg_type != MXLND_MSG_CONN_ACK) {
2573 conn->mxk_ntx_msgs--;
2576 if (tx->mxc_incarnation == 0 &&
2577 conn->mxk_incarnation != 0) {
2578 tx->mxc_incarnation = conn->mxk_incarnation;
2580 spin_unlock(&conn->mxk_lock);
2582 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2583 * or (2) there is a non-DATA msg that can return credits in the
2584 * queue, then drop this duplicate NOOP */
2585 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2586 spin_lock(&conn->mxk_lock);
2587 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2588 (conn->mxk_ntx_msgs >= 1)) {
2589 conn->mxk_credits++;
2590 conn->mxk_ntx_posted--;
2591 spin_unlock(&conn->mxk_lock);
2592 /* redundant NOOP */
2593 mxlnd_put_idle_tx(tx);
2594 mxlnd_conn_decref(conn);
2595 CDEBUG(D_NET, "%s: redundant noop\n",
2596 libcfs_nid2str(peer->mxp_nid));
2600 spin_unlock(&conn->mxk_lock);
2604 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2605 (msg_type != MXLND_MSG_GET_DATA))) {
2609 //ret = -ECONNABORTED;
2612 spin_lock(&conn->mxk_lock);
2613 status = conn->mxk_status;
2614 spin_unlock(&conn->mxk_lock);
2616 if (likely((status == MXLND_CONN_READY) ||
2617 (msg_type == MXLND_MSG_CONN_REQ) ||
2618 (msg_type == MXLND_MSG_CONN_ACK))) {
2620 if (msg_type != MXLND_MSG_CONN_REQ &&
2621 msg_type != MXLND_MSG_CONN_ACK) {
2622 /* add to the pending list */
2623 ret = mxlnd_q_pending_ctx(tx);
2625 /* FIXME the conn is disconnected, now what? */
2629 tx->mxc_state = MXLND_CTX_PENDING;
2633 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2634 msg_type != MXLND_MSG_GET_DATA)) {
2635 /* send a msg style tx */
2636 LASSERT(tx->mxc_nseg == 1);
2637 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2638 CDEBUG(D_NET, "sending %s 0x%llx\n",
2639 mxlnd_msgtype_to_str(msg_type),
2641 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2650 /* send a DATA tx */
2651 spin_lock(&conn->mxk_lock);
2652 conn->mxk_ntx_data--;
2653 conn->mxk_data_posted++;
2654 spin_unlock(&conn->mxk_lock);
2655 CDEBUG(D_NET, "sending %s 0x%llx\n",
2656 mxlnd_msgtype_to_str(msg_type),
2658 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2668 mxret = MX_CONNECTION_FAILED;
2670 if (likely(mxret == MX_SUCCESS)) {
2673 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2674 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2675 libcfs_nid2str(peer->mxp_nid));
2676 /* NOTE mx_kisend() only fails if there are not enough
2677 * resources. Do not change the connection status. */
2678 if (mxret == MX_NO_RESOURCES) {
2679 tx->mxc_status.code = -ENOMEM;
2681 tx->mxc_status.code = -ECONNABORTED;
2684 spin_lock(&conn->mxk_lock);
2685 conn->mxk_ntx_posted--;
2686 conn->mxk_credits++;
2687 spin_unlock(&conn->mxk_lock);
2688 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2689 msg_type == MXLND_MSG_GET_DATA) {
2690 spin_lock(&conn->mxk_lock);
2691 conn->mxk_data_posted--;
2692 spin_unlock(&conn->mxk_lock);
2694 if (msg_type != MXLND_MSG_PUT_DATA &&
2695 msg_type != MXLND_MSG_GET_DATA &&
2696 msg_type != MXLND_MSG_CONN_REQ &&
2697 msg_type != MXLND_MSG_CONN_ACK) {
2698 spin_lock(&conn->mxk_lock);
2699 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2700 spin_unlock(&conn->mxk_lock);
2702 if (msg_type != MXLND_MSG_CONN_REQ &&
2703 msg_type != MXLND_MSG_CONN_ACK) {
2704 /* remove from the pending list */
2705 mxlnd_deq_pending_ctx(tx);
2707 mxlnd_put_idle_tx(tx);
2708 mxlnd_conn_decref(conn);
2711 spin_lock(&conn->mxk_lock);
2714 spin_unlock(&conn->mxk_lock);
2716 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2722 * A tx completed, progress or complete the msg
2723 * \param tx the tx descriptor
2725 * Determine which type of send request it was and start the next step, if needed,
2726 * or, if done, signal completion to LNET. After we are done, put back on the
2730 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
/* Completion handler for a finished (or failed) send request:
 * per-message-type connection accounting, error propagation to the ctx
 * status code, then recycle the tx and re-run check_sends() on the peer.
 * NOTE(review): this is an elided extract of the file -- gaps in the
 * left-hand numbering mean statements (braces, breaks, else-arms) are
 * missing from view; comments below only describe the visible lines. */
2732 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2733 struct kmx_msg *msg = tx->mxc_msg;
2734 struct kmx_peer *peer = tx->mxc_peer;
2735 struct kmx_conn *conn = tx->mxc_conn;
2736 u8 type = tx->mxc_msg_type;
2737 int credit = mxlnd_tx_requires_credit(tx);
2738 u64 cookie = tx->mxc_cookie;
2740 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2741 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
/* The tx may have completed before a conn was bound to it; recover the
 * peer from the MX endpoint-addr context and bind peer/conn now. */
2743 if (unlikely(conn == NULL)) {
2744 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2745 conn = peer->mxp_conn;
2747 /* do not add a ref for the tx, it was set before sending */
2748 tx->mxc_conn = conn;
2749 tx->mxc_peer = conn->mxk_peer;
2752 LASSERT (peer != NULL);
2753 LASSERT (conn != NULL);
/* DATA txs carry no kmx_msg header; all other types must match it */
2755 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2756 LASSERT (type == msg->mxm_type);
/* surface an MX-level failure as -EIO in the ctx status code */
2760 tx->mxc_status.code = -EIO;
2762 spin_lock(&conn->mxk_lock);
2763 conn->mxk_last_tx = jiffies;
2764 spin_unlock(&conn->mxk_lock);
/* per-type accounting: counters are only adjusted when the conn has not
 * been reincarnated since this tx was posted */
2769 case MXLND_MSG_GET_DATA:
2770 spin_lock(&conn->mxk_lock);
2771 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2772 conn->mxk_outstanding++;
2773 conn->mxk_data_posted--;
2775 spin_unlock(&conn->mxk_lock);
2778 case MXLND_MSG_PUT_DATA:
2779 spin_lock(&conn->mxk_lock);
2780 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2781 conn->mxk_data_posted--;
2783 spin_unlock(&conn->mxk_lock);
2786 case MXLND_MSG_NOOP:
2787 case MXLND_MSG_PUT_REQ:
2788 case MXLND_MSG_PUT_ACK:
2789 case MXLND_MSG_GET_REQ:
2790 case MXLND_MSG_EAGER:
2791 //case MXLND_MSG_NAK:
2794 case MXLND_MSG_CONN_ACK:
2795 if (peer->mxp_incompatible) {
2796 /* we sent our params, now close this conn */
2797 mxlnd_conn_disconnect(conn, 0, 1);
/* NOTE(review): lines between CONN_ACK and CONN_REQ cases are elided;
 * whether this falls through or breaks cannot be confirmed here. */
2799 case MXLND_MSG_CONN_REQ:
2801 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2802 "failed with %s (%d) to %s\n",
2803 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2804 mx_strstatus(tx->mxc_status.code),
2805 tx->mxc_status.code,
2806 libcfs_nid2str(tx->mxc_nid));
/* a failed connect handshake marks the conn failed unless the peer was
 * already known to be incompatible */
2807 if (!peer->mxp_incompatible) {
2808 spin_lock(&conn->mxk_lock);
2809 conn->mxk_status = MXLND_CONN_FAIL;
2810 spin_unlock(&conn->mxk_lock);
2816 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
/* drop the posted-tx count; the `credit` guard computed above is
 * presumably applied on an elided line -- TODO confirm */
2821 spin_lock(&conn->mxk_lock);
2822 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2823 conn->mxk_ntx_posted--;
2825 spin_unlock(&conn->mxk_lock);
2828 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2829 mxlnd_put_idle_tx(tx);
2830 mxlnd_conn_decref(conn);
2832 mxlnd_check_sends(peer);
2838 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
/* Completion handler for a received message: sorts out peer/conn
 * references, validates the unpacked header (NIDs, incarnation stamps),
 * credits returned by the peer, then dispatches on message type --
 * including the CONN_REQ/CONN_ACK connection handshake.
 * NOTE(review): elided extract -- gaps in the left-hand numbering mean
 * braces/breaks/else-arms and some declarations (seq, ret, result,
 * decref, conn_ref, peer_ref, hash, nic_id, ep_id, sid, match, credit)
 * are missing from view. */
2843 u32 nob = rx->mxc_status.xfer_length;
2844 u64 bits = rx->mxc_status.match_info;
2845 struct kmx_msg *msg = rx->mxc_msg;
2846 struct kmx_peer *peer = rx->mxc_peer;
2847 struct kmx_conn *conn = rx->mxc_conn;
2848 u8 type = rx->mxc_msg_type;
2850 lnet_msg_t *lntmsg[2];
2857 int incompatible = 0;
2860 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2861 * failed GET reply, CONN_REQ, or a CONN_ACK */
2863 /* NOTE peer may still be NULL if it is a new peer and
2864 * conn may be NULL if this is a re-connect */
2865 if (likely(peer != NULL && conn != NULL)) {
2866 /* we have a reference on the conn */
2868 } else if (peer != NULL && conn == NULL) {
2869 /* we have a reference on the peer */
2871 } else if (peer == NULL && conn != NULL) {
2873 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2875 } /* else peer and conn == NULL */
2878 if (peer == NULL || conn == NULL) {
2879 /* if the peer was disconnected, the peer may exist but
2880 * not have any valid conns */
2881 decref = 0; /* no peer means no ref was taken for this rx */
/* rx arrived holding only a peer ref: trade it for a conn ref */
2885 if (conn == NULL && peer != NULL) {
2886 write_lock(&kmxlnd_data.kmx_global_lock);
2887 conn = peer->mxp_conn;
2889 mxlnd_conn_addref(conn); /* conn takes ref... */
2890 mxlnd_peer_decref(peer); /* from peer */
2894 write_unlock(&kmxlnd_data.kmx_global_lock);
2895 rx->mxc_conn = conn;
2899 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
/* MX-level receive failure */
2905 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2906 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2907 libcfs_nid2str(rx->mxc_nid),
2908 mx_strstatus(rx->mxc_status.code),
2909 (int) rx->mxc_status.code);
2915 /* this may be a failed GET reply */
2916 if (type == MXLND_MSG_GET_DATA) {
2917 /* get the error (52-59) bits from the match bits */
2918 ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2919 lntmsg[0] = rx->mxc_lntmsg[0];
2923 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2924 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2925 libcfs_nid2str(rx->mxc_nid));
2930 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2931 if (type == MXLND_MSG_PUT_DATA) {
2932 result = rx->mxc_status.code;
2933 lntmsg[0] = rx->mxc_lntmsg[0];
2935 } else if (type == MXLND_MSG_GET_DATA) {
2936 result = rx->mxc_status.code;
2937 lntmsg[0] = rx->mxc_lntmsg[0];
2938 lntmsg[1] = rx->mxc_lntmsg[1];
/* msg-style rx: unpack and validate the header before dispatching */
2942 ret = mxlnd_unpack_msg(msg, nob);
2944 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2945 ret, libcfs_nid2str(rx->mxc_nid));
2949 type = msg->mxm_type;
/* CONN_REQ is exempt: its src nid is learned from the msg itself */
2952 if (type != MXLND_MSG_CONN_REQ &&
2953 (rx->mxc_nid != msg->mxm_srcnid ||
2954 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2955 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2956 "0x%llx and rx msg dst is 0x%llx)\n",
2957 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* stale-message check: incarnation stamps must match both ends */
2962 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2963 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2964 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2966 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2967 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2968 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2969 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2970 msg->mxm_srcstamp, conn->mxk_incarnation,
2971 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2973 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2974 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2975 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2976 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2983 CDEBUG(D_NET, "Received %s with %d credits\n",
2984 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* bank credits the peer returned, capped at the tunable maximum;
 * ignored if the conn has been reincarnated */
2986 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2987 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2988 LASSERT(peer != NULL);
2989 LASSERT(conn != NULL);
2990 if (msg->mxm_credits != 0) {
2991 spin_lock(&conn->mxk_lock);
2992 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2993 if ((conn->mxk_credits + msg->mxm_credits) >
2994 *kmxlnd_tunables.kmx_credits) {
2995 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2996 conn->mxk_credits, msg->mxm_credits);
2998 conn->mxk_credits += msg->mxm_credits;
2999 LASSERT(conn->mxk_credits >= 0);
3000 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3002 spin_unlock(&conn->mxk_lock);
/* dispatch on message type */
3006 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3008 case MXLND_MSG_NOOP:
3011 case MXLND_MSG_EAGER:
3012 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3013 msg->mxm_srcnid, rx, 0);
3017 case MXLND_MSG_PUT_REQ:
3018 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3019 msg->mxm_srcnid, rx, 1);
3023 case MXLND_MSG_PUT_ACK: {
/* an out-of-range cookie encodes a NAK with an error value */
3024 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3025 if (cookie > MXLND_MAX_COOKIE) {
3026 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3027 libcfs_nid2str(rx->mxc_nid));
3028 result = -((u32) MXLND_ERROR_VAL(cookie));
3029 lntmsg[0] = rx->mxc_lntmsg[0];
3031 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3032 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3033 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3038 case MXLND_MSG_GET_REQ:
3039 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3040 msg->mxm_srcnid, rx, 1);
/* passive side of the handshake: validate the peer's parameters,
 * find or create the peer/conn, then send our CONN_ACK */
3044 case MXLND_MSG_CONN_REQ:
3045 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3046 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3047 libcfs_nid2str(msg->mxm_srcnid),
3048 libcfs_nid2str(msg->mxm_dstnid));
3051 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3052 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3054 libcfs_nid2str(msg->mxm_srcnid),
3055 msg->mxm_u.conn_req.mxcrm_queue_depth,
3056 *kmxlnd_tunables.kmx_credits);
3059 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3060 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3062 libcfs_nid2str(msg->mxm_srcnid),
3063 msg->mxm_u.conn_req.mxcrm_eager_size,
3064 (int) MXLND_EAGER_SIZE);
3067 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3069 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
/* new peer: allocate it, then re-check under the lock in case another
 * thread raced us to insert the same nid */
3072 struct kmx_peer *existing_peer = NULL;
3073 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3075 rx->mxc_nid = msg->mxm_srcnid;
3077 /* adds conn ref for peer and one for this function */
3078 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3079 *kmxlnd_tunables.kmx_board,
3080 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3084 peer->mxp_sid = sid;
3085 LASSERT(peer->mxp_ep_id == ep_id);
3086 write_lock(&kmxlnd_data.kmx_global_lock);
3087 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3088 if (existing_peer) {
/* lost the race: drop our fresh peer and use the existing one */
3089 mxlnd_conn_decref(peer->mxp_conn);
3090 mxlnd_peer_decref(peer);
3091 peer = existing_peer;
3092 mxlnd_conn_addref(peer->mxp_conn);
3094 list_add_tail(&peer->mxp_peers,
3095 &kmxlnd_data.kmx_peers[hash]);
3096 atomic_inc(&kmxlnd_data.kmx_npeers);
3098 write_unlock(&kmxlnd_data.kmx_global_lock);
3100 /* FIXME should write lock here */
3101 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3102 mxlnd_peer_decref(peer); /* drop ref taken above */
3104 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3108 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3109 conn = peer->mxp_conn;
3110 } else { /* found peer */
3111 struct kmx_conn *old_conn = conn;
/* a different MX session id means the peer restarted: retire the old
 * conn and allocate a fresh one */
3113 if (sid != peer->mxp_sid) {
3114 /* do not call mx_disconnect() or send a BYE */
3115 mxlnd_conn_disconnect(old_conn, 0, 0);
3117 /* the ref for this rx was taken on the old_conn */
3118 mxlnd_conn_decref(old_conn);
3120 /* This allocs a conn, points peer->mxp_conn to this one.
3121 * The old conn is still on the peer->mxp_conns list.
3122 * As the pending requests complete, they will call
3123 * conn_decref() which will eventually free it. */
3124 ret = mxlnd_conn_alloc(&conn, peer);
3126 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3129 /* conn_alloc() adds one ref for the peer and one
3130 * for this function */
3133 peer->mxp_sid = sid;
/* record the peer's incarnation and move the conn to WAIT state */
3136 write_lock(&kmxlnd_data.kmx_global_lock);
3137 peer->mxp_incarnation = msg->mxm_srcstamp;
3138 peer->mxp_incompatible = incompatible;
3139 write_unlock(&kmxlnd_data.kmx_global_lock);
3140 spin_lock(&conn->mxk_lock);
3141 conn->mxk_incarnation = msg->mxm_srcstamp;
3142 conn->mxk_status = MXLND_CONN_WAIT;
3143 spin_unlock(&conn->mxk_lock);
3145 /* handle_conn_ack() will create the CONN_ACK msg */
3146 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3147 mxlnd_iconnect(peer, match);
/* active side of the handshake: peer accepted (or rejected) our
 * CONN_REQ; validate its parameters and mark the conn ready */
3151 case MXLND_MSG_CONN_ACK:
3152 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3153 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3154 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3155 libcfs_nid2str(msg->mxm_dstnid));
3159 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3160 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3161 "incompatible queue depth %d (%d wanted)\n",
3162 libcfs_nid2str(msg->mxm_srcnid),
3163 msg->mxm_u.conn_req.mxcrm_queue_depth,
3164 *kmxlnd_tunables.kmx_credits);
3165 spin_lock(&conn->mxk_lock);
3166 conn->mxk_status = MXLND_CONN_FAIL;
3167 spin_unlock(&conn->mxk_lock);
3171 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3172 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3173 "incompatible EAGER size %d (%d wanted)\n",
3174 libcfs_nid2str(msg->mxm_srcnid),
3175 msg->mxm_u.conn_req.mxcrm_eager_size,
3176 (int) MXLND_EAGER_SIZE);
3177 spin_lock(&conn->mxk_lock);
3178 conn->mxk_status = MXLND_CONN_FAIL;
3179 spin_unlock(&conn->mxk_lock);
/* handshake complete: full credits, fresh incarnation, READY unless
 * the params above were incompatible */
3183 write_lock(&kmxlnd_data.kmx_global_lock);
3184 peer->mxp_incarnation = msg->mxm_srcstamp;
3185 peer->mxp_incompatible = incompatible;
3186 write_unlock(&kmxlnd_data.kmx_global_lock);
3187 spin_lock(&conn->mxk_lock);
3188 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3189 conn->mxk_outstanding = 0;
3190 conn->mxk_incarnation = msg->mxm_srcstamp;
3191 conn->mxk_timeout = 0;
3192 if (!incompatible) {
3193 conn->mxk_status = MXLND_CONN_READY;
3195 spin_unlock(&conn->mxk_lock);
3196 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3200 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3201 libcfs_nid2str(rx->mxc_nid));
/* failure path: mark the conn failed (label itself elided) */
3208 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3209 spin_lock(&conn->mxk_lock);
3210 conn->mxk_status = MXLND_CONN_FAIL;
3211 spin_unlock(&conn->mxk_lock);
3216 spin_lock(&conn->mxk_lock);
3217 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3218 spin_unlock(&conn->mxk_lock);
3222 /* lnet_parse() failed, etc., repost now */
3223 mxlnd_put_idle_rx(rx);
/* a reposted credit-bearing rx owes the peer a returned credit */
3224 if (conn != NULL && credit == 1) {
3225 if (type == MXLND_MSG_PUT_DATA) {
3226 spin_lock(&conn->mxk_lock);
3227 conn->mxk_outstanding++;
3228 spin_unlock(&conn->mxk_lock);
3229 } else if (type != MXLND_MSG_GET_DATA &&
3230 (type == MXLND_MSG_EAGER ||
3231 type == MXLND_MSG_PUT_REQ ||
3232 type == MXLND_MSG_NOOP)) {
3233 spin_lock(&conn->mxk_lock);
3234 conn->mxk_outstanding++;
3235 spin_unlock(&conn->mxk_lock);
3238 if (conn_ref) mxlnd_conn_decref(conn);
3239 LASSERT(peer_ref == 0);
3242 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3243 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3245 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* complete any LNET messages collected above, then progress sends */
3248 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3249 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3251 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3259 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
/* Completion of our mx_iconnect() for a CONN_REQ: on failure mark the
 * conn FAILED (and after a timeout recycle it); on success record the
 * endpoint address and queue a marshalled CONN_REQ message to the peer.
 * NOTE(review): elided extract -- some lines (closing braces, the final
 * check_sends/return) are missing from view. */
3261 struct kmx_ctx *tx = NULL;
3262 struct kmx_msg *txmsg = NULL;
3263 struct kmx_conn *conn = peer->mxp_conn;
3265 /* a conn ref was taken when calling mx_iconnect(),
3266 * hold it until CONN_REQ or CONN_ACK completes */
3268 CDEBUG(D_NET, "entering\n");
3269 if (status.code != MX_STATUS_SUCCESS) {
3270 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3271 mx_strstatus(status.code), status.code,
3272 libcfs_nid2str(peer->mxp_nid));
3273 spin_lock(&conn->mxk_lock);
3274 conn->mxk_status = MXLND_CONN_FAIL;
3275 spin_unlock(&conn->mxk_lock);
/* if we've been retrying past the wait timeout, retire this conn and
 * start over with a fresh one */
3277 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3278 struct kmx_conn *new_conn = NULL;
3279 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3280 /* FIXME write lock here ? */
3281 mxlnd_conn_disconnect(conn, 0, 0);
3282 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3283 mxlnd_conn_decref(new_conn); /* which we no longer need */
3284 peer->mxp_reconnect_time = 0;
3287 mxlnd_conn_decref(conn);
/* success: remember the peer's endpoint address and bind the peer as
 * its MX context */
3291 spin_lock(&conn->mxk_lock);
3292 conn->mxk_epa = status.source;
3293 spin_unlock(&conn->mxk_lock);
3294 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3295 * we should not need to lock the peer */
3296 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3298 /* mx_iconnect() succeeded, reset delay to 0 */
3299 write_lock(&kmxlnd_data.kmx_global_lock);
3300 peer->mxp_reconnect_time = 0;
3301 write_unlock(&kmxlnd_data.kmx_global_lock);
3303 /* marshal CONN_REQ msg */
3304 /* we are still using the conn ref from iconnect() - do not take another */
3305 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect ref */
3307 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3308 libcfs_nid2str(peer->mxp_nid));
3309 spin_lock(&conn->mxk_lock);
3310 conn->mxk_status = MXLND_CONN_FAIL;
3311 spin_unlock(&conn->mxk_lock);
3312 mxlnd_conn_decref(conn);
/* fill in the CONN_REQ with our queue depth and eager size */
3316 tx->mxc_peer = peer;
3317 tx->mxc_conn = conn;
3318 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3319 txmsg = tx->mxc_msg;
3320 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3321 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3322 tx->mxc_match = mxlnd_create_match(tx, 0);
3324 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3330 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
/* Completion of our mx_iconnect() for a CONN_ACK (passive side of the
 * handshake, replying to a received CONN_REQ): on failure mark the conn
 * FAILED (recycling it after a timeout); on success record the endpoint
 * address/session id, mark the conn READY, and queue a CONN_ACK message.
 * NOTE(review): elided extract -- some lines (nic_id/ep_id/sid
 * declarations, closing braces, the final send) are missing from view. */
3332 struct kmx_ctx *tx = NULL;
3333 struct kmx_msg *txmsg = NULL;
3334 struct kmx_conn *conn = peer->mxp_conn;
3339 /* a conn ref was taken when calling mx_iconnect(),
3340 * hold it until CONN_REQ or CONN_ACK completes */
3342 CDEBUG(D_NET, "entering\n");
3343 if (status.code != MX_STATUS_SUCCESS) {
3344 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3345 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3346 mx_strstatus(status.code), status.code,
3347 libcfs_nid2str(peer->mxp_nid),
3351 spin_lock(&conn->mxk_lock);
3352 conn->mxk_status = MXLND_CONN_FAIL;
3353 spin_unlock(&conn->mxk_lock);
/* retrying past the wait timeout: retire this conn and start over */
3355 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3356 struct kmx_conn *new_conn = NULL;
3357 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3358 /* FIXME write lock here? */
3359 mxlnd_conn_disconnect(conn, 0, 1);
3360 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3362 mxlnd_conn_decref(new_conn); /* which we no longer need */
3363 peer->mxp_reconnect_time = 0;
3366 mxlnd_conn_decref(conn);
/* success: record endpoint address and session id; READY unless the
 * peer's params were flagged incompatible */
3369 mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3370 spin_lock(&conn->mxk_lock);
3371 conn->mxk_epa = status.source;
3372 if (likely(!peer->mxp_incompatible)) {
3373 conn->mxk_status = MXLND_CONN_READY;
3375 spin_unlock(&conn->mxk_lock);
3376 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3377 * we should not have to lock the peer */
3378 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3380 /* mx_iconnect() succeeded, reset delay to 0 */
3381 write_lock(&kmxlnd_data.kmx_global_lock);
3382 peer->mxp_reconnect_time = 0;
3383 peer->mxp_sid = sid;
3384 write_unlock(&kmxlnd_data.kmx_global_lock);
3386 /* marshal CONN_ACK msg */
3387 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect ref */
3389 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3390 libcfs_nid2str(peer->mxp_nid));
3391 spin_lock(&conn->mxk_lock);
3392 conn->mxk_status = MXLND_CONN_FAIL;
3393 spin_unlock(&conn->mxk_lock);
3394 mxlnd_conn_decref(conn);
/* fill in the CONN_ACK with our queue depth and eager size */
3398 tx->mxc_peer = peer;
3399 tx->mxc_conn = conn;
3400 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3401 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3402 txmsg = tx->mxc_msg;
3403 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3404 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3405 tx->mxc_match = mxlnd_create_match(tx, 0);
3412 * The MX request completion thread(s)
3413 * \param arg thread id (as a void *)
3415 * This thread waits for a MX completion and then completes the request.
3416 * We will create one thread per CPU.
3419 mxlnd_request_waitd(void *arg)
/* Kernel thread body: waits for MX completions and dispatches each to
 * the conn-handshake, tx-completion, or rx-completion handler.
 * Thread 0 polls (mx_test_any) for kmx_polling iterations before
 * blocking; other threads always block in mx_wait_any().
 * NOTE(review): elided extract -- declarations (name, count, status,
 * result, msg_type), closing braces, and continue/else arms are missing
 * from view. */
3421 long id = (long) arg;
3424 mx_return_t mxret = MX_SUCCESS;
3426 struct kmx_ctx *ctx = NULL;
3427 enum kmx_req_state req_type = MXLND_REQ_TX;
3428 struct kmx_peer *peer = NULL;
3429 struct kmx_conn *conn = NULL;
3434 memset(name, 0, sizeof(name));
3435 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3436 cfs_daemonize(name);
3437 //cfs_block_allsigs();
3439 memset(&status, 0, sizeof(status));
3441 CDEBUG(D_NET, "%s starting\n", name);
3443 while (!kmxlnd_data.kmx_shutdown) {
/* thread 0 spins with non-blocking test_any before falling back to a
 * blocking wait; all other threads block immediately */
3449 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3450 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3454 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3455 0ULL, 0ULL, &status, &result);
3458 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3459 0ULL, 0ULL, &status, &result);
3461 if (unlikely(kmxlnd_data.kmx_shutdown))
3465 /* nothing completed... */
3469 if (status.code != MX_STATUS_SUCCESS) {
3470 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3471 "match_info 0x%llx and length %d\n",
3472 mx_strstatus(status.code), status.code,
3473 (u64) status.match_info, status.msg_length);
/* the message type is encoded in the high bits of the match info */
3476 msg_type = MXLND_MSG_TYPE(status.match_info);
3478 /* This may be a mx_iconnect() request completing,
3479 * check the bit mask for CONN_REQ and CONN_ACK */
3480 if (msg_type == MXLND_MSG_ICON_REQ ||
3481 msg_type == MXLND_MSG_ICON_ACK) {
3482 peer = (struct kmx_peer*) status.context;
3483 if (msg_type == MXLND_MSG_ICON_REQ) {
3484 mxlnd_handle_conn_req(peer, status);
3486 mxlnd_handle_conn_ack(peer, status);
3491 /* This must be a tx or rx */
3493 /* NOTE: if this is a RX from the unexpected callback, it may
3494 * have very little info. If we dropped it in unexpected_recv(),
3495 * it will not have a context. If so, ignore it. */
3496 ctx = (struct kmx_ctx *) status.context;
3499 req_type = ctx->mxc_type;
3500 conn = ctx->mxc_conn; /* this may be NULL */
3501 mxlnd_deq_pending_ctx(ctx);
3503 /* copy status to ctx->mxc_status */
3504 memcpy(&ctx->mxc_status, &status, sizeof(status));
/* dispatch by request direction */
3508 mxlnd_handle_tx_completion(ctx);
3511 mxlnd_handle_rx_completion(ctx);
3514 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3519 /* FIXME may need to reconsider this */
3520 /* conn is always set except for the first CONN_REQ rx
3521 * from a new peer */
/* hard failure (neither SUCCESS nor TRUNCATED): drop the conn */
3522 if (!(status.code == MX_STATUS_SUCCESS ||
3523 status.code == MX_STATUS_TRUNCATED) &&
3525 mxlnd_conn_disconnect(conn, 1, 1);
3528 CDEBUG(D_NET, "waitd() completed task\n");
3530 CDEBUG(D_NET, "%s stopping\n", name);
3531 mxlnd_thread_stop(id);
3537 mxlnd_check_timeouts(unsigned long now)
/* Walk every peer's conn looking for the earliest pending timeout;
 * disconnect conns whose timeout has already passed. Returns (on an
 * elided line) the next deadline for the caller to sleep until --
 * defaulting to now + MXLND_COMM_TIMEOUT if nothing is pending.
 * NOTE(review): elided extract -- the loop counter declaration,
 * continues, and closing braces are missing from view. */
3541 unsigned long next = 0;
3542 struct kmx_peer *peer = NULL;
3543 struct kmx_conn *conn = NULL;
3545 read_lock(&kmxlnd_data.kmx_global_lock);
3546 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3547 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
/* bail out promptly on shutdown, releasing the lock */
3549 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3550 read_unlock(&kmxlnd_data.kmx_global_lock);
3554 conn = peer->mxp_conn;
3556 mxlnd_conn_addref(conn);
3561 /* FIXME is this needed? */
3562 spin_lock(&conn->mxk_lock);
3564 /* if nothing pending (timeout == 0) or
3565 * if conn is already disconnected,
3567 if (conn->mxk_timeout == 0 ||
3568 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3569 /* FIXME is this needed? */
3570 spin_unlock(&conn->mxk_lock);
3571 mxlnd_conn_decref(conn);
3575 /* we want to find the timeout that will occur first.
3576 * if it is in the future, we will sleep until then.
3577 * if it is in the past, then we will sleep one
3578 * second and repeat the process. */
3579 if ((next == 0) || (conn->mxk_timeout < next)) {
3580 next = conn->mxk_timeout;
/* already expired: drop the lock before disconnecting the conn */
3585 if (time_after_eq(now, conn->mxk_timeout)) {
3588 spin_unlock(&conn->mxk_lock);
3591 mxlnd_conn_disconnect(conn, 1, 1);
3593 mxlnd_conn_decref(conn);
3596 read_unlock(&kmxlnd_data.kmx_global_lock);
3597 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3603 * Enforces timeouts on messages
3604 * \param arg thread id (as a void *)
3606 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3607 * it calls mxlnd_conn_disconnect().
3609 * After checking for timeouts, try progressing sends (call check_sends()).
3612 mxlnd_timeoutd(void *arg)
3615 long id = (long) arg;
3616 unsigned long now = 0;
3617 unsigned long next = 0;
3618 unsigned long delay = HZ;
3619 struct kmx_peer *peer = NULL;
3620 struct kmx_conn *conn = NULL;
3622 cfs_daemonize("mxlnd_timeoutd");
3623 //cfs_block_allsigs();
3625 CDEBUG(D_NET, "timeoutd starting\n");
3627 while (!kmxlnd_data.kmx_shutdown) {
3630 /* if the next timeout has not arrived, go back to sleep */
3631 if (time_after(now, next)) {
3632 next = mxlnd_check_timeouts(now);
3635 read_lock(&kmxlnd_data.kmx_global_lock);
3636 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3637 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3638 /* FIXME upgrade to write lock?
3639 * is any lock needed? */
3640 conn = peer->mxp_conn;
3641 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3646 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3647 time_after(now, conn->mxk_last_tx + HZ)) {
3648 /* FIXME drop lock or call check_sends_locked */
3649 read_unlock(&kmxlnd_data.kmx_global_lock);
3650 mxlnd_check_sends(peer);
3651 read_lock(&kmxlnd_data.kmx_global_lock);
3653 mxlnd_conn_decref(conn); /* until here */
3656 read_unlock(&kmxlnd_data.kmx_global_lock);
3660 CDEBUG(D_NET, "timeoutd stopping\n");
3661 mxlnd_thread_stop(id);