1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
32 * Copyright (C) 2006 Myricom, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lnet/klnds/mxlnd/mxlnd.c
40 * Author: Eric Barton <eric@bartonsoftware.com>
41 * Author: Scott Atchley <atchley at myri.com>
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
/* Return non-zero when @epa equals the all-zero sentinel MX_EPA_NULL,
 * i.e. the endpoint address was never set.  (Return-type line and
 * braces are elided in this excerpt.) */
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
51 /* if memcmp() == 0, it is NULL */
52 return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
55 inline void mxlnd_noop(char *s, ...)
/* Map a MXLND_CTX_* context state to its symbolic name for debug
 * logging.  (The switch header, early case labels and default arm
 * are elided in this excerpt.) */
61 mxlnd_ctxstate_to_str(int mxc_state)
65 return "MXLND_CTX_INIT";
67 return "MXLND_CTX_IDLE";
69 return "MXLND_CTX_PREP";
70 case MXLND_CTX_PENDING:
71 return "MXLND_CTX_PENDING";
72 case MXLND_CTX_COMPLETED:
73 return "MXLND_CTX_COMPLETED";
74 case MXLND_CTX_CANCELED:
75 return "MXLND_CTX_CANCELED";
/* Map a MXLND_CONN_* connection status to its symbolic name for
 * debug logging.  (switch header and some case labels elided.) */
82 mxlnd_connstatus_to_str(int mxk_status)
85 case MXLND_CONN_READY:
86 return "MXLND_CONN_READY";
88 return "MXLND_CONN_INIT";
90 return "MXLND_CONN_REQ";
92 return "MXLND_CONN_ACK";
94 return "MXLND_CONN_WAIT";
95 case MXLND_CONN_DISCONNECT:
96 return "MXLND_CONN_DISCONNECT";
98 return "MXLND_CONN_FAIL";
/* Map a MXLND_MSG_* wire message type to its symbolic name for
 * debug logging.  (switch header and default arm elided.) */
