1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
6 * Copyright (C) 2006 Myricom, Inc.
7 * Author: Myricom, Inc. <help at myri.com>
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Variadic function taking a string plus extra arguments; presumably a
 * do-nothing sink for disabled debug output (body not visible here -
 * TODO confirm). */
27 inline void mxlnd_noop(char *s, ...)
/* Translate a ctx state code (MXLND_CTX_*) into the matching constant name
 * as a string literal; the idle-rx diagnostics in this file use it when
 * logging a ctx. */
33 mxlnd_ctxstate_to_str(int mxc_state)
37 return "MXLND_CTX_INIT";
39 return "MXLND_CTX_IDLE";
41 return "MXLND_CTX_PREP";
42 case MXLND_CTX_PENDING:
43 return "MXLND_CTX_PENDING";
44 case MXLND_CTX_COMPLETED:
45 return "MXLND_CTX_COMPLETED";
46 case MXLND_CTX_CANCELED:
47 return "MXLND_CTX_CANCELED";
/* Translate a connection status code (MXLND_CONN_*) into the matching
 * constant name as a string literal. */
54 mxlnd_connstatus_to_str(int mxk_status)
57 case MXLND_CONN_READY:
58 return "MXLND_CONN_READY";
60 return "MXLND_CONN_INIT";
62 return "MXLND_CONN_REQ";
64 return "MXLND_CONN_ACK";
66 return "MXLND_CONN_WAIT";
67 case MXLND_CONN_DISCONNECT:
68 return "MXLND_CONN_DISCONNECT";
70 return "MXLND_CONN_FAIL";
/* Translate an MXLND wire message type (MXLND_MSG_*) into the matching
 * constant name as a string literal, for log output. */
77 mxlnd_msgtype_to_str(int type) {
80 return "MXLND_MSG_EAGER";
81 case MXLND_MSG_CONN_REQ:
82 return "MXLND_MSG_CONN_REQ";
83 case MXLND_MSG_CONN_ACK:
84 return "MXLND_MSG_CONN_ACK";
86 return "MXLND_MSG_NOOP";
87 case MXLND_MSG_PUT_REQ:
88 return "MXLND_MSG_PUT_REQ";
89 case MXLND_MSG_PUT_ACK:
90 return "MXLND_MSG_PUT_ACK";
91 case MXLND_MSG_PUT_DATA:
92 return "MXLND_MSG_PUT_DATA";
93 case MXLND_MSG_GET_REQ:
94 return "MXLND_MSG_GET_REQ";
95 case MXLND_MSG_GET_DATA:
96 return "MXLND_MSG_GET_DATA";
/* Translate an LNET message type into its LNET_MSG_* name as a string
 * literal. */
103 mxlnd_lnetmsg_to_str(int type)
107 return "LNET_MSG_ACK";
109 return "LNET_MSG_PUT";
111 return "LNET_MSG_GET";
113 return "LNET_MSG_REPLY";
115 return "LNET_MSG_HELLO";
/* Build the 64-bit MX match value for a ctx: the message type is shifted to
 * bit 60 and up, the error code to bit 52 and up, and the ctx cookie
 * occupies the low 52 bits. */
122 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
123 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
125 u64 type = (u64) ctx->mxc_msg_type;
126 u64 err = (u64) error;
/* a zero message type is rejected, and the cookie must fit in 52 bits so
 * that it cannot spill into the error/type fields */
129 LASSERT(ctx->mxc_msg_type != 0);
130 LASSERT(ctx->mxc_cookie >> 52 == 0);
131 match = (type << 60) | (err << 52) | ctx->mxc_cookie;
/* Split a 64-bit match value into its fields: bits 60-63 go to *msg_type,
 * bits 52-59 to *error and the low 52 bits to *cookie. */
136 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
138 *msg_type = (u8) (match >> 60);
139 *error = (u8) ((match >> 52) & 0xFF);
140 *cookie = match & 0xFFFFFFFFFFFFFLL;
/* sanity check: either the whole match equals the top nibble of one of the
 * ICON request/ack masks, or the decoded type is a known MXLND_MSG_* value */
141 LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
142 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
143 *msg_type == MXLND_MSG_EAGER ||
144 *msg_type == MXLND_MSG_CONN_REQ ||
145 *msg_type == MXLND_MSG_CONN_ACK ||
146 *msg_type == MXLND_MSG_NOOP ||
147 *msg_type == MXLND_MSG_PUT_REQ ||
148 *msg_type == MXLND_MSG_PUT_ACK ||
149 *msg_type == MXLND_MSG_PUT_DATA ||
150 *msg_type == MXLND_MSG_GET_REQ ||
151 *msg_type == MXLND_MSG_GET_DATA);
/* Take one rx ctx off the head of the global idle rx list (protected by
 * kmx_rx_idle_lock) and move it from the IDLE to the PREP state. */
156 mxlnd_get_idle_rx(void)
158 struct list_head *tmp = NULL;
159 struct kmx_ctx *rx = NULL;
161 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
/* nothing idle: drop the lock (the return statement is not visible in this
 * extract; a caller in this file tests the result against NULL) */
163 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
164 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
168 tmp = &kmxlnd_data.kmx_rx_idle;
169 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
170 list_del_init(&rx->mxc_list);
171 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* an idle ctx is expected to have matching get/put counters; dump the ctx
 * fields to the log before the assertion below fires */
