1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
32 * Copyright (C) 2006 Myricom, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lnet/klnds/mxlnd/mxlnd.c
40 * Author: Eric Barton <eric@bartonsoftware.com>
41 * Author: Scott Atchley <atchley at myri.com>
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
/* Return nonzero when @epa equals the zero-initialized sentinel MX_EPA_NULL,
 * i.e. the endpoint address has never been set. */
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
51 /* if memcmp() == 0, it is NULL */
52 return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
/* Variadic no-op; lets debug-print call sites compile away to nothing. */
55 inline void mxlnd_noop(char *s, ...)
/* Map a MXLND_CTX_* context-state value to its printable name (for CDEBUG). */
61 mxlnd_ctxstate_to_str(int mxc_state)
65 return "MXLND_CTX_INIT";
67 return "MXLND_CTX_IDLE";
69 return "MXLND_CTX_PREP";
70 case MXLND_CTX_PENDING:
71 return "MXLND_CTX_PENDING";
72 case MXLND_CTX_COMPLETED:
73 return "MXLND_CTX_COMPLETED";
74 case MXLND_CTX_CANCELED:
75 return "MXLND_CTX_CANCELED";
/* Map a MXLND_CONN_* connection-status value to its printable name. */
82 mxlnd_connstatus_to_str(int mxk_status)
85 case MXLND_CONN_READY:
86 return "MXLND_CONN_READY";
88 return "MXLND_CONN_INIT";
90 return "MXLND_CONN_REQ";
92 return "MXLND_CONN_ACK";
94 return "MXLND_CONN_WAIT";
95 case MXLND_CONN_DISCONNECT:
96 return "MXLND_CONN_DISCONNECT";
98 return "MXLND_CONN_FAIL";
/* Map a MXLND_MSG_* wire-message type to its printable name. */
105 mxlnd_msgtype_to_str(int type) {
107 case MXLND_MSG_EAGER:
108 return "MXLND_MSG_EAGER";
109 case MXLND_MSG_CONN_REQ:
110 return "MXLND_MSG_CONN_REQ";
111 case MXLND_MSG_CONN_ACK:
112 return "MXLND_MSG_CONN_ACK";
114 return "MXLND_MSG_BYE";
116 return "MXLND_MSG_NOOP";
117 case MXLND_MSG_PUT_REQ:
118 return "MXLND_MSG_PUT_REQ";
119 case MXLND_MSG_PUT_ACK:
120 return "MXLND_MSG_PUT_ACK";
121 case MXLND_MSG_PUT_DATA:
122 return "MXLND_MSG_PUT_DATA";
123 case MXLND_MSG_GET_REQ:
124 return "MXLND_MSG_GET_REQ";
125 case MXLND_MSG_GET_DATA:
126 return "MXLND_MSG_GET_DATA";
/* Map an LNET_MSG_* message type to its printable name. */
133 mxlnd_lnetmsg_to_str(int type)
137 return "LNET_MSG_ACK";
139 return "LNET_MSG_PUT";
141 return "LNET_MSG_GET";
143 return "LNET_MSG_REPLY";
145 return "LNET_MSG_HELLO";
/* Build the 64-bit MX match value for @ctx: the message type is placed at
 * MXLND_MSG_OFFSET, @error at MXLND_ERROR_OFFSET, and the per-ctx cookie in
 * the low bits.  The LASSERTs guarantee the type is set and the cookie does
 * not spill into the error/type fields. */
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
156 u64 type = (u64) ctx->mxc_msg_type;
157 u64 err = (u64) error;
160 LASSERT(ctx->mxc_msg_type != 0);
161 LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162 match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
/* Inverse of mxlnd_create_match(): split a received match value into
 * *msg_type, *error and *cookie, asserting the type is one of the known
 * MXLND_MSG_* values. */
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
169 *msg_type = (u8) MXLND_MSG_TYPE(match);
170 *error = (u8) MXLND_ERROR_VAL(match);
171 *cookie = match & MXLND_MAX_COOKIE;
172 LASSERT(*msg_type == MXLND_MSG_EAGER ||
173 *msg_type == MXLND_MSG_ICON_REQ ||
174 *msg_type == MXLND_MSG_CONN_REQ ||
175 *msg_type == MXLND_MSG_ICON_ACK ||
176 *msg_type == MXLND_MSG_CONN_ACK ||
177 *msg_type == MXLND_MSG_BYE ||
178 *msg_type == MXLND_MSG_NOOP ||
179 *msg_type == MXLND_MSG_PUT_REQ ||
180 *msg_type == MXLND_MSG_PUT_ACK ||
181 *msg_type == MXLND_MSG_PUT_DATA ||
182 *msg_type == MXLND_MSG_GET_REQ ||
183 *msg_type == MXLND_MSG_GET_DATA);
/* Pop one rx context off the global idle list (empty-list early return is
 * on elided lines).  If the get/put accounting counters disagree, dump the
 * rx's state for debugging.  The rx leaves here in MXLND_CTX_PREP state. */
188 mxlnd_get_idle_rx(void)
190 struct list_head *tmp = NULL;
191 struct kmx_ctx *rx = NULL;
193 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
195 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
200 tmp = &kmxlnd_data.kmx_rx_idle;
201 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202 list_del_init(&rx->mxc_list);
203 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* diagnostic dump: an rx taken from the idle list should have balanced
 * get/put counters; log everything useful if it does not */
206 if (rx->mxc_get != rx->mxc_put) {
207 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
219 LASSERT (rx->mxc_get == rx->mxc_put);
223 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224 rx->mxc_state = MXLND_CTX_PREP;
/* Return @rx to the global idle list.  Rejects a NULL pointer or a ctx
 * that is actually a tx (mxc_type != MXLND_REQ_RX); the LASSERT checks the
 * get/put accounting before requeueing. */
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
233 CDEBUG(D_NETERROR, "called with NULL pointer\n");
235 } else if (rx->mxc_type != MXLND_REQ_RX) {
236 CDEBUG(D_NETERROR, "called with tx\n");
239 LASSERT(rx->mxc_get == rx->mxc_put + 1);
242 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* Remove up to @count rxs from the idle pool (the per-rx removal from the
 * global list and the actual free are on elided lines).  Logs how many were
 * actually reclaimed if fewer than @count were available. */
249 mxlnd_reduce_idle_rxs(__u32 count)
252 struct kmx_ctx *rx = NULL;
254 spin_lock(&kmxlnd_data.kmx_rxs_lock);
255 for (i = 0; i < count; i++) {
256 rx = mxlnd_get_idle_rx();
258 struct list_head *tmp = &rx->mxc_global_list;
262 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
266 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
/* Pop one tx context off the global idle list.  A fresh completion cookie
 * is allocated while the lock is held; cookies wrap back to 1 after
 * MXLND_MAX_COOKIE so 0 never appears on the wire.  kmx_tx_used counts
 * in-flight txs.  The tx leaves here in MXLND_CTX_PREP state. */
271 mxlnd_get_idle_tx(void)
273 struct list_head *tmp = NULL;
274 struct kmx_ctx *tx = NULL;
276 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
278 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
284 tmp = &kmxlnd_data.kmx_tx_idle;
285 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286 list_del_init(&tx->mxc_list);
288 /* Allocate a new completion cookie. It might not be needed,
289 * but we've got a lock right now and we're unlikely to
291 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293 kmxlnd_data.kmx_tx_next_cookie = 1;
295 kmxlnd_data.kmx_tx_used++;
296 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
298 LASSERT (tx->mxc_get == tx->mxc_put);
302 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303 LASSERT (tx->mxc_lntmsg[0] == NULL);
304 LASSERT (tx->mxc_lntmsg[1] == NULL);
306 tx->mxc_state = MXLND_CTX_PREP;
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
/* Return @tx to the idle list and finalize any attached LNET messages.
 * If the MX completion status was neither SUCCESS nor TRUNCATED, the owning
 * conn is disconnected (with a BYE) first.  lnet_finalize() is deliberately
 * called only after the idle-list lock is dropped. */
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
317 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
319 lnet_msg_t *lntmsg[2];
322 CDEBUG(D_NETERROR, "called with NULL pointer\n");
324 } else if (tx->mxc_type != MXLND_REQ_TX) {
325 CDEBUG(D_NETERROR, "called with rx\n");
/* a failed completion (anything but SUCCESS/TRUNCATED) tears down the conn */
328 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329 tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330 struct kmx_conn *conn = tx->mxc_conn;
333 mxlnd_conn_disconnect(conn, 0, 1);
/* stash the lnet msgs; they are finalized after the tx is requeued */
336 lntmsg[0] = tx->mxc_lntmsg[0];
337 lntmsg[1] = tx->mxc_lntmsg[1];
339 LASSERT(tx->mxc_get == tx->mxc_put + 1);
342 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344 kmxlnd_data.kmx_tx_used--;
345 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
346 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
353 * \param conn a kmx_conn pointer
355 * The calling function should remove the conn from the conns list first
/* Free a conn whose refcount has dropped to zero: unlink it from the
 * peer's conn list if still linked, clear the peer's current-conn pointer
 * (and the MX endpoint context if the epa was set), drop the conn's ref on
 * the peer, then release the memory.  All tx/pending queues must already
 * be empty (LASSERT below). */
359 mxlnd_conn_free(struct kmx_conn *conn)
361 struct kmx_peer *peer = conn->mxk_peer;
363 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365 list_empty (&conn->mxk_tx_free_queue) &&
366 list_empty (&conn->mxk_pending));
367 if (!list_empty(&conn->mxk_list)) {
368 list_del_init(&conn->mxk_list);
369 if (peer->mxp_conn == conn) {
370 peer->mxp_conn = NULL;
371 if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372 mx_set_endpoint_addr_context(conn->mxk_epa,
376 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377 MXLND_FREE (conn, sizeof (*conn));
/* Cancel every ctx queued on @conn's pending list.  Each entry is removed;
 * for rxs the posted MX receive is mx_cancel()ed, the ctx is marked
 * CANCELED with -ECONNABORTED and completed via
 * mxlnd_handle_rx_completion().  The conn lock is dropped around that call
 * because it may call lnet_finalize() and mxlnd_conn_decref() (see NOTE). */
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
386 struct kmx_ctx *ctx = NULL;
387 struct kmx_ctx *next = NULL;
388 mx_return_t mxret = MX_SUCCESS;
393 spin_lock(&conn->mxk_lock);
394 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395 /* we will delete all including txs */
396 list_del_init(&ctx->mxc_list);
397 if (ctx->mxc_type == MXLND_REQ_RX) {
399 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
402 if (mxret != MX_SUCCESS) {
403 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
406 ctx->mxc_status.code = -ECONNABORTED;
407 ctx->mxc_state = MXLND_CTX_CANCELED;
408 /* NOTE this calls lnet_finalize() and
409 * we cannot hold any locks when calling it.
410 * It also calls mxlnd_conn_decref(conn) */
411 spin_unlock(&conn->mxk_lock);
412 mxlnd_handle_rx_completion(ctx);
413 spin_lock(&conn->mxk_lock);
418 spin_unlock(&conn->mxk_lock);
426 * Shutdown a connection
427 * \param conn a kmx_conn pointer
428 * \param mx_dis call mx_disconnect()
429 * \param send_bye send peer a BYE msg
431 * This function sets the status to DISCONNECT, completes queued
432 * txs with failure, calls mx_disconnect, which will complete
433 * pending txs and matched rxs with failure.
/* Tear down @conn (see doc comment above): mark it DISCONNECT (idempotent
 * — a second caller returns early), fail every queued tx with
 * -ECONNABORTED, cancel pending rxs, optionally send a BYE to the peer,
 * notify LNET that the peer is dead (unless we are shutting down), call
 * mx_disconnect(), and finally drop the owning peer's conn reference. */
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
438 mx_endpoint_addr_t epa = conn->mxk_epa;
439 struct list_head *tmp = NULL;
440 int valid = !mxlnd_endpoint_addr_null(epa);
442 spin_lock(&conn->mxk_lock);
443 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444 spin_unlock(&conn->mxk_lock);
447 conn->mxk_status = MXLND_CONN_DISCONNECT;
448 conn->mxk_timeout = 0;
/* drain both tx queues, completing each tx with -ECONNABORTED; the conn
 * lock is dropped around put_idle_tx() since it may call lnet_finalize() */
450 while (!list_empty(&conn->mxk_tx_free_queue) ||
451 !list_empty(&conn->mxk_tx_credit_queue)) {
453 struct kmx_ctx *tx = NULL;
455 if (!list_empty(&conn->mxk_tx_free_queue)) {
456 tmp = &conn->mxk_tx_free_queue;
458 tmp = &conn->mxk_tx_credit_queue;
461 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462 list_del_init(&tx->mxc_list);
463 tx->mxc_status.code = -ECONNABORTED;
464 spin_unlock(&conn->mxk_lock);
465 mxlnd_put_idle_tx(tx);
466 mxlnd_conn_decref(conn); /* for this tx */
467 spin_lock(&conn->mxk_lock);
470 spin_unlock(&conn->mxk_lock);
472 /* cancel pending rxs */
473 mxlnd_conn_cancel_pending_rxs(conn);
475 if (send_bye && valid) {
476 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477 /* send a BYE to the peer */
478 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481 epa, match, NULL, NULL);
482 /* wait to allow the peer to ack our message */
483 mxlnd_sleep(msecs_to_jiffies(20));
486 if (kmxlnd_data.kmx_shutdown != 1) {
487 time_t last_alive = 0;
488 unsigned long last_msg = 0;
490 /* notify LNET that we are giving up on this peer */
491 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
492 last_msg = conn->mxk_last_rx;
494 last_msg = conn->mxk_last_tx;
496 last_alive = cfs_time_current_sec() -
497 cfs_duration_sec(cfs_time_current() - last_msg);
498 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
501 mx_disconnect(kmxlnd_data.kmx_endpt, epa);
503 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
509 * Allocate and initialize a new conn struct
510 * \param connp address of a kmx_conn pointer
511 * \param peer owning kmx_peer
513 * Returns 0 on success and -ENOMEM on failure
/* Allocate and initialize a conn for @peer (see doc comment above).  The
 * new conn starts with two refs — one for the owning peer and one for the
 * caller — status MXLND_CONN_INIT, and is pushed on the front of the
 * peer's conn list.  Caller holds the global write lock. */
516 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
518 struct kmx_conn *conn = NULL;
520 LASSERT(peer != NULL);
522 MXLND_ALLOC(conn, sizeof (*conn));
524 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
527 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
529 memset(conn, 0, sizeof(*conn));
531 /* conn->mxk_incarnation = 0 - will be set by peer */
532 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
533 and one for the caller */
534 conn->mxk_peer = peer;
535 /* mxk_epa - to be set after mx_iconnect() */
536 INIT_LIST_HEAD(&conn->mxk_list);
537 spin_lock_init(&conn->mxk_lock);
538 /* conn->mxk_timeout = 0 */
539 conn->mxk_last_tx = jiffies;
540 conn->mxk_last_rx = conn->mxk_last_tx;
541 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
542 /* mxk_outstanding = 0 */
543 conn->mxk_status = MXLND_CONN_INIT;
544 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
545 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
546 /* conn->mxk_ntx_msgs = 0 */
547 /* conn->mxk_ntx_data = 0 */
548 /* conn->mxk_ntx_posted = 0 */
549 /* conn->mxk_data_posted = 0 */
550 INIT_LIST_HEAD(&conn->mxk_pending);
554 mxlnd_peer_addref(peer); /* add a ref for this conn */
556 /* add to front of peer's conns list */
557 list_add(&conn->mxk_list, &peer->mxp_conns);
558 peer->mxp_conn = conn;
/* Locking wrapper: allocate a conn for @peer under the global write lock. */
563 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
566 write_lock(&kmxlnd_data.kmx_global_lock);
567 ret = mxlnd_conn_alloc_locked(connp, peer);
568 write_unlock(&kmxlnd_data.kmx_global_lock);
/* Mark @ctx PENDING and queue it on its conn's pending list, pulling the
 * conn's timeout forward to the ctx deadline if that is sooner.  If the
 * conn's status is below MXLND_CONN_INIT (failed/disconnecting) the ctx is
 * marked COMPLETED instead; the error return path is on elided lines. */
573 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
576 struct kmx_conn *conn = ctx->mxc_conn;
578 ctx->mxc_state = MXLND_CTX_PENDING;
580 spin_lock(&conn->mxk_lock);
581 if (conn->mxk_status >= MXLND_CONN_INIT) {
582 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
583 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
584 conn->mxk_timeout = ctx->mxc_deadline;
587 ctx->mxc_state = MXLND_CTX_COMPLETED;
590 spin_unlock(&conn->mxk_lock);
/* Remove @ctx from its conn's pending list and mark it COMPLETED.  The
 * conn timeout is reset to the deadline of the next pending ctx, or left 0
 * if the list is now empty. */
596 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
598 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
599 ctx->mxc_state == MXLND_CTX_COMPLETED);
600 if (ctx->mxc_state != MXLND_CTX_PENDING &&
601 ctx->mxc_state != MXLND_CTX_COMPLETED) {
602 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
603 mxlnd_ctxstate_to_str(ctx->mxc_state));
605 ctx->mxc_state = MXLND_CTX_COMPLETED;
606 if (!list_empty(&ctx->mxc_list)) {
607 struct kmx_conn *conn = ctx->mxc_conn;
608 struct kmx_ctx *next = NULL;
609 LASSERT(conn != NULL);
610 spin_lock(&conn->mxk_lock);
611 list_del_init(&ctx->mxc_list);
612 conn->mxk_timeout = 0;
613 if (!list_empty(&conn->mxk_pending)) {
614 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
615 conn->mxk_timeout = next->mxc_deadline;
617 spin_unlock(&conn->mxk_lock);
624 * \param peer a kmx_peer pointer
626 * The calling function should decrement the rxs, drain the tx queues and
627 * remove the peer from the peers list first then destroy it.
/* Free a peer whose refcount has reached zero (see doc comment above):
 * unlink it from the global peers hash (caller is assumed to hold the
 * lock), free the memory and decrement the global peer count.  The
 * localhost peer may only be freed during shutdown. */
630 mxlnd_peer_free(struct kmx_peer *peer)
632 CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
633 peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
635 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
637 if (peer == kmxlnd_data.kmx_localhost)
638 LASSERT(kmxlnd_data.kmx_shutdown);
640 if (!list_empty(&peer->mxp_peers)) {
641 /* assume we are locked */
642 list_del_init(&peer->mxp_peers);
645 MXLND_FREE (peer, sizeof (*peer));
646 atomic_dec(&kmxlnd_data.kmx_npeers);
650 #define MXLND_LOOKUP_COUNT 10
652 /* We only want the MAC address of the peer's Myricom NIC. We
653 * require that each node has the IPoMX interface (myriN) up.
654 * We will not pass any traffic over IPoMX, but it allows us
655 * to get the MAC address. */
/* Resolve @ip to the peer's Myricom NIC id via the IPoMX interface's ARP
 * table (see the comment block above).  If no valid neighbour entry
 * exists, force an ARP by attempting a TCP connect (port 987) and retry
 * with a growing backoff, up to MXLND_LOOKUP_COUNT attempts.  The 6-byte
 * MAC is copied into *nic_id and byte-swapped on little-endian hosts. */
657 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
663 unsigned char *haddr = NULL;
664 struct net_device *dev = NULL;
665 struct neighbour *n = NULL;
666 cfs_socket_t *sock = NULL;
667 __be32 dst_ip = htonl(ip);
669 dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
674 haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
677 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
680 if (n->nud_state & NUD_VALID) {
681 memcpy(haddr, n->ha, dev->addr_len);
687 /* not found, try to connect (force an arp) */
688 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
690 libcfs_sock_release(sock);
691 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
692 } while (try++ < MXLND_LOOKUP_COUNT);
698 #ifdef __LITTLE_ENDIAN
699 *nic_id = ___arch__swab64(tmp_id);
707 * Allocate and initialize a new peer struct
708 * \param peerp address of a kmx_peer pointer
709 * \param nid LNET node id
710 * Returns 0 on success and -ENOMEM on failure
/* Allocate and initialize a peer for @nid (see doc comment above): resolve
 * the NIC id from the NID's IP if not supplied, allocate the peer's first
 * conn, and pre-allocate credits-1 rxs onto the global rx list / idle
 * pool.  On any failure all refs taken so far are dropped before
 * returning (error paths are on elided lines). */
713 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
717 u32 ip = LNET_NIDADDR(nid);
718 struct kmx_peer *peer = NULL;
720 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
722 MXLND_ALLOC(peer, sizeof (*peer));
724 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
727 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
729 memset(peer, 0, sizeof(*peer));
732 /* peer->mxp_incarnation */
733 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
736 peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
737 peer->mxp_board = board;
738 peer->mxp_nic_id = nic_id;
740 if (nic_id == 0ULL) {
741 ret = mxlnd_ip2nic_id(ip, &nic_id);
743 CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
744 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
747 peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
749 INIT_LIST_HEAD(&peer->mxp_peers);
750 INIT_LIST_HEAD(&peer->mxp_conns);
751 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
753 mxlnd_peer_decref(peer);
/* pre-allocate one rx per credit (minus one); on failure, unwind the rxs
 * and both conn refs before dropping the peer */
757 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
758 struct kmx_ctx *rx = NULL;
759 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
761 mxlnd_reduce_idle_rxs(i);
762 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
763 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
764 mxlnd_peer_decref(peer);
767 spin_lock(&kmxlnd_data.kmx_rxs_lock);
768 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
769 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
771 mxlnd_put_idle_rx(rx);
773 /* peer->mxp_reconnect_time = 0 */
774 /* peer->mxp_incompatible = 0 */
/* Look @nid up in the peers hash; on a hit take a ref and return the peer,
 * otherwise return NULL.  Caller holds the global lock. */
780 static inline struct kmx_peer *
781 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
785 struct kmx_peer *peer = NULL;
787 hash = mxlnd_nid_to_hash(nid);
789 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
790 if (peer->mxp_nid == nid) {
792 mxlnd_peer_addref(peer);
796 return (found ? peer : NULL);
/* Locking wrapper: look up @nid under the global read lock.  Returns a
 * referenced peer or NULL. */
799 static inline struct kmx_peer *
800 mxlnd_find_peer_by_nid(lnet_nid_t nid)
802 struct kmx_peer *peer = NULL;
804 read_lock(&kmxlnd_data.kmx_global_lock);
805 peer = mxlnd_find_peer_by_nid_locked(nid);
806 read_unlock(&kmxlnd_data.kmx_global_lock);
/* True for message types that consume a flow-control credit:
 * EAGER, GET_REQ, PUT_REQ and NOOP. */
811 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
813 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
814 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
815 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
816 tx->mxc_msg_type == MXLND_MSG_NOOP);
820 * Set type and number of bytes
821 * \param msg msg pointer
822 * \param type of message
823 * \param body_nob bytes in msg body
/* Set the wire type and total byte count (header + body) on @msg
 * (see doc comment above). */
826 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
828 msg->mxm_type = type;
829 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/* Prepare @tx to carry an eager-path message of @type with @body_nob body
 * bytes to @nid: record the type, set the MX segment length to the full
 * header+body size, and set physical pinning.  The total must fit in
 * MXLND_EAGER_SIZE. */
833 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
835 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
836 struct kmx_msg *msg = NULL;
838 LASSERT (tx != NULL);
839 LASSERT (nob <= MXLND_EAGER_SIZE);
842 /* tx->mxc_peer should have already been set if we know it */
843 tx->mxc_msg_type = type;
845 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
846 tx->mxc_seg.segment_length = nob;
847 tx->mxc_pin_type = MX_PIN_PHYSICAL;
848 //tx->mxc_state = MXLND_CTX_PENDING;
851 msg->mxm_type = type;
/* Simple 32-bit rotate-and-add checksum over @nob bytes at @ptr.
 * Never returns 0 so that 0 can mean "no checksum" on the wire. */
858 mxlnd_cksum (void *ptr, int nob)
864 sum = ((sum << 1) | (sum >> 31)) + *c++;
866 /* ensure I don't return 0 (== no checksum) */
867 return (sum == 0) ? 1 : sum;
872 * \param tx msg to send
/* Fill in the wire header of @tx's message just before sending: magic,
 * version, piggybacked credits (returned for everything except
 * CONN_REQ/CONN_ACK), source/destination nids and stamps, sequence number,
 * and — if the cksum tunable is enabled — the checksum over the whole
 * message. */
875 mxlnd_pack_msg(struct kmx_ctx *tx)
877 struct kmx_msg *msg = tx->mxc_msg;
879 /* type and nob should already be set in init_msg() */
880 msg->mxm_magic = MXLND_MSG_MAGIC;
881 msg->mxm_version = MXLND_MSG_VERSION;
883 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
884 * return credits as well */
885 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
886 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
887 spin_lock(&tx->mxc_conn->mxk_lock);
888 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
889 tx->mxc_conn->mxk_outstanding = 0;
890 spin_unlock(&tx->mxc_conn->mxk_lock);
892 msg->mxm_credits = 0;
896 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
897 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
898 msg->mxm_dstnid = tx->mxc_nid;
899 /* if it is a new peer, the dststamp will be 0 */
900 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
901 msg->mxm_seq = tx->mxc_cookie;
903 if (*kmxlnd_tunables.kmx_cksum) {
904 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* Validate (and, for a byte-swapped peer, endian-flip) a received message
 * of @nob bytes: check magic, version, length, checksum, source nid, then
 * do per-message-type length checks and swab the type-specific payload
 * fields.  Error returns are on elided lines alongside each CDEBUG. */
909 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
911 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
916 /* 6 bytes are enough to have received magic + version */
918 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
/* a swapped magic means the peer has opposite endianness; flip below */
922 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
924 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
927 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
931 if (msg->mxm_version !=
932 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
933 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
937 if (nob < hdr_size) {
938 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
942 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
944 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
948 /* checksum must be computed with mxm_cksum zero and BEFORE anything
950 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
952 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
953 CDEBUG(D_NETERROR, "Bad checksum\n");
956 msg->mxm_cksum = msg_cksum;
959 /* leave magic unflipped as a clue to peer endianness */
960 __swab16s(&msg->mxm_version);
961 CLASSERT (sizeof(msg->mxm_type) == 1);
962 CLASSERT (sizeof(msg->mxm_credits) == 1);
963 msg->mxm_nob = msg_nob;
964 __swab64s(&msg->mxm_srcnid);
965 __swab64s(&msg->mxm_srcstamp);
966 __swab64s(&msg->mxm_dstnid);
967 __swab64s(&msg->mxm_dststamp);
968 __swab64s(&msg->mxm_seq);
971 if (msg->mxm_srcnid == LNET_NID_ANY) {
972 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
/* per-type payload length checks and payload swabbing */
976 switch (msg->mxm_type) {
978 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
984 case MXLND_MSG_EAGER:
985 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
986 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
987 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
992 case MXLND_MSG_PUT_REQ:
993 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
994 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
995 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
999 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1002 case MXLND_MSG_PUT_ACK:
1003 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1004 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1005 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1009 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1010 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1014 case MXLND_MSG_GET_REQ:
1015 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1016 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1017 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1021 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1025 case MXLND_MSG_CONN_REQ:
1026 case MXLND_MSG_CONN_ACK:
1027 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1028 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1029 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1033 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1034 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1043 * \param lntmsg the LNET msg that this is continuing. If EAGER, then NULL.
1044 * \param length length of incoming message
1046 * The caller gets the rx and sets nid, peer and conn if known.
1048 * Returns 0 on success and -1 on failure
/* Post a receive for a message of @msg_type matching @cookie into @rx (see
 * doc comment above).  The rx is queued on the conn's pending list first;
 * if mx_kirecv() fails it is dequeued again and the error is logged.  The
 * mask ignores the error bits of the match so error-flagged completions
 * still match. */
1051 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1054 mx_return_t mxret = MX_SUCCESS;
1055 uint64_t mask = ~(MXLND_ERROR_MASK);
1057 rx->mxc_msg_type = msg_type;
1058 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1059 rx->mxc_cookie = cookie;
1060 /* rx->mxc_match may already be set */
1061 /* rx->mxc_seg.segment_ptr is already set */
1062 rx->mxc_seg.segment_length = length;
1063 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1064 ret = mxlnd_q_pending_ctx(rx);
1066 /* the caller is responsible for calling conn_decref() if needed */
1069 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1070 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1071 if (mxret != MX_SUCCESS) {
1072 mxlnd_deq_pending_ctx(rx);
1073 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1074 mx_strerror(mxret), (int) mxret);
1082 * This is the callback function that will handle unexpected receives
1083 * \param context NULL, ignore
1084 * \param source the peer's mx_endpoint_addr_t
1085 * \param match_value the msg's bit, should be MXLND_MSG_EAGER
1086 * \param length length of incoming message
1087 * \param data_if_available ignore
1089 * If it is an eager-sized msg, we will call recv_msg() with the actual
1090 * length. If it is a large message, we will call recv_msg() with a
1091 * length of 0 bytes to drop it because we should never have a large,
1092 * unexpected message.
1094 * NOTE - The MX library blocks until this function completes. Make it as fast as
1095 * possible. DO NOT allocate memory which can block!
1097 * If we cannot get a rx or the conn is closed, drop the message on the floor
1098 * (i.e. recv 0 bytes and ignore).
/* MX unexpected-receive callback (see doc comment above).  Runs with the
 * MX library blocked, so it must not sleep or allocate.  A BYE is handled
 * inline by disconnecting the sender's conn; an eager-sized message gets a
 * real receive posted; anything unpostable is drained with a 0-byte recv
 * so MX can make progress. */
1100 mx_unexp_handler_action_t
1101 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1102 uint64_t match_value, uint32_t length, void *data_if_available)
1105 struct kmx_ctx *rx = NULL;
1111 if (context != NULL) {
1112 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1116 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
/* a BYE needs no rx: just tear down the sender's conn and finish */
1119 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1120 if (msg_type == MXLND_MSG_BYE) {
1121 struct kmx_peer *peer = NULL;
1123 mx_get_endpoint_addr_context(source, (void **) &peer);
1124 if (peer && peer->mxp_conn) {
1125 CDEBUG(D_NET, "peer %s sent BYE msg\n",
1126 libcfs_nid2str(peer->mxp_nid));
1127 mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1130 return MX_RECV_FINISHED;
1133 rx = mxlnd_get_idle_rx();
1135 if (length <= MXLND_EAGER_SIZE) {
1136 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1138 CDEBUG(D_NETERROR, "unexpected large receive with "
1139 "match_value=0x%llx length=%d\n",
1140 match_value, length);
/* large unexpected messages should never happen; drop the body */
1141 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1145 struct kmx_peer *peer = NULL;
1146 struct kmx_conn *conn = NULL;
1148 /* NOTE to avoid a peer disappearing out from under us,
1149 * read lock the peers lock first */
1150 read_lock(&kmxlnd_data.kmx_global_lock);
1151 mx_get_endpoint_addr_context(source, (void **) &peer);
1153 mxlnd_peer_addref(peer); /* add a ref... */
1154 conn = peer->mxp_conn;
1156 mxlnd_conn_addref(conn); /* add ref until rx completed */
1157 mxlnd_peer_decref(peer); /* and drop peer ref */
1158 rx->mxc_conn = conn;
1160 rx->mxc_peer = peer;
1161 rx->mxc_nid = peer->mxp_nid;
1163 read_unlock(&kmxlnd_data.kmx_global_lock);
1165 CDEBUG(D_NETERROR, "could not post receive\n");
1166 mxlnd_put_idle_rx(rx);
/* no rx available or post failed: drain the message with a 0-byte recv */
1170 if (rx == NULL || ret != 0) {
1172 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1175 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1177 seg.segment_ptr = 0ULL;
1178 seg.segment_length = 0;
1179 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1180 match_value, ~0ULL, NULL, NULL);
1183 return MX_RECV_CONTINUE;
/* Walk the peers hash under the global read lock and, for the peer at
 * position @index, report its nid and refcount (the index match/early-exit
 * logic is on elided lines). */
1188 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1192 struct kmx_peer *peer = NULL;
1194 read_lock(&kmxlnd_data.kmx_global_lock);
1195 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1196 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1198 *nidp = peer->mxp_nid;
1199 *count = atomic_read(&peer->mxp_refcount);
1205 read_unlock(&kmxlnd_data.kmx_global_lock);
/* Remove @peer from the global peers list, disconnect its current conn (if
 * any) and drop the list's reference.  Caller holds the global write lock. */
1211 mxlnd_del_peer_locked(struct kmx_peer *peer)
1213 list_del_init(&peer->mxp_peers); /* remove from the global list */
1214 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1215 mxlnd_peer_decref(peer); /* drop global list ref */
/* Delete the peer for @nid, or every non-localhost peer when @nid is
 * LNET_NID_ANY.  The single-nid path looks the peer up (taking a ref)
 * before acquiring the global write lock, and refuses to delete the
 * localhost peer. */
1220 mxlnd_del_peer(lnet_nid_t nid)
1224 struct kmx_peer *peer = NULL;
1225 struct kmx_peer *next = NULL;
1227 if (nid != LNET_NID_ANY) {
1228 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1230 write_lock(&kmxlnd_data.kmx_global_lock);
1231 if (nid != LNET_NID_ANY) {
/* NOTE(review): the "} if (" below looks like a missing "else" — as
 * written, the peer==NULL branch (on elided lines above) would fall
 * through into a comparison/decref of a NULL peer.  Verify against the
 * full source before changing. */
1234 } if (peer == kmxlnd_data.kmx_localhost) {
1235 mxlnd_peer_decref(peer); /* and drops it */
1236 CERROR("cannot free this host's NID 0x%llx\n", nid);
1238 mxlnd_peer_decref(peer); /* and drops it */
1239 mxlnd_del_peer_locked(peer);
1241 } else { /* LNET_NID_ANY */
1242 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1243 list_for_each_entry_safe(peer, next,
1244 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1245 if (peer != kmxlnd_data.kmx_localhost)
1246 mxlnd_del_peer_locked(peer);
1250 write_unlock(&kmxlnd_data.kmx_global_lock);
/* Walk every peer's conn list under the global read lock and return the
 * conn at position @index with a reference added (the index match check is
 * on elided lines); the ref is dropped by mxlnd_ctl().  Returns NULL when
 * @index is out of range. */
1256 mxlnd_get_conn_by_idx(int index)
1259 struct kmx_peer *peer = NULL;
1260 struct kmx_conn *conn = NULL;
1262 read_lock(&kmxlnd_data.kmx_global_lock);
1263 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1264 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1265 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1270 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1271 read_unlock(&kmxlnd_data.kmx_global_lock);
1276 read_unlock(&kmxlnd_data.kmx_global_lock);
/* Disconnect every conn of @peer; the localhost peer is left untouched.
 * Caller holds the global lock. */
1282 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1284 struct kmx_conn *conn = NULL;
1285 struct kmx_conn *next = NULL;
1287 if (peer == kmxlnd_data.kmx_localhost) return;
1289 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1290 mxlnd_conn_disconnect(conn, 0, 1);
/* Close the conns of the peer matching @nid, or of every peer when @nid is
 * LNET_NID_ANY, under the global read lock.  The single-nid lookup takes a
 * peer ref that is dropped after the conns are closed. */
1296 mxlnd_close_matching_conns(lnet_nid_t nid)
1300 struct kmx_peer *peer = NULL;
1302 read_lock(&kmxlnd_data.kmx_global_lock);
1303 if (nid != LNET_NID_ANY) {
1304 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1308 mxlnd_close_matching_conns_locked(peer);
1309 mxlnd_peer_decref(peer); /* and drops it here */
1311 } else { /* LNET_NID_ANY */
1312 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1313 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1314 mxlnd_close_matching_conns_locked(peer);
1317 read_unlock(&kmxlnd_data.kmx_global_lock);
1323 * Modify MXLND parameters
1324 * \param ni LNET interface handle
1325 * \param cmd command to change
1326 * \param arg the ioctl data
1328 * Not implemented yet.
/* LND ioctl entry point (see doc comment above): dispatch
 * IOC_LIBCFS_GET_PEER / DEL_PEER / GET_CONN / CLOSE_CONNECTION to their
 * helpers; anything else is logged and rejected. */
1331 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1333 struct libcfs_ioctl_data *data = arg;
1336 LASSERT (ni == kmxlnd_data.kmx_ni);
1339 case IOC_LIBCFS_GET_PEER: {
1343 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1344 data->ioc_nid = nid;
1345 data->ioc_count = count;
1348 case IOC_LIBCFS_DEL_PEER: {
1349 ret = mxlnd_del_peer(data->ioc_nid);
1352 case IOC_LIBCFS_GET_CONN: {
1353 struct kmx_conn *conn = NULL;
1355 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1360 data->ioc_nid = conn->mxk_peer->mxp_nid;
1361 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1365 case IOC_LIBCFS_CLOSE_CONNECTION: {
1366 ret = mxlnd_close_matching_conns(data->ioc_nid);
1370 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1378 * Queue the tx on its connection's msg or data queue
1380 * Add the tx to the conn's credit or free queue. The caller has locked the conn.
/* Queue tx on its conn: credit-consuming messages go on the credit queue,
 * CONN_REQ/CONN_ACK jump to the head of the free queue, other no-credit
 * control messages and bulk DATA go on the tail of the free queue.
 * Caller holds conn->mxk_lock (see mxlnd_peer_queue_tx()). */
1383 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1385 u8 msg_type = tx->mxc_msg_type;
1386 //struct kmx_peer *peer = tx->mxc_peer;
1387 struct kmx_conn *conn = tx->mxc_conn;
1389 LASSERT (msg_type != 0);
1390 LASSERT (tx->mxc_nid != 0);
1391 LASSERT (tx->mxc_peer != NULL);
1392 LASSERT (tx->mxc_conn != NULL);
/* stamp the tx with the conn's incarnation so stale sends can be detected */
1394 tx->mxc_incarnation = conn->mxk_incarnation;
/* non-DATA message path */
1396 if (msg_type != MXLND_MSG_PUT_DATA &&
1397 msg_type != MXLND_MSG_GET_DATA) {
1399 if (mxlnd_tx_requires_credit(tx)) {
1400 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1401 conn->mxk_ntx_msgs++;
1402 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1403 msg_type == MXLND_MSG_CONN_ACK) {
1404 /* put conn msgs at the front of the queue */
1405 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1407 /* PUT_ACK, PUT_NAK */
1408 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1409 conn->mxk_ntx_msgs++;
/* bulk PUT_DATA/GET_DATA: never needs a credit */
1413 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1414 conn->mxk_ntx_data++;
1421 * Queue the tx on its connection, taking the conn's lock
1423 * Lock the conn, then add the tx to its msg or data queue
/* Locking wrapper: take conn->mxk_lock and queue the tx via
 * mxlnd_peer_queue_tx_locked(). */
1426 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1428 LASSERT(tx->mxc_peer != NULL);
1429 LASSERT(tx->mxc_conn != NULL);
1430 spin_lock(&tx->mxc_conn->mxk_lock);
1431 mxlnd_peer_queue_tx_locked(tx);
1432 spin_unlock(&tx->mxc_conn->mxk_lock);
1438 * Add the tx to the global tx queue
1440 * Add the tx to the global queue and up the tx_queue_sem
/* Queue a tx for sending. When the peer is already known, queue it directly
 * on the peer's conn and kick mxlnd_check_sends(); otherwise append it to
 * the global tx queue and wake the tx_queued thread via the semaphore. */
1443 mxlnd_queue_tx(struct kmx_ctx *tx)
1445 struct kmx_peer *peer = tx->mxc_peer;
1446 LASSERT (tx->mxc_nid != 0);
/* peer speaks an incompatible protocol: fail everything except CONN_ACK */
1449 if (peer->mxp_incompatible &&
1450 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1451 /* let this fail now */
1452 tx->mxc_status.code = -ECONNABORTED;
1453 mxlnd_conn_decref(peer->mxp_conn);
1454 mxlnd_put_idle_tx(tx);
/* known peer but no conn on the tx yet: allocate one */
1457 if (tx->mxc_conn == NULL) {
1459 struct kmx_conn *conn = NULL;
1461 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
/* NOTE(review): the alloc-failure branch is elided here; the two lines
 * below are the failure path — confirm against the full source */
1463 tx->mxc_status.code = ret;
1464 mxlnd_put_idle_tx(tx);
1467 tx->mxc_conn = conn;
1468 mxlnd_peer_decref(peer); /* and takes it from peer */
1470 LASSERT(tx->mxc_conn != NULL);
1471 mxlnd_peer_queue_tx(tx);
1472 mxlnd_check_sends(peer);
/* unknown peer: defer to the tx_queued thread */
1474 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1475 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1476 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1477 up(&kmxlnd_data.kmx_tx_queue_sem);
/* Map the virtual-address iov range [offset, offset+nob) into an array of
 * mx_ksegment_t (physical addresses) stored in ctx->mxc_seg_list, and record
 * the segment count in ctx->mxc_nseg. Returns 0 on success (the error-return
 * paths are elided in this excerpt). */
1484 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1491 int first_iov_offset = 0;
1492 int first_found = 0;
1494 int last_iov_length = 0;
1495 mx_ksegment_t *seg = NULL;
1497 if (niov == 0) return 0;
1498 LASSERT(iov != NULL);
/* pass 1: find the first and last iovs touched by [offset, offset+nob) */
1500 for (i = 0; i < niov; i++) {
1501 sum = old_sum + (u32) iov[i].iov_len;
1502 if (!first_found && (sum > offset)) {
1504 first_iov_offset = offset - old_sum;
1506 sum = (u32) iov[i].iov_len - first_iov_offset;
1511 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
/* single-iov case: the trim of the front was already counted once */
1512 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1517 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1518 nseg = last_iov - first_iov + 1;
1521 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1523 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1526 memset(seg, 0, nseg * sizeof(*seg));
1527 ctx->mxc_nseg = nseg;
/* pass 2: one segment per iov, trimming the first and last to the range */
1529 for (i = 0; i < nseg; i++) {
1530 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1531 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1533 seg[i].segment_ptr += (u64) first_iov_offset;
1534 seg[i].segment_length -= (u32) first_iov_offset;
1536 if (i == (nseg - 1)) {
1537 seg[i].segment_length = (u32) last_iov_length;
1539 sum += seg[i].segment_length;
1541 ctx->mxc_seg_list = seg;
1542 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1543 #ifdef MX_PIN_FULLPAGES
1544 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segments must cover exactly nob bytes */
1546 LASSERT(nob == sum);
1551 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1557 int first_kiov = -1;
1558 int first_kiov_offset = 0;
1559 int first_found = 0;
1561 int last_kiov_length = 0;
1562 mx_ksegment_t *seg = NULL;
1564 if (niov == 0) return 0;
1565 LASSERT(kiov != NULL);
1567 for (i = 0; i < niov; i++) {
1568 sum = old_sum + kiov[i].kiov_len;
1569 if (i == 0) sum -= kiov[i].kiov_offset;
1570 if (!first_found && (sum > offset)) {
1572 first_kiov_offset = offset - old_sum;
1573 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1574 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1576 sum = kiov[i].kiov_len - first_kiov_offset;
1581 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1582 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1587 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1588 nseg = last_kiov - first_kiov + 1;
1591 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1593 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1596 memset(seg, 0, niov * sizeof(*seg));
1597 ctx->mxc_nseg = niov;
1599 for (i = 0; i < niov; i++) {
1600 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1601 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1603 seg[i].segment_ptr += (u64) first_kiov_offset;
1604 /* we have to add back the original kiov_offset */
1605 seg[i].segment_length -= first_kiov_offset +
1606 kiov[first_kiov].kiov_offset;
1608 if (i == (nseg - 1)) {
1609 seg[i].segment_length = last_kiov_length;
1611 sum += seg[i].segment_length;
1613 ctx->mxc_seg_list = seg;
1614 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1615 #ifdef MX_PIN_FULLPAGES
1616 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1618 LASSERT(nob == sum);
/* Send a NAK: the error status is encoded into the PUT_ACK's dst_cookie
 * (shifted into the top MXLND_ERROR bits) and into the match bits via
 * mxlnd_create_match(tx, status). Takes over the caller's tx. */
1623 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
/* only PUT_ACK can carry a NAK; GET errors travel in the match bits alone */
1625 LASSERT(type == MXLND_MSG_PUT_ACK);
1626 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1627 tx->mxc_cookie = cookie;
1628 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1629 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1630 tx->mxc_match = mxlnd_create_match(tx, status);
1637 * Get tx, map [k]iov, queue tx
1639 * This sets up the DATA send for PUT or GET.
1641 * On success, it queues the tx; on failure it calls lnet_finalize()
/* Set up and queue the bulk DATA tx for a PUT or GET: grab an idle tx,
 * map the lnet msg's [k]iov into MX segments, then queue it on the conn.
 * On failure the lntmsg is finalized with -EIO. */
1644 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1647 lnet_process_id_t target = lntmsg->msg_target;
1648 unsigned int niov = lntmsg->msg_niov;
1649 struct iovec *iov = lntmsg->msg_iov;
1650 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1651 unsigned int offset = lntmsg->msg_offset;
1652 unsigned int nob = lntmsg->msg_len;
1653 struct kmx_ctx *tx = NULL;
1655 LASSERT(lntmsg != NULL);
1656 LASSERT(peer != NULL);
1657 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* the top MXLND_ERROR bits of the cookie are reserved for error codes */
1658 LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1660 tx = mxlnd_get_idle_tx();
1662 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1663 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1664 libcfs_nid2str(target.nid));
1667 tx->mxc_nid = target.nid;
1668 /* NOTE called when we have a ref on the conn, get one for this tx */
1669 mxlnd_conn_addref(peer->mxp_conn);
1670 tx->mxc_peer = peer;
1671 tx->mxc_conn = peer->mxp_conn;
1672 tx->mxc_msg_type = msg_type;
1673 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1674 tx->mxc_state = MXLND_CTX_PENDING;
1675 tx->mxc_lntmsg[0] = lntmsg;
1676 tx->mxc_cookie = cookie;
1677 tx->mxc_match = mxlnd_create_match(tx, 0);
1679 /* This sets up the mx_ksegment_t to send the DATA payload */
1681 /* do not setup the segments */
1682 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1683 "to %s?\n", libcfs_nid2str(target.nid));
1685 } else if (kiov == NULL) {
1686 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1688 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
/* segment setup failed: release the tx and its conn ref */
1691 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1692 libcfs_nid2str(target.nid));
1693 tx->mxc_status.code = -EIO;
1700 mxlnd_conn_decref(peer->mxp_conn);
1701 mxlnd_put_idle_tx(tx);
1705 CDEBUG(D_NETERROR, "no tx avail\n");
1706 lnet_finalize(ni, lntmsg, -EIO);
1711 * Map [k]iov, post rx
1713 * This sets up the DATA receive for PUT or GET.
1715 * On success it returns 0, on failure it returns -1
/* Set up and post the bulk DATA rx for a PUT or GET: map the lnet msg's
 * [k]iov into MX segments and post an mx_kirecv() matched on the cookie.
 * Returns 0 on success, non-zero on failure (cleanup paths elided here). */
1718 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1721 lnet_process_id_t target = lntmsg->msg_target;
1722 unsigned int niov = lntmsg->msg_niov;
1723 struct iovec *iov = lntmsg->msg_iov;
1724 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1725 unsigned int offset = lntmsg->msg_offset;
1726 unsigned int nob = lntmsg->msg_len;
1727 mx_return_t mxret = MX_SUCCESS;
/* match on everything except the error bits, so a NAK (error encoded in
 * the match) still matches this posted receive */
1728 u64 mask = ~(MXLND_ERROR_MASK);
1730 /* above assumes MXLND_MSG_PUT_DATA */
1731 if (msg_type == MXLND_MSG_GET_DATA) {
/* for a GET the sink buffers come from the MD, not the lnet msg */
1732 niov = lntmsg->msg_md->md_niov;
1733 iov = lntmsg->msg_md->md_iov.iov;
1734 kiov = lntmsg->msg_md->md_iov.kiov;
1736 nob = lntmsg->msg_md->md_length;
1739 LASSERT(lntmsg != NULL);
1740 LASSERT(rx != NULL);
1741 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1742 LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1744 rx->mxc_msg_type = msg_type;
1745 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1746 rx->mxc_state = MXLND_CTX_PENDING;
1747 rx->mxc_nid = target.nid;
1748 /* if posting a GET_DATA, we may not yet know the peer */
1749 if (rx->mxc_peer != NULL) {
1750 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1752 rx->mxc_lntmsg[0] = lntmsg;
1753 rx->mxc_cookie = cookie;
1754 rx->mxc_match = mxlnd_create_match(rx, 0);
1755 /* This sets up the mx_ksegment_t to receive the DATA payload */
1757 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1759 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1761 if (msg_type == MXLND_MSG_GET_DATA) {
/* the REPLY to the GET is finalized when the DATA lands */
1762 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1763 if (rx->mxc_lntmsg[1] == NULL) {
1764 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1765 libcfs_nid2str(target.nid));
1770 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1771 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1772 libcfs_nid2str(target.nid));
/* track the rx so the timeout thread can cancel it if the peer dies */
1775 ret = mxlnd_q_pending_ctx(rx);
1779 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1780 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1781 rx->mxc_seg_list, rx->mxc_nseg,
1782 rx->mxc_pin_type, rx->mxc_match,
1785 if (mxret != MX_SUCCESS) {
1786 if (rx->mxc_conn != NULL) {
1787 mxlnd_deq_pending_ctx(rx);
1789 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1790 (int) mxret, libcfs_nid2str(target.nid));
1798 * The LND required send function
1800 * This must not block. Since we may not have a peer struct for the receiver,
1801 * it will append send messages on a global tx list. We will then up the
1802 * tx_queued's semaphore to notify it of the new send.
/* lnd_send() entry point. Must not block. Decides per lnet msg type whether
 * to send the payload inline (EAGER) or negotiate a bulk transfer (PUT_REQ /
 * GET_REQ + DATA). Falls through to the EAGER path at the bottom when the
 * payload fits in MXLND_EAGER_SIZE. Several branches/labels are elided in
 * this excerpt. */
1805 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1808 int type = lntmsg->msg_type;
1809 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1810 lnet_process_id_t target = lntmsg->msg_target;
1811 lnet_nid_t nid = target.nid;
1812 int target_is_router = lntmsg->msg_target_is_router;
1813 int routing = lntmsg->msg_routing;
1814 unsigned int payload_niov = lntmsg->msg_niov;
1815 struct iovec *payload_iov = lntmsg->msg_iov;
1816 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1817 unsigned int payload_offset = lntmsg->msg_offset;
1818 unsigned int payload_nob = lntmsg->msg_len;
1819 struct kmx_ctx *tx = NULL;
1820 struct kmx_msg *txmsg = NULL;
1821 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1822 struct kmx_ctx *rx_data = NULL;
1823 struct kmx_conn *conn = NULL;
1825 uint32_t length = 0;
1826 struct kmx_peer *peer = NULL;
1828 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1829 payload_nob, payload_niov, libcfs_id2str(target));
1831 LASSERT (payload_nob == 0 || payload_niov > 0);
1832 LASSERT (payload_niov <= LNET_MAX_IOV);
1833 /* payload is either all vaddrs or all pages */
1834 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1836 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1838 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1839 * to a new peer, use the nid */
1840 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1842 if (unlikely(peer->mxp_incompatible)) {
1843 mxlnd_peer_decref(peer); /* drop ref taken above */
/* swap the peer ref for a conn ref under the global lock */
1845 read_lock(&kmxlnd_data.kmx_global_lock);
1846 conn = peer->mxp_conn;
1848 mxlnd_conn_addref(conn);
1849 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1851 read_unlock(&kmxlnd_data.kmx_global_lock);
1854 CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1855 if (conn == NULL && peer != NULL) {
1856 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1857 peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1862 LASSERT (payload_nob == 0);
/* PUT (and REPLY) path: inline if small, else PUT_REQ handshake */
1865 case LNET_MSG_REPLY:
1867 /* Is the payload small enough not to need DATA? */
1868 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1869 if (nob <= MXLND_EAGER_SIZE)
1870 break; /* send EAGER */
1872 tx = mxlnd_get_idle_tx();
1873 if (unlikely(tx == NULL)) {
1874 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1875 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1876 libcfs_nid2str(nid));
1877 if (conn) mxlnd_conn_decref(conn);
1881 /* the peer may be NULL */
1882 tx->mxc_peer = peer;
1883 tx->mxc_conn = conn; /* may be NULL */
1884 /* we added a conn ref above */
1885 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1886 txmsg = tx->mxc_msg;
1887 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1888 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1889 tx->mxc_match = mxlnd_create_match(tx, 0);
1891 /* we must post a receive _before_ sending the request.
1892 * we need to determine how much to receive, it will be either
1893 * a put_ack or a put_nak. The put_ack is larger, so use it. */
1895 rx = mxlnd_get_idle_rx();
1896 if (unlikely(rx == NULL)) {
1897 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1898 libcfs_nid2str(nid));
1899 mxlnd_put_idle_tx(tx);
1900 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1904 rx->mxc_peer = peer;
1905 /* conn may be NULL but unlikely since the first msg is always small */
1906 /* NOTE no need to lock peer before adding conn ref since we took
1907 * a conn ref for the tx (it cannot be freed between there and here ) */
1908 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1909 rx->mxc_conn = conn;
1910 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1911 rx->mxc_cookie = tx->mxc_cookie;
1912 rx->mxc_match = mxlnd_create_match(rx, 0);
1914 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1915 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1916 if (unlikely(ret != 0)) {
1917 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1918 libcfs_nid2str(nid));
1919 rx->mxc_lntmsg[0] = NULL;
1920 mxlnd_put_idle_rx(rx);
1921 mxlnd_put_idle_tx(tx);
1923 mxlnd_conn_decref(conn); /* for the rx... */
1924 mxlnd_conn_decref(conn); /* and for the tx */
1926 return -EHOSTUNREACH;
/* GET path: routed GETs and small replies go EAGER */
1933 if (routing || target_is_router)
1934 break; /* send EAGER */
1936 /* is the REPLY message too small for DATA? */
1937 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1938 if (nob <= MXLND_EAGER_SIZE)
1939 break; /* send EAGER */
1941 /* get tx (we need the cookie) , post rx for incoming DATA,
1942 * then post GET_REQ tx */
1943 tx = mxlnd_get_idle_tx();
1944 if (unlikely(tx == NULL)) {
1945 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1946 libcfs_nid2str(nid));
1947 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1950 rx_data = mxlnd_get_idle_rx();
1951 if (unlikely(rx_data == NULL)) {
1952 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1953 libcfs_nid2str(nid));
1954 mxlnd_put_idle_tx(tx);
1955 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1958 rx_data->mxc_peer = peer;
1959 /* NOTE no need to lock peer before adding conn ref since we took
1960 * a conn ref for the tx (it cannot be freed between there and here ) */
1961 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1962 rx_data->mxc_conn = conn; /* may be NULL */
1964 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1965 if (unlikely(ret != 0)) {
1966 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1967 libcfs_nid2str(nid));
1968 mxlnd_put_idle_rx(rx_data);
1969 mxlnd_put_idle_tx(tx);
1971 mxlnd_conn_decref(conn); /* for the rx_data... */
1972 mxlnd_conn_decref(conn); /* and for the tx */
1977 tx->mxc_peer = peer;
1978 tx->mxc_conn = conn; /* may be NULL */
1979 /* conn ref taken above */
1980 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1981 txmsg = tx->mxc_msg;
1982 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1983 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1984 tx->mxc_match = mxlnd_create_match(tx, 0);
1991 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: payload fits inline in a single message */
1997 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1998 <= MXLND_EAGER_SIZE);
2000 tx = mxlnd_get_idle_tx();
2001 if (unlikely(tx == NULL)) {
2002 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2003 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2004 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2008 tx->mxc_peer = peer;
2009 tx->mxc_conn = conn; /* may be NULL */
2010 /* conn ref taken above */
2011 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2012 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2013 tx->mxc_match = mxlnd_create_match(tx, 0);
2015 txmsg = tx->mxc_msg;
2016 txmsg->mxm_u.eager.mxem_hdr = *hdr;
2018 if (payload_kiov != NULL)
2019 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2020 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2021 payload_niov, payload_kiov, payload_offset, payload_nob);
2023 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2024 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2025 payload_niov, payload_iov, payload_offset, payload_nob);
2027 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2033 * The LND required recv function
2035 * This must not block.
/* lnd_recv() entry point. Must not block. Handles the payload delivery for
 * a previously received message (private == the primary rx):
 *  - EAGER: copy the inline payload into the caller's [k]iov
 *  - PUT_REQ: reuse the rx as the DATA sink and send a PUT_ACK (or NAK)
 *  - GET_REQ: send the requested data (or a NAK via the match bits)
 * Several branches/labels and local declarations are elided here. */
2038 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2039 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2040 unsigned int offset, unsigned int mlen, unsigned int rlen)
2045 struct kmx_ctx *rx = private;
2046 struct kmx_msg *rxmsg = rx->mxc_msg;
2047 lnet_nid_t nid = rx->mxc_nid;
2048 struct kmx_ctx *tx = NULL;
2049 struct kmx_msg *txmsg = NULL;
2050 struct kmx_peer *peer = rx->mxc_peer;
2051 struct kmx_conn *conn = peer->mxp_conn;
2053 int msg_type = rxmsg->mxm_type;
2058 LASSERT (mlen <= rlen);
2059 /* Either all pages or all vaddrs */
2060 LASSERT (!(kiov != NULL && iov != NULL));
2061 LASSERT (peer != NULL);
2063 /* conn_addref(conn) already taken for the primary rx */
2066 case MXLND_MSG_EAGER:
2067 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2068 len = rx->mxc_status.xfer_length;
2069 if (unlikely(nob > len)) {
2070 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2071 libcfs_nid2str(nid), nob, len);
/* copy the inline payload into the caller's buffers */
2077 lnet_copy_flat2kiov(niov, kiov, offset,
2078 MXLND_EAGER_SIZE, rxmsg,
2079 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2082 lnet_copy_flat2iov(niov, iov, offset,
2083 MXLND_EAGER_SIZE, rxmsg,
2084 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2090 case MXLND_MSG_PUT_REQ:
2091 /* we are going to reuse the rx, store the needed info */
2092 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2094 /* get tx, post rx, send PUT_ACK */
2096 tx = mxlnd_get_idle_tx();
2097 if (unlikely(tx == NULL)) {
2098 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2099 /* Not replying will break the connection */
/* zero-length sink: NAK with status 0, nothing to receive */
2103 if (unlikely(mlen == 0)) {
2105 tx->mxc_peer = peer;
2106 tx->mxc_conn = conn;
2107 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2112 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2113 tx->mxc_peer = peer;
2114 tx->mxc_conn = conn;
2115 /* no need to lock peer first since we already have a ref */
2116 mxlnd_conn_addref(conn); /* for the tx */
2117 txmsg = tx->mxc_msg;
/* src_cookie identifies the sender's PUT; dst_cookie is ours, for the DATA */
2118 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2119 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2120 tx->mxc_cookie = cookie;
2121 tx->mxc_match = mxlnd_create_match(tx, 0);
2123 /* we must post a receive _before_ sending the PUT_ACK */
2125 rx->mxc_state = MXLND_CTX_PREP;
2126 rx->mxc_peer = peer;
2127 rx->mxc_conn = conn;
2128 /* do not take another ref for this rx, it is already taken */
2129 rx->mxc_nid = peer->mxp_nid;
2130 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2131 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2133 if (unlikely(ret != 0)) {
2134 /* Notify peer that it's over */
2135 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2136 libcfs_nid2str(nid), ret);
2138 tx->mxc_state = MXLND_CTX_PREP;
2139 tx->mxc_peer = peer;
2140 tx->mxc_conn = conn;
2141 /* finalize = 0, let the PUT_ACK tx finalize this */
2142 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2143 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2144 /* conn ref already taken above */
2145 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2151 /* do not return a credit until after PUT_DATA returns */
2155 case MXLND_MSG_GET_REQ:
2156 if (likely(lntmsg != NULL)) {
2157 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2158 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2160 /* GET didn't match anything */
2161 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2162 * We have to embed the error code in the match bits.
2163 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2164 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2166 tx = mxlnd_get_idle_tx();
2167 if (unlikely(tx == NULL)) {
2168 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2169 libcfs_nid2str(nid));
2173 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2174 tx->mxc_state = MXLND_CTX_PENDING;
2176 tx->mxc_peer = peer;
2177 tx->mxc_conn = conn;
2178 /* no need to lock peer first since we already have a ref */
2179 mxlnd_conn_addref(conn); /* for this tx */
2180 tx->mxc_cookie = cookie;
/* ENODATA in the match bits tells the initiator the GET failed */
2181 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2182 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2185 /* finalize lntmsg after tx completes */
2193 /* we received a message, increment peer's outstanding credits */
2195 spin_lock(&conn->mxk_lock);
2196 conn->mxk_outstanding++;
2197 spin_unlock(&conn->mxk_lock);
2199 /* we are done with the rx */
2200 mxlnd_put_idle_rx(rx);
2201 mxlnd_conn_decref(conn);
2204 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2206 /* we received a credit, see if we can use it to send a msg */
2207 if (credit) mxlnd_check_sends(peer);
/* Sleep interruptibly for 'timeout' jiffies (may return early on a signal). */
2213 mxlnd_sleep(unsigned long timeout)
2215 set_current_state(TASK_INTERRUPTIBLE);
2216 schedule_timeout(timeout);
2221 * The generic send queue thread
2222 * \param arg thread id (as a void *)
2224 * This thread moves send messages from the global tx_queue to the owning
2225 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2226 * it to the global peer list.
/* Kernel thread: pop txs off the global tx queue and attach each to its
 * peer/conn, creating the peer (and conn) if this is the first message to
 * that nid. Runs until kmx_shutdown is set; woken by kmx_tx_queue_sem. */
2229 mxlnd_tx_queued(void *arg)
2231 long id = (long) arg;
2234 struct kmx_ctx *tx = NULL;
2235 struct kmx_peer *peer = NULL;
2236 struct list_head *tmp_tx = NULL;
2238 cfs_daemonize("mxlnd_tx_queued");
2239 //cfs_block_allsigs();
2241 while (!kmxlnd_data.kmx_shutdown) {
/* block until mxlnd_queue_tx() signals new work (or shutdown) */
2242 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2243 if (kmxlnd_data.kmx_shutdown)
2245 if (ret != 0) // Should we check for -EINTR?
2247 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2248 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2249 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the head tx off the global queue */
2252 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2253 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2254 list_del_init(&tx->mxc_list);
2255 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2258 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known peer: ensure it has a conn, then hand the tx over */
2260 tx->mxc_peer = peer;
2261 write_lock(&kmxlnd_data.kmx_global_lock);
2262 if (peer->mxp_conn == NULL) {
2263 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2265 /* out of memory, give up and fail tx */
2266 tx->mxc_status.code = -ENOMEM;
2267 write_unlock(&kmxlnd_data.kmx_global_lock);
2268 mxlnd_peer_decref(peer);
2269 mxlnd_put_idle_tx(tx);
2273 tx->mxc_conn = peer->mxp_conn;
2274 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2275 write_unlock(&kmxlnd_data.kmx_global_lock);
2276 mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown peer: allocate one and race to insert it in the hash */
2282 struct kmx_peer *peer = NULL;
2283 struct kmx_peer *old = NULL;
2285 hash = mxlnd_nid_to_hash(tx->mxc_nid);
/* only small control/eager msgs can reach here without a peer */
2287 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2288 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2290 /* adds conn ref for this function */
2291 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2292 *kmxlnd_tunables.kmx_board,
2293 *kmxlnd_tunables.kmx_ep_id, 0ULL);
2295 /* finalize message */
2296 tx->mxc_status.code = ret;
2297 mxlnd_put_idle_tx(tx);
2300 tx->mxc_peer = peer;
2301 tx->mxc_conn = peer->mxp_conn;
2302 /* this tx will keep the conn ref taken in peer_alloc() */
2304 /* add peer to global peer list, but look to see
2305 * if someone already created it after we released
2307 write_lock(&kmxlnd_data.kmx_global_lock);
2308 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2309 if (old->mxp_nid == peer->mxp_nid) {
2310 /* somebody beat us here, we created a duplicate */
2317 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2318 atomic_inc(&kmxlnd_data.kmx_npeers);
/* lost the race: switch the tx to the existing peer's conn and
 * drop everything we allocated */
2321 tx->mxc_conn = old->mxp_conn;
2322 /* FIXME can conn be NULL? */
2323 LASSERT(old->mxp_conn != NULL);
2324 mxlnd_conn_addref(old->mxp_conn);
2325 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2326 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2327 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2328 mxlnd_peer_decref(peer);
2330 write_unlock(&kmxlnd_data.kmx_global_lock);
2335 mxlnd_thread_stop(id);
2339 /* When calling this, we must not have the peer lock. */
/* Start an asynchronous MX connect to the peer; the msg type (ICON_REQ or
 * ICON_ACK) is encoded in the match bits. Completion is handled elsewhere
 * when the mx_iconnect() request finishes. Caller must NOT hold the peer
 * lock (see comment above this function in the file). */
2341 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2343 mx_return_t mxret = MX_SUCCESS;
2344 mx_request_t request;
2345 struct kmx_conn *conn = peer->mxp_conn;
2346 u8 msg_type = (u8) MXLND_MSG_TYPE(mask);
2348 /* NOTE we are holding a conn ref every time we call this function,
2349 * we do not need to lock the peer before taking another ref */
2350 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2352 LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
/* remember when we first tried, so we can give up after WAIT_TIMEOUT */
2354 if (peer->mxp_reconnect_time == 0) {
2355 peer->mxp_reconnect_time = jiffies;
/* peer's MX nic_id unknown: try to resolve it from its IP */
2358 if (peer->mxp_nic_id == 0ULL) {
2361 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2363 mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2365 if (peer->mxp_nic_id == 0ULL) {
2366 /* not mapped yet, return */
2367 spin_lock(&conn->mxk_lock);
2368 conn->mxk_status = MXLND_CONN_INIT;
2369 spin_unlock(&conn->mxk_lock);
2370 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2371 /* give up and notify LNET */
2372 mxlnd_conn_disconnect(conn, 0, 0);
2373 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2375 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2378 mxlnd_conn_decref(conn);
2383 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2384 peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2385 (void *) peer, &request);
2386 if (unlikely(mxret != MX_SUCCESS)) {
2387 spin_lock(&conn->mxk_lock);
2388 conn->mxk_status = MXLND_CONN_FAIL;
2389 spin_unlock(&conn->mxk_lock);
2390 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2391 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2392 mxlnd_conn_decref(conn);
2397 #define MXLND_STATS 0
2400 mxlnd_check_sends(struct kmx_peer *peer)
2404 mx_return_t mxret = MX_SUCCESS;
2405 struct kmx_ctx *tx = NULL;
2406 struct kmx_conn *conn = NULL;
2413 static unsigned long last = 0;
2416 if (unlikely(peer == NULL)) {
2417 LASSERT(peer != NULL);
2420 write_lock(&kmxlnd_data.kmx_global_lock);
2421 conn = peer->mxp_conn;
2422 /* NOTE take a ref for the duration of this function since it is called
2423 * when there might not be any queued txs for this peer */
2424 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2425 write_unlock(&kmxlnd_data.kmx_global_lock);
2427 /* do not add another ref for this tx */
2430 /* we do not have any conns */
2435 if (time_after(jiffies, last)) {
2436 last = jiffies + HZ;
2437 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2438 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2439 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2440 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2441 conn->mxk_ntx_data, conn->mxk_data_posted);
2445 /* cache peer state for asserts */
2446 spin_lock(&conn->mxk_lock);
2447 ntx_posted = conn->mxk_ntx_posted;
2448 credits = conn->mxk_credits;
2449 spin_unlock(&conn->mxk_lock);
2451 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2452 LASSERT(ntx_posted >= 0);
2454 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2455 LASSERT(credits >= 0);
2457 /* check number of queued msgs, ignore data */
2458 spin_lock(&conn->mxk_lock);
2459 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2460 /* check if any txs queued that could return credits... */
2461 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2462 /* if not, send a NOOP */
2463 tx = mxlnd_get_idle_tx();
2464 if (likely(tx != NULL)) {
2465 tx->mxc_peer = peer;
2466 tx->mxc_conn = peer->mxp_conn;
2467 mxlnd_conn_addref(conn); /* for this tx */
2468 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2469 tx->mxc_match = mxlnd_create_match(tx, 0);
2470 mxlnd_peer_queue_tx_locked(tx);
2476 spin_unlock(&conn->mxk_lock);
2478 /* if the peer is not ready, try to connect */
2479 spin_lock(&conn->mxk_lock);
2480 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2481 conn->mxk_status == MXLND_CONN_FAIL ||
2482 conn->mxk_status == MXLND_CONN_REQ)) {
2483 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2484 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2485 conn->mxk_status = MXLND_CONN_WAIT;
2486 spin_unlock(&conn->mxk_lock);
2487 mxlnd_iconnect(peer, match);
2490 spin_unlock(&conn->mxk_lock);
2492 spin_lock(&conn->mxk_lock);
2493 while (!list_empty(&conn->mxk_tx_free_queue) ||
2494 !list_empty(&conn->mxk_tx_credit_queue)) {
2495 /* We have something to send. If we have a queued tx that does not
2496 * require a credit (free), choose it since its completion will
2497 * return a credit (here or at the peer), complete a DATA or
2498 * CONN_REQ or CONN_ACK. */
2499 struct list_head *tmp_tx = NULL;
2500 if (!list_empty(&conn->mxk_tx_free_queue)) {
2501 tmp_tx = &conn->mxk_tx_free_queue;
2503 tmp_tx = &conn->mxk_tx_credit_queue;
2505 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2507 msg_type = tx->mxc_msg_type;
2509 /* don't try to send a rx */
2510 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2512 /* ensure that it is a valid msg type */
2513 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2514 msg_type == MXLND_MSG_CONN_ACK ||
2515 msg_type == MXLND_MSG_NOOP ||
2516 msg_type == MXLND_MSG_EAGER ||
2517 msg_type == MXLND_MSG_PUT_REQ ||
2518 msg_type == MXLND_MSG_PUT_ACK ||
2519 msg_type == MXLND_MSG_PUT_DATA ||
2520 msg_type == MXLND_MSG_GET_REQ ||
2521 msg_type == MXLND_MSG_GET_DATA);
2522 LASSERT(tx->mxc_peer == peer);
2523 LASSERT(tx->mxc_nid == peer->mxp_nid);
2525 credit = mxlnd_tx_requires_credit(tx);
2528 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2529 CDEBUG(D_NET, "%s: posted enough\n",
2530 libcfs_nid2str(peer->mxp_nid));
2534 if (conn->mxk_credits == 0) {
2535 CDEBUG(D_NET, "%s: no credits\n",
2536 libcfs_nid2str(peer->mxp_nid));
2540 if (conn->mxk_credits == 1 && /* last credit reserved for */
2541 conn->mxk_outstanding == 0) { /* giving back credits */
2542 CDEBUG(D_NET, "%s: not using last credit\n",
2543 libcfs_nid2str(peer->mxp_nid));
2548 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2549 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2550 msg_type == MXLND_MSG_CONN_ACK)) {
2551 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2552 mxlnd_connstatus_to_str(conn->mxk_status),
2554 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2555 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2556 list_del_init(&tx->mxc_list);
2557 tx->mxc_status.code = -ECONNABORTED;
2558 mxlnd_put_idle_tx(tx);
2559 mxlnd_conn_decref(conn);
2565 list_del_init(&tx->mxc_list);
2567 /* handle credits, etc now while we have the lock to avoid races */
2569 conn->mxk_credits--;
2570 conn->mxk_ntx_posted++;
2572 if (msg_type != MXLND_MSG_PUT_DATA &&
2573 msg_type != MXLND_MSG_GET_DATA) {
2574 if (msg_type != MXLND_MSG_CONN_REQ &&
2575 msg_type != MXLND_MSG_CONN_ACK) {
2576 conn->mxk_ntx_msgs--;
2579 if (tx->mxc_incarnation == 0 &&
2580 conn->mxk_incarnation != 0) {
2581 tx->mxc_incarnation = conn->mxk_incarnation;
2583 spin_unlock(&conn->mxk_lock);
2585 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2586 * or (2) there is a non-DATA msg that can return credits in the
2587 * queue, then drop this duplicate NOOP */
2588 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2589 spin_lock(&conn->mxk_lock);
2590 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2591 (conn->mxk_ntx_msgs >= 1)) {
2592 conn->mxk_credits++;
2593 conn->mxk_ntx_posted--;
2594 spin_unlock(&conn->mxk_lock);
2595 /* redundant NOOP */
2596 mxlnd_put_idle_tx(tx);
2597 mxlnd_conn_decref(conn);
2598 CDEBUG(D_NET, "%s: redundant noop\n",
2599 libcfs_nid2str(peer->mxp_nid));
2603 spin_unlock(&conn->mxk_lock);
2607 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2608 (msg_type != MXLND_MSG_GET_DATA))) {
2612 //ret = -ECONNABORTED;
2615 spin_lock(&conn->mxk_lock);
2616 status = conn->mxk_status;
2617 spin_unlock(&conn->mxk_lock);
2619 if (likely((status == MXLND_CONN_READY) ||
2620 (msg_type == MXLND_MSG_CONN_REQ) ||
2621 (msg_type == MXLND_MSG_CONN_ACK))) {
2623 if (msg_type != MXLND_MSG_CONN_REQ &&
2624 msg_type != MXLND_MSG_CONN_ACK) {
2625 /* add to the pending list */
2626 ret = mxlnd_q_pending_ctx(tx);
2628 /* FIXME the conn is disconnected, now what? */
2632 tx->mxc_state = MXLND_CTX_PENDING;
2636 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2637 msg_type != MXLND_MSG_GET_DATA)) {
2638 /* send a msg style tx */
2639 LASSERT(tx->mxc_nseg == 1);
2640 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2641 CDEBUG(D_NET, "sending %s 0x%llx\n",
2642 mxlnd_msgtype_to_str(msg_type),
2644 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2653 /* send a DATA tx */
2654 spin_lock(&conn->mxk_lock);
2655 conn->mxk_ntx_data--;
2656 conn->mxk_data_posted++;
2657 spin_unlock(&conn->mxk_lock);
2658 CDEBUG(D_NET, "sending %s 0x%llx\n",
2659 mxlnd_msgtype_to_str(msg_type),
2661 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2671 mxret = MX_CONNECTION_FAILED;
2673 if (likely(mxret == MX_SUCCESS)) {
2676 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2677 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2678 libcfs_nid2str(peer->mxp_nid));
2679 /* NOTE mx_kisend() only fails if there are not enough
2680 * resources. Do not change the connection status. */
2681 if (mxret == MX_NO_RESOURCES) {
2682 tx->mxc_status.code = -ENOMEM;
2684 tx->mxc_status.code = -ECONNABORTED;
2687 spin_lock(&conn->mxk_lock);
2688 conn->mxk_ntx_posted--;
2689 conn->mxk_credits++;
2690 spin_unlock(&conn->mxk_lock);
2691 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2692 msg_type == MXLND_MSG_GET_DATA) {
2693 spin_lock(&conn->mxk_lock);
2694 conn->mxk_data_posted--;
2695 spin_unlock(&conn->mxk_lock);
2697 if (msg_type != MXLND_MSG_PUT_DATA &&
2698 msg_type != MXLND_MSG_GET_DATA &&
2699 msg_type != MXLND_MSG_CONN_REQ &&
2700 msg_type != MXLND_MSG_CONN_ACK) {
2701 spin_lock(&conn->mxk_lock);
2702 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2703 spin_unlock(&conn->mxk_lock);
2705 if (msg_type != MXLND_MSG_CONN_REQ &&
2706 msg_type != MXLND_MSG_CONN_ACK) {
2707 /* remove from the pending list */
2708 mxlnd_deq_pending_ctx(tx);
2710 mxlnd_put_idle_tx(tx);
2711 mxlnd_conn_decref(conn);
2714 spin_lock(&conn->mxk_lock);
2717 spin_unlock(&conn->mxk_lock);
2719 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2725 * A tx completed, progress or complete the msg
2726 * \param tx the tx descriptor
2728 * Determine which type of send request it was and start the next step, if needed,
2729 * or, if done, signal completion to LNET. After we are done, put back on the
/* Completion handler for a tx (send) request.
 * Reads the MX completion status from tx->mxc_status, updates the conn's
 * credit/posted accounting under mxk_lock, marks the conn failed on a
 * CONN_REQ/CONN_ACK send failure, then recycles the tx and calls
 * mxlnd_check_sends() to progress queued sends.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
2733 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2735 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2736 struct kmx_msg *msg = tx->mxc_msg;
2737 struct kmx_peer *peer = tx->mxc_peer;
2738 struct kmx_conn *conn = tx->mxc_conn;
2739 u8 type = tx->mxc_msg_type;
2740 int credit = mxlnd_tx_requires_credit(tx);
2741 u64 cookie = tx->mxc_cookie;
2743 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2744 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
/* conn can be unset on the tx; recover the peer from the MX endpoint
 * address context and take the conn from it */
2746 if (unlikely(conn == NULL)) {
2747 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2748 conn = peer->mxp_conn;
2750 /* do not add a ref for the tx, it was set before sending */
2751 tx->mxc_conn = conn;
2752 tx->mxc_peer = conn->mxk_peer;
2755 LASSERT (peer != NULL);
2756 LASSERT (conn != NULL);
/* DATA txs carry no kmx_msg; only msg-style txs can be type-checked */
2758 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2759 LASSERT (type == msg->mxm_type);
2763 tx->mxc_status.code = -EIO;
2765 spin_lock(&conn->mxk_lock);
2766 conn->mxk_last_tx = jiffies; /* record send activity for timeoutd */
2767 spin_unlock(&conn->mxk_lock);
/* per-type accounting: only adjust counters if the conn incarnation still
 * matches the one the tx was posted under (conn not replaced meanwhile) */
2772 case MXLND_MSG_GET_DATA:
2773 spin_lock(&conn->mxk_lock);
2774 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2775 conn->mxk_outstanding++;
2776 conn->mxk_data_posted--;
2778 spin_unlock(&conn->mxk_lock);
2781 case MXLND_MSG_PUT_DATA:
2782 spin_lock(&conn->mxk_lock);
2783 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2784 conn->mxk_data_posted--;
2786 spin_unlock(&conn->mxk_lock);
2789 case MXLND_MSG_NOOP:
2790 case MXLND_MSG_PUT_REQ:
2791 case MXLND_MSG_PUT_ACK:
2792 case MXLND_MSG_GET_REQ:
2793 case MXLND_MSG_EAGER:
2794 //case MXLND_MSG_NAK:
2797 case MXLND_MSG_CONN_ACK:
2798 if (peer->mxp_incompatible) {
2799 /* we sent our params, now close this conn */
2800 mxlnd_conn_disconnect(conn, 0, 1);
/* CONN_REQ shares the failure path below with CONN_ACK */
2802 case MXLND_MSG_CONN_REQ:
2804 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2805 "failed with %s (%d) to %s\n",
2806 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2807 mx_strstatus(tx->mxc_status.code),
2808 tx->mxc_status.code,
2809 libcfs_nid2str(tx->mxc_nid));
/* only mark the conn failed for a compatible peer; an incompatible
 * peer's conn was already disconnected above */
2810 if (!peer->mxp_incompatible) {
2811 spin_lock(&conn->mxk_lock);
2812 conn->mxk_status = MXLND_CONN_FAIL;
2813 spin_unlock(&conn->mxk_lock);
2819 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
/* a credit-consuming tx completed: release its posted slot */
2824 spin_lock(&conn->mxk_lock);
2825 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2826 conn->mxk_ntx_posted--;
2828 spin_unlock(&conn->mxk_lock);
2831 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2832 mxlnd_put_idle_tx(tx);
2833 mxlnd_conn_decref(conn); /* drop the ref held for this tx */
2835 mxlnd_check_sends(peer);
/* Completion handler for a rx (receive) request.
 * Resolves peer/conn references, validates and unpacks the message, applies
 * returned credits, then dispatches on message type: EAGER/PUT_REQ/GET_REQ
 * go to lnet_parse(), PUT_ACK triggers the PUT_DATA send, and
 * CONN_REQ/CONN_ACK run the connection handshake. Finally reposts the rx
 * and finalizes any completed LNET messages.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
2841 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2846 u32 nob = rx->mxc_status.xfer_length;
2847 u64 bits = rx->mxc_status.match_info;
2848 struct kmx_msg *msg = rx->mxc_msg;
2849 struct kmx_peer *peer = rx->mxc_peer;
2850 struct kmx_conn *conn = rx->mxc_conn;
2851 u8 type = rx->mxc_msg_type;
2853 lnet_msg_t *lntmsg[2];
2860 int incompatible = 0;
2863 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2864 * failed GET reply, CONN_REQ, or a CONN_ACK */
2866 /* NOTE peer may still be NULL if it is a new peer and
2867 * conn may be NULL if this is a re-connect */
2868 if (likely(peer != NULL && conn != NULL)) {
2869 /* we have a reference on the conn */
2871 } else if (peer != NULL && conn == NULL) {
2872 /* we have a reference on the peer */
2874 } else if (peer == NULL && conn != NULL) {
2876 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2878 } /* else peer and conn == NULL */
2881 if (peer == NULL || conn == NULL) {
2882 /* if the peer was disconnected, the peer may exist but
2883 * not have any valid conns */
2884 decref = 0; /* no peer means no ref was taken for this rx */
/* have a peer but no conn: swap the peer ref for a conn ref under the
 * global lock so the rest of the function holds a conn reference */
2888 if (conn == NULL && peer != NULL) {
2889 write_lock(&kmxlnd_data.kmx_global_lock);
2890 conn = peer->mxp_conn;
2892 mxlnd_conn_addref(conn); /* conn takes ref... */
2893 mxlnd_peer_decref(peer); /* from peer */
2897 write_unlock(&kmxlnd_data.kmx_global_lock);
2898 rx->mxc_conn = conn;
2902 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2908 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2909 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2910 libcfs_nid2str(rx->mxc_nid),
2911 mx_strstatus(rx->mxc_status.code),
2912 (int) rx->mxc_status.code);
2918 /* this may be a failed GET reply */
2919 if (type == MXLND_MSG_GET_DATA) {
2920 /* get the error (52-59) bits from the match bits */
2921 ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2922 lntmsg[0] = rx->mxc_lntmsg[0];
2926 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2927 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2928 libcfs_nid2str(rx->mxc_nid));
2933 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2934 if (type == MXLND_MSG_PUT_DATA) {
2935 result = rx->mxc_status.code;
2936 lntmsg[0] = rx->mxc_lntmsg[0];
2938 } else if (type == MXLND_MSG_GET_DATA) {
2939 result = rx->mxc_status.code;
2940 lntmsg[0] = rx->mxc_lntmsg[0];
2941 lntmsg[1] = rx->mxc_lntmsg[1];
/* msg-style rx: byte-swap/validate the wire message before trusting it */
2945 ret = mxlnd_unpack_msg(msg, nob);
2947 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2948 ret, libcfs_nid2str(rx->mxc_nid));
2952 type = msg->mxm_type;
/* NID sanity check; CONN_REQ is exempt since the peer is not known yet */
2955 if (type != MXLND_MSG_CONN_REQ &&
2956 (rx->mxc_nid != msg->mxm_srcnid ||
2957 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2958 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2959 "0x%llx and rx msg dst is 0x%llx)\n",
2960 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* stale-message check against both peer and local incarnation stamps */
2965 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2966 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2967 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2969 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2970 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2971 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2972 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2973 msg->mxm_srcstamp, conn->mxk_incarnation,
2974 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2976 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2977 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2978 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2979 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2986 CDEBUG(D_NET, "Received %s with %d credits\n",
2987 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* apply credits the peer returned, capped at the tunable maximum */
2989 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2990 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2991 LASSERT(peer != NULL);
2992 LASSERT(conn != NULL);
2993 if (msg->mxm_credits != 0) {
2994 spin_lock(&conn->mxk_lock);
2995 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2996 if ((conn->mxk_credits + msg->mxm_credits) >
2997 *kmxlnd_tunables.kmx_credits) {
2998 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2999 conn->mxk_credits, msg->mxm_credits);
3001 conn->mxk_credits += msg->mxm_credits;
3002 LASSERT(conn->mxk_credits >= 0);
3003 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3005 spin_unlock(&conn->mxk_lock);
3009 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3011 case MXLND_MSG_NOOP:
3014 case MXLND_MSG_EAGER:
3015 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3016 msg->mxm_srcnid, rx, 0);
3020 case MXLND_MSG_PUT_REQ:
3021 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3022 msg->mxm_srcnid, rx, 1);
3026 case MXLND_MSG_PUT_ACK: {
/* a cookie above MXLND_MAX_COOKIE encodes a NAK with an error value */
3027 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3028 if (cookie > MXLND_MAX_COOKIE) {
3029 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3030 libcfs_nid2str(rx->mxc_nid));
3031 result = -((u32) MXLND_ERROR_VAL(cookie));
3032 lntmsg[0] = rx->mxc_lntmsg[0];
3034 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3035 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3036 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3041 case MXLND_MSG_GET_REQ:
3042 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3043 msg->mxm_srcnid, rx, 1);
3047 case MXLND_MSG_CONN_REQ:
3048 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3049 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3050 libcfs_nid2str(msg->mxm_srcnid),
3051 libcfs_nid2str(msg->mxm_dstnid));
/* handshake parameter checks: queue depth and eager size must match */
3054 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3055 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3057 libcfs_nid2str(msg->mxm_srcnid),
3058 msg->mxm_u.conn_req.mxcrm_queue_depth,
3059 *kmxlnd_tunables.kmx_credits);
3062 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3063 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3065 libcfs_nid2str(msg->mxm_srcnid),
3066 msg->mxm_u.conn_req.mxcrm_eager_size,
3067 (int) MXLND_EAGER_SIZE);
3070 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3072 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
/* new peer: allocate it, then re-check under the global lock in case
 * another thread raced us and inserted the same peer first */
3075 struct kmx_peer *existing_peer = NULL;
3076 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3078 rx->mxc_nid = msg->mxm_srcnid;
3080 /* adds conn ref for peer and one for this function */
3081 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3082 *kmxlnd_tunables.kmx_board,
3083 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3087 peer->mxp_sid = sid;
3088 LASSERT(peer->mxp_ep_id == ep_id);
3089 write_lock(&kmxlnd_data.kmx_global_lock);
3090 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3091 if (existing_peer) {
3092 mxlnd_conn_decref(peer->mxp_conn);
3093 mxlnd_peer_decref(peer);
3094 peer = existing_peer;
3095 mxlnd_conn_addref(peer->mxp_conn);
3097 list_add_tail(&peer->mxp_peers,
3098 &kmxlnd_data.kmx_peers[hash]);
3099 atomic_inc(&kmxlnd_data.kmx_npeers);
3101 write_unlock(&kmxlnd_data.kmx_global_lock);
3103 /* FIXME should write lock here */
3104 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3105 mxlnd_peer_decref(peer); /* drop ref taken above */
3107 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3111 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3112 conn = peer->mxp_conn;
3113 } else { /* found peer */
3114 struct kmx_conn *old_conn = conn;
/* a new session id means the peer restarted: retire the old conn */
3116 if (sid != peer->mxp_sid) {
3117 /* do not call mx_disconnect() or send a BYE */
3118 mxlnd_conn_disconnect(old_conn, 0, 0);
3120 /* the ref for this rx was taken on the old_conn */
3121 mxlnd_conn_decref(old_conn);
3123 /* This allocs a conn, points peer->mxp_conn to this one.
3124 * The old conn is still on the peer->mxp_conns list.
3125 * As the pending requests complete, they will call
3126 * conn_decref() which will eventually free it. */
3127 ret = mxlnd_conn_alloc(&conn, peer);
3129 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3132 /* conn_alloc() adds one ref for the peer and one
3133 * for this function */
3136 peer->mxp_sid = sid;
/* record the peer's incarnation/compatibility and move the conn into
 * WAIT; the CONN_ACK side of the handshake is kicked off below */
3139 write_lock(&kmxlnd_data.kmx_global_lock);
3140 peer->mxp_incarnation = msg->mxm_srcstamp;
3141 peer->mxp_incompatible = incompatible;
3142 write_unlock(&kmxlnd_data.kmx_global_lock);
3143 spin_lock(&conn->mxk_lock);
3144 conn->mxk_incarnation = msg->mxm_srcstamp;
3145 conn->mxk_status = MXLND_CONN_WAIT;
3146 spin_unlock(&conn->mxk_lock);
3148 /* handle_conn_ack() will create the CONN_ACK msg */
3149 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3150 mxlnd_iconnect(peer, match);
3154 case MXLND_MSG_CONN_ACK:
3155 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3156 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3157 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3158 libcfs_nid2str(msg->mxm_dstnid));
3162 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3163 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3164 "incompatible queue depth %d (%d wanted)\n",
3165 libcfs_nid2str(msg->mxm_srcnid),
3166 msg->mxm_u.conn_req.mxcrm_queue_depth,
3167 *kmxlnd_tunables.kmx_credits);
3168 spin_lock(&conn->mxk_lock);
3169 conn->mxk_status = MXLND_CONN_FAIL;
3170 spin_unlock(&conn->mxk_lock);
3174 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3175 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3176 "incompatible EAGER size %d (%d wanted)\n",
3177 libcfs_nid2str(msg->mxm_srcnid),
3178 msg->mxm_u.conn_req.mxcrm_eager_size,
3179 (int) MXLND_EAGER_SIZE);
3180 spin_lock(&conn->mxk_lock);
3181 conn->mxk_status = MXLND_CONN_FAIL;
3182 spin_unlock(&conn->mxk_lock);
/* handshake complete: reset credit state and mark the conn READY
 * (unless the params above were incompatible) */
3186 write_lock(&kmxlnd_data.kmx_global_lock);
3187 peer->mxp_incarnation = msg->mxm_srcstamp;
3188 peer->mxp_incompatible = incompatible;
3189 write_unlock(&kmxlnd_data.kmx_global_lock);
3190 spin_lock(&conn->mxk_lock);
3191 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3192 conn->mxk_outstanding = 0;
3193 conn->mxk_incarnation = msg->mxm_srcstamp;
3194 conn->mxk_timeout = 0;
3195 if (!incompatible) {
3196 conn->mxk_status = MXLND_CONN_READY;
3198 spin_unlock(&conn->mxk_lock);
3199 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3203 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3204 libcfs_nid2str(rx->mxc_nid));
3211 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3212 spin_lock(&conn->mxk_lock);
3213 conn->mxk_status = MXLND_CONN_FAIL;
3214 spin_unlock(&conn->mxk_lock);
3219 spin_lock(&conn->mxk_lock);
3220 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3221 spin_unlock(&conn->mxk_lock);
3225 /* lnet_parse() failed, etc., repost now */
3226 mxlnd_put_idle_rx(rx);
/* a dropped credit-bearing rx still owes the peer a credit back */
3227 if (conn != NULL && credit == 1) {
3228 if (type == MXLND_MSG_PUT_DATA) {
3229 spin_lock(&conn->mxk_lock);
3230 conn->mxk_outstanding++;
3231 spin_unlock(&conn->mxk_lock);
3232 } else if (type != MXLND_MSG_GET_DATA &&
3233 (type == MXLND_MSG_EAGER ||
3234 type == MXLND_MSG_PUT_REQ ||
3235 type == MXLND_MSG_NOOP)) {
3236 spin_lock(&conn->mxk_lock);
3237 conn->mxk_outstanding++;
3238 spin_unlock(&conn->mxk_lock);
3241 if (conn_ref) mxlnd_conn_decref(conn);
3242 LASSERT(peer_ref == 0);
3245 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3246 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3248 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* finalize any LNET messages completed by this rx */
3251 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3252 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3254 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/* Completion handler for an ICON_REQ mx_iconnect() attempt.
 * On failure, marks the conn FAIL and (after MXLND_WAIT_TIMEOUT) retires it
 * and allocates a fresh conn. On success, stores the endpoint address,
 * binds the peer as the endpoint context, and queues a CONN_REQ message.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
3262 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3264 struct kmx_ctx *tx = NULL;
3265 struct kmx_msg *txmsg = NULL;
3266 struct kmx_conn *conn = peer->mxp_conn;
3268 /* a conn ref was taken when calling mx_iconnect(),
3269 * hold it until CONN_REQ or CONN_ACK completes */
3271 CDEBUG(D_NET, "entering\n");
3272 if (status.code != MX_STATUS_SUCCESS) {
3273 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3274 mx_strstatus(status.code), status.code,
3275 libcfs_nid2str(peer->mxp_nid));
3276 spin_lock(&conn->mxk_lock);
3277 conn->mxk_status = MXLND_CONN_FAIL;
3278 spin_unlock(&conn->mxk_lock);
/* if we have been retrying past the wait timeout, give up on this conn
 * and start over with a fresh one */
3280 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3281 struct kmx_conn *new_conn = NULL;
3282 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3283 /* FIXME write lock here ? */
3284 mxlnd_conn_disconnect(conn, 0, 0);
3285 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3286 mxlnd_conn_decref(new_conn); /* which we no longer need */
3287 peer->mxp_reconnect_time = 0;
3290 mxlnd_conn_decref(conn); /* drop the iconnect() ref on failure */
3294 spin_lock(&conn->mxk_lock);
3295 conn->mxk_epa = status.source;
3296 spin_unlock(&conn->mxk_lock);
3297 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3298 * we should not need to lock the peer */
3299 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3301 /* mx_iconnect() succeeded, reset delay to 0 */
3302 write_lock(&kmxlnd_data.kmx_global_lock);
3303 peer->mxp_reconnect_time = 0;
3304 write_unlock(&kmxlnd_data.kmx_global_lock);
3306 /* marshal CONN_REQ msg */
3307 /* we are still using the conn ref from iconnect() - do not take another */
3308 tx = mxlnd_get_idle_tx();
3310 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3311 libcfs_nid2str(peer->mxp_nid));
3312 spin_lock(&conn->mxk_lock);
3313 conn->mxk_status = MXLND_CONN_FAIL;
3314 spin_unlock(&conn->mxk_lock);
3315 mxlnd_conn_decref(conn);
/* fill the CONN_REQ with our queue depth and eager size so the peer can
 * verify compatibility (checked in handle_rx_completion()) */
3319 tx->mxc_peer = peer;
3320 tx->mxc_conn = conn;
3321 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3322 txmsg = tx->mxc_msg;
3323 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3324 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3325 tx->mxc_match = mxlnd_create_match(tx, 0);
3327 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
/* Completion handler for an ICON_ACK mx_iconnect() attempt (the passive side
 * of the handshake). Mirrors handle_conn_req(): on failure, marks the conn
 * FAIL and retires it after the wait timeout; on success, stores the endpoint
 * address and session id, marks the conn READY (if compatible), and queues
 * a CONN_ACK message back to the peer.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
3333 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3335 struct kmx_ctx *tx = NULL;
3336 struct kmx_msg *txmsg = NULL;
3337 struct kmx_conn *conn = peer->mxp_conn;
3342 /* a conn ref was taken when calling mx_iconnect(),
3343 * hold it until CONN_REQ or CONN_ACK completes */
3345 CDEBUG(D_NET, "entering\n");
3346 if (status.code != MX_STATUS_SUCCESS) {
3347 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3348 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3349 mx_strstatus(status.code), status.code,
3350 libcfs_nid2str(peer->mxp_nid),
3354 spin_lock(&conn->mxk_lock);
3355 conn->mxk_status = MXLND_CONN_FAIL;
3356 spin_unlock(&conn->mxk_lock);
/* retry window expired: retire this conn and allocate a fresh one */
3358 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3359 struct kmx_conn *new_conn = NULL;
3360 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3361 /* FIXME write lock here? */
3362 mxlnd_conn_disconnect(conn, 0, 1);
3363 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3365 mxlnd_conn_decref(new_conn); /* which we no longer need */
3366 peer->mxp_reconnect_time = 0;
3369 mxlnd_conn_decref(conn); /* drop the iconnect() ref on failure */
/* extract the peer's session id from the source endpoint address */
3372 mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3373 spin_lock(&conn->mxk_lock);
3374 conn->mxk_epa = status.source;
3375 if (likely(!peer->mxp_incompatible)) {
3376 conn->mxk_status = MXLND_CONN_READY;
3378 spin_unlock(&conn->mxk_lock);
3379 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3380 * we should not have to lock the peer */
3381 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3383 /* mx_iconnect() succeeded, reset delay to 0 */
3384 write_lock(&kmxlnd_data.kmx_global_lock);
3385 peer->mxp_reconnect_time = 0;
3386 peer->mxp_sid = sid;
3387 write_unlock(&kmxlnd_data.kmx_global_lock);
3389 /* marshal CONN_ACK msg */
3390 tx = mxlnd_get_idle_tx();
3392 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3393 libcfs_nid2str(peer->mxp_nid));
3394 spin_lock(&conn->mxk_lock);
3395 conn->mxk_status = MXLND_CONN_FAIL;
3396 spin_unlock(&conn->mxk_lock);
3397 mxlnd_conn_decref(conn);
/* echo our queue depth and eager size back to the peer for verification */
3401 tx->mxc_peer = peer;
3402 tx->mxc_conn = conn;
3403 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3404 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3405 txmsg = tx->mxc_msg;
3406 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3407 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3408 tx->mxc_match = mxlnd_create_match(tx, 0);
3415 * The MX request completion thread(s)
3416 * \param arg thread id (as a void *)
3418 * This thread waits for a MX completion and then completes the request.
3419 * We will create one thread per CPU.
/* MX completion-servicing daemon thread body (one instance per CPU; see the
 * header comment above). Waits for an MX completion via mx_wait_any() — the
 * id==0 thread first polls with mx_test_any() up to kmx_polling times — then
 * dispatches on the type bits in match_info: ICON_REQ/ICON_ACK go to the
 * connection handlers, everything else is a tx/rx ctx completion.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
3422 mxlnd_request_waitd(void *arg)
3424 long id = (long) arg;
3427 mx_return_t mxret = MX_SUCCESS;
3429 struct kmx_ctx *ctx = NULL;
3430 enum kmx_req_state req_type = MXLND_REQ_TX;
3431 struct kmx_peer *peer = NULL;
3432 struct kmx_conn *conn = NULL;
3437 memset(name, 0, sizeof(name));
3438 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3439 cfs_daemonize(name);
3440 //cfs_block_allsigs();
3442 memset(&status, 0, sizeof(status));
3444 CDEBUG(D_NET, "%s starting\n", name);
3446 while (!kmxlnd_data.kmx_shutdown) {
/* thread 0 polls first (cheaper under load); others block in wait_any */
3452 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3453 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3457 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3458 0ULL, 0ULL, &status, &result);
3461 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3462 0ULL, 0ULL, &status, &result);
3464 if (unlikely(kmxlnd_data.kmx_shutdown))
3468 /* nothing completed... */
3472 if (status.code != MX_STATUS_SUCCESS) {
3473 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3474 "match_info 0x%llx and length %d\n",
3475 mx_strstatus(status.code), status.code,
3476 (u64) status.match_info, status.msg_length);
3479 msg_type = MXLND_MSG_TYPE(status.match_info);
3481 /* This may be a mx_iconnect() request completing,
3482 * check the bit mask for CONN_REQ and CONN_ACK */
3483 if (msg_type == MXLND_MSG_ICON_REQ ||
3484 msg_type == MXLND_MSG_ICON_ACK) {
3485 peer = (struct kmx_peer*) status.context;
3486 if (msg_type == MXLND_MSG_ICON_REQ) {
3487 mxlnd_handle_conn_req(peer, status);
3489 mxlnd_handle_conn_ack(peer, status);
3494 /* This must be a tx or rx */
3496 /* NOTE: if this is a RX from the unexpected callback, it may
3497 * have very little info. If we dropped it in unexpected_recv(),
3498 * it will not have a context. If so, ignore it. */
3499 ctx = (struct kmx_ctx *) status.context;
3502 req_type = ctx->mxc_type;
3503 conn = ctx->mxc_conn; /* this may be NULL */
3504 mxlnd_deq_pending_ctx(ctx);
3506 /* copy status to ctx->mxc_status */
3507 memcpy(&ctx->mxc_status, &status, sizeof(status));
/* dispatch the ctx to its tx or rx completion handler */
3511 mxlnd_handle_tx_completion(ctx);
3514 mxlnd_handle_rx_completion(ctx);
3517 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3522 /* FIXME may need to reconsider this */
3523 /* conn is always set except for the first CONN_REQ rx
3524 * from a new peer */
/* any hard failure (not SUCCESS/TRUNCATED) tears the conn down */
3525 if (!(status.code == MX_STATUS_SUCCESS ||
3526 status.code == MX_STATUS_TRUNCATED) &&
3528 mxlnd_conn_disconnect(conn, 1, 1);
3531 CDEBUG(D_NET, "waitd() completed task\n");
3533 CDEBUG(D_NET, "%s stopping\n", name);
3534 mxlnd_thread_stop(id);
/* Scan all peers' conns for expired timeouts.
 * Disconnects any conn whose mxk_timeout has passed, and returns the
 * earliest future timeout found ('next') so the caller (timeoutd) knows
 * how long it may sleep; defaults to now + MXLND_COMM_TIMEOUT when no
 * conn has a pending timeout.
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
3540 mxlnd_check_timeouts(unsigned long now)
3544 unsigned long next = 0;
3545 struct kmx_peer *peer = NULL;
3546 struct kmx_conn *conn = NULL;
3548 read_lock(&kmxlnd_data.kmx_global_lock);
3549 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3550 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
/* bail out promptly on shutdown, dropping the read lock */
3552 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3553 read_unlock(&kmxlnd_data.kmx_global_lock);
3557 conn = peer->mxp_conn;
3559 mxlnd_conn_addref(conn); /* hold the conn across the checks */
3564 /* FIXME is this needed? */
3565 spin_lock(&conn->mxk_lock);
3567 /* if nothing pending (timeout == 0) or
3568 * if conn is already disconnected,
3570 if (conn->mxk_timeout == 0 ||
3571 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3572 /* FIXME is this needed? */
3573 spin_unlock(&conn->mxk_lock);
3574 mxlnd_conn_decref(conn);
3578 /* we want to find the timeout that will occur first.
3579 * if it is in the future, we will sleep until then.
3580 * if it is in the past, then we will sleep one
3581 * second and repeat the process. */
3582 if ((next == 0) || (conn->mxk_timeout < next)) {
3583 next = conn->mxk_timeout;
3588 if (time_after_eq(now, conn->mxk_timeout)) {
3591 spin_unlock(&conn->mxk_lock);
/* timed out: tear the conn down (notify LNET, send BYE) */
3594 mxlnd_conn_disconnect(conn, 1, 1);
3596 mxlnd_conn_decref(conn);
3599 read_unlock(&kmxlnd_data.kmx_global_lock);
3600 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3606 * Enforces timeouts on messages
3607 * \param arg thread id (as a void *)
3609 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3610 * it calls mxlnd_conn_disconnect().
3612 * After checking for timeouts, try progressing sends (call check_sends()).
/* Timeout daemon thread body (see the header comment above).
 * Loops until shutdown: runs mxlnd_check_timeouts() once the next deadline
 * arrives, then walks all peers and calls mxlnd_check_sends() on any conn
 * that is not disconnected and has been send-idle for more than HZ jiffies
 * (keeps credit-returning NOOPs flowing).
 * NOTE(review): elided extract — original line numbers are fused into each
 * line and some lines are missing; code left byte-identical, comments only. */
3615 mxlnd_timeoutd(void *arg)
3618 long id = (long) arg;
3619 unsigned long now = 0;
3620 unsigned long next = 0;
3621 unsigned long delay = HZ;
3622 struct kmx_peer *peer = NULL;
3623 struct kmx_conn *conn = NULL;
3625 cfs_daemonize("mxlnd_timeoutd");
3626 //cfs_block_allsigs();
3628 CDEBUG(D_NET, "timeoutd starting\n");
3630 while (!kmxlnd_data.kmx_shutdown) {
3633 /* if the next timeout has not arrived, go back to sleep */
3634 if (time_after(now, next)) {
3635 next = mxlnd_check_timeouts(now);
3638 read_lock(&kmxlnd_data.kmx_global_lock);
3639 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3640 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3641 /* FIXME upgrade to write lock?
3642 * is any lock needed? */
3643 conn = peer->mxp_conn;
3644 if (conn) mxlnd_conn_addref(conn); /* take ref... */
/* progress sends on conns idle for > 1 second; must drop the
 * global read lock around check_sends() (it may block) */
3649 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3650 time_after(now, conn->mxk_last_tx + HZ)) {
3651 /* FIXME drop lock or call check_sends_locked */
3652 read_unlock(&kmxlnd_data.kmx_global_lock);
3653 mxlnd_check_sends(peer);
3654 read_lock(&kmxlnd_data.kmx_global_lock);
3656 mxlnd_conn_decref(conn); /* until here */
3659 read_unlock(&kmxlnd_data.kmx_global_lock);
3663 CDEBUG(D_NET, "timeoutd stopping\n");
3664 mxlnd_thread_stop(id);