/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 * Copyright (C) 2006 Myricom, Inc.
 *   Author: Myricom, Inc. <help at myri.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/* mxlnd_noop - sink for debug macros compiled out in non-debug builds */
inline void mxlnd_noop(char *s, ...)
{
        return;
}
33 mxlnd_ctxstate_to_str(int mxc_state)
37 return "MXLND_CTX_INIT";
39 return "MXLND_CTX_IDLE";
41 return "MXLND_CTX_PREP";
42 case MXLND_CTX_PENDING:
43 return "MXLND_CTX_PENDING";
44 case MXLND_CTX_COMPLETED:
45 return "MXLND_CTX_COMPLETED";
46 case MXLND_CTX_CANCELED:
47 return "MXLND_CTX_CANCELED";
54 mxlnd_connstatus_to_str(int mxk_status)
57 case MXLND_CONN_READY:
58 return "MXLND_CONN_READY";
60 return "MXLND_CONN_INIT";
62 return "MXLND_CONN_REQ";
64 return "MXLND_CONN_ACK";
66 return "MXLND_CONN_WAIT";
67 case MXLND_CONN_DISCONNECT:
68 return "MXLND_CONN_DISCONNECT";
70 return "MXLND_CONN_FAIL";
77 mxlnd_msgtype_to_str(int type) {
80 return "MXLND_MSG_EAGER";
81 case MXLND_MSG_CONN_REQ:
82 return "MXLND_MSG_CONN_REQ";
83 case MXLND_MSG_CONN_ACK:
84 return "MXLND_MSG_CONN_ACK";
86 return "MXLND_MSG_NOOP";
87 case MXLND_MSG_PUT_REQ:
88 return "MXLND_MSG_PUT_REQ";
89 case MXLND_MSG_PUT_ACK:
90 return "MXLND_MSG_PUT_ACK";
91 case MXLND_MSG_PUT_DATA:
92 return "MXLND_MSG_PUT_DATA";
93 case MXLND_MSG_GET_REQ:
94 return "MXLND_MSG_GET_REQ";
95 case MXLND_MSG_GET_DATA:
96 return "MXLND_MSG_GET_DATA";
103 mxlnd_lnetmsg_to_str(int type)
107 return "LNET_MSG_ACK";
109 return "LNET_MSG_PUT";
111 return "LNET_MSG_GET";
113 return "LNET_MSG_REPLY";
115 return "LNET_MSG_HELLO";
123 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
124 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
126 u64 type = (u64) ctx->mxc_msg_type;
127 u64 err = (u64) error;
130 LASSERT(ctx->mxc_msg_type != 0);
131 LASSERT(ctx->mxc_cookie >> 52 == 0);
132 match = (type << 60) | (err << 52) | ctx->mxc_cookie;
137 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
139 *msg_type = (u8) (match >> 60);
140 *error = (u8) ((match >> 52) & 0xFF);
141 *cookie = match & 0xFFFFFFFFFFFFFLL;
142 LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
143 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
144 *msg_type == MXLND_MSG_EAGER ||
145 *msg_type == MXLND_MSG_CONN_REQ ||
146 *msg_type == MXLND_MSG_CONN_ACK ||
147 *msg_type == MXLND_MSG_NOOP ||
148 *msg_type == MXLND_MSG_PUT_REQ ||
149 *msg_type == MXLND_MSG_PUT_ACK ||
150 *msg_type == MXLND_MSG_PUT_DATA ||
151 *msg_type == MXLND_MSG_GET_REQ ||
152 *msg_type == MXLND_MSG_GET_DATA);
157 mxlnd_get_idle_rx(void)
159 struct list_head *tmp = NULL;
160 struct kmx_ctx *rx = NULL;
162 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
164 if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
165 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
169 tmp = &kmxlnd_data.kmx_rx_idle;
170 rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
171 list_del_init(&rx->mxc_list);
172 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
175 if (rx->mxc_get != rx->mxc_put) {
176 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
177 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
178 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
179 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
180 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
181 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
182 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
183 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
184 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
185 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
188 LASSERT (rx->mxc_get == rx->mxc_put);
192 LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
193 rx->mxc_state = MXLND_CTX_PREP;
199 mxlnd_put_idle_rx(struct kmx_ctx *rx)
202 CDEBUG(D_NETERROR, "called with NULL pointer\n");
204 } else if (rx->mxc_type != MXLND_REQ_RX) {
205 CDEBUG(D_NETERROR, "called with tx\n");
208 LASSERT(rx->mxc_get == rx->mxc_put + 1);
211 spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
212 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
213 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
218 mxlnd_reduce_idle_rxs(__u32 count)
221 struct kmx_ctx *rx = NULL;
223 spin_lock(&kmxlnd_data.kmx_rxs_lock);
224 for (i = 0; i < count; i++) {
225 rx = mxlnd_get_idle_rx();
227 struct list_head *tmp = &rx->mxc_global_list;
231 CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
235 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
240 mxlnd_get_idle_tx(void)
242 struct list_head *tmp = NULL;
243 struct kmx_ctx *tx = NULL;
245 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
247 if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
248 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
249 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
253 tmp = &kmxlnd_data.kmx_tx_idle;
254 tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
255 list_del_init(&tx->mxc_list);
257 /* Allocate a new completion cookie. It might not be needed,
258 * but we've got a lock right now and we're unlikely to
260 tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
261 if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
262 kmxlnd_data.kmx_tx_next_cookie = 1;
264 kmxlnd_data.kmx_tx_used++;
265 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
267 LASSERT (tx->mxc_get == tx->mxc_put);
271 LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
272 LASSERT (tx->mxc_lntmsg[0] == NULL);
273 LASSERT (tx->mxc_lntmsg[1] == NULL);
275 tx->mxc_state = MXLND_CTX_PREP;
281 mxlnd_put_idle_tx(struct kmx_ctx *tx)
283 //int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
285 lnet_msg_t *lntmsg[2];
288 CDEBUG(D_NETERROR, "called with NULL pointer\n");
290 } else if (tx->mxc_type != MXLND_REQ_TX) {
291 CDEBUG(D_NETERROR, "called with rx\n");
294 if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
295 tx->mxc_status.code == MX_STATUS_TRUNCATED))
298 lntmsg[0] = tx->mxc_lntmsg[0];
299 lntmsg[1] = tx->mxc_lntmsg[1];
301 LASSERT(tx->mxc_get == tx->mxc_put + 1);
304 spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
305 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
306 kmxlnd_data.kmx_tx_used--;
307 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
308 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
309 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
314 * mxlnd_conn_free - free the conn
315 * @conn - a kmx_conn pointer
317 * The calling function should remove the conn from the conns list first
321 mxlnd_conn_free(struct kmx_conn *conn)
323 struct kmx_peer *peer = conn->mxk_peer;
325 CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
326 LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
327 list_empty (&conn->mxk_tx_free_queue) &&
328 list_empty (&conn->mxk_pending));
329 if (!list_empty(&conn->mxk_list)) {
330 spin_lock(&peer->mxp_lock);
331 list_del_init(&conn->mxk_list);
332 if (peer->mxp_conn == conn) {
333 peer->mxp_conn = NULL;
334 if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
335 mx_set_endpoint_addr_context(conn->mxk_epa,
339 spin_unlock(&peer->mxp_lock);
341 mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
342 MXLND_FREE (conn, sizeof (*conn));
348 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
351 struct kmx_ctx *ctx = NULL;
352 struct kmx_ctx *next = NULL;
353 mx_return_t mxret = MX_SUCCESS;
358 spin_lock(&conn->mxk_lock);
359 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
360 /* we will delete all including txs */
361 list_del_init(&ctx->mxc_list);
362 if (ctx->mxc_type == MXLND_REQ_RX) {
364 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
367 if (mxret != MX_SUCCESS) {
368 CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
371 ctx->mxc_status.code = -ECONNABORTED;
372 ctx->mxc_state = MXLND_CTX_CANCELED;
373 /* NOTE this calls lnet_finalize() and
374 * we cannot hold any locks when calling it.
375 * It also calls mxlnd_conn_decref(conn) */
376 spin_unlock(&conn->mxk_lock);
377 mxlnd_handle_rx_completion(ctx);
378 spin_lock(&conn->mxk_lock);
383 spin_unlock(&conn->mxk_lock);
391 * mxlnd_conn_disconnect - shutdown a connection
392 * @conn - a kmx_conn pointer
394 * This function sets the status to DISCONNECT, completes queued
395 * txs with failure, calls mx_disconnect, which will complete
396 * pending txs and matched rxs with failure.
399 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
401 struct list_head *tmp = NULL;
403 spin_lock(&conn->mxk_lock);
404 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
405 spin_unlock(&conn->mxk_lock);
408 conn->mxk_status = MXLND_CONN_DISCONNECT;
409 conn->mxk_timeout = 0;
411 while (!list_empty(&conn->mxk_tx_free_queue) ||
412 !list_empty(&conn->mxk_tx_credit_queue)) {
414 struct kmx_ctx *tx = NULL;
416 if (!list_empty(&conn->mxk_tx_free_queue)) {
417 tmp = &conn->mxk_tx_free_queue;
419 tmp = &conn->mxk_tx_credit_queue;
422 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
423 list_del_init(&tx->mxc_list);
424 tx->mxc_status.code = -ECONNABORTED;
425 spin_unlock(&conn->mxk_lock);
426 mxlnd_put_idle_tx(tx);
427 mxlnd_conn_decref(conn); /* for this tx */
428 spin_lock(&conn->mxk_lock);
431 spin_unlock(&conn->mxk_lock);
433 /* cancel pending rxs */
434 mxlnd_conn_cancel_pending_rxs(conn);
436 if (kmxlnd_data.kmx_shutdown != 1) {
438 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
441 time_t last_alive = 0;
442 unsigned long last_msg = 0;
444 /* notify LNET that we are giving up on this peer */
445 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
446 last_msg = conn->mxk_last_rx;
448 last_msg = conn->mxk_last_tx;
450 last_alive = cfs_time_current_sec() -
451 cfs_duration_sec(cfs_time_current() - last_msg);
452 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
455 mxlnd_conn_decref(conn); /* drop the owning peer's reference */
461 * mxlnd_conn_alloc - allocate and initialize a new conn struct
462 * @connp - address of a kmx_conn pointer
463 * @peer - owning kmx_peer
465 * Returns 0 on success and -ENOMEM on failure
468 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
470 struct kmx_conn *conn = NULL;
472 LASSERT(peer != NULL);
474 MXLND_ALLOC(conn, sizeof (*conn));
476 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
479 CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
481 memset(conn, 0, sizeof(*conn));
483 /* conn->mxk_incarnation = 0 - will be set by peer */
484 atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
485 and one for the caller */
486 conn->mxk_peer = peer;
487 /* mxk_epa - to be set after mx_iconnect() */
488 INIT_LIST_HEAD(&conn->mxk_list);
489 spin_lock_init(&conn->mxk_lock);
490 /* conn->mxk_timeout = 0 */
491 conn->mxk_last_tx = jiffies;
492 conn->mxk_last_rx = conn->mxk_last_tx;
493 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
494 /* mxk_outstanding = 0 */
495 conn->mxk_status = MXLND_CONN_INIT;
496 INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
497 INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
498 /* conn->mxk_ntx_msgs = 0 */
499 /* conn->mxk_ntx_data = 0 */
500 /* conn->mxk_ntx_posted = 0 */
501 /* conn->mxk_data_posted = 0 */
502 INIT_LIST_HEAD(&conn->mxk_pending);
506 mxlnd_peer_addref(peer); /* add a ref for this conn */
508 /* add to front of peer's conns list */
509 list_add(&conn->mxk_list, &peer->mxp_conns);
510 peer->mxp_conn = conn;
515 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
518 spin_lock(&peer->mxp_lock);
519 ret = mxlnd_conn_alloc_locked(connp, peer);
520 spin_unlock(&peer->mxp_lock);
525 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
528 struct kmx_conn *conn = ctx->mxc_conn;
530 ctx->mxc_state = MXLND_CTX_PENDING;
532 spin_lock(&conn->mxk_lock);
533 if (conn->mxk_status >= MXLND_CONN_INIT) {
534 list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
535 if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
536 conn->mxk_timeout = ctx->mxc_deadline;
539 ctx->mxc_state = MXLND_CTX_COMPLETED;
542 spin_unlock(&conn->mxk_lock);
548 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
550 LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
551 ctx->mxc_state == MXLND_CTX_COMPLETED);
552 if (ctx->mxc_state != MXLND_CTX_PENDING &&
553 ctx->mxc_state != MXLND_CTX_COMPLETED) {
554 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
555 mxlnd_ctxstate_to_str(ctx->mxc_state));
557 ctx->mxc_state = MXLND_CTX_COMPLETED;
558 if (!list_empty(&ctx->mxc_list)) {
559 struct kmx_conn *conn = ctx->mxc_conn;
560 struct kmx_ctx *next = NULL;
561 LASSERT(conn != NULL);
562 spin_lock(&conn->mxk_lock);
563 list_del_init(&ctx->mxc_list);
564 conn->mxk_timeout = 0;
565 if (!list_empty(&conn->mxk_pending)) {
566 next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
567 conn->mxk_timeout = next->mxc_deadline;
569 spin_unlock(&conn->mxk_lock);
575 * mxlnd_peer_free - free the peer
576 * @peer - a kmx_peer pointer
578 * The calling function should decrement the rxs, drain the tx queues and
579 * remove the peer from the peers list first then destroy it.
582 mxlnd_peer_free(struct kmx_peer *peer)
584 CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
586 LASSERT (atomic_read(&peer->mxp_refcount) == 0);
588 if (peer->mxp_host != NULL) {
589 spin_lock(&peer->mxp_host->mxh_lock);
590 peer->mxp_host->mxh_peer = NULL;
591 spin_unlock(&peer->mxp_host->mxh_lock);
593 if (!list_empty(&peer->mxp_peers)) {
594 /* assume we are locked */
595 list_del_init(&peer->mxp_peers);
598 MXLND_FREE (peer, sizeof (*peer));
599 atomic_dec(&kmxlnd_data.kmx_npeers);
604 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
607 char name[MX_MAX_HOSTNAME_LEN + 1];
608 mx_return_t mxret = MX_SUCCESS;
610 memset(name, 0, sizeof(name));
611 snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
612 mxret = mx_hostname_to_nic_id(name, &nic_id);
613 if (mxret == MX_SUCCESS) {
614 peer->mxp_nic_id = nic_id;
616 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
617 "with %s\n", name, mx_strerror(mxret));
618 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
619 if (mxret == MX_SUCCESS) {
620 peer->mxp_nic_id = nic_id;
622 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
623 "with %s\n", peer->mxp_host->mxh_hostname,
631 * mxlnd_peer_alloc - allocate and initialize a new peer struct
632 * @peerp - address of a kmx_peer pointer
633 * @nid - LNET node id
635 * Returns 0 on success and -ENOMEM on failure
638 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
642 u32 addr = LNET_NIDADDR(nid);
643 struct kmx_peer *peer = NULL;
644 struct kmx_host *host = NULL;
646 LASSERT (nid != LNET_NID_ANY && nid != 0LL);
648 MXLND_ALLOC(peer, sizeof (*peer));
650 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
653 CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
655 memset(peer, 0, sizeof(*peer));
657 list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
658 if (addr == host->mxh_addr) {
659 peer->mxp_host = host;
660 spin_lock(&host->mxh_lock);
661 host->mxh_peer = peer;
662 spin_unlock(&host->mxh_lock);
666 if (peer->mxp_host == NULL) {
667 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
668 MXLND_FREE(peer, sizeof(*peer));
673 /* peer->mxp_incarnation */
674 atomic_set(&peer->mxp_refcount, 1); /* ref for kmx_peers list */
675 mxlnd_peer_hostname_to_nic_id(peer);
677 INIT_LIST_HEAD(&peer->mxp_peers);
678 spin_lock_init(&peer->mxp_lock);
679 INIT_LIST_HEAD(&peer->mxp_conns);
680 ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
682 mxlnd_peer_decref(peer);
686 for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
687 struct kmx_ctx *rx = NULL;
688 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
690 mxlnd_reduce_idle_rxs(i);
691 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
692 mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
693 mxlnd_peer_decref(peer);
696 spin_lock(&kmxlnd_data.kmx_rxs_lock);
697 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
698 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
700 mxlnd_put_idle_rx(rx);
702 /* peer->mxp_reconnect_time = 0 */
703 /* peer->mxp_incompatible = 0 */
710 * mxlnd_nid_to_hash - hash the nid
713 * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
716 mxlnd_nid_to_hash(lnet_nid_t nid)
718 return (nid & MXLND_HASH_MASK) ^
719 ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
722 static inline struct kmx_peer *
723 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
727 struct kmx_peer *peer = NULL;
729 hash = mxlnd_nid_to_hash(nid);
731 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
732 if (peer->mxp_nid == nid) {
734 mxlnd_peer_addref(peer);
738 return (found ? peer : NULL);
741 static inline struct kmx_peer *
742 mxlnd_find_peer_by_nid(lnet_nid_t nid)
744 struct kmx_peer *peer = NULL;
746 read_lock(&kmxlnd_data.kmx_peers_lock);
747 peer = mxlnd_find_peer_by_nid_locked(nid);
748 read_unlock(&kmxlnd_data.kmx_peers_lock);
753 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
755 return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
756 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
757 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
758 tx->mxc_msg_type == MXLND_MSG_NOOP);
762 * mxlnd_init_msg - set type and number of bytes
765 * @body_nob - bytes in msg body
768 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
770 msg->mxm_type = type;
771 msg->mxm_nob = offsetof(kmx_msg_t, mxm_u) + body_nob;
775 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
777 int nob = offsetof (kmx_msg_t, mxm_u) + body_nob;
778 struct kmx_msg *msg = NULL;
780 LASSERT (tx != NULL);
781 LASSERT (nob <= MXLND_EAGER_SIZE);
784 /* tx->mxc_peer should have already been set if we know it */
785 tx->mxc_msg_type = type;
787 /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
788 tx->mxc_seg.segment_length = nob;
789 tx->mxc_pin_type = MX_PIN_PHYSICAL;
790 //tx->mxc_state = MXLND_CTX_PENDING;
793 msg->mxm_type = type;
800 mxlnd_cksum (void *ptr, int nob)
806 sum = ((sum << 1) | (sum >> 31)) + *c++;
808 /* ensure I don't return 0 (== no checksum) */
809 return (sum == 0) ? 1 : sum;
813 * mxlnd_pack_msg - complete msg info
817 mxlnd_pack_msg(struct kmx_ctx *tx)
819 struct kmx_msg *msg = tx->mxc_msg;
821 /* type and nob should already be set in init_msg() */
822 msg->mxm_magic = MXLND_MSG_MAGIC;
823 msg->mxm_version = MXLND_MSG_VERSION;
825 /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
826 * return credits as well */
827 if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
828 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
829 spin_lock(&tx->mxc_conn->mxk_lock);
830 msg->mxm_credits = tx->mxc_conn->mxk_outstanding;
831 tx->mxc_conn->mxk_outstanding = 0;
832 spin_unlock(&tx->mxc_conn->mxk_lock);
834 msg->mxm_credits = 0;
838 msg->mxm_srcnid = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
839 msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
840 msg->mxm_dstnid = tx->mxc_nid;
841 /* if it is a new peer, the dststamp will be 0 */
842 msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
843 msg->mxm_seq = tx->mxc_cookie;
845 if (*kmxlnd_tunables.kmx_cksum) {
846 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
851 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
853 const int hdr_size = offsetof(kmx_msg_t, mxm_u);
858 /* 6 bytes are enough to have received magic + version */
860 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
864 if (msg->mxm_magic == MXLND_MSG_MAGIC) {
866 } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
869 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
873 if (msg->mxm_version !=
874 (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
875 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
879 if (nob < hdr_size) {
880 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
884 msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
886 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
890 /* checksum must be computed with mxm_cksum zero and BEFORE anything
892 msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
894 if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
895 CDEBUG(D_NETERROR, "Bad checksum\n");
898 msg->mxm_cksum = msg_cksum;
901 /* leave magic unflipped as a clue to peer endianness */
902 __swab16s(&msg->mxm_version);
903 CLASSERT (sizeof(msg->mxm_type) == 1);
904 CLASSERT (sizeof(msg->mxm_credits) == 1);
905 msg->mxm_nob = msg_nob;
906 __swab64s(&msg->mxm_srcnid);
907 __swab64s(&msg->mxm_srcstamp);
908 __swab64s(&msg->mxm_dstnid);
909 __swab64s(&msg->mxm_dststamp);
910 __swab64s(&msg->mxm_seq);
913 if (msg->mxm_srcnid == LNET_NID_ANY) {
914 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
918 switch (msg->mxm_type) {
920 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
926 case MXLND_MSG_EAGER:
927 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
928 CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
929 (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
934 case MXLND_MSG_PUT_REQ:
935 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
936 CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
937 (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
941 __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
944 case MXLND_MSG_PUT_ACK:
945 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
946 CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
947 (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
951 __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
952 __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
956 case MXLND_MSG_GET_REQ:
957 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
958 CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
959 (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
963 __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
967 case MXLND_MSG_CONN_REQ:
968 case MXLND_MSG_CONN_ACK:
969 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
970 CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
971 (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
975 __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
976 __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
985 * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
989 * @length - length of incoming message
990 * @pending - add to kmx_pending (0 is NO and 1 is YES)
992 * The caller gets the rx and sets nid, peer and conn if known.
994 * Returns 0 on success and -1 on failure
997 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1000 mx_return_t mxret = MX_SUCCESS;
1001 uint64_t mask = 0xF00FFFFFFFFFFFFFLL;
1003 rx->mxc_msg_type = msg_type;
1004 rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1005 rx->mxc_cookie = cookie;
1006 /* rx->mxc_match may already be set */
1007 /* rx->mxc_seg.segment_ptr is already set */
1008 rx->mxc_seg.segment_length = length;
1009 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1010 ret = mxlnd_q_pending_ctx(rx);
1012 /* the caller is responsible for calling conn_decref() if needed */
1015 mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1016 cookie, mask, (void *) rx, &rx->mxc_mxreq);
1017 if (mxret != MX_SUCCESS) {
1018 mxlnd_deq_pending_ctx(rx);
1019 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1020 mx_strerror(mxret), (int) mxret);
1028 * mxlnd_unexpected_recv - this is the callback function that will handle
1029 * unexpected receives
1030 * @context - NULL, ignore
1031 * @source - the peer's mx_endpoint_addr_t
1032 * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1033 * @length - length of incoming message
1034 * @data_if_available - ignore
1036 * If it is an eager-sized msg, we will call recv_msg() with the actual
1037 * length. If it is a large message, we will call recv_msg() with a
1038 * length of 0 bytes to drop it because we should never have a large,
1039 * unexpected message.
1041 * NOTE - The MX library blocks until this function completes. Make it as fast as
1042 * possible. DO NOT allocate memory which can block!
1044 * If we cannot get a rx or the conn is closed, drop the message on the floor
1045 * (i.e. recv 0 bytes and ignore).
1047 mx_unexp_handler_action_t
1048 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1049 uint64_t match_value, uint32_t length, void *data_if_available)
1052 struct kmx_ctx *rx = NULL;
1058 if (context != NULL) {
1059 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1063 CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1066 rx = mxlnd_get_idle_rx();
1068 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1069 if (length <= MXLND_EAGER_SIZE) {
1070 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1072 CDEBUG(D_NETERROR, "unexpected large receive with "
1073 "match_value=0x%llx length=%d\n",
1074 match_value, length);
1075 ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1079 struct kmx_peer *peer = NULL;
1080 struct kmx_conn *conn = NULL;
1082 /* NOTE to avoid a peer disappearing out from under us,
1083 * read lock the peers lock first */
1084 read_lock(&kmxlnd_data.kmx_peers_lock);
1085 mx_get_endpoint_addr_context(source, (void **) &peer);
1087 mxlnd_peer_addref(peer); /* add a ref... */
1088 spin_lock(&peer->mxp_lock);
1089 conn = peer->mxp_conn;
1091 mxlnd_conn_addref(conn); /* add ref until rx completed */
1092 mxlnd_peer_decref(peer); /* and drop peer ref */
1093 rx->mxc_conn = conn;
1095 spin_unlock(&peer->mxp_lock);
1096 rx->mxc_peer = peer;
1097 rx->mxc_nid = peer->mxp_nid;
1099 read_unlock(&kmxlnd_data.kmx_peers_lock);
1101 CDEBUG(D_NETERROR, "could not post receive\n");
1102 mxlnd_put_idle_rx(rx);
1106 if (rx == NULL || ret != 0) {
1108 CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1111 CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1113 seg.segment_ptr = 0LL;
1114 seg.segment_length = 0;
1115 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1116 match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1119 return MX_RECV_CONTINUE;
1124 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1128 struct kmx_peer *peer = NULL;
1130 read_lock(&kmxlnd_data.kmx_peers_lock);
1131 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1132 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1136 *nidp = peer->mxp_nid;
1137 *count = atomic_read(&peer->mxp_refcount);
1142 read_unlock(&kmxlnd_data.kmx_peers_lock);
1148 mxlnd_del_peer_locked(struct kmx_peer *peer)
1150 list_del_init(&peer->mxp_peers); /* remove from the global list */
1151 if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1152 mxlnd_peer_decref(peer); /* drop global list ref */
1157 mxlnd_del_peer(lnet_nid_t nid)
1161 struct kmx_peer *peer = NULL;
1162 struct kmx_peer *next = NULL;
1164 if (nid != LNET_NID_ANY) {
1165 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1167 write_lock(&kmxlnd_data.kmx_peers_lock);
1168 if (nid != LNET_NID_ANY) {
1172 mxlnd_peer_decref(peer); /* and drops it */
1173 mxlnd_del_peer_locked(peer);
1175 } else { /* LNET_NID_ANY */
1176 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1177 list_for_each_entry_safe(peer, next,
1178 &kmxlnd_data.kmx_peers[i], mxp_peers) {
1179 mxlnd_del_peer_locked(peer);
1183 write_unlock(&kmxlnd_data.kmx_peers_lock);
1189 mxlnd_get_conn_by_idx(int index)
1192 struct kmx_peer *peer = NULL;
1193 struct kmx_conn *conn = NULL;
1195 read_lock(&kmxlnd_data.kmx_peers_lock);
1196 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1197 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1198 spin_lock(&peer->mxp_lock);
1199 list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1204 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1205 spin_unlock(&peer->mxp_lock);
1206 read_unlock(&kmxlnd_data.kmx_peers_lock);
1209 spin_unlock(&peer->mxp_lock);
1212 read_unlock(&kmxlnd_data.kmx_peers_lock);
1218 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1220 struct kmx_conn *conn = NULL;
1221 struct kmx_conn *next = NULL;
1223 spin_lock(&peer->mxp_lock);
1224 list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1225 mxlnd_conn_disconnect(conn, 0 , 0);
1227 spin_unlock(&peer->mxp_lock);
1232 mxlnd_close_matching_conns(lnet_nid_t nid)
1236 struct kmx_peer *peer = NULL;
1238 read_lock(&kmxlnd_data.kmx_peers_lock);
1239 if (nid != LNET_NID_ANY) {
1240 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1244 mxlnd_close_matching_conns_locked(peer);
1245 mxlnd_peer_decref(peer); /* and drops it here */
1247 } else { /* LNET_NID_ANY */
1248 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1249 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1250 mxlnd_close_matching_conns_locked(peer);
1253 read_unlock(&kmxlnd_data.kmx_peers_lock);
1259 * mxlnd_ctl - modify MXLND parameters
1260 * @ni - LNET interface handle
1261 * @cmd - command to change
1262 * @arg - the ioctl data
1264 * Not implemented yet.
1267 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1269 struct libcfs_ioctl_data *data = arg;
1272 LASSERT (ni == kmxlnd_data.kmx_ni);
1275 case IOC_LIBCFS_GET_PEER: {
1279 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1280 data->ioc_nid = nid;
1281 data->ioc_count = count;
1284 case IOC_LIBCFS_DEL_PEER: {
1285 ret = mxlnd_del_peer(data->ioc_nid);
1288 case IOC_LIBCFS_GET_CONN: {
1289 struct kmx_conn *conn = NULL;
1291 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1296 data->ioc_nid = conn->mxk_peer->mxp_nid;
1297 mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1301 case IOC_LIBCFS_CLOSE_CONNECTION: {
1302 ret = mxlnd_close_matching_conns(data->ioc_nid);
1306 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1314 * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1317 * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1320 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1322 u8 msg_type = tx->mxc_msg_type;
1323 //struct kmx_peer *peer = tx->mxc_peer;
1324 struct kmx_conn *conn = tx->mxc_conn;
1326 LASSERT (msg_type != 0);
1327 LASSERT (tx->mxc_nid != 0);
1328 LASSERT (tx->mxc_peer != NULL);
1329 LASSERT (tx->mxc_conn != NULL);
1331 tx->mxc_incarnation = conn->mxk_incarnation;
1333 if (msg_type != MXLND_MSG_PUT_DATA &&
1334 msg_type != MXLND_MSG_GET_DATA) {
1336 if (mxlnd_tx_requires_credit(tx)) {
1337 list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1338 conn->mxk_ntx_msgs++;
1339 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1340 msg_type == MXLND_MSG_CONN_ACK) {
1341 /* put conn msgs at the front of the queue */
1342 list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1344 /* PUT_ACK, PUT_NAK */
1345 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1346 conn->mxk_ntx_msgs++;
1350 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1351 conn->mxk_ntx_data++;
1358 * mxlnd_peer_queue_tx - add the tx to the global tx queue
1361 * Add the tx to the peer's msg or data queue
1364 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
/* Locking wrapper: take the connection's mxk_lock around
 * mxlnd_peer_queue_tx_locked().  Caller must hold peer/conn refs. */
1366 LASSERT(tx->mxc_peer != NULL);
1367 LASSERT(tx->mxc_conn != NULL);
1368 spin_lock(&tx->mxc_conn->mxk_lock);
1369 mxlnd_peer_queue_tx_locked(tx);
1370 spin_unlock(&tx->mxc_conn->mxk_lock);
1376 * mxlnd_queue_tx - add the tx to the global tx queue
1379 * Add the tx to the global queue and up the tx_queue_sem
1382 mxlnd_queue_tx(struct kmx_ctx *tx)
/* Queue a send.  When the peer is already known (tx->mxc_peer set),
 * queue the tx on its connection and kick mxlnd_check_sends();
 * otherwise append it to the global kmx_tx_queue and wake the
 * tx_queued thread via the semaphore.
 * NOTE(review): the branch structure is partly elided in this view —
 * confirm the known-peer/unknown-peer split against the full file. */
1384 struct kmx_peer *peer = tx->mxc_peer;
1385 LASSERT (tx->mxc_nid != 0);
/* fail fast for an incompatible peer; a CONN_ACK must still go out
 * so the peer learns why */
1388 if (peer->mxp_incompatible &&
1389 tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1390 /* let this fail now */
1391 tx->mxc_status.code = -ECONNABORTED;
1392 mxlnd_conn_decref(peer->mxp_conn);
1393 mxlnd_put_idle_tx(tx);
1396 if (tx->mxc_conn == NULL) {
1398 struct kmx_conn *conn = NULL;
1400 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
/* conn_alloc failure path: fail the tx with the error code */
1402 tx->mxc_status.code = ret;
1403 mxlnd_put_idle_tx(tx);
1406 tx->mxc_conn = conn;
1407 mxlnd_peer_decref(peer); /* and takes it from peer */
1409 LASSERT(tx->mxc_conn != NULL);
1410 mxlnd_peer_queue_tx(tx);
1411 mxlnd_check_sends(peer);
/* presumably the unknown-peer path: hand off to mxlnd_tx_queued() */
1413 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1414 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1415 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1416 up(&kmxlnd_data.kmx_tx_queue_sem);
1423 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
/* Map @nob bytes starting at @offset of a kernel-vaddr iovec array
 * into an mx_ksegment_t list attached to @ctx (MX_PIN_KERNEL).
 * Returns 0 for niov == 0; allocation-failure handling is partly
 * elided in this view. */
1430 int first_iov_offset = 0;
1431 int first_found = 0;
1433 int last_iov_length = 0;
1434 mx_ksegment_t *seg = NULL;
1436 if (niov == 0) return 0;
1437 LASSERT(iov != NULL);
/* find the first and last iovec entries covering [offset, offset+nob) */
1439 for (i = 0; i < niov; i++) {
1440 sum = old_sum + (u32) iov[i].iov_len;
1441 if (!first_found && (sum > offset)) {
1443 first_iov_offset = offset - old_sum;
1445 sum = (u32) iov[i].iov_len - first_iov_offset;
1450 last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1451 if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1456 LASSERT(first_iov >= 0 && last_iov >= first_iov);
1457 nseg = last_iov - first_iov + 1;
1460 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1462 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1465 memset(seg, 0, nseg * sizeof(*seg));
1466 ctx->mxc_nseg = nseg;
/* one segment per iovec entry; trim the first and last for offset/nob */
1468 for (i = 0; i < nseg; i++) {
1469 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1470 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1472 seg[i].segment_ptr += (u64) first_iov_offset;
1473 seg[i].segment_length -= (u32) first_iov_offset;
1475 if (i == (nseg - 1)) {
1476 seg[i].segment_length = (u32) last_iov_length;
1478 sum += seg[i].segment_length;
1480 ctx->mxc_seg_list = seg;
1481 ctx->mxc_pin_type = MX_PIN_KERNEL;
1482 #ifdef MX_PIN_FULLPAGES
1483 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segment list must cover exactly the requested bytes */
1485 LASSERT(nob == sum);
1490 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
/* Map @nob bytes starting at @offset of a page-based kiov array into
 * an mx_ksegment_t list attached to @ctx (MX_PIN_PHYSICAL).
 * Returns 0 for niov == 0; allocation-failure handling is partly
 * elided in this view.
 * FIX: the memset, ctx->mxc_nseg and the build loop below used @niov
 * where @nseg is required — seg[] holds only @nseg entries, so when
 * niov > nseg this overran the allocation and described bogus
 * segments.  Now matches mxlnd_setup_iov(). */
1496 int first_kiov = -1;
1497 int first_kiov_offset = 0;
1498 int first_found = 0;
1500 int last_kiov_length = 0;
1501 mx_ksegment_t *seg = NULL;
1503 if (niov == 0) return 0;
1504 LASSERT(kiov != NULL);
/* find the first and last kiov entries covering [offset, offset+nob) */
1506 for (i = 0; i < niov; i++) {
1507 sum = old_sum + kiov[i].kiov_len;
1508 if (i == 0) sum -= kiov[i].kiov_offset;
1509 if (!first_found && (sum > offset)) {
1511 first_kiov_offset = offset - old_sum;
1512 //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1513 if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1515 sum = kiov[i].kiov_len - first_kiov_offset;
1520 last_kiov_length = kiov[i].kiov_len - (sum - nob);
1521 if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1526 LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1527 nseg = last_kiov - first_kiov + 1;
1530 MXLND_ALLOC (seg, nseg * sizeof(*seg));
1532 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1535 memset(seg, 0, nseg * sizeof(*seg));
1536 ctx->mxc_nseg = nseg;
/* one segment per page; trim the first and last for offset/nob */
1538 for (i = 0; i < nseg; i++) {
1539 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1540 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1542 seg[i].segment_ptr += (u64) first_kiov_offset;
1543 /* we have to add back the original kiov_offset */
1544 seg[i].segment_length -= first_kiov_offset +
1545 kiov[first_kiov].kiov_offset;
1547 if (i == (nseg - 1)) {
1548 seg[i].segment_length = last_kiov_length;
1550 sum += seg[i].segment_length;
1552 ctx->mxc_seg_list = seg;
1553 ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1554 #ifdef MX_PIN_FULLPAGES
1555 ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
/* the segment list must cover exactly the requested bytes */
1557 LASSERT(nob == sum);
1562 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
/* Send a PUT_ACK carrying an error @status back to the peer.  The
 * error code rides in the top 12 bits of mxpam_dst_cookie (<< 52).
 * NOTE(review): the @nid parameter is unused here — init_tx_msg uses
 * tx->mxc_nid; confirm callers always set tx->mxc_nid first. */
1564 LASSERT(type == MXLND_MSG_PUT_ACK);
1565 mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1566 tx->mxc_cookie = cookie;
1567 tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1568 tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1569 tx->mxc_match = mxlnd_create_match(tx, status);
1576 * mxlnd_send_data - get tx, map [k]iov, queue tx
1583 * This sets up the DATA send for PUT or GET.
1585 * On success, it queues the tx, on failure it calls lnet_finalize()
1588 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
/* Get an idle tx, map the lntmsg's [k]iov into MX segments, and queue
 * a PUT_DATA or GET_DATA send carrying @cookie (top 12 bits must be 0).
 * On setup failure the tx is failed with -EIO; on tx exhaustion the
 * lntmsg is finalized with -EIO. */
1591 lnet_process_id_t target = lntmsg->msg_target;
1592 unsigned int niov = lntmsg->msg_niov;
1593 struct iovec *iov = lntmsg->msg_iov;
1594 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1595 unsigned int offset = lntmsg->msg_offset;
1596 unsigned int nob = lntmsg->msg_len;
1597 struct kmx_ctx *tx = NULL;
1599 LASSERT(lntmsg != NULL);
1600 LASSERT(peer != NULL);
1601 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1602 LASSERT((cookie>>52) == 0);
1604 tx = mxlnd_get_idle_tx();
1606 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1607 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1608 libcfs_nid2str(target.nid));
1611 tx->mxc_nid = target.nid;
1612 /* NOTE called when we have a ref on the conn, get one for this tx */
1613 mxlnd_conn_addref(peer->mxp_conn);
1614 tx->mxc_peer = peer;
1615 tx->mxc_conn = peer->mxp_conn;
1616 tx->mxc_msg_type = msg_type;
1617 tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1618 tx->mxc_state = MXLND_CTX_PENDING;
1619 tx->mxc_lntmsg[0] = lntmsg;
1620 tx->mxc_cookie = cookie;
1621 tx->mxc_match = mxlnd_create_match(tx, 0);
1623 /* This sets up the mx_ksegment_t to send the DATA payload */
1625 /* do not setup the segments */
1626 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1627 "to %s?\n", libcfs_nid2str(target.nid));
1629 } else if (kiov == NULL) {
1630 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1632 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
/* segment setup failed: fail the tx and drop the refs taken above */
1635 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1636 libcfs_nid2str(target.nid));
1637 tx->mxc_status.code = -EIO;
1644 mxlnd_conn_decref(peer->mxp_conn);
1645 mxlnd_put_idle_tx(tx);
/* no idle tx: tell LNET the send failed */
1649 CDEBUG(D_NETERROR, "no tx avail\n");
1650 lnet_finalize(ni, lntmsg, -EIO);
1655 * mxlnd_recv_data - map [k]iov, post rx
1662 * This sets up the DATA receive for PUT or GET.
1664 * On success, it returns 0, on failure it returns -1
1667 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
/* Map the lntmsg's [k]iov into MX segments and post an mx_kirecv()
 * for the PUT_DATA or GET_DATA payload matched by @cookie.
 * Returns 0 on success; error paths are partly elided in this view. */
1670 lnet_process_id_t target = lntmsg->msg_target;
1671 unsigned int niov = lntmsg->msg_niov;
1672 struct iovec *iov = lntmsg->msg_iov;
1673 lnet_kiov_t *kiov = lntmsg->msg_kiov;
1674 unsigned int offset = lntmsg->msg_offset;
1675 unsigned int nob = lntmsg->msg_len;
1676 mx_return_t mxret = MX_SUCCESS;
1678 /* above assumes MXLND_MSG_PUT_DATA */
/* for a GET sink, describe the MD's buffers instead of the msg's */
1679 if (msg_type == MXLND_MSG_GET_DATA) {
1680 niov = lntmsg->msg_md->md_niov;
1681 iov = lntmsg->msg_md->md_iov.iov;
1682 kiov = lntmsg->msg_md->md_iov.kiov;
1684 nob = lntmsg->msg_md->md_length;
/* NOTE(review): lntmsg is dereferenced above before this LASSERT */
1687 LASSERT(lntmsg != NULL);
1688 LASSERT(rx != NULL);
1689 LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1690 LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1692 rx->mxc_msg_type = msg_type;
1693 rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1694 rx->mxc_state = MXLND_CTX_PENDING;
1695 rx->mxc_nid = target.nid;
1696 /* if posting a GET_DATA, we may not yet know the peer */
1697 if (rx->mxc_peer != NULL) {
1698 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1700 rx->mxc_lntmsg[0] = lntmsg;
1701 rx->mxc_cookie = cookie;
1702 rx->mxc_match = mxlnd_create_match(rx, 0);
1703 /* This sets up the mx_ksegment_t to receive the DATA payload */
1705 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1707 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
/* a GET sink also needs the REPLY msg created up front */
1709 if (msg_type == MXLND_MSG_GET_DATA) {
1710 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1711 if (rx->mxc_lntmsg[1] == NULL) {
1712 CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1713 libcfs_nid2str(target.nid));
1718 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1719 msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1720 libcfs_nid2str(target.nid));
/* track the rx so the timeout thread can find it */
1723 ret = mxlnd_q_pending_ctx(rx);
1727 CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1728 mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1729 rx->mxc_seg_list, rx->mxc_nseg,
1730 rx->mxc_pin_type, rx->mxc_match,
1731 0xF00FFFFFFFFFFFFFLL, (void *) rx,
/* post failed: undo the pending-queue insertion before bailing */
1733 if (mxret != MX_SUCCESS) {
1734 if (rx->mxc_conn != NULL) {
1735 mxlnd_deq_pending_ctx(rx);
1737 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1738 (int) mxret, libcfs_nid2str(target.nid));
1746 * mxlnd_send - the LND required send function
1751 * This must not block. Since we may not have a peer struct for the receiver,
1752 * it will append send messages on a global tx list. We will then up the
1753 * tx_queued's semaphore to notify it of the new send.
1756 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
/* The LND send entry point.  Must not block.  Small payloads go as a
 * single EAGER msg; large PUTs send PUT_REQ (pre-posting a PUT_ACK rx);
 * large GETs pre-post the DATA sink rx then send GET_REQ.  The switch
 * structure and several branches are elided in this view. */
1759 int type = lntmsg->msg_type;
1760 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1761 lnet_process_id_t target = lntmsg->msg_target;
1762 lnet_nid_t nid = target.nid;
1763 int target_is_router = lntmsg->msg_target_is_router;
1764 int routing = lntmsg->msg_routing;
1765 unsigned int payload_niov = lntmsg->msg_niov;
1766 struct iovec *payload_iov = lntmsg->msg_iov;
1767 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1768 unsigned int payload_offset = lntmsg->msg_offset;
1769 unsigned int payload_nob = lntmsg->msg_len;
1770 struct kmx_ctx *tx = NULL;
1771 struct kmx_msg *txmsg = NULL;
1772 struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */
1773 struct kmx_ctx *rx_data = NULL;
1774 struct kmx_conn *conn = NULL;
1776 uint32_t length = 0;
1777 struct kmx_peer *peer = NULL;
1779 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1780 payload_nob, payload_niov, libcfs_id2str(target));
1782 LASSERT (payload_nob == 0 || payload_niov > 0);
1783 LASSERT (payload_niov <= LNET_MAX_IOV);
1784 /* payload is either all vaddrs or all pages */
1785 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1787 /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1789 /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1790 * to a new peer, use the nid */
1791 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
/* NOTE(review): the NULL-peer guard around this deref is elided here —
 * confirm against the full file */
1793 if (unlikely(peer->mxp_incompatible)) {
1794 mxlnd_peer_decref(peer); /* drop ref taken above */
1796 spin_lock(&peer->mxp_lock);
1797 conn = peer->mxp_conn;
1799 mxlnd_conn_addref(conn);
1800 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1802 spin_unlock(&peer->mxp_lock);
1805 if (conn == NULL && peer != NULL) {
1806 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1807 peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1812 LASSERT (payload_nob == 0);
1815 case LNET_MSG_REPLY:
/* LNET_MSG_PUT / LNET_MSG_REPLY path */
1817 /* Is the payload small enough not to need DATA? */
1818 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1819 if (nob <= MXLND_EAGER_SIZE)
1820 break; /* send EAGER */
1822 tx = mxlnd_get_idle_tx();
1823 if (unlikely(tx == NULL)) {
1824 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1825 type == LNET_MSG_PUT ? "PUT" : "REPLY",
1826 libcfs_nid2str(nid));
1827 if (conn) mxlnd_conn_decref(conn);
1831 /* the peer may be NULL */
1832 tx->mxc_peer = peer;
1833 tx->mxc_conn = conn; /* may be NULL */
1834 /* we added a conn ref above */
1835 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1836 txmsg = tx->mxc_msg;
1837 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1838 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1839 tx->mxc_match = mxlnd_create_match(tx, 0);
1841 /* we must post a receive _before_ sending the request.
1842 * we need to determine how much to receive, it will be either
1843 * a put_ack or a put_nak. The put_ack is larger, so use it. */
1845 rx = mxlnd_get_idle_rx();
1846 if (unlikely(rx == NULL)) {
1847 CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1848 libcfs_nid2str(nid));
1849 mxlnd_put_idle_tx(tx);
1850 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1854 rx->mxc_peer = peer;
1855 /* conn may be NULL but unlikely since the first msg is always small */
1856 /* NOTE no need to lock peer before adding conn ref since we took
1857 * a conn ref for the tx (it cannot be freed between there and here ) */
1858 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1859 rx->mxc_conn = conn;
1860 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1861 rx->mxc_cookie = tx->mxc_cookie;
1862 rx->mxc_match = mxlnd_create_match(rx, 0);
1864 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1865 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1866 if (unlikely(ret != 0)) {
1867 CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1868 libcfs_nid2str(nid));
1869 rx->mxc_lntmsg[0] = NULL;
1870 mxlnd_put_idle_rx(rx);
1871 mxlnd_put_idle_tx(tx);
1873 mxlnd_conn_decref(conn); /* for the rx... */
1874 mxlnd_conn_decref(conn); /* and for the tx */
1876 return -EHOSTUNREACH;
/* LNET_MSG_GET path */
1883 if (routing || target_is_router)
1884 break; /* send EAGER */
1886 /* is the REPLY message too small for DATA? */
1887 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1888 if (nob <= MXLND_EAGER_SIZE)
1889 break; /* send EAGER */
1891 /* get tx (we need the cookie) , post rx for incoming DATA,
1892 * then post GET_REQ tx */
1893 tx = mxlnd_get_idle_tx();
1894 if (unlikely(tx == NULL)) {
1895 CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1896 libcfs_nid2str(nid));
1897 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1900 rx_data = mxlnd_get_idle_rx();
1901 if (unlikely(rx_data == NULL)) {
1902 CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1903 libcfs_nid2str(nid));
1904 mxlnd_put_idle_tx(tx);
1905 if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1908 rx_data->mxc_peer = peer;
1909 /* NOTE no need to lock peer before adding conn ref since we took
1910 * a conn ref for the tx (it cannot be freed between there and here ) */
1911 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1912 rx_data->mxc_conn = conn; /* may be NULL */
1914 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1915 if (unlikely(ret != 0)) {
1916 CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1917 libcfs_nid2str(nid));
1918 mxlnd_put_idle_rx(rx_data);
1919 mxlnd_put_idle_tx(tx);
1921 mxlnd_conn_decref(conn); /* for the rx_data... */
1922 mxlnd_conn_decref(conn); /* and for the tx */
1927 tx->mxc_peer = peer;
1928 tx->mxc_conn = conn; /* may be NULL */
1929 /* conn ref taken above */
1930 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1931 txmsg = tx->mxc_msg;
1932 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1933 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1934 tx->mxc_match = mxlnd_create_match(tx, 0);
1941 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
/* EAGER path: payload fits in a single message */
1947 LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1948 <= MXLND_EAGER_SIZE);
1950 tx = mxlnd_get_idle_tx();
1951 if (unlikely(tx == NULL)) {
1952 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1953 mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1954 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1958 tx->mxc_peer = peer;
1959 tx->mxc_conn = conn; /* may be NULL */
1960 /* conn ref taken above */
1961 nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1962 mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1963 tx->mxc_match = mxlnd_create_match(tx, 0);
1965 txmsg = tx->mxc_msg;
1966 txmsg->mxm_u.eager.mxem_hdr = *hdr;
/* copy the payload inline into the eager message */
1968 if (payload_kiov != NULL)
1969 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1970 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1971 payload_niov, payload_kiov, payload_offset, payload_nob);
1973 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1974 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1975 payload_niov, payload_iov, payload_offset, payload_nob);
1977 tx->mxc_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */
1983 * mxlnd_recv - the LND required recv function
1994 * This must not block.
1997 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1998 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1999 unsigned int offset, unsigned int mlen, unsigned int rlen)
/* The LND recv entry point.  Must not block.  @private is the kmx_ctx
 * rx that matched.  EAGER payloads are copied inline; PUT_REQ reuses
 * the rx to post the PUT_DATA sink and sends a PUT_ACK; GET_REQ
 * triggers the GET_DATA send (or a match-bits NAK if the GET matched
 * nothing).  Several branches are elided in this view. */
2004 struct kmx_ctx *rx = private;
2005 struct kmx_msg *rxmsg = rx->mxc_msg;
2006 lnet_nid_t nid = rx->mxc_nid;
2007 struct kmx_ctx *tx = NULL;
2008 struct kmx_msg *txmsg = NULL;
2009 struct kmx_peer *peer = rx->mxc_peer;
2010 struct kmx_conn *conn = peer->mxp_conn;
2012 int msg_type = rxmsg->mxm_type;
2017 LASSERT (mlen <= rlen);
2018 /* Either all pages or all vaddrs */
2019 LASSERT (!(kiov != NULL && iov != NULL));
2020 LASSERT (peer != NULL);
2022 /* conn_addref(conn) already taken for the primary rx */
2025 case MXLND_MSG_EAGER:
2026 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2027 len = rx->mxc_status.xfer_length;
/* reject a truncated eager: fewer bytes arrived than the hdr claims */
2028 if (unlikely(nob > len)) {
2029 CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2030 libcfs_nid2str(nid), nob, len);
2036 lnet_copy_flat2kiov(niov, kiov, offset,
2037 MXLND_EAGER_SIZE, rxmsg,
2038 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2041 lnet_copy_flat2iov(niov, iov, offset,
2042 MXLND_EAGER_SIZE, rxmsg,
2043 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2049 case MXLND_MSG_PUT_REQ:
2050 /* we are going to reuse the rx, store the needed info */
2051 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2053 /* get tx, post rx, send PUT_ACK */
2055 tx = mxlnd_get_idle_tx();
2056 if (unlikely(tx == NULL)) {
2057 CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2058 /* Not replying will break the connection */
/* zero-length sink: NAK with status 0 tells the peer to skip DATA */
2062 if (unlikely(mlen == 0)) {
2064 tx->mxc_peer = peer;
2065 tx->mxc_conn = conn;
2066 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2071 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2072 tx->mxc_peer = peer;
2073 tx->mxc_conn = conn;
2074 /* no need to lock peer first since we already have a ref */
2075 mxlnd_conn_addref(conn); /* for the tx */
2076 txmsg = tx->mxc_msg;
2077 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2078 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2079 tx->mxc_cookie = cookie;
2080 tx->mxc_match = mxlnd_create_match(tx, 0);
2082 /* we must post a receive _before_ sending the PUT_ACK */
2084 rx->mxc_state = MXLND_CTX_PREP;
2085 rx->mxc_peer = peer;
2086 rx->mxc_conn = conn;
2087 /* do not take another ref for this rx, it is already taken */
2088 rx->mxc_nid = peer->mxp_nid;
2089 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2090 txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2092 if (unlikely(ret != 0)) {
2093 /* Notify peer that it's over */
2094 CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2095 libcfs_nid2str(nid), ret);
2097 tx->mxc_state = MXLND_CTX_PREP;
2098 tx->mxc_peer = peer;
2099 tx->mxc_conn = conn;
2100 /* finalize = 0, let the PUT_ACK tx finalize this */
2101 tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2102 tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2103 /* conn ref already taken above */
2104 mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2110 /* do not return a credit until after PUT_DATA returns */
2114 case MXLND_MSG_GET_REQ:
2115 if (likely(lntmsg != NULL)) {
2116 mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2117 rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2119 /* GET didn't match anything */
2120 /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2121 * We have to embed the error code in the match bits.
2122 * Send the error in bits 52-59 and the cookie in bits 0-51 */
2123 u64 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2125 tx = mxlnd_get_idle_tx();
2126 if (unlikely(tx == NULL)) {
2127 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2128 libcfs_nid2str(nid));
2132 tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2133 tx->mxc_state = MXLND_CTX_PENDING;
2135 tx->mxc_peer = peer;
2136 tx->mxc_conn = conn;
2137 /* no need to lock peer first since we already have a ref */
2138 mxlnd_conn_addref(conn); /* for this tx */
2139 tx->mxc_cookie = cookie;
2140 tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2141 tx->mxc_pin_type = MX_PIN_PHYSICAL;
2144 /* finalize lntmsg after tx completes */
2152 /* we received a message, increment peer's outstanding credits */
2154 spin_lock(&conn->mxk_lock);
2155 conn->mxk_outstanding++;
2156 spin_unlock(&conn->mxk_lock);
2158 /* we are done with the rx */
2159 mxlnd_put_idle_rx(rx);
2160 mxlnd_conn_decref(conn);
2163 if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2165 /* we received a credit, see if we can use it to send a msg */
2166 if (credit) mxlnd_check_sends(peer);
2172 mxlnd_sleep(unsigned long timeout)
/* Sleep interruptibly for @timeout jiffies. */
2174 set_current_state(TASK_INTERRUPTIBLE);
2175 schedule_timeout(timeout);
2180 * mxlnd_tx_queued - the generic send queue thread
2181 * @arg - thread id (as a void *)
2183 * This thread moves send messages from the global tx_queue to the owning
2184 * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2185 * it to the global peer list.
2188 mxlnd_tx_queued(void *arg)
/* Kernel thread: drains the global kmx_tx_queue.  For each tx it
 * resolves (or creates) the peer, attaches a connection, and queues
 * the tx on that peer.  @arg is the thread id.  Loop/branch structure
 * is partly elided in this view. */
2190 long id = (long) arg;
2193 struct kmx_ctx *tx = NULL;
2194 struct kmx_peer *peer = NULL;
2195 struct list_head *tmp_tx = NULL;
2197 cfs_daemonize("mxlnd_tx_queued");
2198 //cfs_block_allsigs();
2200 while (!kmxlnd_data.kmx_shutdown) {
/* wait for work (or shutdown) */
2201 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2202 if (kmxlnd_data.kmx_shutdown)
2204 if (ret != 0) // Should we check for -EINTR?
2206 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2207 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2208 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
/* pop the head tx */
2211 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2212 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2213 list_del_init(&tx->mxc_list);
2214 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2217 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
/* known-peer path: ensure it has a conn, attach and queue */
2219 tx->mxc_peer = peer;
2220 spin_lock(&peer->mxp_lock);
2221 if (peer->mxp_conn == NULL) {
2222 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2224 /* out of memory, give up and fail tx */
2225 tx->mxc_status.code = -ENOMEM;
2226 spin_unlock(&peer->mxp_lock);
2227 mxlnd_peer_decref(peer);
2228 mxlnd_put_idle_tx(tx);
2232 tx->mxc_conn = peer->mxp_conn;
2233 mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2234 spin_unlock(&peer->mxp_lock);
2235 mxlnd_peer_decref(peer); /* drop peer ref taken above */
/* unknown-peer path: allocate a new peer and insert it in the hash */
2241 struct kmx_peer *peer = NULL;
2242 struct kmx_peer *old = NULL;
2244 hash = mxlnd_nid_to_hash(tx->mxc_nid);
2246 LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2247 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2249 /* adds conn ref for this function */
2250 ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2252 /* finalize message */
2253 tx->mxc_status.code = ret;
2254 mxlnd_put_idle_tx(tx);
2257 tx->mxc_peer = peer;
2258 tx->mxc_conn = peer->mxp_conn;
2259 /* this tx will keep the conn ref taken in peer_alloc() */
2261 /* add peer to global peer list, but look to see
2262 * if someone already created it after we released
2264 write_lock(&kmxlnd_data.kmx_peers_lock);
2265 list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2266 if (old->mxp_nid == peer->mxp_nid) {
2267 /* somebody beat us here, we created a duplicate */
2274 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2275 atomic_inc(&kmxlnd_data.kmx_npeers);
/* lost the race: reuse the existing peer, discard our duplicate */
2278 spin_lock(&old->mxp_lock);
2279 tx->mxc_conn = old->mxp_conn;
2280 /* FIXME can conn be NULL? */
2281 LASSERT(old->mxp_conn != NULL);
2282 mxlnd_conn_addref(old->mxp_conn);
2283 spin_unlock(&old->mxp_lock);
2284 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2285 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2286 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2287 mxlnd_peer_decref(peer);
2289 write_unlock(&kmxlnd_data.kmx_peers_lock);
2294 mxlnd_thread_stop(id);
2298 /* When calling this, we must not have the peer lock. */
2300 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
/* Start an MX connect to @peer.  @mask selects the handshake stage
 * (MXLND_MASK_ICON_REQ or MXLND_MASK_ICON_ACK).  Caller must NOT hold
 * the peer lock and must already hold a conn ref.  If the peer's NIC
 * id is not yet resolved, retry later or give up after
 * MXLND_WAIT_TIMEOUT and notify LNET. */
2302 mx_return_t mxret = MX_SUCCESS;
2303 mx_request_t request;
2304 struct kmx_conn *conn = peer->mxp_conn;
2306 /* NOTE we are holding a conn ref every time we call this function,
2307 * we do not need to lock the peer before taking another ref */
2308 mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2310 LASSERT(mask == MXLND_MASK_ICON_REQ ||
2311 mask == MXLND_MASK_ICON_ACK);
/* record when we first started trying, for the give-up timeout below */
2313 if (peer->mxp_reconnect_time == 0) {
2314 peer->mxp_reconnect_time = jiffies;
2317 if (peer->mxp_nic_id == 0LL) {
2318 mxlnd_peer_hostname_to_nic_id(peer);
2319 if (peer->mxp_nic_id == 0LL) {
2320 /* not mapped yet, return */
2321 spin_lock(&conn->mxk_lock);
2322 conn->mxk_status = MXLND_CONN_INIT;
2323 spin_unlock(&conn->mxk_lock);
2324 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2325 /* give up and notify LNET */
2326 mxlnd_conn_disconnect(conn, 0, 1);
2327 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2329 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2332 mxlnd_conn_decref(conn);
/* NIC id known: fire the asynchronous connect */
2337 mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2338 peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2339 (void *) peer, &request);
2340 if (unlikely(mxret != MX_SUCCESS)) {
2341 spin_lock(&conn->mxk_lock);
2342 conn->mxk_status = MXLND_CONN_FAIL;
2343 spin_unlock(&conn->mxk_lock);
2344 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2345 mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2346 mxlnd_conn_decref(conn);
2351 #define MXLND_STATS 0
2354 mxlnd_check_sends(struct kmx_peer *peer)
2358 mx_return_t mxret = MX_SUCCESS;
2359 struct kmx_ctx *tx = NULL;
2360 struct kmx_conn *conn = NULL;
2367 static unsigned long last = 0;
2370 if (unlikely(peer == NULL)) {
2371 LASSERT(peer != NULL);
2374 spin_lock(&peer->mxp_lock);
2375 conn = peer->mxp_conn;
2376 /* NOTE take a ref for the duration of this function since it is called
2377 * when there might not be any queued txs for this peer */
2378 if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2379 spin_unlock(&peer->mxp_lock);
2381 /* do not add another ref for this tx */
2384 /* we do not have any conns */
2389 if (time_after(jiffies, last)) {
2390 last = jiffies + HZ;
2391 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2392 "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2393 mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2394 conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2395 conn->mxk_ntx_data, conn->mxk_data_posted);
2399 /* cache peer state for asserts */
2400 spin_lock(&conn->mxk_lock);
2401 ntx_posted = conn->mxk_ntx_posted;
2402 credits = conn->mxk_credits;
2403 spin_unlock(&conn->mxk_lock);
2405 LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2406 LASSERT(ntx_posted >= 0);
2408 LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2409 LASSERT(credits >= 0);
2411 /* check number of queued msgs, ignore data */
2412 spin_lock(&conn->mxk_lock);
2413 if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2414 /* check if any txs queued that could return credits... */
2415 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2416 /* if not, send a NOOP */
2417 tx = mxlnd_get_idle_tx();
2418 if (likely(tx != NULL)) {
2419 tx->mxc_peer = peer;
2420 tx->mxc_conn = peer->mxp_conn;
2421 mxlnd_conn_addref(conn); /* for this tx */
2422 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2423 tx->mxc_match = mxlnd_create_match(tx, 0);
2424 mxlnd_peer_queue_tx_locked(tx);
2430 spin_unlock(&conn->mxk_lock);
2432 /* if the peer is not ready, try to connect */
2433 spin_lock(&conn->mxk_lock);
2434 if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2435 conn->mxk_status == MXLND_CONN_FAIL ||
2436 conn->mxk_status == MXLND_CONN_REQ)) {
2437 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2438 conn->mxk_status = MXLND_CONN_WAIT;
2439 spin_unlock(&conn->mxk_lock);
2440 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2443 spin_unlock(&conn->mxk_lock);
2445 spin_lock(&conn->mxk_lock);
2446 while (!list_empty(&conn->mxk_tx_free_queue) ||
2447 !list_empty(&conn->mxk_tx_credit_queue)) {
2448 /* We have something to send. If we have a queued tx that does not
2449 * require a credit (free), choose it since its completion will
2450 * return a credit (here or at the peer), complete a DATA or
2451 * CONN_REQ or CONN_ACK. */
2452 struct list_head *tmp_tx = NULL;
2453 if (!list_empty(&conn->mxk_tx_free_queue)) {
2454 tmp_tx = &conn->mxk_tx_free_queue;
2456 tmp_tx = &conn->mxk_tx_credit_queue;
2458 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2460 msg_type = tx->mxc_msg_type;
2462 /* don't try to send a rx */
2463 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2465 /* ensure that it is a valid msg type */
2466 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2467 msg_type == MXLND_MSG_CONN_ACK ||
2468 msg_type == MXLND_MSG_NOOP ||
2469 msg_type == MXLND_MSG_EAGER ||
2470 msg_type == MXLND_MSG_PUT_REQ ||
2471 msg_type == MXLND_MSG_PUT_ACK ||
2472 msg_type == MXLND_MSG_PUT_DATA ||
2473 msg_type == MXLND_MSG_GET_REQ ||
2474 msg_type == MXLND_MSG_GET_DATA);
2475 LASSERT(tx->mxc_peer == peer);
2476 LASSERT(tx->mxc_nid == peer->mxp_nid);
2478 credit = mxlnd_tx_requires_credit(tx);
2481 if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2482 CDEBUG(D_NET, "%s: posted enough\n",
2483 libcfs_nid2str(peer->mxp_nid));
2487 if (conn->mxk_credits == 0) {
2488 CDEBUG(D_NET, "%s: no credits\n",
2489 libcfs_nid2str(peer->mxp_nid));
2493 if (conn->mxk_credits == 1 && /* last credit reserved for */
2494 conn->mxk_outstanding == 0) { /* giving back credits */
2495 CDEBUG(D_NET, "%s: not using last credit\n",
2496 libcfs_nid2str(peer->mxp_nid));
2501 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2502 if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2503 msg_type == MXLND_MSG_CONN_ACK)) {
2504 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2505 mxlnd_connstatus_to_str(conn->mxk_status),
2507 mxlnd_msgtype_to_str(tx->mxc_msg_type));
2508 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2509 list_del_init(&tx->mxc_list);
2510 tx->mxc_status.code = -ECONNABORTED;
2511 mxlnd_put_idle_tx(tx);
2512 mxlnd_conn_decref(conn);
2518 list_del_init(&tx->mxc_list);
2520 /* handle credits, etc now while we have the lock to avoid races */
2522 conn->mxk_credits--;
2523 conn->mxk_ntx_posted++;
2525 if (msg_type != MXLND_MSG_PUT_DATA &&
2526 msg_type != MXLND_MSG_GET_DATA) {
2527 if (msg_type != MXLND_MSG_CONN_REQ &&
2528 msg_type != MXLND_MSG_CONN_ACK) {
2529 conn->mxk_ntx_msgs--;
2532 if (tx->mxc_incarnation == 0 &&
2533 conn->mxk_incarnation != 0) {
2534 tx->mxc_incarnation = conn->mxk_incarnation;
2536 spin_unlock(&conn->mxk_lock);
2538 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2539 * or (2) there is a non-DATA msg that can return credits in the
2540 * queue, then drop this duplicate NOOP */
2541 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2542 spin_lock(&conn->mxk_lock);
2543 if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2544 (conn->mxk_ntx_msgs >= 1)) {
2545 conn->mxk_credits++;
2546 conn->mxk_ntx_posted--;
2547 spin_unlock(&conn->mxk_lock);
2548 /* redundant NOOP */
2549 mxlnd_put_idle_tx(tx);
2550 mxlnd_conn_decref(conn);
2551 CDEBUG(D_NET, "%s: redundant noop\n",
2552 libcfs_nid2str(peer->mxp_nid));
2556 spin_unlock(&conn->mxk_lock);
2560 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2561 (msg_type != MXLND_MSG_GET_DATA))) {
2565 //ret = -ECONNABORTED;
2568 spin_lock(&conn->mxk_lock);
2569 status = conn->mxk_status;
2570 spin_unlock(&conn->mxk_lock);
2572 if (likely((status == MXLND_CONN_READY) ||
2573 (msg_type == MXLND_MSG_CONN_REQ) ||
2574 (msg_type == MXLND_MSG_CONN_ACK))) {
2576 if (msg_type != MXLND_MSG_CONN_REQ &&
2577 msg_type != MXLND_MSG_CONN_ACK) {
2578 /* add to the pending list */
2579 ret = mxlnd_q_pending_ctx(tx);
2581 /* FIXME the conn is disconnected, now what? */
2585 tx->mxc_state = MXLND_CTX_PENDING;
2589 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2590 msg_type != MXLND_MSG_GET_DATA)) {
2591 /* send a msg style tx */
2592 LASSERT(tx->mxc_nseg == 1);
2593 LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2594 CDEBUG(D_NET, "sending %s 0x%llx\n",
2595 mxlnd_msgtype_to_str(msg_type),
2597 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2606 /* send a DATA tx */
2607 spin_lock(&conn->mxk_lock);
2608 conn->mxk_ntx_data--;
2609 conn->mxk_data_posted++;
2610 spin_unlock(&conn->mxk_lock);
2611 CDEBUG(D_NET, "sending %s 0x%llx\n",
2612 mxlnd_msgtype_to_str(msg_type),
2614 mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2624 mxret = MX_CONNECTION_FAILED;
2626 if (likely(mxret == MX_SUCCESS)) {
2629 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2630 "sending to %s\n", mx_strerror(mxret), (int) mxret,
2631 libcfs_nid2str(peer->mxp_nid));
2632 /* NOTE mx_kisend() only fails if there are not enough
2633 * resources. Do not change the connection status. */
2634 if (mxret == MX_NO_RESOURCES) {
2635 tx->mxc_status.code = -ENOMEM;
2637 tx->mxc_status.code = -ECONNABORTED;
2640 spin_lock(&conn->mxk_lock);
2641 conn->mxk_ntx_posted--;
2642 conn->mxk_credits++;
2643 spin_unlock(&conn->mxk_lock);
2644 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2645 msg_type == MXLND_MSG_GET_DATA) {
2646 spin_lock(&conn->mxk_lock);
2647 conn->mxk_data_posted--;
2648 spin_unlock(&conn->mxk_lock);
2650 if (msg_type != MXLND_MSG_PUT_DATA &&
2651 msg_type != MXLND_MSG_GET_DATA &&
2652 msg_type != MXLND_MSG_CONN_REQ &&
2653 msg_type != MXLND_MSG_CONN_ACK) {
2654 spin_lock(&conn->mxk_lock);
2655 conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2656 spin_unlock(&conn->mxk_lock);
2658 if (msg_type != MXLND_MSG_CONN_REQ &&
2659 msg_type != MXLND_MSG_CONN_ACK) {
2660 /* remove from the pending list */
2661 mxlnd_deq_pending_ctx(tx);
2663 mxlnd_put_idle_tx(tx);
2664 mxlnd_conn_decref(conn);
2667 spin_lock(&conn->mxk_lock);
2670 spin_unlock(&conn->mxk_lock);
2672 mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2678 * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2679 * @ctx - the tx descriptor
2681 * Determine which type of send request it was and start the next step, if needed,
2682 * or, if done, signal completion to LNET. After we are done, put back on the
2686 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2688 int failed = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2689 struct kmx_msg *msg = tx->mxc_msg;
2690 struct kmx_peer *peer = tx->mxc_peer;
2691 struct kmx_conn *conn = tx->mxc_conn;
2692 u8 type = tx->mxc_msg_type;
2693 int credit = mxlnd_tx_requires_credit(tx);
2694 u64 cookie = tx->mxc_cookie;
2696 CDEBUG(D_NET, "entering %s (0x%llx):\n",
2697 mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2699 if (unlikely(conn == NULL)) {
2700 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2701 conn = peer->mxp_conn;
2703 /* do not add a ref for the tx, it was set before sending */
2704 tx->mxc_conn = conn;
2705 tx->mxc_peer = conn->mxk_peer;
2708 LASSERT (peer != NULL);
2709 LASSERT (conn != NULL);
2711 if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2712 LASSERT (type == msg->mxm_type);
2716 tx->mxc_status.code = -EIO;
2718 spin_lock(&conn->mxk_lock);
2719 conn->mxk_last_tx = jiffies;
2720 spin_unlock(&conn->mxk_lock);
2725 case MXLND_MSG_GET_DATA:
2726 spin_lock(&conn->mxk_lock);
2727 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2728 conn->mxk_outstanding++;
2729 conn->mxk_data_posted--;
2731 spin_unlock(&conn->mxk_lock);
2734 case MXLND_MSG_PUT_DATA:
2735 spin_lock(&conn->mxk_lock);
2736 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2737 conn->mxk_data_posted--;
2739 spin_unlock(&conn->mxk_lock);
2742 case MXLND_MSG_NOOP:
2743 case MXLND_MSG_PUT_REQ:
2744 case MXLND_MSG_PUT_ACK:
2745 case MXLND_MSG_GET_REQ:
2746 case MXLND_MSG_EAGER:
2747 //case MXLND_MSG_NAK:
2750 case MXLND_MSG_CONN_ACK:
2751 if (peer->mxp_incompatible) {
2752 /* we sent our params, now close this conn */
2753 mxlnd_conn_disconnect(conn, 0, 1);
2755 case MXLND_MSG_CONN_REQ:
2757 CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2758 "failed with %s (%d) to %s\n",
2759 type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2760 mx_strstatus(tx->mxc_status.code),
2761 tx->mxc_status.code,
2762 libcfs_nid2str(tx->mxc_nid));
2763 if (!peer->mxp_incompatible) {
2764 spin_lock(&conn->mxk_lock);
2765 conn->mxk_status = MXLND_CONN_FAIL;
2766 spin_unlock(&conn->mxk_lock);
2772 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2777 spin_lock(&conn->mxk_lock);
2778 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2779 conn->mxk_ntx_posted--;
2781 spin_unlock(&conn->mxk_lock);
2784 CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2785 mxlnd_put_idle_tx(tx);
2786 mxlnd_conn_decref(conn);
2788 mxlnd_check_sends(peer);
2794 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2799 u32 nob = rx->mxc_status.xfer_length;
2800 u64 bits = rx->mxc_status.match_info;
2801 struct kmx_msg *msg = rx->mxc_msg;
2802 struct kmx_peer *peer = rx->mxc_peer;
2803 struct kmx_conn *conn = rx->mxc_conn;
2804 u8 type = rx->mxc_msg_type;
2806 lnet_msg_t *lntmsg[2];
2812 int incompatible = 0;
2814 /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
2815 * failed GET reply, CONN_REQ, or a CONN_ACK */
2817 /* NOTE peer may still be NULL if it is a new peer and
2818 * conn may be NULL if this is a re-connect */
2819 if (likely(peer != NULL && conn != NULL)) {
2820 /* we have a reference on the conn */
2822 } else if (peer != NULL && conn == NULL) {
2823 /* we have a reference on the peer */
2825 } else if (peer == NULL && conn != NULL) {
2827 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2829 } /* else peer and conn == NULL */
2832 if (peer == NULL || conn == NULL) {
2833 /* if the peer was disconnected, the peer may exist but
2834 * not have any valid conns */
2835 decref = 0; /* no peer means no ref was taken for this rx */
2839 if (conn == NULL && peer != NULL) {
2840 spin_lock(&peer->mxp_lock);
2841 conn = peer->mxp_conn;
2843 mxlnd_conn_addref(conn); /* conn takes ref... */
2844 mxlnd_peer_decref(peer); /* from peer */
2848 spin_unlock(&peer->mxp_lock);
2849 rx->mxc_conn = conn;
2853 CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2859 if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2860 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2861 libcfs_nid2str(rx->mxc_nid),
2862 mx_strstatus(rx->mxc_status.code),
2863 (int) rx->mxc_status.code);
2869 /* this may be a failed GET reply */
2870 if (type == MXLND_MSG_GET_DATA) {
2871 bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2872 ret = (u32) (bits>>52);
2873 lntmsg[0] = rx->mxc_lntmsg[0];
2877 /* we had a rx complete with 0 bytes (no hdr, nothing) */
2878 CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2879 libcfs_nid2str(rx->mxc_nid));
2884 /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2885 if (type == MXLND_MSG_PUT_DATA) {
2886 result = rx->mxc_status.code;
2887 lntmsg[0] = rx->mxc_lntmsg[0];
2889 } else if (type == MXLND_MSG_GET_DATA) {
2890 result = rx->mxc_status.code;
2891 lntmsg[0] = rx->mxc_lntmsg[0];
2892 lntmsg[1] = rx->mxc_lntmsg[1];
2896 ret = mxlnd_unpack_msg(msg, nob);
2898 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2899 ret, libcfs_nid2str(rx->mxc_nid));
2903 type = msg->mxm_type;
2906 if (type != MXLND_MSG_CONN_REQ &&
2907 (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2908 !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2909 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2910 "0x%llx and rx msg dst is 0x%llx)\n",
2911 mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2916 if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2917 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2918 msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2920 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2921 "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2922 "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2923 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2924 msg->mxm_srcstamp, conn->mxk_incarnation,
2925 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2927 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2928 "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2929 libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2930 msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2937 CDEBUG(D_NET, "Received %s with %d credits\n",
2938 mxlnd_msgtype_to_str(type), msg->mxm_credits);
2940 if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2941 msg->mxm_type != MXLND_MSG_CONN_ACK) {
2942 LASSERT(peer != NULL);
2943 LASSERT(conn != NULL);
2944 if (msg->mxm_credits != 0) {
2945 spin_lock(&conn->mxk_lock);
2946 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2947 if ((conn->mxk_credits + msg->mxm_credits) >
2948 *kmxlnd_tunables.kmx_credits) {
2949 CDEBUG(D_NETERROR, "mxk_credits %d mxm_credits %d\n",
2950 conn->mxk_credits, msg->mxm_credits);
2952 conn->mxk_credits += msg->mxm_credits;
2953 LASSERT(conn->mxk_credits >= 0);
2954 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2956 spin_unlock(&conn->mxk_lock);
2960 CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2962 case MXLND_MSG_NOOP:
2965 case MXLND_MSG_EAGER:
2966 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2967 msg->mxm_srcnid, rx, 0);
2971 case MXLND_MSG_PUT_REQ:
2972 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2973 msg->mxm_srcnid, rx, 1);
2977 case MXLND_MSG_PUT_ACK: {
2978 u64 cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2979 if (cookie > MXLND_MAX_COOKIE) {
2980 CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
2981 libcfs_nid2str(rx->mxc_nid));
2982 result = -((cookie >> 52) & 0xff);
2983 lntmsg[0] = rx->mxc_lntmsg[0];
2985 mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
2986 rx->mxc_peer, MXLND_MSG_PUT_DATA,
2987 rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
2992 case MXLND_MSG_GET_REQ:
2993 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
2994 msg->mxm_srcnid, rx, 1);
2998 case MXLND_MSG_CONN_REQ:
2999 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3000 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3001 libcfs_nid2str(msg->mxm_srcnid),
3002 libcfs_nid2str(msg->mxm_dstnid));
3005 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3006 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3008 libcfs_nid2str(msg->mxm_srcnid),
3009 msg->mxm_u.conn_req.mxcrm_queue_depth,
3010 *kmxlnd_tunables.kmx_credits);
3013 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3014 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3016 libcfs_nid2str(msg->mxm_srcnid),
3017 msg->mxm_u.conn_req.mxcrm_eager_size,
3018 (int) MXLND_EAGER_SIZE);
3022 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3025 struct kmx_peer *existing_peer = NULL;
3026 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3028 mx_decompose_endpoint_addr(rx->mxc_status.source,
3030 rx->mxc_nid = msg->mxm_srcnid;
3032 /* adds conn ref for peer and one for this function */
3033 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3037 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3038 write_lock(&kmxlnd_data.kmx_peers_lock);
3039 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3040 if (existing_peer) {
3041 mxlnd_conn_decref(peer->mxp_conn);
3042 mxlnd_peer_decref(peer);
3043 peer = existing_peer;
3044 mxlnd_conn_addref(peer->mxp_conn);
3046 list_add_tail(&peer->mxp_peers,
3047 &kmxlnd_data.kmx_peers[hash]);
3048 write_unlock(&kmxlnd_data.kmx_peers_lock);
3049 atomic_inc(&kmxlnd_data.kmx_npeers);
3052 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3053 mxlnd_peer_decref(peer); /* drop ref taken above */
3055 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3059 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3060 conn = peer->mxp_conn;
3062 struct kmx_conn *old_conn = conn;
3064 /* do not call mx_disconnect() */
3065 mxlnd_conn_disconnect(old_conn, 0, 0);
3067 /* the ref for this rx was taken on the old_conn */
3068 mxlnd_conn_decref(old_conn);
3070 /* This allocs a conn, points peer->mxp_conn to this one.
3071 * The old conn is still on the peer->mxp_conns list.
3072 * As the pending requests complete, they will call
3073 * conn_decref() which will eventually free it. */
3074 ret = mxlnd_conn_alloc(&conn, peer);
3076 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3079 /* conn_alloc() adds one ref for the peer and one for this function */
3082 spin_lock(&peer->mxp_lock);
3083 peer->mxp_incarnation = msg->mxm_srcstamp;
3084 peer->mxp_incompatible = incompatible;
3085 spin_unlock(&peer->mxp_lock);
3086 spin_lock(&conn->mxk_lock);
3087 conn->mxk_incarnation = msg->mxm_srcstamp;
3088 conn->mxk_status = MXLND_CONN_WAIT;
3089 spin_unlock(&conn->mxk_lock);
3091 /* handle_conn_ack() will create the CONN_ACK msg */
3092 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3096 case MXLND_MSG_CONN_ACK:
3097 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3098 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3099 "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3100 libcfs_nid2str(msg->mxm_dstnid));
3104 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3105 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3106 "incompatible queue depth %d (%d wanted)\n",
3107 libcfs_nid2str(msg->mxm_srcnid),
3108 msg->mxm_u.conn_req.mxcrm_queue_depth,
3109 *kmxlnd_tunables.kmx_credits);
3110 spin_lock(&conn->mxk_lock);
3111 conn->mxk_status = MXLND_CONN_FAIL;
3112 spin_unlock(&conn->mxk_lock);
3116 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3117 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3118 "incompatible EAGER size %d (%d wanted)\n",
3119 libcfs_nid2str(msg->mxm_srcnid),
3120 msg->mxm_u.conn_req.mxcrm_eager_size,
3121 (int) MXLND_EAGER_SIZE);
3122 spin_lock(&conn->mxk_lock);
3123 conn->mxk_status = MXLND_CONN_FAIL;
3124 spin_unlock(&conn->mxk_lock);
3128 spin_lock(&peer->mxp_lock);
3129 peer->mxp_incarnation = msg->mxm_srcstamp;
3130 peer->mxp_incompatible = incompatible;
3131 spin_unlock(&peer->mxp_lock);
3132 spin_lock(&conn->mxk_lock);
3133 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3134 conn->mxk_outstanding = 0;
3135 conn->mxk_incarnation = msg->mxm_srcstamp;
3136 conn->mxk_timeout = 0;
3137 if (!incompatible) {
3138 conn->mxk_status = MXLND_CONN_READY;
3140 spin_unlock(&conn->mxk_lock);
3141 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3145 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3146 libcfs_nid2str(rx->mxc_nid));
3153 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3154 spin_lock(&conn->mxk_lock);
3155 conn->mxk_status = MXLND_CONN_FAIL;
3156 spin_unlock(&conn->mxk_lock);
3161 spin_lock(&conn->mxk_lock);
3162 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3163 spin_unlock(&conn->mxk_lock);
3167 /* lnet_parse() failed, etc., repost now */
3168 mxlnd_put_idle_rx(rx);
3169 if (conn != NULL && credit == 1) {
3170 if (type == MXLND_MSG_PUT_DATA) {
3171 spin_lock(&conn->mxk_lock);
3172 conn->mxk_outstanding++;
3173 spin_unlock(&conn->mxk_lock);
3174 } else if (type != MXLND_MSG_GET_DATA &&
3175 (type == MXLND_MSG_EAGER ||
3176 type == MXLND_MSG_PUT_REQ ||
3177 type == MXLND_MSG_NOOP)) {
3178 spin_lock(&conn->mxk_lock);
3179 conn->mxk_outstanding++;
3180 spin_unlock(&conn->mxk_lock);
3183 if (conn_ref) mxlnd_conn_decref(conn);
3184 LASSERT(peer_ref == 0);
3187 if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3188 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3190 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3193 if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3194 if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3196 if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3204 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3206 struct kmx_ctx *tx = NULL;
3207 struct kmx_msg *txmsg = NULL;
3208 struct kmx_conn *conn = peer->mxp_conn;
3210 /* a conn ref was taken when calling mx_iconnect(),
3211 * hold it until CONN_REQ or CONN_ACK completes */
3213 CDEBUG(D_NET, "entering\n");
3214 if (status.code != MX_STATUS_SUCCESS) {
3215 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3216 mx_strstatus(status.code), status.code,
3217 libcfs_nid2str(peer->mxp_nid));
3218 spin_lock(&conn->mxk_lock);
3219 conn->mxk_status = MXLND_CONN_FAIL;
3220 spin_unlock(&conn->mxk_lock);
3222 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3223 struct kmx_conn *new_conn = NULL;
3224 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3225 mxlnd_conn_disconnect(conn, 0, 1);
3226 mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3227 mxlnd_conn_decref(new_conn); /* which we no longer need */
3228 spin_lock(&peer->mxp_lock);
3229 peer->mxp_reconnect_time = 0;
3230 spin_unlock(&peer->mxp_lock);
3233 mxlnd_conn_decref(conn);
3237 spin_lock(&conn->mxk_lock);
3238 conn->mxk_epa = status.source;
3239 spin_unlock(&conn->mxk_lock);
3240 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3241 * we should not need to lock the peer */
3242 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3244 /* mx_iconnect() succeeded, reset delay to 0 */
3245 spin_lock(&peer->mxp_lock);
3246 peer->mxp_reconnect_time = 0;
3247 spin_unlock(&peer->mxp_lock);
3249 /* marshal CONN_REQ msg */
3250 /* we are still using the conn ref from iconnect() - do not take another */
3251 tx = mxlnd_get_idle_tx();
3253 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3254 libcfs_nid2str(peer->mxp_nid));
3255 spin_lock(&conn->mxk_lock);
3256 conn->mxk_status = MXLND_CONN_FAIL;
3257 spin_unlock(&conn->mxk_lock);
3258 mxlnd_conn_decref(conn);
3262 tx->mxc_peer = peer;
3263 tx->mxc_conn = conn;
3264 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3265 txmsg = tx->mxc_msg;
3266 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3267 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3268 tx->mxc_match = mxlnd_create_match(tx, 0);
3270 CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3276 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3278 struct kmx_ctx *tx = NULL;
3279 struct kmx_msg *txmsg = NULL;
3280 struct kmx_conn *conn = peer->mxp_conn;
3282 /* a conn ref was taken when calling mx_iconnect(),
3283 * hold it until CONN_REQ or CONN_ACK completes */
3285 CDEBUG(D_NET, "entering\n");
3286 if (status.code != MX_STATUS_SUCCESS) {
3287 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3288 "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3289 mx_strstatus(status.code), status.code,
3290 libcfs_nid2str(peer->mxp_nid),
3293 peer->mxp_host->mxh_ep_id);
3294 spin_lock(&conn->mxk_lock);
3295 conn->mxk_status = MXLND_CONN_FAIL;
3296 spin_unlock(&conn->mxk_lock);
3298 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3299 struct kmx_conn *new_conn = NULL;
3300 CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3301 mxlnd_conn_disconnect(conn, 0, 1);
3302 mxlnd_conn_alloc(&new_conn, peer); /* adds ref for
3304 mxlnd_conn_decref(new_conn); /* which we no longer need */
3305 spin_lock(&peer->mxp_lock);
3306 peer->mxp_reconnect_time = 0;
3307 spin_unlock(&peer->mxp_lock);
3310 mxlnd_conn_decref(conn);
3313 spin_lock(&conn->mxk_lock);
3314 conn->mxk_epa = status.source;
3315 if (likely(!peer->mxp_incompatible)) {
3316 conn->mxk_status = MXLND_CONN_READY;
3318 spin_unlock(&conn->mxk_lock);
3319 /* NOTE we are holding a ref on the conn which has a ref on the peer,
3320 * we should not have to lock the peer */
3321 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3323 /* mx_iconnect() succeeded, reset delay to 0 */
3324 spin_lock(&peer->mxp_lock);
3325 peer->mxp_reconnect_time = 0;
3326 spin_unlock(&peer->mxp_lock);
3328 /* marshal CONN_ACK msg */
3329 tx = mxlnd_get_idle_tx();
3331 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3332 libcfs_nid2str(peer->mxp_nid));
3333 spin_lock(&conn->mxk_lock);
3334 conn->mxk_status = MXLND_CONN_FAIL;
3335 spin_unlock(&conn->mxk_lock);
3336 mxlnd_conn_decref(conn);
3340 tx->mxc_peer = peer;
3341 tx->mxc_conn = conn;
3342 CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3343 mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3344 txmsg = tx->mxc_msg;
3345 txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3346 txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3347 tx->mxc_match = mxlnd_create_match(tx, 0);
3354 * mxlnd_request_waitd - the MX request completion thread(s)
3355 * @arg - thread id (as a void *)
3357 * This thread waits for a MX completion and then completes the request.
3358 * We will create one thread per CPU.
3361 mxlnd_request_waitd(void *arg)
3363 long id = (long) arg;
3366 mx_return_t mxret = MX_SUCCESS;
3368 struct kmx_ctx *ctx = NULL;
3369 enum kmx_req_state req_type = MXLND_REQ_TX;
3370 struct kmx_peer *peer = NULL;
3371 struct kmx_conn *conn = NULL;
3376 memset(name, 0, sizeof(name));
3377 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3378 cfs_daemonize(name);
3379 //cfs_block_allsigs();
3381 memset(&status, 0, sizeof(status));
3383 CDEBUG(D_NET, "%s starting\n", name);
3385 while (!kmxlnd_data.kmx_shutdown) {
3389 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3390 mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3394 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3395 0LL, 0LL, &status, &result);
3398 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3399 0LL, 0LL, &status, &result);
3401 if (unlikely(kmxlnd_data.kmx_shutdown))
3405 /* nothing completed... */
3409 if (status.code != MX_STATUS_SUCCESS) {
3410 CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3411 "match_info 0x%llx and length %d\n",
3412 mx_strstatus(status.code), status.code,
3413 (u64) status.match_info, status.msg_length);
3416 /* This may be a mx_iconnect() request completing,
3417 * check the bit mask for CONN_REQ and CONN_ACK */
3418 if (status.match_info == MXLND_MASK_ICON_REQ ||
3419 status.match_info == MXLND_MASK_ICON_ACK) {
3420 peer = (struct kmx_peer*) status.context;
3421 if (status.match_info == MXLND_MASK_ICON_REQ) {
3422 mxlnd_handle_conn_req(peer, status);
3424 mxlnd_handle_conn_ack(peer, status);
3429 /* This must be a tx or rx */
3431 /* NOTE: if this is a RX from the unexpected callback, it may
3432 * have very little info. If we dropped it in unexpected_recv(),
3433 * it will not have a context. If so, ignore it. */
3434 ctx = (struct kmx_ctx *) status.context;
3437 req_type = ctx->mxc_type;
3438 conn = ctx->mxc_conn; /* this may be NULL */
3439 mxlnd_deq_pending_ctx(ctx);
3441 /* copy status to ctx->mxc_status */
3442 memcpy(&ctx->mxc_status, &status, sizeof(status));
3446 mxlnd_handle_tx_completion(ctx);
3449 mxlnd_handle_rx_completion(ctx);
3452 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3457 /* FIXME may need to reconsider this */
3458 /* conn is always set except for the first CONN_REQ rx
3459 * from a new peer */
3460 if (!(status.code == MX_STATUS_SUCCESS ||
3461 status.code == MX_STATUS_TRUNCATED) &&
3463 mxlnd_conn_disconnect(conn, 1, 1);
3466 CDEBUG(D_NET, "waitd() completed task\n");
3468 CDEBUG(D_NET, "%s stopping\n", name);
3469 mxlnd_thread_stop(id);
3475 mxlnd_check_timeouts(unsigned long now)
3479 unsigned long next = 0;
3480 struct kmx_peer *peer = NULL;
3481 struct kmx_conn *conn = NULL;
3483 read_lock(&kmxlnd_data.kmx_peers_lock);
3484 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3485 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3487 if (unlikely(kmxlnd_data.kmx_shutdown)) {
3488 read_unlock(&kmxlnd_data.kmx_peers_lock);
3492 spin_lock(&peer->mxp_lock);
3493 conn = peer->mxp_conn;
3495 mxlnd_conn_addref(conn);
3496 spin_unlock(&peer->mxp_lock);
3498 spin_unlock(&peer->mxp_lock);
3502 spin_lock(&conn->mxk_lock);
3504 /* if nothing pending (timeout == 0) or
3505 * if conn is already disconnected,
3507 if (conn->mxk_timeout == 0 ||
3508 conn->mxk_status == MXLND_CONN_DISCONNECT) {
3509 spin_unlock(&conn->mxk_lock);
3510 mxlnd_conn_decref(conn);
3514 /* we want to find the timeout that will occur first.
3515 * if it is in the future, we will sleep until then.
3516 * if it is in the past, then we will sleep one
3517 * second and repeat the process. */
3518 if ((next == 0) || (conn->mxk_timeout < next)) {
3519 next = conn->mxk_timeout;
3524 if (time_after_eq(now, conn->mxk_timeout)) {
3527 spin_unlock(&conn->mxk_lock);
3530 mxlnd_conn_disconnect(conn, 1, 1);
3532 mxlnd_conn_decref(conn);
3535 read_unlock(&kmxlnd_data.kmx_peers_lock);
3536 if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3542 * mxlnd_timeoutd - enforces timeouts on messages
3543 * @arg - thread id (as a void *)
3545 * This thread queries each peer for its earliest timeout. If a peer has timed out,
3546 * it calls mxlnd_conn_disconnect().
3548 * After checking for timeouts, try progressing sends (call check_sends()).
3551 mxlnd_timeoutd(void *arg)
3554 long id = (long) arg;
3555 unsigned long now = 0;
3556 unsigned long next = 0;
3557 unsigned long delay = HZ;
3558 struct kmx_peer *peer = NULL;
3559 struct kmx_conn *conn = NULL;
3561 cfs_daemonize("mxlnd_timeoutd");
3562 //cfs_block_allsigs();
3564 CDEBUG(D_NET, "timeoutd starting\n");
3566 while (!kmxlnd_data.kmx_shutdown) {
3569 /* if the next timeout has not arrived, go back to sleep */
3570 if (time_after(now, next)) {
3571 next = mxlnd_check_timeouts(now);
3574 read_lock(&kmxlnd_data.kmx_peers_lock);
3575 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3576 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3577 spin_lock(&peer->mxp_lock);
3578 conn = peer->mxp_conn;
3579 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3580 spin_unlock(&peer->mxp_lock);
3585 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3586 time_after(now, conn->mxk_last_tx + HZ)) {
3587 mxlnd_check_sends(peer);
3589 mxlnd_conn_decref(conn); /* until here */
3592 read_unlock(&kmxlnd_data.kmx_peers_lock);
3596 CDEBUG(D_NET, "timeoutd stopping\n");
3597 mxlnd_thread_stop(id);