105 mxlnd_msgtype_to_str(int type) {
107 case MXLND_MSG_EAGER:
108 return "MXLND_MSG_EAGER";
109 case MXLND_MSG_CONN_REQ:
110 return "MXLND_MSG_CONN_REQ";
111 case MXLND_MSG_CONN_ACK:
112 return "MXLND_MSG_CONN_ACK";
114 return "MXLND_MSG_BYE";
116 return "MXLND_MSG_NOOP";
117 case MXLND_MSG_PUT_REQ:
118 return "MXLND_MSG_PUT_REQ";
119 case MXLND_MSG_PUT_ACK:
120 return "MXLND_MSG_PUT_ACK";
121 case MXLND_MSG_PUT_DATA:
122 return "MXLND_MSG_PUT_DATA";
123 case MXLND_MSG_GET_REQ:
124 return "MXLND_MSG_GET_REQ";
125 case MXLND_MSG_GET_DATA:
126 return "MXLND_MSG_GET_DATA";
/* Map an LNET_MSG_* message type to its symbolic name for debug
 * logging.  (switch header and case labels elided.) */
133 mxlnd_lnetmsg_to_str(int type)
137 return "LNET_MSG_ACK";
139 return "LNET_MSG_PUT";
141 return "LNET_MSG_GET";
143 return "LNET_MSG_REPLY";
145 return "LNET_MSG_HELLO";
/* Build the 64-bit MX match value for @ctx: the message type goes in
 * the MXLND_MSG_OFFSET field, @error in the MXLND_ERROR_OFFSET field,
 * and the low bits carry the completion cookie.  The second LASSERT
 * guarantees the cookie cannot overlap the error/type fields.
 * Inverse of mxlnd_parse_match(). */
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
156 u64 type = (u64) ctx->mxc_msg_type;
157 u64 err = (u64) error;
160 LASSERT(ctx->mxc_msg_type != 0);
161 LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162 match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
/* Split a 64-bit MX match value into its message type, error and
 * cookie fields (inverse of mxlnd_create_match()) and assert that the
 * decoded type is one of the known MXLND message/iconnect types. */
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
169 *msg_type = (u8) MXLND_MSG_TYPE(match);
170 *error = (u8) MXLND_ERROR_VAL(match);
171 *cookie = match & MXLND_MAX_COOKIE;
172 LASSERT(*msg_type == MXLND_MSG_EAGER ||
173 *msg_type == MXLND_MSG_ICON_REQ ||
174 *msg_type == MXLND_MSG_CONN_REQ ||
175 *msg_type == MXLND_MSG_ICON_ACK ||
176 *msg_type == MXLND_MSG_CONN_ACK ||
177 *msg_type == MXLND_MSG_BYE ||
178 *msg_type == MXLND_MSG_NOOP ||
179 *msg_type == MXLND_MSG_PUT_REQ ||
180 *msg_type == MXLND_MSG_PUT_ACK ||
181 *msg_type == MXLND_MSG_PUT_DATA ||
182 *msg_type == MXLND_MSG_GET_REQ ||
183 *msg_type == MXLND_MSG_GET_DATA);
/* Pop an rx context off the global idle list (kmx_rx_idle) under its
 * spinlock and mark it MXLND_CTX_PREP for posting.  On an empty list
 * the lock is dropped and the (elided) early-return path is taken.
 * The mxc_get/mxc_put counters are expected to balance for an idle
 * rx; the CDEBUG dump below fires when they do not, to aid debugging
 * of leaked/double-used contexts. */
188 mxlnd_get_idle_rx(void)
190 struct list_head *tmp = NULL;
191 struct kmx_ctx *rx = NULL;
193 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
195 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
200 tmp = &kmxlnd_data.kmx_rx_idle;
201 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202 list_del_init(&rx->mxc_list);
203 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
206 if (rx->mxc_get != rx->mxc_put) {
207 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
219 LASSERT (rx->mxc_get == rx->mxc_put);
223 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224 rx->mxc_state = MXLND_CTX_PREP;
/* Return a completed rx context to the global idle list.  Rejects a
 * NULL pointer or a tx context passed by mistake (error paths elided
 * here).  The LASSERT checks the rx completed exactly once since it
 * was taken (get == put + 1). */
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
233 CDEBUG(D_NETERROR, "called with NULL pointer\n");
235 } else if (rx->mxc_type != MXLND_REQ_RX) {
236 CDEBUG(D_NETERROR, "called with tx\n");
239 LASSERT(rx->mxc_get == rx->mxc_put + 1);
242 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* Remove up to @count rx contexts from the idle list and (in elided
 * code) free them from the global kmx_rxs list; logs how many were
 * actually reclaimed if the idle list ran dry first. */
249 mxlnd_reduce_idle_rxs(__u32 count)
252 struct kmx_ctx *rx = NULL;
254 spin_lock(&kmxlnd_data.kmx_rxs_lock);
255 for (i = 0; i < count; i++) {
256 rx = mxlnd_get_idle_rx();
258 struct list_head *tmp = &rx->mxc_global_list;
262 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
266 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
/* Pop a tx context off the global idle list, assign it a fresh
 * completion cookie (wrapping back to 1 past MXLND_MAX_COOKIE so the
 * cookie never collides with the error/type match bits), bump the
 * in-use counter and mark the tx MXLND_CTX_PREP. */
271 mxlnd_get_idle_tx(void)
273 struct list_head *tmp = NULL;
274 struct kmx_ctx *tx = NULL;
276 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
278 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
284 tmp = &kmxlnd_data.kmx_tx_idle;
285 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286 list_del_init(&tx->mxc_list);
288 /* Allocate a new completion cookie. It might not be needed,
289 * but we've got a lock right now and we're unlikely to
291 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293 kmxlnd_data.kmx_tx_next_cookie = 1;
295 kmxlnd_data.kmx_tx_used++;
296 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
298 LASSERT (tx->mxc_get == tx->mxc_put);
302 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303 LASSERT (tx->mxc_lntmsg[0] == NULL);
304 LASSERT (tx->mxc_lntmsg[1] == NULL);
306 tx->mxc_state = MXLND_CTX_PREP;
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
/* Return a completed tx to the global idle list and finalize its LNET
 * messages.  A failed MX status (neither SUCCESS nor TRUNCATED)
 * triggers a disconnect of the owning conn.  The lntmsg pointers are
 * saved locally BEFORE the tx is requeued so lnet_finalize() runs
 * after the idle-list lock is dropped and without touching a tx that
 * another thread may already have reused. */
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
317 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
319 lnet_msg_t *lntmsg[2];
322 CDEBUG(D_NETERROR, "called with NULL pointer\n");
324 } else if (tx->mxc_type != MXLND_REQ_TX) {
325 CDEBUG(D_NETERROR, "called with rx\n");
328 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329 tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330 struct kmx_conn *conn = tx->mxc_conn;
333 mxlnd_conn_disconnect(conn, 0, 1);
336 lntmsg[0] = tx->mxc_lntmsg[0];
337 lntmsg[1] = tx->mxc_lntmsg[1];
339 LASSERT(tx->mxc_get == tx->mxc_put + 1);
342 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344 kmxlnd_data.kmx_tx_used--;
345 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
346 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
352 * mxlnd_conn_free - free the conn
353 * @conn - a kmx_conn pointer
355 * The calling function should remove the conn from the conns list first
/* Free a conn whose refcount has dropped to zero.  All tx queues and
 * the pending list must already be empty.  If still linked on the
 * peer's conns list it is unlinked here; when it was the peer's
 * current conn, the peer's back-pointer is cleared and (in elided
 * code) the MX endpoint-address context is reset so MX callbacks can
 * no longer reach this conn.  Drops the conn's ref on its peer. */
359 mxlnd_conn_free(struct kmx_conn *conn)
361 struct kmx_peer *peer = conn->mxk_peer;
363 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365 list_empty (&conn->mxk_tx_free_queue) &&
366 list_empty (&conn->mxk_pending));
367 if (!list_empty(&conn->mxk_list)) {
368 list_del_init(&conn->mxk_list);
369 if (peer->mxp_conn == conn) {
370 peer->mxp_conn = NULL;
371 if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372 mx_set_endpoint_addr_context(conn->mxk_epa,
376 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377 MXLND_FREE (conn, sizeof (*conn));
/* Walk the conn's pending list under mxk_lock, removing every ctx and
 * mx_cancel()ing the rxs.  Each canceled rx is completed with
 * -ECONNABORTED via mxlnd_handle_rx_completion(); the lock is dropped
 * around that call because it may call lnet_finalize() and
 * mxlnd_conn_decref(), neither of which may run under a spinlock. */
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
386 struct kmx_ctx *ctx = NULL;
387 struct kmx_ctx *next = NULL;
388 mx_return_t mxret = MX_SUCCESS;
393 spin_lock(&conn->mxk_lock);
394 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395 /* we will delete all including txs */
396 list_del_init(&ctx->mxc_list);
397 if (ctx->mxc_type == MXLND_REQ_RX) {
399 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
402 if (mxret != MX_SUCCESS) {
403 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
406 ctx->mxc_status.code = -ECONNABORTED;
407 ctx->mxc_state = MXLND_CTX_CANCELED;
408 /* NOTE this calls lnet_finalize() and
409 * we cannot hold any locks when calling it.
410 * It also calls mxlnd_conn_decref(conn) */
411 spin_unlock(&conn->mxk_lock);
412 mxlnd_handle_rx_completion(ctx);
413 spin_lock(&conn->mxk_lock);
418 spin_unlock(&conn->mxk_lock);
426 * mxlnd_conn_disconnect - shutdown a connection
427 * @conn - a kmx_conn pointer
428 * @mx_dis - call mx_disconnect()
429 * @send_bye - send peer a BYE msg
431 * This function sets the status to DISCONNECT, completes queued
432 * txs with failure, calls mx_disconnect, which will complete
433 * pending txs and matched rxs with failure.
/* Shut down @conn: mark it MXLND_CONN_DISCONNECT (idempotent -- a
 * second caller returns immediately), fail all queued txs with
 * -ECONNABORTED, cancel pending rxs, optionally send a zero-byte BYE
 * to the peer, notify LNET the peer is dead (unless we are shutting
 * down), mx_disconnect() the endpoint, and drop the owning peer's
 * conn reference.  mxk_lock is released around put_idle_tx/decref
 * because those paths may call lnet_finalize(). */
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
438 mx_endpoint_addr_t epa = conn->mxk_epa;
439 struct list_head *tmp = NULL;
440 int valid = !mxlnd_endpoint_addr_null(epa);
442 spin_lock(&conn->mxk_lock);
443 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444 spin_unlock(&conn->mxk_lock);
447 conn->mxk_status = MXLND_CONN_DISCONNECT;
448 conn->mxk_timeout = 0;
450 while (!list_empty(&conn->mxk_tx_free_queue) ||
451 !list_empty(&conn->mxk_tx_credit_queue)) {
453 struct kmx_ctx *tx = NULL;
455 if (!list_empty(&conn->mxk_tx_free_queue)) {
456 tmp = &conn->mxk_tx_free_queue;
458 tmp = &conn->mxk_tx_credit_queue;
461 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462 list_del_init(&tx->mxc_list);
463 tx->mxc_status.code = -ECONNABORTED;
464 spin_unlock(&conn->mxk_lock);
465 mxlnd_put_idle_tx(tx);
466 mxlnd_conn_decref(conn); /* for this tx */
467 spin_lock(&conn->mxk_lock);
470 spin_unlock(&conn->mxk_lock);
472 /* cancel pending rxs */
473 mxlnd_conn_cancel_pending_rxs(conn);
475 if (send_bye && valid) {
476 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477 /* send a BYE to the peer */
478 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481 epa, match, NULL, NULL);
482 /* wait to allow the peer to ack our message */
483 mxlnd_sleep(msecs_to_jiffies(20));
486 if (kmxlnd_data.kmx_shutdown != 1) {
487 time_t last_alive = 0;
488 unsigned long last_msg = 0;
490 /* notify LNET that we are giving up on this peer */
491 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
492 last_msg = conn->mxk_last_rx;
494 last_msg = conn->mxk_last_tx;
496 last_alive = cfs_time_current_sec() -
497 cfs_duration_sec(cfs_time_current() - last_msg);
498 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
501 mx_disconnect(kmxlnd_data.kmx_endpt, epa);
503 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
509 * mxlnd_conn_alloc - allocate and initialize a new conn struct
510 * @connp - address of a kmx_conn pointer
511 * @peer - owning kmx_peer
513 * Returns 0 on success and -ENOMEM on failure
/* Allocate and initialize a conn for @peer; caller holds the global
 * write lock.  The new conn starts with refcount 2 (one for the
 * owning peer, one for the caller), status MXLND_CONN_INIT, and is
 * placed at the front of the peer's conns list as the current conn.
 * Returns 0 on success, -ENOMEM on allocation failure (elided). */
516 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
518 struct kmx_conn *conn = NULL;
520 LASSERT(peer != NULL);
522 MXLND_ALLOC(conn, sizeof (*conn));
524 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
527 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
529 memset(conn, 0, sizeof(*conn));
531 /* conn->mxk_incarnation = 0 - will be set by peer */
532 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
533 and one for the caller */
534 conn->mxk_peer = peer;
535 /* mxk_epa - to be set after mx_iconnect() */
536 INIT_LIST_HEAD(&conn->mxk_list);
537 spin_lock_init(&conn->mxk_lock);
538 /* conn->mxk_timeout = 0 */
539 conn->mxk_last_tx = jiffies;
540 conn->mxk_last_rx = conn->mxk_last_tx;
541 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
542 /* mxk_outstanding = 0 */
543 conn->mxk_status = MXLND_CONN_INIT;
544 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
545 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
546 /* conn->mxk_ntx_msgs = 0 */
547 /* conn->mxk_ntx_data = 0 */
548 /* conn->mxk_ntx_posted = 0 */
549 /* conn->mxk_data_posted = 0 */
550 INIT_LIST_HEAD(&conn->mxk_pending);
554 mxlnd_peer_addref(peer); /* add a ref for this conn */
556 /* add to front of peer's conns list */
557 list_add(&conn->mxk_list, &peer->mxp_conns);
558 peer->mxp_conn = conn;
/* Locking wrapper: take the global write lock around
 * mxlnd_conn_alloc_locked() and return its result. */
563 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
566 write_lock(&kmxlnd_data.kmx_global_lock);
567 ret = mxlnd_conn_alloc_locked(connp, peer);
568 write_unlock(&kmxlnd_data.kmx_global_lock);
/* Queue @ctx on its conn's pending list and mark it PENDING.  The
 * conn's timeout is pulled in if this ctx's deadline is earlier.  If
 * the conn status is below MXLND_CONN_INIT (i.e. failed/disconnected)
 * the ctx is instead marked COMPLETED and (in elided code) an error
 * is returned so the caller does not post it. */
573 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
576 struct kmx_conn *conn = ctx->mxc_conn;
578 ctx->mxc_state = MXLND_CTX_PENDING;
580 spin_lock(&conn->mxk_lock);
581 if (conn->mxk_status >= MXLND_CONN_INIT) {
582 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
583 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
584 conn->mxk_timeout = ctx->mxc_deadline;
587 ctx->mxc_state = MXLND_CTX_COMPLETED;
590 spin_unlock(&conn->mxk_lock);
/* Remove @ctx from its conn's pending list (if listed) and mark it
 * COMPLETED.  The conn timeout is recomputed from the new head of the
 * pending list, or cleared when the list becomes empty. */
596 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
598 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
599 ctx->mxc_state == MXLND_CTX_COMPLETED);
600 if (ctx->mxc_state != MXLND_CTX_PENDING &&
601 ctx->mxc_state != MXLND_CTX_COMPLETED) {
602 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
603 mxlnd_ctxstate_to_str(ctx->mxc_state));
605 ctx->mxc_state = MXLND_CTX_COMPLETED;
606 if (!list_empty(&ctx->mxc_list)) {
607 struct kmx_conn *conn = ctx->mxc_conn;
608 struct kmx_ctx *next = NULL;
609 LASSERT(conn != NULL);
610 spin_lock(&conn->mxk_lock);
611 list_del_init(&ctx->mxc_list);
612 conn->mxk_timeout = 0;
613 if (!list_empty(&conn->mxk_pending)) {
614 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
615 conn->mxk_timeout = next->mxc_deadline;
617 spin_unlock(&conn->mxk_lock);
623 * mxlnd_peer_free - free the peer
624 * @peer - a kmx_peer pointer
626 * The calling function should decrement the rxs, drain the tx queues and
627 * remove the peer from the peers list first then destroy it.
/* Free a peer whose refcount reached zero.  The localhost peer may
 * only be freed during shutdown.  If still on the global hash list it
 * is unlinked here (caller is assumed to hold the appropriate lock --
 * see the inline comment).  Decrements the global peer count. */
630 mxlnd_peer_free(struct kmx_peer *peer)
632 CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
633 peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
635 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
637 if (peer == kmxlnd_data.kmx_localhost)
638 LASSERT(kmxlnd_data.kmx_shutdown);
640 if (!list_empty(&peer->mxp_peers)) {
641 /* assume we are locked */
642 list_del_init(&peer->mxp_peers);
645 MXLND_FREE (peer, sizeof (*peer));
646 atomic_dec(&kmxlnd_data.kmx_npeers);
650 #define MXLND_LOOKUP_COUNT 10
652 /* We only want the MAC address of the peer's Myricom NIC. We
653 * require that each node has the IPoMX interface (myriN) up.
654 * We will not pass any traffic over IPoMX, but it allows us
655 * to get the MAC address. */
/* Resolve @ip (host order) to the peer's Myricom NIC id (its MAC in
 * the low 6 bytes of a u64) by looking the address up in the kernel
 * ARP table via the IPoMX interface.  If no valid neighbour entry
 * exists, a TCP connect to port 987 is attempted purely to force an
 * ARP exchange, retrying with linear backoff up to
 * MXLND_LOOKUP_COUNT times.  The result is byte-swapped to host
 * order on little-endian machines. */
657 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
663 unsigned char *haddr = NULL;
664 struct net_device *dev = NULL;
665 struct neighbour *n = NULL;
666 cfs_socket_t *sock = NULL;
667 __be32 dst_ip = htonl(ip);
669 dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
674 haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
677 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
680 if (n->nud_state & NUD_VALID) {
681 memcpy(haddr, n->ha, dev->addr_len);
687 /* not found, try to connect (force an arp) */
688 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
690 libcfs_sock_release(sock);
691 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
692 } while (try++ < MXLND_LOOKUP_COUNT);
698 #ifdef __LITTLE_ENDIAN
699 *nic_id = ___arch__swab64(tmp_id);
707 * mxlnd_peer_alloc - allocate and initialize a new peer struct
708 * @peerp - address of a kmx_peer pointer
709 * @nid - LNET node id
711 * Returns 0 on success and -ENOMEM on failure
/* Allocate and initialize a peer for @nid: resolve its NIC id via ARP
 * when not supplied, allocate its initial conn, and pre-post
 * (credits - 1) global rx contexts.  On any failure the partially
 * built peer/conn refs are dropped in reverse order.  Returns 0 on
 * success, negative errno on failure (error returns elided). */
714 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
718 u32 ip = LNET_NIDADDR(nid);
719 struct kmx_peer *peer = NULL;
721 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
723 MXLND_ALLOC(peer, sizeof (*peer));
725 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
728 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
730 memset(peer, 0, sizeof(*peer));
733 /* peer->mxp_incarnation */
734 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
737 peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
738 peer->mxp_board = board;
739 peer->mxp_nic_id = nic_id;
741 if (nic_id == 0ULL) {
742 ret = mxlnd_ip2nic_id(ip, &nic_id);
744 CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
745 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
748 peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
750 INIT_LIST_HEAD(&peer->mxp_peers);
751 INIT_LIST_HEAD(&peer->mxp_conns);
752 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
754 mxlnd_peer_decref(peer);
758 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
759 struct kmx_ctx *rx = NULL;
760 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
762 mxlnd_reduce_idle_rxs(i);
763 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
764 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
765 mxlnd_peer_decref(peer);
768 spin_lock(&kmxlnd_data.kmx_rxs_lock);
769 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
770 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
772 mxlnd_put_idle_rx(rx);
774 /* peer->mxp_reconnect_time = 0 */
775 /* peer->mxp_incompatible = 0 */
/* Look up @nid in the peer hash; caller holds the global lock (read
 * or write).  On a hit, takes a peer ref for the caller and returns
 * the peer; returns NULL when not found. */
781 static inline struct kmx_peer *
782 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
786 struct kmx_peer *peer = NULL;
788 hash = mxlnd_nid_to_hash(nid);
790 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
791 if (peer->mxp_nid == nid) {
793 mxlnd_peer_addref(peer);
797 return (found ? peer : NULL);
/* Locking wrapper: look up @nid under the global read lock.  The
 * returned peer (if any) carries a ref the caller must drop. */
800 static inline struct kmx_peer *
801 mxlnd_find_peer_by_nid(lnet_nid_t nid)
803 struct kmx_peer *peer = NULL;
805 read_lock(&kmxlnd_data.kmx_global_lock);
806 peer = mxlnd_find_peer_by_nid_locked(nid);
807 read_unlock(&kmxlnd_data.kmx_global_lock);
/* Return non-zero for tx message types that consume a peer credit:
 * EAGER, GET_REQ, PUT_REQ and NOOP.  (PUT_ACK and the DATA types do
 * not -- see mxlnd_pack_msg() for how credits are returned.) */
812 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
814 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
815 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
816 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
817 tx->mxc_msg_type == MXLND_MSG_NOOP);
821 * mxlnd_init_msg - set type and number of bytes
824 * @body_nob - bytes in msg body
/* Stamp @msg with its wire type and total size: header (everything up
 * to the mxm_u union) plus @body_nob payload bytes. */
827 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
829 msg->mxm_type = type;
830 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/* Prepare @tx to carry an eager-path message of @type with @body_nob
 * payload bytes: total size must fit in MXLND_EAGER_SIZE; the MX
 * segment length is set to the message size and the message header
 * type is stamped.  (Some assignments elided in this excerpt.) */
834 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
836 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
837 struct kmx_msg *msg = NULL;
839 LASSERT (tx != NULL);
840 LASSERT (nob <= MXLND_EAGER_SIZE);
843 /* tx->mxc_peer should have already been set if we know it */
844 tx->mxc_msg_type = type;
846 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
847 tx->mxc_seg.segment_length = nob;
848 tx->mxc_pin_type = MX_PIN_PHYSICAL;
849 //tx->mxc_state = MXLND_CTX_PENDING;
852 msg->mxm_type = type;
/* Simple rotate-and-add checksum over @nob bytes at @ptr.  Returns 1
 * instead of 0 so that a zero on the wire always means "no checksum"
 * (see mxlnd_unpack_msg()). */
859 mxlnd_cksum (void *ptr, int nob)
865 sum = ((sum << 1) | (sum >> 31)) + *c++;
867 /* ensure I don't return 0 (== no checksum) */
868 return (sum == 0) ? 1 : sum;
872 * mxlnd_pack_msg - complete msg info
/* Finish the wire header of @tx->mxc_msg before sending: magic,
 * version, credits being returned to the peer (all message types
 * except CONN_REQ/CONN_ACK piggy-back the conn's outstanding credit
 * count, which is zeroed under mxk_lock), src/dst nid and stamp, and
 * the sequence number (the tx cookie).  The checksum is computed last,
 * and only when the kmx_cksum tunable is enabled. */
876 mxlnd_pack_msg(struct kmx_ctx *tx)
878 struct kmx_msg *msg = tx->mxc_msg;
880 /* type and nob should already be set in init_msg() */
881 msg->mxm_magic = MXLND_MSG_MAGIC;
882 msg->mxm_version = MXLND_MSG_VERSION;
884 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
885 * return credits as well */
886 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
887 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
888 spin_lock(&tx->mxc_conn->mxk_lock);
889 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
890 tx->mxc_conn->mxk_outstanding = 0;
891 spin_unlock(&tx->mxc_conn->mxk_lock);
893 msg->mxm_credits = 0;
897 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
898 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
899 msg->mxm_dstnid = tx->mxc_nid;
900 /* if it is a new peer, the dststamp will be 0 */
901 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
902 msg->mxm_seq = tx->mxc_cookie;
904 if (*kmxlnd_tunables.kmx_cksum) {
905 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* Validate and byte-swap a received wire message of @nob bytes.
 * Checks, in order: enough bytes for magic+version; magic (detecting
 * an opposite-endian peer via the swabbed magic); version; full
 * header present; mxm_nob consistent with @nob; checksum (computed
 * with mxm_cksum zeroed, as on the sender); then flips all header
 * fields if the peer is opposite-endian, rejects LNET_NID_ANY as a
 * source nid, and per-type checks minimum payload size and flips the
 * type-specific cookie fields.  Error returns are elided in this
 * excerpt. */
910 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
912 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
917 /* 6 bytes are enough to have received magic + version */
919 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
923 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
925 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
928 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
932 if (msg->mxm_version !=
933 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
934 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
938 if (nob < hdr_size) {
939 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
943 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
945 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
949 /* checksum must be computed with mxm_cksum zero and BEFORE anything
951 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
953 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
954 CDEBUG(D_NETERROR, "Bad checksum\n");
957 msg->mxm_cksum = msg_cksum;
960 /* leave magic unflipped as a clue to peer endianness */
961 __swab16s(&msg->mxm_version);
962 CLASSERT (sizeof(msg->mxm_type) == 1);
963 CLASSERT (sizeof(msg->mxm_credits) == 1);
964 msg->mxm_nob = msg_nob;
965 __swab64s(&msg->mxm_srcnid);
966 __swab64s(&msg->mxm_srcstamp);
967 __swab64s(&msg->mxm_dstnid);
968 __swab64s(&msg->mxm_dststamp);
969 __swab64s(&msg->mxm_seq);
972 if (msg->mxm_srcnid == LNET_NID_ANY) {
973 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
977 switch (msg->mxm_type) {
979 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
985 case MXLND_MSG_EAGER:
986 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
987 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
988 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
993 case MXLND_MSG_PUT_REQ:
994 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
995 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
996 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1000 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1003 case MXLND_MSG_PUT_ACK:
1004 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1005 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1006 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1010 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1011 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1015 case MXLND_MSG_GET_REQ:
1016 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1017 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1018 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1022 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1026 case MXLND_MSG_CONN_REQ:
1027 case MXLND_MSG_CONN_ACK:
1028 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1029 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1030 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1034 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1035 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1044 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1048 * @length - length of incoming message
1049 * @pending - add to kmx_pending (0 is NO and 1 is YES)
1051 * The caller gets the rx and sets nid, peer and conn if known.
1053 * Returns 0 on success and -1 on failure
/* Post @rx as an MX receive for the message identified by @cookie,
 * matching on everything except the error bits (mask clears
 * MXLND_ERROR_MASK so an error-flagged send still matches).  The rx
 * is queued on the conn's pending list with a COMM_TIMEOUT deadline
 * first; if mx_kirecv() fails it is dequeued again.  Returns 0 on
 * success, -1 on failure (returns elided).  Caller supplies nid/peer/
 * conn on the rx and handles conn_decref on failure. */
1056 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1059 mx_return_t mxret = MX_SUCCESS;
1060 uint64_t mask = ~(MXLND_ERROR_MASK);
1062 rx->mxc_msg_type = msg_type;
1063 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1064 rx->mxc_cookie = cookie;
1065 /* rx->mxc_match may already be set */
1066 /* rx->mxc_seg.segment_ptr is already set */
1067 rx->mxc_seg.segment_length = length;
1068 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1069 ret = mxlnd_q_pending_ctx(rx);
1071 /* the caller is responsible for calling conn_decref() if needed */
1074 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1075 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1076 if (mxret != MX_SUCCESS) {
1077 mxlnd_deq_pending_ctx(rx);
1078 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1079 mx_strerror(mxret), (int) mxret);
1087 * mxlnd_unexpected_recv - this is the callback function that will handle
1088 * unexpected receives
1089 * @context - NULL, ignore
1090 * @source - the peer's mx_endpoint_addr_t
1091 * @match_value - the msg's bit, should be MXLND_MSG_EAGER
1092 * @length - length of incoming message
1093 * @data_if_available - ignore
1095 * If it is an eager-sized msg, we will call recv_msg() with the actual
1096 * length. If it is a large message, we will call recv_msg() with a
1097 * length of 0 bytes to drop it because we should never have a large,
1098 * unexpected message.
1100 * NOTE - The MX library blocks until this function completes. Make it as fast as
1101 * possible. DO NOT allocate memory which can block!
1103 * If we cannot get a rx or the conn is closed, drop the message on the floor
1104 * (i.e. recv 0 bytes and ignore).
/* MX unexpected-receive callback.  Runs in MX context, so it must be
 * fast and must not block.  BYE messages are handled inline: look up
 * the peer from the endpoint-address context and disconnect its conn,
 * consuming the message (MX_RECV_FINISHED).  Otherwise an idle rx is
 * taken and posted for the message -- with length 0 (a drop) when the
 * message exceeds MXLND_EAGER_SIZE, since large messages are never
 * expected unexpectedly.  The peers read lock is held while wiring the
 * rx to its peer/conn so the peer cannot vanish mid-lookup.  If no rx
 * is available or posting failed, a zero-byte recv is posted to drop
 * the message on the floor.  Returns MX_RECV_CONTINUE so MX delivers
 * the data to the posted recv. */
1106 mx_unexp_handler_action_t
1107 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1108 uint64_t match_value, uint32_t length, void *data_if_available)
1111 struct kmx_ctx *rx = NULL;
1117 if (context != NULL) {
1118 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1122 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1125 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1126 if (msg_type == MXLND_MSG_BYE) {
1127 struct kmx_peer *peer = NULL;
1129 mx_get_endpoint_addr_context(source, (void **) &peer);
1130 if (peer && peer->mxp_conn) {
1131 CDEBUG(D_NET, "peer %s sent BYE msg\n",
1132 libcfs_nid2str(peer->mxp_nid));
1133 mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1136 return MX_RECV_FINISHED;
1139 rx = mxlnd_get_idle_rx();
1141 if (length <= MXLND_EAGER_SIZE) {
1142 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1144 CDEBUG(D_NETERROR, "unexpected large receive with "
1145 "match_value=0x%llx length=%d\n",
1146 match_value, length);
1147 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1151 struct kmx_peer *peer = NULL;
1152 struct kmx_conn *conn = NULL;
1154 /* NOTE to avoid a peer disappearing out from under us,
1155 * read lock the peers lock first */
1156 read_lock(&kmxlnd_data.kmx_global_lock);
1157 mx_get_endpoint_addr_context(source, (void **) &peer);
1159 mxlnd_peer_addref(peer); /* add a ref... */
1160 conn = peer->mxp_conn;
1162 mxlnd_conn_addref(conn); /* add ref until rx completed */
1163 mxlnd_peer_decref(peer); /* and drop peer ref */
1164 rx->mxc_conn = conn;
1166 rx->mxc_peer = peer;
1167 rx->mxc_nid = peer->mxp_nid;
1169 read_unlock(&kmxlnd_data.kmx_global_lock);
1171 CDEBUG(D_NETERROR, "could not post receive\n");
1172 mxlnd_put_idle_rx(rx);
1176 if (rx == NULL || ret != 0) {
1178 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1181 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1183 seg.segment_ptr = 0ULL;
1184 seg.segment_length = 0;
1185 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1186 match_value, ~0ULL, NULL, NULL);
1189 return MX_RECV_CONTINUE;
/* Return the nid and refcount of the @index'th peer across the peer
 * hash (iteration order), for the GET_PEER ioctl.  Scans under the
 * global read lock; the index check and returns are elided here. */
1194 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1198 struct kmx_peer *peer = NULL;
1200 read_lock(&kmxlnd_data.kmx_global_lock);
1201 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1202 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1204 *nidp = peer->mxp_nid;
1205 *count = atomic_read(&peer->mxp_refcount);
1211 read_unlock(&kmxlnd_data.kmx_global_lock);
/* Remove @peer from the global hash (caller holds the write lock),
 * disconnect its current conn (with BYE) if any, and drop the global
 * list's reference. */
1217 mxlnd_del_peer_locked(struct kmx_peer *peer)
1219 list_del_init(&peer->mxp_peers); /* remove from the global list */
1220 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1221 mxlnd_peer_decref(peer); /* drop global list ref */
/* Delete the peer for @nid, or every peer except localhost when @nid
 * is LNET_NID_ANY.  The single-nid path first takes a lookup ref,
 * refuses to delete the localhost peer, then drops the lookup ref and
 * deletes under the global write lock. */
1226 mxlnd_del_peer(lnet_nid_t nid)
1230 struct kmx_peer *peer = NULL;
1231 struct kmx_peer *next = NULL;
1233 if (nid != LNET_NID_ANY) {
1234 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1236 write_lock(&kmxlnd_data.kmx_global_lock);
1237 if (nid != LNET_NID_ANY) {
/* NOTE(review): the "} if (" below reads as if it should be
 * "} else if (" -- confirm against the full source before relying
 * on this control flow (intervening lines are elided here). */
1240 } if (peer == kmxlnd_data.kmx_localhost) {
1241 mxlnd_peer_decref(peer); /* and drops it */
1242 CERROR("cannot free this host's NID 0x%llx\n", nid);
1244 mxlnd_peer_decref(peer); /* and drops it */
1245 mxlnd_del_peer_locked(peer);
1247 } else { /* LNET_NID_ANY */
1248 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1249 list_for_each_entry_safe(peer, next,
1250 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1251 if (peer != kmxlnd_data.kmx_localhost)
1252 mxlnd_del_peer_locked(peer);
1256 write_unlock(&kmxlnd_data.kmx_global_lock);
/* Return the @index'th conn across all peers (iteration order) for
 * the GET_CONN ioctl, with a ref the caller (mxlnd_ctl) must drop.
 * Returns NULL (elided) when the index is out of range. */
1262 mxlnd_get_conn_by_idx(int index)
1265 struct kmx_peer *peer = NULL;
1266 struct kmx_conn *conn = NULL;
1268 read_lock(&kmxlnd_data.kmx_global_lock);
1269 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1270 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1271 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1276 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1277 read_unlock(&kmxlnd_data.kmx_global_lock);
1282 read_unlock(&kmxlnd_data.kmx_global_lock);
/* Disconnect every conn of @peer (no BYE is needed for localhost,
 * which is skipped entirely).  Caller holds the global lock. */
1288 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1290 struct kmx_conn *conn = NULL;
1291 struct kmx_conn *next = NULL;
1293 if (peer == kmxlnd_data.kmx_localhost) return;
1295 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1296 mxlnd_conn_disconnect(conn, 0, 1);
/* Close all conns of the peer for @nid, or of every peer when @nid is
 * LNET_NID_ANY.  Peer lookup takes a temporary ref that is dropped
 * after its conns are closed.  Runs under the global read lock. */
1302 mxlnd_close_matching_conns(lnet_nid_t nid)
1306 struct kmx_peer *peer = NULL;
1308 read_lock(&kmxlnd_data.kmx_global_lock);
1309 if (nid != LNET_NID_ANY) {
1310 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1314 mxlnd_close_matching_conns_locked(peer);
1315 mxlnd_peer_decref(peer); /* and drops it here */
1317 } else { /* LNET_NID_ANY */
1318 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1319 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1320 mxlnd_close_matching_conns_locked(peer);
1323 read_unlock(&kmxlnd_data.kmx_global_lock);
1329 * mxlnd_ctl - modify MXLND parameters
1330 * @ni - LNET interface handle
1331 * @cmd - command to change
1332 * @arg - the ioctl data
1334 * Not implemented yet.
/* LND ioctl entry point.  Dispatches libcfs ioctls: GET_PEER (nid +
 * refcount by index), DEL_PEER, GET_CONN (nid of conn by index, with
 * the lookup ref dropped here), CLOSE_CONNECTION; anything else is
 * logged and rejected.  @ni must be this LND's network interface. */
1337 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1339 struct libcfs_ioctl_data *data = arg;
1342 LASSERT (ni == kmxlnd_data.kmx_ni);
1345 case IOC_LIBCFS_GET_PEER: {
1349 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1350 data->ioc_nid = nid;
1351 data->ioc_count = count;
1354 case IOC_LIBCFS_DEL_PEER: {
1355 ret = mxlnd_del_peer(data->ioc_nid);
1358 case IOC_LIBCFS_GET_CONN: {
1359 struct kmx_conn *conn = NULL;
1361 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1366 data->ioc_nid = conn->mxk_peer->mxp_nid;
1367 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1371 case IOC_LIBCFS_CLOSE_CONNECTION: {
1372 ret = mxlnd_close_matching_conns(data->ioc_nid);
1376 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1384 * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1387 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
/* mxlnd_peer_queue_tx_locked - place @tx on its conn's credit or free queue.
 * Caller holds the conn's mxk_lock. Non-DATA messages are counted in
 * mxk_ntx_msgs; DATA messages in mxk_ntx_data. */
1390 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1392 u8 msg_type = tx->mxc_msg_type;
1393 //struct kmx_peer *peer = tx->mxc_peer;
1394 struct kmx_conn *conn = tx->mxc_conn;
1396 LASSERT (msg_type != 0);
1397 LASSERT (tx->mxc_nid != 0);
1398 LASSERT (tx->mxc_peer != NULL);
1399 LASSERT (tx->mxc_conn != NULL);
/* stamp the tx with the conn generation so stale txs can be detected */
1401 tx->mxc_incarnation = conn->mxk_incarnation;
1403 if (msg_type != MXLND_MSG_PUT_DATA &&
1404 msg_type != MXLND_MSG_GET_DATA) {
/* credit-consuming messages wait on the credit queue */
1406 if (mxlnd_tx_requires_credit(tx)) {
1407 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1408 conn->mxk_ntx_msgs++;
1409 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1410 msg_type == MXLND_MSG_CONN_ACK) {
1411 /* put conn msgs at the front of the queue */
1412 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1414 /* PUT_ACK, PUT_NAK */
1415 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1416 conn->mxk_ntx_msgs++;
/* DATA (PUT_DATA/GET_DATA) goes on the free queue, counted separately */
1420 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1421 conn->mxk_ntx_data++;
1428 * mxlnd_peer_queue_tx - add the tx to the global tx queue
1431 * Add the tx to the peer's msg or data queue
/* mxlnd_peer_queue_tx - locking wrapper: take the conn's mxk_lock, queue
 * @tx via mxlnd_peer_queue_tx_locked(), release the lock. */
1434 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1436 LASSERT(tx->mxc_peer != NULL);
1437 LASSERT(tx->mxc_conn != NULL);
1438 spin_lock(&tx->mxc_conn->mxk_lock);
1439 mxlnd_peer_queue_tx_locked(tx);
1440 spin_unlock(&tx->mxc_conn->mxk_lock);
1446 * mxlnd_queue_tx - add the tx to the global tx queue
1449 * Add the tx to the global queue and up the tx_queue_sem
/* mxlnd_queue_tx - queue @tx on its peer when known, else on the global
 * tx queue for the mxlnd_tx_queued thread (woken via kmx_tx_queue_sem).
 * NOTE(review): some branch/return lines are elided in this excerpt. */
1452 mxlnd_queue_tx(struct kmx_ctx *tx)
1454 struct kmx_peer *peer = tx->mxc_peer;
1455 LASSERT (tx->mxc_nid != 0);
/* incompatible peer: fail everything except the CONN_ACK we owe it */
1458 if (peer->mxp_incompatible &&
1459 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1460 /* let this fail now */
1461 tx->mxc_status.code = -ECONNABORTED;
1462 mxlnd_conn_decref(peer->mxp_conn);
1463 mxlnd_put_idle_tx(tx);
/* no conn yet: allocate one for this peer */
1466 if (tx->mxc_conn == NULL) {
1468 struct kmx_conn *conn = NULL;
1470 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1472 tx->mxc_status.code = ret;
1473 mxlnd_put_idle_tx(tx);
1476 tx->mxc_conn = conn;
1477 mxlnd_peer_decref(peer); /* and takes it from peer */
1479 LASSERT(tx->mxc_conn != NULL);
1480 mxlnd_peer_queue_tx(tx);
1481 mxlnd_check_sends(peer);
/* peer unknown: park the tx on the global queue and wake the thread */
1483 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1484 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1485 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1486 up(&kmxlnd_data.kmx_tx_queue_sem);
/* mxlnd_setup_iov - build the ctx's mx_ksegment_t list describing @nob
 * bytes starting @offset bytes into the iovec array @iov (physical
 * addresses, for MX kernel DMA).
 * NOTE(review): several lines (first/last iov selection, error return on
 * alloc failure, loop closes) are elided in this excerpt. */
1493 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1500 int first_iov_offset = 0;
1501 int first_found = 0;
1503 int last_iov_length = 0;
1504 mx_ksegment_t *seg = NULL;
1506 if (niov == 0) return 0;
1507 LASSERT(iov != NULL);
/* first pass: locate the iov containing @offset and the iov containing
 * the last byte, plus the byte offset/length within each */
1509 for (i = 0; i < niov; i++) {
1510 sum = old_sum + (u32) iov[i].iov_len;
1511 if (!first_found && (sum > offset)) {
1513 first_iov_offset = offset - old_sum;
1515 sum = (u32) iov[i].iov_len - first_iov_offset;
1520 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1521 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1526 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1527 nseg = last_iov - first_iov + 1;
1530 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1532 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1535 memset(seg, 0, nseg * sizeof(*seg));
1536 ctx->mxc_nseg = nseg;
/* second pass: fill one segment per iov, trimming the first and last */
1538 for (i = 0; i < nseg; i++) {
1539 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1540 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1542 seg[i].segment_ptr += (u64) first_iov_offset;
1543 seg[i].segment_length -= (u32) first_iov_offset;
1545 if (i == (nseg - 1)) {
1546 seg[i].segment_length = (u32) last_iov_length;
1548 sum += seg[i].segment_length;
1550 ctx->mxc_seg_list = seg;
1551 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1552 #ifdef MX_PIN_FULLPAGES
1553 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* sanity: the segments must describe exactly @nob bytes */
1555 LASSERT(nob == sum);
/* mxlnd_setup_kiov - build the ctx's mx_ksegment_t list describing @nob
 * bytes starting @offset bytes into the page-based kiov array @kiov.
 *
 * FIX: @seg is allocated with nseg elements (nseg = last_kiov - first_kiov
 * + 1, which is <= niov), but the original memset, mxc_nseg assignment and
 * fill loop all used niov — overrunning the allocation by (niov - nseg)
 * elements and registering too many segments with MX. The parallel
 * mxlnd_setup_iov() correctly uses nseg; this makes the kiov path match.
 *
 * NOTE(review): several lines (first/last kiov selection, error return on
 * alloc failure, loop closes) are elided in this excerpt. */
1560 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1566 int first_kiov = -1;
1567 int first_kiov_offset = 0;
1568 int first_found = 0;
1570 int last_kiov_length = 0;
1571 mx_ksegment_t *seg = NULL;
1573 if (niov == 0) return 0;
1574 LASSERT(kiov != NULL);
/* first pass: locate the kiov containing @offset and the kiov containing
 * the last byte; page 0 may itself start at kiov_offset */
1576 for (i = 0; i < niov; i++) {
1577 sum = old_sum + kiov[i].kiov_len;
1578 if (i == 0) sum -= kiov[i].kiov_offset;
1579 if (!first_found && (sum > offset)) {
1581 first_kiov_offset = offset - old_sum;
1582 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1583 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1585 sum = kiov[i].kiov_len - first_kiov_offset;
1590 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1591 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1596 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1597 nseg = last_kiov - first_kiov + 1;
1600 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1602 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1605 memset(seg, 0, nseg * sizeof(*seg)); /* was niov: overran the nseg-sized alloc */
1606 ctx->mxc_nseg = nseg; /* was niov: registered too many segments */
/* second pass: fill one segment per kiov, trimming the first and last */
1608 for (i = 0; i < nseg; i++) { /* was niov: read past last_kiov */
1609 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1610 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1612 seg[i].segment_ptr += (u64) first_kiov_offset;
1613 /* we have to add back the original kiov_offset */
1614 seg[i].segment_length -= first_kiov_offset +
1615 kiov[first_kiov].kiov_offset;
1617 if (i == (nseg - 1)) {
1618 seg[i].segment_length = last_kiov_length;
1620 sum += seg[i].segment_length;
1622 ctx->mxc_seg_list = seg;
1623 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1624 #ifdef MX_PIN_FULLPAGES
1625 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* sanity: the segments must describe exactly @nob bytes */
1627 LASSERT(nob == sum);
/* mxlnd_send_nak - send a PUT_ACK carrying an error @status so the peer
 * aborts the transfer; the error is encoded in the top bits of the
 * destination cookie and in the match bits. */
1632 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1634 LASSERT(type == MXLND_MSG_PUT_ACK);
1635 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1636 tx->mxc_cookie = cookie;
1637 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1638 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1639 tx->mxc_match = mxlnd_create_match(tx, status);
1646 * mxlnd_send_data - get tx, map [k]iov, queue tx
1653 * This setups the DATA send for PUT or GET.
1655 * On success, it queues the tx, on failure it calls lnet_finalize()
/* mxlnd_send_data - get a tx, map the [k]iov payload and queue the DATA
 * send for a PUT or GET; on failure lnet_finalize()s @lntmsg with -EIO.
 * NOTE(review): the queue_tx call, gotos and labels are elided in this
 * excerpt. */
1658 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1661 lnet_process_id_t target = lntmsg->msg_target;
1662 unsigned int niov = lntmsg->msg_niov;
1663 struct iovec *iov = lntmsg->msg_iov;
1664 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1665 unsigned int offset = lntmsg->msg_offset;
1666 unsigned int nob = lntmsg->msg_len;
1667 struct kmx_ctx *tx = NULL;
1669 LASSERT(lntmsg != NULL);
1670 LASSERT(peer != NULL);
1671 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* the top MXLND_ERROR bits of the cookie are reserved for error codes */
1672 LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1674 tx = mxlnd_get_idle_tx();
1676 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1677 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1678 libcfs_nid2str(target.nid));
1681 tx->mxc_nid = target.nid;
1682 /* NOTE called when we have a ref on the conn, get one for this tx */
1683 mxlnd_conn_addref(peer->mxp_conn);
1684 tx->mxc_peer = peer;
1685 tx->mxc_conn = peer->mxp_conn;
1686 tx->mxc_msg_type = msg_type;
1687 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1688 tx->mxc_state = MXLND_CTX_PENDING;
1689 tx->mxc_lntmsg[0] = lntmsg;
1690 tx->mxc_cookie = cookie;
1691 tx->mxc_match = mxlnd_create_match(tx, 0);
1693 /* This setups up the mx_ksegment_t to send the DATA payload */
1695 /* do not setup the segments */
1696 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1697 "to %s?\n", libcfs_nid2str(target.nid));
/* payload is either all vaddrs (iov) or all pages (kiov) */
1699 } else if (kiov == NULL) {
1700 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1702 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1705 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1706 libcfs_nid2str(target.nid));
1707 tx->mxc_status.code = -EIO;
/* failure path: drop the conn ref taken above and recycle the tx */
1714 mxlnd_conn_decref(peer->mxp_conn);
1715 mxlnd_put_idle_tx(tx);
1719 CDEBUG(D_NETERROR, "no tx avail\n");
1720 lnet_finalize(ni, lntmsg, -EIO);
1725 * mxlnd_recv_data - map [k]iov, post rx
1732 * This setups the DATA receive for PUT or GET.
1734 * On success, it returns 0, on failure it returns -1
/* mxlnd_recv_data - map the [k]iov sink and post the MX receive for a
 * PUT_DATA or GET_DATA transfer; returns 0 on success, -1 on failure.
 * NOTE(review): some branch/return lines and the mx_kirecv trailing
 * arguments are elided in this excerpt. */
1737 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1740 lnet_process_id_t target = lntmsg->msg_target;
1741 unsigned int niov = lntmsg->msg_niov;
1742 struct iovec *iov = lntmsg->msg_iov;
1743 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1744 unsigned int offset = lntmsg->msg_offset;
1745 unsigned int nob = lntmsg->msg_len;
1746 mx_return_t mxret = MX_SUCCESS;
1747 u64 mask = ~(MXLND_ERROR_MASK);
1749 /* above assumes MXLND_MSG_PUT_DATA */
/* for a GET, the sink is described by the MD, not the lnet msg itself */
1750 if (msg_type == MXLND_MSG_GET_DATA) {
1751 niov = lntmsg->msg_md->md_niov;
1752 iov = lntmsg->msg_md->md_iov.iov;
1753 kiov = lntmsg->msg_md->md_iov.kiov;
1755 nob = lntmsg->msg_md->md_length;
1758 LASSERT(lntmsg != NULL);
1759 LASSERT(rx != NULL);
1760 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1761 LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1763 rx->mxc_msg_type = msg_type;
1764 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1765 rx->mxc_state = MXLND_CTX_PENDING;
1766 rx->mxc_nid = target.nid;
1767 /* if posting a GET_DATA, we may not yet know the peer */
1768 if (rx->mxc_peer != NULL) {
1769 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1771 rx->mxc_lntmsg[0] = lntmsg;
1772 rx->mxc_cookie = cookie;
1773 rx->mxc_match = mxlnd_create_match(rx, 0);
1774 /* This setups up the mx_ksegment_t to receive the DATA payload */
1776 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1778 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
/* a GET also needs the REPLY msg that lnet will finalize on completion */
1780 if (msg_type == MXLND_MSG_GET_DATA) {
1781 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1782 if (rx->mxc_lntmsg[1] == NULL) {
1783 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1784 libcfs_nid2str(target.nid));
1789 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1790 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1791 libcfs_nid2str(target.nid));
/* track the rx on the pending list so the timeout thread can see it */
1794 ret = mxlnd_q_pending_ctx(rx);
1798 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1799 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1800 rx->mxc_seg_list, rx->mxc_nseg,
1801 rx->mxc_pin_type, rx->mxc_match,
1804 if (mxret != MX_SUCCESS) {
1805 if (rx->mxc_conn != NULL) {
1806 mxlnd_deq_pending_ctx(rx);
1808 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1809 (int) mxret, libcfs_nid2str(target.nid));
1817 * mxlnd_send - the LND required send function
1822 * This must not block. Since we may not have a peer struct for the receiver,
1823 * it will append send messages on a global tx list. We will then up the
1824 * tx_queued's semaphore to notify it of the new send.
/* mxlnd_send - the lnd_send() handler for MXLND. Must not block.
 * Small messages go EAGER (payload inline in the message); large PUT/REPLY
 * sends a PUT_REQ and pre-posts the PUT_ACK rx; large GET posts the DATA
 * sink rx then sends a GET_REQ. Queued txs are handed to the tx_queued
 * thread when the peer is not yet known.
 * NOTE(review): many lines (switch/case/goto/return, some error branches)
 * are elided in this excerpt; comments describe only the visible flow. */
1827 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1830 int type = lntmsg->msg_type;
1831 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1832 lnet_process_id_t target = lntmsg->msg_target;
1833 lnet_nid_t nid = target.nid;
1834 int target_is_router = lntmsg->msg_target_is_router;
1835 int routing = lntmsg->msg_routing;
1836 unsigned int payload_niov = lntmsg->msg_niov;
1837 struct iovec *payload_iov = lntmsg->msg_iov;
1838 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1839 unsigned int payload_offset = lntmsg->msg_offset;
1840 unsigned int payload_nob = lntmsg->msg_len;
1841 struct kmx_ctx *tx = NULL;
1842 struct kmx_msg *txmsg = NULL;
1843 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1844 struct kmx_ctx *rx_data = NULL;
1845 struct kmx_conn *conn = NULL;
1847 uint32_t length = 0;
1848 struct kmx_peer *peer = NULL;
1850 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1851 payload_nob, payload_niov, libcfs_id2str(target));
1853 LASSERT (payload_nob == 0 || payload_niov > 0);
1854 LASSERT (payload_niov <= LNET_MAX_IOV);
1855 /* payload is either all vaddrs or all pages */
1856 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1858 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1860 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1861 * to a new peer, use the nid */
1862 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1864 if (unlikely(peer->mxp_incompatible)) {
1865 mxlnd_peer_decref(peer); /* drop ref taken above */
/* swap the peer ref for a conn ref under the global lock */
1867 read_lock(&kmxlnd_data.kmx_global_lock);
1868 conn = peer->mxp_conn;
1870 mxlnd_conn_addref(conn);
1871 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1873 read_unlock(&kmxlnd_data.kmx_global_lock);
1876 CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1877 if (conn == NULL && peer != NULL) {
1878 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1879 peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1884 LASSERT (payload_nob == 0);
/* LNET_MSG_PUT / LNET_MSG_REPLY handling */
1887 case LNET_MSG_REPLY:
1889 /* Is the payload small enough not to need DATA? */
1890 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1891 if (nob <= MXLND_EAGER_SIZE)
1892 break; /* send EAGER */
1894 tx = mxlnd_get_idle_tx();
1895 if (unlikely(tx == NULL)) {
1896 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1897 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1898 libcfs_nid2str(nid));
1899 if (conn) mxlnd_conn_decref(conn);
1903 /* the peer may be NULL */
1904 tx->mxc_peer = peer;
1905 tx->mxc_conn = conn; /* may be NULL */
1906 /* we added a conn ref above */
1907 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1908 txmsg = tx->mxc_msg;
1909 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1910 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1911 tx->mxc_match = mxlnd_create_match(tx, 0);
1913 /* we must post a receive _before_ sending the request.
1914 * we need to determine how much to receive, it will be either
1915 * a put_ack or a put_nak. The put_ack is larger, so use it. */
1917 rx = mxlnd_get_idle_rx();
1918 if (unlikely(rx == NULL)) {
1919 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1920 libcfs_nid2str(nid));
1921 mxlnd_put_idle_tx(tx);
1922 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1926 rx->mxc_peer = peer;
1927 /* conn may be NULL but unlikely since the first msg is always small */
1928 /* NOTE no need to lock peer before adding conn ref since we took
1929 * a conn ref for the tx (it cannot be freed between there and here ) */
1930 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1931 rx->mxc_conn = conn;
1932 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1933 rx->mxc_cookie = tx->mxc_cookie;
1934 rx->mxc_match = mxlnd_create_match(rx, 0);
1936 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1937 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1938 if (unlikely(ret != 0)) {
1939 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1940 libcfs_nid2str(nid));
1941 rx->mxc_lntmsg[0] = NULL;
1942 mxlnd_put_idle_rx(rx);
1943 mxlnd_put_idle_tx(tx);
1945 mxlnd_conn_decref(conn); /* for the rx... */
1946 mxlnd_conn_decref(conn); /* and for the tx */
1948 return -EHOSTUNREACH;
/* LNET_MSG_GET handling */
1955 if (routing || target_is_router)
1956 break; /* send EAGER */
1958 /* is the REPLY message too small for DATA? */
1959 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1960 if (nob <= MXLND_EAGER_SIZE)
1961 break; /* send EAGER */
1963 /* get tx (we need the cookie) , post rx for incoming DATA,
1964 * then post GET_REQ tx */
1965 tx = mxlnd_get_idle_tx();
1966 if (unlikely(tx == NULL)) {
1967 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1968 libcfs_nid2str(nid));
1969 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1972 rx_data = mxlnd_get_idle_rx();
1973 if (unlikely(rx_data == NULL)) {
1974 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1975 libcfs_nid2str(nid));
1976 mxlnd_put_idle_tx(tx);
1977 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1980 rx_data->mxc_peer = peer;
1981 /* NOTE no need to lock peer before adding conn ref since we took
1982 * a conn ref for the tx (it cannot be freed between there and here ) */
1983 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1984 rx_data->mxc_conn = conn; /* may be NULL */
1986 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1987 if (unlikely(ret != 0)) {
1988 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1989 libcfs_nid2str(nid));
1990 mxlnd_put_idle_rx(rx_data);
1991 mxlnd_put_idle_tx(tx);
1993 mxlnd_conn_decref(conn); /* for the rx_data... */
1994 mxlnd_conn_decref(conn); /* and for the tx */
1999 tx->mxc_peer = peer;
2000 tx->mxc_conn = conn; /* may be NULL */
2001 /* conn ref taken above */
2002 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2003 txmsg = tx->mxc_msg;
2004 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2005 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2006 tx->mxc_match = mxlnd_create_match(tx, 0);
2013 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: payload fits inline in the message */
2019 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2020 <= MXLND_EAGER_SIZE);
2022 tx = mxlnd_get_idle_tx();
2023 if (unlikely(tx == NULL)) {
2024 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2025 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2026 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2030 tx->mxc_peer = peer;
2031 tx->mxc_conn = conn; /* may be NULL */
2032 /* conn ref taken above */
2033 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2034 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2035 tx->mxc_match = mxlnd_create_match(tx, 0);
2037 txmsg = tx->mxc_msg;
2038 txmsg->mxm_u.eager.mxem_hdr = *hdr;
/* copy the payload inline into the eager message */
2040 if (payload_kiov != NULL)
2041 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2042 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2043 payload_niov, payload_kiov, payload_offset, payload_nob);
2045 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2046 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2047 payload_niov, payload_iov, payload_offset, payload_nob);
2049 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2055 * mxlnd_recv - the LND required recv function
2066 * This must not block.
/* mxlnd_recv - the lnd_recv() handler for MXLND. Must not block.
 * @private is the rx ctx that carried the incoming message. EAGER copies
 * inline payload into the sink; PUT_REQ re-posts the rx as the DATA sink
 * and sends a PUT_ACK (or NAK); GET_REQ triggers the DATA send (or encodes
 * ENODATA in the match bits when the GET matched nothing).
 * NOTE(review): switch/goto/return lines and some error branches are
 * elided in this excerpt; comments describe only the visible flow. */
2069 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2070 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2071 unsigned int offset, unsigned int mlen, unsigned int rlen)
2076 struct kmx_ctx *rx = private;
2077 struct kmx_msg *rxmsg = rx->mxc_msg;
2078 lnet_nid_t nid = rx->mxc_nid;
2079 struct kmx_ctx *tx = NULL;
2080 struct kmx_msg *txmsg = NULL;
2081 struct kmx_peer *peer = rx->mxc_peer;
2082 struct kmx_conn *conn = peer->mxp_conn;
2084 int msg_type = rxmsg->mxm_type;
2089 LASSERT (mlen <= rlen);
2090 /* Either all pages or all vaddrs */
2091 LASSERT (!(kiov != NULL && iov != NULL));
2092 LASSERT (peer != NULL);
2094 /* conn_addref(conn) already taken for the primary rx */
2097 case MXLND_MSG_EAGER:
2098 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2099 len = rx->mxc_status.xfer_length;
2100 if (unlikely(nob > len)) {
2101 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2102 libcfs_nid2str(nid), nob, len);
/* copy the inline payload straight into the caller's sink */
2108 lnet_copy_flat2kiov(niov, kiov, offset,
2109 MXLND_EAGER_SIZE, rxmsg,
2110 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2113 lnet_copy_flat2iov(niov, iov, offset,
2114 MXLND_EAGER_SIZE, rxmsg,
2115 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2121 case MXLND_MSG_PUT_REQ:
2122 /* we are going to reuse the rx, store the needed info */
2123 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2125 /* get tx, post rx, send PUT_ACK */
2127 tx = mxlnd_get_idle_tx();
2128 if (unlikely(tx == NULL)) {
2129 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2130 /* Not replying will break the connection */
/* zero-length sink: NAK the PUT instead of setting up DATA */
2134 if (unlikely(mlen == 0)) {
2136 tx->mxc_peer = peer;
2137 tx->mxc_conn = conn;
2138 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2143 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2144 tx->mxc_peer = peer;
2145 tx->mxc_conn = conn;
2146 /* no need to lock peer first since we already have a ref */
2147 mxlnd_conn_addref(conn); /* for the tx */
2148 txmsg = tx->mxc_msg;
2149 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2150 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2151 tx->mxc_cookie = cookie;
2152 tx->mxc_match = mxlnd_create_match(tx, 0);
2154 /* we must post a receive _before_ sending the PUT_ACK */
2156 rx->mxc_state = MXLND_CTX_PREP;
2157 rx->mxc_peer = peer;
2158 rx->mxc_conn = conn;
2159 /* do not take another ref for this rx, it is already taken */
2160 rx->mxc_nid = peer->mxp_nid;
2161 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2162 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2164 if (unlikely(ret != 0)) {
2165 /* Notify peer that it's over */
2166 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2167 libcfs_nid2str(nid), ret);
2169 tx->mxc_state = MXLND_CTX_PREP;
2170 tx->mxc_peer = peer;
2171 tx->mxc_conn = conn;
2172 /* finalize = 0, let the PUT_ACK tx finalize this */
2173 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2174 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2175 /* conn ref already taken above */
2176 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2182 /* do not return a credit until after PUT_DATA returns */
2186 case MXLND_MSG_GET_REQ:
2187 if (likely(lntmsg != NULL)) {
2188 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2189 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2191 /* GET didn't match anything */
2192 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2193 * We have to embed the error code in the match bits.
2194 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2195 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2197 tx = mxlnd_get_idle_tx();
2198 if (unlikely(tx == NULL)) {
2199 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2200 libcfs_nid2str(nid));
2204 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2205 tx->mxc_state = MXLND_CTX_PENDING;
2207 tx->mxc_peer = peer;
2208 tx->mxc_conn = conn;
2209 /* no need to lock peer first since we already have a ref */
2210 mxlnd_conn_addref(conn); /* for this tx */
2211 tx->mxc_cookie = cookie;
2212 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2213 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2216 /* finalize lntmsg after tx completes */
2224 /* we received a message, increment peer's outstanding credits */
2226 spin_lock(&conn->mxk_lock);
2227 conn->mxk_outstanding++;
2228 spin_unlock(&conn->mxk_lock);
2230 /* we are done with the rx */
2231 mxlnd_put_idle_rx(rx);
2232 mxlnd_conn_decref(conn);
2235 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2237 /* we received a credit, see if we can use it to send a msg */
2238 if (credit) mxlnd_check_sends(peer);
/* mxlnd_sleep - interruptible sleep for @timeout jiffies. */
2244 mxlnd_sleep(unsigned long timeout)
2246 set_current_state(TASK_INTERRUPTIBLE);
2247 schedule_timeout(timeout);
2252 * mxlnd_tx_queued - the generic send queue thread
2253 * @arg - thread id (as a void *)
2255 * This thread moves send messages from the global tx_queue to the owning
2256 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2257 * it to the global peer list.
/* mxlnd_tx_queued - kernel thread draining the global kmx_tx_queue.
 * For each tx it resolves (or creates) the peer, attaches a conn and
 * queues the tx on it; on allocation failure the tx is failed.
 * NOTE(review): several branch/goto/close lines are elided in this
 * excerpt; comments describe only the visible flow. */
2260 mxlnd_tx_queued(void *arg)
2262 long id = (long) arg;
2265 struct kmx_ctx *tx = NULL;
2266 struct kmx_peer *peer = NULL;
2267 struct list_head *tmp_tx = NULL;
2269 cfs_daemonize("mxlnd_tx_queued");
2270 //cfs_block_allsigs();
/* main loop: block on the semaphore until a tx arrives or shutdown */
2272 while (!kmxlnd_data.kmx_shutdown) {
2273 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2274 if (kmxlnd_data.kmx_shutdown)
2276 if (ret != 0) // Should we check for -EINTR?
2278 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2279 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2280 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the first queued tx */
2283 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2284 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2285 list_del_init(&tx->mxc_list);
2286 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2289 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known peer: make sure it has a conn, then hand the tx over */
2291 tx->mxc_peer = peer;
2292 write_lock(&kmxlnd_data.kmx_global_lock);
2293 if (peer->mxp_conn == NULL) {
2294 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2296 /* out of memory, give up and fail tx */
2297 tx->mxc_status.code = -ENOMEM;
2298 write_unlock(&kmxlnd_data.kmx_global_lock);
2299 mxlnd_peer_decref(peer);
2300 mxlnd_put_idle_tx(tx);
2304 tx->mxc_conn = peer->mxp_conn;
2305 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2306 write_unlock(&kmxlnd_data.kmx_global_lock);
2307 mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown peer: create one and race-check against concurrent creators */
2313 struct kmx_peer *peer = NULL;
2314 struct kmx_peer *old = NULL;
2316 hash = mxlnd_nid_to_hash(tx->mxc_nid);
2318 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2319 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2321 /* adds conn ref for this function */
2322 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2323 *kmxlnd_tunables.kmx_board,
2324 *kmxlnd_tunables.kmx_ep_id, 0ULL);
2326 /* finalize message */
2327 tx->mxc_status.code = ret;
2328 mxlnd_put_idle_tx(tx);
2331 tx->mxc_peer = peer;
2332 tx->mxc_conn = peer->mxp_conn;
2333 /* this tx will keep the conn ref taken in peer_alloc() */
2335 /* add peer to global peer list, but look to see
2336 * if someone already created it after we released
2338 write_lock(&kmxlnd_data.kmx_global_lock);
2339 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2340 if (old->mxp_nid == peer->mxp_nid) {
2341 /* somebody beat us here, we created a duplicate */
/* won the race: publish the new peer */
2348 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2349 atomic_inc(&kmxlnd_data.kmx_npeers);
/* lost the race: switch to the existing peer and discard ours */
2352 tx->mxc_conn = old->mxp_conn;
2353 /* FIXME can conn be NULL? */
2354 LASSERT(old->mxp_conn != NULL);
2355 mxlnd_conn_addref(old->mxp_conn);
2356 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2357 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2358 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2359 mxlnd_peer_decref(peer);
2361 write_unlock(&kmxlnd_data.kmx_global_lock);
2366 mxlnd_thread_stop(id);
2370 /* When calling this, we must not have the peer lock. */
/* mxlnd_iconnect - start an asynchronous MX connect to @peer; @mask
 * carries the ICON_REQ/ICON_ACK message type in its high bits. Resolves
 * the peer's NIC id from its IP first if not yet known.
 * NOTE(review): some branch/return lines are elided in this excerpt. */
2372 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2374 mx_return_t mxret = MX_SUCCESS;
2375 mx_request_t request;
2376 struct kmx_conn *conn = peer->mxp_conn;
2377 u8 msg_type = (u8) MXLND_MSG_TYPE(mask);
2379 /* NOTE we are holding a conn ref every time we call this function,
2380 * we do not need to lock the peer before taking another ref */
2381 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2383 LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
/* remember when we first started trying, for the give-up timeout below */
2385 if (peer->mxp_reconnect_time == 0) {
2386 peer->mxp_reconnect_time = jiffies;
2389 if (peer->mxp_nic_id == 0ULL) {
2392 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2394 mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2396 if (peer->mxp_nic_id == 0ULL) {
2397 /* not mapped yet, return */
2398 spin_lock(&conn->mxk_lock);
2399 conn->mxk_status = MXLND_CONN_INIT;
2400 spin_unlock(&conn->mxk_lock);
2401 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2402 /* give up and notify LNET */
2403 mxlnd_conn_disconnect(conn, 0, 0);
2404 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2406 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2409 mxlnd_conn_decref(conn);
/* NIC id known: fire the async connect */
2414 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2415 peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2416 (void *) peer, &request);
2417 if (unlikely(mxret != MX_SUCCESS)) {
2418 spin_lock(&conn->mxk_lock);
2419 conn->mxk_status = MXLND_CONN_FAIL;
2420 spin_unlock(&conn->mxk_lock);
2421 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2422 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2423 mxlnd_conn_decref(conn);
2428 #define MXLND_STATS 0
2431 mxlnd_check_sends(struct kmx_peer *peer)
2435 mx_return_t mxret = MX_SUCCESS;
2436 struct kmx_ctx *tx = NULL;
2437 struct kmx_conn *conn = NULL;
2444 static unsigned long last = 0;
2447 if (unlikely(peer == NULL)) {
2448 LASSERT(peer != NULL);
2451 write_lock(&kmxlnd_data.kmx_global_lock);
2452 conn = peer->mxp_conn;
2453 /* NOTE take a ref for the duration of this function since it is called
2454 * when there might not be any queued txs for this peer */
2455 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2456 write_unlock(&kmxlnd_data.kmx_global_lock);
2458 /* do not add another ref for this tx */
2461 /* we do not have any conns */
2466 if (time_after(jiffies, last)) {
2467 last = jiffies + HZ;
2468 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2469 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2470 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2471 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2472 conn->mxk_ntx_data, conn->mxk_data_posted);
2476 /* cache peer state for asserts */
2477 spin_lock(&conn->mxk_lock);
2478 ntx_posted = conn->mxk_ntx_posted;
2479 credits = conn->mxk_credits;
2480 spin_unlock(&conn->mxk_lock);
2482 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2483 LASSERT(ntx_posted >= 0);
2485 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2486 LASSERT(credits >= 0);
2488 /* check number of queued msgs, ignore data */
2489 spin_lock(&conn->mxk_lock);
2490 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2491 /* check if any txs queued that could return credits... */
2492 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2493 /* if not, send a NOOP */
2494 tx = mxlnd_get_idle_tx();
2495 if (likely(tx != NULL)) {
2496 tx->mxc_peer = peer;
2497 tx->mxc_conn = peer->mxp_conn;
2498 mxlnd_conn_addref(conn); /* for this tx */
2499 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2500 tx->mxc_match = mxlnd_create_match(tx, 0);
2501 mxlnd_peer_queue_tx_locked(tx);
2507 spin_unlock(&conn->mxk_lock);
2509 /* if the peer is not ready, try to connect */
2510 spin_lock(&conn->mxk_lock);
2511 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2512 conn->mxk_status == MXLND_CONN_FAIL ||
2513 conn->mxk_status == MXLND_CONN_REQ)) {
2514 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2515 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2516 conn->mxk_status = MXLND_CONN_WAIT;
2517 spin_unlock(&conn->mxk_lock);
2518 mxlnd_iconnect(peer, match);
2521 spin_unlock(&conn->mxk_lock);
2523 spin_lock(&conn->mxk_lock);
2524 while (!list_empty(&conn->mxk_tx_free_queue) ||
2525 !list_empty(&conn->mxk_tx_credit_queue)) {
2526 /* We have something to send. If we have a queued tx that does not
2527 * require a credit (free), choose it since its completion will
2528 * return a credit (here or at the peer), complete a DATA or
2529 * CONN_REQ or CONN_ACK. */
2530 struct list_head *tmp_tx = NULL;
2531 if (!list_empty(&conn->mxk_tx_free_queue)) {
2532 tmp_tx = &conn->mxk_tx_free_queue;
2534 tmp_tx = &conn->mxk_tx_credit_queue;
2536 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2538 msg_type = tx->mxc_msg_type;
2540 /* don't try to send a rx */
2541 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2543 /* ensure that it is a valid msg type */
2544 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2545 msg_type == MXLND_MSG_CONN_ACK ||
2546 msg_type == MXLND_MSG_NOOP ||
2547 msg_type == MXLND_MSG_EAGER ||
2548 msg_type == MXLND_MSG_PUT_REQ ||
2549 msg_type == MXLND_MSG_PUT_ACK ||
2550 msg_type == MXLND_MSG_PUT_DATA ||
2551 msg_type == MXLND_MSG_GET_REQ ||
2552 msg_type == MXLND_MSG_GET_DATA);
2553 LASSERT(tx->mxc_peer == peer);
2554 LASSERT(tx->mxc_nid == peer->mxp_nid);
2556 credit = mxlnd_tx_requires_credit(tx);
2559 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2560 CDEBUG(D_NET, "%s: posted enough\n",
2561 libcfs_nid2str(peer->mxp_nid));
2565 if (conn->mxk_credits == 0) {
2566 CDEBUG(D_NET, "%s: no credits\n",
2567 libcfs_nid2str(peer->mxp_nid));
2571 if (conn->mxk_credits == 1 && /* last credit reserved for */
2572 conn->mxk_outstanding == 0) { /* giving back credits */
2573 CDEBUG(D_NET, "%s: not using last credit\n",
2574 libcfs_nid2str(peer->mxp_nid));
2579 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2580 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2581 msg_type == MXLND_MSG_CONN_ACK)) {
2582 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2583 mxlnd_connstatus_to_str(conn->mxk_status),
2585 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2586 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2587 list_del_init(&tx->mxc_list);
2588 tx->mxc_status.code = -ECONNABORTED;
2589 mxlnd_put_idle_tx(tx);
2590 mxlnd_conn_decref(conn);
2596 list_del_init(&tx->mxc_list);
2598 /* handle credits, etc now while we have the lock to avoid races */
2600 conn->mxk_credits--;
2601 conn->mxk_ntx_posted++;
2603 if (msg_type != MXLND_MSG_PUT_DATA &&
2604 msg_type != MXLND_MSG_GET_DATA) {
2605 if (msg_type != MXLND_MSG_CONN_REQ &&
2606 msg_type != MXLND_MSG_CONN_ACK) {
2607 conn->mxk_ntx_msgs--;
2610 if (tx->mxc_incarnation == 0 &&
2611 conn->mxk_incarnation != 0) {
2612 tx->mxc_incarnation = conn->mxk_incarnation;
2614 spin_unlock(&conn->mxk_lock);
2616 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2617 * or (2) there is a non-DATA msg that can return credits in the
2618 * queue, then drop this duplicate NOOP */
2619 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2620 spin_lock(&conn->mxk_lock);
2621 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2622 (conn->mxk_ntx_msgs >= 1)) {
2623 conn->mxk_credits++;
2624 conn->mxk_ntx_posted--;
2625 spin_unlock(&conn->mxk_lock);
2626 /* redundant NOOP */
2627 mxlnd_put_idle_tx(tx);
2628 mxlnd_conn_decref(conn);
2629 CDEBUG(D_NET, "%s: redundant noop\n",
2630 libcfs_nid2str(peer->mxp_nid));
2634 spin_unlock(&conn->mxk_lock);
2638 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2639 (msg_type != MXLND_MSG_GET_DATA))) {
2643 //ret = -ECONNABORTED;
2646 spin_lock(&conn->mxk_lock);
2647 status = conn->mxk_status;
2648 spin_unlock(&conn->mxk_lock);
2650 if (likely((status == MXLND_CONN_READY) ||
2651 (msg_type == MXLND_MSG_CONN_REQ) ||
2652 (msg_type == MXLND_MSG_CONN_ACK))) {
2654 if (msg_type != MXLND_MSG_CONN_REQ &&
2655 msg_type != MXLND_MSG_CONN_ACK) {
2656 /* add to the pending list */
2657 ret = mxlnd_q_pending_ctx(tx);
2659 /* FIXME the conn is disconnected, now what? */
2663 tx->mxc_state = MXLND_CTX_PENDING;
2667 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2668 msg_type != MXLND_MSG_GET_DATA)) {
2669 /* send a msg style tx */
2670 LASSERT(tx->mxc_nseg == 1);
2671 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2672 CDEBUG(D_NET, "sending %s 0x%llx\n",
2673 mxlnd_msgtype_to_str(msg_type),
2675 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2684 /* send a DATA tx */
2685 spin_lock(&conn->mxk_lock);
2686 conn->mxk_ntx_data--;
2687 conn->mxk_data_posted++;
2688 spin_unlock(&conn->mxk_lock);
2689 CDEBUG(D_NET, "sending %s 0x%llx\n",
2690 mxlnd_msgtype_to_str(msg_type),
2692 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2702 mxret = MX_CONNECTION_FAILED;
2704 if (likely(mxret == MX_SUCCESS)) {
2707 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2708 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2709 libcfs_nid2str(peer->mxp_nid));
2710 /* NOTE mx_kisend() only fails if there are not enough
2711 * resources. Do not change the connection status. */
2712 if (mxret == MX_NO_RESOURCES) {
2713 tx->mxc_status.code = -ENOMEM;
2715 tx->mxc_status.code = -ECONNABORTED;
2718 spin_lock(&conn->mxk_lock);
2719 conn->mxk_ntx_posted--;
2720 conn->mxk_credits++;
2721 spin_unlock(&conn->mxk_lock);
2722 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2723 msg_type == MXLND_MSG_GET_DATA) {
2724 spin_lock(&conn->mxk_lock);
2725 conn->mxk_data_posted--;
2726 spin_unlock(&conn->mxk_lock);
2728 if (msg_type != MXLND_MSG_PUT_DATA &&
2729 msg_type != MXLND_MSG_GET_DATA &&
2730 msg_type != MXLND_MSG_CONN_REQ &&
2731 msg_type != MXLND_MSG_CONN_ACK) {
2732 spin_lock(&conn->mxk_lock);
2733 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2734 spin_unlock(&conn->mxk_lock);
2736 if (msg_type != MXLND_MSG_CONN_REQ &&
2737 msg_type != MXLND_MSG_CONN_ACK) {
2738 /* remove from the pending list */
2739 mxlnd_deq_pending_ctx(tx);
2741 mxlnd_put_idle_tx(tx);
2742 mxlnd_conn_decref(conn);
2745 spin_lock(&conn->mxk_lock);
2748 spin_unlock(&conn->mxk_lock);
2750 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2756 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2757 * @ctx - the tx descriptor
2759 * Determine which type of send request it was and start the next step, if needed,
2760 * or, if done, signal completion to LNET. After we are done, put back on the idle list.
/*
 * A tx send has completed (successfully or not).  Update the connection's
 * credit/posted accounting for the message type, flag the conn as failed
 * when a CONN_REQ/CONN_ACK send failed, then recycle the tx, drop the
 * conn ref taken for the send, and kick check_sends() for the peer.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
2764 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2766 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2767 struct kmx_msg *msg = tx->mxc_msg;
2768 struct kmx_peer *peer = tx->mxc_peer;
2769 struct kmx_conn *conn = tx->mxc_conn;
2770 u8 type = tx->mxc_msg_type;
2771 int credit = mxlnd_tx_requires_credit(tx);
2772 u64 cookie = tx->mxc_cookie;
2774 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2775 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
/* tx was sent before its conn was known: recover the peer from the
 * MX endpoint-address context and attach the conn now */
2777 if (unlikely(conn == NULL)) {
2778 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2779 conn = peer->mxp_conn;
2781 /* do not add a ref for the tx, it was set before sending */
2782 tx->mxc_conn = conn;
2783 tx->mxc_peer = conn->mxk_peer;
2786 LASSERT (peer != NULL);
2787 LASSERT (conn != NULL);
/* message-carrying types must agree with the wire message header;
 * PUT_DATA/GET_DATA carry no kmx_msg */
2789 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2790 LASSERT (type == msg->mxm_type);
/* map an MX-level failure to -EIO for the completion code
 * (NOTE(review): the enclosing condition is not visible here --
 * presumably only taken when the send failed) */
2794 tx->mxc_status.code = -EIO;
2796 spin_lock(&conn->mxk_lock);
2797 conn->mxk_last_tx = jiffies; /* record activity for timeout checks */
2798 spin_unlock(&conn->mxk_lock);
/* GET_DATA: if the conn has not been reincarnated since this tx was
 * posted, credit back 'outstanding' and drop the data-posted count */
2803 case MXLND_MSG_GET_DATA:
2804 spin_lock(&conn->mxk_lock);
2805 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2806 conn->mxk_outstanding++;
2807 conn->mxk_data_posted--;
2809 spin_unlock(&conn->mxk_lock);
/* PUT_DATA: only the data-posted count is adjusted */
2812 case MXLND_MSG_PUT_DATA:
2813 spin_lock(&conn->mxk_lock);
2814 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2815 conn->mxk_data_posted--;
2817 spin_unlock(&conn->mxk_lock);
/* plain message types: no extra per-type accounting here */
2820 case MXLND_MSG_NOOP:
2821 case MXLND_MSG_PUT_REQ:
2822 case MXLND_MSG_PUT_ACK:
2823 case MXLND_MSG_GET_REQ:
2824 case MXLND_MSG_EAGER:
2825 //case MXLND_MSG_NAK:
2828 case MXLND_MSG_CONN_ACK:
2829 if (peer->mxp_incompatible) {
2830 /* we sent our params, now close this conn */
2831 mxlnd_conn_disconnect(conn, 0, 1);
2833 case MXLND_MSG_CONN_REQ:
/* a failed handshake send marks the conn failed unless the peer is
 * already known-incompatible (then the disconnect above handles it) */
2835 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2836 "failed with %s (%d) to %s\n",
2837 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2838 mx_strstatus(tx->mxc_status.code),
2839 tx->mxc_status.code,
2840 libcfs_nid2str(tx->mxc_nid));
2841 if (!peer->mxp_incompatible) {
2842 spin_lock(&conn->mxk_lock);
2843 conn->mxk_status = MXLND_CONN_FAIL;
2844 spin_unlock(&conn->mxk_lock);
2850 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
/* release the posted-tx slot (same incarnation only -- a reconnect
 * resets the counters and must not be double-decremented) */
2855 spin_lock(&conn->mxk_lock);
2856 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2857 conn->mxk_ntx_posted--;
2859 spin_unlock(&conn->mxk_lock);
2862 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2863 mxlnd_put_idle_tx(tx); /* recycle the descriptor */
2864 mxlnd_conn_decref(conn); /* drop the ref taken for this send */
2866 mxlnd_check_sends(peer); /* the freed slot/credit may unblock sends */
/*
 * A receive has completed.  Validate the message (status, length,
 * unpack, NID and incarnation checks), absorb any returned credits,
 * then dispatch on the message type: data/eager/put/get messages go to
 * lnet_parse()/lnet_finalize(), CONN_REQ/CONN_ACK drive the connection
 * handshake.  Finally the rx is reposted and check_sends() is kicked.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
2872 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2877 u32 nob = rx->mxc_status.xfer_length;
2878 u64 bits = rx->mxc_status.match_info;
2879 struct kmx_msg *msg = rx->mxc_msg;
2880 struct kmx_peer *peer = rx->mxc_peer;
2881 struct kmx_conn *conn = rx->mxc_conn;
2882 u8 type = rx->mxc_msg_type;
2884 lnet_msg_t *lntmsg[2];
2891 int incompatible = 0;
2894 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2895 * failed GET reply, CONN_REQ, or a CONN_ACK */
2897 /* NOTE peer may still be NULL if it is a new peer and
2898 * conn may be NULL if this is a re-connect */
/* work out which reference (conn vs peer) this rx is holding */
2899 if (likely(peer != NULL && conn != NULL)) {
2900 /* we have a reference on the conn */
2902 } else if (peer != NULL && conn == NULL) {
2903 /* we have a reference on the peer */
2905 } else if (peer == NULL && conn != NULL) {
2907 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2909 } /* else peer and conn == NULL */
2912 if (peer == NULL || conn == NULL) {
2913 /* if the peer was disconnected, the peer may exist but
2914 * not have any valid conns */
2915 decref = 0; /* no peer means no ref was taken for this rx */
/* peer-ref only: swap it for a conn ref under the global lock */
2919 if (conn == NULL && peer != NULL) {
2920 write_lock(&kmxlnd_data.kmx_global_lock);
2921 conn = peer->mxp_conn;
2923 mxlnd_conn_addref(conn); /* conn takes ref... */
2924 mxlnd_peer_decref(peer); /* from peer */
2928 write_unlock(&kmxlnd_data.kmx_global_lock);
2929 rx->mxc_conn = conn;
2933 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
/* MX-level receive failure */
2939 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2940 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2941 libcfs_nid2str(rx->mxc_nid),
2942 mx_strstatus(rx->mxc_status.code),
2943 (int) rx->mxc_status.code);
2949 /* this may be a failed GET reply */
2950 if (type == MXLND_MSG_GET_DATA) {
2951 /* get the error (52-59) bits from the match bits */
2952 ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2953 lntmsg[0] = rx->mxc_lntmsg[0];
2957 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2958 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2959 libcfs_nid2str(rx->mxc_nid));
2964 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2965 if (type == MXLND_MSG_PUT_DATA) {
2966 result = rx->mxc_status.code;
2967 lntmsg[0] = rx->mxc_lntmsg[0];
2969 } else if (type == MXLND_MSG_GET_DATA) {
2970 result = rx->mxc_status.code;
2971 lntmsg[0] = rx->mxc_lntmsg[0];
2972 lntmsg[1] = rx->mxc_lntmsg[1];
/* message-carrying rx: byte-swap/validate the wire message */
2976 ret = mxlnd_unpack_msg(msg, nob);
2978 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2979 ret, libcfs_nid2str(rx->mxc_nid));
2983 type = msg->mxm_type; /* trust the unpacked header from here on */
/* reject messages whose src/dst NIDs do not match what we expect
 * (CONN_REQ is exempt: the peer may be new to us) */
2986 if (type != MXLND_MSG_CONN_REQ &&
2987 (rx->mxc_nid != msg->mxm_srcnid ||
2988 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2989 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2990 "0x%llx and rx msg dst is 0x%llx)\n",
2991 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* drop stale messages from a previous incarnation of either side */
2996 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2997 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2998 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3000 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3001 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3002 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3003 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3004 msg->mxm_srcstamp, conn->mxk_incarnation,
3005 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3007 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3008 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3009 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3010 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3017 CDEBUG(D_NET, "Received %s with %d credits\n",
3018 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* absorb credits the peer returned with this message; capped at the
 * tunable queue depth and ignored if the incarnation changed */
3020 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
3021 msg->mxm_type != MXLND_MSG_CONN_ACK) {
3022 LASSERT(peer != NULL);
3023 LASSERT(conn != NULL);
3024 if (msg->mxm_credits != 0) {
3025 spin_lock(&conn->mxk_lock);
3026 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3027 if ((conn->mxk_credits + msg->mxm_credits) >
3028 *kmxlnd_tunables.kmx_credits) {
3029 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
3030 conn->mxk_credits, msg->mxm_credits);
3032 conn->mxk_credits += msg->mxm_credits;
3033 LASSERT(conn->mxk_credits >= 0);
3034 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3036 spin_unlock(&conn->mxk_lock);
/* dispatch by message type */
3040 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3042 case MXLND_MSG_NOOP:
3045 case MXLND_MSG_EAGER:
3046 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3047 msg->mxm_srcnid, rx, 0);
3051 case MXLND_MSG_PUT_REQ:
3052 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3053 msg->mxm_srcnid, rx, 1);
/* PUT_ACK: a cookie above MXLND_MAX_COOKIE encodes a NAK error value;
 * otherwise start the PUT_DATA transfer to the peer's cookie */
3057 case MXLND_MSG_PUT_ACK: {
3058 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3059 if (cookie > MXLND_MAX_COOKIE) {
3060 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3061 libcfs_nid2str(rx->mxc_nid));
3062 result = -((u32) MXLND_ERROR_VAL(cookie));
3063 lntmsg[0] = rx->mxc_lntmsg[0];
3065 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3066 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3067 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3072 case MXLND_MSG_GET_REQ:
3073 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3074 msg->mxm_srcnid, rx, 1);
/* CONN_REQ: validate the peer's parameters, create/find the peer and
 * a fresh conn, then initiate the CONN_ACK side of the handshake */
3078 case MXLND_MSG_CONN_REQ:
3079 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3080 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3081 libcfs_nid2str(msg->mxm_srcnid),
3082 libcfs_nid2str(msg->mxm_dstnid));
3085 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3086 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3088 libcfs_nid2str(msg->mxm_srcnid),
3089 msg->mxm_u.conn_req.mxcrm_queue_depth,
3090 *kmxlnd_tunables.kmx_credits);
3093 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3094 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3096 libcfs_nid2str(msg->mxm_srcnid),
3097 msg->mxm_u.conn_req.mxcrm_eager_size,
3098 (int) MXLND_EAGER_SIZE);
3101 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3103 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
/* unknown peer: allocate one, then re-check under the write lock in
 * case another thread raced us and inserted it first */
3106 struct kmx_peer *existing_peer = NULL;
3107 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3109 rx->mxc_nid = msg->mxm_srcnid;
3111 /* adds conn ref for peer and one for this function */
3112 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3113 *kmxlnd_tunables.kmx_board,
3114 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3118 peer->mxp_sid = sid;
3119 LASSERT(peer->mxp_ep_id == ep_id);
3120 write_lock(&kmxlnd_data.kmx_global_lock);
3121 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3122 if (existing_peer) {
3123 mxlnd_conn_decref(peer->mxp_conn);
3124 mxlnd_peer_decref(peer);
3125 peer = existing_peer;
3126 mxlnd_conn_addref(peer->mxp_conn);
3128 list_add_tail(&peer->mxp_peers,
3129 &kmxlnd_data.kmx_peers[hash]);
3130 atomic_inc(&kmxlnd_data.kmx_npeers);
3132 write_unlock(&kmxlnd_data.kmx_global_lock);
3134 /* FIXME should write lock here */
3135 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3136 mxlnd_peer_decref(peer); /* drop ref taken above */
3138 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3142 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3143 conn = peer->mxp_conn;
3144 } else { /* found peer */
3145 struct kmx_conn *old_conn = conn;
/* a new session id means the peer restarted: retire the old conn */
3147 if (sid != peer->mxp_sid) {
3148 /* do not call mx_disconnect() or send a BYE */
3149 mxlnd_conn_disconnect(old_conn, 0, 0);
3151 /* the ref for this rx was taken on the old_conn */
3152 mxlnd_conn_decref(old_conn);
3154 /* This allocs a conn, points peer->mxp_conn to this one.
3155 * The old conn is still on the peer->mxp_conns list.
3156 * As the pending requests complete, they will call
3157 * conn_decref() which will eventually free it. */
3158 ret = mxlnd_conn_alloc(&conn, peer);
3160 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3163 /* conn_alloc() adds one ref for the peer and one
3164 * for this function */
3167 peer->mxp_sid = sid;
3170 write_lock(&kmxlnd_data.kmx_global_lock);
3171 peer->mxp_incarnation = msg->mxm_srcstamp;
3172 peer->mxp_incompatible = incompatible;
3173 write_unlock(&kmxlnd_data.kmx_global_lock);
3174 spin_lock(&conn->mxk_lock);
3175 conn->mxk_incarnation = msg->mxm_srcstamp;
3176 conn->mxk_status = MXLND_CONN_WAIT;
3177 spin_unlock(&conn->mxk_lock);
3179 /* handle_conn_ack() will create the CONN_ACK msg */
3180 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3181 mxlnd_iconnect(peer, match);
/* CONN_ACK: final handshake step -- validate parameters and, if
 * compatible, mark the conn READY with a full credit allotment */
3185 case MXLND_MSG_CONN_ACK:
3186 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3187 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3188 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3189 libcfs_nid2str(msg->mxm_dstnid));
3193 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3194 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3195 "incompatible queue depth %d (%d wanted)\n",
3196 libcfs_nid2str(msg->mxm_srcnid),
3197 msg->mxm_u.conn_req.mxcrm_queue_depth,
3198 *kmxlnd_tunables.kmx_credits);
3199 spin_lock(&conn->mxk_lock);
3200 conn->mxk_status = MXLND_CONN_FAIL;
3201 spin_unlock(&conn->mxk_lock);
3205 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3206 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3207 "incompatible EAGER size %d (%d wanted)\n",
3208 libcfs_nid2str(msg->mxm_srcnid),
3209 msg->mxm_u.conn_req.mxcrm_eager_size,
3210 (int) MXLND_EAGER_SIZE);
3211 spin_lock(&conn->mxk_lock);
3212 conn->mxk_status = MXLND_CONN_FAIL;
3213 spin_unlock(&conn->mxk_lock);
3217 write_lock(&kmxlnd_data.kmx_global_lock);
3218 peer->mxp_incarnation = msg->mxm_srcstamp;
3219 peer->mxp_incompatible = incompatible;
3220 write_unlock(&kmxlnd_data.kmx_global_lock);
3221 spin_lock(&conn->mxk_lock);
3222 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3223 conn->mxk_outstanding = 0;
3224 conn->mxk_incarnation = msg->mxm_srcstamp;
3225 conn->mxk_timeout = 0;
3226 if (!incompatible) {
3227 conn->mxk_status = MXLND_CONN_READY;
3229 spin_unlock(&conn->mxk_lock);
3230 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3234 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3235 libcfs_nid2str(rx->mxc_nid));
/* failure path: mark the conn failed */
3242 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3243 spin_lock(&conn->mxk_lock);
3244 conn->mxk_status = MXLND_CONN_FAIL;
3245 spin_unlock(&conn->mxk_lock);
3250 spin_lock(&conn->mxk_lock);
3251 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3252 spin_unlock(&conn->mxk_lock);
/* cleanup: repost the rx and, for credit-bearing types that were not
 * passed to LNET, note that we owe the peer a credit back */
3256 /* lnet_parse() failed, etc., repost now */
3257 mxlnd_put_idle_rx(rx);
3258 if (conn != NULL && credit == 1) {
3259 if (type == MXLND_MSG_PUT_DATA) {
3260 spin_lock(&conn->mxk_lock);
3261 conn->mxk_outstanding++;
3262 spin_unlock(&conn->mxk_lock);
3263 } else if (type != MXLND_MSG_GET_DATA &&
3264 (type == MXLND_MSG_EAGER ||
3265 type == MXLND_MSG_PUT_REQ ||
3266 type == MXLND_MSG_NOOP)) {
3267 spin_lock(&conn->mxk_lock);
3268 conn->mxk_outstanding++;
3269 spin_unlock(&conn->mxk_lock);
3272 if (conn_ref) mxlnd_conn_decref(conn);
3273 LASSERT(peer_ref == 0);
3276 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3277 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3279 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* complete any LNET messages associated with this rx */
3282 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3283 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3285 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/*
 * mx_iconnect() for a CONN_REQ has completed.  On failure, mark the
 * conn failed (and after MXLND_WAIT_TIMEOUT, retire it and allocate a
 * fresh one).  On success, record the peer's endpoint address and send
 * the CONN_REQ message carrying our queue depth and eager size.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
3293 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3295 struct kmx_ctx *tx = NULL;
3296 struct kmx_msg *txmsg = NULL;
3297 struct kmx_conn *conn = peer->mxp_conn;
3299 /* a conn ref was taken when calling mx_iconnect(),
3300 * hold it until CONN_REQ or CONN_ACK completes */
3302 CDEBUG(D_NET, "entering\n");
3303 if (status.code != MX_STATUS_SUCCESS) {
3304 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3305 mx_strstatus(status.code), status.code,
3306 libcfs_nid2str(peer->mxp_nid));
3307 spin_lock(&conn->mxk_lock);
3308 conn->mxk_status = MXLND_CONN_FAIL;
3309 spin_unlock(&conn->mxk_lock);
/* been failing past the reconnect window: retire this conn and start
 * over with a freshly allocated one */
3311 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3312 struct kmx_conn *new_conn = NULL;
3313 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3314 /* FIXME write lock here ? */
3315 mxlnd_conn_disconnect(conn, 0, 0);
3316 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3317 mxlnd_conn_decref(new_conn); /* which we no longer need */
3318 peer->mxp_reconnect_time = 0;
3321 mxlnd_conn_decref(conn); /* drop the iconnect() ref on failure */
/* success: remember the peer's MX endpoint address */
3325 spin_lock(&conn->mxk_lock);
3326 conn->mxk_epa = status.source;
3327 spin_unlock(&conn->mxk_lock);
3328 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3329 * we should not need to lock the peer */
3330 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3332 /* mx_iconnect() succeeded, reset delay to 0 */
3333 write_lock(&kmxlnd_data.kmx_global_lock);
3334 peer->mxp_reconnect_time = 0;
3335 write_unlock(&kmxlnd_data.kmx_global_lock);
3337 /* marshal CONN_REQ msg */
3338 /* we are still using the conn ref from iconnect() - do not take another */
3339 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect() ref */
3341 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3342 libcfs_nid2str(peer->mxp_nid));
3343 spin_lock(&conn->mxk_lock);
3344 conn->mxk_status = MXLND_CONN_FAIL;
3345 spin_unlock(&conn->mxk_lock);
3346 mxlnd_conn_decref(conn);
/* fill in and queue the CONN_REQ with our connection parameters */
3350 tx->mxc_peer = peer;
3351 tx->mxc_conn = conn;
3352 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3353 txmsg = tx->mxc_msg;
3354 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3355 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3356 tx->mxc_match = mxlnd_create_match(tx, 0);
3358 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
/*
 * mx_iconnect() for a CONN_ACK has completed.  Mirrors
 * mxlnd_handle_conn_req(): on failure, mark the conn failed (and after
 * MXLND_WAIT_TIMEOUT, retire it and allocate a fresh one); on success,
 * record the endpoint address and session id, mark the conn READY when
 * the peer is compatible, and send the CONN_ACK message.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
3364 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3366 struct kmx_ctx *tx = NULL;
3367 struct kmx_msg *txmsg = NULL;
3368 struct kmx_conn *conn = peer->mxp_conn;
3373 /* a conn ref was taken when calling mx_iconnect(),
3374 * hold it until CONN_REQ or CONN_ACK completes */
3376 CDEBUG(D_NET, "entering\n");
3377 if (status.code != MX_STATUS_SUCCESS) {
3378 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3379 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3380 mx_strstatus(status.code), status.code,
3381 libcfs_nid2str(peer->mxp_nid),
3385 spin_lock(&conn->mxk_lock);
3386 conn->mxk_status = MXLND_CONN_FAIL;
3387 spin_unlock(&conn->mxk_lock);
/* been failing past the reconnect window: retire this conn and start
 * over with a freshly allocated one */
3389 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3390 struct kmx_conn *new_conn = NULL;
3391 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3392 /* FIXME write lock here? */
3393 mxlnd_conn_disconnect(conn, 0, 1);
3394 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3396 mxlnd_conn_decref(new_conn); /* which we no longer need */
3397 peer->mxp_reconnect_time = 0;
3400 mxlnd_conn_decref(conn); /* drop the iconnect() ref on failure */
/* success: extract the peer's session id and record the endpoint */
3403 mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3404 spin_lock(&conn->mxk_lock);
3405 conn->mxk_epa = status.source;
3406 if (likely(!peer->mxp_incompatible)) {
3407 conn->mxk_status = MXLND_CONN_READY;
3409 spin_unlock(&conn->mxk_lock);
3410 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3411 * we should not have to lock the peer */
3412 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3414 /* mx_iconnect() succeeded, reset delay to 0 */
3415 write_lock(&kmxlnd_data.kmx_global_lock);
3416 peer->mxp_reconnect_time = 0;
3417 peer->mxp_sid = sid;
3418 write_unlock(&kmxlnd_data.kmx_global_lock);
3420 /* marshal CONN_ACK msg */
3421 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect() ref */
3423 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3424 libcfs_nid2str(peer->mxp_nid));
3425 spin_lock(&conn->mxk_lock);
3426 conn->mxk_status = MXLND_CONN_FAIL;
3427 spin_unlock(&conn->mxk_lock);
3428 mxlnd_conn_decref(conn);
/* fill in and queue the CONN_ACK with our connection parameters */
3432 tx->mxc_peer = peer;
3433 tx->mxc_conn = conn;
3434 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3435 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3436 txmsg = tx->mxc_msg;
3437 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3438 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3439 tx->mxc_match = mxlnd_create_match(tx, 0);
3446 * mxlnd_request_waitd - the MX request completion thread(s)
3447 * @arg - thread id (as a void *)
3449 * This thread waits for an MX completion and then completes the request.
3450 * We will create one thread per CPU.
/*
 * Per-CPU completion thread: loop until shutdown pulling completed MX
 * requests (thread 0 polls with mx_test_any() up to the kmx_polling
 * tunable before blocking; others block in mx_wait_any()), then route
 * each completion to the conn handshake handlers or to the tx/rx
 * completion handlers.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
3453 mxlnd_request_waitd(void *arg)
3455 long id = (long) arg; /* thread index, passed in as a void * */
3458 mx_return_t mxret = MX_SUCCESS;
3460 struct kmx_ctx *ctx = NULL;
3461 enum kmx_req_state req_type = MXLND_REQ_TX;
3462 struct kmx_peer *peer = NULL;
3463 struct kmx_conn *conn = NULL;
3468 memset(name, 0, sizeof(name));
3469 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3470 cfs_daemonize(name);
3471 //cfs_block_allsigs();
3473 memset(&status, 0, sizeof(status));
3475 CDEBUG(D_NET, "%s starting\n", name);
3477 while (!kmxlnd_data.kmx_shutdown) {
/* thread 0 busy-polls for up to kmx_polling iterations before
 * falling back to a blocking wait; other threads always block */
3483 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3484 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3488 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3489 0ULL, 0ULL, &status, &result);
3492 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3493 0ULL, 0ULL, &status, &result);
3495 if (unlikely(kmxlnd_data.kmx_shutdown))
3499 /* nothing completed... */
3503 if (status.code != MX_STATUS_SUCCESS) {
3504 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3505 "match_info 0x%llx and length %d\n",
3506 mx_strstatus(status.code), status.code,
3507 (u64) status.match_info, status.msg_length);
/* the message type is encoded in the high bits of the match info */
3510 msg_type = MXLND_MSG_TYPE(status.match_info);
3512 /* This may be a mx_iconnect() request completing,
3513 * check the bit mask for CONN_REQ and CONN_ACK */
3514 if (msg_type == MXLND_MSG_ICON_REQ ||
3515 msg_type == MXLND_MSG_ICON_ACK) {
3516 peer = (struct kmx_peer*) status.context;
3517 if (msg_type == MXLND_MSG_ICON_REQ) {
3518 mxlnd_handle_conn_req(peer, status);
3520 mxlnd_handle_conn_ack(peer, status);
3525 /* This must be a tx or rx */
3527 /* NOTE: if this is a RX from the unexpected callback, it may
3528 * have very little info. If we dropped it in unexpected_recv(),
3529 * it will not have a context. If so, ignore it. */
3530 ctx = (struct kmx_ctx *) status.context;
3533 req_type = ctx->mxc_type;
3534 conn = ctx->mxc_conn; /* this may be NULL */
3535 mxlnd_deq_pending_ctx(ctx);
3537 /* copy status to ctx->mxc_status */
3538 memcpy(&ctx->mxc_status, &status, sizeof(status));
/* dispatch on the ctx type (tx or rx) */
3542 mxlnd_handle_tx_completion(ctx);
3545 mxlnd_handle_rx_completion(ctx);
3548 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3553 /* FIXME may need to reconsider this */
3554 /* conn is always set except for the first CONN_REQ rx
3555 * from a new peer */
/* a hard MX error (not success and not a benign truncation) tears
 * down the connection */
3556 if (!(status.code == MX_STATUS_SUCCESS ||
3557 status.code == MX_STATUS_TRUNCATED) &&
3559 mxlnd_conn_disconnect(conn, 1, 1);
3562 CDEBUG(D_NET, "waitd() completed task\n");
3564 CDEBUG(D_NET, "%s stopping\n", name);
3565 mxlnd_thread_stop(id);
/*
 * Walk every peer's conn looking for expired timeouts; disconnect any
 * conn whose deadline has passed.  Returns the earliest future timeout
 * found, or now + MXLND_COMM_TIMEOUT if none is pending, so the caller
 * knows how long it may sleep.
 *
 * NOTE(review): some intermediate lines of this function are not visible
 * in this excerpt; comments describe only the code shown.
 */
3571 mxlnd_check_timeouts(unsigned long now)
3575 unsigned long next = 0; /* earliest pending timeout seen so far */
3576 struct kmx_peer *peer = NULL;
3577 struct kmx_conn *conn = NULL;
3579 read_lock(&kmxlnd_data.kmx_global_lock);
3580 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3581 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
/* bail out early on shutdown */
3583 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3584 read_unlock(&kmxlnd_data.kmx_global_lock);
3588 conn = peer->mxp_conn;
3590 mxlnd_conn_addref(conn); /* hold the conn across the checks */
3595 /* FIXME is this needed? */
3596 spin_lock(&conn->mxk_lock);
3598 /* if nothing pending (timeout == 0) or
3599 * if conn is already disconnected,
3601 if (conn->mxk_timeout == 0 ||
3602 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3603 /* FIXME is this needed? */
3604 spin_unlock(&conn->mxk_lock);
3605 mxlnd_conn_decref(conn);
3609 /* we want to find the timeout that will occur first.
3610 * if it is in the future, we will sleep until then.
3611 * if it is in the past, then we will sleep one
3612 * second and repeat the process. */
3613 if ((next == 0) || (conn->mxk_timeout < next)) {
3614 next = conn->mxk_timeout;
/* deadline passed: tear the connection down */
3619 if (time_after_eq(now, conn->mxk_timeout)) {
3622 spin_unlock(&conn->mxk_lock);
3625 mxlnd_conn_disconnect(conn, 1, 1);
3627 mxlnd_conn_decref(conn); /* drop the ref taken above */
3630 read_unlock(&kmxlnd_data.kmx_global_lock);
/* nothing pending anywhere: tell the caller to check again in one
 * full communication-timeout period */
3631 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3637 * mxlnd_timeoutd - enforces timeouts on messages
3638 * @arg - thread id (as a void *)
3640 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3641 * it calls mxlnd_conn_disconnect().
3643 * After checking for timeouts, try progressing sends (call check_sends()).
3646 mxlnd_timeoutd(void *arg)
3649 long id = (long) arg;
3650 unsigned long now = 0;
3651 unsigned long next = 0;
3652 unsigned long delay = HZ;
3653 struct kmx_peer *peer = NULL;
3654 struct kmx_conn *conn = NULL;
3656 cfs_daemonize("mxlnd_timeoutd");
3657 //cfs_block_allsigs();
3659 CDEBUG(D_NET, "timeoutd starting\n");
3661 while (!kmxlnd_data.kmx_shutdown) {
3664 /* if the next timeout has not arrived, go back to sleep */
3665 if (time_after(now, next)) {
3666 next = mxlnd_check_timeouts(now);
3669 read_lock(&kmxlnd_data.kmx_global_lock);
3670 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3671 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3672 /* FIXME upgrade to write lock?
3673 * is any lock needed? */
3674 conn = peer->mxp_conn;
3675 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3680 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3681 time_after(now, conn->mxk_last_tx + HZ)) {
3682 /* FIXME drop lock or call check_sends_locked */
3683 read_unlock(&kmxlnd_data.kmx_global_lock);
3684 mxlnd_check_sends(peer);
3685 read_lock(&kmxlnd_data.kmx_global_lock);
3687 mxlnd_conn_decref(conn); /* until here */
3690 read_unlock(&kmxlnd_data.kmx_global_lock);
3694 CDEBUG(D_NET, "timeoutd stopping\n");
3695 mxlnd_thread_stop(id);