2 * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3 * vim:expandtab:shiftwidth=8:tabstop=8:
7 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License version 2 only,
11 * as published by the Free Software Foundation.
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * General Public License version 2 for more details (a copy is included
17 * in the LICENSE file that accompanied this code).
19 * You should have received a copy of the GNU General Public License
20 * version 2 along with this program; If not, see http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
23 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24 * CA 95054 USA or visit www.sun.com if you need additional information or
30 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
31 * Use is subject to license terms.
33 * Copyright (C) 2006 Myricom, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lnet/klnds/mxlnd/mxlnd.c
41 * Author: Eric Barton <eric@bartonsoftware.com>
42 * Author: Scott Atchley <atchley at myri.com>
47 inline void mxlnd_noop(char *s, ...)
53 mxlnd_ctxstate_to_str(int mxc_state)
57 return "MXLND_CTX_INIT";
59 return "MXLND_CTX_IDLE";
61 return "MXLND_CTX_PREP";
62 case MXLND_CTX_PENDING:
63 return "MXLND_CTX_PENDING";
64 case MXLND_CTX_COMPLETED:
65 return "MXLND_CTX_COMPLETED";
66 case MXLND_CTX_CANCELED:
67 return "MXLND_CTX_CANCELED";
74 mxlnd_connstatus_to_str(int mxk_status)
77 case MXLND_CONN_READY:
78 return "MXLND_CONN_READY";
80 return "MXLND_CONN_INIT";
82 return "MXLND_CONN_REQ";
84 return "MXLND_CONN_ACK";
86 return "MXLND_CONN_WAIT";
87 case MXLND_CONN_DISCONNECT:
88 return "MXLND_CONN_DISCONNECT";
90 return "MXLND_CONN_FAIL";
97 mxlnd_msgtype_to_str(int type) {
100 return "MXLND_MSG_EAGER";
101 case MXLND_MSG_CONN_REQ:
102 return "MXLND_MSG_CONN_REQ";
103 case MXLND_MSG_CONN_ACK:
104 return "MXLND_MSG_CONN_ACK";
106 return "MXLND_MSG_NOOP";
107 case MXLND_MSG_PUT_REQ:
108 return "MXLND_MSG_PUT_REQ";
109 case MXLND_MSG_PUT_ACK:
110 return "MXLND_MSG_PUT_ACK";
111 case MXLND_MSG_PUT_DATA:
112 return "MXLND_MSG_PUT_DATA";
113 case MXLND_MSG_GET_REQ:
114 return "MXLND_MSG_GET_REQ";
115 case MXLND_MSG_GET_DATA:
116 return "MXLND_MSG_GET_DATA";
123 mxlnd_lnetmsg_to_str(int type)
127 return "LNET_MSG_ACK";
129 return "LNET_MSG_PUT";
131 return "LNET_MSG_GET";
133 return "LNET_MSG_REPLY";
135 return "LNET_MSG_HELLO";
143 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
144 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
146 u64 type = (u64) ctx->mxc_msg_type;
147 u64 err = (u64) error;
150 LASSERT(ctx->mxc_msg_type != 0);
151 LASSERT(ctx->mxc_cookie >> 52 == 0);
152 match = (type << 60) | (err << 52) | ctx->mxc_cookie;
157 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
159 *msg_type = (u8) (match >> 60);
160 *error = (u8) ((match >> 52) & 0xFF);
161 *cookie = match & 0xFFFFFFFFFFFFFLL;
162 LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
163 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
164 *msg_type == MXLND_MSG_EAGER ||
165 *msg_type == MXLND_MSG_CONN_REQ ||
166 *msg_type == MXLND_MSG_CONN_ACK ||
167 *msg_type == MXLND_MSG_NOOP ||
168 *msg_type == MXLND_MSG_PUT_REQ ||
169 *msg_type == MXLND_MSG_PUT_ACK ||
170 *msg_type == MXLND_MSG_PUT_DATA ||
171 *msg_type == MXLND_MSG_GET_REQ ||
172 *msg_type == MXLND_MSG_GET_DATA);
177 mxlnd_get_idle_rx(void)
179 struct list_head *tmp = NULL;
180 struct kmx_ctx *rx = NULL;
182 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
184 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
185 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
189 tmp = &kmxlnd_data.kmx_rx_idle;
190 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
191 list_del_init(&rx->mxc_list);
192 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
195 if (rx->mxc_get != rx->mxc_put) {
196 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
197 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
198 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
199 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
200 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
201 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
202 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
203 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
204 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
205 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
208 LASSERT (rx->mxc_get == rx->mxc_put);
212 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
213 rx->mxc_state = MXLND_CTX_PREP;
219 mxlnd_put_idle_rx(struct kmx_ctx *rx)
222 CDEBUG(D_NETERROR, "called with NULL pointer\n");
224 } else if (rx->mxc_type != MXLND_REQ_RX) {
225 CDEBUG(D_NETERROR, "called with tx\n");
228 LASSERT(rx->mxc_get == rx->mxc_put + 1);
231 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
232 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
233 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
238 mxlnd_reduce_idle_rxs(__u32 count)
241 struct kmx_ctx *rx = NULL;
243 spin_lock(&kmxlnd_data.kmx_rxs_lock);
244 for (i = 0; i < count; i++) {
245 rx = mxlnd_get_idle_rx();
247 struct list_head *tmp = &rx->mxc_global_list;
251 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
255 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
260 mxlnd_get_idle_tx(void)
262 struct list_head *tmp = NULL;
263 struct kmx_ctx *tx = NULL;
265 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
267 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
268 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
269 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
273 tmp = &kmxlnd_data.kmx_tx_idle;
274 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
275 list_del_init(&tx->mxc_list);
277 /* Allocate a new completion cookie. It might not be needed,
278 * but we've got a lock right now and we're unlikely to
280 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
281 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
282 kmxlnd_data.kmx_tx_next_cookie = 1;
284 kmxlnd_data.kmx_tx_used++;
285 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
287 LASSERT (tx->mxc_get == tx->mxc_put);
291 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
292 LASSERT (tx->mxc_lntmsg[0] == NULL);
293 LASSERT (tx->mxc_lntmsg[1] == NULL);
295 tx->mxc_state = MXLND_CTX_PREP;
301 mxlnd_put_idle_tx(struct kmx_ctx *tx)
303 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
305 lnet_msg_t *lntmsg[2];
308 CDEBUG(D_NETERROR, "called with NULL pointer\n");
310 } else if (tx->mxc_type != MXLND_REQ_TX) {
311 CDEBUG(D_NETERROR, "called with rx\n");
314 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
315 tx->mxc_status.code == MX_STATUS_TRUNCATED))
318 lntmsg[0] = tx->mxc_lntmsg[0];
319 lntmsg[1] = tx->mxc_lntmsg[1];
321 LASSERT(tx->mxc_get == tx->mxc_put + 1);
324 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
325 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
326 kmxlnd_data.kmx_tx_used--;
327 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
328 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
329 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
334 * mxlnd_conn_free - free the conn
335 * @conn - a kmx_conn pointer
337 * The calling function should remove the conn from the conns list first
341 mxlnd_conn_free(struct kmx_conn *conn)
343 struct kmx_peer *peer = conn->mxk_peer;
345 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
346 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
347 list_empty (&conn->mxk_tx_free_queue) &&
348 list_empty (&conn->mxk_pending));
349 if (!list_empty(&conn->mxk_list)) {
350 spin_lock(&peer->mxp_lock);
351 list_del_init(&conn->mxk_list);
352 if (peer->mxp_conn == conn) {
353 peer->mxp_conn = NULL;
354 if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
355 mx_set_endpoint_addr_context(conn->mxk_epa,
359 spin_unlock(&peer->mxp_lock);
361 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
362 MXLND_FREE (conn, sizeof (*conn));
368 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
371 struct kmx_ctx *ctx = NULL;
372 struct kmx_ctx *next = NULL;
373 mx_return_t mxret = MX_SUCCESS;
378 spin_lock(&conn->mxk_lock);
379 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
380 /* we will delete all including txs */
381 list_del_init(&ctx->mxc_list);
382 if (ctx->mxc_type == MXLND_REQ_RX) {
384 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
387 if (mxret != MX_SUCCESS) {
388 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
391 ctx->mxc_status.code = -ECONNABORTED;
392 ctx->mxc_state = MXLND_CTX_CANCELED;
393 /* NOTE this calls lnet_finalize() and
394 * we cannot hold any locks when calling it.
395 * It also calls mxlnd_conn_decref(conn) */
396 spin_unlock(&conn->mxk_lock);
397 mxlnd_handle_rx_completion(ctx);
398 spin_lock(&conn->mxk_lock);
403 spin_unlock(&conn->mxk_lock);
411 * mxlnd_conn_disconnect - shutdown a connection
412 * @conn - a kmx_conn pointer
414 * This function sets the status to DISCONNECT, completes queued
415 * txs with failure, calls mx_disconnect, which will complete
416 * pending txs and matched rxs with failure.
419 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
421 struct list_head *tmp = NULL;
423 spin_lock(&conn->mxk_lock);
424 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
425 spin_unlock(&conn->mxk_lock);
428 conn->mxk_status = MXLND_CONN_DISCONNECT;
429 conn->mxk_timeout = 0;
431 while (!list_empty(&conn->mxk_tx_free_queue) ||
432 !list_empty(&conn->mxk_tx_credit_queue)) {
434 struct kmx_ctx *tx = NULL;
436 if (!list_empty(&conn->mxk_tx_free_queue)) {
437 tmp = &conn->mxk_tx_free_queue;
439 tmp = &conn->mxk_tx_credit_queue;
442 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
443 list_del_init(&tx->mxc_list);
444 tx->mxc_status.code = -ECONNABORTED;
445 spin_unlock(&conn->mxk_lock);
446 mxlnd_put_idle_tx(tx);
447 mxlnd_conn_decref(conn); /* for this tx */
448 spin_lock(&conn->mxk_lock);
451 spin_unlock(&conn->mxk_lock);
453 /* cancel pending rxs */
454 mxlnd_conn_cancel_pending_rxs(conn);
456 if (kmxlnd_data.kmx_shutdown != 1) {
458 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
461 time_t last_alive = 0;
462 unsigned long last_msg = 0;
464 /* notify LNET that we are giving up on this peer */
465 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
466 last_msg = conn->mxk_last_rx;
468 last_msg = conn->mxk_last_tx;
470 last_alive = cfs_time_current_sec() -
471 cfs_duration_sec(cfs_time_current() - last_msg);
472 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
475 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
481 * mxlnd_conn_alloc - allocate and initialize a new conn struct
482 * @connp - address of a kmx_conn pointer
483 * @peer - owning kmx_peer
485 * Returns 0 on success and -ENOMEM on failure
488 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
490 struct kmx_conn *conn = NULL;
492 LASSERT(peer != NULL);
494 MXLND_ALLOC(conn, sizeof (*conn));
496 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
499 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
501 memset(conn, 0, sizeof(*conn));
503 /* conn->mxk_incarnation = 0 - will be set by peer */
504 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
505 and one for the caller */
506 conn->mxk_peer = peer;
507 /* mxk_epa - to be set after mx_iconnect() */
508 INIT_LIST_HEAD(&conn->mxk_list);
509 spin_lock_init(&conn->mxk_lock);
510 /* conn->mxk_timeout = 0 */
511 conn->mxk_last_tx = jiffies;
512 conn->mxk_last_rx = conn->mxk_last_tx;
513 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
514 /* mxk_outstanding = 0 */
515 conn->mxk_status = MXLND_CONN_INIT;
516 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
517 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
518 /* conn->mxk_ntx_msgs = 0 */
519 /* conn->mxk_ntx_data = 0 */
520 /* conn->mxk_ntx_posted = 0 */
521 /* conn->mxk_data_posted = 0 */
522 INIT_LIST_HEAD(&conn->mxk_pending);
526 mxlnd_peer_addref(peer); /* add a ref for this conn */
528 /* add to front of peer's conns list */
529 list_add(&conn->mxk_list, &peer->mxp_conns);
530 peer->mxp_conn = conn;
535 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
538 spin_lock(&peer->mxp_lock);
539 ret = mxlnd_conn_alloc_locked(connp, peer);
540 spin_unlock(&peer->mxp_lock);
545 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
548 struct kmx_conn *conn = ctx->mxc_conn;
550 ctx->mxc_state = MXLND_CTX_PENDING;
552 spin_lock(&conn->mxk_lock);
553 if (conn->mxk_status >= MXLND_CONN_INIT) {
554 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
555 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
556 conn->mxk_timeout = ctx->mxc_deadline;
559 ctx->mxc_state = MXLND_CTX_COMPLETED;
562 spin_unlock(&conn->mxk_lock);
568 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
570 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
571 ctx->mxc_state == MXLND_CTX_COMPLETED);
572 if (ctx->mxc_state != MXLND_CTX_PENDING &&
573 ctx->mxc_state != MXLND_CTX_COMPLETED) {
574 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
575 mxlnd_ctxstate_to_str(ctx->mxc_state));
577 ctx->mxc_state = MXLND_CTX_COMPLETED;
578 if (!list_empty(&ctx->mxc_list)) {
579 struct kmx_conn *conn = ctx->mxc_conn;
580 struct kmx_ctx *next = NULL;
581 LASSERT(conn != NULL);
582 spin_lock(&conn->mxk_lock);
583 list_del_init(&ctx->mxc_list);
584 conn->mxk_timeout = 0;
585 if (!list_empty(&conn->mxk_pending)) {
586 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
587 conn->mxk_timeout = next->mxc_deadline;
589 spin_unlock(&conn->mxk_lock);
595 * mxlnd_peer_free - free the peer
596 * @peer - a kmx_peer pointer
598 * The calling function should decrement the rxs, drain the tx queues and
599 * remove the peer from the peers list first then destroy it.
602 mxlnd_peer_free(struct kmx_peer *peer)
604 CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
606 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
608 if (peer->mxp_host != NULL) {
609 spin_lock(&peer->mxp_host->mxh_lock);
610 peer->mxp_host->mxh_peer = NULL;
611 spin_unlock(&peer->mxp_host->mxh_lock);
613 if (!list_empty(&peer->mxp_peers)) {
614 /* assume we are locked */
615 list_del_init(&peer->mxp_peers);
618 MXLND_FREE (peer, sizeof (*peer));
619 atomic_dec(&kmxlnd_data.kmx_npeers);
624 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
627 char name[MX_MAX_HOSTNAME_LEN + 1];
628 mx_return_t mxret = MX_SUCCESS;
630 memset(name, 0, sizeof(name));
631 snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
632 mxret = mx_hostname_to_nic_id(name, &nic_id);
633 if (mxret == MX_SUCCESS) {
634 peer->mxp_nic_id = nic_id;
636 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
637 "with %s\n", name, mx_strerror(mxret));
638 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
639 if (mxret == MX_SUCCESS) {
640 peer->mxp_nic_id = nic_id;
642 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
643 "with %s\n", peer->mxp_host->mxh_hostname,
651 * mxlnd_peer_alloc - allocate and initialize a new peer struct
652 * @peerp - address of a kmx_peer pointer
653 * @nid - LNET node id
655 * Returns 0 on success and -ENOMEM on failure
658 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
662 u32 addr = LNET_NIDADDR(nid);
663 struct kmx_peer *peer = NULL;
664 struct kmx_host *host = NULL;
666 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
668 MXLND_ALLOC(peer, sizeof (*peer));
670 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
673 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
675 memset(peer, 0, sizeof(*peer));
677 list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
678 if (addr == host->mxh_addr) {
679 peer->mxp_host = host;
680 spin_lock(&host->mxh_lock);
681 host->mxh_peer = peer;
682 spin_unlock(&host->mxh_lock);
686 if (peer->mxp_host == NULL) {
687 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
688 MXLND_FREE(peer, sizeof(*peer));
693 /* peer->mxp_incarnation */
694 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
695 mxlnd_peer_hostname_to_nic_id(peer);
697 INIT_LIST_HEAD(&peer->mxp_peers);
698 spin_lock_init(&peer->mxp_lock);
699 INIT_LIST_HEAD(&peer->mxp_conns);
700 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
702 mxlnd_peer_decref(peer);
706 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
707 struct kmx_ctx *rx = NULL;
708 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
710 mxlnd_reduce_idle_rxs(i);
711 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
712 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
713 mxlnd_peer_decref(peer);
716 spin_lock(&kmxlnd_data.kmx_rxs_lock);
717 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
718 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
720 mxlnd_put_idle_rx(rx);
722 /* peer->mxp_reconnect_time = 0 */
723 /* peer->mxp_incompatible = 0 */
730 * mxlnd_nid_to_hash - hash the nid
733 * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
736 mxlnd_nid_to_hash(lnet_nid_t nid)
738 return (nid & MXLND_HASH_MASK) ^
739 ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
742 static inline struct kmx_peer *
743 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
747 struct kmx_peer *peer = NULL;
749 hash = mxlnd_nid_to_hash(nid);
751 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
752 if (peer->mxp_nid == nid) {
754 mxlnd_peer_addref(peer);
758 return (found ? peer : NULL);
761 static inline struct kmx_peer *
762 mxlnd_find_peer_by_nid(lnet_nid_t nid)
764 struct kmx_peer *peer = NULL;
766 read_lock(&kmxlnd_data.kmx_peers_lock);
767 peer = mxlnd_find_peer_by_nid_locked(nid);
768 read_unlock(&kmxlnd_data.kmx_peers_lock);
773 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
775 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
776 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
777 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
778 tx->mxc_msg_type == MXLND_MSG_NOOP);
782 * mxlnd_init_msg - set type and number of bytes
785 * @body_nob - bytes in msg body
788 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
790 msg->mxm_type = type;
791 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
795 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
797 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
798 struct kmx_msg *msg = NULL;
800 LASSERT (tx != NULL);
801 LASSERT (nob <= MXLND_EAGER_SIZE);
804 /* tx->mxc_peer should have already been set if we know it */
805 tx->mxc_msg_type = type;
807 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
808 tx->mxc_seg.segment_length = nob;
809 tx->mxc_pin_type = MX_PIN_PHYSICAL;
810 //tx->mxc_state = MXLND_CTX_PENDING;
813 msg->mxm_type = type;
820 mxlnd_cksum (void *ptr, int nob)
826 sum = ((sum << 1) | (sum >> 31)) + *c++;
828 /* ensure I don't return 0 (== no checksum) */
829 return (sum == 0) ? 1 : sum;
833 * mxlnd_pack_msg - complete msg info
837 mxlnd_pack_msg(struct kmx_ctx *tx)
839 struct kmx_msg *msg = tx->mxc_msg;
841 /* type and nob should already be set in init_msg() */
842 msg->mxm_magic = MXLND_MSG_MAGIC;
843 msg->mxm_version = MXLND_MSG_VERSION;
845 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
846 * return credits as well */
847 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
848 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
849 spin_lock(&tx->mxc_conn->mxk_lock);
850 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
851 tx->mxc_conn->mxk_outstanding = 0;
852 spin_unlock(&tx->mxc_conn->mxk_lock);
854 msg->mxm_credits = 0;
858 msg->mxm_srcnid = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
859 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
860 msg->mxm_dstnid = tx->mxc_nid;
861 /* if it is a new peer, the dststamp will be 0 */
862 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
863 msg->mxm_seq = tx->mxc_cookie;
865 if (*kmxlnd_tunables.kmx_cksum) {
866 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
871 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
873 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
878 /* 6 bytes are enough to have received magic + version */
880 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
884 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
886 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
889 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
893 if (msg->mxm_version !=
894 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
895 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
899 if (nob < hdr_size) {
900 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
904 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
906 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
910 /* checksum must be computed with mxm_cksum zero and BEFORE anything
912 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
914 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
915 CDEBUG(D_NETERROR, "Bad checksum\n");
918 msg->mxm_cksum = msg_cksum;
921 /* leave magic unflipped as a clue to peer endianness */
922 __swab16s(&msg->mxm_version);
923 CLASSERT (sizeof(msg->mxm_type) == 1);
924 CLASSERT (sizeof(msg->mxm_credits) == 1);
925 msg->mxm_nob = msg_nob;
926 __swab64s(&msg->mxm_srcnid);
927 __swab64s(&msg->mxm_srcstamp);
928 __swab64s(&msg->mxm_dstnid);
929 __swab64s(&msg->mxm_dststamp);
930 __swab64s(&msg->mxm_seq);
933 if (msg->mxm_srcnid == LNET_NID_ANY) {
934 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
938 switch (msg->mxm_type) {
940 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
946 case MXLND_MSG_EAGER:
947 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
948 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
949 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
954 case MXLND_MSG_PUT_REQ:
955 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
956 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
957 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
961 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
964 case MXLND_MSG_PUT_ACK:
965 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
966 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
967 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
971 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
972 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
976 case MXLND_MSG_GET_REQ:
977 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
978 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
979 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
983 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
987 case MXLND_MSG_CONN_REQ:
988 case MXLND_MSG_CONN_ACK:
989 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
990 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
991 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
995 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
996 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1005 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1009 * @length - length of incoming message
1010 * @pending - add to kmx_pending (0 is NO and 1 is YES)
1012 * The caller gets the rx and sets nid, peer and conn if known.
1014 * Returns 0 on success and -1 on failure
1017 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1020 mx_return_t mxret = MX_SUCCESS;
1021 uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
1023 rx->mxc_msg_type = msg_type;
1024 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1025 rx->mxc_cookie = cookie;
1026 /* rx->mxc_match may already be set */
1027 /* rx->mxc_seg.segment_ptr is already set */
1028 rx->mxc_seg.segment_length = length;
1029 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1030 ret = mxlnd_q_pending_ctx(rx);
1032 /* the caller is responsible for calling conn_decref() if needed */
1035 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1036 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1037 if (mxret != MX_SUCCESS) {
1038 mxlnd_deq_pending_ctx(rx);
1039 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1040 mx_strerror(mxret), (int) mxret);
1048 * mxlnd_unexpected_recv - this is the callback function that will handle
1049 * unexpected receives
1050 * @context - NULL, ignore
1051 * @source - the peer's mx_endpoint_addr_t
1052 * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1053 * @length - length of incoming message
1054 * @data_if_available - ignore
1056 * If it is an eager-sized msg, we will call recv_msg() with the actual
1057 * length. If it is a large message, we will call recv_msg() with a
1058 * length of 0 bytes to drop it because we should never have a large,
1059 * unexpected message.
1061 * NOTE - The MX library blocks until this function completes. Make it as fast as
1062 * possible. DO NOT allocate memory which can block!
1064 * If we cannot get a rx or the conn is closed, drop the message on the floor
1065 * (i.e. recv 0 bytes and ignore).
1067 mx_unexp_handler_action_t
1068 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1069 uint64_t match_value, uint32_t length, void *data_if_available)
1072 struct kmx_ctx *rx = NULL;
1078 if (context != NULL) {
1079 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1083 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1086 rx = mxlnd_get_idle_rx();
1088 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1089 if (length <= MXLND_EAGER_SIZE) {
1090 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1092 CDEBUG(D_NETERROR, "unexpected large receive with "
1093 "match_value=0x%llx length=%d\n",
1094 match_value, length);
1095 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1099 struct kmx_peer *peer = NULL;
1100 struct kmx_conn *conn = NULL;
1102 /* NOTE to avoid a peer disappearing out from under us,
1103 * read lock the peers lock first */
1104 read_lock(&kmxlnd_data.kmx_peers_lock);
1105 mx_get_endpoint_addr_context(source, (void **) &peer);
1107 mxlnd_peer_addref(peer); /* add a ref... */
1108 spin_lock(&peer->mxp_lock);
1109 conn = peer->mxp_conn;
1111 mxlnd_conn_addref(conn); /* add ref until rx completed */
1112 mxlnd_peer_decref(peer); /* and drop peer ref */
1113 rx->mxc_conn = conn;
1115 spin_unlock(&peer->mxp_lock);
1116 rx->mxc_peer = peer;
1117 rx->mxc_nid = peer->mxp_nid;
1119 read_unlock(&kmxlnd_data.kmx_peers_lock);
1121 CDEBUG(D_NETERROR, "could not post receive\n");
1122 mxlnd_put_idle_rx(rx);
1126 if (rx == NULL || ret != 0) {
1128 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1131 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1133 seg.segment_ptr = 0LL;
1134 seg.segment_length = 0;
1135 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1136 match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1139 return MX_RECV_CONTINUE;
1144 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1148 struct kmx_peer *peer = NULL;
1150 read_lock(&kmxlnd_data.kmx_peers_lock);
1151 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1152 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1156 *nidp = peer->mxp_nid;
1157 *count = atomic_read(&peer->mxp_refcount);
1162 read_unlock(&kmxlnd_data.kmx_peers_lock);
1168 mxlnd_del_peer_locked(struct kmx_peer *peer)
1170 list_del_init(&peer->mxp_peers); /* remove from the global list */
1171 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1172 mxlnd_peer_decref(peer); /* drop global list ref */
1177 mxlnd_del_peer(lnet_nid_t nid)
1181 struct kmx_peer *peer = NULL;
1182 struct kmx_peer *next = NULL;
1184 if (nid != LNET_NID_ANY) {
1185 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1187 write_lock(&kmxlnd_data.kmx_peers_lock);
1188 if (nid != LNET_NID_ANY) {
1192 mxlnd_peer_decref(peer); /* and drops it */
1193 mxlnd_del_peer_locked(peer);
1195 } else { /* LNET_NID_ANY */
1196 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1197 list_for_each_entry_safe(peer, next,
1198 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1199 mxlnd_del_peer_locked(peer);
1203 write_unlock(&kmxlnd_data.kmx_peers_lock);
1209 mxlnd_get_conn_by_idx(int index)
1212 struct kmx_peer *peer = NULL;
1213 struct kmx_conn *conn = NULL;
1215 read_lock(&kmxlnd_data.kmx_peers_lock);
1216 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1217 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1218 spin_lock(&peer->mxp_lock);
1219 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1224 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1225 spin_unlock(&peer->mxp_lock);
1226 read_unlock(&kmxlnd_data.kmx_peers_lock);
1229 spin_unlock(&peer->mxp_lock);
1232 read_unlock(&kmxlnd_data.kmx_peers_lock);
1238 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1240 struct kmx_conn *conn = NULL;
1241 struct kmx_conn *next = NULL;
1243 spin_lock(&peer->mxp_lock);
1244 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1245 mxlnd_conn_disconnect(conn, 0 , 0);
1247 spin_unlock(&peer->mxp_lock);
1252 mxlnd_close_matching_conns(lnet_nid_t nid)
1256 struct kmx_peer *peer = NULL;
1258 read_lock(&kmxlnd_data.kmx_peers_lock);
1259 if (nid != LNET_NID_ANY) {
1260 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1264 mxlnd_close_matching_conns_locked(peer);
1265 mxlnd_peer_decref(peer); /* and drops it here */
1267 } else { /* LNET_NID_ANY */
1268 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1269 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1270 mxlnd_close_matching_conns_locked(peer);
1273 read_unlock(&kmxlnd_data.kmx_peers_lock);
1279 * mxlnd_ctl - modify MXLND parameters
1280 * @ni - LNET interface handle
1281 * @cmd - command to change
1282 * @arg - the ioctl data
1284 * Not implemented yet.
1287 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1289 struct libcfs_ioctl_data *data = arg;
1292 LASSERT (ni == kmxlnd_data.kmx_ni);
1295 case IOC_LIBCFS_GET_PEER: {
1299 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1300 data->ioc_nid = nid;
1301 data->ioc_count = count;
1304 case IOC_LIBCFS_DEL_PEER: {
1305 ret = mxlnd_del_peer(data->ioc_nid);
1308 case IOC_LIBCFS_GET_CONN: {
1309 struct kmx_conn *conn = NULL;
1311 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1316 data->ioc_nid = conn->mxk_peer->mxp_nid;
1317 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1321 case IOC_LIBCFS_CLOSE_CONNECTION: {
1322 ret = mxlnd_close_matching_conns(data->ioc_nid);
1326 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1334 * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1337 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
/*
 * Queue @tx on its conn's msg or data queue; caller holds conn->mxk_lock.
 * CONN_REQ/CONN_ACK jump the line, credit-bearing msgs go on the credit
 * queue, everything else (incl. DATA) goes on the free queue.
 */
1340 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1342 u8 msg_type = tx->mxc_msg_type;
1343 //struct kmx_peer *peer = tx->mxc_peer;
1344 struct kmx_conn *conn = tx->mxc_conn;
1346 LASSERT (msg_type != 0);
1347 LASSERT (tx->mxc_nid != 0);
1348 LASSERT (tx->mxc_peer != NULL);
1349 LASSERT (tx->mxc_conn != NULL);
/* stamp the tx with the conn's generation so stale txs can be detected */
1351 tx->mxc_incarnation = conn->mxk_incarnation;
1353 if (msg_type != MXLND_MSG_PUT_DATA &&
1354 msg_type != MXLND_MSG_GET_DATA) {
/* message-style tx (not bulk DATA) */
1356 if (mxlnd_tx_requires_credit(tx)) {
1357 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1358 conn->mxk_ntx_msgs++;
1359 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1360 msg_type == MXLND_MSG_CONN_ACK) {
1361 /* put conn msgs at the front of the queue */
1362 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1364 /* PUT_ACK, PUT_NAK */
1365 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1366 conn->mxk_ntx_msgs++;
/* PUT_DATA / GET_DATA: bulk payload, counted separately */
1370 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1371 conn->mxk_ntx_data++;
1378 * mxlnd_peer_queue_tx - add the tx to the conn's msg or data queue (takes the lock)
1381 * Add the tx to the peer's msg or data queue
/* Locking wrapper: take conn->mxk_lock and queue @tx on its conn. */
1384 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1386 LASSERT(tx->mxc_peer != NULL);
1387 LASSERT(tx->mxc_conn != NULL);
1388 spin_lock(&tx->mxc_conn->mxk_lock);
1389 mxlnd_peer_queue_tx_locked(tx);
1390 spin_unlock(&tx->mxc_conn->mxk_lock);
1396 * mxlnd_queue_tx - add the tx to the global tx queue
1399 * Add the tx to the global queue and up the tx_queue_sem
/*
 * Queue @tx for sending. If the peer is known (tx->mxc_peer set), queue
 * directly on its conn and kick the sender; otherwise park the tx on the
 * global kmx_tx_queue and wake the mxlnd_tx_queued thread via the semaphore.
 */
1402 mxlnd_queue_tx(struct kmx_ctx *tx)
1404 struct kmx_peer *peer = tx->mxc_peer;
1405 LASSERT (tx->mxc_nid != 0);
/* incompatible peer: fail everything except the CONN_ACK we owe it */
1408 if (peer->mxp_incompatible &&
1409 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1410 /* let this fail now */
1411 tx->mxc_status.code = -ECONNABORTED;
1412 mxlnd_conn_decref(peer->mxp_conn);
1413 mxlnd_put_idle_tx(tx);
/* no conn yet: allocate one for this peer */
1416 if (tx->mxc_conn == NULL) {
1418 struct kmx_conn *conn = NULL;
1420 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
/* allocation failed: fail the tx with the error code */
1422 tx->mxc_status.code = ret;
1423 mxlnd_put_idle_tx(tx);
1426 tx->mxc_conn = conn;
1427 mxlnd_peer_decref(peer); /* and takes it from peer */
1429 LASSERT(tx->mxc_conn != NULL);
1430 mxlnd_peer_queue_tx(tx);
1431 mxlnd_check_sends(peer);
/* peer unknown: defer to the tx_queued thread via the global queue */
1433 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1434 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1435 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1436 up(&kmxlnd_data.kmx_tx_queue_sem);
/*
 * Map the [offset, offset+nob) window of a kernel-vaddr iovec onto an
 * MX segment list hung off @ctx (mxc_seg_list / mxc_nseg / mxc_pin_type).
 * Returns 0 and sets MX_PIN_KERNEL pinning on success.
 */
1443 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1450 int first_iov_offset = 0;
1451 int first_found = 0;
1453 int last_iov_length = 0;
1454 mx_ksegment_t *seg = NULL;
1456 if (niov == 0) return 0;
1457 LASSERT(iov != NULL);
/* walk the iovec accumulating lengths to find the first and last
 * entries that overlap the [offset, offset+nob) window */
1459 for (i = 0; i < niov; i++) {
1460 sum = old_sum + (u32) iov[i].iov_len;
1461 if (!first_found && (sum > offset)) {
1463 first_iov_offset = offset - old_sum;
1465 sum = (u32) iov[i].iov_len - first_iov_offset;
1470 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
/* if the window starts and ends in the same iov, trim both sides */
1471 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1476 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1477 nseg = last_iov - first_iov + 1;
1480 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1482 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1485 memset(seg, 0, nseg * sizeof(*seg));
1486 ctx->mxc_nseg = nseg;
/* translate each overlapping iov entry into an MX kernel segment */
1488 for (i = 0; i < nseg; i++) {
1489 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1490 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
/* first segment: skip past the leading offset */
1492 seg[i].segment_ptr += (u64) first_iov_offset;
1493 seg[i].segment_length -= (u32) first_iov_offset;
/* last segment: truncate to the window end */
1495 if (i == (nseg - 1)) {
1496 seg[i].segment_length = (u32) last_iov_length;
1498 sum += seg[i].segment_length;
1500 ctx->mxc_seg_list = seg;
1501 ctx->mxc_pin_type = MX_PIN_KERNEL;
1502 #ifdef MX_PIN_FULLPAGES
1503 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* sanity: segments must cover exactly nob bytes */
1505 LASSERT(nob == sum);
/*
 * Map the [offset, offset+nob) window of a page-based kiov onto an MX
 * segment list hung off @ctx, using physical addresses (MX_PIN_PHYSICAL).
 * Page-based twin of mxlnd_setup_iov() above.
 */
1510 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1516 int first_kiov = -1;
1517 int first_kiov_offset = 0;
1518 int first_found = 0;
1520 int last_kiov_length = 0;
1521 mx_ksegment_t *seg = NULL;
1523 if (niov == 0) return 0;
1524 LASSERT(kiov != NULL);
/* find the first and last kiov entries overlapping the window;
 * the first entry's kiov_offset must be accounted for explicitly */
1526 for (i = 0; i < niov; i++) {
1527 sum = old_sum + kiov[i].kiov_len;
1528 if (i == 0) sum -= kiov[i].kiov_offset;
1529 if (!first_found && (sum > offset)) {
1531 first_kiov_offset = offset - old_sum;
1532 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1533 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1535 sum = kiov[i].kiov_len - first_kiov_offset;
1540 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1541 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1546 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1547 nseg = last_kiov - first_kiov + 1;
1550 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1552 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
/* FIXME(review): seg was allocated with nseg elements (line 1550) but the
 * memset, mxc_nseg and the fill loop below all use niov — when niov > nseg
 * this writes past the allocation and indexes kiov[first_kiov + i] out of
 * range. mxlnd_setup_iov() correctly uses nseg here (lines 1485-1488);
 * these three uses of niov should almost certainly be nseg. */
1555 memset(seg, 0, niov * sizeof(*seg));
1556 ctx->mxc_nseg = niov;
1558 for (i = 0; i < niov; i++) {
1559 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1560 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
/* first segment: advance past the leading offset within the page */
1562 seg[i].segment_ptr += (u64) first_kiov_offset;
1563 /* we have to add back the original kiov_offset */
1564 seg[i].segment_length -= first_kiov_offset +
1565 kiov[first_kiov].kiov_offset;
/* last segment: truncate to the window end */
1567 if (i == (nseg - 1)) {
1568 seg[i].segment_length = last_kiov_length;
1570 sum += seg[i].segment_length;
1572 ctx->mxc_seg_list = seg;
1573 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1574 #ifdef MX_PIN_FULLPAGES
1575 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* sanity: segments must cover exactly nob bytes */
1577 LASSERT(nob == sum);
/*
 * Send a PUT NAK: a PUT_ACK message whose dst_cookie carries the error
 * @status in the top 12 bits (bits 52-63) instead of a real cookie.
 * NOTE(review): the @nid parameter is unused in the visible lines — line
 * 1585 passes tx->mxc_nid instead; confirm callers always set mxc_nid == nid.
 */
1582 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1584 LASSERT(type == MXLND_MSG_PUT_ACK);
1585 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1586 tx->mxc_cookie = cookie;
1587 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1588 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
/* status is also folded into the match bits so the peer sees the error */
1589 tx->mxc_match = mxlnd_create_match(tx, status);
1596 * mxlnd_send_data - get tx, map [k]iov, queue tx
1603 * This setups the DATA send for PUT or GET.
1605 * On success, it queues the tx, on failure it calls lnet_finalize()
/*
 * Set up and queue the bulk DATA send for a PUT or GET.
 * Grabs an idle tx, maps the lntmsg payload ([k]iov) into MX segments,
 * and queues it; on failure the tx is recycled and lnet_finalize()d -EIO.
 * @cookie must fit in the low 52 bits (top 12 are reserved for errors).
 */
1608 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1611 lnet_process_id_t target = lntmsg->msg_target;
1612 unsigned int niov = lntmsg->msg_niov;
1613 struct iovec *iov = lntmsg->msg_iov;
1614 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1615 unsigned int offset = lntmsg->msg_offset;
1616 unsigned int nob = lntmsg->msg_len;
1617 struct kmx_ctx *tx = NULL;
1619 LASSERT(lntmsg != NULL);
1620 LASSERT(peer != NULL);
1621 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* cookie must leave the top 12 bits clear for the error encoding */
1622 LASSERT((cookie>>52) == 0);
1624 tx = mxlnd_get_idle_tx();
1626 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1627 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1628 libcfs_nid2str(target.nid));
1631 tx->mxc_nid = target.nid;
1632 /* NOTE called when we have a ref on the conn, get one for this tx */
1633 mxlnd_conn_addref(peer->mxp_conn);
1634 tx->mxc_peer = peer;
1635 tx->mxc_conn = peer->mxp_conn;
1636 tx->mxc_msg_type = msg_type;
1637 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1638 tx->mxc_state = MXLND_CTX_PENDING;
1639 tx->mxc_lntmsg[0] = lntmsg;
1640 tx->mxc_cookie = cookie;
1641 tx->mxc_match = mxlnd_create_match(tx, 0);
1643 /* This setups up the mx_ksegment_t to send the DATA payload */
1645 /* do not setup the segments */
1646 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1647 "to %s?\n", libcfs_nid2str(target.nid));
1649 } else if (kiov == NULL) {
/* virtual-address payload */
1650 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
/* page-based payload */
1652 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1655 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1656 libcfs_nid2str(target.nid));
1657 tx->mxc_status.code = -EIO;
/* failure path: drop the conn ref taken above and recycle the tx */
1664 mxlnd_conn_decref(peer->mxp_conn);
1665 mxlnd_put_idle_tx(tx);
/* no idle tx was available: fail the LNET message outright */
1669 CDEBUG(D_NETERROR, "no tx avail\n");
1670 lnet_finalize(ni, lntmsg, -EIO);
1675 * mxlnd_recv_data - map [k]iov, post rx
1682 * This setups the DATA receive for PUT or GET.
1684 * On success, it returns 0, on failure it returns -1
/*
 * Post the bulk DATA receive for a PUT or GET: map the destination
 * [k]iov into MX segments on @rx, queue it pending, and mx_kirecv().
 * For GET_DATA the sink comes from the MD and a reply msg is created.
 */
1687 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1690 lnet_process_id_t target = lntmsg->msg_target;
1691 unsigned int niov = lntmsg->msg_niov;
1692 struct iovec *iov = lntmsg->msg_iov;
1693 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1694 unsigned int offset = lntmsg->msg_offset;
1695 unsigned int nob = lntmsg->msg_len;
1696 mx_return_t mxret = MX_SUCCESS;
1698 /* above assumes MXLND_MSG_PUT_DATA */
1699 if (msg_type == MXLND_MSG_GET_DATA) {
/* for GET, receive into the memory descriptor, not the msg fields */
1700 niov = lntmsg->msg_md->md_niov;
1701 iov = lntmsg->msg_md->md_iov.iov;
1702 kiov = lntmsg->msg_md->md_iov.kiov;
1704 nob = lntmsg->msg_md->md_length;
1707 LASSERT(lntmsg != NULL);
1708 LASSERT(rx != NULL);
1709 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1710 LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1712 rx->mxc_msg_type = msg_type;
1713 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1714 rx->mxc_state = MXLND_CTX_PENDING;
1715 rx->mxc_nid = target.nid;
1716 /* if posting a GET_DATA, we may not yet know the peer */
1717 if (rx->mxc_peer != NULL) {
1718 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1720 rx->mxc_lntmsg[0] = lntmsg;
1721 rx->mxc_cookie = cookie;
1722 rx->mxc_match = mxlnd_create_match(rx, 0);
1723 /* This setups up the mx_ksegment_t to receive the DATA payload */
1725 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1727 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1729 if (msg_type == MXLND_MSG_GET_DATA) {
/* the REPLY that completes when the GET data arrives */
1730 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1731 if (rx->mxc_lntmsg[1] == NULL) {
1732 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1733 libcfs_nid2str(target.nid));
1738 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1739 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1740 libcfs_nid2str(target.nid));
/* track the rx so the timeout thread can see it */
1743 ret = mxlnd_q_pending_ctx(rx);
1747 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1748 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1749 rx->mxc_seg_list, rx->mxc_nseg,
1750 rx->mxc_pin_type, rx->mxc_match,
1751 0xF00FFFFFFFFFFFFFLL, (void *) rx,
1753 if (mxret != MX_SUCCESS) {
/* undo the pending-queue add from above before reporting failure */
1754 if (rx->mxc_conn != NULL) {
1755 mxlnd_deq_pending_ctx(rx);
1757 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1758 (int) mxret, libcfs_nid2str(target.nid));
1766 * mxlnd_send - the LND required send function
1771 * This must not block. Since we may not have a peer struct for the receiver,
1772 * it will append send messages on a global tx list. We will then up the
1773 * tx_queued's semaphore to notify it of the new send.
/*
 * LND send entry point (must not block). Small payloads go as a single
 * EAGER message; larger PUTs send a PUT_REQ (after pre-posting the
 * PUT_ACK/NAK rx), larger GETs pre-post the DATA sink and send GET_REQ.
 * Reference discipline: find_peer_by_nid() takes a peer ref that is
 * converted to a conn ref; every tx/rx created here carries its own
 * conn ref, dropped on each failure path.
 */
1776 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1779 int type = lntmsg->msg_type;
1780 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1781 lnet_process_id_t target = lntmsg->msg_target;
1782 lnet_nid_t nid = target.nid;
1783 int target_is_router = lntmsg->msg_target_is_router;
1784 int routing = lntmsg->msg_routing;
1785 unsigned int payload_niov = lntmsg->msg_niov;
1786 struct iovec *payload_iov = lntmsg->msg_iov;
1787 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1788 unsigned int payload_offset = lntmsg->msg_offset;
1789 unsigned int payload_nob = lntmsg->msg_len;
1790 struct kmx_ctx *tx = NULL;
1791 struct kmx_msg *txmsg = NULL;
1792 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1793 struct kmx_ctx *rx_data = NULL;
1794 struct kmx_conn *conn = NULL;
1796 uint32_t length = 0;
1797 struct kmx_peer *peer = NULL;
1799 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1800 payload_nob, payload_niov, libcfs_id2str(target));
1802 LASSERT (payload_nob == 0 || payload_niov > 0);
1803 LASSERT (payload_niov <= LNET_MAX_IOV);
1804 /* payload is either all vaddrs or all pages */
1805 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1807 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1809 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1810 * to a new peer, use the nid */
1811 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1813 if (unlikely(peer->mxp_incompatible)) {
1814 mxlnd_peer_decref(peer); /* drop ref taken above */
/* swap the peer ref for a conn ref under the peer lock */
1816 spin_lock(&peer->mxp_lock);
1817 conn = peer->mxp_conn;
1819 mxlnd_conn_addref(conn);
1820 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1822 spin_unlock(&peer->mxp_lock);
1825 if (conn == NULL && peer != NULL) {
1826 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1827 peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1832 LASSERT (payload_nob == 0);
1835 case LNET_MSG_REPLY:
1837 /* Is the payload small enough not to need DATA? */
1838 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1839 if (nob <= MXLND_EAGER_SIZE)
1840 break; /* send EAGER */
/* large PUT: send PUT_REQ and wait for PUT_ACK/NAK */
1842 tx = mxlnd_get_idle_tx();
1843 if (unlikely(tx == NULL)) {
1844 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1845 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1846 libcfs_nid2str(nid));
1847 if (conn) mxlnd_conn_decref(conn);
1851 /* the peer may be NULL */
1852 tx->mxc_peer = peer;
1853 tx->mxc_conn = conn; /* may be NULL */
1854 /* we added a conn ref above */
1855 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1856 txmsg = tx->mxc_msg;
1857 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1858 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1859 tx->mxc_match = mxlnd_create_match(tx, 0);
1861 /* we must post a receive _before_ sending the request.
1862 * we need to determine how much to receive, it will be either
1863 * a put_ack or a put_nak. The put_ack is larger, so use it. */
1865 rx = mxlnd_get_idle_rx();
1866 if (unlikely(rx == NULL)) {
1867 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1868 libcfs_nid2str(nid));
1869 mxlnd_put_idle_tx(tx);
1870 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1874 rx->mxc_peer = peer;
1875 /* conn may be NULL but unlikely since the first msg is always small */
1876 /* NOTE no need to lock peer before adding conn ref since we took
1877 * a conn ref for the tx (it cannot be freed between there and here ) */
1878 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1879 rx->mxc_conn = conn;
1880 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1881 rx->mxc_cookie = tx->mxc_cookie;
1882 rx->mxc_match = mxlnd_create_match(rx, 0);
1884 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1885 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1886 if (unlikely(ret != 0)) {
1887 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1888 libcfs_nid2str(nid));
1889 rx->mxc_lntmsg[0] = NULL;
1890 mxlnd_put_idle_rx(rx);
1891 mxlnd_put_idle_tx(tx);
1893 mxlnd_conn_decref(conn); /* for the rx... */
1894 mxlnd_conn_decref(conn); /* and for the tx */
1896 return -EHOSTUNREACH;
/* GET: routers and small replies use EAGER, else do the rendezvous */
1903 if (routing || target_is_router)
1904 break; /* send EAGER */
1906 /* is the REPLY message too small for DATA? */
1907 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1908 if (nob <= MXLND_EAGER_SIZE)
1909 break; /* send EAGER */
1911 /* get tx (we need the cookie) , post rx for incoming DATA,
1912 * then post GET_REQ tx */
1913 tx = mxlnd_get_idle_tx();
1914 if (unlikely(tx == NULL)) {
1915 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1916 libcfs_nid2str(nid));
1917 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1920 rx_data = mxlnd_get_idle_rx();
1921 if (unlikely(rx_data == NULL)) {
1922 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1923 libcfs_nid2str(nid));
1924 mxlnd_put_idle_tx(tx);
1925 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1928 rx_data->mxc_peer = peer;
1929 /* NOTE no need to lock peer before adding conn ref since we took
1930 * a conn ref for the tx (it cannot be freed between there and here ) */
1931 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1932 rx_data->mxc_conn = conn; /* may be NULL */
1934 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1935 if (unlikely(ret != 0)) {
1936 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1937 libcfs_nid2str(nid));
1938 mxlnd_put_idle_rx(rx_data);
1939 mxlnd_put_idle_tx(tx);
1941 mxlnd_conn_decref(conn); /* for the rx_data... */
1942 mxlnd_conn_decref(conn); /* and for the tx */
1947 tx->mxc_peer = peer;
1948 tx->mxc_conn = conn; /* may be NULL */
1949 /* conn ref taken above */
1950 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1951 txmsg = tx->mxc_msg;
1952 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1953 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1954 tx->mxc_match = mxlnd_create_match(tx, 0);
1961 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: copy the whole payload into the message itself */
1967 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1968 <= MXLND_EAGER_SIZE);
1970 tx = mxlnd_get_idle_tx();
1971 if (unlikely(tx == NULL)) {
1972 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1973 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1974 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1978 tx->mxc_peer = peer;
1979 tx->mxc_conn = conn; /* may be NULL */
1980 /* conn ref taken above */
1981 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1982 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1983 tx->mxc_match = mxlnd_create_match(tx, 0);
1985 txmsg = tx->mxc_msg;
1986 txmsg->mxm_u.eager.mxem_hdr = *hdr;
1988 if (payload_kiov != NULL)
1989 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1990 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1991 payload_niov, payload_kiov, payload_offset, payload_nob);
1993 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1994 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1995 payload_niov, payload_iov, payload_offset, payload_nob);
1997 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2003 * mxlnd_recv - the LND required recv function
2014 * This must not block.
/*
 * LND receive entry point (must not block). @private is the kmx_ctx rx
 * that matched the incoming message. Dispatches on the message type:
 * EAGER copies the inline payload into the caller's [k]iov; PUT_REQ
 * reuses the rx as the DATA sink and answers with a PUT_ACK (or NAK);
 * GET_REQ kicks off the DATA send (or a NAK encoded in the match bits).
 * On the way out it returns a credit to the peer and recycles the rx.
 */
2017 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2018 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2019 unsigned int offset, unsigned int mlen, unsigned int rlen)
2024 struct kmx_ctx *rx = private;
2025 struct kmx_msg *rxmsg = rx->mxc_msg;
2026 lnet_nid_t nid = rx->mxc_nid;
2027 struct kmx_ctx *tx = NULL;
2028 struct kmx_msg *txmsg = NULL;
2029 struct kmx_peer *peer = rx->mxc_peer;
2030 struct kmx_conn *conn = peer->mxp_conn;
2032 int msg_type = rxmsg->mxm_type;
2037 LASSERT (mlen <= rlen);
2038 /* Either all pages or all vaddrs */
2039 LASSERT (!(kiov != NULL && iov != NULL));
2040 LASSERT (peer != NULL);
2042 /* conn_addref(conn) already taken for the primary rx */
2045 case MXLND_MSG_EAGER:
/* reject if the wire transfer was shorter than the advertised payload */
2046 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2047 len = rx->mxc_status.xfer_length;
2048 if (unlikely(nob > len)) {
2049 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2050 libcfs_nid2str(nid), nob, len);
/* copy inline payload into the destination pages or vaddrs */
2056 lnet_copy_flat2kiov(niov, kiov, offset,
2057 MXLND_EAGER_SIZE, rxmsg,
2058 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2061 lnet_copy_flat2iov(niov, iov, offset,
2062 MXLND_EAGER_SIZE, rxmsg,
2063 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2069 case MXLND_MSG_PUT_REQ:
2070 /* we are going to reuse the rx, store the needed info */
2071 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2073 /* get tx, post rx, send PUT_ACK */
2075 tx = mxlnd_get_idle_tx();
2076 if (unlikely(tx == NULL)) {
2077 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2078 /* Not replying will break the connection */
2082 if (unlikely(mlen == 0)) {
/* zero-length sink: NAK with status 0, nothing to receive */
2084 tx->mxc_peer = peer;
2085 tx->mxc_conn = conn;
2086 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2091 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2092 tx->mxc_peer = peer;
2093 tx->mxc_conn = conn;
2094 /* no need to lock peer first since we already have a ref */
2095 mxlnd_conn_addref(conn); /* for the tx */
2096 txmsg = tx->mxc_msg;
/* ack carries the sender's cookie back plus our own for the DATA */
2097 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2098 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2099 tx->mxc_cookie = cookie;
2100 tx->mxc_match = mxlnd_create_match(tx, 0);
2102 /* we must post a receive _before_ sending the PUT_ACK */
2104 rx->mxc_state = MXLND_CTX_PREP;
2105 rx->mxc_peer = peer;
2106 rx->mxc_conn = conn;
2107 /* do not take another ref for this rx, it is already taken */
2108 rx->mxc_nid = peer->mxp_nid;
2109 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2110 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2112 if (unlikely(ret != 0)) {
2113 /* Notify peer that it's over */
2114 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2115 libcfs_nid2str(nid), ret);
2117 tx->mxc_state = MXLND_CTX_PREP;
2118 tx->mxc_peer = peer;
2119 tx->mxc_conn = conn;
2120 /* finalize = 0, let the PUT_ACK tx finalize this */
2121 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2122 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2123 /* conn ref already taken above */
2124 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2130 /* do not return a credit until after PUT_DATA returns */
2134 case MXLND_MSG_GET_REQ:
2135 if (likely(lntmsg != NULL)) {
/* GET matched: stream the requested data back */
2136 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2137 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2139 /* GET didn't match anything */
2140 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2141 * We have to embed the error code in the match bits.
2142 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2143 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2145 tx = mxlnd_get_idle_tx();
2146 if (unlikely(tx == NULL)) {
2147 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2148 libcfs_nid2str(nid));
2152 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2153 tx->mxc_state = MXLND_CTX_PENDING;
2155 tx->mxc_peer = peer;
2156 tx->mxc_conn = conn;
2157 /* no need to lock peer first since we already have a ref */
2158 mxlnd_conn_addref(conn); /* for this tx */
2159 tx->mxc_cookie = cookie;
/* ENODATA in the match bits tells the initiator the GET failed */
2160 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2161 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2164 /* finalize lntmsg after tx completes */
2166 /* we received a message, increment peer's outstanding credits */
2174 spin_lock(&conn->mxk_lock);
2175 conn->mxk_outstanding++;
2176 spin_unlock(&conn->mxk_lock);
2178 /* we are done with the rx */
2179 mxlnd_put_idle_rx(rx);
2180 mxlnd_conn_decref(conn);
2183 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2185 /* we received a credit, see if we can use it to send a msg */
2186 if (credit) mxlnd_check_sends(peer);
/* Sleep interruptibly for @timeout jiffies. */
2192 mxlnd_sleep(unsigned long timeout)
2194 set_current_state(TASK_INTERRUPTIBLE);
2195 schedule_timeout(timeout);
2200 * mxlnd_tx_queued - the generic send queue thread
2201 * @arg - thread id (as a void *)
2203 * This thread moves send messages from the global tx_queue to the owning
2204 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2205 * it to the global peer list.
/*
 * Kernel thread: drain the global kmx_tx_queue. For each tx, find or
 * create the peer (and its conn), attach the tx, and queue it on the
 * conn. Blocks on kmx_tx_queue_sem until mxlnd_queue_tx() posts work;
 * exits when kmx_shutdown is set.
 * @arg - thread id (as a void *), passed to mxlnd_thread_stop() on exit
 */
2208 mxlnd_tx_queued(void *arg)
2210 long id = (long) arg;
2213 struct kmx_ctx *tx = NULL;
2214 struct kmx_peer *peer = NULL;
2215 struct list_head *tmp_tx = NULL;
2217 cfs_daemonize("mxlnd_tx_queued");
2218 //cfs_block_allsigs();
2220 while (!kmxlnd_data.kmx_shutdown) {
/* wait for mxlnd_queue_tx() to post work (or a shutdown wakeup) */
2221 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2222 if (kmxlnd_data.kmx_shutdown)
2224 if (ret != 0) // Should we check for -EINTR?
2226 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2227 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2228 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the oldest tx off the global queue */
2231 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2232 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2233 list_del_init(&tx->mxc_list);
2234 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2237 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known peer: ensure it has a conn, attach and queue the tx */
2239 tx->mxc_peer = peer;
2240 spin_lock(&peer->mxp_lock);
2241 if (peer->mxp_conn == NULL) {
2242 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2244 /* out of memory, give up and fail tx */
2245 tx->mxc_status.code = -ENOMEM;
2246 spin_unlock(&peer->mxp_lock);
2247 mxlnd_peer_decref(peer);
2248 mxlnd_put_idle_tx(tx);
2252 tx->mxc_conn = peer->mxp_conn;
2253 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2254 spin_unlock(&peer->mxp_lock);
2255 mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown peer: allocate one and try to insert it into the hash */
2261 struct kmx_peer *peer = NULL;
2262 struct kmx_peer *old = NULL;
2264 hash = mxlnd_nid_to_hash(tx->mxc_nid);
2266 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2267 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2269 /* adds conn ref for this function */
2270 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2272 /* finalize message */
2273 tx->mxc_status.code = ret;
2274 mxlnd_put_idle_tx(tx);
2277 tx->mxc_peer = peer;
2278 tx->mxc_conn = peer->mxp_conn;
2279 /* this tx will keep the conn ref taken in peer_alloc() */
2281 /* add peer to global peer list, but look to see
2282 * if someone already created it after we released
2284 write_lock(&kmxlnd_data.kmx_peers_lock);
2285 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2286 if (old->mxp_nid == peer->mxp_nid) {
2287 /* somebody beat us here, we created a duplicate */
2294 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2295 atomic_inc(&kmxlnd_data.kmx_npeers);
/* lost the race: switch the tx to the winner's conn and
 * tear down our duplicate peer/conn */
2298 spin_lock(&old->mxp_lock);
2299 tx->mxc_conn = old->mxp_conn;
2300 /* FIXME can conn be NULL? */
2301 LASSERT(old->mxp_conn != NULL);
2302 mxlnd_conn_addref(old->mxp_conn);
2303 spin_unlock(&old->mxp_lock);
2304 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2305 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2306 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2307 mxlnd_peer_decref(peer);
2309 write_unlock(&kmxlnd_data.kmx_peers_lock);
2314 mxlnd_thread_stop(id);
2318 /* When calling this, we must not have the peer lock. */
/*
 * Initiate an MX connection attempt to @peer. @mask selects CONN_REQ vs
 * CONN_ACK matching. Resolves the peer's NIC id first; if still
 * unresolved past MXLND_WAIT_TIMEOUT, disconnects and notifies LNET.
 * Caller already holds a conn ref; an extra ref is taken here and held
 * until the CONN_REQ/CONN_ACK completes (dropped on every error path).
 */
2320 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2322 mx_return_t mxret = MX_SUCCESS;
2323 mx_request_t request;
2324 struct kmx_conn *conn = peer->mxp_conn;
2326 /* NOTE we are holding a conn ref every time we call this function,
2327 * we do not need to lock the peer before taking another ref */
2328 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2330 LASSERT(mask == MXLND_MASK_ICON_REQ ||
2331 mask == MXLND_MASK_ICON_ACK);
/* start the reconnect clock on the first attempt */
2333 if (peer->mxp_reconnect_time == 0) {
2334 peer->mxp_reconnect_time = jiffies;
2337 if (peer->mxp_nic_id == 0LL) {
2338 mxlnd_peer_hostname_to_nic_id(peer);
2339 if (peer->mxp_nic_id == 0LL) {
2340 /* not mapped yet, return */
2341 spin_lock(&conn->mxk_lock);
2342 conn->mxk_status = MXLND_CONN_INIT;
2343 spin_unlock(&conn->mxk_lock);
2344 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2345 /* give up and notify LNET */
2346 mxlnd_conn_disconnect(conn, 0, 1);
2347 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2349 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2352 mxlnd_conn_decref(conn);
2357 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2358 peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2359 (void *) peer, &request);
2360 if (unlikely(mxret != MX_SUCCESS)) {
2361 spin_lock(&conn->mxk_lock);
2362 conn->mxk_status = MXLND_CONN_FAIL;
2363 spin_unlock(&conn->mxk_lock);
2364 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2365 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2366 mxlnd_conn_decref(conn);
2371 #define MXLND_STATS 0
2374 mxlnd_check_sends(struct kmx_peer *peer)
2378 mx_return_t mxret = MX_SUCCESS;
2379 struct kmx_ctx *tx = NULL;
2380 struct kmx_conn *conn = NULL;
2387 static unsigned long last = 0;
2390 if (unlikely(peer == NULL)) {
2391 LASSERT(peer != NULL);
2394 spin_lock(&peer->mxp_lock);
2395 conn = peer->mxp_conn;
2396 /* NOTE take a ref for the duration of this function since it is called
2397 * when there might not be any queued txs for this peer */
2398 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2399 spin_unlock(&peer->mxp_lock);
2401 /* do not add another ref for this tx */
2404 /* we do not have any conns */
2409 if (time_after(jiffies, last)) {
2410 last = jiffies + HZ;
2411 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2412 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2413 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2414 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2415 conn->mxk_ntx_data, conn->mxk_data_posted);
2419 /* cache peer state for asserts */
2420 spin_lock(&conn->mxk_lock);
2421 ntx_posted = conn->mxk_ntx_posted;
2422 credits = conn->mxk_credits;
2423 spin_unlock(&conn->mxk_lock);
2425 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2426 LASSERT(ntx_posted >= 0);
2428 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2429 LASSERT(credits >= 0);
2431 /* check number of queued msgs, ignore data */
2432 spin_lock(&conn->mxk_lock);
2433 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2434 /* check if any txs queued that could return credits... */
2435 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2436 /* if not, send a NOOP */
2437 tx = mxlnd_get_idle_tx();
2438 if (likely(tx != NULL)) {
2439 tx->mxc_peer = peer;
2440 tx->mxc_conn = peer->mxp_conn;
2441 mxlnd_conn_addref(conn); /* for this tx */
2442 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2443 tx->mxc_match = mxlnd_create_match(tx, 0);
2444 mxlnd_peer_queue_tx_locked(tx);
2450 spin_unlock(&conn->mxk_lock);
2452 /* if the peer is not ready, try to connect */
2453 spin_lock(&conn->mxk_lock);
2454 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2455 conn->mxk_status == MXLND_CONN_FAIL ||
2456 conn->mxk_status == MXLND_CONN_REQ)) {
2457 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2458 conn->mxk_status = MXLND_CONN_WAIT;
2459 spin_unlock(&conn->mxk_lock);
2460 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2463 spin_unlock(&conn->mxk_lock);
2465 spin_lock(&conn->mxk_lock);
2466 while (!list_empty(&conn->mxk_tx_free_queue) ||
2467 !list_empty(&conn->mxk_tx_credit_queue)) {
2468 /* We have something to send. If we have a queued tx that does not
2469 * require a credit (free), choose it since its completion will
2470 * return a credit (here or at the peer), complete a DATA or
2471 * CONN_REQ or CONN_ACK. */
2472 struct list_head *tmp_tx = NULL;
2473 if (!list_empty(&conn->mxk_tx_free_queue)) {
2474 tmp_tx = &conn->mxk_tx_free_queue;
2476 tmp_tx = &conn->mxk_tx_credit_queue;
2478 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2480 msg_type = tx->mxc_msg_type;
2482 /* don't try to send a rx */
2483 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2485 /* ensure that it is a valid msg type */
2486 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2487 msg_type == MXLND_MSG_CONN_ACK ||
2488 msg_type == MXLND_MSG_NOOP ||
2489 msg_type == MXLND_MSG_EAGER ||
2490 msg_type == MXLND_MSG_PUT_REQ ||
2491 msg_type == MXLND_MSG_PUT_ACK ||
2492 msg_type == MXLND_MSG_PUT_DATA ||
2493 msg_type == MXLND_MSG_GET_REQ ||
2494 msg_type == MXLND_MSG_GET_DATA);
2495 LASSERT(tx->mxc_peer == peer);
2496 LASSERT(tx->mxc_nid == peer->mxp_nid);
2498 credit = mxlnd_tx_requires_credit(tx);
2501 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2502 CDEBUG(D_NET, "%s: posted enough\n",
2503 libcfs_nid2str(peer->mxp_nid));
2507 if (conn->mxk_credits == 0) {
2508 CDEBUG(D_NET, "%s: no credits\n",
2509 libcfs_nid2str(peer->mxp_nid));
2513 if (conn->mxk_credits == 1 && /* last credit reserved for */
2514 conn->mxk_outstanding == 0) { /* giving back credits */
2515 CDEBUG(D_NET, "%s: not using last credit\n",
2516 libcfs_nid2str(peer->mxp_nid));
2521 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2522 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2523 msg_type == MXLND_MSG_CONN_ACK)) {
2524 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2525 mxlnd_connstatus_to_str(conn->mxk_status),
2527 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2528 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2529 list_del_init(&tx->mxc_list);
2530 tx->mxc_status.code = -ECONNABORTED;
2531 mxlnd_put_idle_tx(tx);
2532 mxlnd_conn_decref(conn);
2538 list_del_init(&tx->mxc_list);
2540 /* handle credits, etc now while we have the lock to avoid races */
2542 conn->mxk_credits--;
2543 conn->mxk_ntx_posted++;
2545 if (msg_type != MXLND_MSG_PUT_DATA &&
2546 msg_type != MXLND_MSG_GET_DATA) {
2547 if (msg_type != MXLND_MSG_CONN_REQ &&
2548 msg_type != MXLND_MSG_CONN_ACK) {
2549 conn->mxk_ntx_msgs--;
2552 if (tx->mxc_incarnation == 0 &&
2553 conn->mxk_incarnation != 0) {
2554 tx->mxc_incarnation = conn->mxk_incarnation;
2556 spin_unlock(&conn->mxk_lock);
2558 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2559 * or (2) there is a non-DATA msg that can return credits in the
2560 * queue, then drop this duplicate NOOP */
2561 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2562 spin_lock(&conn->mxk_lock);
2563 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2564 (conn->mxk_ntx_msgs >= 1)) {
2565 conn->mxk_credits++;
2566 conn->mxk_ntx_posted--;
2567 spin_unlock(&conn->mxk_lock);
2568 /* redundant NOOP */
2569 mxlnd_put_idle_tx(tx);
2570 mxlnd_conn_decref(conn);
2571 CDEBUG(D_NET, "%s: redundant noop\n",
2572 libcfs_nid2str(peer->mxp_nid));
2576 spin_unlock(&conn->mxk_lock);
2580 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2581 (msg_type != MXLND_MSG_GET_DATA))) {
2585 //ret = -ECONNABORTED;
2588 spin_lock(&conn->mxk_lock);
2589 status = conn->mxk_status;
2590 spin_unlock(&conn->mxk_lock);
2592 if (likely((status == MXLND_CONN_READY) ||
2593 (msg_type == MXLND_MSG_CONN_REQ) ||
2594 (msg_type == MXLND_MSG_CONN_ACK))) {
2596 if (msg_type != MXLND_MSG_CONN_REQ &&
2597 msg_type != MXLND_MSG_CONN_ACK) {
2598 /* add to the pending list */
2599 ret = mxlnd_q_pending_ctx(tx);
2601 /* FIXME the conn is disconnected, now what? */
2605 tx->mxc_state = MXLND_CTX_PENDING;
2609 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2610 msg_type != MXLND_MSG_GET_DATA)) {
2611 /* send a msg style tx */
2612 LASSERT(tx->mxc_nseg == 1);
2613 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2614 CDEBUG(D_NET, "sending %s 0x%llx\n",
2615 mxlnd_msgtype_to_str(msg_type),
2617 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2626 /* send a DATA tx */
2627 spin_lock(&conn->mxk_lock);
2628 conn->mxk_ntx_data--;
2629 conn->mxk_data_posted++;
2630 spin_unlock(&conn->mxk_lock);
2631 CDEBUG(D_NET, "sending %s 0x%llx\n",
2632 mxlnd_msgtype_to_str(msg_type),
2634 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2644 mxret = MX_CONNECTION_FAILED;
2646 if (likely(mxret == MX_SUCCESS)) {
2649 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2650 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2651 libcfs_nid2str(peer->mxp_nid));
2652 /* NOTE mx_kisend() only fails if there are not enough
2653 * resources. Do not change the connection status. */
2654 if (mxret == MX_NO_RESOURCES) {
2655 tx->mxc_status.code = -ENOMEM;
2657 tx->mxc_status.code = -ECONNABORTED;
2660 spin_lock(&conn->mxk_lock);
2661 conn->mxk_ntx_posted--;
2662 conn->mxk_credits++;
2663 spin_unlock(&conn->mxk_lock);
2664 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2665 msg_type == MXLND_MSG_GET_DATA) {
2666 spin_lock(&conn->mxk_lock);
2667 conn->mxk_data_posted--;
2668 spin_unlock(&conn->mxk_lock);
2670 if (msg_type != MXLND_MSG_PUT_DATA &&
2671 msg_type != MXLND_MSG_GET_DATA &&
2672 msg_type != MXLND_MSG_CONN_REQ &&
2673 msg_type != MXLND_MSG_CONN_ACK) {
2674 spin_lock(&conn->mxk_lock);
2675 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2676 spin_unlock(&conn->mxk_lock);
2678 if (msg_type != MXLND_MSG_CONN_REQ &&
2679 msg_type != MXLND_MSG_CONN_ACK) {
2680 /* remove from the pending list */
2681 mxlnd_deq_pending_ctx(tx);
2683 mxlnd_put_idle_tx(tx);
2684 mxlnd_conn_decref(conn);
2687 spin_lock(&conn->mxk_lock);
2690 spin_unlock(&conn->mxk_lock);
2692 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2698 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2699 * @ctx - the tx descriptor
2701 * Determine which type of send request it was and start the next step, if needed,
2702 * or, if done, signal completion to LNET. After we are done, put back on the
2706 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2708 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2709 struct kmx_msg *msg = tx->mxc_msg;
2710 struct kmx_peer *peer = tx->mxc_peer;
2711 struct kmx_conn *conn = tx->mxc_conn;
2712 u8 type = tx->mxc_msg_type;
2713 int credit = mxlnd_tx_requires_credit(tx);
2714 u64 cookie = tx->mxc_cookie;
2716 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2717 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2719 if (unlikely(conn == NULL)) {
2720 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2721 conn = peer->mxp_conn;
2723 /* do not add a ref for the tx, it was set before sending */
2724 tx->mxc_conn = conn;
2725 tx->mxc_peer = conn->mxk_peer;
2728 LASSERT (peer != NULL);
2729 LASSERT (conn != NULL);
2731 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2732 LASSERT (type == msg->mxm_type);
2736 tx->mxc_status.code = -EIO;
2738 spin_lock(&conn->mxk_lock);
2739 conn->mxk_last_tx = jiffies;
2740 spin_unlock(&conn->mxk_lock);
2745 case MXLND_MSG_GET_DATA:
2746 spin_lock(&conn->mxk_lock);
2747 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2748 conn->mxk_outstanding++;
2749 conn->mxk_data_posted--;
2751 spin_unlock(&conn->mxk_lock);
2754 case MXLND_MSG_PUT_DATA:
2755 spin_lock(&conn->mxk_lock);
2756 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2757 conn->mxk_data_posted--;
2759 spin_unlock(&conn->mxk_lock);
2762 case MXLND_MSG_NOOP:
2763 case MXLND_MSG_PUT_REQ:
2764 case MXLND_MSG_PUT_ACK:
2765 case MXLND_MSG_GET_REQ:
2766 case MXLND_MSG_EAGER:
2767 //case MXLND_MSG_NAK:
2770 case MXLND_MSG_CONN_ACK:
2771 if (peer->mxp_incompatible) {
2772 /* we sent our params, now close this conn */
2773 mxlnd_conn_disconnect(conn, 0, 1);
2775 case MXLND_MSG_CONN_REQ:
2777 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2778 "failed with %s (%d) to %s\n",
2779 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2780 mx_strstatus(tx->mxc_status.code),
2781 tx->mxc_status.code,
2782 libcfs_nid2str(tx->mxc_nid));
2783 if (!peer->mxp_incompatible) {
2784 spin_lock(&conn->mxk_lock);
2785 conn->mxk_status = MXLND_CONN_FAIL;
2786 spin_unlock(&conn->mxk_lock);
2792 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2797 spin_lock(&conn->mxk_lock);
2798 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2799 conn->mxk_ntx_posted--;
2801 spin_unlock(&conn->mxk_lock);
2804 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2805 mxlnd_put_idle_tx(tx);
2806 mxlnd_conn_decref(conn);
2808 mxlnd_check_sends(peer);
2814 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2819 u32 nob = rx->mxc_status.xfer_length;
2820 u64 bits = rx->mxc_status.match_info;
2821 struct kmx_msg *msg = rx->mxc_msg;
2822 struct kmx_peer *peer = rx->mxc_peer;
2823 struct kmx_conn *conn = rx->mxc_conn;
2824 u8 type = rx->mxc_msg_type;
2826 lnet_msg_t *lntmsg[2];
2832 int incompatible = 0;
2834 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2835 * failed GET reply, CONN_REQ, or a CONN_ACK */
2837 /* NOTE peer may still be NULL if it is a new peer and
2838 * conn may be NULL if this is a re-connect */
2839 if (likely(peer != NULL && conn != NULL)) {
2840 /* we have a reference on the conn */
2842 } else if (peer != NULL && conn == NULL) {
2843 /* we have a reference on the peer */
2845 } else if (peer == NULL && conn != NULL) {
2847 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2849 } /* else peer and conn == NULL */
2852 if (peer == NULL || conn == NULL) {
2853 /* if the peer was disconnected, the peer may exist but
2854 * not have any valid conns */
2855 decref = 0; /* no peer means no ref was taken for this rx */
2859 if (conn == NULL && peer != NULL) {
2860 spin_lock(&peer->mxp_lock);
2861 conn = peer->mxp_conn;
2863 mxlnd_conn_addref(conn); /* conn takes ref... */
2864 mxlnd_peer_decref(peer); /* from peer */
2868 spin_unlock(&peer->mxp_lock);
2869 rx->mxc_conn = conn;
2873 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2879 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2880 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2881 libcfs_nid2str(rx->mxc_nid),
2882 mx_strstatus(rx->mxc_status.code),
2883 (int) rx->mxc_status.code);
2889 /* this may be a failed GET reply */
2890 if (type == MXLND_MSG_GET_DATA) {
2891 bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2892 ret = (u32) (bits>>52);
2893 lntmsg[0] = rx->mxc_lntmsg[0];
2897 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2898 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2899 libcfs_nid2str(rx->mxc_nid));
2904 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2905 if (type == MXLND_MSG_PUT_DATA) {
2906 result = rx->mxc_status.code;
2907 lntmsg[0] = rx->mxc_lntmsg[0];
2909 } else if (type == MXLND_MSG_GET_DATA) {
2910 result = rx->mxc_status.code;
2911 lntmsg[0] = rx->mxc_lntmsg[0];
2912 lntmsg[1] = rx->mxc_lntmsg[1];
2916 ret = mxlnd_unpack_msg(msg, nob);
2918 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2919 ret, libcfs_nid2str(rx->mxc_nid));
2923 type = msg->mxm_type;
2926 if (type != MXLND_MSG_CONN_REQ &&
2927 (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2928 !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2929 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2930 "0x%llx and rx msg dst is 0x%llx)\n",
2931 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2936 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2937 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2938 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2940 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2941 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2942 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2943 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2944 msg->mxm_srcstamp, conn->mxk_incarnation,
2945 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2947 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2948 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2949 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2950 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2957 CDEBUG(D_NET, "Received %s with %d credits\n",
2958 mxlnd_msgtype_to_str(type), msg->mxm_credits);
2960 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2961 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2962 LASSERT(peer != NULL);
2963 LASSERT(conn != NULL);
2964 if (msg->mxm_credits != 0) {
2965 spin_lock(&conn->mxk_lock);
2966 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2967 if ((conn->mxk_credits + msg->mxm_credits) >
2968 *kmxlnd_tunables.kmx_credits) {
2969 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2970 conn->mxk_credits, msg->mxm_credits);
2972 conn->mxk_credits += msg->mxm_credits;
2973 LASSERT(conn->mxk_credits >= 0);
2974 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2976 spin_unlock(&conn->mxk_lock);
2980 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2982 case MXLND_MSG_NOOP:
2985 case MXLND_MSG_EAGER:
2986 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2987 msg->mxm_srcnid, rx, 0);
2991 case MXLND_MSG_PUT_REQ:
2992 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2993 msg->mxm_srcnid, rx, 1);
2997 case MXLND_MSG_PUT_ACK: {
2998 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2999 if (cookie > MXLND_MAX_COOKIE) {
3000 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3001 libcfs_nid2str(rx->mxc_nid));
3002 result = -((cookie >> 52) & 0xff);
3003 lntmsg[0] = rx->mxc_lntmsg[0];
3005 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3006 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3007 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3012 case MXLND_MSG_GET_REQ:
3013 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3014 msg->mxm_srcnid, rx, 1);
3018 case MXLND_MSG_CONN_REQ:
3019 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3020 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3021 libcfs_nid2str(msg->mxm_srcnid),
3022 libcfs_nid2str(msg->mxm_dstnid));
3025 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3026 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3028 libcfs_nid2str(msg->mxm_srcnid),
3029 msg->mxm_u.conn_req.mxcrm_queue_depth,
3030 *kmxlnd_tunables.kmx_credits);
3033 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3034 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3036 libcfs_nid2str(msg->mxm_srcnid),
3037 msg->mxm_u.conn_req.mxcrm_eager_size,
3038 (int) MXLND_EAGER_SIZE);
3042 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3045 struct kmx_peer *existing_peer = NULL;
3046 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3048 mx_decompose_endpoint_addr(rx->mxc_status.source,
3050 rx->mxc_nid = msg->mxm_srcnid;
3052 /* adds conn ref for peer and one for this function */
3053 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3057 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3058 write_lock(&kmxlnd_data.kmx_peers_lock);
3059 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3060 if (existing_peer) {
3061 mxlnd_conn_decref(peer->mxp_conn);
3062 mxlnd_peer_decref(peer);
3063 peer = existing_peer;
3064 mxlnd_conn_addref(peer->mxp_conn);
3066 list_add_tail(&peer->mxp_peers,
3067 &kmxlnd_data.kmx_peers[hash]);
3068 write_unlock(&kmxlnd_data.kmx_peers_lock);
3069 atomic_inc(&kmxlnd_data.kmx_npeers);
3072 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3073 mxlnd_peer_decref(peer); /* drop ref taken above */
3075 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3079 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3080 conn = peer->mxp_conn;
3082 struct kmx_conn *old_conn = conn;
3084 /* do not call mx_disconnect() */
3085 mxlnd_conn_disconnect(old_conn, 0, 0);
3087 /* the ref for this rx was taken on the old_conn */
3088 mxlnd_conn_decref(old_conn);
3090 /* This allocs a conn, points peer->mxp_conn to this one.
3091 * The old conn is still on the peer->mxp_conns list.
3092 * As the pending requests complete, they will call
3093 * conn_decref() which will eventually free it. */
3094 ret = mxlnd_conn_alloc(&conn, peer);
3096 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3099 /* conn_alloc() adds one ref for the peer and one for this function */
3102 spin_lock(&peer->mxp_lock);
3103 peer->mxp_incarnation = msg->mxm_srcstamp;
3104 peer->mxp_incompatible = incompatible;
3105 spin_unlock(&peer->mxp_lock);
3106 spin_lock(&conn->mxk_lock);
3107 conn->mxk_incarnation = msg->mxm_srcstamp;
3108 conn->mxk_status = MXLND_CONN_WAIT;
3109 spin_unlock(&conn->mxk_lock);
3111 /* handle_conn_ack() will create the CONN_ACK msg */
3112 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3116 case MXLND_MSG_CONN_ACK:
3117 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3118 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3119 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3120 libcfs_nid2str(msg->mxm_dstnid));
3124 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3125 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3126 "incompatible queue depth %d (%d wanted)\n",
3127 libcfs_nid2str(msg->mxm_srcnid),
3128 msg->mxm_u.conn_req.mxcrm_queue_depth,
3129 *kmxlnd_tunables.kmx_credits);
3130 spin_lock(&conn->mxk_lock);
3131 conn->mxk_status = MXLND_CONN_FAIL;
3132 spin_unlock(&conn->mxk_lock);
3136 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3137 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3138 "incompatible EAGER size %d (%d wanted)\n",
3139 libcfs_nid2str(msg->mxm_srcnid),
3140 msg->mxm_u.conn_req.mxcrm_eager_size,
3141 (int) MXLND_EAGER_SIZE);
3142 spin_lock(&conn->mxk_lock);
3143 conn->mxk_status = MXLND_CONN_FAIL;
3144 spin_unlock(&conn->mxk_lock);
3148 spin_lock(&peer->mxp_lock);
3149 peer->mxp_incarnation = msg->mxm_srcstamp;
3150 peer->mxp_incompatible = incompatible;
3151 spin_unlock(&peer->mxp_lock);
3152 spin_lock(&conn->mxk_lock);
3153 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3154 conn->mxk_outstanding = 0;
3155 conn->mxk_incarnation = msg->mxm_srcstamp;
3156 conn->mxk_timeout = 0;
3157 if (!incompatible) {
3158 conn->mxk_status = MXLND_CONN_READY;
3160 spin_unlock(&conn->mxk_lock);
3161 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3165 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3166 libcfs_nid2str(rx->mxc_nid));
3173 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3174 spin_lock(&conn->mxk_lock);
3175 conn->mxk_status = MXLND_CONN_FAIL;
3176 spin_unlock(&conn->mxk_lock);
3181 spin_lock(&conn->mxk_lock);
3182 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3183 spin_unlock(&conn->mxk_lock);
3187 /* lnet_parse() failed, etc., repost now */
3188 mxlnd_put_idle_rx(rx);
3189 if (conn != NULL && credit == 1) {
3190 if (type == MXLND_MSG_PUT_DATA) {
3191 spin_lock(&conn->mxk_lock);
3192 conn->mxk_outstanding++;
3193 spin_unlock(&conn->mxk_lock);
3194 } else if (type != MXLND_MSG_GET_DATA &&
3195 (type == MXLND_MSG_EAGER ||
3196 type == MXLND_MSG_PUT_REQ ||
3197 type == MXLND_MSG_NOOP)) {
3198 spin_lock(&conn->mxk_lock);
3199 conn->mxk_outstanding++;
3200 spin_unlock(&conn->mxk_lock);
3203 if (conn_ref) mxlnd_conn_decref(conn);
3204 LASSERT(peer_ref == 0);
3207 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3208 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3210 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3213 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3214 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3216 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3224 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3226 struct kmx_ctx *tx = NULL;
3227 struct kmx_msg *txmsg = NULL;
3228 struct kmx_conn *conn = peer->mxp_conn;
3230 /* a conn ref was taken when calling mx_iconnect(),
3231 * hold it until CONN_REQ or CONN_ACK completes */
3233 CDEBUG(D_NET, "entering\n");
3234 if (status.code != MX_STATUS_SUCCESS) {
3235 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3236 mx_strstatus(status.code), status.code,
3237 libcfs_nid2str(peer->mxp_nid));
3238 spin_lock(&conn->mxk_lock);
3239 conn->mxk_status = MXLND_CONN_FAIL;
3240 spin_unlock(&conn->mxk_lock);
3242 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3243 struct kmx_conn *new_conn = NULL;
3244 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3245 mxlnd_conn_disconnect(conn, 0, 1);
3246 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3247 mxlnd_conn_decref(new_conn); /* which we no longer need */
3248 spin_lock(&peer->mxp_lock);
3249 peer->mxp_reconnect_time = 0;
3250 spin_unlock(&peer->mxp_lock);
3253 mxlnd_conn_decref(conn);
3257 spin_lock(&conn->mxk_lock);
3258 conn->mxk_epa = status.source;
3259 spin_unlock(&conn->mxk_lock);
3260 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3261 * we should not need to lock the peer */
3262 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3264 /* mx_iconnect() succeeded, reset delay to 0 */
3265 spin_lock(&peer->mxp_lock);
3266 peer->mxp_reconnect_time = 0;
3267 spin_unlock(&peer->mxp_lock);
3269 /* marshal CONN_REQ msg */
3270 /* we are still using the conn ref from iconnect() - do not take another */
3271 tx = mxlnd_get_idle_tx();
3273 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3274 libcfs_nid2str(peer->mxp_nid));
3275 spin_lock(&conn->mxk_lock);
3276 conn->mxk_status = MXLND_CONN_FAIL;
3277 spin_unlock(&conn->mxk_lock);
3278 mxlnd_conn_decref(conn);
3282 tx->mxc_peer = peer;
3283 tx->mxc_conn = conn;
3284 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3285 txmsg = tx->mxc_msg;
3286 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3287 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3288 tx->mxc_match = mxlnd_create_match(tx, 0);
3290 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3296 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3298 struct kmx_ctx *tx = NULL;
3299 struct kmx_msg *txmsg = NULL;
3300 struct kmx_conn *conn = peer->mxp_conn;
3302 /* a conn ref was taken when calling mx_iconnect(),
3303 * hold it until CONN_REQ or CONN_ACK completes */
3305 CDEBUG(D_NET, "entering\n");
3306 if (status.code != MX_STATUS_SUCCESS) {
3307 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3308 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3309 mx_strstatus(status.code), status.code,
3310 libcfs_nid2str(peer->mxp_nid),
3313 peer->mxp_host->mxh_ep_id);
3314 spin_lock(&conn->mxk_lock);
3315 conn->mxk_status = MXLND_CONN_FAIL;
3316 spin_unlock(&conn->mxk_lock);
3318 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3319 struct kmx_conn *new_conn = NULL;
3320 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3321 mxlnd_conn_disconnect(conn, 0, 1);
3322 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3324 mxlnd_conn_decref(new_conn); /* which we no longer need */
3325 spin_lock(&peer->mxp_lock);
3326 peer->mxp_reconnect_time = 0;
3327 spin_unlock(&peer->mxp_lock);
3330 mxlnd_conn_decref(conn);
3333 spin_lock(&conn->mxk_lock);
3334 conn->mxk_epa = status.source;
3335 if (likely(!peer->mxp_incompatible)) {
3336 conn->mxk_status = MXLND_CONN_READY;
3338 spin_unlock(&conn->mxk_lock);
3339 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3340 * we should not have to lock the peer */
3341 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3343 /* mx_iconnect() succeeded, reset delay to 0 */
3344 spin_lock(&peer->mxp_lock);
3345 peer->mxp_reconnect_time = 0;
3346 spin_unlock(&peer->mxp_lock);
3348 /* marshal CONN_ACK msg */
3349 tx = mxlnd_get_idle_tx();
3351 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3352 libcfs_nid2str(peer->mxp_nid));
3353 spin_lock(&conn->mxk_lock);
3354 conn->mxk_status = MXLND_CONN_FAIL;
3355 spin_unlock(&conn->mxk_lock);
3356 mxlnd_conn_decref(conn);
3360 tx->mxc_peer = peer;
3361 tx->mxc_conn = conn;
3362 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3363 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3364 txmsg = tx->mxc_msg;
3365 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3366 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3367 tx->mxc_match = mxlnd_create_match(tx, 0);
3374 * mxlnd_request_waitd - the MX request completion thread(s)
3375 * @arg - thread id (as a void *)
3377 * This thread waits for a MX completion and then completes the request.
3378 * We will create one thread per CPU.
3381 mxlnd_request_waitd(void *arg)
3383 long id = (long) arg;
3386 mx_return_t mxret = MX_SUCCESS;
3388 struct kmx_ctx *ctx = NULL;
3389 enum kmx_req_state req_type = MXLND_REQ_TX;
3390 struct kmx_peer *peer = NULL;
3391 struct kmx_conn *conn = NULL;
3396 memset(name, 0, sizeof(name));
3397 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3398 cfs_daemonize(name);
3399 //cfs_block_allsigs();
3401 memset(&status, 0, sizeof(status));
3403 CDEBUG(D_NET, "%s starting\n", name);
3405 while (!kmxlnd_data.kmx_shutdown) {
3409 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3410 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3414 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3415 0LL, 0LL, &status, &result);
3418 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3419 0LL, 0LL, &status, &result);
3421 if (unlikely(kmxlnd_data.kmx_shutdown))
3425 /* nothing completed... */
3429 if (status.code != MX_STATUS_SUCCESS) {
3430 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3431 "match_info 0x%llx and length %d\n",
3432 mx_strstatus(status.code), status.code,
3433 (u64) status.match_info, status.msg_length);
3436 /* This may be a mx_iconnect() request completing,
3437 * check the bit mask for CONN_REQ and CONN_ACK */
3438 if (status.match_info == MXLND_MASK_ICON_REQ ||
3439 status.match_info == MXLND_MASK_ICON_ACK) {
3440 peer = (struct kmx_peer*) status.context;
3441 if (status.match_info == MXLND_MASK_ICON_REQ) {
3442 mxlnd_handle_conn_req(peer, status);
3444 mxlnd_handle_conn_ack(peer, status);
3449 /* This must be a tx or rx */
3451 /* NOTE: if this is a RX from the unexpected callback, it may
3452 * have very little info. If we dropped it in unexpected_recv(),
3453 * it will not have a context. If so, ignore it. */
3454 ctx = (struct kmx_ctx *) status.context;
3457 req_type = ctx->mxc_type;
3458 conn = ctx->mxc_conn; /* this may be NULL */
3459 mxlnd_deq_pending_ctx(ctx);
3461 /* copy status to ctx->mxc_status */
3462 memcpy(&ctx->mxc_status, &status, sizeof(status));
3466 mxlnd_handle_tx_completion(ctx);
3469 mxlnd_handle_rx_completion(ctx);
3472 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3477 /* FIXME may need to reconsider this */
3478 /* conn is always set except for the first CONN_REQ rx
3479 * from a new peer */
3480 if (!(status.code == MX_STATUS_SUCCESS ||
3481 status.code == MX_STATUS_TRUNCATED) &&
3483 mxlnd_conn_disconnect(conn, 1, 1);
3486 CDEBUG(D_NET, "waitd() completed task\n");
3488 CDEBUG(D_NET, "%s stopping\n", name);
3489 mxlnd_thread_stop(id);
3495 mxlnd_check_timeouts(unsigned long now)
3499 unsigned long next = 0;
3500 struct kmx_peer *peer = NULL;
3501 struct kmx_conn *conn = NULL;
3503 read_lock(&kmxlnd_data.kmx_peers_lock);
3504 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3505 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3507 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3508 read_unlock(&kmxlnd_data.kmx_peers_lock);
3512 spin_lock(&peer->mxp_lock);
3513 conn = peer->mxp_conn;
3515 mxlnd_conn_addref(conn);
3516 spin_unlock(&peer->mxp_lock);
3518 spin_unlock(&peer->mxp_lock);
3522 spin_lock(&conn->mxk_lock);
3524 /* if nothing pending (timeout == 0) or
3525 * if conn is already disconnected,
3527 if (conn->mxk_timeout == 0 ||
3528 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3529 spin_unlock(&conn->mxk_lock);
3530 mxlnd_conn_decref(conn);
3534 /* we want to find the timeout that will occur first.
3535 * if it is in the future, we will sleep until then.
3536 * if it is in the past, then we will sleep one
3537 * second and repeat the process. */
3538 if ((next == 0) || (conn->mxk_timeout < next)) {
3539 next = conn->mxk_timeout;
3544 if (time_after_eq(now, conn->mxk_timeout)) {
3547 spin_unlock(&conn->mxk_lock);
3550 mxlnd_conn_disconnect(conn, 1, 1);
3552 mxlnd_conn_decref(conn);
3555 read_unlock(&kmxlnd_data.kmx_peers_lock);
3556 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3562 * mxlnd_timeoutd - enforces timeouts on messages
3563 * @arg - thread id (as a void *)
3565 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3566 * it calls mxlnd_conn_disconnect().
3568 * After checking for timeouts, try progressing sends (call check_sends()).
3571 mxlnd_timeoutd(void *arg)
3574 long id = (long) arg;
3575 unsigned long now = 0;
3576 unsigned long next = 0;
3577 unsigned long delay = HZ;
3578 struct kmx_peer *peer = NULL;
3579 struct kmx_conn *conn = NULL;
3581 cfs_daemonize("mxlnd_timeoutd");
3582 //cfs_block_allsigs();
3584 CDEBUG(D_NET, "timeoutd starting\n");
3586 while (!kmxlnd_data.kmx_shutdown) {
3589 /* if the next timeout has not arrived, go back to sleep */
3590 if (time_after(now, next)) {
3591 next = mxlnd_check_timeouts(now);
3594 read_lock(&kmxlnd_data.kmx_peers_lock);
3595 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3596 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3597 spin_lock(&peer->mxp_lock);
3598 conn = peer->mxp_conn;
3599 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3600 spin_unlock(&peer->mxp_lock);
3605 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3606 time_after(now, conn->mxk_last_tx + HZ)) {
3607 mxlnd_check_sends(peer);
3609 mxlnd_conn_decref(conn); /* until here */
3612 read_unlock(&kmxlnd_data.kmx_peers_lock);
3616 CDEBUG(D_NET, "timeoutd stopping\n");
3617 mxlnd_thread_stop(id);