174 if (rx->mxc_get != rx->mxc_put) {
175 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
176 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
177 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
178 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
179 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
180 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
181 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
182 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
183 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
184 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
187 LASSERT (rx->mxc_get == rx->mxc_put);
191 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
192 rx->mxc_state = MXLND_CTX_PREP;
/* Give an rx ctx back to the global idle rx list. An error is logged when
 * the argument is NULL or is not of type MXLND_REQ_RX (the statements that
 * follow those log calls are not visible in this extract). */
198 mxlnd_put_idle_rx(struct kmx_ctx *rx)
201 CDEBUG(D_NETERROR, "called with NULL pointer\n");
203 } else if (rx->mxc_type != MXLND_REQ_RX) {
204 CDEBUG(D_NETERROR, "called with tx\n");
/* the ctx must have been handed out exactly once more than it was returned */
207 LASSERT(rx->mxc_get == rx->mxc_put + 1);
/* append to the tail of the idle list under its lock */
210 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
211 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
212 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
/* Shrink the rx pool by up to @count entries. Each iteration pulls one rx
 * with mxlnd_get_idle_rx() while kmx_rxs_lock is held; what is then done
 * with the rx (beyond locating its mxc_global_list link) is not visible in
 * this extract. A shortfall is logged as "only reduced i out of count". */
217 mxlnd_reduce_idle_rxs(__u32 count)
220 struct kmx_ctx *rx = NULL;
222 spin_lock(&kmxlnd_data.kmx_rxs_lock);
223 for (i = 0; i < count; i++) {
224 rx = mxlnd_get_idle_rx();
226 struct list_head *tmp = &rx->mxc_global_list;
230 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
234 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
/* Take one tx ctx off the head of the global idle tx list, under
 * kmx_tx_idle_lock. When the list is empty the number of txs in use is
 * logged and the lock is released. Otherwise the tx is unlinked, given the
 * next completion cookie from kmx_tx_next_cookie (with a check against
 * MXLND_MAX_COOKIE), counted in kmx_tx_used, sanity-checked (get == put,
 * IDLE state, no LNET messages attached) and moved to the PREP state. */
239 mxlnd_get_idle_tx(void)
241 struct list_head *tmp = NULL;
242 struct kmx_ctx *tx = NULL;
244 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
246 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
247 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
248 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
252 tmp = &kmxlnd_data.kmx_tx_idle;
253 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
254 list_del_init(&tx->mxc_list);
256 /* Allocate a new completion cookie. It might not be needed,
257 * but we've got a lock right now and we're unlikely to
259 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
260 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
263 kmxlnd_data.kmx_tx_used++;
264 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
266 LASSERT (tx->mxc_get == tx->mxc_put);
270 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
271 LASSERT (tx->mxc_lntmsg[0] == NULL);
272 LASSERT (tx->mxc_lntmsg[1] == NULL);
274 tx->mxc_state = MXLND_CTX_PREP;
/* Give a tx ctx back to the global idle tx list and finalize any LNET
 * messages attached to it. The result passed to lnet_finalize() is -EIO
 * unless the MX status code is MX_STATUS_SUCCESS or MX_STATUS_TRUNCATED.
 *
 * NOTE(review): tx is dereferenced in the initializer of `failed` before
 * the "called with NULL pointer" branch below can run, so a NULL tx would
 * fault before it is diagnosed - confirm, and compute `failed` only after
 * the argument checks. */
280 mxlnd_put_idle_tx(struct kmx_ctx *tx)
282 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
283 int result = failed ? -EIO : 0;
284 lnet_msg_t *lntmsg[2];
287 CDEBUG(D_NETERROR, "called with NULL pointer\n");
289 } else if (tx->mxc_type != MXLND_REQ_TX) {
290 CDEBUG(D_NETERROR, "called with rx\n");
/* keep local copies of the LNET message pointers so they can still be
 * finalized after the ctx has gone back on the idle list */
294 lntmsg[0] = tx->mxc_lntmsg[0];
295 lntmsg[1] = tx->mxc_lntmsg[1];
297 LASSERT(tx->mxc_get == tx->mxc_put + 1);
300 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
301 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
302 kmxlnd_data.kmx_tx_used--;
303 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
/* lnet_finalize() runs only after kmx_tx_idle_lock has been dropped */
304 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
305 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
/* Free a conn: assert that its tx queues and pending list are empty, unlink
 * it from the owning peer (under mxp_lock) if it is still listed, drop the
 * reference the conn holds on the peer, and release the memory. */
310 * mxlnd_conn_free - free the conn
311 * @conn - a kmx_conn pointer
313 * The calling function should remove the conn from the conns list first
317 mxlnd_conn_free(struct kmx_conn *conn)
319 struct kmx_peer *peer = conn->mxk_peer;
321 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
322 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
323 list_empty (&conn->mxk_tx_free_queue) &&
324 list_empty (&conn->mxk_pending));
325 if (!list_empty(&conn->mxk_list)) {
326 spin_lock(&peer->mxp_lock);
327 list_del_init(&conn->mxk_list);
/* if this was the peer's current conn, clear that pointer as well */
328 if (peer->mxp_conn == conn) {
329 peer->mxp_conn = NULL;
/* an all-zero stuff[] presumably means the endpoint address was never
 * filled in - TODO confirm; otherwise the context stored on it is updated */
330 if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
331 mx_set_endpoint_addr_context(conn->mxk_epa,
335 spin_unlock(&peer->mxp_lock);
337 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
338 MXLND_FREE (conn, sizeof (*conn));
/* Walk the conn's pending list under mxk_lock and unlink every ctx on it.
 * For rx ctxs, MX is asked to cancel the posted receive; the ctx is then
 * marked CANCELED with status -ECONNABORTED and passed to
 * mxlnd_handle_rx_completion() with the lock temporarily released. The
 * exact conditions guarding those steps are not visible in this extract. */
344 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
347 struct kmx_ctx *ctx = NULL;
348 struct kmx_ctx *next = NULL;
349 mx_return_t mxret = MX_SUCCESS;
354 spin_lock(&conn->mxk_lock);
355 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
356 /* we will delete all including txs */
357 list_del_init(&ctx->mxc_list);
358 if (ctx->mxc_type == MXLND_REQ_RX) {
360 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
363 if (mxret != MX_SUCCESS) {
364 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
367 ctx->mxc_status.code = -ECONNABORTED;
368 ctx->mxc_state = MXLND_CTX_CANCELED;
369 /* NOTE this calls lnet_finalize() and
370 * we cannot hold any locks when calling it.
371 * It also calls mxlnd_conn_decref(conn) */
372 spin_unlock(&conn->mxk_lock);
373 mxlnd_handle_rx_completion(ctx);
374 spin_lock(&conn->mxk_lock);
379 spin_unlock(&conn->mxk_lock);
/* Shut a connection down: mark it DISCONNECT, fail every queued tx with
 * -ECONNABORTED, cancel pending rxs, optionally call mx_disconnect()
 * (@mx_dis) and notify LNET, then drop the owning peer's reference.
 * The guard around the LNET notification is not visible in this extract;
 * presumably it tests @notify - TODO confirm. */
387 * mxlnd_conn_disconnect - shutdown a connection
388 * @conn - a kmx_conn pointer
390 * This function sets the status to DISCONNECT, completes queued
391 * txs with failure, calls mx_disconnect, which will complete
392 * pending txs and matched rxs with failure.
395 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
397 struct list_head *tmp = NULL;
399 spin_lock(&conn->mxk_lock);
/* already disconnecting: just release the lock */
400 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
401 spin_unlock(&conn->mxk_lock);
404 conn->mxk_status = MXLND_CONN_DISCONNECT;
405 conn->mxk_timeout = 0;
/* drain both tx queues, free queue first; the lock is dropped around
 * mxlnd_put_idle_tx() and retaken before the next iteration */
407 while (!list_empty(&conn->mxk_tx_free_queue) ||
408 !list_empty(&conn->mxk_tx_credit_queue)) {
410 struct kmx_ctx *tx = NULL;
412 if (!list_empty(&conn->mxk_tx_free_queue)) {
413 tmp = &conn->mxk_tx_free_queue;
415 tmp = &conn->mxk_tx_credit_queue;
418 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
419 list_del_init(&tx->mxc_list);
420 tx->mxc_status.code = -ECONNABORTED;
421 spin_unlock(&conn->mxk_lock);
422 mxlnd_put_idle_tx(tx);
423 mxlnd_conn_decref(conn); /* for this tx */
424 spin_lock(&conn->mxk_lock);
427 spin_unlock(&conn->mxk_lock);
429 /* cancel pending rxs */
430 mxlnd_conn_cancel_pending_rxs(conn);
/* the MX disconnect and LNET notification are skipped during shutdown */
432 if (kmxlnd_data.kmx_shutdown != 1) {
434 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
437 time_t last_alive = 0;
438 unsigned long last_msg = 0;
440 /* notify LNET that we are giving up on this peer */
/* last_msg is the more recent of the last rx and last tx timestamps */
441 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
442 last_msg = conn->mxk_last_rx;
444 last_msg = conn->mxk_last_tx;
/* wall-clock seconds now, minus the seconds elapsed since last_msg */
446 last_alive = cfs_time_current_sec() -
447 cfs_duration_sec(cfs_time_current() - last_msg);
448 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
451 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
/* Allocate a zeroed conn for @peer, initialise its lists, lock, timestamps
 * and credit count, take a peer reference on the conn's behalf, and install
 * it at the front of the peer's conn list as the peer's current conn. */
457 * mxlnd_conn_alloc - allocate and initialize a new conn struct
458 * @connp - address of a kmx_conn pointer
459 * @peer - owning kmx_peer
461 * Returns 0 on success and -ENOMEM on failure
464 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
466 struct kmx_conn *conn = NULL;
468 LASSERT(peer != NULL);
470 MXLND_ALLOC(conn, sizeof (*conn));
472 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
475 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
/* zeroing covers every field that the commented-out assignments below
 * mention as implicitly 0 */
477 memset(conn, 0, sizeof(*conn));
479 /* conn->mxk_incarnation = 0 - will be set by peer */
480 atomic_set(&conn->mxk_refcount, 1); /* ref for owning peer */
481 conn->mxk_peer = peer;
482 /* mxk_epa - to be set after mx_iconnect() */
483 INIT_LIST_HEAD(&conn->mxk_list);
484 spin_lock_init(&conn->mxk_lock);
485 /* conn->mxk_timeout = 0 */
486 conn->mxk_last_tx = jiffies;
487 conn->mxk_last_rx = conn->mxk_last_tx;
488 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
489 /* mxk_outstanding = 0 */
490 conn->mxk_status = MXLND_CONN_INIT;
491 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
492 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
493 /* conn->mxk_ntx_msgs = 0 */
494 /* conn->mxk_ntx_data = 0 */
495 /* conn->mxk_ntx_posted = 0 */
496 /* conn->mxk_data_posted = 0 */
497 INIT_LIST_HEAD(&conn->mxk_pending);
501 mxlnd_peer_addref(peer); /* add a ref for this conn */
503 /* add to front of peer's conns list */
504 spin_lock(&peer->mxp_lock);
505 list_add(&conn->mxk_list, &peer->mxp_conns);
506 peer->mxp_conn = conn;
507 spin_unlock(&peer->mxp_lock);
/* Mark a ctx PENDING and, when its conn status is at least MXLND_CONN_INIT,
 * append it to the conn's pending list under mxk_lock. The conn timeout is
 * set to this ctx's deadline if no timeout is set or the deadline compares
 * lower. In the other branch the ctx is marked COMPLETED instead. */
513 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
516 struct kmx_conn *conn = ctx->mxc_conn;
518 ctx->mxc_state = MXLND_CTX_PENDING;
520 spin_lock(&conn->mxk_lock);
521 if (conn->mxk_status >= MXLND_CONN_INIT) {
522 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
523 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
524 conn->mxk_timeout = ctx->mxc_deadline;
527 ctx->mxc_state = MXLND_CTX_COMPLETED;
530 spin_unlock(&conn->mxk_lock);
/* Mark a ctx COMPLETED and, if it is still linked on a list, remove it from
 * its conn's pending list under mxk_lock. The conn timeout is then reset to
 * the deadline of the new head of the pending list, or to 0 when the list
 * is empty. */
536 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
538 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
539 ctx->mxc_state == MXLND_CTX_COMPLETED);
/* same condition as the assertion above, logged rather than asserted */
540 if (ctx->mxc_state != MXLND_CTX_PENDING &&
541 ctx->mxc_state != MXLND_CTX_COMPLETED) {
542 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
543 mxlnd_ctxstate_to_str(ctx->mxc_state));
545 ctx->mxc_state = MXLND_CTX_COMPLETED;
/* NOTE(review): list_empty() is evaluated before mxk_lock is taken -
 * confirm that no other thread can unlink the ctx concurrently */
546 if (!list_empty(&ctx->mxc_list)) {
547 struct kmx_conn *conn = ctx->mxc_conn;
548 struct kmx_ctx *next = NULL;
549 LASSERT(conn != NULL);
550 spin_lock(&conn->mxk_lock);
551 list_del_init(&ctx->mxc_list);
552 conn->mxk_timeout = 0;
553 if (!list_empty(&conn->mxk_pending)) {
554 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
555 conn->mxk_timeout = next->mxc_deadline;
/* ctx->mxc_conn is the same object as the local conn locked above */
557 spin_unlock(&ctx->mxc_conn->mxk_lock);
/* Free a peer whose refcount has reached zero: detach it from its host
 * entry (under mxh_lock), unlink it from the peers list if still linked,
 * release the memory and decrement the global peer count. */
563 * mxlnd_peer_free - free the peer
564 * @peer - a kmx_peer pointer
566 * The calling function should decrement the rxs, drain the tx queues and
567 * remove the peer from the peers list first then destroy it.
570 mxlnd_peer_free(struct kmx_peer *peer)
572 CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
574 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
576 if (peer->mxp_host != NULL) {
577 spin_lock(&peer->mxp_host->mxh_lock);
578 peer->mxp_host->mxh_peer = NULL;
579 spin_unlock(&peer->mxp_host->mxh_lock);
581 if (!list_empty(&peer->mxp_peers)) {
582 /* assume we are locked */
583 list_del_init(&peer->mxp_peers);
586 MXLND_FREE (peer, sizeof (*peer));
587 atomic_dec(&kmxlnd_data.kmx_npeers);
592 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
595 char name[MX_MAX_HOSTNAME_LEN + 1];
596 mx_return_t mxret = MX_SUCCESS;
598 memset(name, 0, sizeof(name));
599 snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
600 mxret = mx_hostname_to_nic_id(name, &nic_id);
601 if (mxret == MX_SUCCESS) {
602 peer->mxp_nic_id = nic_id;
604 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
605 "with %s\n", mx_strerror(mxret), name);
606 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
607 if (mxret == MX_SUCCESS) {
608 peer->mxp_nic_id = nic_id;
610 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
611 "with %s\n", mx_strerror(mxret),
612 peer->mxp_host->mxh_hostname);
/* Allocate a zeroed peer for @nid, bind it to the kmx_hosts entry whose
 * address equals LNET_NIDADDR(nid), resolve its NIC id, give it an initial
 * conn, and add (kmx_credits - 1) rx ctxs to the global rx pool on its
 * behalf. Failure paths drop the peer reference taken here. */
619 * mxlnd_peer_alloc - allocate and initialize a new peer struct
620 * @peerp - address of a kmx_peer pointer
621 * @nid - LNET node id
623 * Returns 0 on success and -ENOMEM on failure
626 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
630 u32 addr = LNET_NIDADDR(nid);
631 struct kmx_peer *peer = NULL;
632 struct kmx_host *host = NULL;
634 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
636 MXLND_ALLOC(peer, sizeof (*peer));
638 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
641 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
643 memset(peer, 0, sizeof(*peer));
/* link peer and host entry in both directions; the host side is written
 * under the host's own lock */
645 list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
646 if (addr == host->mxh_addr) {
647 peer->mxp_host = host;
648 spin_lock(&host->mxh_lock);
649 host->mxh_peer = peer;
650 spin_unlock(&host->mxh_lock);
/* a NID whose address has no entry in kmx_hosts trips this assertion */
654 LASSERT(peer->mxp_host != NULL);
657 /* peer->mxp_incarnation */
658 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
659 mxlnd_peer_hostname_to_nic_id(peer);
661 INIT_LIST_HEAD(&peer->mxp_peers);
662 spin_lock_init(&peer->mxp_lock);
663 INIT_LIST_HEAD(&peer->mxp_conns);
664 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer);
666 mxlnd_peer_decref(peer);
/* grow the global rx pool; on allocation failure the i rxs added so far
 * are taken back out with mxlnd_reduce_idle_rxs(i) */
670 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
671 struct kmx_ctx *rx = NULL;
672 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
674 mxlnd_reduce_idle_rxs(i);
675 mxlnd_peer_decref(peer);
678 spin_lock(&kmxlnd_data.kmx_rxs_lock);
679 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
680 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
682 mxlnd_put_idle_rx(rx);
684 /* peer->mxp_reconnect_time = 0 */
685 /* peer->mxp_incompatible = 0 */
/* Hash a NID into a peer-table bucket index: the lowest MXLND_HASH_BITS
 * bits of the NID XORed with the next MXLND_HASH_BITS bits. */
692 * mxlnd_nid_to_hash - hash the nid
695 * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
698 mxlnd_nid_to_hash(lnet_nid_t nid)
700 return (nid & MXLND_HASH_MASK) ^
701 ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
/* Look up a peer by NID in its hash bucket while holding kmx_peers_lock for
 * reading. Returns the peer, or NULL when no entry matched. No reference is
 * taken in the lines visible here, and the lock is released before
 * returning, so the caller must keep the peer alive by other means. */
704 static inline struct kmx_peer *
705 mxlnd_find_peer_by_nid(lnet_nid_t nid)
709 struct kmx_peer *peer = NULL;
711 hash = mxlnd_nid_to_hash(nid);
713 read_lock(&kmxlnd_data.kmx_peers_lock);
714 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
715 if (peer->mxp_nid == nid) {
720 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* after a full traversal `peer` is not a valid entry, hence the flag */
721 return (found ? peer : NULL);
/* Non-zero when the tx's message type is one that needs a send credit:
 * EAGER, GET_REQ, PUT_REQ or NOOP. */
725 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
727 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
728 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
729 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
730 tx->mxc_msg_type == MXLND_MSG_NOOP);
/* Record the message type and total length in a message header; the length
 * is the offset of the mxm_u union (i.e. the header size) plus @body_nob. */
734 * mxlnd_init_msg - set type and number of bytes
737 * @body_nob - bytes in msg body
740 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
742 msg->mxm_type = type;
743 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
/* Prepare a tx ctx to carry a message of the given type and body size:
 * records the type on the ctx and in the message, sets the single segment
 * length to header + body bytes, and selects physical pinning. The total
 * must not exceed MXLND_EAGER_SIZE. How @nid is used is not visible in
 * this extract. */
747 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
749 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
750 struct kmx_msg *msg = NULL;
752 LASSERT (tx != NULL);
753 LASSERT (nob <= MXLND_EAGER_SIZE);
756 /* tx->mxc_peer should have already been set if we know it */
757 tx->mxc_msg_type = type;
759 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
760 tx->mxc_seg.segment_length = nob;
761 tx->mxc_pin_type = MX_PIN_PHYSICAL;
762 //tx->mxc_state = MXLND_CTX_PENDING;
765 msg->mxm_type = type;
/* Simple rotating checksum over a buffer: each step rotates the running sum
 * left by one bit (the `>> 31` implies a 32-bit sum - declaration not
 * visible here) and adds the next element read through `c`. A result of 0
 * is mapped to 1 because 0 is reserved to mean "no checksum". */
772 mxlnd_cksum (void *ptr, int nob)
778 sum = ((sum << 1) | (sum >> 31)) + *c++;
780 /* ensure I don't return 0 (== no checksum) */
781 return (sum == 0) ? 1 : sum;
/* Fill in the remaining header fields of a tx's message just before it is
 * sent: magic, version, returned credits, source/destination NIDs and
 * incarnation stamps, sequence number (the tx cookie) and, when the cksum
 * tunable is set, the checksum over mxm_nob bytes. */
785 * mxlnd_pack_msg - complete msg info
789 mxlnd_pack_msg(struct kmx_ctx *tx)
791 struct kmx_msg *msg = tx->mxc_msg;
793 /* type and nob should already be set in init_msg() */
794 msg->mxm_magic = MXLND_MSG_MAGIC;
795 msg->mxm_version = MXLND_MSG_VERSION;
797 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
798 * return credits as well */
/* every message except CONN_REQ/CONN_ACK carries back all credits
 * accumulated in mxk_outstanding, which is reset under mxk_lock */
799 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
800 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
801 spin_lock(&tx->mxc_conn->mxk_lock);
802 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
803 tx->mxc_conn->mxk_outstanding = 0;
804 spin_unlock(&tx->mxc_conn->mxk_lock);
806 msg->mxm_credits = 0;
810 msg->mxm_srcnid = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
811 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
812 msg->mxm_dstnid = tx->mxc_nid;
813 /* if it is a new peer, the dststamp will be 0 */
814 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
815 msg->mxm_seq = tx->mxc_cookie;
/* NOTE(review): the receive side says the checksum is computed with
 * mxm_cksum zero; the zeroing is not visible in this extract - confirm */
817 if (*kmxlnd_tunables.kmx_cksum) {
818 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
/* Validate a received message of @nob bytes and convert it to host byte
 * order in place. Checks, in order: enough bytes for magic and version,
 * magic (native or byte-swapped), version, full header present, declared
 * length against received length, checksum when non-zero, source NID, and
 * finally a per-type minimum length. Multi-byte header and body fields are
 * byte-swapped with the __swab*s() helpers (presumably only when the magic
 * arrived swapped - the guarding lines are not visible here). */
823 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
825 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
830 /* 6 bytes are enough to have received magic + version */
832 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
/* a byte-swapped magic identifies a sender of the opposite endianness */
836 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
838 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
841 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
/* compare against the version as it would appear in the sender's order */
845 if (msg->mxm_version !=
846 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
847 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
851 if (nob < hdr_size) {
852 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
856 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
858 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
862 /* checksum must be computed with mxm_cksum zero and BEFORE anything
864 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
866 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
867 CDEBUG(D_NETERROR, "Bad checksum\n");
870 msg->mxm_cksum = msg_cksum;
873 /* leave magic unflipped as a clue to peer endianness */
874 __swab16s(&msg->mxm_version);
/* single-byte fields need no swapping; checked at compile time */
875 CLASSERT (sizeof(msg->mxm_type) == 1);
876 CLASSERT (sizeof(msg->mxm_credits) == 1);
877 msg->mxm_nob = msg_nob;
878 __swab64s(&msg->mxm_srcnid);
879 __swab64s(&msg->mxm_srcstamp);
880 __swab64s(&msg->mxm_dstnid);
881 __swab64s(&msg->mxm_dststamp);
882 __swab64s(&msg->mxm_seq);
885 if (msg->mxm_srcnid == LNET_NID_ANY) {
886 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
/* per-type checks: the declared length must cover the type's body */
890 switch (msg->mxm_type) {
892 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
898 case MXLND_MSG_EAGER:
899 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
900 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
901 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
906 case MXLND_MSG_PUT_REQ:
907 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
908 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
909 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
913 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
916 case MXLND_MSG_PUT_ACK:
917 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
918 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
919 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
923 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
924 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
928 case MXLND_MSG_GET_REQ:
929 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
930 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
931 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
935 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
/* CONN_REQ and CONN_ACK share the conn_req body layout */
939 case MXLND_MSG_CONN_REQ:
940 case MXLND_MSG_CONN_ACK:
941 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
942 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
943 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
947 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
948 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
/* Post a receive for @rx: record the message type, LNET message, cookie
 * and length on the ctx, set its deadline to now + MXLND_COMM_TIMEOUT,
 * queue it as pending and hand its segment to mx_kirecv(). @cookie is also
 * the match value given to MX, compared under a mask that ignores bits
 * 52-59 (the error field of the match layout used in this file). If
 * mx_kirecv() fails the ctx is dequeued again and the failure is logged.
 * NOTE(review): the @pending line below describes a parameter that the
 * visible signature does not have - stale documentation. */
957 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
961 * @length - length of incoming message
962 * @pending - add to kmx_pending (0 is NO and 1 is YES)
964 * The caller gets the rx and sets nid, peer and conn if known.
966 * Returns 0 on success and -1 on failure
969 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
972 mx_return_t mxret = MX_SUCCESS;
973 uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
975 rx->mxc_msg_type = msg_type;
976 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
977 rx->mxc_cookie = cookie;
978 /* rx->mxc_match may already be set */
979 /* rx->mxc_seg.segment_ptr is already set */
980 rx->mxc_seg.segment_length = length;
981 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
982 ret = mxlnd_q_pending_ctx(rx);
984 /* FIXME the conn is disconnected, now what? */
987 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
988 cookie, mask, (void *) rx, &rx->mxc_mxreq);
989 if (mxret != MX_SUCCESS) {
990 mxlnd_deq_pending_ctx(rx);
991 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
992 mx_strerror(mxret), (int) mxret);
/* MX unexpected-message callback. Grabs an idle rx, decodes the match
 * value and posts a receive for the message: full length when it fits in
 * MXLND_EAGER_SIZE, zero length otherwise. The rx is then bound to the
 * conn stored as the context of the source endpoint address. When no rx is
 * available or posting fails, a zero-length receive with an exact-match
 * mask is posted so MX can discard the message. Always returns
 * MX_RECV_CONTINUE in the lines visible here. */
1000 * mxlnd_unexpected_recv - this is the callback function that will handle
1001 * unexpected receives
1002 * @context - NULL, ignore
1003 * @source - the peer's mx_endpoint_addr_t
1004 * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1005 * @length - length of incoming message
1006 * @data_if_available - ignore
1008 * If it is an eager-sized msg, we will call recv_msg() with the actual
1009 * length. If it is a large message, we will call recv_msg() with a
1010 * length of 0 bytes to drop it because we should never have a large,
1011 * unexpected message.
1013 * NOTE - The MX library blocks until this function completes. Make it as fast as
1014 * possible. DO NOT allocate memory which can block!
1016 * If we cannot get a rx or the conn is closed, drop the message on the floor
1017 * (i.e. recv 0 bytes and ignore).
1019 mx_unexp_handler_action_t
1020 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1021 uint64_t match_value, uint32_t length, void *data_if_available)
1024 struct kmx_ctx *rx = NULL;
1030 if (context != NULL) {
1031 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1035 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1038 rx = mxlnd_get_idle_rx();
/* the full match value, not just the decoded cookie, is passed on as the
 * cookie argument of mxlnd_recv_msg() */
1040 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1041 if (length <= MXLND_EAGER_SIZE) {
1042 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1044 CDEBUG(D_NETERROR, "unexpected large receive with "
1045 "match_value=0x%llx length=%d\n",
1046 match_value, length);
1047 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
/* the conn pointer is recovered from the context attached to the source
 * endpoint address; the rx takes its own reference on it */
1050 struct kmx_conn *conn = NULL;
1051 mx_get_endpoint_addr_context(source, (void **) &conn);
1053 mxlnd_conn_addref(conn);
1054 rx->mxc_conn = conn;
1055 rx->mxc_peer = conn->mxk_peer;
1056 if (conn->mxk_peer != NULL) {
1057 rx->mxc_nid = conn->mxk_peer->mxp_nid;
1059 CDEBUG(D_NETERROR, "conn is 0x%p and peer "
1064 CDEBUG(D_NETERROR, "could not post receive\n");
1065 mxlnd_put_idle_rx(rx);
/* drop path: consume the message with an empty segment */
1069 if (rx == NULL || ret != 0) {
1071 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1074 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1076 seg.segment_ptr = 0LL;
1077 seg.segment_length = 0;
1078 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1079 match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1082 return MX_RECV_CONTINUE;
/* Scan every peer hash bucket under the kmx_peers_lock read lock and report
 * one peer's NID and current reference count through @nidp and @count.
 * Presumably the peer is selected by position using @index - the selection
 * test is not visible in this extract. */
1087 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1090 struct kmx_peer *peer = NULL;
1091 struct kmx_conn *conn = NULL;
1093 read_lock(&kmxlnd_data.kmx_peers_lock);
1094 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1095 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1096 conn = peer->mxp_conn;
1100 *nidp = peer->mxp_nid;
1101 *count = atomic_read(&peer->mxp_refcount);
1104 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* Remove a peer from the global peer table, disconnect its current conn
 * (without mx_disconnect() and without notifying LNET) and drop the
 * table's reference. The only caller visible in this file holds
 * kmx_peers_lock for writing.
 * NOTE(review): mxlnd_conn_disconnect() reaches lnet_finalize() through
 * mxlnd_put_idle_tx(), and a comment elsewhere in this file says no locks
 * may be held when lnet_finalize() is called - confirm this is safe under
 * kmx_peers_lock. */
1110 mxlnd_del_peer_locked(struct kmx_peer *peer)
1112 list_del_init(&peer->mxp_peers); /* remove from the global list */
1113 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 0, 0);
1114 mxlnd_peer_decref(peer); /* drop global list ref */
/* Delete the peer with the given NID, or every peer in the table when @nid
 * is LNET_NID_ANY. Deletion runs under the kmx_peers_lock write lock.
 * NOTE(review): for a specific NID the lookup is done before the write
 * lock is taken and mxlnd_find_peer_by_nid() shows no reference being
 * taken, so the peer could presumably be freed in between - confirm. */
1119 mxlnd_del_peer(lnet_nid_t nid)
1123 struct kmx_peer *peer = NULL;
1124 struct kmx_peer *next = NULL;
1126 if (nid != LNET_NID_ANY) {
1127 peer = mxlnd_find_peer_by_nid(nid);
1129 write_lock(&kmxlnd_data.kmx_peers_lock);
1130 if (nid != LNET_NID_ANY) {
1134 mxlnd_del_peer_locked(peer);
1136 } else { /* LNET_NID_ANY */
/* the _safe iterator is needed because each peer is unlinked as we go */
1137 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1138 list_for_each_entry_safe(peer, next,
1139 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1140 mxlnd_del_peer_locked(peer);
1144 write_unlock(&kmxlnd_data.kmx_peers_lock);
/* Walk every conn of every peer under the kmx_peers_lock read lock and
 * hand back one conn with an extra reference, which the caller must drop.
 * Presumably the conn is chosen by position using @index - the selection
 * test is not visible in this extract. */
1150 mxlnd_get_conn_by_idx(int index)
1153 struct kmx_peer *peer = NULL;
1154 struct kmx_conn *conn = NULL;
1156 read_lock(&kmxlnd_data.kmx_peers_lock);
1157 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1158 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1159 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1163 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1164 read_unlock(&kmxlnd_data.kmx_peers_lock);
1169 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* Disconnect every conn on the peer's conn list, without mx_disconnect()
 * and without notifying LNET. The _safe iterator is used because a
 * disconnect may end up unlinking the conn from the list. */
1175 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1177 struct kmx_conn *conn = NULL;
1178 struct kmx_conn *next = NULL;
1180 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1181 mxlnd_conn_disconnect(conn, 0 , 0);
/* Close all conns of the peer with the given NID, or of every peer when
 * @nid is LNET_NID_ANY, while holding kmx_peers_lock for reading.
 * NOTE(review): mxlnd_find_peer_by_nid() takes the same read lock again
 * while it is already held here; on rwlock implementations where a waiting
 * writer blocks new readers a nested read_lock can deadlock - confirm. */
1187 mxlnd_close_matching_conns(lnet_nid_t nid)
1191 struct kmx_peer *peer = NULL;
1193 read_lock(&kmxlnd_data.kmx_peers_lock);
1194 if (nid != LNET_NID_ANY) {
1195 peer = mxlnd_find_peer_by_nid(nid);
1199 mxlnd_close_matching_conns_locked(peer);
1201 } else { /* LNET_NID_ANY */
1202 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1203 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1204 mxlnd_close_matching_conns_locked(peer);
1207 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* ioctl entry point for this LND. Handles four commands using the fields
 * of the libcfs_ioctl_data passed in @arg:
 *   IOC_LIBCFS_GET_PEER         - ioc_count in; ioc_nid and ioc_count out
 *   IOC_LIBCFS_DEL_PEER         - delete the peer(s) matching ioc_nid
 *   IOC_LIBCFS_GET_CONN         - ioc_count in; ioc_nid of the conn's peer out
 *   IOC_LIBCFS_CLOSE_CONNECTION - close the conns matching ioc_nid
 * Any other command is logged as unknown.
 * NOTE(review): the "Not implemented yet." remnant below is stale - the
 * commands above are implemented. */
1213 * mxlnd_ctl - modify MXLND parameters
1214 * @ni - LNET interface handle
1215 * @cmd - command to change
1216 * @arg - the ioctl data
1218 * Not implemented yet.
1221 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1223 struct libcfs_ioctl_data *data = arg;
1226 LASSERT (ni == kmxlnd_data.kmx_ni);
1229 case IOC_LIBCFS_GET_PEER: {
1233 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1234 data->ioc_nid = nid;
1235 data->ioc_count = count;
1238 case IOC_LIBCFS_DEL_PEER: {
1239 ret = mxlnd_del_peer(data->ioc_nid);
1242 case IOC_LIBCFS_GET_CONN: {
1243 struct kmx_conn *conn = NULL;
1245 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1250 data->ioc_nid = conn->mxk_peer->mxp_nid;
1251 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1255 case IOC_LIBCFS_CLOSE_CONNECTION: {
1256 ret = mxlnd_close_matching_conns(data->ioc_nid);
1260 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
/* Put a tx on the right queue of its conn; the wrapper in this file calls
 * it with the conn's mxk_lock held. Routing by message type:
 *   - types that need a credit  -> tail of mxk_tx_credit_queue
 *   - CONN_REQ / CONN_ACK       -> head of mxk_tx_free_queue
 *   - other non-data messages   -> tail of mxk_tx_free_queue
 *   - PUT_DATA / GET_DATA       -> tail of mxk_tx_free_queue, counted in
 *                                  mxk_ntx_data instead of mxk_ntx_msgs
 * The tx also inherits the conn's incarnation. */
1268 * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1271 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1274 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1276 u8 msg_type = tx->mxc_msg_type;
1277 //struct kmx_peer *peer = tx->mxc_peer;
1278 struct kmx_conn *conn = tx->mxc_conn;
1280 LASSERT (msg_type != 0);
1281 LASSERT (tx->mxc_nid != 0);
1282 LASSERT (tx->mxc_peer != NULL);
1283 LASSERT (tx->mxc_conn != NULL);
1285 tx->mxc_incarnation = conn->mxk_incarnation;
1287 if (msg_type != MXLND_MSG_PUT_DATA &&
1288 msg_type != MXLND_MSG_GET_DATA) {
1290 if (mxlnd_tx_requires_credit(tx)) {
1291 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1292 conn->mxk_ntx_msgs++;
1293 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1294 msg_type == MXLND_MSG_CONN_ACK) {
1295 /* put conn msgs at the front of the queue */
1296 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1298 /* PUT_ACK, PUT_NAK */
1299 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1300 conn->mxk_ntx_msgs++;
1304 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1305 conn->mxk_ntx_data++;
/* Locking wrapper: takes the conn's mxk_lock, queues the tx with
 * mxlnd_peer_queue_tx_locked() and releases the lock. The tx must already
 * have both its peer and its conn set. */
1312 * mxlnd_peer_queue_tx - add the tx to the global tx queue
1315 * Add the tx to the peer's msg or data queue
1318 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1320 LASSERT(tx->mxc_peer != NULL);
1321 LASSERT(tx->mxc_conn != NULL);
1322 spin_lock(&tx->mxc_conn->mxk_lock);
1323 mxlnd_peer_queue_tx_locked(tx);
1324 spin_unlock(&tx->mxc_conn->mxk_lock);
/* Submit a tx for sending. In one branch (presumably when the peer is
 * known - the guard is not visible in this extract) the tx is queued on
 * the peer's conn and mxlnd_check_sends() is called; in the other it is
 * appended to the global kmx_tx_queue and kmx_tx_queue_sem is upped.
 * A tx for a peer flagged incompatible is failed with -ECONNABORTED
 * unless it is a CONN_ACK. */
1330 * mxlnd_queue_tx - add the tx to the global tx queue
1333 * Add the tx to the global queue and up the tx_queue_sem
1336 mxlnd_queue_tx(struct kmx_ctx *tx)
1339 struct kmx_peer *peer = tx->mxc_peer;
1340 LASSERT (tx->mxc_nid != 0);
1343 if (peer->mxp_incompatible &&
1344 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1345 /* let this fail now */
1346 tx->mxc_status.code = -ECONNABORTED;
1347 mxlnd_put_idle_tx(tx);
/* NOTE(review): the result of mxlnd_conn_alloc() is ignored, so an
 * allocation failure surfaces only through the LASSERT below - confirm */
1350 if (tx->mxc_conn == NULL) {
1351 mxlnd_conn_alloc(&tx->mxc_conn, peer);
1353 LASSERT(tx->mxc_conn != NULL);
1354 mxlnd_peer_queue_tx(tx);
1355 ret = mxlnd_check_sends(peer);
1357 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1358 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1359 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1360 up(&kmxlnd_data.kmx_tx_queue_sem);
/* Build the MX segment list describing @nob bytes that start @offset bytes
 * into an iovec array. The first loop finds the first and last iovec
 * touched, the offset into the first one and the number of bytes used in
 * the last one. One mx_ksegment_t is then allocated per touched iovec,
 * filled from the iovec base/length and trimmed at both ends. The list is
 * stored in ctx->mxc_seg_list with ctx->mxc_nseg entries and kernel
 * pinning. Returns 0 immediately when @niov is 0. */
1366 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1373 int first_iov_offset = 0;
1374 int first_found = 0;
1376 int last_iov_length = 0;
1377 mx_ksegment_t *seg = NULL;
1379 if (niov == 0) return 0;
1380 LASSERT(iov != NULL);
/* sum is the running byte count through iov[i]; old_sum presumably holds
 * the count before iov[i] (its update is not visible in this extract) */
1382 for (i = 0; i < niov; i++) {
1383 sum = old_sum + (u32) iov[i].iov_len;
1384 if (!first_found && (sum > offset)) {
1386 first_iov_offset = offset - old_sum;
1388 sum = (u32) iov[i].iov_len - first_iov_offset;
/* bytes of the last iovec actually used; when first and last coincide the
 * leading offset is removed as well */
1393 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1394 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1399 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1400 nseg = last_iov - first_iov + 1;
1403 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1405 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1408 memset(seg, 0, nseg * sizeof(*seg));
1409 ctx->mxc_nseg = nseg;
1411 for (i = 0; i < nseg; i++) {
1412 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1413 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1415 seg[i].segment_ptr += (u64) first_iov_offset;
1416 seg[i].segment_length -= (u32) first_iov_offset;
1418 if (i == (nseg - 1)) {
1419 seg[i].segment_length = (u32) last_iov_length;
1421 sum += seg[i].segment_length;
1423 ctx->mxc_seg_list = seg;
1424 ctx->mxc_pin_type = MX_PIN_KERNEL;
1425 #ifdef MX_PIN_FULLPAGES
1426 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segment lengths must add up to exactly the requested byte count */
1428 LASSERT(nob == sum);
1433 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1439 int first_kiov = -1;
1440 int first_kiov_offset = 0;
1441 int first_found = 0;
1443 int last_kiov_length = 0;
1444 mx_ksegment_t *seg = NULL;
1446 if (niov == 0) return 0;
1447 LASSERT(kiov != NULL);
1449 for (i = 0; i < niov; i++) {
1450 sum = old_sum + kiov[i].kiov_len;
1451 if (i == 0) sum -= kiov[i].kiov_offset;
1452 if (!first_found && (sum > offset)) {
1454 first_kiov_offset = offset - old_sum;
1455 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1456 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1458 sum = kiov[i].kiov_len - first_kiov_offset;
1463 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1464 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1469 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1470 nseg = last_kiov - first_kiov + 1;
1473 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1475 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1478 memset(seg, 0, niov * sizeof(*seg));
1479 ctx->mxc_nseg = niov;
1481 for (i = 0; i < niov; i++) {
1482 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1483 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1485 seg[i].segment_ptr += (u64) first_kiov_offset;
1486 /* we have to add back the original kiov_offset */
1487 seg[i].segment_length -= first_kiov_offset +
1488 kiov[first_kiov].kiov_offset;
1490 if (i == (nseg - 1)) {
1491 seg[i].segment_length = last_kiov_length;
1493 sum += seg[i].segment_length;
1495 ctx->mxc_seg_list = seg;
1496 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1497 #ifdef MX_PIN_FULLPAGES
1498 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1500 LASSERT(nob == sum);
/* Prepare a PUT_ACK message that carries an error (a "nak") on the given tx.
 * Only MXLND_MSG_PUT_ACK is accepted as `type`. The error status is placed in
 * the dst cookie shifted left by 52 bits and is also passed to
 * mxlnd_create_match(); the src cookie echoes `cookie`.
 * NOTE(review): the `nid` argument is not used by the visible lines; the
 * message is initialised with tx->mxc_nid instead -- confirm that callers
 * always set tx->mxc_nid first. */
1505 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1507 LASSERT(type == MXLND_MSG_PUT_ACK);
1508 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1509 tx->mxc_cookie = cookie;
1510 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1511 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1512 tx->mxc_match = mxlnd_create_match(tx, status);
1519 * mxlnd_send_data - get tx, map [k]iov, queue tx
1526 * This sets up the DATA send for PUT or GET.
1528 * On success, it queues the tx, on failure it calls lnet_finalize()
/* Set up a PUT_DATA or GET_DATA tx for lntmsg's payload.
 * Takes an idle tx, takes a ref on peer->mxp_conn for it, fills in the tx
 * fields, and maps the payload with mxlnd_setup_iov() or mxlnd_setup_kiov()
 * depending on whether the message carries a kiov.
 * The visible failure paths drop the conn ref, return the tx to the idle
 * list, and/or finalize lntmsg with -EIO.
 * NOTE(review): lntmsg is dereferenced by the initialisers below before
 * LASSERT(lntmsg != NULL) runs. */
1531 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1534 lnet_process_id_t target = lntmsg->msg_target;
1535 unsigned int niov = lntmsg->msg_niov;
1536 struct iovec *iov = lntmsg->msg_iov;
1537 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1538 unsigned int offset = lntmsg->msg_offset;
1539 unsigned int nob = lntmsg->msg_len;
1540 struct kmx_ctx *tx = NULL;
1542 LASSERT(lntmsg != NULL);
1543 LASSERT(peer != NULL);
1544 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
/* the top 12 bits of the cookie must be clear */
1545 LASSERT((cookie>>52) == 0);
1547 tx = mxlnd_get_idle_tx();
1549 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1550 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1551 libcfs_nid2str(target.nid));
1554 tx->mxc_nid = target.nid;
/* conn ref held on behalf of this tx */
1555 mxlnd_conn_addref(peer->mxp_conn);
1556 tx->mxc_peer = peer;
1557 tx->mxc_conn = peer->mxp_conn;
1558 tx->mxc_msg_type = msg_type;
1559 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1560 tx->mxc_state = MXLND_CTX_PENDING;
1561 tx->mxc_lntmsg[0] = lntmsg;
1562 tx->mxc_cookie = cookie;
1563 tx->mxc_match = mxlnd_create_match(tx, 0);
1565 /* This sets up the mx_ksegment_t to send the DATA payload */
1567 /* do not setup the segments */
1568 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1569 "to %s?\n", libcfs_nid2str(target.nid));
1571 } else if (kiov == NULL) {
1572 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1574 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1577 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1578 libcfs_nid2str(target.nid));
1579 tx->mxc_status.code = -EIO;
/* error path: release the conn ref taken above and recycle the tx */
1586 mxlnd_conn_decref(peer->mxp_conn);
1587 mxlnd_put_idle_tx(tx);
1591 CDEBUG(D_NETERROR, "no tx avail\n");
1592 lnet_finalize(ni, lntmsg, -EIO);
1597 * mxlnd_recv_data - map [k]iov, post rx
1604 * This sets up the DATA receive for PUT or GET.
1606 * On success, it returns 0, on failure it returns -1
/* Set up and post the receive for a PUT_DATA or GET_DATA payload on rx.
 * The sink buffers come from lntmsg itself for PUT_DATA and from
 * lntmsg->msg_md for GET_DATA. The rx is filled in, its segments are mapped
 * with mxlnd_setup_iov() or mxlnd_setup_kiov(), it is added to the pending
 * list with mxlnd_q_pending_ctx(), and the receive is posted with
 * mx_kirecv(). For GET_DATA a reply message is also created and stored in
 * rx->mxc_lntmsg[1].
 * NOTE(review): lntmsg is dereferenced by the initialisers (and by the
 * GET_DATA overrides) before LASSERT(lntmsg != NULL) runs. */
1609 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1612 lnet_process_id_t target = lntmsg->msg_target;
1613 unsigned int niov = lntmsg->msg_niov;
1614 struct iovec *iov = lntmsg->msg_iov;
1615 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1616 unsigned int offset = lntmsg->msg_offset;
1617 unsigned int nob = lntmsg->msg_len;
1618 mx_return_t mxret = MX_SUCCESS;
1620 /* above assumes MXLND_MSG_PUT_DATA */
1621 if (msg_type == MXLND_MSG_GET_DATA) {
1622 niov = lntmsg->msg_md->md_niov;
1623 iov = lntmsg->msg_md->md_iov.iov;
1624 kiov = lntmsg->msg_md->md_iov.kiov;
1626 nob = lntmsg->msg_md->md_length;
1629 LASSERT(lntmsg != NULL);
1630 LASSERT(rx != NULL);
1631 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1632 LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1634 rx->mxc_msg_type = msg_type;
1635 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1636 rx->mxc_state = MXLND_CTX_PENDING;
1637 rx->mxc_nid = target.nid;
1638 /* if posting a GET_DATA, we may not yet know the peer */
1639 if (rx->mxc_peer != NULL) {
1640 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1642 rx->mxc_lntmsg[0] = lntmsg;
1643 rx->mxc_cookie = cookie;
1644 rx->mxc_match = mxlnd_create_match(rx, 0);
1645 /* This sets up the mx_ksegment_t to receive the DATA payload */
1647 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1649 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
/* a GET also needs the LNET reply message, kept in slot 1 of the rx */
1651 if (msg_type == MXLND_MSG_GET_DATA) {
1652 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1653 if (rx->mxc_lntmsg[1] == NULL) {
1654 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1655 libcfs_nid2str(target.nid));
1660 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1661 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1662 libcfs_nid2str(target.nid));
1665 ret = mxlnd_q_pending_ctx(rx);
1669 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1670 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1671 rx->mxc_seg_list, rx->mxc_nseg,
1672 rx->mxc_pin_type, rx->mxc_match,
1673 0xF00FFFFFFFFFFFFFLL, (void *) rx,
/* on a failed post, take the rx back off the pending list (when it has a conn) */
1675 if (mxret != MX_SUCCESS) {
1676 if (rx->mxc_conn != NULL) {
1677 mxlnd_deq_pending_ctx(rx);
1679 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1680 (int) mxret, libcfs_nid2str(target.nid));
1688 * mxlnd_send - the LND required send function
1693 * This must not block. Since we may not have a peer struct for the receiver,
1694 * it will append send messages on a global tx list. We will then up the
1695 * tx_queued's semaphore to notify it of the new send.
/* LND send entry point.
 * Looks up the peer by nid and, when the peer has a conn, takes a conn ref
 * that is either handed to the tx or dropped on each visible error path.
 * Visible message handling:
 *  - payloads that fit in MXLND_EAGER_SIZE (and routed GETs) are sent as a
 *    single EAGER message with the header and payload copied into the tx;
 *  - larger PUT/REPLY payloads send a PUT_REQ after first posting an rx for
 *    the PUT_ACK;
 *  - larger GETs post a DATA rx via mxlnd_recv_data() and then send a
 *    GET_REQ.
 * NOTE(review): the switch statement, several case labels, the queueing of
 * the tx and the return statements are not visible in this excerpt. */
1698 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1701 int type = lntmsg->msg_type;
1702 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1703 lnet_process_id_t target = lntmsg->msg_target;
1704 lnet_nid_t nid = target.nid;
1705 int target_is_router = lntmsg->msg_target_is_router;
1706 int routing = lntmsg->msg_routing;
1707 unsigned int payload_niov = lntmsg->msg_niov;
1708 struct iovec *payload_iov = lntmsg->msg_iov;
1709 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1710 unsigned int payload_offset = lntmsg->msg_offset;
1711 unsigned int payload_nob = lntmsg->msg_len;
1712 struct kmx_ctx *tx = NULL;
1713 struct kmx_msg *txmsg = NULL;
1714 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1715 struct kmx_ctx *rx_data = NULL;
1716 struct kmx_conn *conn = NULL;
1718 uint32_t length = 0;
1719 struct kmx_peer *peer = NULL;
1721 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1722 payload_nob, payload_niov, libcfs_id2str(target));
1724 LASSERT (payload_nob == 0 || payload_niov > 0);
1725 LASSERT (payload_niov <= LNET_MAX_IOV);
1726 /* payload is either all vaddrs or all pages */
1727 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1729 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1731 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1732 * to a new peer, use the nid */
1733 peer = mxlnd_find_peer_by_nid(nid);
1735 conn = peer->mxp_conn;
/* this ref is the one referred to as "taken above" further down */
1736 if (conn) mxlnd_conn_addref(conn);
1738 if (conn == NULL && peer != NULL) {
1739 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1740 peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" :
1741 ((type==LNET_MSG_GET) ? "GET" : "Other")));
1746 LASSERT (payload_nob == 0);
1749 case LNET_MSG_REPLY:
1751 /* Is the payload small enough not to need DATA? */
1752 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1753 if (nob <= MXLND_EAGER_SIZE)
1754 break; /* send EAGER */
1756 tx = mxlnd_get_idle_tx();
1757 if (unlikely(tx == NULL)) {
1758 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1759 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1760 libcfs_nid2str(nid));
1761 if (conn) mxlnd_conn_decref(conn);
1765 /* the peer may be NULL */
1766 tx->mxc_peer = peer;
1767 tx->mxc_conn = conn; /* may be NULL */
1768 /* we added a conn ref above */
1769 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1770 txmsg = tx->mxc_msg;
1771 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1772 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1773 tx->mxc_match = mxlnd_create_match(tx, 0);
1775 /* we must post a receive _before_ sending the request.
1776 * we need to determine how much to receive, it will be either
1777 * a put_ack or a put_nak. The put_ack is larger, so use it. */
/* NOTE(review): this overwrites rx, which was initialised from `private` */
1779 rx = mxlnd_get_idle_rx();
1780 if (unlikely(rx == NULL)) {
1781 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1782 libcfs_nid2str(nid));
1783 mxlnd_put_idle_tx(tx);
1784 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1788 rx->mxc_peer = peer;
1789 /* conn may be NULL but unlikely since the first msg is always small */
1790 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1791 rx->mxc_conn = conn;
1792 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
/* the PUT_ACK rx shares the tx's cookie */
1793 rx->mxc_cookie = tx->mxc_cookie;
1794 rx->mxc_match = mxlnd_create_match(rx, 0);
1796 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1797 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1798 if (unlikely(ret != 0)) {
1799 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1800 libcfs_nid2str(nid));
1801 rx->mxc_lntmsg[0] = NULL;
1802 mxlnd_put_idle_rx(rx);
1803 mxlnd_put_idle_tx(tx);
1805 mxlnd_conn_decref(conn); /* for the rx... */
1806 mxlnd_conn_decref(conn); /* and for the tx */
1815 if (routing || target_is_router)
1816 break; /* send EAGER */
1818 /* is the REPLY message too small for DATA? */
1819 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1820 if (nob <= MXLND_EAGER_SIZE)
1821 break; /* send EAGER */
1823 /* get tx (we need the cookie) , post rx for incoming DATA,
1824 * then post GET_REQ tx */
1825 tx = mxlnd_get_idle_tx();
1826 if (unlikely(tx == NULL)) {
1827 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1828 libcfs_nid2str(nid));
1829 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1832 rx_data = mxlnd_get_idle_rx();
1833 if (unlikely(rx_data == NULL)) {
1834 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1835 libcfs_nid2str(nid));
1836 mxlnd_put_idle_tx(tx);
1837 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1840 rx_data->mxc_peer = peer;
1841 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1842 rx_data->mxc_conn = conn; /* may be NULL */
/* the DATA sink is posted with the tx's cookie before the GET_REQ is built */
1844 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1845 if (unlikely(ret != 0)) {
1846 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1847 libcfs_nid2str(nid));
1848 mxlnd_put_idle_rx(rx_data);
1849 mxlnd_put_idle_tx(tx);
1851 mxlnd_conn_decref(conn); /* for the rx_data... */
1852 mxlnd_conn_decref(conn); /* and for the tx */
1857 tx->mxc_peer = peer;
1858 tx->mxc_conn = conn; /* may be NULL */
1859 /* conn ref taken above */
1860 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1861 txmsg = tx->mxc_msg;
1862 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1863 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1864 tx->mxc_match = mxlnd_create_match(tx, 0);
1871 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: the whole message must fit in MXLND_EAGER_SIZE */
1877 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1878 <= MXLND_EAGER_SIZE);
1880 tx = mxlnd_get_idle_tx();
1881 if (unlikely(tx == NULL)) {
1882 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1883 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1884 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1888 tx->mxc_peer = peer;
1889 tx->mxc_conn = conn; /* may be NULL */
1890 /* conn ref taken above */
1891 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1892 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1893 tx->mxc_match = mxlnd_create_match(tx, 0);
1895 txmsg = tx->mxc_msg;
1896 txmsg->mxm_u.eager.mxem_hdr = *hdr;
/* copy the payload (pages or vaddrs) into the flat eager message buffer */
1898 if (payload_kiov != NULL)
1899 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1900 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1901 payload_niov, payload_kiov, payload_offset, payload_nob);
1903 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1904 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1905 payload_niov, payload_iov, payload_offset, payload_nob);
1907 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
1913 * mxlnd_recv - the LND required recv function
1924 * This must not block.
/* LND receive entry point; `private` is the rx that carried the message.
 * Visible handling by message type (rxmsg->mxm_type):
 *  - EAGER: the payload is copied out of the rx message into the kiov/iov;
 *  - PUT_REQ: a tx is taken for the PUT_ACK; with mlen == 0, or when posting
 *    the PUT_DATA rx fails, a nak is prepared with mxlnd_send_nak(),
 *    otherwise the rx is reused to receive PUT_DATA;
 *  - GET_REQ: with a matching lntmsg the data is sent via mxlnd_send_data(),
 *    otherwise a GET_DATA tx carrying ENODATA in its match bits is prepared.
 * At the end the conn's outstanding count is incremented, the rx is recycled
 * and lntmsg is finalized when `finalize` is 1.
 * NOTE(review): the switch, several branches, the tx queueing calls and the
 * return are not visible in this excerpt. */
1927 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1928 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1929 unsigned int offset, unsigned int mlen, unsigned int rlen)
1934 struct kmx_ctx *rx = private;
1935 struct kmx_msg *rxmsg = rx->mxc_msg;
1936 lnet_nid_t nid = rx->mxc_nid;
1937 struct kmx_ctx *tx = NULL;
1938 struct kmx_msg *txmsg = NULL;
1939 struct kmx_peer *peer = rx->mxc_peer;
/* NOTE(review): peer is dereferenced here, before LASSERT (peer != NULL) below */
1940 struct kmx_conn *conn = peer->mxp_conn;
1942 int msg_type = rxmsg->mxm_type;
1947 LASSERT (mlen <= rlen);
1948 /* Either all pages or all vaddrs */
1949 LASSERT (!(kiov != NULL && iov != NULL));
1950 LASSERT (peer != NULL);
1952 /* conn_addref(conn) already taken for the primary rx */
1955 case MXLND_MSG_EAGER:
/* the sender's claimed length must fit in what was actually received */
1956 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
1957 len = rx->mxc_status.xfer_length;
1958 if (unlikely(nob > len)) {
1959 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
1960 libcfs_nid2str(nid), nob, len);
1966 lnet_copy_flat2kiov(niov, kiov, offset,
1967 MXLND_EAGER_SIZE, rxmsg,
1968 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1971 lnet_copy_flat2iov(niov, iov, offset,
1972 MXLND_EAGER_SIZE, rxmsg,
1973 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1979 case MXLND_MSG_PUT_REQ:
1980 /* we are going to reuse the rx, store the needed info */
1981 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
1983 /* get tx, post rx, send PUT_ACK */
1985 tx = mxlnd_get_idle_tx();
1986 if (unlikely(tx == NULL)) {
1987 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
1988 /* Not replying will break the connection */
/* nothing to receive: answer with a nak carrying status 0 */
1992 if (unlikely(mlen == 0)) {
1994 tx->mxc_peer = peer;
1995 tx->mxc_conn = conn;
1996 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2001 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2002 tx->mxc_peer = peer;
2003 tx->mxc_conn = conn;
2004 mxlnd_conn_addref(conn); /* for the tx */
2005 txmsg = tx->mxc_msg;
/* src cookie echoes the requester's cookie; dst cookie is this tx's own cookie,
 * which is also what the PUT_DATA rx is posted with below */
2006 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2007 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2008 tx->mxc_cookie = cookie;
2009 tx->mxc_match = mxlnd_create_match(tx, 0);
2011 /* we must post a receive _before_ sending the PUT_ACK */
2013 rx->mxc_state = MXLND_CTX_PREP;
2014 rx->mxc_peer = peer;
2015 rx->mxc_conn = conn;
2016 /* do not take another ref for this rx, it is already taken */
2017 rx->mxc_nid = peer->mxp_nid;
2018 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2019 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2021 if (unlikely(ret != 0)) {
2022 /* Notify peer that it's over */
2023 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2024 libcfs_nid2str(nid), ret);
2026 tx->mxc_state = MXLND_CTX_PREP;
2027 tx->mxc_peer = peer;
2028 tx->mxc_conn = conn;
2029 /* finalize = 0, let the PUT_ACK tx finalize this */
2030 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2031 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2032 /* conn ref already taken above */
2033 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2039 /* do not return a credit until after PUT_DATA returns */
2043 case MXLND_MSG_GET_REQ:
2044 if (likely(lntmsg != NULL)) {
2045 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2046 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2048 /* GET didn't match anything */
2049 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2050 * We have to embed the error code in the match bits.
2051 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2052 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2054 tx = mxlnd_get_idle_tx();
2055 if (unlikely(tx == NULL)) {
2056 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2057 libcfs_nid2str(nid));
2061 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2062 tx->mxc_state = MXLND_CTX_PENDING;
2064 tx->mxc_peer = peer;
2065 tx->mxc_conn = conn;
2066 mxlnd_conn_addref(conn); /* for this tx */
2067 tx->mxc_cookie = cookie;
2068 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2069 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2072 /* finalize lntmsg after tx completes */
2080 /* we received a message, increment peer's outstanding credits */
2082 spin_lock(&conn->mxk_lock);
2083 conn->mxk_outstanding++;
2084 spin_unlock(&conn->mxk_lock);
2086 /* we are done with the rx */
2087 mxlnd_put_idle_rx(rx);
2088 mxlnd_conn_decref(conn);
2091 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2093 /* we received a credit, see if we can use it to send a msg */
2094 if (credit) mxlnd_check_sends(peer);
/* Put the current task to sleep for up to `timeout` (passed straight to
 * schedule_timeout()). The task state is TASK_INTERRUPTIBLE. */
2100 mxlnd_sleep(unsigned long timeout)
2102 set_current_state(TASK_INTERRUPTIBLE);
2103 schedule_timeout(timeout);
2108 * mxlnd_tx_queued - the generic send queue thread
2109 * @arg - thread id (as a void *)
2111 * This thread moves send messages from the global tx_queue to the owning
2112 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2113 * it to the global peer list.
/* Thread body: drains the global kmx_tx_queue until kmx_shutdown is set.
 * Each iteration waits on kmx_tx_queue_sem, removes the first tx from the
 * queue under kmx_tx_queue_lock and looks its peer up by nid. With a known
 * peer the tx is bound to the peer and its conn (taking a conn ref).
 * Otherwise a new peer is allocated and inserted into the kmx_peers hash
 * under kmx_peers_lock, unless an entry with the same nid is already there,
 * in which case the existing peer's conn is used and the new peer is
 * released.
 * NOTE(review): the hand-off of the tx to the peer queue and several branch
 * lines are not visible in this excerpt. */
2116 mxlnd_tx_queued(void *arg)
2118 long id = (long) arg;
2121 struct kmx_ctx *tx = NULL;
2122 struct kmx_peer *peer = NULL;
2123 struct list_head *tmp_tx = NULL;
2125 cfs_daemonize("mxlnd_tx_queued");
2126 //cfs_block_allsigs();
2128 while (!kmxlnd_data.kmx_shutdown) {
2129 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2130 if (kmxlnd_data.kmx_shutdown)
2132 if (ret != 0) // Should we check for -EINTR?
2134 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2135 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2136 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the head of the global queue */
2139 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2140 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2141 list_del_init(&tx->mxc_list);
2142 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2145 peer = mxlnd_find_peer_by_nid(tx->mxc_nid);
2147 tx->mxc_peer = peer;
2148 tx->mxc_conn = peer->mxp_conn;
2149 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
/* NOTE(review): this inner `peer` shadows the function-level `peer` */
2155 struct kmx_peer *peer = NULL;
2156 struct kmx_peer *old = NULL;
2158 hash = mxlnd_nid_to_hash(tx->mxc_nid);
/* DATA messages are never expected for a peer that is not yet known */
2160 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2161 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2163 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2165 /* finalize message */
2166 tx->mxc_status.code = -ECONNABORTED;
2167 mxlnd_put_idle_tx(tx);
2170 tx->mxc_peer = peer;
2171 tx->mxc_conn = peer->mxp_conn;
2173 /* add peer to global peer list, but look to see
2174 * if someone already created it after we released
2176 write_lock(&kmxlnd_data.kmx_peers_lock);
2177 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2178 if (old->mxp_nid == peer->mxp_nid) {
2179 /* somebody beat us here, we created a duplicate */
2186 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2187 atomic_inc(&kmxlnd_data.kmx_npeers);
/* duplicate case: switch the tx to the existing peer's conn and drop ours */
2190 tx->mxc_conn = old->mxp_conn;
2191 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2192 mxlnd_peer_decref(peer);
2194 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2195 write_unlock(&kmxlnd_data.kmx_peers_lock);
2200 mxlnd_thread_stop(id);
2204 /* When calling this, we must not have the peer lock. */
/* Start an MX connect to `peer` with mx_iconnect(), using `mask` (which must
 * be MXLND_MASK_ICON_REQ or MXLND_MASK_ICON_ACK) and passing the peer as the
 * request context.
 * A conn ref is taken up front. It is dropped again on the two visible
 * failure paths: when the peer's NIC id cannot be resolved yet, and when
 * mx_iconnect() fails. In the first case the conn goes back to
 * MXLND_CONN_INIT and, once MXLND_WAIT_TIMEOUT has passed since
 * mxp_reconnect_time, the conn is disconnected and a fresh one allocated.
 * In the second case the conn is marked MXLND_CONN_FAIL. */
2206 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2208 mx_return_t mxret = MX_SUCCESS;
2209 mx_request_t request;
2210 struct kmx_conn *conn = peer->mxp_conn;
2212 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2214 LASSERT(mask == MXLND_MASK_ICON_REQ ||
2215 mask == MXLND_MASK_ICON_ACK);
/* remember when this connect attempt started */
2217 if (peer->mxp_reconnect_time == 0) {
2218 peer->mxp_reconnect_time = jiffies;
2221 if (peer->mxp_nic_id == 0LL) {
2222 mxlnd_peer_hostname_to_nic_id(peer);
2223 if (peer->mxp_nic_id == 0LL) {
2224 /* not mapped yet, return */
2225 spin_lock(&conn->mxk_lock);
2226 conn->mxk_status = MXLND_CONN_INIT;
2227 spin_unlock(&conn->mxk_lock);
2228 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2229 /* give up and notify LNET */
2230 mxlnd_conn_disconnect(conn, 0, 1);
2231 mxlnd_conn_alloc(&peer->mxp_conn, peer);
2233 mxlnd_conn_decref(conn);
2238 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2239 peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2240 (void *) peer, &request);
2241 if (unlikely(mxret != MX_SUCCESS)) {
2242 spin_lock(&conn->mxk_lock);
2243 conn->mxk_status = MXLND_CONN_FAIL;
2244 spin_unlock(&conn->mxk_lock);
2245 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2246 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2247 mxlnd_conn_decref(conn);
/* NOTE(review): presumably gates the periodic status CDEBUG in
 * mxlnd_check_sends(); the #if lines are not visible in this excerpt. */
2252 #define MXLND_STATS 0
/* Try to send whatever is queued on the peer's conn.
 * Visible steps:
 *  1. if conn->mxk_outstanding has reached MXLND_CREDIT_HIGHWATER and no
 *     credit-carrying tx is queued, queue a NOOP;
 *  2. if the conn is in INIT, FAIL or REQ state, move it to WAIT and start a
 *     connect with mxlnd_iconnect();
 *  3. loop over the free queue (preferred) and the credit queue: check the
 *     credit/posted limits and the conn status, update the counters under
 *     mxk_lock, drop redundant NOOPs, add the tx to the pending list and
 *     post it with mx_kisend();
 *  4. on a failed send, undo the counter updates, take the tx off the
 *     pending list, recycle it and drop its conn ref.
 * NOTE(review): many control-flow lines (breaks, gotos, else branches, the
 * mx_kisend() arguments, the return) are not visible in this excerpt. */
2255 mxlnd_check_sends(struct kmx_peer *peer)
2259 mx_return_t mxret = MX_SUCCESS;
2260 struct kmx_ctx *tx = NULL;
2261 struct kmx_conn *conn = NULL;
2268 static unsigned long last = 0;
2271 if (unlikely(peer == NULL)) {
2272 LASSERT(peer != NULL);
2275 conn = peer->mxp_conn;
2276 /* do not add another ref for this tx */
2279 /* we do not have any conns */
/* status dump, rate-limited to once per HZ jiffies via the static `last` */
2284 if (time_after(jiffies, last)) {
2285 last = jiffies + HZ;
2286 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2287 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2288 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2289 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2290 conn->mxk_ntx_data, conn->mxk_data_posted);
2294 /* cache peer state for asserts */
2295 spin_lock(&conn->mxk_lock);
2296 ntx_posted = conn->mxk_ntx_posted;
2297 credits = conn->mxk_credits;
2298 spin_unlock(&conn->mxk_lock);
2300 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2301 LASSERT(ntx_posted >= 0);
2303 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2304 LASSERT(credits >= 0);
2306 /* check number of queued msgs, ignore data */
2307 spin_lock(&conn->mxk_lock);
2308 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2309 /* check if any txs queued that could return credits... */
2310 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2311 /* if not, send a NOOP */
2312 tx = mxlnd_get_idle_tx();
2313 if (likely(tx != NULL)) {
2314 tx->mxc_peer = peer;
2315 tx->mxc_conn = peer->mxp_conn;
2316 mxlnd_conn_addref(conn); /* for this tx */
2317 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2318 tx->mxc_match = mxlnd_create_match(tx, 0);
/* mxk_lock is already held here, hence the _locked variant */
2319 mxlnd_peer_queue_tx_locked(tx);
2325 spin_unlock(&conn->mxk_lock);
2327 /* if the peer is not ready, try to connect */
2328 spin_lock(&conn->mxk_lock);
2329 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2330 conn->mxk_status == MXLND_CONN_FAIL ||
2331 conn->mxk_status == MXLND_CONN_REQ)) {
2332 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2333 conn->mxk_status = MXLND_CONN_WAIT;
/* the lock is dropped before mxlnd_iconnect() is called */
2334 spin_unlock(&conn->mxk_lock);
2335 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2338 spin_unlock(&conn->mxk_lock);
2340 spin_lock(&conn->mxk_lock);
2341 while (!list_empty(&conn->mxk_tx_free_queue) ||
2342 !list_empty(&conn->mxk_tx_credit_queue)) {
2343 /* We have something to send. If we have a queued tx that does not
2344 * require a credit (free), choose it since its completion will
2345 * return a credit (here or at the peer), complete a DATA or
2346 * CONN_REQ or CONN_ACK. */
2347 struct list_head *tmp_tx = NULL;
2348 if (!list_empty(&conn->mxk_tx_free_queue)) {
2349 tmp_tx = &conn->mxk_tx_free_queue;
2351 tmp_tx = &conn->mxk_tx_credit_queue;
2353 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2355 msg_type = tx->mxc_msg_type;
2357 /* don't try to send a rx */
2358 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2360 /* ensure that it is a valid msg type */
2361 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2362 msg_type == MXLND_MSG_CONN_ACK ||
2363 msg_type == MXLND_MSG_NOOP ||
2364 msg_type == MXLND_MSG_EAGER ||
2365 msg_type == MXLND_MSG_PUT_REQ ||
2366 msg_type == MXLND_MSG_PUT_ACK ||
2367 msg_type == MXLND_MSG_PUT_DATA ||
2368 msg_type == MXLND_MSG_GET_REQ ||
2369 msg_type == MXLND_MSG_GET_DATA);
2370 LASSERT(tx->mxc_peer == peer);
2371 LASSERT(tx->mxc_nid == peer->mxp_nid);
2373 credit = mxlnd_tx_requires_credit(tx);
/* limit checks: posted count, available credits, and the reserved last credit */
2376 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2377 CDEBUG(D_NET, "%s: posted enough\n",
2378 libcfs_nid2str(peer->mxp_nid));
2382 if (conn->mxk_credits == 0) {
2383 CDEBUG(D_NET, "%s: no credits\n",
2384 libcfs_nid2str(peer->mxp_nid));
2388 if (conn->mxk_credits == 1 && /* last credit reserved for */
2389 conn->mxk_outstanding == 0) { /* giving back credits */
2390 CDEBUG(D_NET, "%s: not using last credit\n",
2391 libcfs_nid2str(peer->mxp_nid));
/* only CONN_REQ/CONN_ACK may go out while the conn is not READY */
2396 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2397 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2398 msg_type == MXLND_MSG_CONN_ACK)) {
2399 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2400 mxlnd_connstatus_to_str(conn->mxk_status),
2402 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2403 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2404 list_del_init(&tx->mxc_list);
2405 tx->mxc_status.code = -ECONNABORTED;
2406 mxlnd_put_idle_tx(tx);
2407 mxlnd_conn_decref(conn);
2413 list_del_init(&tx->mxc_list);
2415 /* handle credits, etc now while we have the lock to avoid races */
2417 conn->mxk_credits--;
2418 conn->mxk_ntx_posted++;
2420 if (msg_type != MXLND_MSG_PUT_DATA &&
2421 msg_type != MXLND_MSG_GET_DATA) {
2422 if (msg_type != MXLND_MSG_CONN_REQ &&
2423 msg_type != MXLND_MSG_CONN_ACK) {
2424 conn->mxk_ntx_msgs--;
/* stamp the tx with the conn's incarnation the first time it is sent */
2427 if (tx->mxc_incarnation == 0 &&
2428 conn->mxk_incarnation != 0) {
2429 tx->mxc_incarnation = conn->mxk_incarnation;
2431 spin_unlock(&conn->mxk_lock);
2433 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2434 * or (2) there is a non-DATA msg that can return credits in the
2435 * queue, then drop this duplicate NOOP */
2436 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2437 spin_lock(&conn->mxk_lock);
2438 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2439 (conn->mxk_ntx_msgs >= 1)) {
/* give back the credit and posted slot taken above */
2440 conn->mxk_credits++;
2441 conn->mxk_ntx_posted--;
2442 spin_unlock(&conn->mxk_lock);
2443 /* redundant NOOP */
2444 mxlnd_put_idle_tx(tx);
2445 mxlnd_conn_decref(conn);
2446 CDEBUG(D_NET, "%s: redundant noop\n",
2447 libcfs_nid2str(peer->mxp_nid));
2451 spin_unlock(&conn->mxk_lock);
2455 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2456 (msg_type != MXLND_MSG_GET_DATA))) {
2460 //ret = -ECONNABORTED;
/* re-read the conn status now that the lock was dropped */
2463 spin_lock(&conn->mxk_lock);
2464 status = conn->mxk_status;
2465 spin_unlock(&conn->mxk_lock);
2467 if (likely((status == MXLND_CONN_READY) ||
2468 (msg_type == MXLND_MSG_CONN_REQ) ||
2469 (msg_type == MXLND_MSG_CONN_ACK))) {
2471 if (msg_type != MXLND_MSG_CONN_REQ &&
2472 msg_type != MXLND_MSG_CONN_ACK) {
2473 /* add to the pending list */
2474 ret = mxlnd_q_pending_ctx(tx);
2476 /* FIXME the conn is disconnected, now what? */
2480 tx->mxc_state = MXLND_CTX_PENDING;
2484 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2485 msg_type != MXLND_MSG_GET_DATA)) {
2486 /* send a msg style tx */
2487 LASSERT(tx->mxc_nseg == 1);
2488 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2489 CDEBUG(D_NET, "sending %s 0x%llx\n",
2490 mxlnd_msgtype_to_str(msg_type),
2492 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2501 /* send a DATA tx */
2502 spin_lock(&conn->mxk_lock);
2503 conn->mxk_ntx_data--;
2504 conn->mxk_data_posted++;
2505 spin_unlock(&conn->mxk_lock);
2506 CDEBUG(D_NET, "sending %s 0x%llx\n",
2507 mxlnd_msgtype_to_str(msg_type),
2509 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2519 mxret = MX_CONNECTION_FAILED;
2521 if (likely(mxret == MX_SUCCESS)) {
2524 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2525 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2526 libcfs_nid2str(peer->mxp_nid));
2527 /* NOTE mx_kisend() only fails if there are not enough
2528 * resources. Do not change the connection status. */
2529 if (mxret == MX_NO_RESOURCES) {
2530 tx->mxc_status.code = -ENOMEM;
2532 tx->mxc_status.code = -ECONNABORTED;
/* failed send: undo the counter changes made before posting */
2535 spin_lock(&conn->mxk_lock);
2536 conn->mxk_ntx_posted--;
2537 conn->mxk_credits++;
2538 spin_unlock(&conn->mxk_lock);
2539 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2540 msg_type == MXLND_MSG_GET_DATA) {
2541 spin_lock(&conn->mxk_lock);
2542 conn->mxk_data_posted--;
2543 spin_unlock(&conn->mxk_lock);
2545 if (msg_type != MXLND_MSG_PUT_DATA &&
2546 msg_type != MXLND_MSG_GET_DATA &&
2547 msg_type != MXLND_MSG_CONN_REQ &&
2548 msg_type != MXLND_MSG_CONN_ACK) {
/* the credits this message was carrying go back to outstanding */
2549 spin_lock(&conn->mxk_lock);
2550 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2551 spin_unlock(&conn->mxk_lock);
2553 if (msg_type != MXLND_MSG_CONN_REQ &&
2554 msg_type != MXLND_MSG_CONN_ACK) {
2555 /* remove from the pending list */
2556 mxlnd_deq_pending_ctx(tx);
2558 mxlnd_put_idle_tx(tx);
2559 mxlnd_conn_decref(conn);
/* the while loop condition is evaluated with mxk_lock held */
2562 spin_lock(&conn->mxk_lock);
2565 spin_unlock(&conn->mxk_lock);
2572 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2573 * @ctx - the tx descriptor
2575 * Determine which type of send request it was and start the next step, if needed,
2576 * or, if done, signal completion to LNET. After we are done, put back on the
/* Handle the completion of a tx.
 * `failed` records whether the MX status code is anything other than
 * MX_STATUS_SUCCESS. When the tx has no conn, the conn is recovered from the
 * MX endpoint address context of the status source. The conn's mxk_last_tx
 * is updated, per-type accounting is done (DATA completions adjust
 * mxk_data_posted and, for GET_DATA, mxk_outstanding; a CONN_ACK to an
 * incompatible peer disconnects the conn; a failed CONN_REQ/CONN_ACK marks a
 * compatible peer's conn MXLND_CONN_FAIL), mxk_ntx_posted is decremented,
 * the tx is recycled, its conn ref dropped, and mxlnd_check_sends() is run.
 * Counter updates are skipped when the tx's incarnation no longer matches
 * the conn's.
 * NOTE(review): the switch statement and several condition lines are not
 * visible in this excerpt. */
2580 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2582 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2583 struct kmx_msg *msg = tx->mxc_msg;
2584 struct kmx_peer *peer = tx->mxc_peer;
2585 struct kmx_conn *conn = tx->mxc_conn;
2586 u8 type = tx->mxc_msg_type;
2587 int credit = mxlnd_tx_requires_credit(tx);
2588 u64 cookie = tx->mxc_cookie;
2590 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2591 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
/* no conn recorded on the tx: look it up from the completion's source address */
2593 if (unlikely(conn == NULL)) {
2594 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &conn);
2596 /* do not add a ref for the tx, it was set before sending */
2597 tx->mxc_conn = conn;
2598 tx->mxc_peer = conn->mxk_peer;
2601 LASSERT (peer != NULL);
2602 LASSERT (conn != NULL);
/* DATA txs are excluded from the message-type consistency check */
2604 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2605 LASSERT (type == msg->mxm_type);
2609 tx->mxc_status.code = -EIO;
2611 spin_lock(&conn->mxk_lock);
2612 conn->mxk_last_tx = jiffies;
2613 spin_unlock(&conn->mxk_lock);
2618 case MXLND_MSG_GET_DATA:
2619 spin_lock(&conn->mxk_lock);
2620 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2621 conn->mxk_outstanding++;
2622 conn->mxk_data_posted--;
2624 spin_unlock(&conn->mxk_lock);
2627 case MXLND_MSG_PUT_DATA:
2628 spin_lock(&conn->mxk_lock);
2629 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2630 conn->mxk_data_posted--;
2632 spin_unlock(&conn->mxk_lock);
2635 case MXLND_MSG_NOOP:
2636 case MXLND_MSG_PUT_REQ:
2637 case MXLND_MSG_PUT_ACK:
2638 case MXLND_MSG_GET_REQ:
2639 case MXLND_MSG_EAGER:
2640 //case MXLND_MSG_NAK:
2643 case MXLND_MSG_CONN_ACK:
2644 if (peer->mxp_incompatible) {
2645 /* we sent our params, now close this conn */
2646 mxlnd_conn_disconnect(conn, 0, 1);
2648 case MXLND_MSG_CONN_REQ:
2650 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2651 "failed with %s (%d) to %s\n",
2652 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2653 mx_strstatus(tx->mxc_status.code),
2654 tx->mxc_status.code,
2655 libcfs_nid2str(tx->mxc_nid));
2656 if (!peer->mxp_incompatible) {
2657 spin_lock(&conn->mxk_lock);
2658 conn->mxk_status = MXLND_CONN_FAIL;
2659 spin_unlock(&conn->mxk_lock);
2665 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
/* release the posted slot, but only for the conn incarnation the tx was sent on */
2670 spin_lock(&conn->mxk_lock);
2671 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2672 conn->mxk_ntx_posted--;
2674 spin_unlock(&conn->mxk_lock);
2677 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2678 mxlnd_put_idle_tx(tx);
2679 mxlnd_conn_decref(conn);
/* a completed tx may have freed resources; try to send more */
2681 mxlnd_check_sends(peer);
/*
 * mxlnd_handle_rx_completion() - process a completed receive descriptor.
 * @rx: the rx descriptor whose mxc_status has already been filled in
 *
 * PUT_DATA and GET_DATA completions carry no kmx_msg and are finished from
 * the status alone.  Any other rx is unpacked with mxlnd_unpack_msg(),
 * checked against the expected NIDs and incarnation stamps, has the credits
 * it carries added to the conn, and is then handled according to its message
 * type (NOOP, EAGER, PUT_REQ, PUT_ACK, GET_REQ, CONN_REQ, CONN_ACK).
 * The tail of the function reposts the rx with mxlnd_put_idle_rx(), adjusts
 * mxk_outstanding, finalizes any saved LNet messages with lnet_finalize()
 * and may call mxlnd_check_sends() for the peer.
 */
2687 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
/* nob: bytes actually transferred; bits: match info of the completed request */
2692 u32 nob = rx->mxc_status.xfer_length;
2693 u64 bits = rx->mxc_status.match_info;
2694 struct kmx_msg *msg = rx->mxc_msg;
2695 struct kmx_peer *peer = rx->mxc_peer;
2696 struct kmx_conn *conn = rx->mxc_conn;
2697 u8 type = rx->mxc_msg_type;
/* LNet messages to hand to lnet_finalize() on the way out (up to two) */
2699 lnet_msg_t *lntmsg[2];
2704 int incompatible = 0;
2706 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2707 * failed GET reply, CONN_REQ, or a CONN_ACK */
2709 /* NOTE peer may still be NULL if it is a new peer */
2710 if (peer == NULL || conn == NULL) {
2711 /* if the peer was disconnected, the peer may exist but
2712 * not have any valid conns */
2713 decref = 0; /* no peer means no ref was taken for this rx */
/* the peer is known but the rx has no conn: adopt the peer's current conn */
2716 if (conn == NULL && peer != NULL) {
2717 conn = peer->mxp_conn;
2718 rx->mxc_conn = conn;
2722 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
/* the receive itself failed: log the MX status */
2728 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2729 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2730 libcfs_nid2str(rx->mxc_nid),
2731 mx_strstatus(rx->mxc_status.code),
2732 (int) rx->mxc_status.code);
2738 /* this may be a failed GET reply */
/* keep only bits 52-59 of the match info and shift them down into ret */
2739 if (type == MXLND_MSG_GET_DATA) {
2740 bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2741 ret = (u32) (bits>>52);
2742 lntmsg[0] = rx->mxc_lntmsg[0];
2746 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2747 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2748 libcfs_nid2str(rx->mxc_nid));
2753 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
/* data completions: take the result from the MX status and pick up the
 * LNet message(s) saved on the rx (one for PUT_DATA, two for GET_DATA) */
2754 if (type == MXLND_MSG_PUT_DATA) {
2755 result = rx->mxc_status.code;
2756 lntmsg[0] = rx->mxc_lntmsg[0];
2758 } else if (type == MXLND_MSG_GET_DATA) {
2759 result = rx->mxc_status.code;
2760 lntmsg[0] = rx->mxc_lntmsg[0];
2761 lntmsg[1] = rx->mxc_lntmsg[1];
/* everything else carries a kmx_msg: unpack it using the received length */
2765 ret = mxlnd_unpack_msg(msg, nob);
2767 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2768 ret, libcfs_nid2str(rx->mxc_nid));
/* from here on use the type found in the unpacked message */
2772 type = msg->mxm_type;
/* except for CONN_REQ, the source NID must match the NID recorded on the
 * rx and the destination NID must match this NI's NID */
2775 if (type != MXLND_MSG_CONN_REQ &&
2776 (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2777 !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2778 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2779 "0x%llx and rx msg dst is 0x%llx)\n",
2780 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
/* stale-message check for non-connection messages: the source stamp must
 * equal the conn's incarnation (when a conn is known) and the destination
 * stamp must equal our own incarnation.
 * NOTE(review): the first log below dereferences conn->mxk_incarnation;
 * confirm it is only reached when conn != NULL. */
2785 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2786 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2787 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2789 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2790 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2791 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2792 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2793 msg->mxm_srcstamp, conn->mxk_incarnation,
2794 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2796 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2797 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2798 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2799 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2806 CDEBUG(D_NET, "Received %s with %d credits\n",
2807 mxlnd_msgtype_to_str(type), msg->mxm_credits);
/* credits carried by a non-connection message are added to mxk_credits,
 * provided the message stamp matches the conn incarnation; the sum is
 * asserted to stay within [0, kmx_credits] */
2809 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2810 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2811 LASSERT(peer != NULL);
2812 LASSERT(conn != NULL);
2813 if (msg->mxm_credits != 0) {
2814 spin_lock(&conn->mxk_lock);
2815 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2816 if ((conn->mxk_credits + msg->mxm_credits) >
2817 *kmxlnd_tunables.kmx_credits) {
2818 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2819 conn->mxk_credits, msg->mxm_credits);
2821 conn->mxk_credits += msg->mxm_credits;
2822 LASSERT(conn->mxk_credits >= 0);
2823 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2825 spin_unlock(&conn->mxk_lock);
2829 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2831 case MXLND_MSG_NOOP:
/* EAGER: pass the embedded LNet header to lnet_parse() (last argument 0;
 * PUT_REQ and GET_REQ below pass 1) */
2834 case MXLND_MSG_EAGER:
2835 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2836 msg->mxm_srcnid, rx, 0);
2840 case MXLND_MSG_PUT_REQ:
2841 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2842 msg->mxm_srcnid, rx, 1);
/* PUT_ACK: a destination cookie above MXLND_MAX_COOKIE is logged as a NAK
 * and bits 52-59 of it, negated, become the result; the mxlnd_send_data()
 * call sends the PUT_DATA using that cookie.
 * NOTE(review): presumably the send is the non-NAK branch - confirm. */
2846 case MXLND_MSG_PUT_ACK: {
2847 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2848 if (cookie > MXLND_MAX_COOKIE) {
2849 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
2850 libcfs_nid2str(rx->mxc_nid));
2851 result = -((cookie >> 52) & 0xff);
2852 lntmsg[0] = rx->mxc_lntmsg[0];
2854 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
2855 rx->mxc_peer, MXLND_MSG_PUT_DATA,
2856 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
2861 case MXLND_MSG_GET_REQ:
2862 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
2863 msg->mxm_srcnid, rx, 1);
/* CONN_REQ: validate the destination NID, queue depth and eager size
 * advertised by the sender before setting up the peer/conn */
2867 case MXLND_MSG_CONN_REQ:
2868 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
2869 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
2870 libcfs_nid2str(msg->mxm_srcnid),
2871 libcfs_nid2str(msg->mxm_dstnid));
2874 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
2875 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
2877 libcfs_nid2str(msg->mxm_srcnid),
2878 msg->mxm_u.conn_req.mxcrm_queue_depth,
2879 *kmxlnd_tunables.kmx_credits);
2882 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
2883 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
2885 libcfs_nid2str(msg->mxm_srcnid),
2886 msg->mxm_u.conn_req.mxcrm_eager_size,
2887 (int) MXLND_EAGER_SIZE);
/* look the sender up by NID; the lines that follow allocate a peer, add it
 * to the kmx_peers hash bucket under the write lock and allocate a conn.
 * NOTE(review): presumably only when the lookup found nothing - confirm. */
2891 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid);
2894 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
2896 mx_decompose_endpoint_addr(rx->mxc_status.source,
2898 rx->mxc_nid = msg->mxm_srcnid;
2900 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
2904 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
2905 write_lock(&kmxlnd_data.kmx_peers_lock);
2906 list_add_tail(&peer->mxp_peers,
2907 &kmxlnd_data.kmx_peers[hash]);
2908 write_unlock(&kmxlnd_data.kmx_peers_lock);
2909 atomic_inc(&kmxlnd_data.kmx_npeers);
2911 ret = mxlnd_conn_alloc(&conn, peer);
2913 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
/* known peer: the current conn is disconnected and its rx reference
 * dropped before a replacement conn is allocated */
2917 conn = peer->mxp_conn;
2919 struct kmx_conn *old_conn = conn;
2921 /* do not call mx_disconnect() */
2922 mxlnd_conn_disconnect(old_conn, 0, 0);
2924 /* the ref for this rx was taken on the old_conn */
2925 mxlnd_conn_decref(old_conn);
2927 /* do not decref this conn below */
2930 /* This allocs a conn, points peer->mxp_conn to this one.
2931 * The old conn is still on the peer->mxp_conns list.
2932 * As the pending requests complete, they will call
2933 * conn_decref() which will eventually free it. */
2934 ret = mxlnd_conn_alloc(&conn, peer);
2936 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
/* record the sender's incarnation and compatibility on the peer, put the
 * conn into MXLND_CONN_WAIT and start the connect-back for the CONN_ACK */
2940 spin_lock(&peer->mxp_lock);
2941 peer->mxp_incarnation = msg->mxm_srcstamp;
2942 peer->mxp_incompatible = incompatible;
2943 spin_unlock(&peer->mxp_lock);
2944 spin_lock(&conn->mxk_lock);
2945 conn->mxk_incarnation = msg->mxm_srcstamp;
2946 conn->mxk_status = MXLND_CONN_WAIT;
2947 spin_unlock(&conn->mxk_lock);
2949 /* handle_conn_ack() will create the CONN_ACK msg */
2950 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
/* CONN_ACK: same three validations as CONN_REQ; a queue-depth or eager-size
 * mismatch marks the conn MXLND_CONN_FAIL.
 * NOTE(review): conn is dereferenced in these failure paths although the
 * top of the function allows conn == NULL - confirm it is always set here. */
2954 case MXLND_MSG_CONN_ACK:
2955 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
2956 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2957 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
2958 libcfs_nid2str(msg->mxm_dstnid));
2962 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
2963 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2964 "incompatible queue depth %d (%d wanted)\n",
2965 libcfs_nid2str(msg->mxm_srcnid),
2966 msg->mxm_u.conn_req.mxcrm_queue_depth,
2967 *kmxlnd_tunables.kmx_credits);
2968 spin_lock(&conn->mxk_lock);
2969 conn->mxk_status = MXLND_CONN_FAIL;
2970 spin_unlock(&conn->mxk_lock);
2974 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
2975 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2976 "incompatible EAGER size %d (%d wanted)\n",
2977 libcfs_nid2str(msg->mxm_srcnid),
2978 msg->mxm_u.conn_req.mxcrm_eager_size,
2979 (int) MXLND_EAGER_SIZE);
2980 spin_lock(&conn->mxk_lock);
2981 conn->mxk_status = MXLND_CONN_FAIL;
2982 spin_unlock(&conn->mxk_lock);
/* record the peer's incarnation, reset the conn's credit accounting and
 * timeout, and mark it READY unless the peer is incompatible (in which
 * case the conn is disconnected instead) */
2986 spin_lock(&peer->mxp_lock);
2987 peer->mxp_incarnation = msg->mxm_srcstamp;
2988 peer->mxp_incompatible = incompatible;
2989 spin_unlock(&peer->mxp_lock);
2990 spin_lock(&conn->mxk_lock);
2991 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
2992 conn->mxk_outstanding = 0;
2993 conn->mxk_incarnation = msg->mxm_srcstamp;
2994 conn->mxk_timeout = 0;
2995 if (!incompatible) {
2996 conn->mxk_status = MXLND_CONN_READY;
2998 spin_unlock(&conn->mxk_lock);
2999 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3003 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3004 libcfs_nid2str(rx->mxc_nid));
/* mark the conn failed (the guarding condition is not visible here) */
3011 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3012 spin_lock(&conn->mxk_lock);
3013 conn->mxk_status = MXLND_CONN_FAIL;
3014 spin_unlock(&conn->mxk_lock);
/* remember when the last rx was seen on this conn */
3019 spin_lock(&conn->mxk_lock);
3020 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3021 spin_unlock(&conn->mxk_lock);
3025 /* lnet_parse() failed, etc., repost now */
3026 mxlnd_put_idle_rx(rx);
/* bump mxk_outstanding for PUT_DATA and for EAGER/PUT_REQ/NOOP.
 * NOTE(review): the 'type != MXLND_MSG_GET_DATA' test appears redundant
 * given the equality tests that follow it - confirm. */
3027 if (conn != NULL && credit == 1) {
3028 if (type == MXLND_MSG_PUT_DATA) {
3029 spin_lock(&conn->mxk_lock);
3030 conn->mxk_outstanding++;
3031 spin_unlock(&conn->mxk_lock);
3032 } else if (type != MXLND_MSG_GET_DATA &&
3033 (type == MXLND_MSG_EAGER ||
3034 type == MXLND_MSG_PUT_REQ ||
3035 type == MXLND_MSG_NOOP)) {
3036 spin_lock(&conn->mxk_lock);
3037 conn->mxk_outstanding++;
3038 spin_unlock(&conn->mxk_lock);
/* drop the conn ref unless decref was cleared earlier */
3041 if (decref) mxlnd_conn_decref(conn);
3044 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3045 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3047 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
/* complete any LNet messages collected above with the final result */
3050 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3051 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3053 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
/*
 * mxlnd_handle_conn_req() - continue a connection attempt after mx_iconnect().
 * @peer:   the peer the connect was issued for
 * @status: the MX completion status of the connect request
 *
 * Called by mxlnd_request_waitd() when a completion's match info equals
 * MXLND_MASK_ICON_REQ.  On failure the peer's conn is marked
 * MXLND_CONN_FAIL and its reference dropped.  On success the returned
 * endpoint address is stored on the conn, the conn is attached to it as its
 * context, and a CONN_REQ message carrying our queue depth and eager size is
 * prepared on an idle tx.
 */
3061 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3063 struct kmx_ctx *tx = NULL;
3064 struct kmx_msg *txmsg = NULL;
3065 struct kmx_conn *conn = peer->mxp_conn;
3067 /* a conn ref was taken when calling mx_iconnect(),
3068 * hold it until CONN_REQ or CONN_ACK completes */
3070 CDEBUG(D_NET, "entering\n");
/* connect failed: log it and mark the conn as failed */
3071 if (status.code != MX_STATUS_SUCCESS) {
3072 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3073 mx_strstatus(status.code), status.code,
3074 libcfs_nid2str(peer->mxp_nid));
3075 spin_lock(&conn->mxk_lock);
3076 conn->mxk_status = MXLND_CONN_FAIL;
3077 spin_unlock(&conn->mxk_lock);
/* once MXLND_WAIT_TIMEOUT has elapsed past mxp_reconnect_time, disconnect
 * this conn, allocate a replacement and reset the reconnect time.
 * NOTE(review): the return value of mxlnd_conn_alloc() is ignored here,
 * whereas mxlnd_handle_rx_completion() checks it - confirm this is intended. */
3079 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3080 struct kmx_conn *new_conn = NULL;
3081 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3082 mxlnd_conn_disconnect(conn, 0, 1);
3083 mxlnd_conn_alloc(&new_conn, peer);
3084 spin_lock(&peer->mxp_lock);
3085 peer->mxp_reconnect_time = 0;
3086 spin_unlock(&peer->mxp_lock);
/* drop a conn reference on the failure path */
3089 mxlnd_conn_decref(conn);
/* success: remember the peer's endpoint address and attach the conn to it
 * as its context */
3093 spin_lock(&conn->mxk_lock);
3094 conn->mxk_epa = status.source;
3095 spin_unlock(&conn->mxk_lock);
3096 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3098 /* mx_iconnect() succeeded, reset delay to 0 */
3099 spin_lock(&peer->mxp_lock);
3100 peer->mxp_reconnect_time = 0;
3101 spin_unlock(&peer->mxp_lock);
3103 /* marshal CONN_REQ msg */
3104 /* we are still using the conn ref from iconnect() - do not take another */
3105 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect reference */
3107 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3108 libcfs_nid2str(peer->mxp_nid));
3109 spin_lock(&conn->mxk_lock);
3110 conn->mxk_status = MXLND_CONN_FAIL;
3111 spin_unlock(&conn->mxk_lock);
3112 mxlnd_conn_decref(conn);
/* fill in the CONN_REQ: advertise our credit count (queue depth) and
 * eager message size so the receiver can check compatibility */
3116 tx->mxc_peer = peer;
3117 tx->mxc_conn = conn;
3118 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3119 txmsg = tx->mxc_msg;
3120 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3121 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3122 tx->mxc_match = mxlnd_create_match(tx, 0);
3124 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
/*
 * mxlnd_handle_conn_ack() - continue the connect-back after mx_iconnect().
 * @peer:   the peer the connect was issued for
 * @status: the MX completion status of the connect request
 *
 * Called by mxlnd_request_waitd() when a completion's match info equals
 * MXLND_MASK_ICON_ACK.  Mirrors mxlnd_handle_conn_req(): on failure the conn
 * is marked MXLND_CONN_FAIL and its reference dropped; on success the
 * endpoint address is stored, the conn becomes MXLND_CONN_READY unless the
 * peer is flagged incompatible, and a CONN_ACK message is prepared.
 */
3130 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3132 struct kmx_ctx *tx = NULL;
3133 struct kmx_msg *txmsg = NULL;
3134 struct kmx_conn *conn = peer->mxp_conn;
3136 /* a conn ref was taken when calling mx_iconnect(),
3137 * hold it until CONN_REQ or CONN_ACK completes */
3139 CDEBUG(D_NET, "entering\n");
/* connect failed.
 * NOTE(review): the 'conn' declared on the next line shadows the outer
 * 'conn', which has the same initializer - it looks redundant. */
3140 if (status.code != MX_STATUS_SUCCESS) {
3141 struct kmx_conn *conn = peer->mxp_conn;
3142 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3143 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3144 mx_strstatus(status.code), status.code,
3145 libcfs_nid2str(peer->mxp_nid),
3148 peer->mxp_host->mxh_ep_id);
3149 spin_lock(&conn->mxk_lock);
3150 conn->mxk_status = MXLND_CONN_FAIL;
3151 spin_unlock(&conn->mxk_lock);
/* once MXLND_WAIT_TIMEOUT has elapsed past mxp_reconnect_time, disconnect
 * this conn, allocate a replacement and reset the reconnect time.
 * NOTE(review): the return value of mxlnd_conn_alloc() is ignored here -
 * confirm this is intended. */
3153 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3154 struct kmx_conn *new_conn = NULL;
3155 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3156 mxlnd_conn_disconnect(conn, 0, 1);
3157 mxlnd_conn_alloc(&new_conn, peer);
3158 spin_lock(&peer->mxp_lock);
3159 peer->mxp_reconnect_time = 0;
3160 spin_unlock(&peer->mxp_lock);
/* drop a conn reference on the failure path */
3163 mxlnd_conn_decref(conn);
/* success: store the endpoint address and, for a compatible peer, mark the
 * conn READY; then attach the conn to the address as its context */
3166 spin_lock(&conn->mxk_lock);
3167 conn->mxk_epa = status.source;
3168 if (likely(!peer->mxp_incompatible)) {
3169 conn->mxk_status = MXLND_CONN_READY;
3171 spin_unlock(&conn->mxk_lock);
3172 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3174 /* mx_iconnect() succeeded, reset delay to 0 */
3175 spin_lock(&peer->mxp_lock);
3176 peer->mxp_reconnect_time = 0;
3177 spin_unlock(&peer->mxp_lock);
3179 /* marshal CONN_ACK msg */
3180 tx = mxlnd_get_idle_tx();
/* no idle tx available: fail the conn and drop the iconnect reference */
3182 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3183 libcfs_nid2str(peer->mxp_nid));
3184 spin_lock(&conn->mxk_lock);
3185 conn->mxk_status = MXLND_CONN_FAIL;
3186 spin_unlock(&conn->mxk_lock);
3187 mxlnd_conn_decref(conn);
/* fill in the CONN_ACK: it reuses the conn_req payload layout to advertise
 * our credit count (queue depth) and eager message size */
3191 tx->mxc_peer = peer;
3192 tx->mxc_conn = conn;
3193 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3194 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3195 txmsg = tx->mxc_msg;
3196 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3197 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3198 tx->mxc_match = mxlnd_create_match(tx, 0);
3205 * mxlnd_request_waitd - the MX request completion thread(s)
3206 * @arg - thread id (as a void *)
3208 * This thread waits for a MX completion and then completes the request.
3209 * We will create one thread per CPU.
/*
 * Thread body: loops until kmxlnd_data.kmx_shutdown is set, asking MX for
 * the next completed request on kmxlnd_data.kmx_endpt and dispatching it.
 * Completions whose match info is MXLND_MASK_ICON_REQ / MXLND_MASK_ICON_ACK
 * go to mxlnd_handle_conn_req() / mxlnd_handle_conn_ack(); everything else
 * is treated as a tx or rx descriptor taken from status.context and passed
 * to mxlnd_handle_tx_completion() or mxlnd_handle_rx_completion().
 */
3212 mxlnd_request_waitd(void *arg)
/* the thread id is passed in the pointer argument itself */
3214 long id = (long) arg;
3217 mx_return_t mxret = MX_SUCCESS;
3219 struct kmx_ctx *ctx = NULL;
3220 enum kmx_req_state req_type = MXLND_REQ_TX;
3221 struct kmx_peer *peer = NULL;
3222 struct kmx_conn *conn = NULL;
/* build the thread name "mxlnd_request_waitd_NN" and daemonize under it */
3227 memset(name, 0, sizeof(name));
3228 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3229 cfs_daemonize(name);
3230 //cfs_block_allsigs();
3232 memset(&status, 0, sizeof(status));
3234 CDEBUG(D_NET, "%s starting\n", name);
3236 while (!kmxlnd_data.kmx_shutdown) {
/* thread 0 calls mx_test_any() while 'count' is below the kmx_polling
 * tunable; mx_wait_any() with MXLND_WAIT_TIMEOUT is used otherwise.
 * NOTE(review): the second, identical mx_wait_any() call presumably belongs
 * to an alternative preprocessor branch - confirm. */
3240 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3241 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3245 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3246 0LL, 0LL, &status, &result);
3249 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3250 0LL, 0LL, &status, &result);
/* re-check the shutdown flag after the wait returns */
3252 if (unlikely(kmxlnd_data.kmx_shutdown))
3256 /* nothing completed... */
/* a request completed with an error status: log it */
3260 if (status.code != MX_STATUS_SUCCESS) {
3261 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3262 "match_info 0x%llx and length %d\n",
3263 mx_strstatus(status.code), status.code,
3264 (u64) status.match_info, status.msg_length);
3267 /* This may be a mx_iconnect() request completing,
3268 * check the bit mask for CONN_REQ and CONN_ACK */
/* for connect completions the context pointer is the peer */
3269 if (status.match_info == MXLND_MASK_ICON_REQ ||
3270 status.match_info == MXLND_MASK_ICON_ACK) {
3271 peer = (struct kmx_peer*) status.context;
3272 if (status.match_info == MXLND_MASK_ICON_REQ) {
3273 mxlnd_handle_conn_req(peer, status);
3275 mxlnd_handle_conn_ack(peer, status);
3280 /* This must be a tx or rx */
3282 /* NOTE: if this is a RX from the unexpected callback, it may
3283 * have very little info. If we dropped it in unexpected_recv(),
3284 * it will not have a context. If so, ignore it. */
/* otherwise the context pointer is the tx/rx descriptor */
3285 ctx = (struct kmx_ctx *) status.context;
3288 req_type = ctx->mxc_type;
3289 conn = ctx->mxc_conn; /* this may be NULL */
/* the request is no longer pending */
3290 mxlnd_deq_pending_ctx(ctx);
3292 /* copy status to ctx->mxc_status */
3293 memcpy(&ctx->mxc_status, &status, sizeof(status));
/* dispatch on the descriptor type saved in req_type */
3297 mxlnd_handle_tx_completion(ctx);
3300 mxlnd_handle_rx_completion(ctx);
3303 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3308 /* conn is always set except for the first CONN_REQ rx
3309 * from a new peer */
/* any status other than SUCCESS or TRUNCATED tears the conn down */
3310 if (!(status.code == MX_STATUS_SUCCESS ||
3311 status.code == MX_STATUS_TRUNCATED) &&
3313 mxlnd_conn_disconnect(conn, 1, 1);
3316 CDEBUG(D_NET, "waitd() completed task\n");
/* shutdown was requested: announce it and unregister this thread */
3318 CDEBUG(D_NET, "%s stopping\n", name);
3319 mxlnd_thread_stop(id);
/*
 * mxlnd_check_timeouts() - scan every peer's current conn for an expired
 * timeout.
 * @now: the time to compare conn->mxk_timeout against
 *
 * Walks all MXLND_HASH_SIZE peer buckets under the kmx_peers read lock.
 * Conns with mxk_timeout == 0 or in MXLND_CONN_DISCONNECT are skipped.  The
 * smallest mxk_timeout seen is tracked in 'next', which defaults to
 * now + MXLND_COMM_TIMEOUT when none was found; mxlnd_timeoutd() stores the
 * function's result as its next deadline.
 */
3325 mxlnd_check_timeouts(unsigned long now)
3329 unsigned long next = 0;
3330 struct kmx_peer *peer = NULL;
3331 struct kmx_conn *conn = NULL;
3333 read_lock(&kmxlnd_data.kmx_peers_lock);
3334 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3335 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3337 if (unlikely(kmxlnd_data.kmx_shutdown))
3340 conn = peer->mxp_conn;
/* hold a reference on the conn while it is examined under its lock */
3344 mxlnd_conn_addref(conn);
3345 spin_lock(&conn->mxk_lock);
3347 /* if nothing pending (timeout == 0) or
3348 * if conn is already disconnected,
3350 if (conn->mxk_timeout == 0 ||
3351 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3352 spin_unlock(&conn->mxk_lock);
3353 mxlnd_conn_decref(conn);
3357 /* we want to find the timeout that will occur first.
3358 * if it is in the future, we will sleep until then.
3359 * if it is in the past, then we will sleep one
3360 * second and repeat the process. */
/* NOTE(review): mxk_timeout is compared with time_after_eq() below, so it
 * looks like a jiffies value; this plain '<' is not wrap-safe in the way
 * time_before() would be - confirm. */
3361 if ((next == 0) || (conn->mxk_timeout < next)) {
3362 next = conn->mxk_timeout;
/* the conn's timeout has been reached: it is disconnected below */
3367 if (time_after_eq(now, conn->mxk_timeout)) {
3370 spin_unlock(&conn->mxk_lock);
3373 mxlnd_conn_disconnect(conn, 1, 1);
/* release the reference taken above */
3375 mxlnd_conn_decref(conn);
3378 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* no conn had a timeout pending: fall back to a default deadline */
3379 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3385 * mxlnd_timeoutd - enforces timeouts on messages
3386 * @arg - thread id (as a void *)
3388 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3389 * it calls mxlnd_conn_disconnect().
3391 * After checking for timeouts, try progressing sends (call check_sends()).
/*
 * Thread body: loops until kmxlnd_data.kmx_shutdown is set.  Each pass calls
 * mxlnd_check_timeouts() once the previously returned deadline has passed,
 * then walks every peer under the kmx_peers read lock and calls
 * mxlnd_check_sends() for conns that are not disconnected and whose last tx
 * is more than HZ jiffies old.
 */
3394 mxlnd_timeoutd(void *arg)
/* the thread id is passed in the pointer argument itself */
3397 long id = (long) arg;
3398 unsigned long now = 0;
3399 unsigned long next = 0;
/* NOTE(review): presumably the sleep interval between passes (HZ jiffies) -
 * the line that uses it is not visible here; confirm */
3400 unsigned long delay = HZ;
3401 struct kmx_peer *peer = NULL;
3402 struct kmx_conn *conn = NULL;
3404 cfs_daemonize("mxlnd_timeoutd");
3405 //cfs_block_allsigs();
3407 CDEBUG(D_NET, "timeoutd starting\n");
3409 while (!kmxlnd_data.kmx_shutdown) {
3412 /* if the next timeout has not arrived, go back to sleep */
/* deadline passed: rescan the conns and fetch the next deadline */
3413 if (time_after(now, next)) {
3414 next = mxlnd_check_timeouts(now);
/* call mxlnd_check_sends() for every peer whose conn is not disconnected
 * and has not recorded a tx for more than HZ jiffies */
3417 read_lock(&kmxlnd_data.kmx_peers_lock);
3418 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3419 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3420 conn = peer->mxp_conn;
3424 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3425 time_after(now, conn->mxk_last_tx + HZ)) {
3426 mxlnd_check_sends(peer);
3430 read_unlock(&kmxlnd_data.kmx_peers_lock);
/* shutdown was requested: announce it and unregister this thread */
3434 CDEBUG(D_NET, "timeoutd stopping\n");
3435 mxlnd_thread_stop(id);