1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
32 * Copyright (C) 2006 Myricom, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lnet/klnds/mxlnd/mxlnd.c
40 * Author: Eric Barton <eric@bartonsoftware.com>
41 * Author: Scott Atchley <atchley at myri.com>
/* Variadic no-op: a printf-style sink used to compile debug output away. */
46 inline void mxlnd_noop(char *s, ...)
/* Map a kmx_ctx state value to its printable enum name (for debug logs). */
52 mxlnd_ctxstate_to_str(int mxc_state)
56 return "MXLND_CTX_INIT";
58 return "MXLND_CTX_IDLE";
60 return "MXLND_CTX_PREP";
61 case MXLND_CTX_PENDING:
62 return "MXLND_CTX_PENDING";
63 case MXLND_CTX_COMPLETED:
64 return "MXLND_CTX_COMPLETED";
65 case MXLND_CTX_CANCELED:
66 return "MXLND_CTX_CANCELED";
/* Map a connection status value to its printable enum name (for debug logs). */
73 mxlnd_connstatus_to_str(int mxk_status)
76 case MXLND_CONN_READY:
77 return "MXLND_CONN_READY";
79 return "MXLND_CONN_INIT";
81 return "MXLND_CONN_REQ";
83 return "MXLND_CONN_ACK";
85 return "MXLND_CONN_WAIT";
86 case MXLND_CONN_DISCONNECT:
87 return "MXLND_CONN_DISCONNECT";
89 return "MXLND_CONN_FAIL";
/* Map an MXLND wire-message type to its printable enum name (for debug logs). */
96 mxlnd_msgtype_to_str(int type) {
99 return "MXLND_MSG_EAGER";
100 case MXLND_MSG_CONN_REQ:
101 return "MXLND_MSG_CONN_REQ";
102 case MXLND_MSG_CONN_ACK:
103 return "MXLND_MSG_CONN_ACK";
105 return "MXLND_MSG_NOOP";
106 case MXLND_MSG_PUT_REQ:
107 return "MXLND_MSG_PUT_REQ";
108 case MXLND_MSG_PUT_ACK:
109 return "MXLND_MSG_PUT_ACK";
110 case MXLND_MSG_PUT_DATA:
111 return "MXLND_MSG_PUT_DATA";
112 case MXLND_MSG_GET_REQ:
113 return "MXLND_MSG_GET_REQ";
114 case MXLND_MSG_GET_DATA:
115 return "MXLND_MSG_GET_DATA";
/* Map an LNET message type (LNET_MSG_*) to its printable name (for debug logs). */
122 mxlnd_lnetmsg_to_str(int type)
126 return "LNET_MSG_ACK";
128 return "LNET_MSG_PUT";
130 return "LNET_MSG_GET";
132 return "LNET_MSG_REPLY";
134 return "LNET_MSG_HELLO";
/* Build the 64-bit MX match value for a ctx:
 *   bits 63..60  message type (4 bits, must be non-zero)
 *   bits 59..52  error code   (8 bits)
 *   bits 51..0   completion cookie (asserted to fit in 52 bits)
 */
142 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
143 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
145 u64 type = (u64) ctx->mxc_msg_type;
146 u64 err = (u64) error;
149 LASSERT(ctx->mxc_msg_type != 0);
150 LASSERT(ctx->mxc_cookie >> 52 == 0);
151 match = (type << 60) | (err << 52) | ctx->mxc_cookie;
/* Decompose a 64-bit MX match value (see mxlnd_create_match) into its
 * message type (bits 63..60), error byte (bits 59..52) and 52-bit cookie.
 * The LASSERT sanity-checks that the type is one of the known MXLND
 * message types, or that the whole match is a bare ICON_REQ/ICON_ACK bit. */
156 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
158 *msg_type = (u8) (match >> 60);
159 *error = (u8) ((match >> 52) & 0xFF);
160 *cookie = match & 0xFFFFFFFFFFFFFLL;
161 LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
162 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
163 *msg_type == MXLND_MSG_EAGER ||
164 *msg_type == MXLND_MSG_CONN_REQ ||
165 *msg_type == MXLND_MSG_CONN_ACK ||
166 *msg_type == MXLND_MSG_NOOP ||
167 *msg_type == MXLND_MSG_PUT_REQ ||
168 *msg_type == MXLND_MSG_PUT_ACK ||
169 *msg_type == MXLND_MSG_PUT_DATA ||
170 *msg_type == MXLND_MSG_GET_REQ ||
171 *msg_type == MXLND_MSG_GET_DATA);
/* Take one rx context off the global idle list.
 * Returns the rx in MXLND_CTX_PREP state; presumably returns NULL when the
 * idle list is empty (the early-return path is elided here) — TODO confirm.
 * Locking: kmx_rx_idle_lock protects the idle list only. */
176 mxlnd_get_idle_rx(void)
178 struct list_head *tmp = NULL;
179 struct kmx_ctx *rx = NULL;
181 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
183 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
184 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
188 tmp = &kmxlnd_data.kmx_rx_idle;
189 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
190 list_del_init(&rx->mxc_list);
191 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* Dump diagnostics when the get/put accounting counters disagree —
 * this indicates the rx was handed out without being returned. */
194 if (rx->mxc_get != rx->mxc_put) {
195 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
196 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
197 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
198 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
199 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
200 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
201 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
202 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
203 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
204 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
207 LASSERT (rx->mxc_get == rx->mxc_put);
211 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
/* Mark the rx as being prepared for posting. */
212 rx->mxc_state = MXLND_CTX_PREP;
/* Return an rx context to the global idle list.
 * Logs and bails out (paths elided here) on a NULL pointer or if handed a
 * tx context by mistake; asserts the get/put accounting is one apart. */
218 mxlnd_put_idle_rx(struct kmx_ctx *rx)
221 CDEBUG(D_NETERROR, "called with NULL pointer\n");
223 } else if (rx->mxc_type != MXLND_REQ_RX) {
224 CDEBUG(D_NETERROR, "called with tx\n");
227 LASSERT(rx->mxc_get == rx->mxc_put + 1);
230 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
231 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
232 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* Destroy up to @count idle rx contexts, pulling each off the idle list via
 * mxlnd_get_idle_rx(). Logs a warning if fewer than @count were available.
 * Holds kmx_rxs_lock across the loop to protect the global rx list. */
237 mxlnd_reduce_idle_rxs(__u32 count)
240 struct kmx_ctx *rx = NULL;
242 spin_lock(&kmxlnd_data.kmx_rxs_lock);
243 for (i = 0; i < count; i++) {
244 rx = mxlnd_get_idle_rx();
246 struct list_head *tmp = &rx->mxc_global_list;
250 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
254 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
/* Take one tx context off the global idle list and prepare it for use:
 * assigns a fresh completion cookie and bumps the in-use counter.
 * Presumably returns NULL when the idle list is empty (early-return path
 * elided here) — TODO confirm. */
259 mxlnd_get_idle_tx(void)
261 struct list_head *tmp = NULL;
262 struct kmx_ctx *tx = NULL;
264 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
266 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
267 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
268 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
272 tmp = &kmxlnd_data.kmx_tx_idle;
273 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
274 list_del_init(&tx->mxc_list);
276 /* Allocate a new completion cookie. It might not be needed,
277 * but we've got a lock right now and we're unlikely to
279 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
/* Cookies are 52-bit (see mxlnd_create_match); wrap back to 1 at the cap. */
280 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
281 kmxlnd_data.kmx_tx_next_cookie = 1;
283 kmxlnd_data.kmx_tx_used++;
284 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
286 LASSERT (tx->mxc_get == tx->mxc_put);
290 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
291 LASSERT (tx->mxc_lntmsg[0] == NULL);
292 LASSERT (tx->mxc_lntmsg[1] == NULL);
294 tx->mxc_state = MXLND_CTX_PREP;
/* Return a tx context to the global idle list and finalize any attached
 * LNET messages. The lnet_finalize() calls must happen after the idle lock
 * is dropped (lnet_finalize can block / re-enter the LND). The result code
 * passed to lnet_finalize presumably derives from tx->mxc_status (the
 * assignment is elided here) — TODO confirm. */
300 mxlnd_put_idle_tx(struct kmx_ctx *tx)
302 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
304 lnet_msg_t *lntmsg[2];
307 CDEBUG(D_NETERROR, "called with NULL pointer\n");
309 } else if (tx->mxc_type != MXLND_REQ_TX) {
310 CDEBUG(D_NETERROR, "called with rx\n");
/* SUCCESS and TRUNCATED both count as a completed send. */
313 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
314 tx->mxc_status.code == MX_STATUS_TRUNCATED))
/* Snapshot the LNET messages before the tx is recycled. */
317 lntmsg[0] = tx->mxc_lntmsg[0];
318 lntmsg[1] = tx->mxc_lntmsg[1];
320 LASSERT(tx->mxc_get == tx->mxc_put + 1);
323 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
324 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
325 kmxlnd_data.kmx_tx_used--;
326 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
327 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
328 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
333 * mxlnd_conn_free - free the conn
334 * @conn - a kmx_conn pointer
336 * The calling function should remove the conn from the conns list first
340 mxlnd_conn_free(struct kmx_conn *conn)
342 struct kmx_peer *peer = conn->mxk_peer;
344 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
/* All queues must be drained before the conn can be destroyed. */
345 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
346 list_empty (&conn->mxk_tx_free_queue) &&
347 list_empty (&conn->mxk_pending));
/* Still linked on the peer's conn list: unlink under the peer lock and
 * clear the MX endpoint-address context if an endpoint was ever set
 * (stuff[0]/stuff[1] both zero means mx_iconnect never completed). */
348 if (!list_empty(&conn->mxk_list)) {
349 spin_lock(&peer->mxp_lock);
350 list_del_init(&conn->mxk_list);
351 if (peer->mxp_conn == conn) {
352 peer->mxp_conn = NULL;
353 if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
354 mx_set_endpoint_addr_context(conn->mxk_epa,
358 spin_unlock(&peer->mxp_lock);
360 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
361 MXLND_FREE (conn, sizeof (*conn));
/* Cancel all pending rxs (and remove all pending ctxs, txs included) on a
 * connection. For each rx: mx_cancel() the posted receive, mark it
 * canceled with -ECONNABORTED, and complete it. mxk_lock is dropped
 * around mxlnd_handle_rx_completion() because it calls lnet_finalize()
 * and mxlnd_conn_decref() (see the in-line NOTE). */
367 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
370 struct kmx_ctx *ctx = NULL;
371 struct kmx_ctx *next = NULL;
372 mx_return_t mxret = MX_SUCCESS;
377 spin_lock(&conn->mxk_lock);
378 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
379 /* we will delete all including txs */
380 list_del_init(&ctx->mxc_list);
381 if (ctx->mxc_type == MXLND_REQ_RX) {
383 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
386 if (mxret != MX_SUCCESS) {
387 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
390 ctx->mxc_status.code = -ECONNABORTED;
391 ctx->mxc_state = MXLND_CTX_CANCELED;
392 /* NOTE this calls lnet_finalize() and
393 * we cannot hold any locks when calling it.
394 * It also calls mxlnd_conn_decref(conn) */
395 spin_unlock(&conn->mxk_lock);
396 mxlnd_handle_rx_completion(ctx);
397 spin_lock(&conn->mxk_lock);
402 spin_unlock(&conn->mxk_lock);
410 * mxlnd_conn_disconnect - shutdown a connection
411 * @conn - a kmx_conn pointer
413 * This function sets the status to DISCONNECT, completes queued
414 * txs with failure, calls mx_disconnect, which will complete
415 * pending txs and matched rxs with failure.
418 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
420 struct list_head *tmp = NULL;
/* Idempotent: a conn already in DISCONNECT is left alone. */
422 spin_lock(&conn->mxk_lock);
423 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
424 spin_unlock(&conn->mxk_lock);
427 conn->mxk_status = MXLND_CONN_DISCONNECT;
428 conn->mxk_timeout = 0;
/* Fail every queued tx (free queue first, then credit queue) with
 * -ECONNABORTED. The lock is dropped around mxlnd_put_idle_tx()
 * because it may call lnet_finalize(). */
430 while (!list_empty(&conn->mxk_tx_free_queue) ||
431 !list_empty(&conn->mxk_tx_credit_queue)) {
433 struct kmx_ctx *tx = NULL;
435 if (!list_empty(&conn->mxk_tx_free_queue)) {
436 tmp = &conn->mxk_tx_free_queue;
438 tmp = &conn->mxk_tx_credit_queue;
441 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
442 list_del_init(&tx->mxc_list);
443 tx->mxc_status.code = -ECONNABORTED;
444 spin_unlock(&conn->mxk_lock);
445 mxlnd_put_idle_tx(tx);
446 mxlnd_conn_decref(conn); /* for this tx */
447 spin_lock(&conn->mxk_lock);
450 spin_unlock(&conn->mxk_lock);
452 /* cancel pending rxs */
453 mxlnd_conn_cancel_pending_rxs(conn);
/* Skip MX-level disconnect and LNET notification during module shutdown. */
455 if (kmxlnd_data.kmx_shutdown != 1) {
457 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
460 time_t last_alive = 0;
461 unsigned long last_msg = 0;
463 /* notify LNET that we are giving up on this peer */
464 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
465 last_msg = conn->mxk_last_rx;
467 last_msg = conn->mxk_last_tx;
/* Convert the jiffies timestamp of the last traffic into wall-clock
 * seconds for lnet_notify() (alive == 0 means "peer is dead"). */
469 last_alive = cfs_time_current_sec() -
470 cfs_duration_sec(cfs_time_current() - last_msg);
471 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
474 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
480 * mxlnd_conn_alloc - allocate and initialize a new conn struct
481 * @connp - address of a kmx_conn pointer
482 * @peer - owning kmx_peer
484 * Returns 0 on success and -ENOMEM on failure
487 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
489 struct kmx_conn *conn = NULL;
491 LASSERT(peer != NULL);
493 MXLND_ALLOC(conn, sizeof (*conn));
495 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
498 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
500 memset(conn, 0, sizeof(*conn));
502 /* conn->mxk_incarnation = 0 - will be set by peer */
503 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
504 and one for the caller */
505 conn->mxk_peer = peer;
506 /* mxk_epa - to be set after mx_iconnect() */
507 INIT_LIST_HEAD(&conn->mxk_list);
508 spin_lock_init(&conn->mxk_lock);
509 /* conn->mxk_timeout = 0 */
/* Treat "now" as both last tx and last rx so the conn does not look stale. */
510 conn->mxk_last_tx = jiffies;
511 conn->mxk_last_rx = conn->mxk_last_tx;
512 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
513 /* mxk_outstanding = 0 */
514 conn->mxk_status = MXLND_CONN_INIT;
515 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
516 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
517 /* conn->mxk_ntx_msgs = 0 */
518 /* conn->mxk_ntx_data = 0 */
519 /* conn->mxk_ntx_posted = 0 */
520 /* conn->mxk_data_posted = 0 */
521 INIT_LIST_HEAD(&conn->mxk_pending);
525 mxlnd_peer_addref(peer); /* add a ref for this conn */
527 /* add to front of peer's conns list */
528 list_add(&conn->mxk_list, &peer->mxp_conns);
529 peer->mxp_conn = conn;
/* Locking wrapper: allocate a conn for @peer with mxp_lock held. */
534 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
537 spin_lock(&peer->mxp_lock);
538 ret = mxlnd_conn_alloc_locked(connp, peer);
539 spin_unlock(&peer->mxp_lock);
/* Queue a ctx on its connection's pending list and advance the conn's
 * timeout to the earliest pending deadline. If the conn is no longer in
 * a live state (status below MXLND_CONN_INIT) the ctx is marked COMPLETED
 * instead of queued; the return value on that path is elided here —
 * presumably an error the caller checks (see mxlnd_recv_msg). */
544 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
547 struct kmx_conn *conn = ctx->mxc_conn;
549 ctx->mxc_state = MXLND_CTX_PENDING;
551 spin_lock(&conn->mxk_lock);
552 if (conn->mxk_status >= MXLND_CONN_INIT) {
553 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
/* Track the soonest deadline so the timeout checker only looks at one value. */
554 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
555 conn->mxk_timeout = ctx->mxc_deadline;
558 ctx->mxc_state = MXLND_CTX_COMPLETED;
561 spin_unlock(&conn->mxk_lock);
/* Remove a ctx from its connection's pending list, marking it COMPLETED.
 * Resets the conn timeout to the next-earliest pending deadline (the
 * pending list is effectively ordered by deadline since q_pending_ctx
 * appends with monotonically assigned deadlines — TODO confirm). */
567 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
569 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
570 ctx->mxc_state == MXLND_CTX_COMPLETED);
571 if (ctx->mxc_state != MXLND_CTX_PENDING &&
572 ctx->mxc_state != MXLND_CTX_COMPLETED) {
573 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
574 mxlnd_ctxstate_to_str(ctx->mxc_state));
576 ctx->mxc_state = MXLND_CTX_COMPLETED;
/* Only unlink if actually queued (list_empty means already removed). */
577 if (!list_empty(&ctx->mxc_list)) {
578 struct kmx_conn *conn = ctx->mxc_conn;
579 struct kmx_ctx *next = NULL;
580 LASSERT(conn != NULL);
581 spin_lock(&conn->mxk_lock);
582 list_del_init(&ctx->mxc_list);
583 conn->mxk_timeout = 0;
584 if (!list_empty(&conn->mxk_pending)) {
585 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
586 conn->mxk_timeout = next->mxc_deadline;
588 spin_unlock(&conn->mxk_lock);
594 * mxlnd_peer_free - free the peer
595 * @peer - a kmx_peer pointer
597 * The calling function should decrement the rxs, drain the tx queues and
598 * remove the peer from the peers list first then destroy it.
601 mxlnd_peer_free(struct kmx_peer *peer)
603 CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
605 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
/* Break the host -> peer back-pointer under the host lock. */
607 if (peer->mxp_host != NULL) {
608 spin_lock(&peer->mxp_host->mxh_lock);
609 peer->mxp_host->mxh_peer = NULL;
610 spin_unlock(&peer->mxp_host->mxh_lock);
612 if (!list_empty(&peer->mxp_peers)) {
613 /* assume we are locked */
614 list_del_init(&peer->mxp_peers);
617 MXLND_FREE (peer, sizeof (*peer));
618 atomic_dec(&kmxlnd_data.kmx_npeers);
/* Resolve the peer's MX NIC id from its hostname.
 * First tries "hostname:board"; if MX does not know that name, falls back
 * to the bare hostname. On success stores the id in peer->mxp_nic_id. */
623 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
626 char name[MX_MAX_HOSTNAME_LEN + 1];
627 mx_return_t mxret = MX_SUCCESS;
629 memset(name, 0, sizeof(name));
630 snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
631 mxret = mx_hostname_to_nic_id(name, &nic_id);
632 if (mxret == MX_SUCCESS) {
633 peer->mxp_nic_id = nic_id;
635 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
636 "with %s\n", name, mx_strerror(mxret));
/* Fallback: retry without the ":board" suffix. */
637 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
638 if (mxret == MX_SUCCESS) {
639 peer->mxp_nic_id = nic_id;
641 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
642 "with %s\n", peer->mxp_host->mxh_hostname,
650 * mxlnd_peer_alloc - allocate and initialize a new peer struct
651 * @peerp - address of a kmx_peer pointer
652 * @nid - LNET node id
654 * Returns 0 on success and -ENOMEM on failure
657 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
661 u32 addr = LNET_NIDADDR(nid);
662 struct kmx_peer *peer = NULL;
663 struct kmx_host *host = NULL;
665 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
667 MXLND_ALLOC(peer, sizeof (*peer));
669 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
672 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
674 memset(peer, 0, sizeof(*peer));
/* Find the kmx_host whose address matches the NID and cross-link it. */
676 list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
677 if (addr == host->mxh_addr) {
678 peer->mxp_host = host;
679 spin_lock(&host->mxh_lock);
680 host->mxh_peer = peer;
681 spin_unlock(&host->mxh_lock);
/* No configured host for this NID: undo the allocation. */
685 if (peer->mxp_host == NULL) {
686 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
687 MXLND_FREE(peer, sizeof(*peer));
692 /* peer->mxp_incarnation */
693 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
694 mxlnd_peer_hostname_to_nic_id(peer);
696 INIT_LIST_HEAD(&peer->mxp_peers);
697 spin_lock_init(&peer->mxp_lock);
698 INIT_LIST_HEAD(&peer->mxp_conns);
699 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
701 mxlnd_peer_decref(peer);
/* Pre-allocate credits-1 rx contexts for this peer; on failure, unwind
 * the rxs allocated so far plus the conn and peer references. */
705 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
706 struct kmx_ctx *rx = NULL;
707 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
709 mxlnd_reduce_idle_rxs(i);
710 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
711 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
712 mxlnd_peer_decref(peer);
715 spin_lock(&kmxlnd_data.kmx_rxs_lock);
716 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
717 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
719 mxlnd_put_idle_rx(rx);
721 /* peer->mxp_reconnect_time = 0 */
722 /* peer->mxp_incompatible = 0 */
729 * mxlnd_nid_to_hash - hash the nid
732 * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
735 mxlnd_nid_to_hash(lnet_nid_t nid)
737 return (nid & MXLND_HASH_MASK) ^
738 ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
/* Look up a peer by NID in the hash table. Caller holds kmx_peers_lock.
 * On a hit, takes a peer reference (caller must decref) and returns the
 * peer; returns NULL when not found. */
741 static inline struct kmx_peer *
742 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
746 struct kmx_peer *peer = NULL;
748 hash = mxlnd_nid_to_hash(nid);
750 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
751 if (peer->mxp_nid == nid) {
753 mxlnd_peer_addref(peer);
757 return (found ? peer : NULL);
/* Locking wrapper around mxlnd_find_peer_by_nid_locked(): takes the peers
 * read lock for the lookup. Returns a referenced peer or NULL. */
760 static inline struct kmx_peer *
761 mxlnd_find_peer_by_nid(lnet_nid_t nid)
763 struct kmx_peer *peer = NULL;
765 read_lock(&kmxlnd_data.kmx_peers_lock);
766 peer = mxlnd_find_peer_by_nid_locked(nid);
767 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* True for message types that consume a send credit
 * (EAGER, GET_REQ, PUT_REQ, NOOP); data and conn msgs do not. */
772 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
774 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
775 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
776 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
777 tx->mxc_msg_type == MXLND_MSG_NOOP);
781 * mxlnd_init_msg - set type and number of bytes
784 * @body_nob - bytes in msg body
787 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
789 msg->mxm_type = type;
/* Total nob = header (up to the mxm_u union) + body bytes. */
790 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/* Prepare a tx's embedded message: set the msg type and the MX segment
 * length (header + body), asserting the total fits in an eager buffer. */
794 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
796 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
797 struct kmx_msg *msg = NULL;
799 LASSERT (tx != NULL);
800 LASSERT (nob <= MXLND_EAGER_SIZE);
803 /* tx->mxc_peer should have already been set if we know it */
804 tx->mxc_msg_type = type;
806 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
807 tx->mxc_seg.segment_length = nob;
808 tx->mxc_pin_type = MX_PIN_PHYSICAL;
809 //tx->mxc_state = MXLND_CTX_PENDING;
812 msg->mxm_type = type;
/* Simple rotate-and-add checksum over @nob bytes at @ptr.
 * Each step rotates the accumulator left by 1 and adds the next byte. */
819 mxlnd_cksum (void *ptr, int nob)
825 sum = ((sum << 1) | (sum >> 31)) + *c++;
827 /* ensure I don't return 0 (== no checksum) */
828 return (sum == 0) ? 1 : sum;
832 * mxlnd_pack_msg - complete msg info
836 mxlnd_pack_msg(struct kmx_ctx *tx)
838 struct kmx_msg *msg = tx->mxc_msg;
840 /* type and nob should already be set in init_msg() */
841 msg->mxm_magic = MXLND_MSG_MAGIC;
842 msg->mxm_version = MXLND_MSG_VERSION;
844 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
845 * return credits as well */
/* Piggy-back the returned credits: move the conn's outstanding count
 * into the message and zero it, atomically under mxk_lock. Connection
 * setup messages (CONN_REQ/CONN_ACK) carry no credits. */
846 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
847 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
848 spin_lock(&tx->mxc_conn->mxk_lock);
849 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
850 tx->mxc_conn->mxk_outstanding = 0;
851 spin_unlock(&tx->mxc_conn->mxk_lock);
853 msg->mxm_credits = 0;
857 msg->mxm_srcnid = kmxlnd_data.kmx_ni->ni_nid;
858 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
859 msg->mxm_dstnid = tx->mxc_nid;
860 /* if it is a new peer, the dststamp will be 0 */
861 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
862 msg->mxm_seq = tx->mxc_cookie;
/* Checksumming is optional (kmx_cksum tunable); computed over the whole msg. */
864 if (*kmxlnd_tunables.kmx_cksum) {
865 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* Validate and byte-swap an incoming message of @nob bytes:
 * check magic (detecting opposite-endian peers via the swabbed magic),
 * version, length, and checksum, then swab the header and the
 * type-specific union fields if the peer is opposite-endian.
 * Error paths (return codes) are handled where the CDEBUGs appear. */
870 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
872 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
877 /* 6 bytes are enough to have received magic + version */
879 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
/* Matching magic: same endianness. Swabbed magic: peer is opposite-endian,
 * so every multi-byte field must be swabbed below ("flip"). */
883 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
885 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
888 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
892 if (msg->mxm_version !=
893 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
894 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
898 if (nob < hdr_size) {
899 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
903 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
905 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
909 /* checksum must be computed with mxm_cksum zero and BEFORE anything
911 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
/* A zero checksum means the sender did not checksum (kmx_cksum off). */
913 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
914 CDEBUG(D_NETERROR, "Bad checksum\n");
917 msg->mxm_cksum = msg_cksum;
920 /* leave magic unflipped as a clue to peer endianness */
921 __swab16s(&msg->mxm_version);
922 CLASSERT (sizeof(msg->mxm_type) == 1);
923 CLASSERT (sizeof(msg->mxm_credits) == 1);
924 msg->mxm_nob = msg_nob;
925 __swab64s(&msg->mxm_srcnid);
926 __swab64s(&msg->mxm_srcstamp);
927 __swab64s(&msg->mxm_dstnid);
928 __swab64s(&msg->mxm_dststamp);
929 __swab64s(&msg->mxm_seq);
932 if (msg->mxm_srcnid == LNET_NID_ANY) {
933 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
/* Per-type validation: each case checks the union payload length and
 * swabs its cookie fields when the peer is opposite-endian. */
937 switch (msg->mxm_type) {
939 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
945 case MXLND_MSG_EAGER:
946 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
947 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
948 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
953 case MXLND_MSG_PUT_REQ:
954 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
955 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
956 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
960 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
963 case MXLND_MSG_PUT_ACK:
964 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
965 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
966 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
970 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
971 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
975 case MXLND_MSG_GET_REQ:
976 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
977 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
978 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
982 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
986 case MXLND_MSG_CONN_REQ:
987 case MXLND_MSG_CONN_ACK:
988 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
989 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
990 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
994 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
995 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1004 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1008 * @length - length of incoming message
1009 * @pending - add to kmx_pending (0 is NO and 1 is YES)
1011 * The caller gets the rx and sets nid, peer and conn if known.
1013 * Returns 0 on success and -1 on failure
1016 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1019 mx_return_t mxret = MX_SUCCESS;
/* Match mask: require exact type (top 4 bits) and cookie (low 52 bits),
 * but accept any error byte (bits 59..52) — see mxlnd_create_match. */
1020 uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
1022 rx->mxc_msg_type = msg_type;
1023 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1024 rx->mxc_cookie = cookie;
1025 /* rx->mxc_match may already be set */
1026 /* rx->mxc_seg.segment_ptr is already set */
1027 rx->mxc_seg.segment_length = length;
1028 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1029 ret = mxlnd_q_pending_ctx(rx);
1031 /* the caller is responsible for calling conn_decref() if needed */
/* Post the receive with MX; on failure, undo the pending-queue insert. */
1034 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1035 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1036 if (mxret != MX_SUCCESS) {
1037 mxlnd_deq_pending_ctx(rx);
1038 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1039 mx_strerror(mxret), (int) mxret);
1047 * mxlnd_unexpected_recv - this is the callback function that will handle
1048 * unexpected receives
1049 * @context - NULL, ignore
1050 * @source - the peer's mx_endpoint_addr_t
1051 * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1052 * @length - length of incoming message
1053 * @data_if_available - ignore
1055 * If it is an eager-sized msg, we will call recv_msg() with the actual
1056 * length. If it is a large message, we will call recv_msg() with a
1057 * length of 0 bytes to drop it because we should never have a large,
1058 * unexpected message.
1060 * NOTE - The MX library blocks until this function completes. Make it as fast as
1061 * possible. DO NOT allocate memory which can block!
1063 * If we cannot get a rx or the conn is closed, drop the message on the floor
1064 * (i.e. recv 0 bytes and ignore).
1066 mx_unexp_handler_action_t
1067 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1068 uint64_t match_value, uint32_t length, void *data_if_available)
1071 struct kmx_ctx *rx = NULL;
1077 if (context != NULL) {
1078 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1082 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1085 rx = mxlnd_get_idle_rx();
1087 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
/* Eager-sized: receive for real; oversized unexpected msgs are dropped
 * by posting a zero-length receive. */
1088 if (length <= MXLND_EAGER_SIZE) {
1089 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1091 CDEBUG(D_NETERROR, "unexpected large receive with "
1092 "match_value=0x%llx length=%d\n",
1093 match_value, length);
1094 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1098 struct kmx_peer *peer = NULL;
1099 struct kmx_conn *conn = NULL;
1101 /* NOTE to avoid a peer disappearing out from under us,
1102 * read lock the peers lock first */
1103 read_lock(&kmxlnd_data.kmx_peers_lock);
/* The peer pointer was stashed as the MX endpoint-address context
 * (see mxlnd_conn_free / mx_set_endpoint_addr_context). */
1104 mx_get_endpoint_addr_context(source, (void **) &peer);
1106 mxlnd_peer_addref(peer); /* add a ref... */
1107 spin_lock(&peer->mxp_lock);
1108 conn = peer->mxp_conn;
1110 mxlnd_conn_addref(conn); /* add ref until rx completed */
1111 mxlnd_peer_decref(peer); /* and drop peer ref */
1112 rx->mxc_conn = conn;
1114 spin_unlock(&peer->mxp_lock);
1115 rx->mxc_peer = peer;
1116 rx->mxc_nid = peer->mxp_nid;
1118 read_unlock(&kmxlnd_data.kmx_peers_lock);
1120 CDEBUG(D_NETERROR, "could not post receive\n");
1121 mxlnd_put_idle_rx(rx);
/* No rx available or post failed: swallow the message by receiving
 * zero bytes into a NULL segment with an all-ones mask. */
1125 if (rx == NULL || ret != 0) {
1127 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1130 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1132 seg.segment_ptr = 0LL;
1133 seg.segment_length = 0;
1134 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1135 match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1138 return MX_RECV_CONTINUE;
/* Fetch the NID and refcount of the @index'th peer (flat iteration over
 * the hash table) for the IOC_LIBCFS_GET_PEER ioctl. The index-matching
 * and return-value lines are elided in this view. */
1143 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1147 struct kmx_peer *peer = NULL;
1149 read_lock(&kmxlnd_data.kmx_peers_lock);
1150 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1151 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1155 *nidp = peer->mxp_nid;
1156 *count = atomic_read(&peer->mxp_refcount);
1161 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* Remove @peer from the global peers table (caller holds the write lock),
 * disconnect its current conn if any, and drop the table's reference. */
1167 mxlnd_del_peer_locked(struct kmx_peer *peer)
1169 list_del_init(&peer->mxp_peers); /* remove from the global list */
1170 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1171 mxlnd_peer_decref(peer); /* drop global list ref */
/* Delete one peer by NID, or every peer when @nid == LNET_NID_ANY.
 * Looks the peer up (taking a ref) before acquiring the write lock, then
 * drops that lookup ref and deletes under the lock. */
1176 mxlnd_del_peer(lnet_nid_t nid)
1180 struct kmx_peer *peer = NULL;
1181 struct kmx_peer *next = NULL;
1183 if (nid != LNET_NID_ANY) {
1184 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1186 write_lock(&kmxlnd_data.kmx_peers_lock);
1187 if (nid != LNET_NID_ANY) {
1191 mxlnd_peer_decref(peer); /* and drops it */
1192 mxlnd_del_peer_locked(peer);
1194 } else { /* LNET_NID_ANY */
1195 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1196 list_for_each_entry_safe(peer, next,
1197 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1198 mxlnd_del_peer_locked(peer);
1202 write_unlock(&kmxlnd_data.kmx_peers_lock);
/* Return the @index'th connection across all peers (flat iteration over
 * the peer hash table and each peer's conn list), with a reference taken
 * that the ioctl handler (mxlnd_ctl) drops. The index-matching lines are
 * elided in this view; returns NULL when index is out of range. */
1208 mxlnd_get_conn_by_idx(int index)
1211 struct kmx_peer *peer = NULL;
1212 struct kmx_conn *conn = NULL;
1214 read_lock(&kmxlnd_data.kmx_peers_lock);
1215 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1216 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1217 spin_lock(&peer->mxp_lock);
1218 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1223 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1224 spin_unlock(&peer->mxp_lock);
1225 read_unlock(&kmxlnd_data.kmx_peers_lock);
1228 spin_unlock(&peer->mxp_lock);
1231 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* Disconnect every conn on @peer's list (without the MX-level disconnect:
 * mx_dis=0, and without LNET notify: notify=0). Caller holds the peers
 * read lock; the peer's own mxp_lock is taken here for the list walk. */
1237 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1239 struct kmx_conn *conn = NULL;
1240 struct kmx_conn *next = NULL;
1242 spin_lock(&peer->mxp_lock);
1243 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1244 mxlnd_conn_disconnect(conn, 0 , 0);
1246 spin_unlock(&peer->mxp_lock);
/* Close connections for one NID, or for every peer when @nid ==
 * LNET_NID_ANY. Takes the peers read lock around the whole operation.
 * NOTE(review): mxlnd_find_peer_by_nid() takes the same read lock again;
 * read locks nest, so this is safe but looks redundant — confirm intent. */
1251 mxlnd_close_matching_conns(lnet_nid_t nid)
1255 struct kmx_peer *peer = NULL;
1257 read_lock(&kmxlnd_data.kmx_peers_lock);
1258 if (nid != LNET_NID_ANY) {
1259 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1263 mxlnd_close_matching_conns_locked(peer);
1264 mxlnd_peer_decref(peer); /* and drops it here */
1266 } else { /* LNET_NID_ANY */
1267 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1268 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1269 mxlnd_close_matching_conns_locked(peer);
1272 read_unlock(&kmxlnd_data.kmx_peers_lock);
1278 * mxlnd_ctl - modify MXLND parameters
1279 * @ni - LNET interface handle
1280 * @cmd - command to change
1281 * @arg - the ioctl data
1283 * Not implemented yet.
1286 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1288 struct libcfs_ioctl_data *data = arg;
1291 LASSERT (ni == kmxlnd_data.kmx_ni);
/* Dispatch on the libcfs ioctl command. */
1294 case IOC_LIBCFS_GET_PEER: {
1298 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1299 data->ioc_nid = nid;
1300 data->ioc_count = count;
1303 case IOC_LIBCFS_DEL_PEER: {
1304 ret = mxlnd_del_peer(data->ioc_nid);
1307 case IOC_LIBCFS_GET_CONN: {
1308 struct kmx_conn *conn = NULL;
1310 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1315 data->ioc_nid = conn->mxk_peer->mxp_nid;
1316 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1320 case IOC_LIBCFS_CLOSE_CONNECTION: {
1321 ret = mxlnd_close_matching_conns(data->ioc_nid);
1325 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1333 * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1336 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1339 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1341 u8 msg_type = tx->mxc_msg_type;
1342 //struct kmx_peer *peer = tx->mxc_peer;
1343 struct kmx_conn *conn = tx->mxc_conn;
1345 LASSERT (msg_type != 0);
1346 LASSERT (tx->mxc_nid != 0);
1347 LASSERT (tx->mxc_peer != NULL);
1348 LASSERT (tx->mxc_conn != NULL);
/* Stamp the tx with the conn's incarnation so stale txs can be detected. */
1350 tx->mxc_incarnation = conn->mxk_incarnation;
/* Routing: credit-bearing msgs -> credit queue; CONN_REQ/CONN_ACK jump
 * the free queue; other control msgs append to the free queue; and
 * PUT_DATA/GET_DATA append to the free queue as data. */
1352 if (msg_type != MXLND_MSG_PUT_DATA &&
1353 msg_type != MXLND_MSG_GET_DATA) {
1355 if (mxlnd_tx_requires_credit(tx)) {
1356 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1357 conn->mxk_ntx_msgs++;
1358 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1359 msg_type == MXLND_MSG_CONN_ACK) {
1360 /* put conn msgs at the front of the queue */
1361 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1363 /* PUT_ACK, PUT_NAK */
1364 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1365 conn->mxk_ntx_msgs++;
1369 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1370 conn->mxk_ntx_data++;
1377 * mxlnd_peer_queue_tx - add the tx to the peer's msg or data queue
1380 * Add the tx to the peer's msg or data queue
1383 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1385 LASSERT(tx->mxc_peer != NULL);
1386 LASSERT(tx->mxc_conn != NULL);
1387 spin_lock(&tx->mxc_conn->mxk_lock);
1388 mxlnd_peer_queue_tx_locked(tx);
1389 spin_unlock(&tx->mxc_conn->mxk_lock);
1395 * mxlnd_queue_tx - add the tx to the global tx queue
1398 * Add the tx to the global queue and up the tx_queue_sem
/*
 * mxlnd_queue_tx - queue a tx for sending.
 * @tx: the tx; mxc_nid must be set, mxc_peer/mxc_conn may be NULL.
 *
 * If the peer is known, queue the tx on its conn (allocating a conn if
 * needed) and kick mxlnd_check_sends().  If the peer is unknown, append
 * the tx to the global kmx_tx_queue and wake the mxlnd_tx_queued thread
 * via the semaphore.
 */
1401 mxlnd_queue_tx(struct kmx_ctx *tx)
1403 struct kmx_peer *peer = tx->mxc_peer;
1404 LASSERT (tx->mxc_nid != 0);
/* peer rejected our version handshake: fail everything except the
 * CONN_ACK that tells it so */
1407 if (peer->mxp_incompatible &&
1408 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1409 /* let this fail now */
1410 tx->mxc_status.code = -ECONNABORTED;
1411 mxlnd_conn_decref(peer->mxp_conn);
1412 mxlnd_put_idle_tx(tx);
/* known peer but no conn yet: allocate one before queuing */
1415 if (tx->mxc_conn == NULL) {
1417 struct kmx_conn *conn = NULL;
1419 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
/* allocation failed: fail the tx with the alloc error */
1421 tx->mxc_status.code = ret;
1422 mxlnd_put_idle_tx(tx);
1425 tx->mxc_conn = conn;
1426 mxlnd_peer_decref(peer); /* and takes it from peer */
1428 LASSERT(tx->mxc_conn != NULL);
1429 mxlnd_peer_queue_tx(tx);
1430 mxlnd_check_sends(peer);
/* peer unknown: hand off to the tx_queued thread, which will find or
 * create the peer by NID */
1432 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1433 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1434 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1435 up(&kmxlnd_data.kmx_tx_queue_sem);
1442 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1449 int first_iov_offset = 0;
1450 int first_found = 0;
1452 int last_iov_length = 0;
1453 mx_ksegment_t *seg = NULL;
1455 if (niov == 0) return 0;
1456 LASSERT(iov != NULL);
1458 for (i = 0; i < niov; i++) {
1459 sum = old_sum + (u32) iov[i].iov_len;
1460 if (!first_found && (sum > offset)) {
1462 first_iov_offset = offset - old_sum;
1464 sum = (u32) iov[i].iov_len - first_iov_offset;
1469 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1470 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1475 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1476 nseg = last_iov - first_iov + 1;
1479 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1481 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1484 memset(seg, 0, nseg * sizeof(*seg));
1485 ctx->mxc_nseg = nseg;
1487 for (i = 0; i < nseg; i++) {
1488 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1489 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1491 seg[i].segment_ptr += (u64) first_iov_offset;
1492 seg[i].segment_length -= (u32) first_iov_offset;
1494 if (i == (nseg - 1)) {
1495 seg[i].segment_length = (u32) last_iov_length;
1497 sum += seg[i].segment_length;
1499 ctx->mxc_seg_list = seg;
1500 ctx->mxc_pin_type = MX_PIN_KERNEL;
1501 #ifdef MX_PIN_FULLPAGES
1502 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1504 LASSERT(nob == sum);
1509 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1515 int first_kiov = -1;
1516 int first_kiov_offset = 0;
1517 int first_found = 0;
1519 int last_kiov_length = 0;
1520 mx_ksegment_t *seg = NULL;
1522 if (niov == 0) return 0;
1523 LASSERT(kiov != NULL);
1525 for (i = 0; i < niov; i++) {
1526 sum = old_sum + kiov[i].kiov_len;
1527 if (i == 0) sum -= kiov[i].kiov_offset;
1528 if (!first_found && (sum > offset)) {
1530 first_kiov_offset = offset - old_sum;
1531 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1532 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1534 sum = kiov[i].kiov_len - first_kiov_offset;
1539 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1540 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1545 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1546 nseg = last_kiov - first_kiov + 1;
1549 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1551 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1554 memset(seg, 0, niov * sizeof(*seg));
1555 ctx->mxc_nseg = niov;
1557 for (i = 0; i < niov; i++) {
1558 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1559 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1561 seg[i].segment_ptr += (u64) first_kiov_offset;
1562 /* we have to add back the original kiov_offset */
1563 seg[i].segment_length -= first_kiov_offset +
1564 kiov[first_kiov].kiov_offset;
1566 if (i == (nseg - 1)) {
1567 seg[i].segment_length = last_kiov_length;
1569 sum += seg[i].segment_length;
1571 ctx->mxc_seg_list = seg;
1572 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1573 #ifdef MX_PIN_FULLPAGES
1574 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1576 LASSERT(nob == sum);
1581 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1583 LASSERT(type == MXLND_MSG_PUT_ACK);
1584 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1585 tx->mxc_cookie = cookie;
1586 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1587 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1588 tx->mxc_match = mxlnd_create_match(tx, status);
1595 * mxlnd_send_data - get tx, map [k]iov, queue tx
1602 * This setups the DATA send for PUT or GET.
1604 * On success, it queues the tx, on failure it calls lnet_finalize()
/*
 * mxlnd_send_data - get a tx, map the [k]iov payload, and queue the tx.
 * @ni: the LNET interface
 * @lntmsg: the LNET message whose payload is sent
 * @peer: the destination peer (caller holds a conn ref)
 * @msg_type: MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie: match cookie; must fit in the low 52 bits
 *
 * Sets up the DATA send for a PUT or GET.  On success the tx is queued;
 * on failure lnet_finalize() is called with -EIO.
 */
1607 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1610 lnet_process_id_t target = lntmsg->msg_target;
1611 unsigned int niov = lntmsg->msg_niov;
1612 struct iovec *iov = lntmsg->msg_iov;
1613 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1614 unsigned int offset = lntmsg->msg_offset;
1615 unsigned int nob = lntmsg->msg_len;
1616 struct kmx_ctx *tx = NULL;
1618 LASSERT(lntmsg != NULL);
1619 LASSERT(peer != NULL);
1620 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* top 12 bits of the cookie are reserved for an error code */
1621 LASSERT((cookie>>52) == 0);
1623 tx = mxlnd_get_idle_tx();
1625 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1626 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1627 libcfs_nid2str(target.nid));
1630 tx->mxc_nid = target.nid;
1631 /* NOTE called when we have a ref on the conn, get one for this tx */
1632 mxlnd_conn_addref(peer->mxp_conn);
1633 tx->mxc_peer = peer;
1634 tx->mxc_conn = peer->mxp_conn;
1635 tx->mxc_msg_type = msg_type;
1636 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1637 tx->mxc_state = MXLND_CTX_PENDING;
1638 tx->mxc_lntmsg[0] = lntmsg;
1639 tx->mxc_cookie = cookie;
1640 tx->mxc_match = mxlnd_create_match(tx, 0);
1642 /* This setups up the mx_ksegment_t to send the DATA payload */
/* zero-length DATA should never happen; small payloads go EAGER */
1644 /* do not setup the segments */
1645 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1646 "to %s?\n", libcfs_nid2str(target.nid));
1648 } else if (kiov == NULL) {
1649 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1651 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
/* segment mapping failed: fail the tx with -EIO */
1654 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1655 libcfs_nid2str(target.nid));
1656 tx->mxc_status.code = -EIO;
/* error exit: drop the conn ref taken above and recycle the tx */
1663 mxlnd_conn_decref(peer->mxp_conn);
1664 mxlnd_put_idle_tx(tx);
/* no tx was available: finalize the LNET message with an error */
1668 CDEBUG(D_NETERROR, "no tx avail\n")
1669 lnet_finalize(ni, lntmsg, -EIO);
1674 * mxlnd_recv_data - map [k]iov, post rx
1681 * This setups the DATA receive for PUT or GET.
1683 * On success, it returns 0, on failure it returns -1
/*
 * mxlnd_recv_data - map the [k]iov sink and post the DATA receive.
 * @ni: the LNET interface
 * @lntmsg: the LNET message whose sink is used
 * @rx: the rx context to post
 * @msg_type: MXLND_MSG_PUT_DATA or MXLND_MSG_GET_DATA
 * @cookie: match cookie; must fit in the low 52 bits
 *
 * Sets up the DATA receive for a PUT or GET and posts it via mx_kirecv().
 * Returns 0 on success, non-zero on failure.
 */
1686 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1689 lnet_process_id_t target = lntmsg->msg_target;
1690 unsigned int niov = lntmsg->msg_niov;
1691 struct iovec *iov = lntmsg->msg_iov;
1692 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1693 unsigned int offset = lntmsg->msg_offset;
1694 unsigned int nob = lntmsg->msg_len;
1695 mx_return_t mxret = MX_SUCCESS;
1697 /* above assumes MXLND_MSG_PUT_DATA */
/* for a GET the sink is described by the MD, not the message */
1698 if (msg_type == MXLND_MSG_GET_DATA) {
1699 niov = lntmsg->msg_md->md_niov;
1700 iov = lntmsg->msg_md->md_iov.iov;
1701 kiov = lntmsg->msg_md->md_iov.kiov;
1703 nob = lntmsg->msg_md->md_length;
1706 LASSERT(lntmsg != NULL);
1707 LASSERT(rx != NULL);
1708 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1709 LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1711 rx->mxc_msg_type = msg_type;
1712 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1713 rx->mxc_state = MXLND_CTX_PENDING;
1714 rx->mxc_nid = target.nid;
1715 /* if posting a GET_DATA, we may not yet know the peer */
1716 if (rx->mxc_peer != NULL) {
1717 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1719 rx->mxc_lntmsg[0] = lntmsg;
1720 rx->mxc_cookie = cookie;
1721 rx->mxc_match = mxlnd_create_match(rx, 0);
1722 /* This setups up the mx_ksegment_t to receive the DATA payload */
1724 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1726 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
/* a GET needs a REPLY message created up front for completion */
1728 if (msg_type == MXLND_MSG_GET_DATA) {
1729 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1730 if (rx->mxc_lntmsg[1] == NULL) {
1731 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1732 libcfs_nid2str(target.nid));
/* segment or reply setup failed: report and bail out */
1737 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1738 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1739 libcfs_nid2str(target.nid));
/* track the rx so the timeout thread can cancel it if it stalls */
1742 ret = mxlnd_q_pending_ctx(rx);
1746 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1747 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1748 rx->mxc_seg_list, rx->mxc_nseg,
1749 rx->mxc_pin_type, rx->mxc_match,
1750 0xF00FFFFFFFFFFFFFLL, (void *) rx,
1752 if (mxret != MX_SUCCESS) {
/* undo the pending-queue insert before failing */
1753 if (rx->mxc_conn != NULL) {
1754 mxlnd_deq_pending_ctx(rx);
1756 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1757 (int) mxret, libcfs_nid2str(target.nid));
1765 * mxlnd_send - the LND required send function
1770 * This must not block. Since we may not have a peer struct for the receiver,
1771 * it will append send messages on a global tx list. We will then up the
1772 * tx_queued's semaphore to notify it of the new send.
/*
 * mxlnd_send - the LND's lnd_send() entry point.
 * @ni: the LNET interface
 * @private: the rx context on LNET_MSG_REPLY (GET reply), NULL otherwise
 * @lntmsg: the LNET message to send
 *
 * Must not block.  Small payloads are sent as EAGER messages; larger PUTs
 * send a PUT_REQ (after posting an rx for the PUT_ACK/NAK), larger GETs
 * post the DATA sink and send a GET_REQ.  If the peer is not yet known,
 * the tx is handed to the tx_queued thread via mxlnd_queue_tx().
 */
1775 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1778 int type = lntmsg->msg_type;
1779 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1780 lnet_process_id_t target = lntmsg->msg_target;
1781 lnet_nid_t nid = target.nid;
1782 int target_is_router = lntmsg->msg_target_is_router;
1783 int routing = lntmsg->msg_routing;
1784 unsigned int payload_niov = lntmsg->msg_niov;
1785 struct iovec *payload_iov = lntmsg->msg_iov;
1786 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1787 unsigned int payload_offset = lntmsg->msg_offset;
1788 unsigned int payload_nob = lntmsg->msg_len;
1789 struct kmx_ctx *tx = NULL;
1790 struct kmx_msg *txmsg = NULL;
1791 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1792 struct kmx_ctx *rx_data = NULL;
1793 struct kmx_conn *conn = NULL;
1795 uint32_t length = 0;
1796 struct kmx_peer *peer = NULL;
1798 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1799 payload_nob, payload_niov, libcfs_id2str(target));
1801 LASSERT (payload_nob == 0 || payload_niov > 0);
1802 LASSERT (payload_niov <= LNET_MAX_IOV);
1803 /* payload is either all vaddrs or all pages */
1804 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1806 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1808 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1809 * to a new peer, use the nid */
1810 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
/* incompatible peer: do not try to talk to it */
1812 if (unlikely(peer->mxp_incompatible)) {
1813 mxlnd_peer_decref(peer); /* drop ref taken above */
/* swap the peer ref for a conn ref held across this function */
1815 spin_lock(&peer->mxp_lock);
1816 conn = peer->mxp_conn;
1818 mxlnd_conn_addref(conn);
1819 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1821 spin_unlock(&peer->mxp_lock);
1824 if (conn == NULL && peer != NULL) {
1825 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1826 peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
/* ACKs carry no payload (switch header not visible in this extract) */
1831 LASSERT (payload_nob == 0);
/* PUT/REPLY: send EAGER if it fits, else PUT_REQ + PUT_ACK rx */
1834 case LNET_MSG_REPLY:
1836 /* Is the payload small enough not to need DATA? */
1837 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1838 if (nob <= MXLND_EAGER_SIZE)
1839 break; /* send EAGER */
1841 tx = mxlnd_get_idle_tx();
1842 if (unlikely(tx == NULL)) {
1843 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1844 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1845 libcfs_nid2str(nid));
1846 if (conn) mxlnd_conn_decref(conn);
1850 /* the peer may be NULL */
1851 tx->mxc_peer = peer;
1852 tx->mxc_conn = conn; /* may be NULL */
1853 /* we added a conn ref above */
1854 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1855 txmsg = tx->mxc_msg;
1856 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1857 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1858 tx->mxc_match = mxlnd_create_match(tx, 0);
1860 /* we must post a receive _before_ sending the request.
1861 * we need to determine how much to receive, it will be either
1862 * a put_ack or a put_nak. The put_ack is larger, so use it. */
1864 rx = mxlnd_get_idle_rx();
1865 if (unlikely(rx == NULL)) {
1866 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1867 libcfs_nid2str(nid));
1868 mxlnd_put_idle_tx(tx);
1869 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1873 rx->mxc_peer = peer;
1874 /* conn may be NULL but unlikely since the first msg is always small */
1875 /* NOTE no need to lock peer before adding conn ref since we took
1876 * a conn ref for the tx (it cannot be freed between there and here ) */
1877 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1878 rx->mxc_conn = conn;
1879 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1880 rx->mxc_cookie = tx->mxc_cookie;
1881 rx->mxc_match = mxlnd_create_match(rx, 0);
1883 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1884 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1885 if (unlikely(ret != 0)) {
1886 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1887 libcfs_nid2str(nid));
1888 rx->mxc_lntmsg[0] = NULL;
1889 mxlnd_put_idle_rx(rx);
1890 mxlnd_put_idle_tx(tx);
1892 mxlnd_conn_decref(conn); /* for the rx... */
1893 mxlnd_conn_decref(conn); /* and for the tx */
1895 return -EHOSTUNREACH;
/* GET: routers always use EAGER; small replies use EAGER too */
1902 if (routing || target_is_router)
1903 break; /* send EAGER */
1905 /* is the REPLY message too small for DATA? */
1906 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1907 if (nob <= MXLND_EAGER_SIZE)
1908 break; /* send EAGER */
1910 /* get tx (we need the cookie) , post rx for incoming DATA,
1911 * then post GET_REQ tx */
1912 tx = mxlnd_get_idle_tx();
1913 if (unlikely(tx == NULL)) {
1914 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1915 libcfs_nid2str(nid));
1916 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1919 rx_data = mxlnd_get_idle_rx();
1920 if (unlikely(rx_data == NULL)) {
1921 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1922 libcfs_nid2str(nid));
1923 mxlnd_put_idle_tx(tx);
1924 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1927 rx_data->mxc_peer = peer;
1928 /* NOTE no need to lock peer before adding conn ref since we took
1929 * a conn ref for the tx (it cannot be freed between there and here ) */
1930 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1931 rx_data->mxc_conn = conn; /* may be NULL */
/* post the sink before sending the request */
1933 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1934 if (unlikely(ret != 0)) {
1935 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1936 libcfs_nid2str(nid));
1937 mxlnd_put_idle_rx(rx_data);
1938 mxlnd_put_idle_tx(tx);
1940 mxlnd_conn_decref(conn); /* for the rx_data... */
1941 mxlnd_conn_decref(conn); /* and for the tx */
1946 tx->mxc_peer = peer;
1947 tx->mxc_conn = conn; /* may be NULL */
1948 /* conn ref taken above */
1949 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1950 txmsg = tx->mxc_msg;
1951 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1952 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1953 tx->mxc_match = mxlnd_create_match(tx, 0);
1960 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: copy the whole payload inline into the tx message */
1966 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1967 <= MXLND_EAGER_SIZE);
1969 tx = mxlnd_get_idle_tx();
1970 if (unlikely(tx == NULL)) {
1971 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1972 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1973 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1977 tx->mxc_peer = peer;
1978 tx->mxc_conn = conn; /* may be NULL */
1979 /* conn ref taken above */
1980 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1981 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1982 tx->mxc_match = mxlnd_create_match(tx, 0);
1984 txmsg = tx->mxc_msg;
1985 txmsg->mxm_u.eager.mxem_hdr = *hdr;
1987 if (payload_kiov != NULL)
1988 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1989 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1990 payload_niov, payload_kiov, payload_offset, payload_nob);
1992 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1993 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1994 payload_niov, payload_iov, payload_offset, payload_nob);
1996 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
2002 * mxlnd_recv - the LND required recv function
2013 * This must not block.
/*
 * mxlnd_recv - the LND's lnd_recv() entry point.
 * @ni: the LNET interface
 * @private: the rx context carrying the received message
 * @lntmsg: the matched LNET message (NULL if the GET matched nothing)
 * @delayed, @niov, @iov, @kiov, @offset, @mlen, @rlen: standard lnd_recv
 *          sink description; mlen is the bytes LNET wants, rlen the wire size
 *
 * Must not block.  EAGER payloads are copied into the sink; PUT_REQ reuses
 * the rx to post the DATA sink and sends a PUT_ACK (or a NAK on failure);
 * GET_REQ triggers the DATA send (or an error in the match bits if the
 * GET matched nothing).  The peer's outstanding-credit count is bumped for
 * each message received.
 */
2016 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2017 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2018 unsigned int offset, unsigned int mlen, unsigned int rlen)
2023 struct kmx_ctx *rx = private;
2024 struct kmx_msg *rxmsg = rx->mxc_msg;
2025 lnet_nid_t nid = rx->mxc_nid;
2026 struct kmx_ctx *tx = NULL;
2027 struct kmx_msg *txmsg = NULL;
2028 struct kmx_peer *peer = rx->mxc_peer;
2029 struct kmx_conn *conn = peer->mxp_conn;
2031 int msg_type = rxmsg->mxm_type;
2036 LASSERT (mlen <= rlen);
2037 /* Either all pages or all vaddrs */
2038 LASSERT (!(kiov != NULL && iov != NULL));
2039 LASSERT (peer != NULL);
2041 /* conn_addref(conn) already taken for the primary rx */
/* EAGER: payload arrived inline; sanity-check then copy to the sink */
2044 case MXLND_MSG_EAGER:
2045 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2046 len = rx->mxc_status.xfer_length;
2047 if (unlikely(nob > len)) {
2048 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2049 libcfs_nid2str(nid), nob, len);
2055 lnet_copy_flat2kiov(niov, kiov, offset,
2056 MXLND_EAGER_SIZE, rxmsg,
2057 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2060 lnet_copy_flat2iov(niov, iov, offset,
2061 MXLND_EAGER_SIZE, rxmsg,
2062 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2068 case MXLND_MSG_PUT_REQ:
2069 /* we are going to reuse the rx, store the needed info */
2070 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2072 /* get tx, post rx, send PUT_ACK */
2074 tx = mxlnd_get_idle_tx();
2075 if (unlikely(tx == NULL)) {
2076 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2077 /* Not replying will break the connection */
/* LNET wants none of the payload: NAK with status 0 */
2081 if (unlikely(mlen == 0)) {
2083 tx->mxc_peer = peer;
2084 tx->mxc_conn = conn;
2085 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
/* normal path: build the PUT_ACK carrying both cookies */
2090 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2091 tx->mxc_peer = peer;
2092 tx->mxc_conn = conn;
2093 /* no need to lock peer first since we already have a ref */
2094 mxlnd_conn_addref(conn); /* for the tx */
2095 txmsg = tx->mxc_msg;
2096 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2097 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2098 tx->mxc_cookie = cookie;
2099 tx->mxc_match = mxlnd_create_match(tx, 0);
2101 /* we must post a receive _before_ sending the PUT_ACK */
2103 rx->mxc_state = MXLND_CTX_PREP;
2104 rx->mxc_peer = peer;
2105 rx->mxc_conn = conn;
2106 /* do not take another ref for this rx, it is already taken */
2107 rx->mxc_nid = peer->mxp_nid;
2108 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2109 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2111 if (unlikely(ret != 0)) {
2112 /* Notify peer that it's over */
2113 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2114 libcfs_nid2str(nid), ret);
2116 tx->mxc_state = MXLND_CTX_PREP;
2117 tx->mxc_peer = peer;
2118 tx->mxc_conn = conn;
2119 /* finalize = 0, let the PUT_ACK tx finalize this */
2120 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2121 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2122 /* conn ref already taken above */
2123 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2129 /* do not return a credit until after PUT_DATA returns */
2133 case MXLND_MSG_GET_REQ:
/* matched GET: start sending the requested data */
2134 if (likely(lntmsg != NULL)) {
2135 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2136 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2138 /* GET didn't match anything */
2139 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2140 * We have to embed the error code in the match bits.
2141 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2142 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2144 tx = mxlnd_get_idle_tx();
2145 if (unlikely(tx == NULL)) {
2146 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2147 libcfs_nid2str(nid));
2151 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2152 tx->mxc_state = MXLND_CTX_PENDING;
2154 tx->mxc_peer = peer;
2155 tx->mxc_conn = conn;
2156 /* no need to lock peer first since we already have a ref */
2157 mxlnd_conn_addref(conn); /* for this tx */
2158 tx->mxc_cookie = cookie;
2159 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2160 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2163 /* finalize lntmsg after tx completes */
2171 /* we received a message, increment peer's outstanding credits */
2173 spin_lock(&conn->mxk_lock);
2174 conn->mxk_outstanding++;
2175 spin_unlock(&conn->mxk_lock);
2177 /* we are done with the rx */
2178 mxlnd_put_idle_rx(rx);
2179 mxlnd_conn_decref(conn);
2182 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2184 /* we received a credit, see if we can use it to send a msg */
2185 if (credit) mxlnd_check_sends(peer);
/*
 * mxlnd_sleep - sleep interruptibly for @timeout jiffies.
 */
2191 mxlnd_sleep(unsigned long timeout)
2193 set_current_state(TASK_INTERRUPTIBLE);
2194 schedule_timeout(timeout);
2199 * mxlnd_tx_queued - the generic send queue thread
2200 * @arg - thread id (as a void *)
2202 * This thread moves send messages from the global tx_queue to the owning
2203 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2204 * it to the global peer list.
/*
 * mxlnd_tx_queued - kernel thread that drains the global kmx_tx_queue.
 * @arg: the thread id (as a void *)
 *
 * Waits on kmx_tx_queue_sem, pops one tx at a time and attaches it to
 * its peer's connection.  If no peer exists for the tx's NID, it
 * allocates one and inserts it into the global peer hash, handling the
 * race where another thread created the same peer concurrently.
 */
2207 mxlnd_tx_queued(void *arg)
2209 long id = (long) arg;
2212 struct kmx_ctx *tx = NULL;
2213 struct kmx_peer *peer = NULL;
2214 struct list_head *tmp_tx = NULL;
2216 cfs_daemonize("mxlnd_tx_queued");
2217 //cfs_block_allsigs();
2219 while (!kmxlnd_data.kmx_shutdown) {
/* block until a tx is queued (or shutdown wakes us) */
2220 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2221 if (kmxlnd_data.kmx_shutdown)
2223 if (ret != 0) // Should we check for -EINTR?
2225 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2226 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2227 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the first tx off the global queue */
2230 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2231 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2232 list_del_init(&tx->mxc_list);
2233 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2236 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known peer: ensure it has a conn and attach the tx to it */
2238 tx->mxc_peer = peer;
2239 spin_lock(&peer->mxp_lock);
2240 if (peer->mxp_conn == NULL) {
2241 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2243 /* out of memory, give up and fail tx */
2244 tx->mxc_status.code = -ENOMEM;
2245 spin_unlock(&peer->mxp_lock);
2246 mxlnd_peer_decref(peer);
2247 mxlnd_put_idle_tx(tx);
2251 tx->mxc_conn = peer->mxp_conn;
2252 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2253 spin_unlock(&peer->mxp_lock);
2254 mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown peer: create one and add it to the peer hash */
2260 struct kmx_peer *peer = NULL;
2261 struct kmx_peer *old = NULL;
2263 hash = mxlnd_nid_to_hash(tx->mxc_nid);
/* DATA txs always follow a REQ, so they never reach this path */
2265 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2266 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2268 /* adds conn ref for this function */
2269 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2271 /* finalize message */
2272 tx->mxc_status.code = ret;
2273 mxlnd_put_idle_tx(tx);
2276 tx->mxc_peer = peer;
2277 tx->mxc_conn = peer->mxp_conn;
2278 /* this tx will keep the conn ref taken in peer_alloc() */
2280 /* add peer to global peer list, but look to see
2281 * if someone already created it after we released
2283 write_lock(&kmxlnd_data.kmx_peers_lock);
2284 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2285 if (old->mxp_nid == peer->mxp_nid) {
2286 /* somebody beat us here, we created a duplicate */
/* no duplicate: publish the new peer */
2293 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2294 atomic_inc(&kmxlnd_data.kmx_npeers);
/* duplicate: switch the tx to the existing peer's conn and
 * tear down the one we created */
2297 spin_lock(&old->mxp_lock);
2298 tx->mxc_conn = old->mxp_conn;
2299 /* FIXME can conn be NULL? */
2300 LASSERT(old->mxp_conn != NULL);
2301 mxlnd_conn_addref(old->mxp_conn);
2302 spin_unlock(&old->mxp_lock);
2303 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2304 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2305 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2306 mxlnd_peer_decref(peer);
2308 write_unlock(&kmxlnd_data.kmx_peers_lock);
2313 mxlnd_thread_stop(id);
2317 /* When calling this, we must not have the peer lock. */
/*
 * mxlnd_iconnect - start an asynchronous MX connect to a peer.
 * @peer: the peer to connect to (caller holds a conn ref; do not hold
 *        the peer lock when calling)
 * @mask: MXLND_MASK_ICON_REQ or MXLND_MASK_ICON_ACK, selecting whether
 *        this connect precedes a CONN_REQ or a CONN_ACK
 *
 * Resolves the peer's NIC id if needed (giving up after
 * MXLND_WAIT_TIMEOUT), then issues mx_iconnect().  On failure the conn
 * is marked MXLND_CONN_FAIL and the extra ref is dropped.
 */
2319 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2321 mx_return_t mxret = MX_SUCCESS;
2322 mx_request_t request;
2323 struct kmx_conn *conn = peer->mxp_conn;
2325 /* NOTE we are holding a conn ref every time we call this function,
2326 * we do not need to lock the peer before taking another ref */
2327 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2329 LASSERT(mask == MXLND_MASK_ICON_REQ ||
2330 mask == MXLND_MASK_ICON_ACK);
/* remember when we first tried, so we can time out the whole attempt */
2332 if (peer->mxp_reconnect_time == 0) {
2333 peer->mxp_reconnect_time = jiffies;
/* the peer's hostname may not have been mapped to a NIC id yet */
2336 if (peer->mxp_nic_id == 0LL) {
2337 mxlnd_peer_hostname_to_nic_id(peer);
2338 if (peer->mxp_nic_id == 0LL) {
2339 /* not mapped yet, return */
2340 spin_lock(&conn->mxk_lock);
2341 conn->mxk_status = MXLND_CONN_INIT;
2342 spin_unlock(&conn->mxk_lock);
2343 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2344 /* give up and notify LNET */
2345 mxlnd_conn_disconnect(conn, 0, 1);
2346 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2348 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2351 mxlnd_conn_decref(conn);
/* kick off the non-blocking connect; completion arrives via the
 * MX event queue */
2356 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2357 peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2358 (void *) peer, &request);
2359 if (unlikely(mxret != MX_SUCCESS)) {
2360 spin_lock(&conn->mxk_lock);
2361 conn->mxk_status = MXLND_CONN_FAIL;
2362 spin_unlock(&conn->mxk_lock);
2363 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2364 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2365 mxlnd_conn_decref(conn);
2370 #define MXLND_STATS 0
2373 mxlnd_check_sends(struct kmx_peer *peer)
2377 mx_return_t mxret = MX_SUCCESS;
2378 struct kmx_ctx *tx = NULL;
2379 struct kmx_conn *conn = NULL;
2386 static unsigned long last = 0;
2389 if (unlikely(peer == NULL)) {
2390 LASSERT(peer != NULL);
2393 spin_lock(&peer->mxp_lock);
2394 conn = peer->mxp_conn;
2395 /* NOTE take a ref for the duration of this function since it is called
2396 * when there might not be any queued txs for this peer */
2397 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2398 spin_unlock(&peer->mxp_lock);
2400 /* do not add another ref for this tx */
2403 /* we do not have any conns */
2408 if (time_after(jiffies, last)) {
2409 last = jiffies + HZ;
2410 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2411 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2412 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2413 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2414 conn->mxk_ntx_data, conn->mxk_data_posted);
2418 /* cache peer state for asserts */
2419 spin_lock(&conn->mxk_lock);
2420 ntx_posted = conn->mxk_ntx_posted;
2421 credits = conn->mxk_credits;
2422 spin_unlock(&conn->mxk_lock);
2424 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2425 LASSERT(ntx_posted >= 0);
2427 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2428 LASSERT(credits >= 0);
2430 /* check number of queued msgs, ignore data */
2431 spin_lock(&conn->mxk_lock);
2432 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2433 /* check if any txs queued that could return credits... */
2434 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2435 /* if not, send a NOOP */
2436 tx = mxlnd_get_idle_tx();
2437 if (likely(tx != NULL)) {
2438 tx->mxc_peer = peer;
2439 tx->mxc_conn = peer->mxp_conn;
2440 mxlnd_conn_addref(conn); /* for this tx */
2441 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2442 tx->mxc_match = mxlnd_create_match(tx, 0);
2443 mxlnd_peer_queue_tx_locked(tx);
2449 spin_unlock(&conn->mxk_lock);
2451 /* if the peer is not ready, try to connect */
2452 spin_lock(&conn->mxk_lock);
2453 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2454 conn->mxk_status == MXLND_CONN_FAIL ||
2455 conn->mxk_status == MXLND_CONN_REQ)) {
2456 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2457 conn->mxk_status = MXLND_CONN_WAIT;
2458 spin_unlock(&conn->mxk_lock);
2459 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2462 spin_unlock(&conn->mxk_lock);
2464 spin_lock(&conn->mxk_lock);
2465 while (!list_empty(&conn->mxk_tx_free_queue) ||
2466 !list_empty(&conn->mxk_tx_credit_queue)) {
2467 /* We have something to send. If we have a queued tx that does not
2468 * require a credit (free), choose it since its completion will
2469 * return a credit (here or at the peer), complete a DATA or
2470 * CONN_REQ or CONN_ACK. */
2471 struct list_head *tmp_tx = NULL;
2472 if (!list_empty(&conn->mxk_tx_free_queue)) {
2473 tmp_tx = &conn->mxk_tx_free_queue;
2475 tmp_tx = &conn->mxk_tx_credit_queue;
2477 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2479 msg_type = tx->mxc_msg_type;
2481 /* don't try to send a rx */
2482 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2484 /* ensure that it is a valid msg type */
2485 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2486 msg_type == MXLND_MSG_CONN_ACK ||
2487 msg_type == MXLND_MSG_NOOP ||
2488 msg_type == MXLND_MSG_EAGER ||
2489 msg_type == MXLND_MSG_PUT_REQ ||
2490 msg_type == MXLND_MSG_PUT_ACK ||
2491 msg_type == MXLND_MSG_PUT_DATA ||
2492 msg_type == MXLND_MSG_GET_REQ ||
2493 msg_type == MXLND_MSG_GET_DATA);
2494 LASSERT(tx->mxc_peer == peer);
2495 LASSERT(tx->mxc_nid == peer->mxp_nid);
2497 credit = mxlnd_tx_requires_credit(tx);
2500 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2501 CDEBUG(D_NET, "%s: posted enough\n",
2502 libcfs_nid2str(peer->mxp_nid));
2506 if (conn->mxk_credits == 0) {
2507 CDEBUG(D_NET, "%s: no credits\n",
2508 libcfs_nid2str(peer->mxp_nid));
2512 if (conn->mxk_credits == 1 && /* last credit reserved for */
2513 conn->mxk_outstanding == 0) { /* giving back credits */
2514 CDEBUG(D_NET, "%s: not using last credit\n",
2515 libcfs_nid2str(peer->mxp_nid));
2520 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2521 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2522 msg_type == MXLND_MSG_CONN_ACK)) {
2523 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2524 mxlnd_connstatus_to_str(conn->mxk_status),
2526 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2527 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2528 list_del_init(&tx->mxc_list);
2529 tx->mxc_status.code = -ECONNABORTED;
2530 mxlnd_put_idle_tx(tx);
2531 mxlnd_conn_decref(conn);
2537 list_del_init(&tx->mxc_list);
2539 /* handle credits, etc now while we have the lock to avoid races */
2541 conn->mxk_credits--;
2542 conn->mxk_ntx_posted++;
2544 if (msg_type != MXLND_MSG_PUT_DATA &&
2545 msg_type != MXLND_MSG_GET_DATA) {
2546 if (msg_type != MXLND_MSG_CONN_REQ &&
2547 msg_type != MXLND_MSG_CONN_ACK) {
2548 conn->mxk_ntx_msgs--;
2551 if (tx->mxc_incarnation == 0 &&
2552 conn->mxk_incarnation != 0) {
2553 tx->mxc_incarnation = conn->mxk_incarnation;
2555 spin_unlock(&conn->mxk_lock);
2557 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2558 * or (2) there is a non-DATA msg that can return credits in the
2559 * queue, then drop this duplicate NOOP */
2560 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2561 spin_lock(&conn->mxk_lock);
2562 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2563 (conn->mxk_ntx_msgs >= 1)) {
2564 conn->mxk_credits++;
2565 conn->mxk_ntx_posted--;
2566 spin_unlock(&conn->mxk_lock);
2567 /* redundant NOOP */
2568 mxlnd_put_idle_tx(tx);
2569 mxlnd_conn_decref(conn);
2570 CDEBUG(D_NET, "%s: redundant noop\n",
2571 libcfs_nid2str(peer->mxp_nid));
2575 spin_unlock(&conn->mxk_lock);
2579 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2580 (msg_type != MXLND_MSG_GET_DATA))) {
2584 //ret = -ECONNABORTED;
2587 spin_lock(&conn->mxk_lock);
2588 status = conn->mxk_status;
2589 spin_unlock(&conn->mxk_lock);
2591 if (likely((status == MXLND_CONN_READY) ||
2592 (msg_type == MXLND_MSG_CONN_REQ) ||
2593 (msg_type == MXLND_MSG_CONN_ACK))) {
2595 if (msg_type != MXLND_MSG_CONN_REQ &&
2596 msg_type != MXLND_MSG_CONN_ACK) {
2597 /* add to the pending list */
2598 ret = mxlnd_q_pending_ctx(tx);
2600 /* FIXME the conn is disconnected, now what? */
2604 tx->mxc_state = MXLND_CTX_PENDING;
2608 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2609 msg_type != MXLND_MSG_GET_DATA)) {
2610 /* send a msg style tx */
2611 LASSERT(tx->mxc_nseg == 1);
2612 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2613 CDEBUG(D_NET, "sending %s 0x%llx\n",
2614 mxlnd_msgtype_to_str(msg_type),
2616 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2625 /* send a DATA tx */
2626 spin_lock(&conn->mxk_lock);
2627 conn->mxk_ntx_data--;
2628 conn->mxk_data_posted++;
2629 spin_unlock(&conn->mxk_lock);
2630 CDEBUG(D_NET, "sending %s 0x%llx\n",
2631 mxlnd_msgtype_to_str(msg_type),
2633 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2643 mxret = MX_CONNECTION_FAILED;
2645 if (likely(mxret == MX_SUCCESS)) {
2648 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2649 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2650 libcfs_nid2str(peer->mxp_nid));
2651 /* NOTE mx_kisend() only fails if there are not enough
2652 * resources. Do not change the connection status. */
2653 if (mxret == MX_NO_RESOURCES) {
2654 tx->mxc_status.code = -ENOMEM;
2656 tx->mxc_status.code = -ECONNABORTED;
2659 spin_lock(&conn->mxk_lock);
2660 conn->mxk_ntx_posted--;
2661 conn->mxk_credits++;
2662 spin_unlock(&conn->mxk_lock);
2663 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2664 msg_type == MXLND_MSG_GET_DATA) {
2665 spin_lock(&conn->mxk_lock);
2666 conn->mxk_data_posted--;
2667 spin_unlock(&conn->mxk_lock);
2669 if (msg_type != MXLND_MSG_PUT_DATA &&
2670 msg_type != MXLND_MSG_GET_DATA &&
2671 msg_type != MXLND_MSG_CONN_REQ &&
2672 msg_type != MXLND_MSG_CONN_ACK) {
2673 spin_lock(&conn->mxk_lock);
2674 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2675 spin_unlock(&conn->mxk_lock);
2677 if (msg_type != MXLND_MSG_CONN_REQ &&
2678 msg_type != MXLND_MSG_CONN_ACK) {
2679 /* remove from the pending list */
2680 mxlnd_deq_pending_ctx(tx);
2682 mxlnd_put_idle_tx(tx);
2683 mxlnd_conn_decref(conn);
2686 spin_lock(&conn->mxk_lock);
2689 spin_unlock(&conn->mxk_lock);
2691 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2697 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2698 * @ctx - the tx descriptor
2700 * Determine which type of send request it was and start the next step, if needed,
2701 * or, if done, signal completion to LNET. After we are done, put back on the
2705 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2707 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2708 struct kmx_msg *msg = tx->mxc_msg;
2709 struct kmx_peer *peer = tx->mxc_peer;
2710 struct kmx_conn *conn = tx->mxc_conn;
2711 u8 type = tx->mxc_msg_type;
2712 int credit = mxlnd_tx_requires_credit(tx);
2713 u64 cookie = tx->mxc_cookie;
2715 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2716 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
/* tx had no conn attached - recover the peer via the MX endpoint-address
 * context (set by mx_set_endpoint_addr_context() at connect time) and
 * re-point tx->mxc_conn/mxc_peer at the peer's current conn */
2718 if (unlikely(conn == NULL)) {
2719 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2720 conn = peer->mxp_conn;
2722 /* do not add a ref for the tx, it was set before sending */
2723 tx->mxc_conn = conn;
2724 tx->mxc_peer = conn->mxk_peer;
2727 LASSERT (peer != NULL);
2728 LASSERT (conn != NULL);
/* DATA txs carry no kmx_msg, so the type cross-check only applies to
 * message-style txs */
2730 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2731 LASSERT (type == msg->mxm_type);
/* MX-level failure: surface it to upper layers as -EIO */
2735 tx->mxc_status.code = -EIO;
2737 spin_lock(&conn->mxk_lock);
2738 conn->mxk_last_tx = jiffies;
2739 spin_unlock(&conn->mxk_lock);
/* per-type accounting: the incarnation check guards against crediting a
 * conn that has been replaced since this tx was posted */
2744 case MXLND_MSG_GET_DATA:
2745 spin_lock(&conn->mxk_lock);
2746 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2747 conn->mxk_outstanding++;
2748 conn->mxk_data_posted--;
2750 spin_unlock(&conn->mxk_lock);
2753 case MXLND_MSG_PUT_DATA:
2754 spin_lock(&conn->mxk_lock);
2755 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2756 conn->mxk_data_posted--;
2758 spin_unlock(&conn->mxk_lock);
2761 case MXLND_MSG_NOOP:
2762 case MXLND_MSG_PUT_REQ:
2763 case MXLND_MSG_PUT_ACK:
2764 case MXLND_MSG_GET_REQ:
2765 case MXLND_MSG_EAGER:
2766 //case MXLND_MSG_NAK:
2769 case MXLND_MSG_CONN_ACK:
2770 if (peer->mxp_incompatible) {
2771 /* we sent our params, now close this conn */
2772 mxlnd_conn_disconnect(conn, 0, 1);
2774 case MXLND_MSG_CONN_REQ:
/* CONN_REQ/CONN_ACK send failure: mark the conn FAILED unless the peer
 * is already known-incompatible (then the disconnect above handles it) */
2776 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2777 "failed with %s (%d) to %s\n",
2778 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2779 mx_strstatus(tx->mxc_status.code),
2780 tx->mxc_status.code,
2781 libcfs_nid2str(tx->mxc_nid));
2782 if (!peer->mxp_incompatible) {
2783 spin_lock(&conn->mxk_lock);
2784 conn->mxk_status = MXLND_CONN_FAIL;
2785 spin_unlock(&conn->mxk_lock);
2791 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
/* release the posted-tx slot (again guarded by incarnation so a stale tx
 * cannot perturb a newer conn's counters) */
2796 spin_lock(&conn->mxk_lock);
2797 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2798 conn->mxk_ntx_posted--;
2800 spin_unlock(&conn->mxk_lock);
2803 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
/* recycle the tx, drop the conn ref held for the send, and try to make
 * further progress on this peer's queued sends */
2804 mxlnd_put_idle_tx(tx);
2805 mxlnd_conn_decref(conn);
2807 mxlnd_check_sends(peer);
2813 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2818 u32 nob = rx->mxc_status.xfer_length;
2819 u64 bits = rx->mxc_status.match_info;
2820 struct kmx_msg *msg = rx->mxc_msg;
2821 struct kmx_peer *peer = rx->mxc_peer;
2822 struct kmx_conn *conn = rx->mxc_conn;
2823 u8 type = rx->mxc_msg_type;
2825 lnet_msg_t *lntmsg[2];
2831 int incompatible = 0;
2833 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2834 * failed GET reply, CONN_REQ, or a CONN_ACK */
2836 /* NOTE peer may still be NULL if it is a new peer and
2837 * conn may be NULL if this is a re-connect */
2838 if (likely(peer != NULL && conn != NULL)) {
2839 /* we have a reference on the conn */
2841 } else if (peer != NULL && conn == NULL) {
2842 /* we have a reference on the peer */
2844 } else if (peer == NULL && conn != NULL) {
/* should not happen - a conn always carries a peer */
2846 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2848 } /* else peer and conn == NULL */
2851 if (peer == NULL || conn == NULL) {
2852 /* if the peer was disconnected, the peer may exist but
2853 * not have any valid conns */
2854 decref = 0; /* no peer means no ref was taken for this rx */
/* re-connect case: swap the rx's peer ref for a conn ref on the
 * peer's current conn */
2858 if (conn == NULL && peer != NULL) {
2859 spin_lock(&peer->mxp_lock);
2860 conn = peer->mxp_conn;
2862 mxlnd_conn_addref(conn); /* conn takes ref... */
2863 mxlnd_peer_decref(peer); /* from peer */
2867 spin_unlock(&peer->mxp_lock);
2868 rx->mxc_conn = conn;
2872 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2878 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2879 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2880 libcfs_nid2str(rx->mxc_nid),
2881 mx_strstatus(rx->mxc_status.code),
2882 (int) rx->mxc_status.code);
2888 /* this may be a failed GET reply */
2889 if (type == MXLND_MSG_GET_DATA) {
/* error code is encoded in bits 52-59 of the match info */
2890 bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2891 ret = (u32) (bits>>52);
2892 lntmsg[0] = rx->mxc_lntmsg[0];
2896 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2897 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2898 libcfs_nid2str(rx->mxc_nid));
2903 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2904 if (type == MXLND_MSG_PUT_DATA) {
2905 result = rx->mxc_status.code;
2906 lntmsg[0] = rx->mxc_lntmsg[0];
2908 } else if (type == MXLND_MSG_GET_DATA) {
2909 result = rx->mxc_status.code;
2910 lntmsg[0] = rx->mxc_lntmsg[0];
2911 lntmsg[1] = rx->mxc_lntmsg[1];
/* message-style rx: validate/byte-swap the wire message before use */
2915 ret = mxlnd_unpack_msg(msg, nob);
2917 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2918 ret, libcfs_nid2str(rx->mxc_nid));
2922 type = msg->mxm_type;
/* drop messages whose src/dst NIDs do not match what we expect
 * (CONN_REQ is exempt - we may not know the peer's nid yet) */
2925 if (type != MXLND_MSG_CONN_REQ &&
2926 (rx->mxc_nid != msg->mxm_srcnid ||
2927 kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2928 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2929 "0x%llx and rx msg dst is 0x%llx)\n",
2930 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* stale-incarnation check: reject messages from an older instance of
 * the peer or addressed to an older instance of us */
2935 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2936 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2937 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2939 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2940 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2941 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2942 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2943 msg->mxm_srcstamp, conn->mxk_incarnation,
2944 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2946 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2947 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2948 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2949 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2956 CDEBUG(D_NET, "Received %s with %d credits\n",
2957 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* absorb the credits the peer returned with this message, bounded by
 * the configured per-conn credit limit */
2959 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2960 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2961 LASSERT(peer != NULL);
2962 LASSERT(conn != NULL);
2963 if (msg->mxm_credits != 0) {
2964 spin_lock(&conn->mxk_lock);
2965 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2966 if ((conn->mxk_credits + msg->mxm_credits) >
2967 *kmxlnd_tunables.kmx_credits) {
2968 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2969 conn->mxk_credits, msg->mxm_credits);
2971 conn->mxk_credits += msg->mxm_credits;
2972 LASSERT(conn->mxk_credits >= 0);
2973 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2975 spin_unlock(&conn->mxk_lock);
2979 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
/* dispatch on the (now trusted) message type */
2981 case MXLND_MSG_NOOP:
2984 case MXLND_MSG_EAGER:
2985 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2986 msg->mxm_srcnid, rx, 0);
2990 case MXLND_MSG_PUT_REQ:
2991 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2992 msg->mxm_srcnid, rx, 1);
2996 case MXLND_MSG_PUT_ACK: {
2997 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
/* a cookie above MXLND_MAX_COOKIE is a NAK carrying an errno in
 * bits 52-59; otherwise start the PUT data transfer */
2998 if (cookie > MXLND_MAX_COOKIE) {
2999 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3000 libcfs_nid2str(rx->mxc_nid));
3001 result = -((cookie >> 52) & 0xff);
3002 lntmsg[0] = rx->mxc_lntmsg[0];
3004 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3005 rx->mxc_peer, MXLND_MSG_PUT_DATA,
3006 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3011 case MXLND_MSG_GET_REQ:
3012 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3013 msg->mxm_srcnid, rx, 1);
3017 case MXLND_MSG_CONN_REQ:
/* validate the peer's connection parameters; any mismatch marks the
 * peer incompatible */
3018 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3019 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3020 libcfs_nid2str(msg->mxm_srcnid),
3021 libcfs_nid2str(msg->mxm_dstnid));
3024 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3025 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3027 libcfs_nid2str(msg->mxm_srcnid),
3028 msg->mxm_u.conn_req.mxcrm_queue_depth,
3029 *kmxlnd_tunables.kmx_credits);
3032 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3033 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3035 libcfs_nid2str(msg->mxm_srcnid),
3036 msg->mxm_u.conn_req.mxcrm_eager_size,
3037 (int) MXLND_EAGER_SIZE);
3041 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
/* unknown peer: allocate one, then re-check under the write lock in
 * case another thread raced us and inserted it first */
3044 struct kmx_peer *existing_peer = NULL;
3045 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3047 mx_decompose_endpoint_addr(rx->mxc_status.source,
3049 rx->mxc_nid = msg->mxm_srcnid;
3051 /* adds conn ref for peer and one for this function */
3052 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3056 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3057 write_lock(&kmxlnd_data.kmx_peers_lock);
3058 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3059 if (existing_peer) {
/* lost the race: discard our new peer/conn, use the winner's */
3060 mxlnd_conn_decref(peer->mxp_conn);
3061 mxlnd_peer_decref(peer);
3062 peer = existing_peer;
3063 mxlnd_conn_addref(peer->mxp_conn);
3065 list_add_tail(&peer->mxp_peers,
3066 &kmxlnd_data.kmx_peers[hash]);
3067 write_unlock(&kmxlnd_data.kmx_peers_lock);
3068 atomic_inc(&kmxlnd_data.kmx_npeers);
3071 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3072 mxlnd_peer_decref(peer); /* drop ref taken above */
3074 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3078 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3079 conn = peer->mxp_conn;
/* known peer reconnecting: retire the old conn and build a fresh one */
3081 struct kmx_conn *old_conn = conn;
3083 /* do not call mx_disconnect() */
3084 mxlnd_conn_disconnect(old_conn, 0, 0);
3086 /* the ref for this rx was taken on the old_conn */
3087 mxlnd_conn_decref(old_conn);
3089 /* This allocs a conn, points peer->mxp_conn to this one.
3090 * The old conn is still on the peer->mxp_conns list.
3091 * As the pending requests complete, they will call
3092 * conn_decref() which will eventually free it. */
3093 ret = mxlnd_conn_alloc(&conn, peer);
3095 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3098 /* conn_alloc() adds one ref for the peer and one for this function */
/* record the peer's incarnation and move the conn to WAIT until our
 * CONN_ACK exchange completes */
3101 spin_lock(&peer->mxp_lock);
3102 peer->mxp_incarnation = msg->mxm_srcstamp;
3103 peer->mxp_incompatible = incompatible;
3104 spin_unlock(&peer->mxp_lock);
3105 spin_lock(&conn->mxk_lock);
3106 conn->mxk_incarnation = msg->mxm_srcstamp;
3107 conn->mxk_status = MXLND_CONN_WAIT;
3108 spin_unlock(&conn->mxk_lock);
3110 /* handle_conn_ack() will create the CONN_ACK msg */
3111 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3115 case MXLND_MSG_CONN_ACK:
/* same parameter validation as CONN_REQ; failures mark the conn FAILED */
3116 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3117 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3118 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3119 libcfs_nid2str(msg->mxm_dstnid));
3123 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3124 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3125 "incompatible queue depth %d (%d wanted)\n",
3126 libcfs_nid2str(msg->mxm_srcnid),
3127 msg->mxm_u.conn_req.mxcrm_queue_depth,
3128 *kmxlnd_tunables.kmx_credits);
3129 spin_lock(&conn->mxk_lock);
3130 conn->mxk_status = MXLND_CONN_FAIL;
3131 spin_unlock(&conn->mxk_lock);
3135 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3136 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3137 "incompatible EAGER size %d (%d wanted)\n",
3138 libcfs_nid2str(msg->mxm_srcnid),
3139 msg->mxm_u.conn_req.mxcrm_eager_size,
3140 (int) MXLND_EAGER_SIZE);
3141 spin_lock(&conn->mxk_lock);
3142 conn->mxk_status = MXLND_CONN_FAIL;
3143 spin_unlock(&conn->mxk_lock);
/* handshake done: reset the conn's credit state and mark it READY
 * (unless the params above were incompatible) */
3147 spin_lock(&peer->mxp_lock);
3148 peer->mxp_incarnation = msg->mxm_srcstamp;
3149 peer->mxp_incompatible = incompatible;
3150 spin_unlock(&peer->mxp_lock);
3151 spin_lock(&conn->mxk_lock);
3152 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3153 conn->mxk_outstanding = 0;
3154 conn->mxk_incarnation = msg->mxm_srcstamp;
3155 conn->mxk_timeout = 0;
3156 if (!incompatible) {
3157 conn->mxk_status = MXLND_CONN_READY;
3159 spin_unlock(&conn->mxk_lock);
3160 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3164 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3165 libcfs_nid2str(rx->mxc_nid));
/* a failure above: flag the conn so the timeout/reconnect path acts */
3172 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3173 spin_lock(&conn->mxk_lock);
3174 conn->mxk_status = MXLND_CONN_FAIL;
3175 spin_unlock(&conn->mxk_lock);
3180 spin_lock(&conn->mxk_lock);
3181 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3182 spin_unlock(&conn->mxk_lock);
3186 /* lnet_parse() failed, etc., repost now */
3187 mxlnd_put_idle_rx(rx);
/* the message consumed a credit but will not be answered through the
 * normal path, so count it as outstanding for credit return */
3188 if (conn != NULL && credit == 1) {
3189 if (type == MXLND_MSG_PUT_DATA) {
3190 spin_lock(&conn->mxk_lock);
3191 conn->mxk_outstanding++;
3192 spin_unlock(&conn->mxk_lock);
3193 } else if (type != MXLND_MSG_GET_DATA &&
3194 (type == MXLND_MSG_EAGER ||
3195 type == MXLND_MSG_PUT_REQ ||
3196 type == MXLND_MSG_NOOP)) {
3197 spin_lock(&conn->mxk_lock);
3198 conn->mxk_outstanding++;
3199 spin_unlock(&conn->mxk_lock);
3202 if (conn_ref) mxlnd_conn_decref(conn);
3203 LASSERT(peer_ref == 0);
3206 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3207 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3209 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* complete any LNET messages tied to this rx, then push queued sends */
3212 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3213 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3215 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/* mxlnd_handle_conn_req - our mx_iconnect(CONN_REQ) completed; on success
 * record the peer's endpoint address and queue a CONN_REQ message to it.
 * @peer - the peer we initiated the connect to
 * @status - the MX completion status of the mx_iconnect() request */
3223 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3225 struct kmx_ctx *tx = NULL;
3226 struct kmx_msg *txmsg = NULL;
3227 struct kmx_conn *conn = peer->mxp_conn;
3229 /* a conn ref was taken when calling mx_iconnect(),
3230 * hold it until CONN_REQ or CONN_ACK completes */
3232 CDEBUG(D_NET, "entering\n");
3233 if (status.code != MX_STATUS_SUCCESS) {
/* connect failed: mark the conn FAILED; if the peer has been failing
 * past the reconnect window, tear down and set up a fresh conn */
3234 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3235 mx_strstatus(status.code), status.code,
3236 libcfs_nid2str(peer->mxp_nid));
3237 spin_lock(&conn->mxk_lock);
3238 conn->mxk_status = MXLND_CONN_FAIL;
3239 spin_unlock(&conn->mxk_lock);
3241 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3242 struct kmx_conn *new_conn = NULL;
3243 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3244 mxlnd_conn_disconnect(conn, 0, 1);
3245 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3246 mxlnd_conn_decref(new_conn); /* which we no longer need */
3247 spin_lock(&peer->mxp_lock);
3248 peer->mxp_reconnect_time = 0;
3249 spin_unlock(&peer->mxp_lock);
/* drop the ref taken for mx_iconnect() */
3252 mxlnd_conn_decref(conn);
/* success: remember the peer's MX endpoint address and attach the peer
 * as its context so completions can find it (see handle_tx_completion) */
3256 spin_lock(&conn->mxk_lock);
3257 conn->mxk_epa = status.source;
3258 spin_unlock(&conn->mxk_lock);
3259 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3260 * we should not need to lock the peer */
3261 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3263 /* mx_iconnect() succeeded, reset delay to 0 */
3264 spin_lock(&peer->mxp_lock);
3265 peer->mxp_reconnect_time = 0;
3266 spin_unlock(&peer->mxp_lock);
3268 /* marshal CONN_REQ msg */
3269 /* we are still using the conn ref from iconnect() - do not take another */
3270 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and give back the iconnect ref */
3272 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3273 libcfs_nid2str(peer->mxp_nid));
3274 spin_lock(&conn->mxk_lock);
3275 conn->mxk_status = MXLND_CONN_FAIL;
3276 spin_unlock(&conn->mxk_lock);
3277 mxlnd_conn_decref(conn);
/* fill in our connection parameters (queue depth and eager size must
 * match the peer's or the conn will be rejected as incompatible) */
3281 tx->mxc_peer = peer;
3282 tx->mxc_conn = conn;
3283 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3284 txmsg = tx->mxc_msg;
3285 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3286 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3287 tx->mxc_match = mxlnd_create_match(tx, 0);
3289 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
/* mxlnd_handle_conn_ack - our mx_iconnect(CONN_ACK) completed; on success
 * record the peer's endpoint address, mark the conn READY (if compatible)
 * and queue the CONN_ACK message. Mirrors mxlnd_handle_conn_req().
 * @peer - the peer whose CONN_REQ we are answering
 * @status - the MX completion status of the mx_iconnect() request */
3295 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3297 struct kmx_ctx *tx = NULL;
3298 struct kmx_msg *txmsg = NULL;
3299 struct kmx_conn *conn = peer->mxp_conn;
3301 /* a conn ref was taken when calling mx_iconnect(),
3302 * hold it until CONN_REQ or CONN_ACK completes */
3304 CDEBUG(D_NET, "entering\n");
3305 if (status.code != MX_STATUS_SUCCESS) {
/* connect failed: mark FAILED; replace the conn if the peer has been
 * failing longer than the reconnect window */
3306 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3307 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3308 mx_strstatus(status.code), status.code,
3309 libcfs_nid2str(peer->mxp_nid),
3312 peer->mxp_host->mxh_ep_id);
3313 spin_lock(&conn->mxk_lock);
3314 conn->mxk_status = MXLND_CONN_FAIL;
3315 spin_unlock(&conn->mxk_lock);
3317 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3318 struct kmx_conn *new_conn = NULL;
3319 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3320 mxlnd_conn_disconnect(conn, 0, 1);
3321 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3323 mxlnd_conn_decref(new_conn); /* which we no longer need */
3324 spin_lock(&peer->mxp_lock);
3325 peer->mxp_reconnect_time = 0;
3326 spin_unlock(&peer->mxp_lock);
/* drop the ref taken for mx_iconnect() */
3329 mxlnd_conn_decref(conn);
/* success: record the endpoint address; the conn becomes READY here
 * (unlike the CONN_REQ side) unless the peer was flagged incompatible */
3332 spin_lock(&conn->mxk_lock);
3333 conn->mxk_epa = status.source;
3334 if (likely(!peer->mxp_incompatible)) {
3335 conn->mxk_status = MXLND_CONN_READY;
3337 spin_unlock(&conn->mxk_lock);
3338 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3339 * we should not have to lock the peer */
3340 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3342 /* mx_iconnect() succeeded, reset delay to 0 */
3343 spin_lock(&peer->mxp_lock);
3344 peer->mxp_reconnect_time = 0;
3345 spin_unlock(&peer->mxp_lock);
3347 /* marshal CONN_ACK msg */
3348 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and give back the iconnect ref */
3350 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3351 libcfs_nid2str(peer->mxp_nid));
3352 spin_lock(&conn->mxk_lock);
3353 conn->mxk_status = MXLND_CONN_FAIL;
3354 spin_unlock(&conn->mxk_lock);
3355 mxlnd_conn_decref(conn);
/* reply with our connection parameters (same wire format as CONN_REQ) */
3359 tx->mxc_peer = peer;
3360 tx->mxc_conn = conn;
3361 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3362 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3363 txmsg = tx->mxc_msg;
3364 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3365 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3366 tx->mxc_match = mxlnd_create_match(tx, 0);
3373 * mxlnd_request_waitd - the MX request completion thread(s)
3374 * @arg - thread id (as a void *)
3376 * This thread waits for a MX completion and then completes the request.
3377 * We will create one thread per CPU.
3380 mxlnd_request_waitd(void *arg)
3382 long id = (long) arg;
3385 mx_return_t mxret = MX_SUCCESS;
3387 struct kmx_ctx *ctx = NULL;
3388 enum kmx_req_state req_type = MXLND_REQ_TX;
3389 struct kmx_peer *peer = NULL;
3390 struct kmx_conn *conn = NULL;
3395 memset(name, 0, sizeof(name));
3396 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3397 cfs_daemonize(name);
3398 //cfs_block_allsigs();
3400 memset(&status, 0, sizeof(status));
3402 CDEBUG(D_NET, "%s starting\n", name);
3404 while (!kmxlnd_data.kmx_shutdown) {
/* thread 0 busy-polls (mx_test_any) up to the kmx_polling tunable
 * before falling back to a blocking wait; all other threads block */
3408 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3409 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3413 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3414 0LL, 0LL, &status, &result);
3417 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3418 0LL, 0LL, &status, &result);
3420 if (unlikely(kmxlnd_data.kmx_shutdown))
3424 /* nothing completed... */
3428 if (status.code != MX_STATUS_SUCCESS) {
3429 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3430 "match_info 0x%llx and length %d\n",
3431 mx_strstatus(status.code), status.code,
3432 (u64) status.match_info, status.msg_length);
3435 /* This may be a mx_iconnect() request completing,
3436 * check the bit mask for CONN_REQ and CONN_ACK */
3437 if (status.match_info == MXLND_MASK_ICON_REQ ||
3438 status.match_info == MXLND_MASK_ICON_ACK) {
/* for iconnect completions the MX context is the peer pointer */
3439 peer = (struct kmx_peer*) status.context;
3440 if (status.match_info == MXLND_MASK_ICON_REQ) {
3441 mxlnd_handle_conn_req(peer, status);
3443 mxlnd_handle_conn_ack(peer, status);
3448 /* This must be a tx or rx */
3450 /* NOTE: if this is a RX from the unexpected callback, it may
3451 * have very little info. If we dropped it in unexpected_recv(),
3452 * it will not have a context. If so, ignore it. */
3453 ctx = (struct kmx_ctx *) status.context;
3456 req_type = ctx->mxc_type;
3457 conn = ctx->mxc_conn; /* this may be NULL */
3458 mxlnd_deq_pending_ctx(ctx);
3460 /* copy status to ctx->mxc_status */
3461 memcpy(&ctx->mxc_status, &status, sizeof(status));
/* dispatch to the tx or rx completion handler by ctx type */
3465 mxlnd_handle_tx_completion(ctx);
3468 mxlnd_handle_rx_completion(ctx);
3471 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3476 /* FIXME may need to reconsider this */
3477 /* conn is always set except for the first CONN_REQ rx
3478 * from a new peer */
/* hard failure (not SUCCESS/TRUNCATED): drop the conn */
3479 if (!(status.code == MX_STATUS_SUCCESS ||
3480 status.code == MX_STATUS_TRUNCATED) &&
3482 mxlnd_conn_disconnect(conn, 1, 1);
3485 CDEBUG(D_NET, "waitd() completed task\n");
3487 CDEBUG(D_NET, "%s stopping\n", name);
3488 mxlnd_thread_stop(id);
/* mxlnd_check_timeouts - scan all peers' conns for expired timeouts and
 * disconnect any conn whose deadline has passed.
 * @now - current time in jiffies
 * Returns the earliest future timeout seen (or now + MXLND_COMM_TIMEOUT if
 * none), which the caller uses as its next wakeup time. */
3494 mxlnd_check_timeouts(unsigned long now)
3498 unsigned long next = 0;
3499 struct kmx_peer *peer = NULL;
3500 struct kmx_conn *conn = NULL;
3502 read_lock(&kmxlnd_data.kmx_peers_lock);
3503 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3504 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
/* bail out promptly on module shutdown */
3506 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3507 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* take a ref on the peer's current conn while holding mxp_lock so
 * it cannot be freed under us */
3511 spin_lock(&peer->mxp_lock);
3512 conn = peer->mxp_conn;
3514 mxlnd_conn_addref(conn);
3515 spin_unlock(&peer->mxp_lock);
3517 spin_unlock(&peer->mxp_lock);
3521 spin_lock(&conn->mxk_lock);
3523 /* if nothing pending (timeout == 0) or
3524 * if conn is already disconnected,
3526 if (conn->mxk_timeout == 0 ||
3527 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3528 spin_unlock(&conn->mxk_lock);
3529 mxlnd_conn_decref(conn);
3533 /* we want to find the timeout that will occur first.
3534 * if it is in the future, we will sleep until then.
3535 * if it is in the past, then we will sleep one
3536 * second and repeat the process. */
3537 if ((next == 0) || (conn->mxk_timeout < next)) {
3538 next = conn->mxk_timeout;
3543 if (time_after_eq(now, conn->mxk_timeout)) {
3546 spin_unlock(&conn->mxk_lock);
/* deadline passed: tear the conn down (notify LNET, send disconnect) */
3549 mxlnd_conn_disconnect(conn, 1, 1);
3551 mxlnd_conn_decref(conn);
3554 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* nothing pending anywhere: re-check after the full comm timeout */
3555 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3561 * mxlnd_timeoutd - enforces timeouts on messages
3562 * @arg - thread id (as a void *)
3564 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3565 * it calls mxlnd_conn_disconnect().
3567 * After checking for timeouts, try progressing sends (call check_sends()).
3570 mxlnd_timeoutd(void *arg)
3573 long id = (long) arg;
3574 unsigned long now = 0;
3575 unsigned long next = 0;
3576 unsigned long delay = HZ;
3577 struct kmx_peer *peer = NULL;
3578 struct kmx_conn *conn = NULL;
3580 cfs_daemonize("mxlnd_timeoutd");
3581 //cfs_block_allsigs();
3583 CDEBUG(D_NET, "timeoutd starting\n");
3585 while (!kmxlnd_data.kmx_shutdown) {
3588 /* if the next timeout has not arrived, go back to sleep */
3589 if (time_after(now, next)) {
3590 next = mxlnd_check_timeouts(now);
3593 read_lock(&kmxlnd_data.kmx_peers_lock);
3594 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3595 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3596 spin_lock(&peer->mxp_lock);
3597 conn = peer->mxp_conn;
3598 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3599 spin_unlock(&peer->mxp_lock);
3604 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3605 time_after(now, conn->mxk_last_tx + HZ)) {
3606 mxlnd_check_sends(peer);
3608 mxlnd_conn_decref(conn); /* until here */
3611 read_unlock(&kmxlnd_data.kmx_peers_lock);
3615 CDEBUG(D_NET, "timeoutd stopping\n");
3616 mxlnd_thread_stop(id);